Concurrent programming

Threads, tasks, synchronization, monitors, atomic values, collections


dr inż. Aleksander Smywiński-Pohl

apohllo@o2.pl

http://apohllo.pl/dydaktyka/programowanie-obiektowe

Some of the examples come from: http://winterbe.com

Java + Threads

Program, process, thread

  • program - a file or set of files describing how data should be processed
  • process - a running program with its own memory and its own program counter
  • thread - a lightweight unit of execution within a running process, with its own stack and program counter; threads of one process share its memory
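A minimal sketch of the distinction above: each thread keeps its local variables on its own stack, while objects on the heap are shared by all threads of the process. The class name SharedMemoryDemo and the method fillFromTwoThreads are made up for this illustration.

```java
public class SharedMemoryDemo {
    // Starts two threads that write to one shared array and returns that array.
    static int[] fillFromTwoThreads() {
        int[] shared = new int[2];          // heap object, visible to both threads
        Thread first = new Thread(() -> {
            int local = 10;                 // lives on the first thread's stack
            shared[0] = local;
        });
        Thread second = new Thread(() -> {
            int local = 20;                 // a separate variable on the second thread's stack
            shared[1] = local;
        });
        first.start();
        second.start();
        try {
            first.join();                   // after join() the writes are visible here
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared;
    }

    public static void main(String[] args) {
        int[] result = fillFromTwoThreads();
        System.out.println(result[0] + " " + result[1]); // prints "10 20"
    }
}
```

The two threads write to different array cells, so no synchronization beyond join() is needed here.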

Thread

In [5]:
import static java.lang.System.out;
import java.util.List;
import java.util.LinkedList;

List<Thread> threads = new LinkedList<>();

threads.add(new Thread(() -> out.println("Thread " + 
    Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Thread " + 
    Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Thread " + 
    Thread.currentThread().getName())));

threads.forEach(Thread::start);
Out[5]:
null

Runnable

In [6]:
import static java.lang.System.out;
import java.util.List;
import java.util.LinkedList;

List<Runnable> tasks = new LinkedList<>();

tasks.add(() -> out.println("Task " + 
    Thread.currentThread().getName()));
tasks.add(() -> out.println("Task " + 
    Thread.currentThread().getName()));
tasks.add(() -> out.println("Task " + 
    Thread.currentThread().getName()));

tasks.forEach((task) -> new Thread(task).start());
Out[6]:
null

Thread#sleep()

In [9]:
import static java.lang.System.out;
import java.util.concurrent.*;

Thread sleepingThread = new Thread(() -> {
    try {
        out.println("Before sleep");
        TimeUnit.SECONDS.sleep(3);
        out.println("After sleep");
    } catch (InterruptedException ex) {
        out.println("Thread was interrupted");
    }
});
sleepingThread.start();
sleepingThread.join();
Out[9]:
null

ExecutorService

In [19]:
import java.util.concurrent.*;
import static java.lang.System.out;

ExecutorService executor =  Executors.newSingleThreadExecutor();
executor.submit(() -> out.println("Executing on " + 
    Thread.currentThread().getName()));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
Out[19]:
null

Future

In [7]:
import java.util.concurrent.*;
Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    } catch (InterruptedException e) {
        throw new IllegalStateException("thread was interrupted", e);
    }
};

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("computation finished? " + future.isDone());

Integer result = future.get();

System.out.println("computation finished? " + future.isDone());
System.out.print("result: " + result);
computation finished? false
computation finished? true
result: 123
Out[7]:
null

Types of executors (ExecutorService)

  • newCachedThreadPool - creates threads as needed and removes those that have been idle for 60 seconds
  • newFixedThreadPool - keeps a fixed number of threads alive until the pool is shut down
  • newScheduledThreadPool - can run tasks after a delay or periodically
  • newSingleThreadExecutor - runs tasks on a single thread
  • newSingleThreadScheduledExecutor - as above, but can also run tasks after a delay or periodically
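The list above mentions periodic execution. A possible sketch of it uses scheduleAtFixedRate on a single-threaded scheduled executor. The class name PeriodicTaskDemo, the method runPeriodically, and the 200 ms period are assumptions chosen for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskDemo {
    // Runs a task every periodMillis until it has run `times` times; returns the run count.
    static int runPeriodically(int times, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);

        executor.scheduleAtFixedRate(() -> {
            // Guard against an extra run that may start before shutdownNow() is called.
            if (done.getCount() > 0) {
                System.out.println("Run " + runs.incrementAndGet());
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();                   // block until the task has run `times` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();         // a periodic task never finishes on its own
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Total runs: " + runPeriodically(3, 200));
    }
}
```

A periodic task repeats until it is cancelled or the executor is shut down, which is why the sketch counts runs with a latch and stops the executor explicitly.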

ScheduledExecutor

In [2]:
import java.util.concurrent.*;
import static java.lang.System.out;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

Runnable task = () -> out.println("Executing on " + 
    Thread.currentThread().getName());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(300);

out.println("Time remaining until execution " + 
    future.getDelay(TimeUnit.MILLISECONDS));
executor.shutdown();
// Waits at most 1 s, less than the 3 s delay, so the cell ends before the task prints.
executor.awaitTermination(1, TimeUnit.SECONDS);
Time remaining until execution 2699
Out[2]:
null

Race condition (hazard)

In [36]:
class RaceCondition1 {
    private int counter = 0;
    
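    // synchronized makes the read-modify-write below atomic; without it,
    // two threads can interleave here and lose updates.
    public synchronized void increment() {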
        this.counter += 1;
    }
    
    public synchronized int getCounter(){
        return this.counter;
    }
}
Out[36]:
com.twosigma.beaker.javash.bkr58d4ad01.RaceCondition1
In [11]:
import java.util.concurrent.*;
import java.util.stream.*;

RaceCondition1 object = new RaceCondition1();

IntStream.range(0, 10000).forEach(i -> object.increment());

System.out.println(object.getCounter());
10000
Out[11]:
null
In [56]:
import java.util.concurrent.*;
import java.util.stream.*;

ExecutorService executor = Executors.newFixedThreadPool(2);

RaceCondition1 object = new RaceCondition1();

IntStream.range(0, 10000).forEach(i -> executor.submit(object::increment));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(object.getCounter());
10000
Out[56]:
null
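The counter class above has synchronized methods, so the run with two threads still prints 10000. For contrast, here is a sketch of the same experiment with an unsynchronized counter. The names LostUpdateDemo, UnsafeCounter and countWithTwoThreads are invented for this illustration. The result varies from run to run and is often lower than the number of submitted increments.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LostUpdateDemo {
    // Hypothetical counter without any synchronization.
    static class UnsafeCounter {
        private int counter = 0;

        void increment() {
            counter += 1;                   // read, add, write: three steps, not atomic
        }

        int get() {
            return counter;
        }
    }

    // Submits `increments` increment tasks to a pool of two threads and returns the final count.
    static int countWithTwoThreads(int increments) {
        UnsafeCounter object = new UnsafeCounter();
        ExecutorService executor = Executors.newFixedThreadPool(2);

        IntStream.range(0, increments).forEach(i -> executor.submit(object::increment));

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return object.get();
    }

    public static void main(String[] args) {
        // May print less than 100000 when two increments interleave.
        System.out.println(countWithTwoThreads(100000));
    }
}
```

Whether updates are actually lost in a given run depends on timing, so a single correct result does not prove the code is safe.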

Critical section (synchronized)

In [26]:
class SynchronizedAccessors {
    private int counter = 0;

    public synchronized void increment() {
        this.counter += 1;
    }

    public synchronized int getCounter(){
        return this.counter;
    }
}
Out[26]:
com.twosigma.beaker.javash.bkrd472e7ed.SynchronizedAccessors
In [23]:
class Monitor {
    private int counter = 0;

    public void increment() {
        synchronized(this) {
            this.counter += 1;
        }
    }

    public int getCounter(){
        synchronized(this) {
            return this.counter;
        }
    }
}
Out[23]:
com.twosigma.beaker.javash.bkr95d596b3.Monitor

wait and notify

In [92]:
import java.util.*;
import static java.lang.System.out;

class StringStack {
    private List<String> stack = new LinkedList<>();
    
    public void push(String value){
        synchronized(this) {
            stack.add(value);
            notify();
        }
    }
    
    public String pop(){
        synchronized(this) {
            while(stack.isEmpty()){
                try {
                    wait();
                } catch (InterruptedException ex) {
                    out.println("Thread was interrupted");
                }
            }
            return stack.remove(stack.size() - 1);
        }
    }
}
Out[92]:
com.twosigma.beaker.javash.bkrd242ace9.StringStack
In [95]:
import static java.lang.System.out;

StringStack stack = new StringStack();

Thread adder = new Thread(() -> {
    try {
        Thread.sleep(1000);
        stack.push("abc");
        Thread.sleep(1000);
        stack.push("def");
    } catch (InterruptedException ex) {
        out.println("The adding thread was interrupted");
    }
});

Thread reader = new Thread(() -> {
    out.println(stack.pop());
    out.println(stack.pop());
});


reader.start();
adder.start();
Out[95]:
null

ReentrantLock

In [96]:
import java.util.concurrent.locks.*;

class ReentrantLockAccessors {
    private int counter = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            this.counter += 1;
        } finally {
            lock.unlock();
        }
    }

    public int getCounter(){
        lock.lock();
        try {
            return this.counter;
        } finally {
            lock.unlock();
        }
    }
}
Out[96]:
com.twosigma.beaker.javash.bkrd242ace9.ReentrantLockAccessors

AtomicInteger

In [ ]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> executor.submit(atomicInt::incrementAndGet));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(atomicInt.get());
In [47]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> {
    executor.submit(() -> atomicInt.updateAndGet(n -> n + 2));
});

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(atomicInt.get());
2000
Out[47]:
null

AtomicInteger

  • addAndGet
  • compareAndSet
  • decrementAndGet
  • get
  • getAndAdd
  • getAndDecrement
  • getAndIncrement
  • getAndSet
  • ...
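Of the methods listed above, compareAndSet is the basic building block: it writes the new value only if the current value still equals the expected one. A minimal sketch of the usual retry loop follows. The class name CompareAndSetDemo and the helper doubleValue are made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CompareAndSetDemo {
    // Atomically doubles the value using an explicit compare-and-set retry loop.
    static int doubleValue(AtomicInteger value) {
        while (true) {
            int current = value.get();
            int next = current * 2;
            // Succeeds only if no other thread changed the value since get().
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(21);
        System.out.println(doubleValue(value));          // 42
        System.out.println(value.compareAndSet(21, 0));  // false, the value is 42 by now
        System.out.println(value.getAndAdd(8));          // 42, returns the previous value
        System.out.println(value.get());                 // 50
    }
}
```

updateAndGet, used in the earlier cell, wraps the same kind of retry loop around a supplied function.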

LongAdder

In [101]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

LongAdder adder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000).forEach(i -> executor.submit(adder::increment));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(adder.sumThenReset());
1000
Out[101]:
null

LongAccumulator

In [86]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;

LongBinaryOperator operation = (x, y) -> x + y;
LongAccumulator accumulator = new LongAccumulator(operation, 1);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10).forEach(i -> executor.submit(
    () -> accumulator.accumulate(i)));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(accumulator.getThenReset());
46
Out[86]:
null

Concurrent processing and collections

  • CopyOnWriteArrayList
  • ConcurrentHashMap
  • parallelStream

ConcurrentModificationException

In [24]:
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;

List<Integer> list = new LinkedList<>();  // <-----------------------

ExecutorService executor = Executors.newFixedThreadPool(200);

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  System.out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.submit(() -> {
  Iterator<Integer> iterator = list.iterator();
  while(iterator.hasNext()){
    try {
      System.out.println("" + iterator.next() + " reading from " + 
          Thread.currentThread().getName());
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
});

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  System.out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
Out[24]:
null

CopyOnWriteArrayList

In [118]:
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

List<Integer> list = new CopyOnWriteArrayList<>();  // <-----------------------

ExecutorService executor = Executors.newFixedThreadPool(200);

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  System.out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.submit(() -> {
  Iterator<Integer> iterator = list.iterator();
  while(iterator.hasNext()){
    try {
      System.out.println("" + iterator.next() + " reading from " + 
          Thread.currentThread().getName());
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
});

IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
  System.out.println("" + i + " from " + Thread.currentThread().getName());
  list.add(i); 
}));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
Out[118]:
null

CommonPoolParallelism

In [ ]:
import java.util.concurrent.*;

System.out.println(ForkJoinPool.getCommonPoolParallelism());

// Can be overridden with the JVM option:
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
3

ConcurrentHashMap

In [4]:
import java.util.concurrent.*;

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
map.put("4", "four");
map.put("5", "five");
map.put("6", "six");
map.put("7", "seven");

map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s\n",
        key, value, Thread.currentThread().getName()));
key: 1; value: one; thread: javash0
Out[4]:
null
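Besides the bulk operations, ConcurrentHashMap offers atomic per-key updates. The sketch below counts words from several threads with merge, which performs the read-and-update of a single key atomically. The class name MergeDemo, the method countWords and the pool size of 4 are assumptions made for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MergeDemo {
    // Counts occurrences of each word, submitting one task per word to a small pool.
    static ConcurrentHashMap<String, Integer> countWords(String[] words) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (String word : words) {
            // merge inserts 1 for a new key, otherwise applies Integer::sum atomically.
            executor.submit(() -> counts.merge(word, 1, Integer::sum));
        }

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] words = {"a", "b", "a", "c", "a", "b"};
        ConcurrentHashMap<String, Integer> counts = countWords(words);
        System.out.println(counts.get("a") + " " + counts.get("b") + " " + counts.get("c")); // 3 2 1
    }
}
```

A separate get followed by put would not be safe here, because another thread could update the key between the two calls.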

search

In [ ]:
String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("5".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

reduce

In [ ]:
String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Result: " + result);

stream

In [5]:
import java.util.stream.*;

class SerialRun {
  private static long MILLION = 1000 * 1000;
  public static void main(String[] args) {
    if(args.length != 1){
      System.out.println("Provide the number of iterations (in millions)");
      return;
    }
    long top = MILLION * Integer.parseInt(args[0]);
    long start = System.nanoTime();

    double result = LongStream.range(0, top).filter(i -> i * i % 7 != 0).
        average().getAsDouble();

    long end = System.nanoTime();

    System.out.println(result);
    System.out.println((end - start) / 1000000000.0);
  }
}
Out[5]:
com.twosigma.beaker.javash.bkr6578c42e.SerialRun

parallel and parallelStream

In [ ]:
import java.util.stream.*;

class ParallelRun {
  private static long MILLION = 1000 * 1000;
  public static void main(String[] args) {
    if(args.length != 1){
      System.out.println("Provide the number of iterations (in millions)");
      return;
    }
    long top = MILLION * Integer.parseInt(args[0]);
    long start = System.nanoTime();

    double result = LongStream.range(0, top).parallel().filter(i -> i * i % 7 != 0).
        average().getAsDouble();

    long end = System.nanoTime();

    System.out.println(result);
    System.out.println((end - start) / 1000000000.0);
  }
}
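The class above uses parallel() on a LongStream. For collections the counterpart is parallelStream(). A minimal sketch, with the class name ParallelStreamDemo and the method sumOfSquares invented for this example:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamDemo {
    // Sums the squares of the given numbers using a parallel stream over the collection.
    static long sumOfSquares(List<Integer> numbers) {
        return numbers.parallelStream()
                .mapToLong(n -> (long) n * n)   // widen to long before multiplying
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sumOfSquares(numbers)); // 338350
    }
}
```

The lambda is stateless and the reduction is associative, so the result does not depend on how the work is split across the common pool.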

Questions?