MENU

Concurrency

2021 年 01 月 10 日 • 阅读: 902 • 英语

电脑用户理所当然地认为系统可以同时处理多个任务。他们假定可以一边处理 Word,一边下载文件,一边管理打印队列,还能进行音乐串流。甚至单个应用也被期待同时完成多个任务。例如,那个音频串流程序必须同时从网络读取数字音频,解压,管理播放,还要更新显示。甚至 Word 处理程序也总是时刻准备好响应键盘和鼠标事件,无论它多么忙于格式化文本或者更新显示信息。能够同时完成这些事情的软件称为 concurrent 软件。

Java 平台从地基设计支持并发编程,语言和类库都支持基本的并发操作。从 5.0 版本开始,Java 平台也包含了高级并发 API。本课程将介绍平台的基本并发支持,并总结一些 java.util.concurrent 包下的高级 API。

Processes and Threads

并发编程里有两个基本可执行单元:processesthreads。Java 编程语言中的并发主要关注线程,但进程同样重要。

正常情况下,电脑系统运行着许多活跃的进程和线程。即便是单核系统,特定时刻只能执行一个线程,该核心的处理时间也通过操作系统的时间片功能被许多进程和线程共享。

Processes

进程拥有独立执行环境,含有完全私有的基本运行时资源集,尤其是,每个进程拥有自己的内存空间。

进程通常被看作是程序或应用的同义词。但是,用户看到的单个应用事实上可以包含一个协作进程集。为了方便进程通信,大多数操作系统都支持 Inter Process Communication (IPC) 资源,比如管道和套接字。IPC 不仅用作同系统进程通信,也能用于不同系统进程。

大多数 Java 虚拟机以单进程形式实现。Java 应用可通过 ProcessBuilder 对象创建附加进程。多进程应用不在本课程的讨论范围。

Threads

线程有时又叫 lightweight processes。进程和线程都提供了执行环境,但创建线程比进程需要更少的资源。

线程存在于进程中 —— 进程至少含一个线程。线程共享进程资源,包括内存和打开的文件。这提高了通信效率,也导致了潜在问题。

多线程执行是 Java 平台的核心功能。应用至少包含一个线程 —— 或几个,如果你把 system 线程(完成内存管理,信号处理等任务)算上的话。但从应用开发者角度看,你启动的只有一个线程,叫 main thread。它有能力创建更多线程,就像下面章节描述的那样。

Thread Objects

每个线程都是 Thread 类的实例。通过 Thread 对象有两种基本策略创建并发应用。

  • 直接控制线程的创建和管理,每次应用需要初始化异步任务时,简单地实例化一个 Thread
  • 将线程管理从应用的其他部分抽象出来,把应用任务传递给 executor

本章介绍 Thread 对象用法。Executor 的讨论放在 High Level Concurrency Objects

Defining and Starting a Thread

创建 Thread 实例的应用必须提供线程运行的代码。有两种实现方式:

  • Provide a Runnable ObjectRunnable 接口定义了单个方法,run,用来包含线程执行的代码。Runnalbe 对象需要传递给 Thread 构造器,就像 HelloRunnable 示例那样:
public class HelloRunnable implements Runnable {
    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }

    @Override
    public void run() {
        System.out.println("Hello from a thread!");
    }
}
  • Subclass ThreadThread 类自身实现了 Runnale,但是它的 run 什么也不做。应用可以继承 Thread,提供自己的 run 实现,就像 HelloThread 示例这样:
public class HelloThread extends Thread {
    public static void main(String[] args) {
        new HelloThread().start();
    }

    @Override
    public void run() {
        System.out.println("Hello from a thread!");
    }
}

注意两例都调用了 Thread.start 来启动线程。

你该使用哪种方式呢?第一种更通用,因为它实现 Runnable 接口,之后可以继承其他类。第二种在简单应用中更方便,但它限制了任务类必须继承 Thread。本课程着重关注第一种方法,它分离了 Runnable 任务和执行任务的 Thread 对象。这种方式不仅更具弹性,也适用于后文介绍的高级线程管理 API。

Thread 类定义了许多方便线程管理的方法。其中有许多 static 方法,提供了获取调用方法线程的信息,也能影响线程状态。其它方法用于从其他线程调用,涉及线程和 Thread 对象的管理。下面我们将测试这些方法。

Pausing Execution with Sleep

Thread.sleep 会致使当前线程暂停执行指定时间。这是让应用的其他线程或系统上的其他应用获得处理器时间的有效方式。sleep 方法也可用于调步,等待其它有时间需求的线程,就像之后章节的 SimpleThreads 示例。

有两个重载 sleep 版本:一个指定毫秒睡眠时间,另一个指定纳秒。但是,这些时间的精度不被保证,因为它们受限于底层系统提供的设施。另外,睡眠周期也可被中断终止,就像下面章节那样。无论如何,你不能假定调用 sleep 就能让线程休眠准确的时间周期。

SleepMessages 示例使用 sleep 每 4 秒打印一条消息:

public class SleepMessages {
    public static void main(String[] args) throws InterruptedException {
        String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
        };

        for (String info : importantInfo) {
            // Pause for 4 seconds
            Thread.sleep(4000);
            // Print a message
            System.out.println(info);
        }
    }
}

注意 main 声明抛出了 InterruptedException。当其他线程中断了当前正在 sleep 的线程时,sleep 方法会抛出该异常。因为应用没有定义其他导致中断的线程,所以没有必要捕捉它。

Interrupts

Interrupt 表明线程应该停止现有任务,去做其他事情。线程如何响应中断由开发者决定,但终止线程非常常见,也是下文强调的用法。

线程通过调用 Thread 对象的 interrupt 方法发送中断。为了中断机制工作正常,线程必须支持自己的中断。

Supporting Interruption

