Programowanie współbieżne

Wątki, zadania, synchronizacja, monitory, wartości atomowe, kolekcje


dr inż. Aleksander Smywiński-Pohl

apohllo@o2.pl

http://apohllo.pl/dydaktyka/programowanie-obiektowe

Część przykładów pochodzi ze strony: http://winterbe.com

Java + Threads

Program, proces, wątek

  • program - plik lub zestaw plików opisujących w jakis sposób należy przetwarzać dane
  • proces - uruchomiony program posiadający własną pamięć oraz licznik instrukcji
  • wątek - lekki proces w obrębie działającego programu, posiadający własny stos oraz licznik instrukcji

Thread

import static System.out;

List<Thread> threads = new LinkedList<>();

threads.add(new Thread(() -> out.println("Wątek " + 
    Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " + 
    Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " + 
    Thread.currentThread().getName())));

threads.forEach(Thread::start);

Runnable

List<Runnable> tasks = new LinkedList<>();

tasks.add(() -> out.println("Zadanie " + 
    Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " + 
    Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " + 
    Thread.currentThread().getName()));

tasks.forEach((task) -> new Thread(task).start());

Thread#sleep()

import java.util.concurrent.*;

Thread sleepingThread = new Thread(() -> {
    try{
        out.println("Przed snem");
        TimeUnit.SECONDS.sleep(1);
        out.println("Po śnie");
    } catch (InterruptedException ex) {
        out.println("Wątek został przerwany");
    }
});
sleepingThread.start();

ExecutorService

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> out.println("Wykonanie " + 
    Thread.currentThread().getName()));

Future

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    } catch (InterruptedException e) {
        throw new IllegalStateException("wątek został przerwany", e);
    }
};
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("obliczenie zakończone? " + future.isDone());

Integer result = future.get();

System.out.println("obliczenie zakończone? " + future.isDone());
System.out.print("wynik: " + result);

Rodzaje wykonawców (ExecutorService)

  • newCachedThreadPool - tworzy wątki w zależności od potrzeb i usuwa je jeśli nie są używane przez 60 sekund
  • newFixedThreadPool - cały czas przechowuje niezakończone wątki
  • newScheduledThreadPool - posiada możliwość odroczonego i periodycznego wykonania wątków
  • newSingleThreadExecutor - wykonanie jednowątkowe
  • newSingleThreadScheduledExecutor - jw. ale z możliwością odroczonego i periodycznego wykonania

ScheduledExecutor

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

Runnable task = () -> out.println("Wykonanie " + 
    Thread.currentThread().getName());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(300);

out.println("Czas pozostały do wykonania " + 
    future.getDelay(TimeUnit.MILLISECONDS));

Hazard (Race condition)

import java.util.stream.*;

class RaceCondition {
    private int counter = 0;

    public void increment() {
        this.counter += 1;
    }

    public int getCounter(){
        return this.counter;
    }
}
RaceCondition object = new RaceCondition();

IntStream.range(0, 10000).forEach(i -> object.increment());

out.println(object.getCounter());
ExecutorService executor = Executors.newFixedThreadPool(2);

object = new RaceCondition();

IntStream.range(0, 10000).forEach(i -> executor.submit(object::increment));

out.println(object.getCounter());

Sekcja krytyczna (synchronized)

class SynchronizedAccessors {
    private int counter = 0;

    public synchronized void increment() {
        this.counter += 1;
    }

    public synchronized int getCounter(){
        return this.counter;
    }
}
class Monitor {
    private int counter = 0;

    public void increment() {
        synchronized(this) {
            this.counter += 1;
        }
    }

    public int getCounter(){
        synchronized(this) {
            return this.counter;
        }
    }
}

wait i notify

class StringStack {
    private List<String> stack = new LinkedList<>();

    public void push(String value){
        synchronized(this) {
            stack.add(value);
            notify();
        }
    }

    public String pop(){
        synchronized(this) {
            while(stack.isEmpty()){
                try {
                    wait();
                } catch (InterruptedException ex) {
                    out.println("Wątek został przerwany");
                }
            }
            return stack.remove(stack.size() - 1);
        }
    }
}
StringStack stack = new StringStack();

Thread adder = new Thread(() -> {
    try {
        sleep(1000);
        stack.push("abc");
        sleep(1000);
        stack.push("def");
    } catch (InterruptedException ex) {
        out.println("Wątek dodający został przerwany");
    }
});

