Programowanie współbieżne

Wątki, zadania, synchronizacja, monitory, wartości atomowe, kolekcje


dr inż. Aleksander Smywiński-Pohl

apohllo@agh.edu.pl

http://apohllo.pl/dydaktyka/programowanie-obiektowe

konsultacje: wtorek 15:30 - 18:00, pokój 4.61

Część przykładów pochodzi ze strony: http://winterbe.com

Pytanie 1

1) Refleksja pozwala na uzyskanie definicji pól protected lub final, usunięcie modyfikatora protected i manipulowanie tym polem tak jakby było zadeklarowane jako mutowalne. Taka możliwość pozwala na obejście założeń, które jako developerzy uczyniliśmy projektując jakąś aplikacje i ukrywając część jej implmentacji. Czy można uczynić nasze klasy i obiekty odpornymi na refleksje?

Pytanie 2

Dlaczego w typach generycznych w Javie jest rozróżnienie na <? extends NameOfType> oraz <? super NameOfType>.

Pytanie 3

Dlaczego nie można używać typu prostego jako parametru w klasie generycznej?
Może być to irytujące dla programistów.

Pytanie 4

Czy może nastąpić sytuacja, kiedy możemy poznać typ typu generycznego podczas runtime'a?

Java + Threads

Program, proces, wątek

  • program - plik lub zestaw plików opisujących w jakis sposób należy przetwarzać dane
  • proces - uruchomiony program posiadający własną pamięć oraz licznik instrukcji
  • wątek - lekki proces w obrębie działającego programu, posiadający własny stos oraz licznik instrukcji

Klasa Thread

API klasy Thread:

  • run()
  • start()
  • join()
  • currentThread()
  • getName()
  • ...
In [14]:
import static java.lang.System.out;
import java.util.List;
import java.util.LinkedList;

List<Thread> threads = new LinkedList<>();

threads.add(new Thread(() -> out.println("Wątek " + Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " + Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " + Thread.currentThread().getName())));

threads.forEach(Thread::start);
for(Thread thread : threads){
    thread.join();
}
Wątek Thread-41
Wątek Thread-42
Wątek Thread-40

Interfejs Runnable

API interfejsu Runnable:

  • run()
In [31]:
import static java.lang.System.out;
import java.util.List;
import java.util.LinkedList;

List<Runnable> tasks = new LinkedList<>();

tasks.add(() -> out.println("Zadanie " + Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " + Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " + Thread.currentThread().getName()));

tasks.forEach((task) -> new Thread(task).start());
Zadanie Thread-92
Zadanie Thread-91
Zadanie Thread-93

Thread#sleep()

In [32]:
import static java.lang.System.out;
import java.util.concurrent.*;

Thread sleepingThread = new Thread(() -> {
    try{
        out.println("Idę spać na 3 sekudny");
        TimeUnit.SECONDS.sleep(3);
        out.println("Godzinę później...");
    } catch (InterruptedException ex) {
        out.println("Sen został przerwany");
    }
});
sleepingThread.start();
sleepingThread.join();
Idę spać na 3 sekudny
Godzinę później...

ExecutorService

In [ ]:
import java.util.concurrent.*;
import static java.lang.System.out;

ExecutorService executor =  Executors.newSingleThreadExecutor();
executor.submit(() -> out.println("Egzekucja w " + Thread.currentThread().getName()));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

Future

In [ ]:
interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    V get();
    V get(long timeout, TimeUnit unit);
    boolean isCancelled();
    boolean isDone();
}
In [34]:
import java.util.concurrent.*;
Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(3);
        return 123;
    } catch (InterruptedException e) {
        throw new IllegalStateException("wątek został przerwany", e);
    }
};

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

out.println("obliczenie zakończone? " + future.isDone());

Integer result = future.get();

out.println("obliczenie zakończone? " + future.isDone());
out.print("wynik: " + result);
obliczenie zakończone? false
obliczenie zakończone? true
wynik: 123

Rodzaje wykonawców (ExecutorService)

  • newCachedThreadPool - tworzy wątki w zależności od potrzeb i usuwa je jeśli nie są używane przez 60 sekund
  • newFixedThreadPool - cały czas przechowuje niezakończone wątki
  • newScheduledThreadPool - posiada możliwość odroczonego i periodycznego wykonania wątków
  • newSingleThreadExecutor - wykonanie jednowątkowe
  • newSingleThreadScheduledExecutor - jw. ale z możliwością odroczonego i periodycznego wykonania