线程如何支持自身的中断?这取决于它现在在做什么。如果线程频繁调用抛出 InterruptedException 的方法,当捕捉到异常时,它会简单地从 run 方法返回。例如,假如 SleepMessages 示例中间的信息循环位于 Runnable 对象的 run 方法中,那它就可以这样修改支持中断:

for (String info : importantInfo) {
    // Pause for 4 seconds
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // We've been interrupted: no more messages.
        return;
    }
    // Print a message
    System.out.println(info);
}

sleep 一样,许多方法都会抛出 InterruptedException,用于收到中断时取消当前操作,立即返回。

如果线程要长时间运行但不调用抛出 InterruptedException 的方法该怎么办呢?那它必须周期性地调用 Thread.interrupted,如果收到中断,该方法会返回 true。例如:

for (String input: inputs) {
    heavyCrunch(input);
    if (Thread.interrupted()) {
        // We've been interrupted; no more crunching.
        return;
    }
}

上例很简单,代码不断检测,收到中断后退出线程。在更复杂的应用中,抛出 InterruptedException 更有意义:

if (Thread.interrupted()) {
    throw new InterruptedException();
}

这让中断处理代码集中于 catch 子句。

The Interrupt Status Flag

中断机制使用内部标志实现,它被称作 interrupt status。调用实例方法 interrupt 会设置此标志。调用静态方法 Thread.interrupted 检测中断时,中断标志被清空。实例方法 isInterrupted 用于查询其它线程的中断状态,但不会改变它。

通常,任何通过抛出 InterruptedException 退出的方法都会清空中断状态。但是,中断状态可以立即被其它调用 interrupt 的线程重新设置。

Joins

join 方法允许一个线程等待另一个线程执行完成。如果 t 是一个 Thread 对象,持有它的线程正在执行,

t.join()

会导致当然线程暂停执行直到 t 线程终止。它的重载方法允许开发者指定等待时间。但是,和 sleep 一样,join 也依赖系统时钟,所以你不应假定 join 会精确地等待你所设定的时间。

sleep 一样,响应中断时,join 也会抛出 InterruptedException 来退出执行。

The SimpleThreads Example

下面的例子综合使用了本章的部分概念。SimpleThreads 包含两个线程,第一个是每个 Java 应用都有的主线程。主线程从 Runnable 对象创建了一个新线程 MessageLoop,等待它执行完成。如果 MessageLoop 长时间不结束,主线程就中断它。

MessageLoop 线程打印一系列消息。如果在打印完所有信息之前被中断,MessageLoop 线程将打印一条消息随后退出。

