[Java, Concurrency] Синхронизаторы в Java

ВВЕДЕНИЕ

Синхронизаторы - часть пакета java.util.concurrent, позволяющие управлять потоками на высоком уровне (в отличие от управления на низком уровне такими средствами, как wait / notify / join и т.д.)

В статье будут рассмотрены основные синхронизаторы Java. Это:

  • CyclicBarrier
  • CountDownLatch
  • Phaser
  • Semaphore
  • Exchanger

CyclicBarrier

CyclicBarrier - это синхронизатор, позволяющий потокам ожидать друг друга в некоторой точке, перед тем, как продолжить выполнение. Также CyclicBarrier позволяет определить общую задачу, которая будет запущена после достижения барьера заданным количеством потоков.

Описанная функциональность достигается с помощью метода await(). Вызов этого метода означает, что вызывающий его поток достиг барьера.

После вызова await() поток останавливается, и продолжает свое выполнение лишь после того, как заданное число потоков также вызовут данный метод.

Таким образом, ситуация, когда await() вызван требуемым количеством потоков, означает преодоление барьера. После этого потоки продолжают выполнение, а также запускается заданная общая задача.

package com.alex.b.cyclicBarrier;

import com.alex.z.utils.TimeUtils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Test {

    public static void main(String[] args) {
        Runnable action = () -> {
            System.out.println("3 threads are done! Eureka! " + TimeUtils.currTime());
        };

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, action);

        createThread(cyclicBarrier, 1000).start();
        createThread(cyclicBarrier, 3000).start();
        createThread(cyclicBarrier, 5000).start();
        createThread(cyclicBarrier, 7000).start();
    }

    private static Thread createThread(CyclicBarrier cb, long timeToSleep) {
        return new Thread(() -> {
            try {
                Thread.sleep(timeToSleep);
                System.out.println("Await. " + TimeUtils.currTime());
                cb.await();
                System.out.println("Continuing working");
            } catch (InterruptedException | BrokenBarrierException e) {
                // do nothing
            }
        });
    }
}

В этом примере был создан CyclicBarrier с параметром 3, означающим, что для преодоления барьера достаточно вызова метода await в 3-х потоках.

Результат исполнения программы следующий:

Await. 14:37:04
Await. 14:37:06
Await. 14:37:08
3 threads are done! Eureka! 14:37:08
Continuing working
Continuing working
Continuing working
Await. 14:37:10

Как видим, результат строго ожидаемый: после вызова await в трех потоках, все три потока продолжают выполнение.

CountDownLatch

CountDownLatch - похож на CyclicBarrier. Цель обеих сущностей - задать, как потоки будут ждать друг друга.

Однако есть важное отличие:

  • CyclicBarrier ориентирован на ожидание заданного количества потоков

  • CountDownLatch ориентирован на ожидание заданного количества задач,

Что это значит?

Если CyclicBarrier предоставляет лишь один метод await(), после которого поток останавливается, то CountDownLatch имеет метод countDown(). Метод countdown() даже в рамках одного потока может быть вызван сколько угодно раз, ведь он не останавливает выполнение потока.

Разберем на примере:

package com.alex.b.countdownLatch;

import com.alex.z.utils.TimeUtils;
import java.util.concurrent.CountDownLatch;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(5);

        startThread(cdl, 2000);
        startThread(cdl, 3000);
        startThread(cdl, 5000);

        cdl.await();

        System.out.println("CountdownLatch RELEASED! " + TimeUtils.currTime());
    }

    private static void startThread(CountDownLatch cdl, long timeToSleep) {
        new Thread(() -> {
            try {
                Thread.sleep(timeToSleep);
                System.out.println("Counting down. " + TimeUtils.currTime());
                cdl.countDown();

                Thread.sleep(timeToSleep);
                System.out.println("Counting down. " + TimeUtils.currTime());
                cdl.countDown();

            } catch (InterruptedException e) {
                // do nothing
            }
        }).start();
    }
}

Результат выполнения:

Counting down. 08:56:48
Counting down. 08:56:49
Counting down. 08:56:50
Counting down. 08:56:51
Counting down. 08:56:52
CountdownLatch RELEASED! 08:56:52
Counting down. 08:56:56

Как видим в примере был создан CountDownLatch с числом 5. При этом было создано лишь три потока. Однако это не помешало в рамках одного потока вызвать countDown() дважды и тем самым в итоге высвободить CountDownLatch.

Phaser

Неформально говоря, Phaser - это напичканый разными возможностями синхронизатор, который может заменить как CyclicBarrier, так и CountDownLatch, которые по сути являются его частными случаями.

Пример использования CountDownLatch можно переписать, используя на этот раз Phaser:

package com.alex.b.phaser;

import com.alex.z.utils.TimeUtils;
import java.util.concurrent.Phaser;

public class Test1 {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(5);

        startThread(phaser, 2000);
        startThread(phaser, 3000);
        startThread(phaser, 5000);

        phaser.awaitAdvance(0);