ScheduledExecutor

In [37]:
import java.util.concurrent.*;
import static java.lang.System.out;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

Runnable task = () -> out.println("Wykonanie odroczonego zadania w " + Thread.currentThread().getName());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(300);

out.println("Czas pozostały do wykonania " + future.getDelay(TimeUnit.MILLISECONDS));
TimeUnit.SECONDS.sleep(3);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
Czas pozostały do wykonania 2675
Wykonanie odroczonego zadania w pool-5-thread-1
Out[37]:
true

Hazard (Race condition)

In [38]:
class RaceCondition {
    private int counter = 0;
    
    public void increment() {
        this.counter += 1;
    }
    
    public int getCounter(){
        return this.counter;
    }
}
In [39]:
import java.util.concurrent.*;
import java.util.stream.*;

RaceCondition object = new RaceCondition();

IntStream.range(0, 1000000).forEach(i -> object.increment());

System.out.println(object.getCounter());
1000000
In [40]:
import java.util.concurrent.*;
import java.util.stream.*;

ExecutorService executor = Executors.newFixedThreadPool(2);

RaceCondition object = new RaceCondition();

IntStream.range(0, 1000000).forEach(i -> executor.submit(object::increment));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
Out[40]:
true
In [ ]:
http://kahoot.com
In [41]:
System.out.println(object.getCounter());
986853

Sekcja krytyczna (synchronized)

In [43]:
class SynchronizedAccessors {
    private int counter = 0;

    public synchronized void increment() {
        this.counter += 1;
    }

    public synchronized int getCounter(){
        return this.counter;
    }
}
In [44]:
import java.util.concurrent.*;
import java.util.stream.*;

ExecutorService executor = Executors.newFixedThreadPool(2);

SynchronizedAccessors object = new SynchronizedAccessors();

IntStream.range(0, 1000000).forEach(i -> executor.submit(object::increment));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(object.getCounter());
1000000
In [ ]:
class Monitor {
    private int counter = 0;

    public void increment() {
        synchronized(this) {
            this.counter += 1;
        }
    }

    public int getCounter(){
        synchronized(this) {
            return this.counter;
        }
    }
}

wait i notify

In [46]:
import java.util.*;

class StringStack {
    private List<String> stack = new LinkedList<>();
    
    public void push(String value){
        synchronized(this) {
            stack.add(value);
            notify();
        }
    }
    
    public String pop(){
        synchronized(this) {
            while(stack.isEmpty()){
                try {
                    wait();
                } catch (InterruptedException ex) {
                    out.println("Wątek został przerwany");
                }
            }
            return stack.remove(stack.size() - 1);
        }
    }
}
In [49]:
StringStack stack = new StringStack();

Thread ideaProducer = new Thread(() -> {
    try {
        out.println("Myślę");
        Thread.sleep(3000);
        out.println("Produkuję myśl");
        stack.push("Myślę więc jestem");
        Thread.sleep(3000);
        out.println("Produkuję myśl");
        stack.push("Nicość nicuje");
    } catch (InterruptedException ex) {
        out.println("Wątek dodający został przerwany");
    }
});

Thread ideaConsumer = new Thread(() -> {
    out.println("Konsumuję myśl: " + stack.pop());
    out.println("Konsumuję myśl: " + stack.pop());
});

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(ideaConsumer);
executor.submit(ideaProducer);
executor.shutdown();
executor.awaitTermination(7, TimeUnit.SECONDS);
Myślę
Produkuję myśl
Konsumuję myśl: Myślę więc jestem
Produkuję myśl
Konsumuję myśl: Nicość nicuje
Out[49]:
true

Klasa ReentrantLock

In [ ]:
import java.util.concurrent.locks.*;

class ReentrantLockAccessors {
    private int counter = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            this.counter += 1;
        } finally {
            lock.unlock();
        }
    }

    public int getCounter(){
        lock.lock();
        try {
            return this.counter;
        } finally {
            lock.unlock();
        }
    }
}

AtomicInteger