Thread reader = new Thread(() -> {
    out.println(stack.pop());
    out.println(stack.pop());
});


reader.start();
adder.start();

ReentrantLock

import java.util.concurrent.locks.*;

class ReentrantLockAccessors {
    private int counter = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            this.counter += 1;
        } finally {
            lock.unlock();
        }
    }

    public int getCounter(){
        lock.lock();
        try {
            return this.counter;
        } finally {
            lock.unlock();
        }
    }
}

AtomicInteger

import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> executor.submit(atomicInt::incrementAndGet));

System.out.println(atomicInt.get());
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> {
    Runnable task = () -> atomicInt.updateAndGet(n -> n + 2);
    executor.submit(task);
});

System.out.println(atomicInt.get());

AtomictInt

  • addAndGet
  • compareAndSet
  • decrementAndGet
  • get
  • getAndAdd
  • getAndDecrement
  • getAndIncrement
  • getAndSet
  • ...

LongAdder

LongAdder adder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> executor.submit(adder::increment));

System.out.println(adder.sumThenReset());

LongAccumulator

import java.util.function.*;

LongBinaryOperator operation = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(operation, 1);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10).forEach(i -> executor.submit(
    () -> accumulator.accumulate(i)));

System.out.println(accumulator.getThenReset());

Przetwarzanie współbieżne a kolekcje

  • CopyOnWriteArrayList
  • ConcurrentHashMap
  • parallelStream

ConcurrentModificationException

List<Integer> list = new LinkedList<>();  // <-----------------------

ExecutorService executor = Executors.newFixedThreadPool(200);

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.submit(() -> {
  Iterator<Integer> iterator = list.iterator();
  while(iterator.hasNext()){
    try {
      out.println("" + iterator.next() + " reading from " + 
          Thread.currentThread().getName());
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
});

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

CopyOnWriteArrayList

List<Integer> list = new CopyOnWriteArrayList<>();  // <-----------------------

ExecutorService executor = Executors.newFixedThreadPool(200);

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.submit(() -> {
  Iterator<Integer> iterator = list.iterator();
  while(iterator.hasNext()){
    try {
      out.println("" + iterator.next() + " reading from " + 
          Thread.currentThread().getName());
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
});

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

CommonPoolParallelism

System.out.println(ForkJoinPool.getCommonPoolParallelism());
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

ConcurrentHashMap

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("1", "jeden");
map.put("2", "dwa");
map.put("3", "trzy");
map.put("4", "cztery");
map.put("5", "pięć");
map.put("6", "sześć");
map.put("7", "siedem");
map.forEach(1, (key, value) -> out.printf("klucz: %s; wartość: %s; wątek: %s\n",
        key, value, Thread.currentThread().getName()));

search

String result = map.search(1, (key, value) -> {
    out.println(Thread.currentThread().getName());
    if ("5".equals(key)) {
        return value;
    }
    return null;
});
out.println("Wynik: " + result);

reduce

String result = map.reduce(1,
    (key, value) -> {
        out.println("Przekształcenie: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        out.println("Redukcja: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

out.println("Wynik: " + result);

stream

import java.util.stream.*;

class SerialRun {
  private static long MILLION = 1000 * 1000;
  public static void main(String[] args) {
    if(args.length != 1){
      System.out.println("Podaj liczbę uruchomień (w milionach)");
      return;
    }
    long top = MILLION * Integer.parseInt(args[0]);
    long start = System.nanoTime();

    double result = LongStream.range(0, top).filter(i -> i * i % 7 != 0).
        average().getAsDouble();

    long end = System.nanoTime();

    System.out.println(result);
    System.out.println((end - start) / 1000000000.0);
  }
}

parallel oraz parallelStream

import java.util.stream.*;

class ParallelRun {
  private static long MILLION = 1000 * 1000;
  public static void main(String[] args) {
    if(args.length != 1){
      System.out.println("Podaj liczbę uruchomień (w milionach)");
      return;
    }
    long top = MILLION * Integer.parseInt(args[0]);
    long start = System.nanoTime();

    double result = LongStream.range(0, top).parallel().filter(i -> i * i % 7 != 0).
        average().getAsDouble();

    long end = System.nanoTime();

    System.out.println(result);
    System.out.println((end - start) / 1000000000.0);
  }
}

Pytania?