public class SimpleThreads {
    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName = Thread.currentThread().getName();
        System.out.printf("%s: %s%n", threadName, message);
    }

    private static class MessageLoop implements Runnable {
        @Override
        public void run() {
            String importantInfo[] = {
                    "Mares eat oats",
                    "Does eat oats",
                    "Little lambs eat ivy",
                    "A kid will eat ivy too"
            };
            try {
                for (String m : importantInfo) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(m);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Start MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop util MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish
            t.join(1000);
            if (System.currentTimeMillis() - startTime > patience
                    && t.isAlive()) {
                threadMessage("Tired fo waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}

Synchronization

线程通信主要靠共享属性,以及属性指向的对象引用。这种通信形式很高效,但也导致了两种可能错误:thread interferencememory consistency errors。防止这些错误的工具是 synchronization

然而,同步可能导致 thread contention,它会发生在两个或更多线程试图同时访问同一资源时,进而减慢 Java 运行时执行一个或多个线程的速度,甚至暂停它们的执行。Starvation and Livelock 是线程竞争的两种形式。阅读 Liveness 了解更多信息。

本章讨论如下话题:

Thread Interference

考虑简单类 Counter

class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

Counter 被设计为,每次调用 incrementc + 1,每次调用 decrementc - 1。但是,如果一个 Counter 对象被多个线程引用时,线程干扰可能导致这些行为不按预期发生。

当两个操作在不同线程发生,它们操作同一数据,并且发送了干扰,这意味着这两个操作由多个步骤组成,这些步骤的顺序重叠了。

Counter 实例可能不会发生交织操作,因为对 c 的两个操作都是单个简单语句。然而,即便简单语句也会被 Java 虚拟机翻译成多个步骤。我们无需检测虚拟机执行哪些步骤 —— 但只要知道 c++ 这一简单表达式可以被分成以下三步就足够了:

  • 获取当前 c
  • 在获取值上加 1
  • 把结果储存回 c

表达式 c-- 也以相同方式分解,除了第二步是减一。

假定线程 A 调用了 increment,几乎与此同时,线程 B 调用了 decrement。如果 c 的初始值是 0,它们的交织操作可能像下面这样:

  1. 线程 A:获取 c。
  2. 线程 B:获取 c。
  3. 线程 A:在获取值上加 1,结果为 1。
  4. 线程 B:在获取值上减 1,结果为 -1。
  5. 线程 A:保存结果到 c,c 现在是 1。
  6. 线程 B:保存结果到 c,c 现在是 -1。

线程 A 的结果被线程 B 覆盖了。上面的交织操作仅是一种可能。在其它情况下,可能是 B 丢失了结果,又或者没有错误。由于它们无法预测,线程干扰 BUG 很难检测和修复。

Memory Consistency Errors

内存一致性错误 发生在不同线程对相同数据存在不一致视角。导致它的原因很复杂超出了本教程的讨论范围。幸运的是,开发者无需理解这些原因细节。只需知道避免它们的策略。

避免内存一致性错误的关键是理解 happens-before 关系。该关系简单地保证,被一个声明写入的内存对另一个声明是可见的。要理解它,看下面的例子。我们定义并初始化一个简单的 int

int counter = 0;

counter 被两个线程 A 和 B 共享。线程 A 让它自增

counter++;

马上,线程 B 打印 counter

System.out.println(counter);

如果上面两个声明在同一线程执行,那么应该可以安全的打印出 1。但是如果它们被单独的线程执行,打印出的值可能是 0,因为线程 A 对 counter 的更改对线程 B 的可见不被保证 —— 除非开发者在两个声明间建立了 happens-before 关系。

有许多建立这种关系的动作,接下来我们会看到,同步是其中之一。

实际上,我们已经见到了两种建立 happens-before 关系的动作。

  • 当一个声明调用 Thread.start,每个与该声明有 happens-before 关系的声明,也与被新线程执行的每条语句有这种关系。新线程创建前代码的影响对新线程可见。
  • 当一个线程终止,返回到另一线程中的 Thread.join,那么被终止线程中的所有声明,与成功 join 后的所有声明有 happens-before 关系。线程中代码的影响现在对执行 join 的线程可见。

所有能创建 happens-before 关系的动作,见 Summary page of the java.util.concurrent package.

Synchronized Methods

Java 编程语言提供了两种基本同步习语:synchronization methodssynchronization statements。后者更复杂,将在下面的章节介绍。本章讨论同步方法。

要让方法同步,只需简单地在其声明上加上 synchronized 关键字:

public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

如果 countSynchronizedCounter 实例,则其同步方法有如下效果:

  • 首先,无法在同一对象上交织执行两个同步方法调用。当一个线程执行对象上的同步方法,所有调用同一对象上同步方法的其他线程都会阻塞(暂停执行),直到第一个线程执行完成。
  • 其次,当同步方法退出,它自动与同一对象上同步方法的 任何后续调用 建立 happens-before 关系。这保证了该对象状态的改变对其它线程可见。

注意构造方法无法被同步 —— 在它上面运用 synchronized 关键字是非法的。同步构造器没有意义,因为只有创建对象的线程能够访问正在创建的对象。

警告

创建将被线程共享的对象时,切勿过早 释放 该对象的引用。比如,你想维护包含每个类实例,名为 instancesList。你可能理算当然地在构造器中添加如下代码:

instances.add(this);

但是这样,其他线程便可在对象创建完成前通过 instances 访问它。

同步方法使用简单策略防止线程干扰和内存一致性错误:如果一个对象对多个线程可见,所有对该对象的读写只能通过 synchronized 方法完成。(一个重要例外:final 属性,它在对象创建后无法修改,可以安全地通过非同步方法读取。)该策略很有效,但会产生 Liveness 问题,我们将在后面介绍。

Intrinsic Locks and Synchronization

同步围绕名为 intrinsic lockmonitor lock 的内部实体构建。(API 规范通常简单地使用 monitor 代指它。)内部锁在同步的两个方面扮演角色:强制排它访问对象状态,以及建立 happens-before 关系,它对可见性至关重要。

每个对象都有与之相关的内部锁。通常,线程想要排它和持续访问对象属性前,必须 acquire 对象的内部锁,访问结束后再 release 它。获得锁后和释放锁前的时间内,称该线程 own 该内部锁。只要一个线程持有锁,其它线程就无法再持有它。当其它线程试图获得该锁时,它们会阻塞。

当线程释放内部锁,它就与后续获取相同锁的行为建立了 happens-before 关系。

Locks In Synchronized Methods

当线程调用同步方法,它自动获得方法所属对象的内部锁,在方法返回时,锁被释放。即使是未捕获异常导致的返回,锁仍然释放。

你可能好奇调用静态同步方法会发生什么,因为静态方法属于类,而非对象。此时,线程获得与类相关的 Class 对象的内部锁。因此,控制静态属性访问的锁与任何类实例的锁不同。

Synchronized Statements

另一种创建同步代码的方式是 synchronized statements。不像同步方法,同步声明必须指定提供内部锁的对象:

public void addName(String name) {
    synchronized (this) {
        lastName = name;
        nameCount++;
    }
    nameList.add(name);
}

本例中,addName 方法需要同步 lastNamenameCount 的改变,还必须避免同步调用其他对象的方法。(在同步代码中调用其他对象的方法会产生 Liveness 问题。)没有同步声明,则必须声明一个单独的非同步方法,仅仅为了调用 nameList.add

同步声明的细粒度同步也有利于提高并发数。例如,假定 MsLunch 有两个实例属性,c1c2,它们无法同时使用。对属性的更新必须被同步,但没有理由阻止 c1 和 c2 的更新交织进行 —— 创建不必要阻塞会减少并发。不使用同步方法,也不使用 this 锁,我们创建两个单独提供锁的对象。

public class MsLunch {
    private long c1 = 0;
    private long c2 = 0;
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void inc1() {
        synchronized (lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized (lock2) {
            c2++;
        }
    }
}

使用这种习语必须异常小心,你必须完全确定交织访问受影响属性是安全的。

Reentrant Synchronization

回想一下,线程无法获取被另一线程持有的锁。但线程 can 获得已经持有的锁。允许线程多次持有同一个锁就是 reentrant synchronization。它描述了这样的情形:同步代码直接或间接调用包含同步代码的方法,这些同步代码使用相同的内部锁。没有重入同步,同步代码将不得不采取许多附加防范措施,避免自己阻塞。

Atomic Access

编程语境下,atomic 指一组动作一次执行全部生效。原子动作无法被中途打断:它要么完全发生,要么什么也不做。原子动作完成后,其副作用才对外可见。

我们已经见到自增表达式,例如 c++ 并非原子动作。即使是非常简单的表达式,也可能定义成可由其它动作组成的复杂动作。但是,以下动作可以认为是原子的:

  • 读取和写入引用变量和大多数基本变量(除了 longdouble
  • 读取和写入声明为 volatile 的所有变量(包括 longdouble

原子动作不会被插入,所以使用它们无需担心线程干扰。然而,这并不能消除在原子动作上使用同步,因为内存一致性错误仍有可能发生。使用 volatile 变量可减小内存一致性错误风险,因为任何对 volatile 变量的写,与后续对相同变量的读,建立了 happens-before 关系。这意味着对 volatile 变量的改变总是可见于其它线程。更进一步,线程读取 volatile 变量时,它看到的不仅是最新改变,也会看到导致改变的副作用。

简单原子变量访问相比同步代码更高效,但开发者必须更加小心以防内存一致性错误。额外付出是否值得,取决于应用的大小和复杂性。

java.util.concurrent 包提供了一些包含原子方法,但不依赖同步的类。我们将在 High Level Concurrency Objects 讨论。

Liveness

并发应用及时执行的能力称为 liveness。本章描述最常见的活性问题,Deadlock,并简要介绍另外两种活性问题,Starvation and Livelock

Deadlock

Deadlock 指两个或更多线程互相等待,永远阻塞。下面是一个示例:

Alphonse 和 Gaston 是朋友,彼此都是礼貌的虔诚信徒。礼貌的严格规则是,向朋友鞠躬时,必须保持姿势直到朋友有机会向你鞠躬。不幸的是,这种规则没有考虑两人同时鞠躬的可能性。下面的示例,Deadlock,模仿了这种可能:

public class Deadlock {
    static class Friend {
        private final String name;

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public synchronized void bowFrom(Friend friend) {
            System.out.printf("%s: %s has bowed back to me!%n",
                    name, friend.getName());
            friend.bowBack(this);
        }

        private synchronized void bowBack(Friend friend) {
            System.out.printf("%s: %s has bowed to me!%n",
                    name, friend.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(() -> alphonse.bowFrom(gaston)).start();
        new Thread(() -> gaston.bowFrom(alphonse)).start();
    }
}

运行 Deadlock,两个线程都极有可能在尝试调用 bowBack 时阻塞。阻塞都不会结束,因为两个线程在互相等待另一个从 bowFrom 退出。

Starvation and Livelock

饥饿和活锁比死锁少见,但它们仍是每个并发软件设置者可能遇到的问题。

Starvation

Starvation 描述这样一种状况:线程无法获得共享资源的常规访问以至于不能继续。发生这种现象的原因是,"greedy" 线程长期占有共享资源。例如,假定对象提供了需要长时间执行的同步方法。如果一个线程频繁调用该方法,其它也需频繁调用该方法的线程便会阻塞。

Livelock

线程总是响应另一线程的动作。如果其它线程的动作也是对另一线程动作的响应,那将导致 livelock。和死锁一样,活锁线程也无法取得进一步进展。但是,线程没有阻塞 —— 它们只是太过忙于互相响应以至于无法继续工作。打比方长廊的两人试图让彼此通过:Alphonse 向左移让位 Gaston,同时 Gaston 向右移让位 Alphonse。由于它们仍然阻塞彼此,Alphonse 向右让位 Gaston,同时 Gaston 向左,它们仍然阻塞对方,所以。。。

Guarded Blocks

线程经常需要协作。最常用的协作习语是 guarded block。这种代码块以条件轮询开始,只有条件满足才会继续执行。要正确编写保护块,一些步骤必须遵守。

例如,假设存在共享变量 joy,只有它被另一线程设置,guardedJoy 方法才能继续执行。这一方法理论上可以简单循环直到条件满足,但循环是一种浪费,等待时它仍然执行。

public void guardedJoy() {
    // Simple loop guard. Wastes
    // processor time. Don't do this!
    while (!joy) {
    }
    System.out.println("Joy has been achieved!");
}

更高效的保护是调用 Object.wait 来暂停当前线程。wait 调用直到另一线程发出某些特殊事件已经发生的通知才会返回 —— 但不必是此线程等待的那个:

public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which
    // may not be the event we're waiting for.
    while (!joy) {
        try {
            wait();
        } catch (InterruptedException ignored) {
        }
    }
    System.out.println("Joy and efficiency have been achieved!");
}

备注

总是在等待条件发生的循环中调用 wait。不要假定中断就是你等待的那个特定条件,也不要假定条件仍然是真的。

和许多暂停执行的方法一样,wait 也会抛出 InterruptedException。本例中,我们可以忽略那个异常 —— 我们关心的只是 joy 的值。

为何这一版使用了 synchronized?假设 d 是我们调用 wait 的对象。当一个线程调用 d.wait,它必须持有 d 的内部锁 —— 否则错误会抛出。在同步方法中调用 wait 是获得内部锁的简单方式。

wait 调用完成,线程释放锁,暂停执行。将来某时,另一个线程会获得同一个锁,之后调用 Object.notifyAll 通知所有等待锁的线程,某个重要事件已经发生:

public synchronized void notifyJoy() {
    joy = true;
    notifyAll();
}

第二个线程释放锁,第一个线程获得锁后会从 wait 调用点继续执行。

备注

还有一个通知方法,notify,它唤醒单个线程。因为 nofity 不允许指定哪个线程被唤醒,它只在大型并发应用中有用 —— 即包含大量线程执行相同任务的程序。在这样的应用中,你不必关心唤醒哪个线程。

让我们用保护块创建一个 Producer-Consumer 应用。这种应用在两个线程间共享数据:producer,生产数据;consumer,使用数据。两个线程通过共享对象通信。协作非常重要:在生产者交付数据前,消费者不能试图获取数据;同样,消费者没有获取旧数据前,生产者也不能尝试交付数据。

本例中,数据是消息序列,它们通过 Drop 对象共享:

public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;

    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait util message is available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException ignored) {
            }
        }
        // Toggle status.
        empty = true;
        // Notify producer that status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait util message has
        // been retrieved.
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException ignored) {
            }
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }
}

定义在 Producer 的生产者线程,会发送一系列熟悉的消息。字符串 "DONE" 表明所有消息发送完毕。为了模拟真实世界应用不可预测的自然行为,生产者线程发送消息后会随机暂停一段间隔。

public class Producer implements Runnable {
    private final Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    @Override
    public void run() {
        String[] importantInfo = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
        };
        Random random = new Random();
        for (String info : importantInfo) {
            drop.put(info);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException ignored) {
            }
        }
        drop.put("DONE");
    }
}

定义在 Consumer 的生产者,简单地获取并打印消息,直到它收到 "DONE" 字符串。该线程也会暂停随机间隔。

public class Consumer implements Runnable {
    private final Drop drop;

    public Consumer(Drop drop) {
        this.drop = drop;
    }

    @Override
    public void run() {
        Random random = new Random();
        String message;
        while (!"DONE".equals(message = drop.take())) {
            System.out.printf("MESSAGE RECEIVED: %s%n", message);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException ignored) {
            }
        }
    }
}

最后,主线程定义在 ProducerConsumerExample,它启动生产者和消费者线程。

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
        new Thread(new Producer(drop)).start();
        new Thread(new Consumer(drop)).start();
    }
}

