Javenue logo

Javenue

Программирование на Java

Информационные технологии

Многопоточность. Wait / notify и приоритеты при захвате монитора. Spurious wakeups. Starvation.

Продолжаю разрушать мифы по поводу многопоточности в Java.

Миф: Предположим, что несколько потоков (threads) висят на мониторе (monitor) объекта A. Поток T1 захватывает монитор (допустим, входит в synchronized блок) и через некоторое время уходит в wait. Если затем некоторый поток вызовет notify на объекте A, то приоритет по захвату монитора будет иметь тот поток, который уходил в wait, то есть T1.

Попытка объяснения мифа: Поток, который уходил в wait, мог сделать некоторые действия с учетом синхронизации по объекту A. Поэтому, чтобы свести к минимуму влияние работы других потоков на консистентность данных/состояния необходимо сначала дать возможность потоку T1 завершить свою работу в пределах блока синхронизации, а потом уже предоставить монитор объекта на растерзание другим потокам.

Предлагаю такой регламент: для начала совсем чуть-чуть порассуждаем самостоятельно, потом посмотрим, что по этому поводу думают "Java Language Specification" и "The Java Virtual Machine Specification", ну и наконец проведем ряд тестов, для подтверждения сомнительных высказываний из спецификаций :).

На первый взгляд пояснение и правда имеет право на существование. Но это только на первый. Действительно, зачем вообще вызывать wait? Собственно для того, чтобы не допустить поток к данным или состоянию, которые разработчик кода не считает консистентными. А может ли быть такое, что данные/состояние будут таковыми еще до ухода потока в waitset? Да, может. Но возможно в этом случае имело смысл вызвать wait ранее? Думаю, что да... А вызов метода notify и говорит нам: "все хорошо, вот ваши права, счастливой дороги".

Теперь посмотрим в Java Virtual Machine Specification:

Every object, in addition to having an associated lock, has an associated wait set, which is a set of threads. When an object is first created, its wait set is empty. Wait sets are used by the methods wait, notify, and notifyAll of class Object. These methods also interact with the scheduling mechanism for threads.

Далее:

...The thread T is then removed from the wait set and re-enabled for thread scheduling. It then locks the object again (which may involve competing in the usual manner with other threads)...

Что и требовалось доказать: никакой очереди приоритетов при попытке захвата монитора несколькими потоками не существует.

Перейдем к тестам. Сразу же приведу фрагмент кода:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TestWaitNotify extends Thread {
    // should be >= 3
    static final int THREAD_COUNT = 3;

    static final CyclicBarrier barrier = 
            new CyclicBarrier(THREAD_COUNT);
    static final Object mutex = new Object();
    static final AtomicInteger counter = new AtomicInteger(0);

    static volatile boolean exitCondition = false;

    TestWaitNotify(String name) {
        super(name);
    }

    public void run() {
        try {
            barrier.await();

            synchronized (mutex) {
                System.out.println(getName() + ": monitor acquired");

                if (counter.getAndIncrement() < THREAD_COUNT - 2) {
                    System.out.println(getName() + ": invoking wait");

                    while (!exitCondition)
                        mutex.wait();
                    exitCondition = false;

                    System.out.println(getName() + ": woken up");
                } 

                exitCondition = true;
                System.out.println(getName() + ": invoking notify");
                mutex.notify();
            }
            System.out.println(getName() + ": monitor released");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread t = new TestWaitNotify("Thread-" + i);
            t.start();
        }
    }
}

CyclicBarrier используется для обеспечения практически одновременного попадания потоков на монитор объекта mutex. Чтобы быть более уверенным, можно перед вызовом wait добавить еще sleep. Поле THREAD_COUNT я ввел для того, чтобы вам было удобнее проверить правильность утверждения для большего количества потоков на wait set'e.

Для чистоты эксперимента обрамил вызов wait в цикл. Подробности читайте ниже, а так же в комментариях.

Ниже представлены 2 результата для THREAD_COUNT = 3:

