Część przykładów pochodzi ze strony: http://winterbe.com
Thread
¶import static System.out;
List<Thread> threads = new LinkedList<>();
threads.add(new Thread(() -> out.println("Wątek " +
Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " +
Thread.currentThread().getName())));
threads.add(new Thread(() -> out.println("Wątek " +
Thread.currentThread().getName())));
threads.forEach(Thread::start);
Runnable
¶List<Runnable> tasks = new LinkedList<>();
tasks.add(() -> out.println("Zadanie " +
Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " +
Thread.currentThread().getName()));
tasks.add(() -> out.println("Zadanie " +
Thread.currentThread().getName()));
tasks.forEach((task) -> new Thread(task).start());
Thread#sleep()
¶import java.util.concurrent.*;
Thread sleepingThread = new Thread(() -> {
try{
out.println("Przed snem");
TimeUnit.SECONDS.sleep(1);
out.println("Po śnie");
} catch (InterruptedException ex) {
out.println("Wątek został przerwany");
}
});
sleepingThread.start();
ExecutorService
¶ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> out.println("Wykonanie " +
Thread.currentThread().getName()));
Future
¶Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
} catch (InterruptedException e) {
throw new IllegalStateException("wątek został przerwany", e);
}
};
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("obliczenie zakończone? " + future.isDone());
Integer result = future.get();
System.out.println("obliczenie zakończone? " + future.isDone());
System.out.print("wynik: " + result);
ExecutorService
)¶newCachedThreadPool
- tworzy wątki w zależności od potrzeb i usuwa je jeśli nie są używane przez 60 sekundnewFixedThreadPool
- cały czas przechowuje niezakończone wątkinewScheduledThreadPool
- posiada możliwość odroczonego i periodycznego wykonania wątkównewSingleThreadExecutor
- wykonanie jednowątkowenewSingleThreadScheduledExecutor
- jw. ale z możliwością odroczonego i periodycznego wykonaniaScheduledExecutor
¶ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
Runnable task = () -> out.println("Wykonanie " +
Thread.currentThread().getName());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(300);
out.println("Czas pozostały do wykonania " +
future.getDelay(TimeUnit.MILLISECONDS));
import java.util.stream.*;
class RaceCondition {
private int counter = 0;
public void increment() {
this.counter += 1;
}
public int getCounter(){
return this.counter;
}
}
RaceCondition object = new RaceCondition();
IntStream.range(0, 10000).forEach(i -> object.increment());
out.println(object.getCounter());
ExecutorService executor = Executors.newFixedThreadPool(2);
object = new RaceCondition();
IntStream.range(0, 10000).forEach(i -> executor.submit(object::increment));
out.println(object.getCounter());
synchronized
)¶class SynchronizedAccessors {
private int counter = 0;
public synchronized void increment() {
this.counter += 1;
}
public synchronized int getCounter(){
return this.counter;
}
}
class Monitor {
private int counter = 0;
public void increment() {
synchronized(this) {
this.counter += 1;
}
}
public int getCounter(){
synchronized(this) {
return this.counter;
}
}
}
wait
i notify
¶class StringStack {
private List<String> stack = new LinkedList<>();
public void push(String value){
synchronized(this) {
stack.add(value);
notify();
}
}
public String pop(){
synchronized(this) {
while(stack.isEmpty()){
try {
wait();
} catch (InterruptedException ex) {
out.println("Wątek został przerwany");
}
}
return stack.remove(stack.size() - 1);
}
}
}
StringStack stack = new StringStack();
Thread adder = new Thread(() -> {
try {
sleep(1000);
stack.push("abc");
sleep(1000);
stack.push("def");
} catch (InterruptedException ex) {
out.println("Wątek dodający został przerwany");
}
});
Thread reader = new Thread(() -> {
out.println(stack.pop());
out.println(stack.pop());
});
reader.start();
adder.start();
ReentrantLock
¶import java.util.concurrent.locks.*;
class ReentrantLockAccessors {
private int counter = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
this.counter += 1;
} finally {
lock.unlock();
}
}
public int getCounter(){
lock.lock();
try {
return this.counter;
} finally {
lock.unlock();
}
}
}
AtomicInteger
¶import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000).forEach(i -> executor.submit(atomicInt::incrementAndGet));
System.out.println(atomicInt.get());
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000).forEach(i -> {
Runnable task = () -> atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});
System.out.println(atomicInt.get());
AtomictInt
¶addAndGet
compareAndSet
decrementAndGet
get
getAndAdd
getAndDecrement
getAndIncrement
getAndSet
LongAdder
¶LongAdder adder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000).forEach(i -> executor.submit(adder::increment));
System.out.println(adder.sumThenReset());
LongAccumulator
¶import java.util.function.*;
LongBinaryOperator operation = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(operation, 1);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10).forEach(i -> executor.submit(
() -> accumulator.accumulate(i)));
System.out.println(accumulator.getThenReset());
CopyOnWriteArrayList
ConcurrentHashMap
parallelStream
List<Integer> list = new LinkedList<>(); // <-----------------------
ExecutorService executor = Executors.newFixedThreadPool(200);
IntStream.range(1,50).forEach((i) -> executor.submit(() -> {
out.println("" + i + " from " + Thread.currentThread().getName());
list.add(i);
}));
executor.submit(() -> {
Iterator<Integer> iterator = list.iterator();
while(iterator.hasNext()){
try {
out.println("" + iterator.next() + " reading from " +
Thread.currentThread().getName());
} catch(Exception ex) {
ex.printStackTrace();
}
}
});
IntStream.range(1,50).forEach((i) -> executor.submit(() -> {
out.println("" + i + " from " + Thread.currentThread().getName());
list.add(i);
}));
CopyOnWriteArrayList
¶List<Integer> list = new CopyOnWriteArrayList<>(); // <-----------------------
ExecutorService executor = Executors.newFixedThreadPool(200);
IntStream.range(1,50).forEach((i) -> executor.submit(() -> {
out.println("" + i + " from " + Thread.currentThread().getName());
list.add(i);
}));
executor.submit(() -> {
Iterator<Integer> iterator = list.iterator();
while(iterator.hasNext()){
try {
out.println("" + iterator.next() + " reading from " +
Thread.currentThread().getName());
} catch(Exception ex) {
ex.printStackTrace();
}
}
});
IntStream.range(1,50).forEach((i) -> executor.submit(() -> {
out.println("" + i + " from " + Thread.currentThread().getName());
list.add(i);
}));
CommonPoolParallelism
¶System.out.println(ForkJoinPool.getCommonPoolParallelism());
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
ConcurrentHashMap
¶ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("1", "jeden");
map.put("2", "dwa");
map.put("3", "trzy");
map.put("4", "cztery");
map.put("5", "pięć");
map.put("6", "sześć");
map.put("7", "siedem");
map.forEach(1, (key, value) -> out.printf("klucz: %s; wartość: %s; wątek: %s\n",
key, value, Thread.currentThread().getName()));
search
¶String result = map.search(1, (key, value) -> {
out.println(Thread.currentThread().getName());
if ("5".equals(key)) {
return value;
}
return null;
});
out.println("Wynik: " + result);
reduce
¶String result = map.reduce(1,
(key, value) -> {
out.println("Przekształcenie: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
out.println("Redukcja: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
out.println("Wynik: " + result);
stream
¶import java.util.stream.*;
class SerialRun {
private static long MILLION = 1000 * 1000;
public static void main(String[] args) {
if(args.length != 1){
System.out.println("Podaj liczbę uruchomień (w milionach)");
return;
}
long top = MILLION * Integer.parseInt(args[0]);
long start = System.nanoTime();
double result = LongStream.range(0, top).filter(i -> i * i % 7 != 0).
average().getAsDouble();
long end = System.nanoTime();
System.out.println(result);
System.out.println((end - start) / 1000000000.0);
}
}
parallel
oraz parallelStream
¶import java.util.stream.*;
class ParallelRun {
private static long MILLION = 1000 * 1000;
public static void main(String[] args) {
if(args.length != 1){
System.out.println("Podaj liczbę uruchomień (w milionach)");
return;
}
long top = MILLION * Integer.parseInt(args[0]);
long start = System.nanoTime();
double result = LongStream.range(0, top).parallel().filter(i -> i * i % 7 != 0).
average().getAsDouble();
long end = System.nanoTime();
System.out.println(result);
System.out.println((end - start) / 1000000000.0);
}
}