备注

编写 Drop 类是为了阐释保护块。检查 Java Collections Framework 中的已有数据结构避免重复造轮子。更多信息,参考 Questions and Exercises

Immutable Objects

创建后无法更改状态的对象称 immutable object。作为一种创建简单可信赖代码的健全策略,不可变对象因巨大弹性被广为接受。

不可变对象在并发应用尤为有用。因为无法更改状态,它们不会被线程干扰破坏,也不会出现不一致状态。

开发者经常不愿使用不可变对象,因为相比于更新已有对象,他们担心创建新对象的开销。对象创建的影响经常被高估,并且可以被许多不可变对象相关的效率抵消。这包括垃圾回收消耗的减少,也减少了保护可变对象不被破坏的代码。

下面的小节介绍一个实例是不可变对象的类,由它派生包含此类不可变实例的类。在此过程中,我们给出这种转换的一般规则,并阐述不可变对象的许多优点。

A Synchronized Class Example

SynchronizedRGB 是表示颜色的类。该类包含四个属性,三个表示主颜色的整数和一个表示名称的字符串。

public class SynchronizedRGB {
    // Values must be between 0 and 255.
    private int red;
    private int green;
    private int blue;
    private String name;

    private void check(int... primaryColors) {
        for (int c : primaryColors) {
            if (c < 0 || c > 255) {
                throw new IllegalArgumentException();
            }
        }
    }