Thread-2: monitor acquired
Thread-2: invoking wait
Thread-0: monitor acquired
Thread-0: invoking notify
Thread-0: monitor released
Thread-2: woken up
Thread-2: invoking notify
Thread-2: monitor released
Thread-1: monitor acquired
Thread-1: invoking notify
Thread-1: monitor released
Thread-1: monitor acquired
Thread-1: invoking wait
Thread-2: monitor acquired
Thread-2: invoking notify
Thread-2: monitor released
Thread-0: monitor acquired
Thread-0: invoking notify
Thread-0: monitor released
Thread-1: woken up
Thread-1: invoking notify
Thread-1: monitor released

В первом случае после вызова notify монитор захватит поток, уходивший в wait, а во втором - тот, который ждал захвата монитора на synchronized блоке.

Кстати, тест для THREAD_COUNT > 3 показывает еще и то, что вызов notify разбудит случайный поток из wait set'a, что разрушает еще один маленький мифчик об очереди пробуждения потоков, находящихся в wait-set.

Updated (9.02.2011): Всегда с удовольствием веду дискуссии на тему моих статей и еще с большим удовольствием реагирую на замечания и отвечаю на вопросы. Но все же попадаются люди (и часто достаточно опытные), которые не читают статьи, а просто оставляют "умный комментарий по теме" со ссылкой на свой блог. Вот очередной:

Есть такая штука как spurious wakeup:

java.lang.Object#wait:

A thread can also wake up without being notified, interrupted, or 
timing out, a so-called spurious wakeup... 
(и дальше куча копипаста из джавадока)

Так, что причём тут мифы… всё вполне нормально задокументировано.

Все же, если отбросить последнее предложение, можно с натяжкой предположить, что комментарий касался того, что вызов wait в коде не обрамлен циклом с проверкой условия. Насколько мне известно, spurios wakeup может возникнуть только на Unix/Linux системах на многопроцессорных машинах, и то, если будет использоваться API, описанное стандартом Pthreads. Кроме того, уже в нескольких источниках читал, что Sun JVM его не использует, хотя поиском по файлам из JRE все же нашел названия соответсвующих функций в исполняемых файлах.

Делать все равно особо нечего, поэтому написал небольшой класс, который пытается воспроизвести spurious wakeup. Вот код (если при старте будет возникать OutOfMemoryError, просто уменьшите значение MAX_THREADS):

import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TestSpuriousWakeups {
    static final int MAX_THREADS = 6000;

    static final Object mutex = new Object();

    static final CountDownLatch allThreadsStarted = 
            new CountDownLatch(MAX_THREADS);
    static final CountDownLatch allThreadsFinished = 
            new CountDownLatch(1);

    static final AtomicInteger processedThreads = new AtomicInteger();
    static final AtomicInteger notifiedThreads = new AtomicInteger();

    static volatile boolean continueCondition = true;

    static final Random sleepRandom = new Random();

    static class Worker extends Thread {
        public void run() {
            try {
                synchronized (mutex) {
                    allThreadsStarted.countDown();

                    mutex.wait();
                }

                continueCondition = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                processedThreads.incrementAndGet();
            }
        }
    }

    static class Notifier extends Thread {
        public void run() {
            while (true) {
                // remove block to achieve starvation
                while (!continueCondition)
                    doStuff();

                // all threads finished their execution
                if (processedThreads.get() == MAX_THREADS)
                    break;

                synchronized (mutex) {
                    doStuff();

                    mutex.notify();
                    continueCondition = false;
                    notifiedThreads.incrementAndGet();
                }
            }

            allThreadsFinished.countDown();
        }

        // just to emulate some activity
        void doStuff() {
            try { Thread.sleep(sleepRandom.nextInt(5)); }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < MAX_THREADS; i++)
            new Worker().start();

        // wait for all workers to start execution
        allThreadsStarted.await();

        new Notifier().start();

        // wait for all workers and notifier to finish execution
        allThreadsFinished.await();

        System.out.println("Spurious wakeups count: "
                + (MAX_THREADS - notifiedThreads.get()));
    }
}

Идея в том, чтобы проверить количество вызовов метода notify. Если оно будет меньше, чем MAX_THREADS, значит имело место внезапное пробуждение одного или нескольких потоков. Результат большого количества запусков (на двухъядерной машине):

Spurious wakeups count: 0