In [50]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000000).forEach(i -> executor.submit(atomicInt::incrementAndGet));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
out.println(atomicInt.get());
1000000
In [51]:
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000).forEach(i -> {
    executor.submit(() -> atomicInt.updateAndGet(n -> n + i));
});

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
out.println(atomicInt.get());
49995000

AtomictInteger

  • addAndGet
  • compareAndSet
  • decrementAndGet
  • get
  • getAndAdd
  • getAndDecrement
  • getAndIncrement
  • getAndSet
  • ...

LongAdder

In [52]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

LongAdder adder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(4);

IntStream.range(0, 1000000).forEach(i -> executor.submit(adder::increment));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(adder.sumThenReset());
1000000

LongAccumulator

In [53]:
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;

LongBinaryOperator operation = (x, y) -> x + y;
LongAccumulator accumulator = new LongAccumulator(operation, 0);

ExecutorService executor = Executors.newFixedThreadPool(4);

IntStream.range(0, 1000000).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

System.out.println(accumulator.getThenReset());
499999500000

Przetwarzanie współbieżne a kolekcje

  • CopyOnWriteArrayList
  • ConcurrentHashMap
  • parallelStream

ConcurrentModificationException

In [ ]:
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;

List<Integer> list = new LinkedList<>();  // <-----------------------
ExecutorService executor = Executors.newFixedThreadPool(200);
IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
      System.out.println("" + i + " writing to LinkedList " + Thread.currentThread().getName());
      list.add(i); 
}));
executor.submit(() -> {
      Iterator<Integer> iterator = list.iterator();
      while(iterator.hasNext()){
            try {
                  Thread.sleep(20);
                  System.out.println("" + iterator.next() + " reading from LinkedList " + 
                  Thread.currentThread().getName());
            } catch(Exception ex) {
                  ex.printStackTrace();
            }
      }
});
IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
      System.out.println("" + i + " writing to LinkedList " + Thread.currentThread().getName());
      list.add(i); 
}));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

CopyOnWriteArrayList

In [ ]:
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
List<Integer> list = new CopyOnWriteArrayList<>();  // <-----------------------
ExecutorService executor = Executors.newFixedThreadPool(200);
IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
      System.out.println("" + i + " writing to CopyOnWriteArrayList " + Thread.currentThread().getName());
      list.add(i); 
}));
executor.submit(() -> {
      Iterator<Integer> iterator1 = list.iterator();
      while(iterator1.hasNext()){
            try {
                Thread.sleep(20);
                System.out.println("" + iterator1.next() + " reading from CopyOnWriteArrayList " + 
                Thread.currentThread().getName());
            } catch(Exception ex) {
                ex.printStackTrace();
            }
      }
});
IntStream.range(1,50).forEach((i) -> executor.submit(() -> { 
      System.out.println("" + i + " writing to CopyOnWriteArrayList " + Thread.currentThread().getName());
      list.add(i); 
}));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);

CommonPoolParallelism

In [ ]:
import java.util.concurrent.*;

System.out.println(ForkJoinPool.getCommonPoolParallelism());

//-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

ConcurrentHashMap

In [ ]:
import java.util.concurrent.*;

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("1", "jeden");
map.put("2", "dwa");
map.put("3", "trzy");
map.put("4", "cztery");
map.put("5", "pięć");
map.put("6", "sześć");
map.put("7", "siedem");

map.forEach(1, (key, value) -> System.out.printf("klucz: %s; wartość: %s; wątek: %s\n",
        key, value, Thread.currentThread().getName()));

search

In [ ]:
String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("2".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Wynik: " + result);

reduce

In [ ]:
String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Przekształcenie: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Redukcja: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Wynik: " + result);

stream

In [ ]:
import java.util.stream.*;


long start = System.nanoTime();
long multiplier = 1000;

double result = LongStream.range(0, 1000000 * multiplier).filter(i -> i * i % 7 != 0).
    average().getAsDouble();

long end = System.nanoTime();

System.out.println(result);
System.out.println((end - start) / 1000000000.0);

parallel oraz parallelStream

In [ ]:
import java.util.stream.*;

long start = System.nanoTime();
long multiplier = 1000;

double result = LongStream.range(0, 1000000 * multiplier).parallel().filter(i -> i * i % 7 != 0).
    average().getAsDouble();

long end = System.nanoTime();

System.out.println(result);
System.out.println((end - start) / 1000000000.0);

Pytania?