    public SynchronizedRGB(int red, int green, int blue, String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public void set(int red, int green, int blue, String name) {
        check(red, green, blue);
        synchronized (this) {
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }
    }

    public synchronized int getRGB() {
        return (red << 16) | (green << 8) | blue;
    }

    public synchronized String getName() {
        return name;
    }

    public synchronized void invert() {
        red = 255 - red;
        green = 255 - green;
        blue = 255 - blue;
        name = "Inverse of " + name;
    }
}

SynchronizedRGB 必须谨慎使用以防不一致状态。例如,假如一个线程执行了如下代码:

SynchronizedRGB color =
        new SynchronizedRGB(0, 0, 0, "Pitch Black");
int myColorInt = color.getRGB();      // Statement 1
String myColorName = color.getName(); // Statement 2

如果另一个线程在 Statement 1 和 2 之间调用了 color.setmyColorIntmyColorName 的值就不匹配了。要避免这一结果,那两行声明必须绑定在一起:

synchronized (color) {
    int myColorInt = color.getRGB();
    String myColorName = color.getName();
}

这种不一致仅会发生在可变对象上 —— 不可变版本的 SynchronizedRGB 没有这一问题。

A Strategy for Defining Immutable Objects

以下规则定义了创建不可变对象的简单策略。并非所有不可变类都遵循以下规则。这并不意味着创建这些类的粗心的 —— 它们有足够理由相信它们的实例在创建后不会改变。但是,这些策略需要精密分析,不太适合初学者。

  1. 不要提供 "setter" 方法 —— 改变属性或属性所指对象的方法。
  2. 让所有属性 finalprivate
  3. 不允许子类重写方法。最简单的方式是把类声明为 final。更高级的途径是使构造器 private,使用工厂方法创建实例。
  4. 如果实例属性包含可变对象引用,不允许这些对象被改变:

    • 不要提供修改可变对象的方法。
    • 不要分享可变对象的引用。永远不要存储构造器传递的外部可变对象引用;如有必要,创建拷贝,存储拷贝的引用。简单地,尽可能创建内部可变对象拷贝,不要在方法里返回原始对象。

将此策略运用到 SynchronizedRGB 的步骤如下:

  1. 类中有两个 setter 方法。第一个,set,任意改变对象,在不可变版本中不能存在。第二个,invert,改为创建新对象而不是修改已有对象。
  2. 所有属性已经是 private;它们还必须加上 final
  3. 把类自身声明为 final
  4. 只有一个指向对象的属性,该对象自身是不可变的,因此不需要改变。

应用这些步骤后,我们得到了 ImmutableRGB

public final class ImmutableRGB {
    // Values must be between 0 and 255.
    final private int red;
    final private int green;
    final private int blue;
    final private String name;

    private void check(int... colors) {
        for (int c : colors) {
            if (c < 0 || c > 255) {
                throw new IllegalArgumentException();
            }
        }
    }

    public ImmutableRGB(int red,
                        int green,
                        int blue,
                        String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public int getRGB() {
        return (red << 16) | (green << 8) | blue;
    }

    public String getName() {
        return name;
    }

    public ImmutableRGB invert() {
        return new ImmutableRGB(255 - red,
                255 - green,
                255 - blue,
                "Inverse of " + name);
    }
}

High Level Concurrency Objects

迄今为止,我们介绍的都是从 Java 平台开始就存在的低级 API。这些 API 适用于基本任务,但高级任务需要更高等级的构建块。尤其是大型并发应用,它们可以充分利用今天的多处理器和多核系统。

本章,我们将学习一些 JDK 5.0 引入的高级并发特性。它们中的大多数都实现在 java.util.concurrent 包。此外集合框架也有一些新的并发数据结构。