        System.out.println("Phaser RELEASED! " + TimeUtils.currTime());
    }

    private static void startThread(Phaser phaser, long timeToSleep) {
        new Thread(() -> {
            try {
                Thread.sleep(timeToSleep);
                System.out.println("Arriving. " + TimeUtils.currTime());
                phaser.arrive();

                Thread.sleep(timeToSleep);
                System.out.println("Arriving. " + TimeUtils.currTime());
                phaser.arrive();

            } catch (InterruptedException e) {
                // do nothing
            }
        }).start();
    }
}

Результатом выполнения будет:

Arriving. 11:56:44
Arriving. 11:56:45
Arriving. 11:56:46
Arriving. 11:56:47
Arriving. 11:56:48
Phaser RELEASED! 11:56:48
Arriving. 11:56:52

Также можно переписать пример использования CyclicBarrier:

package com.alex.b.phaser;

import com.alex.z.utils.TimeUtils;
import java.util.concurrent.Phaser;

public class Test2 {

    public static void main(String[] args) {

        Phaser phaser = new Phaser(3);

        createThread(phaser, 1000).start();
        createThread(phaser, 3000).start();
        createThread(phaser, 5000).start();
        createThread(phaser, 7000).start();

        phaser.awaitAdvance(0);
        System.out.println("3 threads are done! Eureka! " + TimeUtils.currTime());
    }

    private static Thread createThread(Phaser phaser, long timeToSleep) {
        return new Thread(() -> {
            try {
                Thread.sleep(timeToSleep);
                System.out.println("Await. " + TimeUtils.currTime());
                phaser.arriveAndAwaitAdvance();
                System.out.println("Continuing working");
            } catch (InterruptedException e) {
                // do nothing
            }
        });
    }
}

Результат:

Await. 12:30:13
Await. 12:30:15
Await. 12:30:17
Continuing working
Continuing working
Continuing working
3 threads are done! Eureka! 12:30:17
Await. 12:30:19

Рассмотрим надстройки и дополнительные возможности Phaser.

  1. Возможность динамически изменять количество участников в отличие от CyclicBarrier, где количество участников можно задать только в конструкторе.

Каждый поток может добавить и убирать себя из участников методами register() arriveAndDeregister()

  1. Нумерация циклов синхронизации. Номер цикла называется фазой. Возможность отслеживать и привязываться к номерам фаз.

Основные методы:

  • int register() — регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;

  • int getPhase() — возвращает номер текущей фазы;

  • int arriveAndAwaitAdvance() — указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог CyclicBarrier.await(). Возвращает номер текущей фазы;
  • int arrive() — сообщает, что сторона завершила фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;
  • int arriveAndDeregister() — сообщает о завершении всех фаз стороной и снимает ее с регистрации. Возвращает номер текущей фазы;
  • int awaitAdvance(int phase) — если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.

Semaphore

Семафор - возможно, простейший синхронизатор.

Что можно делать с семафором?

  • Семафор можно захватывать и высвобождать (это может сделать любой поток в многопоточной среде).
  • Ограничить число одновременных захватов.

При попытке превысить заданное ограничение поток переходит в ожидание.

package com.alex.a.semaphores;

import com.alex.z.utils.TimeUtils;
import java.util.concurrent.Semaphore;

public class Test1 {

    private static final Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        createThread().start();
        createThread().start();
        createThread().start();
    }

    private static Thread createThread() {
        return new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("Acquired " + TimeUtils.currTime());
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // do nothing
            } finally {
                semaphore.release();
                System.out.println("Released " + TimeUtils.currTime());
            }
        });
    }
}

Результат выполнения:

Acquired 09:16:32
Acquired 09:16:32
Released 09:16:34
Acquired 09:16:34
Released 09:16:34
Released 09:16:36

Exchanger

Exchanger - синхронизатор, позволяющий обмениваться данными между потоками в неких определенных точках.

В этом синхронизаторе ключевой и по сути единственный метод, - это exchange().

Будучи вызванным, поток останавливается в ожидании до тех пор, пока второй поток также не вызовет данный метод.

Также, разрешено передавать null, что позволяет отправлять данные в одностороннем порядке.

Пример использования:

package com.alex.b.exchager;

import com.alex.z.utils.TimeUtils;

import java.util.concurrent.Exchanger;

public class Test1 {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Runnable taskA = () -> {
            try {
                System.out.println("A starting ... " + TimeUtils.currTime());
                Thread.sleep(1000);
                String message = exchanger.exchange("from A");
                System.out.println("Task A received message: " + message + " in " + TimeUtils.currTime());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        };

        Runnable taskB = () -> {
            try {
                System.out.println("B starting ... " + TimeUtils.currTime());
                Thread.sleep(2000);
                String message = exchanger.exchange("from B");
                System.out.println("Task B received message: " + message + " in " + TimeUtils.currTime());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        };

        new Thread(taskA).start();
        new Thread(taskB).start();
    }
}

Результат выполнения:

A starting ... 12:44:57
B starting ... 12:44:57
Task B received message: from A in 12:44:59
Task A received message: from B in 12:44:59