Updated (20.02.2011): В комментариях упомянули о starvation.

Starvation describes a situation where a thread is unable to gain regular access to shared resources and is unable to make progress. This happens when shared resources are made unavailable for long periods by "greedy" threads... other threads that also need frequent synchronized access to the same object will often be blocked.

Если убрать цикл while с проверкой условия из метода run класса Notifier, то как раз и возникнет ситуация starvation - поток Notifier просто не будет давать Worker'ам захватить монитор. Вот dump потоков такой ситуации (для MAX_THREADS = 3):

"Notifier" prio=10 tid=0x0a1cc400 nid=0x5979 waiting on condition [0x8f854000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at TestSpuriousWakeups$Notifier.doStuff(TestSpuriousWakeups.java:67)
	at TestSpuriousWakeups$Notifier.run(TestSpuriousWakeups.java:54)
	- locked <0xb017fb20> (a java.lang.Object)

"Thread-2" prio=10 tid=0x0a1cac00 nid=0x5978 in Object.wait() [0x8f8a5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0xb017fb20> (a java.lang.Object)
	at java.lang.Object.wait(Object.java:485)
	at TestSpuriousWakeups$Worker.run(TestSpuriousWakeups.java:26)
	- locked <0xb017fb20> (a java.lang.Object)

"Thread-1" prio=10 tid=0x0a1bcc00 nid=0x5977 in Object.wait() [0x8f90a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0xb017fb20> (a java.lang.Object)
	at java.lang.Object.wait(Object.java:485)
	at TestSpuriousWakeups$Worker.run(TestSpuriousWakeups.java:26)
	- locked <0xb017fb20> (a java.lang.Object)

"Thread-0" prio=10 tid=0x0a1be400 nid=0x5976 waiting for monitor entry [0x8f95b000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0xb017fb20> (a java.lang.Object)
	at java.lang.Object.wait(Object.java:485)
	at TestSpuriousWakeups$Worker.run(TestSpuriousWakeups.java:26)
	- locked <0xb017fb20> (a java.lang.Object)

"main" prio=10 tid=0x0a035800 nid=0x596a waiting on condition [0xb6a65000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0xb017ff40> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
	at TestSpuriousWakeups.main(TestSpuriousWakeups.java:84)
        ...

Потоки Thread-0, Thread-1, Thread-2 будут блокироваться на длительные периоды времени. И это не смотря на то, что между последовательными захватами монитора mutex потоком Notifier есть возможность его захвата и для других потоков.

Пока что все. Замечания и комментарии приветствуются.



Комментариев: 3

  Выйти

  * для публикации комментариев нужно  

Vladimir Dolzhenko:

Есть такая штука как spurious wakeup:

java.lang.Object#wait:

A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one:

synchronized (obj) {
while ()
obj.wait(timeout);
… // Perform action appropriate to condition
}

(For more information on this topic, see Section 3.2.3 in Doug Lea’s “Concurrent Programming in Java (Second Edition)” (Addison-Wesley, 2000), or Item 50 in Joshua Bloch’s “Effective Java Programming Language Guide” (Addison-Wesley, 2001).

Так, что причём тут мифы… всё вполне нормально задокументировано.

c0nst:

Мне известно, что такое spurious wakeup. Не очень понял, как это относится к теме статьи. Особенно меня смущает фраза “Так, что причём тут мифы… всё вполне нормально задокументировано.”
Не захотел я усложнять код, так как у многих начинающих программистов на Java стоит винда + Sun JVM, где spurious wakeups, насколько мне известно, не проявляются. Тем более в таком маленьком фрагменте кода они с огромной степенью вероятности не проявятся и на *NIX системах.

Slava:

Не уверен по поводу того, что то, что Вы описали это миф: synchronized block не исключает возникновение ситуации starvation, это известно и много где описано (это и подразумевается под spurious wakeups), и следовательно никакой очереди ожидания нет. Для этого как раз и был добавлен fair sync в ReentrantLock, чтобы иметь ту самую очередь и чтобы монитор мог захватить тот поток, который дольше всего его ждал. С точки зрения доказательства того, что starvation возникает при использовании synchronized блоков, статья хорошая, но Вы ничего об этом не упомянули.