  • Lock Objects 支持锁定习语,用于简化并发应用。
  • Executors 定义了一个启动和管理线程的高级 API。java.util.concurrent 下的实现提供了适用于大规模应用的线程池管理。
  • Concurrent Collections 简化了大集合数据管理,并大大减少了同步需要。
  • Atomic Variables 拥有减少同步,避免内存一致性错误的功能。
  • Concurrent Random Numbers (JDK 7)提供了多线程高效生成伪随机数的方法。

Lock Objects

同步代码依赖简单的重入锁。这种锁很容易使用,但有许多局限性。java.util.concurrent.locks 包提供了更高级的锁定习语。我们不会详细检查这个包,但会关注它最基本的接口,Lock

Lock 对象的行为非常像同步代码使用的隐式锁。和隐式锁一样,同一时刻只有一个线程可能持有 Lock 对象。Lock 对象也支持 wait/nofify 机制,通过关联的 Condition 对象。

与隐式锁相比,Lock 对象的最大优势在于它能从获得锁的尝试中返回。如果当时或超时前(可指定)没有获得锁,tryLock 方法会返回。获得锁之前,如果另一线程发送了中断,lockInterruptibly 方法会返回。

让我们使用 Lock 对象解决 Liveness 见到的死锁问题。Alphonse 和 Gaston 学会了注意朋友将要鞠躬。我们这样改进模型,让 Friend 对象鞠躬前获得 both 参与者的锁。下面是改进模型的源代码,Safelock。为了充分展示这一习语的丰富功能,我们假定 Alphonse 和 Gaston 太过执迷于他们新发掘的安全鞠躬能力,以至于无法停止地向对方鞠躬:

public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public boolean impendingBow(Friend bower) {
            boolean myLock = false;
            boolean yourLock = false;

            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (!(myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        lock.unlock();
                    }
                }
            }

            return myLock && yourLock;
        }

        public void bowFrom(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.printf("%s: %s has bowed to me!%n",
                            name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.printf("""
                                %s: %s started to bow to me, but saw \
                                that I was already bowing to him.%n""",
                        name, bower.getName());
            }
        }

        private void bowBack(Friend bower) {
            System.out.printf("%s: %s has bowed back to me!%n",
                    name, bower.getName());
        }
    }

    static class BowLoop implements Runnable {
        private final Friend bower;
        private final Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }

        @Override
        public void run() {
            Random random = new Random();
            while (true) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException ignored) {
                }
                bowee.bowFrom(bower);
            }
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}

Executors

前面的所有示例,被线程完成的任务和线程本身存在紧密联系,前者定义在 Runnable 中,后者定义在 Thread 中。这对于小应用没问题,但大规模应用很有必要分离应用的其它部分和线程管理与创建。

囊括这些功能的对象叫 executors。下面的小节详细介绍执行器。

Executor Interfaces

java.util.concurrent 包定义了三个执行器接口:

  • Executor,支持启动新任务的简单接口。
  • ExecutorServiceExecutor 的子接口,为单个任务和执行器自身增加了生命周期管理。
  • ScheduledExecutorServiceExecutorService 的子接口,支持将来,和/或,周期任务执行。
The Executor Interface

Executor 接口提供了单个方法 execute,旨在替代普通线程创建习语。如果 rRunnbale 对象,eExecutor 对象,你可以替换

new Thread(r).start();

e.execute(r);

但是,execute 的定义更模糊。低级习语创建新线程并立即启动。取决于 Executor 实现,execute 可能也会那样做,但更可能使用已有工作线程执行 r,或者把 r 放入队列等待工作线程。(我们将在 Thread Pools 章节描述工作线程。)

java.util.concurrent 下的执行器实现充分利用了 ExecutorServiceScheduledExecutorServic 接口,但它们也使用了 Executor 父接口。

The ExecutorService Interface

ExecutorService 接口为 execute 补充了一个类似但功能更丰富的 submit 方法。和 execute 一样,submit 接收 Runnable,但也接收 Callable 对象,它允许任务包含返回值。submit 方法返回 Future 对象,用于获取 Callable 返回值,也用于管理 CallableRunnbale 任务。

ExecutorService 提供了提交 Callable 集合的方法。最后,它还包含许多管理执行器终止的方法。要支持立即终止,任务必须正确处理 Interrupts

The ScheduledExecutorService Interface

ScheduledExecutorService 在父接口 ExecutorService 上增加了 schedule 方法,它用于在指定延迟后执行 CallableRunnbale 任务。此外,它还提供了 scheduleAtFixedRatescheduleWithFixedDelay 用于以固定间隔重复执行指定任务。

Thread Pools

大多数 java.util.concurrent 下的执行器实现都使用了 thread pools,它由 worker threads 组成。这种线程独立于它执行的 CallableRunnbale 任务,并且常用于多任务执行。

使用工作线程减少了线程创建开销。线程对象占据较大内存,在大规模应用中,大量线程对象的分配和释放会产生相当大的内存管理开销。

fixed thread pool 是一种常见线程池种类。它总是存在固定数量的运行线程;如果一个线程在使用中无故终止,新线程会自动替代它。任务通过内部队列提交到线程池,如果任务超过线程数,多余部分会进入等待队列。

固定线程池的一大优点是,使用它的应用可以优雅降级(degrade gracefully)。要理解这点,考虑一个 Web 服务器,其中的每个 HTTP 请求被单独线程处理。如果应用简单地为每个请求创建新线程,系统收到的请求多余它能立即处理的数量,当所有线程开销超过系统容量时,应用就会突然停止所有响应。由于应用能够创建的线程数有限,它的服务速度不是 HTTP 请求的接收速度,而是系统可以支撑的速度。

使用固定线程池创建执行器的简单方式是调用 java.util.concurrent.ExecutorsnewFixedThreadPool 工厂方法。该类还提供了以下工厂方法:

  • newCachedThreadPool 方法创建一个含可扩展线程池的执行器。它适用于启动许多短期任务的应用。
  • newSingleThreadExecutor 方法创建每次执行单个任务的执行器。
  • 还有创建上述执行器 ScheduledExecutorService 版本的工厂方法。

如果以上工厂方法都无法满足你的需求,实例化一个 java.util.concurrent.ThreadPoolExecutorjava.util.concurrent.ScheduledThreadPoolExecutor,它们提供了额外选项。

Fork/Join

fork/join 框架实现了 ExecutorService 接口,便于你使用多处理器。它适用于那些可被递归拆分的任务。它的目标是使用所有处理能力增加应用性能。

和任何 ExecutorService 实现一样,fork/join 框架也将任务分发给线程池中的工作线程。它的独特性在于使用了 work-stealing 算法。空闲的工作线程可以从其它繁忙线程处偷取任务。

该框架的核心是 ForkJoinPool 类,它是 AbstractExecutorService 类的扩展。ForkJoinPool 实现了核心工作窃取算法,可以执行 ForkJoinTask 进程。

Base Use

使用该框架的第一步是编写一段执行工作的代码。它应该类似于下面的伪代码:

if (my portion of the work is small enough) {
    do the work directly
} else {
    split my work into two pieces
    invoke the two pieces and wait for the results
}

使用 ForkJoinTask 的子类包裹代码,通常使用更具体的类型,要么是 RecursiveTask(含返回值),要么是 RecursiveAction

一旦子类声明好,创建代表所有要完成工作的对象并把它传给 ForkJoinPool 实例的 invoke() 方法。

Blurring for Clarity

为了帮助你理解该框架如何工作,考虑下面的示例。假定你想模糊一张图片。原始 source 图片使用整型数组表示,每个整数包含单像素颜色值。模糊后的 destination 图片使用大小相同的整形数组。

模糊化动作每次处理源数组的一个像素。每个像素是周围像素的平均(红,绿和蓝成分的平均),结果放到目标数组。由于图像数组很大,处理它要花费很长时间。你可以使用 fork/join 框架实现算法利用多处理系统的并发处理能力。下面是一种可能实现:

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkBlur extends RecursiveAction {
    private final int[] mSource;
    private final int mStart;
    private final int mLength;
    private final int[] mDestination;

    // Processing window size; should be odd.
    private final int mBlurWidth;

    public ForkBlur(int[] mSource, int mStart, int mLength, int[] mDestination) {
        this.mSource = mSource;
        this.mStart = mStart;
        this.mLength = mLength;
        this.mDestination = mDestination;
        mBlurWidth = 15;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average;
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                        mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) (pixel & 0x000000ff) / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = 0xff000000
                    | (int) rt << 16
                    | (int) gt << 8
                    | (int) bt;
            mDestination[index] = dpixel;
        }
    }

    ...
}

现在实现抽象方法 compute(),它要么直接模糊,要么把任务分成两部分。简单的数组长度临界值决定工作直接完成还是继续划分。

protected static int sThreshold = 10000;

@Override
protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;
    invokeAll(new ForkBlur(mSource,
                    mStart,
                    split,
                    mDestination),
            new ForkBlur(mSource,
                    mStart + split,
                    mLength - split,
                    mDestination));
}

如果上述方法位于 RecursiveAction 子类,那么可以直接设定运行于 ForkJoinPool 中的任务,步骤如下:

  1. 创建一个代表所有待完成工作的任务。

    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 创建 ForkJoinPool 用于运行任务。

    ForkJoinPool pool = new ForkJoinPool();
  3. 运行任务。

    pool.invoke(fb);

包含创建目标图片文件的完整代码见 ForkBlur

Standard Implementations

除了使用 fork/join 框架实现自定义算法利用多处理器系统并发执行任务(比如前面的 ForkBlur.java 示例),Java SE 中还有很多使用 fork/join 实现的通用功能。其中之一便是 Java SE 8 引入的,被 java.util.Arrays 类使用的 parallelSort() 方法。这些方法类似于 sort(),但通过 fork/join 框架实现了并发。在多处理器系统上,并行排序大数组比串行更快。想了解这些信息,阅读 Java API 文档。

fork/join 框架的另一个实现被 java.util.streams 包下方法使用,它们是 Project Lambda 的一部分,已于 Java SE 8 发布。更多信息见 Lambda Expressions

Concurrent Collections

java.util.concurrent 下包含很多附加容器框架。它们最容易通过提供的容器接口分类:

  • BlockingQueue 定义了一个先进先出数据结构,队列满时向其中添加数据会阻塞,队列空时取数据会超时。
  • ConcurrentMapjava.util.Map 的子接口,定义了原子操作。只有键存在才能删除键值对,只有键不存在才能添加键值对。原子化这些操作有利于避免同步。它的通用标准实现是 ConcurrentHashMap,是 HashMap 的并发版本。
  • ConcurrentNavigableMapConcurrentMap 的子接口,支持近似匹配。它的通用标准实现是 ConcurrentSkipListMap,是 TreeMap 的并发版本。

所有这些容器在添加对象到容器和后续访问、删除对象间建立了 happens-before 关系,从而有利于避免 Memory Consistency Errors

Atomic Variables

java.util.concurrent.atomic 包下的类支持单个变量上的原子操作。所有类都有 getset 方法,它们的行为就像读写 volatile 变量。即,set 与任何后续相同变量的 gethappens-before 关系。原子方法 compareAndSet 也包含这些内存一致性功能,适用于整型原子变量的简单原子算术方法也是如此。

要理解这些包如何使用,让我们回到之前用于阐述线程干扰的 Counter 类:

class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

使 Counter 免于线程干扰的方式之一是使用同步方法,像 SynchronizedCounter 类那样:

class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

对于这一简单类,同步是可接受的方法。但对于更复杂的类,我们可能想要避免不必要同步带来的活性影响。替换 int 属性为 AtomicInteger 允许我们防止线程干扰而无需同步,就像 AtomicCounter 这样:

public class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}

Concurrent Ramdom Numbers

在 JDK 7 中,java.util.concurrent 包含一个方便的类 ThreadLocalRandom,它适用于想要在多线程或 ForkJoinTasks 中使用随机数的应用。

对于并发访问,相比 Math.random(),使用 ThreadLocalRandom 能够减少竞争,最终获得更高性能。

你要做的只是调用 ThreadLocalRandom.current(),随后调用它的随机数方法。下面是一个示例:

int r = ThreadLocalRandom.current().nextInt(4, 77);

For Further Reading

  • Concurrent Programming in Java: Design Principles and Pattern (2nd Edition) 作者 Doug Lea。资深专家也是 Java 平台并发框架架构师编写的综合著作。
  • Java Concurrency in Practice 作者 Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, 和 Doug Lea。面向初学者的实战指导。
  • Effective Java Programming Language Guide (2nd Edition) 作者 Joshua Bloch。虽然这是一本通用编程指南,但它的线程章节包含许多并发编程的 “最佳实践”。
  • Concurrency: State Models & Java Programs (2nd Edition) 作者 Jeff Magee 和 Jeff Kramer。建模和实践案例相结合的并发编程指导。
  • Java Concurrent Animated。通过动画展示并发功能的使用。

Questions and Exercises

Questions

  1. 是否能够传递 Thread 对象给 Executor.execute?这样做有意义吗?

Thread 实现了 Runnable 接口,所以可以传递 Thread 对象给 Executor.execute。但是使用 Thread 没有意义。如果对象直接实例化 Thread,它的 run 方法什么也不做。你可以定义一个 Thread 子类重写 run 方法 —— 但是该类将实现执行器不会使用的功能。

Exercises

  1. 编译并运行 BadThreads.java
public class BadThreads {
    static String message;

    private static class CorrectorThread extends Thread {
        @Override
        public void run() {
            try {
                sleep(1000);
            } catch (InterruptedException ignored) {
            }
            // Key statement 1:
            message = "Mares do eat oats.";
        }
    }

    public static void main(String[] args)
            throws InterruptedException {

        new CorrectorThread().start();
        message = "Mares do not eat oats.";
        Thread.sleep(2000);
        // Key statement 2:
        System.out.println(message);
    }
}

应用将打印 "Mares do eat oats." 它被保证总是这样吗?如果不是,原因是什么?改变两次 sleep 调用的参数有用吗?你如何保证 message 的所有改变对主线程可见?

程序几乎总会打印 "Mares do eat oats." 但该结果不被保证,因为 "Key statement 1" 和 "Key statment 2" 没有 happens-before 关系。即使 "Key statement 1" 在 "Key statment 2" 之前执行,它们也没有这种关系 —— 记住,happens-before 关系与可见性有关,与顺序无关。

有两种保证 message 改变对主线程可见的方法:

  • 在主线程中持有 CorrectorThread 实例引用。在引用 message 前调用实例的 join 方法。
  • message 封装到包含同步方法的对象中。不要使用同步方法之外的方式引用 message

它们都能建立必要的 happens-before 关系使 message 可见。

第三种技术是简单地把 message 声明为 volatile。这保证了任何 message 写入(就像 "Key statement 1")与后续读操作(就像 "Key statement 2")有 happens-before 关系。但它不能保证 "Key statement 1" 就可以在 "Key statement 2" 前发生。1、2 可能顺序发生,但因为调度以及 sleep 粒度的不确定,它们无法被保证。

改变两个 sleep 的参数也没有帮助,因为它与 happens-before 毫不相干。

2. 修改 Guarded Blocks 中的生产者-消费者示例,使用标准库代替 Drop 类。

java.util.concurrent.BlockingQueue 接口定义了 get 方法,它在队列空时阻塞。它还定义了 put 方法,它在队列满时阻塞。这些操作实际上与 Drop 定义的一样 —— 除了 Drop 不是队列!但是,有另外一种看待 Drop 的方式:它是容量为 0 的队列。由于队列中没有空间容纳任何元素,每个 get 都会阻塞直到相应的 take 发生,每个 take 也会阻塞等待相应的 get。存在一个准确具有这种行为 BlockingQueue 实现:java.util.concurrent.SynchronousQueue

BlockingQueue 几乎是 Drop 直接替代。但 putget 抛出 InterruptedExceptionProducer 需要修改。已有的 try 必须往上移动一个等级:

public class Producer implements Runnable {
    private BlockingQueue<String> drop;

    public Producer(BlockingQueue<String> drop) {
        this.drop = drop;
    }

    @Override
    public void run() {
        String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
        };
        Random random = new Random();

        try {
            for (String info : importantInfo) {
                drop.put(info);
                Thread.sleep(random.nextInt(5000));
            }
            drop.put("DONE");
        } catch (InterruptedException ignored) {
        }
    }
}

Consumer 也需类似修改:

public class Consumer implements Runnable {
    private BlockingQueue<String> drop;

    public Consumer(BlockingQueue<String> drop) {
        this.drop = drop;
    }

    @Override
    public void run() {
        Random random = new Random();
        String message;
        try {
            while (!"DONE".equals(message = drop.take())) {
                System.out.printf("MESSAGE RECEIVED: %s%n", message);
                Thread.sleep(random.nextInt(5000));
            }
        } catch (InterruptedException ignored) {
        }
    }
}

对于 ProducerConsumerExample类,我们只需更改 drop 对象声明:

public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<String> drop =
                new SynchronousQueue<>();
        new Thread(new Producer(drop)).start();
        new Thread(new Consumer(drop)).start();
    }
}

Check your answers

本文译自 Concurrency,译者 LOGI。

TG 大佬群 QQ 大佬群

返回文章列表 文章二维码
本页链接的二维码
打赏二维码