MENU

文章目录

Java Concurrency Basics

2021 年 06 月 24 日 • 阅读: 5816 • 英语

Java Concurrency

1. Introduction

Java Concurrency 是一个涵盖 Java 平台多线程,并发和并行的术语,包括并发工具,并发问题和并发解决方案。本文将介绍多线程的核心概念,并发结构,并发问题,以及多线程的代价和优点。

What is Multithreading?

多线程即同一应用内部执行多个线程。假如单线程代表只有一个 CPU 执行你的应用,那么多线程应用中,则同时存在多个 CPU 执行不同部分的代码。

what-is-multithreading

然而,单线程并不等同于单个 CPU。通常,一个 CPU 把执行时间分享给多个线程,每隔一段时间,CPU 会切换一个线程。当然,让不同线程被不同 CPU 执行也是可能的。

relationship-between-threads-and-cpus

Why Multithreading?

在应用中使用多线程有许多原因,最常见的是以下几个:

  • 更好地利用单个 CPU
  • 更好地利用多个 CPU 或多核
  • 更快的用户响应
  • 更公平的用户响应

Better Utilization of a Single CPU

最常见的原因是可以更好利用计算机中的资源。例如,如果一个线程正在等待网络请求响应,另一个线程可以同时使用 CPU 做其它事情。此外,如果计算机有多个 CPU,或者 CPU 拥有多个运算核心,那么多线程也可以帮助应用利用这些额外的核心。

Better Utilization of Multiple CPUs or CPU Cores

如果一台计算机包含多个 CPU 或 CPU 含有多个执行核心,那么你需要通过多线程让你的应用利用所有 CPU 或 CPU 核心。单线程最多可以利用单个 CPU,并且就像我上面提到的,有时还不能完全利用它。

Better User Experience with Regards to Responsiveness

使用多线程的另一个原因是提供更好的用户体验。例如,你点击了界面上的一个按钮,它将触发一个网络请求,那么请求由哪个线程发出是很重要的。如果你仍然使用更新 GUI 的线程,那么在等待请求响应时,用户可能无法操作界面。相反,这一请求可以被后台线程执行,那么界面线程就可以同时响应其它用户操作。

Better User Experience with Regards to Fairness

第四个原因是在各个用户间更公平地共享计算机资源。就像例子想象的那样,一个接收客户端请求的服务器,如果仅有一个线程执行这些请求,当某个客户端发送了一个长耗时请求时,其它客户端将不得不等待那个请求结束。如果每个客户端请求被它们自己的线程执行,就没有任何一个任务可以完全独占CPU。

Multithreading vs. Multitasking

以前,那些只有一个 CPU 的计算机只能同时运行一个程序。大多数小型机没有足够的性能同时执行多个程序,所以它们没有那样做。公平的说,大型系统具备同时执行多个程序的能力要比个人电脑早许多年。

Multitasking

随后出现了多任务,即计算机可以同时执行多个程序(任务或进程),但这并非真正的同时,而是单个 CPU 被多个程序共享,操作系统会切换不同的程序执行,每个程序运行一小段时间。

多任务也为开发者带来了新挑战。程序再也不能假定拥有所有 CPU 时间,同样还有内存或其它计算资源。程序中的 “良好公民” 应该释放所有不再使用的资源,这样其它程序才能获得它们。

Multithreading

最后出现的是多线程,它意味着你可以在同一应用内部执行多个线程。单线程执行可以想象成只有一个 CPU 执行这个程序。当你拥有多线程时,就像同一程序同时被不同 CPU 执行。

Multithreading is Hard

多线程是增强某类程序性能的绝佳手段。然而,实现多线程比多任务更具挑战性。同一程序中的多个线程会同时读写相同的内存单元。这会导致单线程应用中不会出现的错误。某些错误可能不会出现在单 CPU 机器,因为这种情况下,两个线程永远不能真正同时运行。然而,现代计算机的 CPU 都拥有多核心,有些计算机还拥有多个 CPU,这意味着不同线程可以被不同核心或 CPU 同时执行。

multithreading-on-multi-cpus

如果一个线程正在读取另一个线程将要写入的内存地址,那么它最终读到的将是哪个值?旧的值?第二个线程写入后的值?还是一个介于两者之间的值?又或者,两个线程同时向同一内存地址写入数据,当它们完成时,该内存位置的值哪个呢?第一个线程写入的?第二个线程写入的?还是两个值的混合呢?

如果没有恰当的处理,以上结果都有可能出现。它们的行为甚至是不可预测的,结果每次都可能不同。因此了解和采取恰当措施对开发者来说是十分重要的——即学习控制线程如何访问共享资源,如内存,文件,数据库等。这就是本文要讨论的话题之一。

Multithreading and Concurrency in Java

Java 是第一批让开发者能方便地使用多线程的语言之一,它从一开始就支持多线程。因此,Java 开发者总是面临上面描述的问题。这就是我写这篇教程的原因,作为我和可能从本文获得帮助的 Java 开发者的笔记。

这篇教程主要讨论 Java 中的多线程,但多线程中的一些问题和多任务及分布式系统中的许多问题是类似的。本文也会提到多任务和分布式系统,所以使用了 "concurrency" 而不是 "multithreading"。

Concurrency Models

第一个 Java 并发模型假定同一应用中执行的多线程也会共享对象。这类并发模型通常被叫做 共享状态并发模型 (shared state concurrency model)。许多并发语言结构和工具都支持这种模型。

但是,自从第一本 Java 并发书籍编写,特别是 Java 5 并发工具发布以来,并发架构和设计已经发生了很多改变。

共享状态并发模型导致的许多问题不能得到优雅解决。因此,一种被称为 非共享 (shared nothing)状态分离 (separate state) 的并发模型流行起来。在这一模型中,线程不共享任何对象和状态,这避免了许多共享模型存在的访问问题。

现在出现了新的异步状态分离 (separate state) 平台和工具,如 Netty, Vert.x and Play / Akka and Qbit;新的非阻塞并发算法和工具,如 LMax Disrupter;Java 7 引入的 Fork and Join 框架,支持函数式并行;还有 Java 8 引入的集合流式 API。

在如此多进展的情况下,是时候更新这篇 Java 并发教程了。因此,这篇教程又一次 work in progress

2. Multithreading Benefits

Better CPU Utilization

想象一个应用从本地文件系统读取和处理文件。假定从磁盘读取文件需要 5s,处理需要 2s,那么处理 2 个文件需要花费:

  5 seconds reading file A
  2 seconds processing file A
  5 seconds reading file B
  2 seconds processing file B
-----------------------
 14 seconds total

读取文件时,大多数 CPU 时间都用来等待磁盘。在此期间,CPU 非常空闲,它本可以做其他事情。通过改变操作顺序,CPU 就能被更好利用。看下面的顺序:

  5 seconds reading file A
  5 seconds reading file B + 2 seconds processing file A
  2 seconds processing file B
-----------------------
 12 seconds total

CPU 首先等待第一个文件被读取。随后它开始读取第二个文件。在计算机的 IO 组件读取第二个文件时,CPU 同时处理第一个文件。记住,等待文件从磁盘读取时,CPU 很大可能是空闲的。

换句话说,等待 IO 时,CPU 可以做其它事情。这不仅局限于磁盘 IO,也可以是网络 IO 或用户输入。网络和磁盘 IO 总是比 CPU 和内存 IO 慢很多。

Simpler Program Design

如果你在单线程应用中处理上述读取和处理顺序,那么你必须跟踪每个文件的读取和处理状态。取而代之,你可以启动两个线程,让它们分别读取和处理单个文件。每个线程在等待文件读取时会阻塞。一个线程等待时,其它线程可以使用 CPU 处理它们已经取得的文件部分。结果是,磁盘总是处于繁忙状态,将多个文件读取到内存。这不仅提高了磁盘和 CPU 利用率,实现起来也更简单,因为每个线程只需要跟踪单个文件。

More Responsive Programs

重构单线程为多线程应用的另一个常见目标是获得及时响应。想象一个监听多个端口请求的服务程序。当请求到达时,它处理请求随后继续监听。服务循环的骨架就像下面这样:

while(server is active) {
    listen for request
    process request
}

如果请求需要很长时间处理,那么在此期间,没有其它客户端可以发送请求。只有当服务处于监听状态时,请求才能被接收。

替代的设计是监听线程把请求传递给工作线程,随后立即返回继续监听。工作线程会处理请求,进而响应客户端。这种设计的伪代码如下:

while(server is active) {
    listen for request
    hand request to worker thread
}

这样服务线程可以马上返回监听,因此更多客户端可以发送请求给服务器,响应性能得到提升。

对于桌面应用同样如此。你点击一个按钮,它会启动一个长时间任务,如果执行该任务和更新窗口,按钮等使用同一线程,那么在执行任务时应用就会无响应。相反,任务可以被工作线程处理。即使工作线程非常繁忙,窗口线程也能响应其它用户请求。当工作线程完成工作时,它会向窗口线程发送信号。窗口线程便可使用任务结果更新窗口。使用工作线程设计的应用在用户看来会更具响应性。

More Fair Distribution of CPU Resources

想象一个接收客户端请求的服务器。如果其中一个客户端的请求需要长时间处理,例如 10s。如果服务器使用单线程处理所有任务,那么所有后续请求必须等待该任务被处理。

通过将 CPU 时间在多线程间切换,CPU 可以更公平地执行每个请求。即使某个请求需要长时间处理,服务器也可以同时处理其它短耗时请求。当然,慢请求的处理将变得更加缓慢,因为它不能独享 CPU 时间。但是其它请求的等待时间将大大缩短,因为它们无需等待慢请求结束。如果只有慢请求要处理,它仍然能独享 CPU。

3. Multithreading Costs

重构应用为多线程并不仅仅带来好处,它也伴随着一定的代价。不要仅仅因为你能使用就在应用中使用多线程。你应该有足够理由相信引入多线程的优势大于代价。有疑问时,尝试测试应用的性能和响应,而非仅靠猜测。

More complex design

尽管多线程应用的某些部分较单线程简单,但其它部分会更加复杂。多线程访问共享数据的代码需要特别关照,线程竞争远不简单。由不正确的线程同步带来的错误将很难被检测,重现和修复。

Context Switching Overhead

CPU 从一个线程切换到另一个时,它需要保存当前线程的局部数据,程序指针等,还需加载下一线程的局部数据,程序指针。这种切换叫 上下文切换 (context switch),CPU 从一个线程的运行上下文切换到另一个。

上下文切换并不便宜。如非必要,不要进行。你可以阅读 维基百科 了解更多。

Increased Resource Consumption

线程运行需要计算机的某些资源。除了 CPU 时间,线程还需内存保存本地栈。操作系统为了管理线程,也要消耗一些内部资源。尝试创建包含 100 个忙等线程的应用,运行它,观察它会占用多少内存。

4. Concurrency Models

并发系统可以使用不同并发模型实现。并发模型指定了系统中的线程如何合作完成分配给它们的任务。不同并发模型划分任务的方式不同,线程间通信和合作的方式也不同。本章将深入介绍当前节点 (2015 - 2019) 最流行的并发模型。

Concurrency Models and Distributed System Similarities

本文描述的并发模型和分布式系统中使用的不同架构很相似。并发系统中的多线程互相通信,而分布式系统中的不同进程也相互通信(很大可能位于不同计算机)。线程和进程在本质上非常相似。这就是为何不同并发模型看上去就像不同分布式系统架构。

当然,分布式系统会面临额外挑战,如网络故障,远程计算机宕机或进程崩溃等。但运行于大型服务的并发系统也会面临类似问题,如 CPU 故障,网卡故障,磁盘故障等。虽然故障率很低,但它们在理论上都可发生。

由于并发模型和分布式系统的相似性,它们可以互相借鉴各自的思想。例如,工作线程分发模型类似于 分布式系统中的负载均衡。错误处理技术也是类似的,如日志记录,故障转移,任务的幂等性 等。

Shared State vs. Separate State

并发模型的一个重要方面是,组件和线程被设计为共享状态或者分离状态,即状态不在线程间共享。

共享状态 (Shared state) 指系统中的不同线程共享某些状态。状态指数据,通常是一到多个对象。当线程共享状态时,像 竞态条件死锁 问题便会发生。当然,这取决于线程如何使用和访问共享对象。

shared-state

分离状态 (Separate state) 指系统中的不同线程间不共享任何状态。它们之间的通信,要么通过交换不可变对象,要么发送对象(数据)的拷贝。因此,由于不会出现两个线程向相同对象(data / state)写入数据,便可避免大多数常见的并发问题。

separate-state

使用分离状态并发设计通常可以使代码的某些部分变得易于实现和理解,因为你知道只有一个线程会向特定对象写入数据。你无需担心对象的并发访问。但是,分离状态并发可能需要你花更多时间从全局思考应用设计。

Parallel Workers

第一个并发模型我把它叫做 平行作业 (parallel workers)。新的工作被分配给不同作业者。看下图说明:

parallel-workers

代理人负责分发任务,作业者完成委派的全部工作。作业者平行运作,运行于不同的线程,很可能位于不同 CPU。

如果用平行作业模型实现汽车工厂,那么每个作业者将生产一整台车。作业者会得到汽车的建造规范,完整生产需要的每个部件。

平行作业模型是 Java 应用中最常用的并发模型(虽然在不断变化)。java.util.concurrent 包下的许多并发工具都设计成这种模型。你也能在 Java EE 服务应用中见到这种模型。

平行作业既可共享状态,也能分离状态,所以作业者能够访问某些共享状态(共享对象或数据),也可能不共享状态。

Parallel Workers Advantages

平行作业模型的优点是易于理解。要增加应用的并发性,你只需添加更多作业者。

例如,你正在实现一个网页爬虫,你可以使用不同数量的作业者爬取特定网页,观察多少作业者会获得最短的爬取时间(意味着最高性能)。由于网页爬虫是 IO 敏感型工作,对于计算机上的每个 CPU 或核心,你最终可能需要几个线程。每个 CPU 一个线程就太少了,因为在等待数据下载时,它会空闲很长时间。

Parallel Workers Disadvantages

然而,平行作业模型的简单外表下潜藏着许多缺点。下面我会挑选几个最明显的缺点予以说明。

Shared State Can Get Complex

一旦共享作业者需要访问某类共享数据,它要么在内存中,要么位于共享数据库,正确的管理并发访问将变得困难。下图展示了共享状态如何使该模型变得复杂:

shared-state-among-parallel-workers

某些共享状态通过工作队列机制通信。但还有些共享状态是业务数据,数据缓存,数据库连接池等。

一旦平行作业不经意间引入了共享状态,它将变得复杂起来。线程访问共享数据时必须确保它对数据的改变对其它线程可见(变更同步到主存,而不仅仅是执行线程的 CPU 所在的缓存)。线程需要避免 竞态条件死锁 和许多其它共享状态并发问题。

此外,当共享数据结构正在被访问时,工作线程必须互相等待,这将导致部分并发性损失。许多并发数据结构是阻塞的,这就意味着每次只允许一个或有限的线程集访问它们。在这些共享数据结构上会发生争夺,大量争用基本上会让访问共享数据结构的那些代码一定程度上退化为串行执行(消除了并行)。

现代 非阻塞并发算法 可能会减少争用,增加性能,但非阻塞算法难于实现。

持久化数据结构是另一个解决方案。当被修改时,它总是保存之前的版本。因此,如果多线程指向同一个持久化数据结构,如果其中一个线程修改了它,该线程将得到一份新结构的拷贝。其它线程仍然保持旧结构的引用,它包含着未改变的值,因此是一致的。Scala 标准 API 包含若干持久化数据结构。

尽管持久化数据结构是并发修改共享数据的优雅方案,但这种结构的表现并不理想。

举例来说,一个持久化列表会把新加元素放到表头,返回新加元素的引用(表头指向链表的剩余部分)。所有其它线程仍然指向旧的表头,对于这些线程而言,链表似乎没有改变。它们看不到新添加的元素。

这种持久化列表以链式结构实现。不幸的是,链表在现代硬件上的性能并不完美。其中的每个元素都是单独的对象,这些对象可以散布于整个计算机内存。现代 CPU 善于访问顺序数据,所以基于数组的列表性能会更好。数组顺序储存数据,CPU 缓存可以一次加载大块片段。而链表的元素分散在整个 RAM,不便批量加载。

Stateless Workers

共享状态可以被系统中的其它线程修改,因此作业者必须每次重载状态来确保它们获得的是最新拷贝。无论共享状态位于内存还是外部数据库,线程都要完成这种操作。不维护内部状态的线程称为 无状态 (stateless) 线程(但是仍需每次重载)。

每次需要时重载很慢,尤其当状态位于外部数据库时。

Job Ordering is Nondeterministic

平行作业的另一个缺点是任务执行的顺序无法指定。你无法保证哪个任务最先或最后执行。任务 A 可能先于任务 B 被分发,但任务 A 的执行可能后于任务 B。

这种不可确定性让给定时间点获取系统状态变得困难。它也使保证任务先后顺序变得困难(如果可能的话)。但这不总是一个问题,取决于系统的需求。

Assembly Line

第二个模型我把它叫做 流水作业 (assembly line) 并发模型。我使用这个名称仅为了与前面的平行作业比喻一致。其它开发者可能根据平台或社区使用其它名称(如响应式系统,或事件驱动系统)。示意图如下:

assembly-line

作业者就像工厂中的装配线工人一样被组织。每个作业者只完成任务的一部分。当该部分完成时,上游作业者将其传递给下游作业者。

使用流水作业模型的系统通常设计成使用非阻塞 IO。非阻塞 IO 指作业者发起 IO 操作时(如从网络取得文件或数据),它并不等待该操作完成。IO 操作是很缓慢的,所以等待它是对 CPU 时间的浪费。CPU 在此期间可以完成其他事情。当 IO 操作结束时,其结果(如读数据或写数据状态)会被传递给下个作业者。

使用无阻塞 IO,IO 操作决定作业者边界。作业者可以尽可能做更多,直到不得不发起一个 IO 操作。随后它放弃对作业的控制。当 IO 操作完成时,流水线中的下个作业者继续之前的工作,直到它也必须发起 IO 操作,如此往复。

boundary-between-workers

实际上,系统可能不限于单条流水线。因为大多数系统可以执行许多作业,只有作业部分需要往下执行,作业流才会在作业者间传递。现实中,可能同时运行着多条虚拟流水线,见下图:

multi-assembly-lines

作业甚至可能被转发给多个线程同步处理。例如,作业可能被同时转发给作业执行者和作业记录者。下图展示了三条流水线最终把任务都转发给了相同作业者。

job-formarding

真正的流水线可能比上面还要复杂。

Reactive, Event Driven Systems

使用流水线并发模型的系统有时也被叫做响应式系统或事件驱动系统。系统中的作业者响应发生的事件,这些事件要么是外部世界发送的,要么是其它作业者发出的。例如新的 HTTP 请求或某个文件完成了内存加载等。

截止成文,已经出现许多有趣的响应式/事件驱动平台,未来会出现更多。其中最流行的几个可能是:

  • Vert.x
  • Akka
  • Node.JS (JavaScript)

我个人认为 Vert.x 非常有意思(尤其对于像我这样守旧的 Java / JVM 使用者)。

Actors vs. Channels

行动者和频道是两个类似的流水线模型(响应式 / 事件驱动)示例。

行动者模型中,每个作业者被称为行动者。行动者间可以直接互相发送消息。消息被异步发送和处理。就像前面描述的,行动者可以实现一到多条作业处理流水线,下图展示了这种模型:

actor-model

在频道模型中,作业者间不能直接交流。取而代之,它们会将消息(事件)发布到不同频道。其它作业者可以监听这些频道上的消息,发送者对此并无感知。下图阐述了这种模型:

channel-model

截止成文,我觉得频道模型更具弹性。作业者无需知道流水线上的下个作业者是谁。它只需知道要将作业转发(或消息发送等)给哪个频道。频道的监听者可以订阅和退订频道,这不影响作业发送者。这允许对作业者进行一定程度的解耦。

Assembly Line Advantages

相比平行作业,流水线作业有不少优点,我将挑选最大的几个予以说明。

No Shared State

作业者间不共享状态意味着实现上不用考虑那些并发访问共享状态才会出现的问题。这使作业者的实现更加简单。你实现作业者时就像它是唯一执行作业的线程——基本上是单线程实现。

Stateful Workers

因为作业者知道其它线程不会修改它们的数据,所以可以持有状态。有状态指它们可以在内存中保存它们需要操作的数据,只要在最后把变化写回外部存储系统。有状态作业者通常快于其无状态实现。

Better Hardware Conformity

单线程代码的优点是,它总是更好符合底层硬件的工作方式。首先,如果你能确保代码以单线程执行,你就可能创造更优化的数据结构和算法。

其次,如上所说,有状态的单线程作业者可以将数据缓存到内存。当数据被缓存到内存中时,它有很大可能也会被缓存到执行线程的 CPU 缓存。缓存数据的访问会更快。

我把以天然符合底层硬件工作方式编写的代码称为具有 硬件一致性 (hardware conformity)。有些开发者把它叫做 机械共情 (mechanical sympathy)。我倾向于前者,因为计算机的机械部件很少,并且在当前语境下,同情是对 配合更好 的比喻,我认为 一致 更加合理。好吧,这是吹毛求疵,使用你喜欢的术语就好。

Job Ordering is Possible

流水线并发模型可以保证作业的顺序。而顺序又使得特定时间点系统状态的获取变得简单。更进一步,你可以把所有新作业写入日志。如果系统的某个部分出现故障,这份日志可以用来从零开始重建系统状态。作业以特定顺序写入日志,此顺序也是作业顺序的保证。见下图:

channel-model

保证作业顺序的实现并不简单,但总是可能的。如果可以,它将大大简化像备份,数据恢复,数据复制等任务,因为它们都可以借助日志完成。

Assembly Line Disadvantages

流水线并发模型的主要缺点是,作业的执行通常遍布多个作业者,因此你的项目会存在许多类。所以要确定特定作业的确切代码将变得困难。

编写代码也会变得困难。作业者总是作为回调处理器。嵌套太多的回调处理可能导致通常所说的 回调地狱 callback hell。这意味着跟踪代码在整个回调过程中做了什么将变得困难,同样困难的是确定每个回调对所需数据的访问。

在平行作业模型中这些工作相对简单。你可以打开作业者代码,从头读到尾,它们基本就是整个作业的代码。当然平行作业代码也可能分散成不同的类,但执行顺序通常容易从代码辨识。

Functional Parallelism

函数式并行 是第三个在当前时间点(2015)被讨论较多的并发模型。

它的基本思想是使用函数调用实现程序。函数可以视作 “代理” 或 “行动者”,它们互相发送消息,就像流水线模型那样。函数的调用就像消息的发送。

传递给函数的参数都会被拷贝,所以除接收函数以外的实体都无法操纵数据。这种拷贝有效避免了共享数据上的竞态条件。它使函数执行类似于原子操作。函数调用之间相互独立。

每个函数调用相互独立意味着它们可以被不同 CPU 执行。也就是说,函数式算法可以在多个 CPU 上并行执行。

Java 7 带来的 java.util.concurrent 包下的 ForkAndJoinPool 可以帮助你实现类似的函数式并行。Java 8 带来的并行 streams 可以帮助你并行地进行大集合遍历。记住有些开发者对 ForkAndJoinPool 持批判态度(你可以在我的 ForkAndJoinPool 教程中看到)。

函数式并行的难点在于指定哪些函数调用去并行。跨 CPU 的函数协同调用伴随着额外消耗。函数要完成的作业单元必须具有一定规模才能抵消这一消耗。如果函数调用规模很小,那么尝试对它并行化实际可能比单线程,单 CPU 执行更慢。

按照我的理解(它一点也不完美),你可以使用响应式,事件驱动模型实现一个算法,它可以获得类似于函数式并行的工作拆分。通过事件驱动,你还可以更加精确地控制并行作业及其规模(我的观点)。

此外,将任务拆分到多个 CPU 伴随着协作消耗,这仅当它是程序的唯一任务时才有意义。然而,如果系统当前正在执行多个其它任务(如网页服务器,数据库服务器和许多其它系统),尝试并行化单一任务就没有多大意义。无论如何,计算机中的其它 CPU 都正忙于其它任务,所以试图用一个更加缓慢的函数式并行任务打扰它们并不合理。

使用流水线模型可能更好,因为它的消耗更小(以单线程顺序执行),并且更符合底层硬件机制。

Which Concurrency Model is Best?

所以,哪种并发模型最好呢?

和通常案例一样,答案是取决于你的系统要做什么。如果你的任务天然平行,独立,也没有必要共享状态,那么可以使用平行作业实现它。

但是很多任务并不天然平行且独立。对于它们,我相信流水线模型的优势大于劣势,并且要优于平行作业。

你甚至无需自己实现流水线架构。像 Vert.x 那样的现代平台已经为你做了很多。我会在下个项目里探索这些平台的设计。我觉得 Java EE 完全没有边界。

5. Same-threading

同线程是一种并发模型,它将单线程系统扩展为 N 线程系统,结果是 N 条线程并行执行。

同线程系统并非只有一个线程,而是每个线程像单线程系统那样运行。因此使用术语 同线程 (same-threaded) 而非 单线程(single-threaded)

Why Single-threaded Systems?

你可能好奇为何今天还有人设计单线程系统。单线程系统流行的原因是它的并发模型相较多线程简单。单线程系统无状态共享,能够使用非并发数据结构,并且充分利用 CPU 及其缓存。

然而,单线程系统无法完全利用现代 CPU。现代 CPU 通常有 2,4,6,8 甚至更多核心。每个核心的功能等价于独立 CPU。如下图所示,单线程系统只能使用一个 CPU:

single-threaded-system

Same-threading: Single-threading Scaled Out

为了使用所有 CPU 核心,单线程系统可以被扩展。

One Thread Per CPU

通常,计算机的每个 CPU 核心包含一个同线程。如果一台计算机拥有 4 个 CPU 或者包含 4 个核心,那么运行 4 个同线程实例是很正常的,下图展示了这一原则:

multi-same-threaded-instances

No Shared State

同线程系统看上去和传统多线程类似,因为它的内部也运行着多条线程。但它们之间存在一点微妙区别,即同线程系统不共享状态。

同线程系统中的线程不会并发访问共享内存,不存在线程共享的并发数据结构。下图展示了这种不同:

a-difference-between-single-threaded-system-and-traditional-multi-threaded-system

共享状态的缺失使得每个线程表现得就像是单线程系统。同线程的基本含义是数据在同一线程处理,线程间不共享并发数据。有时它仅被叫做 无共享状态 (no shared state)状态分离 (separate state) 并发。

Load Distribution

显然,同线程系统需要在单线程实例间分享工作负载。如果只有一个线程工作,那它就变成了真正的单线程。

如何在不同线程间分发负载取决于你的系统设计,下面我将简要介绍。

Single-threaded Microservices

如果你的系统由许多微服务组成,每个微服务能够以单线程模式运行。当你在相同机器上部署多个单线程微服务时,每个微服务能够以单线程占用单个 CPU。

微服务天然无数据共享,所以它是同线程系统的良好用例。

Services With Shared Data

如果你的系统确实需要共享数据,或者至少是一个数据库,你也许可以对数据库进行拆分。拆分指把数据划分到多个数据库。通常的数据拆分原则是让彼此关联的数据位于相同数据库。例如,某个实体所属的所有数据被插入一个数据库中。然而分库超出了本篇教程的范畴,你需要搜索该话题的教程。

Thread Communication

同线程系统中,线程通过消息传递通信。如果线程 A 需要向线程 B 发送一条消息,它可以生成一个字节序列。线程 B 会复制并读取该消息。消息复制保证了线程 B 在读取时不会被线程 A 修改。一旦消息被复制,线程 A 便失去了对它的访问权。下图阐释了通过消息完成线程通信:

thread-communication-via-messaging

消息可以通过队列,管道,UNIX 套接字,TCP 套接字等承载,只要适合你的系统。

Simpler Concurrency Model

每个系统运行在自己的线程里组成同线程系统,实现上就像它是单线程的。这意味着它的内部并发模型比线程共享状态简单。你无需担心并发数据结构,及其产生的问题。

Illustrations

下面几张图比较了单线程,多线程和同线程系统,你能很容易看出它们的大体不同。首先是单线程系统。

single-threaded-system

随后是多线程系统,线程间存在共享数据。

multi-threaded-system

最后是同线程系统,它有两个线程,各自持有独立数据,通过传递消息通信。

same-threaded-system

Thread Ops for Java

Thread Ops for Java 是一个开源工具,它能帮你更容易地实现分离状态的同线程系统。它可以启动和停止独立线程,通过单线程获得一定的并发。如果你对同线程应用设计感兴趣,不妨去看一看。要了解更多,阅读我的 Thread Ops for Java Tutorial

6. Concurrency vs. Parallelism

术语 并发和并行 (concurrency and parallelism) 经常在多线程编程中出现。起初它们似乎指代相同事物,但实际上其含义并不相同。我将会在这篇教程分别解答。

为了澄清,本文中的并发和并行都位于一个应用 —— 一个进程。不是多应用,进程或计算机。

Concurrency

并发指应用同一时间(至少是表面上)进行着多个任务。

如果计算机只有一个 CPU,那应用不可能真的同时进行多个任务,为了看上去同时进行,CPU 要在不同任务间切换,就像下图这样:

switching-cpu-among-tasks

Parallel Execution

并行执行是当计算机拥有多个 CPU 或多核时,同步执行多个任务。但是并行执行并不代指并行性,后者我待会介绍。下面是并行执行示意图:

parallel-execution

Parallel Concurrent Execution

并行并发执行是可能的,这种情况下线程被分发到多个 CPU。因此,相同 CPU 上的线程并发执行,不同 CPU 间的线程并行执行,下图展示了这种情况:

parallel-concurrent-execution

Parallelism

并发性指应用将任务拆分成更小的子任务,它们可以被并行处理。例如,多 CPU 可以真正做到同时处理。因此,并行性与并行并发执行的模型并不相同,尽管它们表面看去很像。

要获得真正的并行性,你的应用必须包含多条线程,并且每个线程运行在独立 CPU / 核心 / GPU 等。

下图中,大任务被拆分成 4 个小任务,每个任务是一个线程,它们运行于 2 个 CPU。这意味着部分任务是并发执行的(那些处于相同 CPU 上的),还有部分是并行执行的(那些处于不同 CPU 上的)。

parallelism

取而代之,如果 4 个子任务的 4 条线程都运行于自己的 CPU (总共 4 个 CPU),那么这些任务就是完全并行执行的。但是,要把任务拆分成和 CPU 数量相等是不容易的。通常,更容易的做法是按实际意义拆分任务,让线程调度器去决定如何在 CPU 间分发线程。

Concurrency and Parallelism Combinations

回想下,并发指单个 CPU 如何看上去同时执行多个任务。

并行则和应用如何平行化执行单个任务有关 —— 通常通过将任务拆分成可以完全平行的子任务。

Concurrent, Not Parallel

应用可以是并发非并行的。这意味着它表面上同时进行多个任务,但会在每个任务间切换,直到所有任务完成。并不存在真正位于并行线程 / CPU 的平行化任务

Parallel, Not Concurrent

应用也可以是并行非并发的。这意味着它每次只能处理一个任务,该任务被拆分为可以平行化执行的子任务。但是,在下个任务被拆分和并行执行前,前一个已处理完毕。

Neither Concurrent Nor Parallel

此外,应用可以既不并发也不并行。这意味着它只能同时处理一个任务,并且该任务不会被拆分后并行执行。这种情况可能是小型命令行程序,它只有一个任务,太小了以至于没有必要并行化执行。

Concurrent and Parallel

最后,应用可以以两种方式同时并发和并行。

首先是简单的并行并发执行。即应用启动多个线程,它们在多个 CPU 上执行。

其次是应用并发执行多个任务,并且将每个任务拆分成并行执行的子任务。但是,这种情况下,某些并发和并行优势可能丧失,因为计算机中的 CPU 可能已经相当忙于单独的并发或并行任务。将两者结合可能只能提升些微性能,甚至会带来性能损失。确保在盲目使用并发并行模型前分析和测试它。

7. Single-threaded Concurrency

单线程并发 (Single-threaded Concurrency) 指在单线程内同时执行多个任务。传统上,你会使用多线程执行这些任务,让每个线程完成自己的工作。使用传统多线程并发,任务切换是操作系统完成的,CPU 会在不同线程间切换。但是,使用单线程并发技术,通过切换执行每个任务,一个线程事实上也能完成多任务执行。本章我将解释单线程并发是如何工作的,这种设计能带来哪些好处。

Single-threaded Concurrency is Still New Ground

我研究单线程并发设计始于为使用非阻塞 IO 的单线程服务设计寻找更好的线程模型,它使用 Java NIO。如 Netty,Vert.x 和 Undertow 这样的高性能 IO 工具都使用单线程服务设计。Node.JS 也使用单线程设计。据我所知,Vert.x 和 Undertow 底层使用了 Netty,因此都符合它的线程模型。

Netty 线程模型的核心概念是事件循环。一个事件循环是一个处于循环中的线程 —— 查询系统中发生的事件。当事件发生时,你的代码被调用来响应它。

尽管事件循环在某些类型系统和工作负载上表现良好,但也存在表现不佳的情况。因此我决定到别处寻找满足我需求的单线程并发模型。那么听我娓娓道来。

除了 Netty 和 Node.JS 我并未发现太多其它单线程并发设计。也许它隐藏在付费墙后的科学文章中,又或许它不是热点研究领域。

所有我不得不自己设计一些模型。我将它们发布在本篇教程中,但我怀疑随着时间推移可能会有更好的设计出现(如果它们出现我的设计会变得很奇怪)。如果你有任何想法或引用,我非常乐意学习,在社交网站上给我留言。

请注意:本文仍在完善中,不久的将来会添加更多内容。

Classic Multi-threaded Concurrency

在传统多线程并发架构中,你通常会将任务分配给独立线程执行。每个线程每次只能执行一个任务。在一些设计中,每个任务会创建一个线程,任务结束线程也会死亡。在其它设计中,会维护一个线程池,它每次从任务队列中取出一个任务执行它,然后取下一个,如果往复。阅读我的 thread pools 教程了解更多信息。

多线程架构的优势是可以相对容易地进行跨线程和 CPU 的负载分发。只要把任务交给线程,让操作系统和 CPU 调度它们。

但是,如果任务间需要共享数据,多线程架构便会出现许多并发问题,如 race conditionsdeadlockslipped conditionsnested monitor lockout 等。大体上,共享相同数据和数据结构的线程越多,发生并发问题的可能性越大。换句话说,就需要越仔细地检查你的设计。

一个经典多线程架构有时也可能导致拥塞,当多线程试图同时访问相同的数据结构时。取决于特定数据结构的实现有多优秀。当其它线程访问该数据结构时,一些线程可能会等待访问而阻塞。

Single-threaded or Same-threaded Concurrency

经典多线程架构的取代物是单线程或 同线程 并发。在单线程并发设计中,你需要自己实现任务切换。

你可以将单线程架构扩展为多线程,这些线程自身就像是单独,隔离的单线程系统。这种情况我把它叫做同线程。所有任务需要的数据被隔离在线程内部。

Single-threaded Concurrency Benefits

我认为单线程设计相比多线程有一些优势,下面是我信服的结论。

Full Thread Visibility

首先,单线程并发避免了大多数多线程并发的问题。当相同线程执行多个任务时,你避免了诸如线程可见性的并发问题,它指对共享数据结构的更新对其它线程不可见。

使用多线程并发,你必须确保正确使用了 synchronization,volatile 变量和(或)并发数据结构,确保对共享数据的更新对其它线程可见。

阅读我的 Java Memory ModelJava Happens Before Guarantee 了解更多 Java 线程可见性问题。

No Race Conditions

当仅有一个线程访问多任务的共享数据结构时(因为所有任务都被相同线程执行),你就能避免竞态条件问题。竞态条件发生的原因是多线程访问相同的关键代码而不保证线程访问顺序。你可以阅读 Race Conditions and Critical Sections 深入了解该问题。

Control Over Task Switching

当手动切换任务时,你能控制切换何时发生。你能确保切换前共享资源处于合理状态,也能决定达到多少负载增量(chunks)切换合理。

控制切换前负载增量的大小有利于更好控制 CPU 的使用。增量过小会导致任务切换过多。更大的增量意味着更少的切换。你想要的是减少任务切换消耗。花费在暂停任务和恢复另一个任务的 CPU 周期会被浪费。这部分 CPU 时间自身没有在应用中产生任何有用结果。你也许不想小于特定大小的任务被打扰 —— 来避免不必要的切换。

能够决定工作增量的大小也让你可以指定任务优先级。如果有两个任务要切换,你可以决定其中之一的增量是 1,而另一个的增量为 2 或者更多。这样,第二个任务就会获得更多 CPU 时间。你可以自己控制任务的优先级。

Control Over Task Prioritization

实现一个这样的单线程任务切换,你可以指定任务优先级 —— 使某些任务获得更多 CPU 时间。我将会在后面的教程中再次讨论。

Single-threaded Concurrency Challenges

使用单线程并发设计也伴随着一些挑战。下面我将挑选几个介绍。

在不损失单线程架构的简单优势和太过复杂化整体设计的前提下克服这些挑战是可能的。

Implementation Required

自己实现任务切换需要你学习如何实现这种设计,并且手动编码,这是第一个挑战。它也会增加一些代码库开销(尽管不是很大我要说)。幸运的是,你可以创建一个可重用跨应用的单线程任务切换设计,那样就能最小化实现开销。

Blocking Operations Have to be Avoided or Handled in Background Threads

如果任务需要执行阻塞 IO 操作,那么该任务和线程会一直阻塞直到 IO 完成。在此之前,线程不能切换到其它任务。

由于这种阻塞 IO 限制,有必要把其放到自己的后台线程执行,系统也因此恢复为经典多线程设计。

A Single Thread Can Only Utilize a Single CPU

单线程只能使用单个 CPU。如果你的应用运行在包含多 CPU 或多核的计算机,并且想要充分利用它们,你就不得不扩展单线程设计为同线程。这是可能的,但需要额外工作。

Single-threaded Concurrency Designs

现在让我们看看一些单线程设计,它们提供了我之前描述的功能,这样你就能明白它们如何工作,了解它们的优缺点。

Thread Loops

大多数长期运行的应用都运行于某种循环中,其中主线程时刻监听外部输入,继而处理输入,最后继续监听。

thread-loops

这种线程循环在服务程序(web services, services etc.)和 GUI 应用中都存在。有时线程循环被隐藏了,有时没有。

Pausing the Thread Loop

你可能好奇线程进行密集循环是否会浪费大量 CPU 时间。如果线程没有任何真正的工作要完成那确实如此。但是,执行循环的线程可以自由 “睡眠”,如果它认为睡眠几毫秒没有问题。那样的话,浪费的 CPU 时间就能减少。

Agents

通常线程循环会调用一些携带应用任务的组件。我把这些组件叫 代理 (Agent)。代理是一个携带工作负载的类任务组件。

代理的生命周期可能不同。它可以:

  • 运行于整个应用的生命周期
  • 运行一个长期工作 —— 最终会结束
  • 运行一个短期任务 —— 马上结束

因此,代理可以执行应用的长周期逻辑,一个由许多小任务组成的长期工作;或者一个一次性任务,它几乎马上完成。所以,代理覆盖了许多规模的任务和职责。

我倾向于使用代理这个术语,而不是工作或任务,因为它的生命周期,职责和能力可能超过了我们正常认为的单个工作或任务。我把代理看作执行工作或任务的组件。它自身并非工作或任务,尽管有时看上去类似。

Thread Loop Call Agents

通常,线程循环会重复调用代理组件 —— 把实际应用责任交给代理。这种设计分离了线程循环和代理的职责:

线程循环专注循环(重复调用代理)并检测代理何时终止,继而终止线程循环。代理的职责是执行应用逻辑自身,而非循环。

使用这种设计,你可以将不同类型的线程循环和代理结合。见下图:

thread-loop-call-agents

Agents May Call Other Agents

一个代理可能把它的工作分给其它代理。因此,代理有不同等级的职责。下图包含一个应用级别代理和多个任务级别代理。

agent-call-agents

Single-threaded Task Switching

如果上图中的某些任务代理需要长时间执行会怎样呢?如果任务代理在被应用代理第一次调用时简单地执行全部工作,那么整个系统(线程循环和代理)将会阻塞直到第一个任务代理完成所有工作。

为了同时进行多个任务(宏观上),线程必须能够在不同任务间切换。这也被叫做 任务切换 (task switching)。当只有一个线程时,你需要通过代理设计完成任务切换。

为了在单线程内完成任务切换,每个任务必须划分成一到多个增量:

splitting-task-into-increments

任何时刻代理被调用,它都将执行一到多个增量。一旦增量全部被执行,整个任务就完成了。

循环不断调用代理,让它们每次执行一些增量:

task-switching

Increment Size Balancing

如果单线程能够在多任务间切换,那就说明这些任务没有被过大划分。换句话说,帮助确保公平的执行时间划分是每个任务的职责。大体上说,具体的增量大小取决于具体任务和应用。

Prioritized Execution

指定某些任务由其它任务执行是可能的。你可以将参数传递给代理,告诉它执行多少工作增量。因此,一些代理可能被命令执行 1 个工作增量,而另一些可能被命令执行 2 个或更多增量。这就导致一些任务获得更多 CPU 执行时间,它们因此更快完成。

Agent Parking

如果代理在等待某些异步操作完成,例如远程服务响应,那么只有等到异步任务完成,它才能继续工作。这种情况下,一遍遍调用该代理是没有意义的,那样只是让它意识到它无法继续工作,随后立即返回到调用线程。

这种情况,如果能使代理 “停车 (Park)” 是很有意义的,那样它就不再会被调用。一旦异步任务完成,代理可以解除停车状态,并插入到活跃代理队列,活跃代理可以被持续调用将任务进行下去。当然,要能够解除停车,系统中的某些其它部分必须负责检测异步任务完成,并且计算哪个代理需要解除。

Scaling Single-threaded Concurrency

显然,如果应用中只有一个线程执行,你将无法使用超过一个 CPU。解决办法是启动多个线程。通常是每个 CPU 一个线程,取决于线程需要执行什么任务。如果你的任务需要执行阻塞 IO,例如从文件系统或网络读取文件,那么每个 CPU 可能需要多个线程。在等待阻塞 IO 完成时,线程除了等待不能做任何事情。

scaling-single-threaded-concurrency

当你把线程扩展为多个单线程子系统时,它在技术上就不再是单线程了。但是,每个单线程子系统会被设计成并且表现成典型的单线程系统。我习惯于把这种多线程的单线程系统称作 同线程 (same-threaded) 系统,尽管我不确定最准确的术语是什么。我们可能需要重新观察不同的设计,想出更具描述性的术语。

Event Loop vs. Thread Loops

本章开头,我提到像 Netty 那样的事件循环工具与本文的线程循环有所不同。为了展示这种不同,我总结了下面两张控制流图。

第一张图是事件循环。执行事件循环的线程首先调用事件循环,当不同事件发生时,事件循环又调用你的应用代码。事件间隔的时间由事件循环代码控制,你的应用不能使用它们。

event-loop

第二张图是线程循环。事件循环线程首先调用你的应用,应用调用 IO 工具检查新的入站连接,或者已有连接的入站数据,或者计时器事件。

thread-loop

在线程循环设计中,任何处于事件之间的时间都可以被应用使用做任何想做的事。例如,应用可以使用单线程任务切换继续调用没有完成的任务集合。

此外,如果应用有许多负载需要完成,它可以选择不检测入站连接,或读取已有连接的入站数据,以这种方式抵抗网络压力,应用便可集中精力执行当前任务。

这种可以选择何时检测事件,如何利用事件间时间的自由,就是我喜爱线程循环而非事件循环设计的原因。的确,这种区别很小,并且需要更多代码,但也给了你更多控制和弹性。

8. Creating and Starting Java Threads

一个 Java Thread 就像一个可以执行代码的虚拟 CPU —— 在你的 Java 应用中。main() 方法启动后会被主线程执行。主线程是一个特殊线程,它由 Java 虚拟机创建来运行你的应用。在应用内部,你可以创建和启动更多线程,它们可以和主线程并行执行你的代码。

Java 线程是和其它任何对象一样的 Java 对象,它是 java.lang.Thread 的实例。除了是对象,它还能执行代码。本章我将解释如何创建和启动线程。

Creating and Starting Threads

在 Java 里可以像这样创建线程:

Thread thread = new Thread();

要启动线程,你需要调用它的 start() 方法:

thread.start();

这个示例并未指定线程执行任何代码,因此启动后它会立即停止。

有两种方式指定线程要执行什么代码。首先是创建 Thread 的子类并重写它的 run() 方法。其次是传递一个 java.lang.Runnable 实例给 Thread 的构造器。下面逐一介绍。

Thread Subclass

Thread 的 run() 方法会在调用 start() 后被择机执行。下面是创建一个 Thread 子类的示例:

public class MyThread extends Thread {

    @Override
    public void run() {
        System.out.println("MyThread running");
    }
}

你可以像下面这样创建 MyThread 实例并启动它:

MyThread myThread = new MyThread();
myThread.start();

一旦线程启动,start() 就会立即返回,它不会等待 run() 完成。run() 方法会像在其它 CPU 上一样。执行时,它会打印 "MyThread running"。

你也可以像下面这样创建一个匿名 Thread 子类:

Thread thread = new Thread() {
    @Override
    public void run() {
        System.out.println("Thread Running");
    }
};

thread.start();

run() 方法执行时,它将打印 "Thread running"。

Runnable Interface Implementation

第二种指定线程执行代码的方式是创建一个实现了 java.lang.Runnable 的类。实现了 Runnable 接口的对象可以被 Thread 类执行。

Runnable 是一个 Java 平台的的标准 Interface,它只有一个 run() 方法,下面是它的声明:

@FunctionalInterface
public interface Runnable {

    public abstract void run();
}

线程应该做的事情必须包含在实现类的 run() 方法中。有三种方式实现这个接口:

  1. 创建一个实现该接口的具名类
  2. 创建一个实现该接口的匿名类
  3. 创建一个实现该接口的 Lambda 表达式

我将逐一解释。

Java Class Implements Runnable

下面是一个实现 Runnable 的自定义类:

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("MyRunnable running");
    }
}

该实现要完成的事是打印 MyRunnable running。打印完字符后,run() 就会退出,线程也随之停止。

Anonymous Implementation of Runnable

下面是一个实现了 Runnable 接口的匿名类实例:

Runnable myRunnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("Runnable running");
    }
};

除了匿名,它和具名类实现非常相似。

Java Lambda Implementation of Runnable

第三种实现 Runnable 接口的方式是创建 Java Lambda,因为 Runnable 接口是单方法接口,即 functional Java interface

Runnable runnable = () -> System.out.println("Lambda Runnable running");

Starting a Thread With a Runnable

要让 run() 方法被线程执行,传递一个实现了 Runnable 接口的具名、匿名类实例或 Lambda 表达式给 Thread 构造器。就像下面这样:

// or an anonymous class, or lambda...
Runnable runnable = new MyRunnable();

Thread thread = new Thread(runnable);
thread.start();

线程执行时,它会调用 MyRunnable 实例而非自身的 run() 方法。上例会打印 "MyRunnable running"。

Subclass or Runnable?

没有规则规定哪种方式最好,它们都能工作。但我个人更喜欢实现 Runnable 接口,把实例传给 Thread。当使用 thread pool 执行 Runnable 时,很容易将 Runnable 实例排队,直到来自该池的线程处于空闲状态。这通过 Thread 类是比较困难的。

有时你可能需要同时继承 Thread 类和继承 Runnable 接口。例如,要创建一个可以执行多个 RunnableThread 子类,这是实现线程池的典型用例。

Common Pitfall: Calling run() Instead of start()

创建和启动线程时的常见错误是像下面这样调用 Threadrun() 而不是 start() 方法:

Thread newThread = new Thread(MyRunnable());
newThread.run(); // should be start();

起初你可能没有注意到任何事情,因为 run() 就像你期待的那样运行了。但是,它并非由新创建的线程执行,而是创建线程的线程,也就是执行上面两行代码的线程。为了让它被新创建的线程调用,你必须调用 newThread.start() 方法。

Thread Names

创建线程时,你可以给它一个名称。名称可以帮助你区分不同线程。比如,如果有多个线程向 System.out 输出内容,通过名称你可以方便地识别出是哪个线程。下面是一个示例:

Thread thread = new Thread("New Thread") {
    @Override
    public void run() {
        System.out.println("run by:" + getName());
    }
};

thread.start();
System.out.println(thread.getName());

注意作为参数传递给 Thread 构造器的字符串 "New Thread",它就是线程的名称。可以通过 Thread.getName() 获得线程的名称。对于 Runnable 的实现类,你也能通过类似方法为线程命名,下面是一个例子:

MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable, "New Thread");

thread.start();
System.out.println(thread.getName());

但是注意,因为 MyRunnable 不是 Thread 的子类,所以它不能访问执行它的线程的 getName() 方法。

Thread.currentThread()

Thread.currentThread() 方法返回调用该方法的线程实例的引用。通过它,你可以访问执行特定代码块的线程对象。下面是使用示例:

Thread thread = Thread.currentThread();

一旦获得了线程对象的引用,你就能调用它的方法。例如,你可以获得执行当前代码线程的名称:

String threadName = Thread.currentThread().getName();

Java Thread Example

下面是一个简单示例。首先,它打印了执行 main() 方法的线程的名称。该线程由 JVM 分配。随后启动了 10 个线程,它们以数字命名。每个线程都会打印自己的名称,随后结束执行。

public class ThreadExample {

  public static void main(String[] args){
    System.out.println(Thread.currentThread().getName());
    for(int i=0; i<10; i++){
      new Thread("" + i){
        public void run(){
          System.out.println("Thread: " + getName() + " running");
        }
      }.start();
    }
  }
}

注意,即便这些线程以数字顺序启动(1,2,3 等),它们可能不会按顺序执行,这意味着线程 1 可能不是首先向 System.out 写入名称的线程。这是因为线程原则上并行执行,而非串行。JVM 和(或)操作系统决定线程执行的顺序。该顺序无需与启动顺序一致。

Pause a Thread

线程可以通过调用 Thread.sleep() 使自己暂停。该方法接收一个数字作为参数,含义是毫秒。它会使线程睡眠指定毫秒再恢复运行。该方法不是 100% 准确的,但相对而言还可以。下面的示例使线程暂停 3 秒(3000 毫秒):

try {
    Thread.sleep(3 * 1000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Stop a Thread

停止线程需要额外的准备来实现。Thread 类包含一个 stop() 方法,但它过时了。该方法起初也不保证线程停止时它会处于何种状态。这意味着,被停止线程访问的所有对象都将处于不定状态。如果应用中的其它线程也访问了相同对象,你的应用可能会在非预料和不可预测的情况下崩溃。

替代方法是实现自己的线程停止代码。下面的示例中有一个 Runnable 实现类,它包含额外的 doStop() 方法,该方法会发出停止信号,Runnable 检测该信号在适当时候主动停止线程。

public class MyRunnable implements Runnable {

    private boolean doStop = false;

    public synchronized void doStop() {
        this.doStop = true;
    }

    public synchronized boolean keepRunning() {
        return !this.doStop;
    }

    @Override
    public void run() {
        while (keepRunning()) {
            // keep doing what this thread should do.
            System.out.println("Running");

            try {
                Thread.sleep(3 * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

注意 doStop()keepRunning() 方法,前者意在被其它线程调用,后者被执行 MyRunnable.run() 的线程调用。只要 doStop() 没有被调用,keepRunning() 就会返回 true,这意味着执行 run() 方法的线程要继续运行。

下面的例子中,一个线程启动了上述 MyRunnable 实例,在一段延迟后停止了它。

public class MyRunnableMain {

    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();

        Thread thread = new Thread(myRunnable);

        thread.start();

        try {
            Thread.sleep(10L * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        myRunnable.doStop();
    }
}

本例中,main() 方法(主线程)睡眠 10 秒后调用了 MyRunnable.doStop() 方法。这会导致执行 MyRunnable 方法的线程停止,因为在此之后 keepRunning() 会返回 false

请记住,如果你的 Runnable 实现需要除了 run() 之外的其他方法(例如 stop()pause()),那么你就不能使用 Lambda 表达式,因为 Lambda 表达式只可以实现单方法接口。取而代之,你必须使用自定义类,或继承了 Runnable 的自定义接口,它可以包含额外方法。

9. Race Conditions and Critical Sections

竞态条件 (race condition) 是一个可能发生在临界区的并发问题。临界区 (critical section) 是一个被多线程执行的代码段,在该区域,不同线程执行顺序会导致不同结果。

如果执行临界区的多个线程的执行顺序不同会导致结果不同,就称该临界区包含竞态条件。竞态条件这个术语是个比喻,表示线程在临界区内竞速,竞速结果影响临界区的执行结果。

这听起来可能有点复杂,接下来我会详述它们。

Two Types of Race Conditions

竞态条件发生的时机是多个线程通过以下两种模式读写相同变量:

  • Read-modify-write
  • Check-then-act

Read-modify-write 模式指,多个线程首先读取特定变量,随后修改它的值,最后把修改写回变量。导致问题的必要条件是,新值依赖旧值。产生的问题是,如果两个线程读取变量值(到 CPU 寄存器),随后修改它(在 CPU 寄存器中),最后写回主存,结果会不合预期。后文还将详细解释。

Check-then-act 模式指,多个线程检测特定条件,例如 Map 中是否包含给定值,随后根据条件执行动作,例如把值从 Map 中取出。问题发生的条件是,如果两个线程同时检测 Map 中是否存在指定值,它们都看到该值存在,随后都试图拿走(remove)该值。但是,只有其中之一可以真正拿到值,另一个会得到 null。使用队列时也会发生这种情况。

Read-Modify-Write Critical Sections

如上所说,一个读改写临界区可能导致竞态条件。本节我们将近距离观察它为何会发生。下面是一个读改写临界区代码示例,多个线程同时执行它时会产生错误。

public class Counter {
    protected long count = 0;

    public void add(long value) {
        this.count = this.count + value;
    }
}

想象有两个线程 A 和 B 正在执行同一个 Counter 实例的 add 方法。我们无法知道操作系统何时会切换两个线程。add() 方法中的代码并非被 JVM 以原子操作执行,而是一组更小的指令集合,类似下面这样:

  1. this.count 从内存读到寄存器。
  2. 在寄存器中执行加法。
  3. 把结果从寄存器写回内存。

以下面这种顺序混合执行线程 A 和 B,观察会发生什么:

/**
 this.count = 0;
 
 A:  Reads this.count into a register (0)
 B:  Reads this.count into a register (0)
 B:  Adds value 2 to register
 B:  Writes register value (2) back to  memory. this.count now equals 2
 A:  Adds value 3 to register
 A:  Writes register value (3) back to  memory. this.count now equals 3
 */

两个线程想要把 2 和 3 加到计数器,因此最后的值应该是 5。但由于两个线程交织执行,实际结果与期望不符。

在上面的执行顺序示例中,两个线程都从内存读到了 0。随后它们独立把自己的值 2 和 3 加到计数器上,最后把结果写回内存。结果不是 5,取而代之是最后一个写值的线程写入的值。上例中它是线程 A,但客观上也可能是线程 B。

Race Conditions in Read-Modify-Write Critical Sections

之前 add() 方法中的代码包含一个临界区。当多线程执行临界区时,竞态条件就发生了。

更正式的说,当两个线程竞争相同资源,资源被访问的顺序非常重要时,这种情况叫竞态条件。导致竞态条件的代码段称为临界区。

Check-Then-Act Critical Sections

同样如上所说,检测行动模型也会导致竞态条件。如果两个线程检测相同条件,根据它执行动作,此动作会改变条件,此时竞态条件就会发生。如果两个线程同时检测条件,其中之一首先改变了条件,这将导致另一线程表现异常。

要解释检测行动模型为何会导致竞态条件,看下面这个示例:

public class CheckThenActExample {
    public void checkThenAct(Map<String, String> sharedMap) {
        if (sharedMap.containsKey("key")) {
            String val = sharedMap.remove("key");
            if (val == null) {
                System.out.println("Value for 'key' was null");
            }
        } else {
            sharedMap.put("key", "value");
        }
    }
}

如果多个线程调用同一 CheckThenActExample 对象的 checkThenAct() 方法,且超过两个线程同时执行 sharedMap.containsKey("key") 得到 true,因此进入 if 语句块内部。在那儿,它们都试图移除键为 key 的键值对,但只有一个线程能执行成功,其它线程都会得到 null 值。

Preventing Race Conditions

要防止竞态条件发生,你必须确保临界区作为原子指令执行。那意味着一旦有线程执行它,其它线程必须等待该线程离开临界区才能执行。

竞态条件可以通过恰当的线程同步避免。使用 synchronized block of Java code 可以同步线程,像 locks 或原子变量,如 java.util.concurrent.atomic.AtomicInteger 一样的同步结构也可以实现线程同步。

Critical Section Throughput

对于较小的临界区,让其全部成为同步块就能工作。但对于较大的那些,把它们拆分成多个较小区域比较有益,这样可以让多个线程执行每个小临界区。原因是它可以减少共享资源竞争,因此增加整个临界区的并发量。

下面是一段非常简单的示例代码:

public class TwoSums {
    private int sum1 = 0;
    private int sum2 = 0;

    public void add(int val1, int val2) {
        synchronized (this) {
            this.sum1 += val1;
            this.sum2 += val2;
        }
    }
}

注意 add() 方法是如何把值加到两个不同求和成员变量的。为了防止竞态条件,求和只能在同步块里执行。在这种实现下,一次只有一个线程可以执行该方法。

但是,由于两个求和变量是互相独立的,你可以把它们拆分成两个独立同步块,就像下面这样:

public class TwoSums {
    private int sum1 = 0;
    private int sum2 = 0;

    private final Integer sum1Lock = new Integer(1);
    private final Integer sum2Lock = new Integer(2);

    public void add(int val1, int val2) {
        synchronized (this.sum1Lock) {
            this.sum1 += val1;
        }
        synchronized (this.sum2Lock) {
            this.sum2 += val2;
        }
    }
}

现在两个线程就可以同时执行 add() 方法了。其中之一处于第一个同步块,另一个处于第二个。两个同步块对不同对象执行同步,所以两个不同线程可以独立执行它们。这种方式线程需要互相等待的时间就会减少。

当然,本示例非常简单。真实的共享资源案例中,拆分临界区可能非常困难,并且需要对执行顺序的可能性做更多分析。

10. Thread Safety and Shared Resources

可以安全地同时被多个线程调用的代码被认为是 线程安全 (thread safe) 的。如果一段代码线程安全,那它就不包含 竞态条件。竞态条件仅在多线程更新共享资源时发生。因此搞清楚什么资源在执行时会被线程共享是很重要的。

Local Variables

局部变量储存在线程自己的栈中。这意味着它们永远不会被多线程共享。这也意味着所有局部基本变量都是线程安全的。下面是一个线程安全的局部基本变量示例:

public void someMethod() {
    long threadSafeInt = 0;
    threadSafeInt++;
}

Local Object References

局部对象引用有点不同。引用自身不是共享的,但被引用的对象并非存储在线程栈。所有对象都存储在共享堆上。

如果一个局部变量永远不会从创建方法逃出,它就是线程安全的。事实上,你也可以把它传给其它方法或对象,只要这些方法和对象不会把该对象传给其它线程。

下面是一个线程安全的局部对象示例:

public void someMethod() {
    LocalObject localObject = new LocalObject();

    localObject.callMethod();
    method2(localObject);
}

public void method2(LocalObject localObject) {
    localObject.setValue("value");
}

每个执行 someMethod() 的线程都会创建自己的 LocalObject 实例,并把它赋值给 localObject 引用。即使 LocalObject 实例被作为参数传给同一类的其它方法,或者其它类,因为这些方法和类是线程安全的,所以这里的 LocalObject 是线程安全的。

当然,唯一例外是,存在一个把 LocalObject 当成参数的方法,把该实例存储到允许其它线程访问的地方。

Object Member Variables

对象的成员变量(属性)和对象一样存储在堆中。因此,如果两个线程调用同一对象实例的某个方法,并且该方法更新对象的成员变量,那么该方法就不是线程安全的。下面是一个实例:

public class NotThreadSafe {
    StringBuilder builder = new StringBuilder();

    public void add(String text) {
        this.builder.append(text);
    }
}

如果两个线程同时调用相同 NotThreadSafe 实例的 add() 方法,它将导致竞态条件。例如:

public class MyRunnable implements Runnable {
    NotThreadSafe instance = null;

    public MyRunnable(NotThreadSafe instance) {
        this.instance = instance;
    }

    @Override
    public void run() {
        this.instance.add("some text");
    }

    public static void main(String[] args) {
        NotThreadSafe sharedInstance = new NotThreadSafe();

        new Thread(new MyRunnable(sharedInstance)).start();
        new Thread(new MyRunnable(sharedInstance)).start();
    }
}

注意 MyRunnable 是如何共享相同 NotThreadSafe 实例的。这种情况,当它们调用 add() 方法时,竞态条件就发生了。

但是,如果两个线程调用不同实例的 add() 方法就不会发生竞态条件。下面的示例和之前类似,但稍有不同:

new Thread(new MyRunnable(new NotThreadSafe())).start();
new Thread(new MyRunnable(new NotThreadSafe())).start();

现在,两个线程拥有自己的 NotThreadSafe 实例,所以它们调用 add 时彼此互不干扰。现在代码不会发生竞态条件。所以,即便一个对象不是线程安全的,它仍然可被以不会导致竞态条件的方式使用。

The Thread Control Escape Rule

要评估你的代码对特定资源的访问是否是线程安全的,可以使用线程失控规则:

/**
 * thread control escape rule
 * 
 * If a resource is created, used and disposed within
 * the control of the same thread, and never escape the
 * control of this thread, the use of that resource is
 * thread safe.
 */

资源可以是任何共享资源,像对象,数组,文件,数据库连接,套接字等。在 Java 中,你总是不显示释放对象,所以 "disposed" 意味着失去或者 nulling 对象的引用。

即便一个对象的使用是线程安全的,如果它指向一个共享资源,如文件或数据库,那么你的应用整体上也不是线程安全的。例如,如果线程 1 和 2 创建了自己的数据库连接,连接 1 和 2,对每个连接的使用是线程安全的。但使用连接指向的数据库可能不是线程安全的。例如,如果两个线程都执行下面的代码:

check if record X exists
if not, insert record X

如果两个线程同时执行上述代码,并且它们检查的 X 是相同记录,那么两个线程都执行插入操作最终会导致危险。下面是解释:

Thread 1 checks if record X exists. Result = no
Thread 2 checks if record X exists. Result = no
Thread 1 inserts record X
Thread 2 inserts record X

这对于操纵文件或其它共享资源的线程也是同样的。因此区分被线程控制的对象是资源,还是仅仅是资源的引用(如数据库连接)是非常重要的。

11. Thread Safety and Immutability

只有多线程访问相同资源,并且有线程修改该资源时才会发生 竞态条件。如果多个线程只是读取相同资源,竞态条件不会发生。

通过使线程共享对象不可变,我们可以确保它永远不会被任何线程更新,因此是线程安全的。下面是一个示例:

public class ImmutableValue {

    private int value = 0;

    public ImmutableValue(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}

注意 ImmutableValue 实例的值是通过构造器传递的,并且它没有 setter 方法。一旦实例被创建,你就不能更改它的值,它是不可变的。但是你可以使用 getValue() 方法获得它的值。

你可以通过返回新实例的方式,在该实例上执行操作。下面是加法操作示例:

public ImmutableValue add(int valueToAdd) {
    return new ImmutableValue(this.value + valueToAdd);
}

注意到,并非在自身的值上进行加法,而是把加法结果通过新实例返回。

The Reference is not Thread Safe

需要记住,即便对象是不可变的,因此是线程安全的,但它的引用可能不是线程安全的。看下面的示例:

public class Calculator {
    private ImmutableValue currentValue = null;

    public ImmutableValue getCurrentValue() {
        return currentValue;
    }

    public void setCurrentValue(ImmutableValue currentValue) {
        this.currentValue = currentValue;
    }

    public void add(int newValue) {
        this.currentValue = this.currentValue.add(newValue);
    }
}

Calculator 类持有一个 ImmutableValue 实例,注意到可以通过 setValue()add() 方法改变该引用。因此,即使它内部使用了一个不可变对象,但它自身可以改变,因此不是线程安全的。换句话说,ImmutableValue 类是线程安全的,但使用它并不安全。通过不可变获得线程安全时要谨记这一点。

要使 Calculator 类线程安全,你可以把 setValue()getValue()add() 都声明为 synchronized。那样便可取得效果。

12. Java Memory Model

Java 内存模型指定了 Java 虚拟机如何使用计算机内存。Java 虚拟机是完备的计算机模型,所以它自然包含内存模型 —— 也就是 Java 内存模型。

想要正确设计表现良好的并发程序,搞懂 Java 内存模型非常重要。Java 内存模型指定了不同线程怎样以及何时能看到被其它线程写入的共享变量,在必要时如何同步对共享变量的访问。

原始的 Java 内存模型是不完备的,因此 Java 1.5 对其进行了改进,这一版本在今天 (Java 14+)仍被使用。

The Internal Java Memory Model

JVM 内部使用的内存模型分为线程栈和堆。下图展示了它的逻辑视图:

logic-perspective-of-java-memory-model

Java 虚拟机内运行的每个线程都有自己的线程栈。线程栈包含线程调用的方法信息,它标示了线程如何执行到当前点。我喜欢把它叫做 “调用栈”。随着线程执行其代码,调用栈随之改变。

线程栈也包含每个被执行方法(调用栈的所有方法)的所有局部变量。线程只可访问它自己的线程栈。线程创建的局部变量对其它所有线程都不可见除了自己。即使两个线程执行完全相同的代码,它们仍然会在各自线程栈中创建自己的局部变量。因此,每个线程保存着自己的局部变量版本。

所有基本类型(boolean, byte, short, char, int, long, float, double)局部变量可以完全存储于线程栈,因此对其它线程不可见。线程可以传递一份基本类型变量拷贝给其他线程,但它们不能分享基本类型局部变量自身。

堆上包含 Java 应用创建的所有对象,无论哪个线程创建了它。这包含基本类型版本的包装类(Byte,Integer,Long 等)。无论对象被创建和赋值给局部变量,还是作为其它对象的成员变量,它们都储存于堆。

下面的图表中,调用栈和局部变量存储在线程栈,对象储存于堆:

call-stack-on-stack-and-object-on-heap

局部变量只要是基本类型,它们都储存在线程栈。

它也可能是对象引用。此时引用(局部变量)仍然存储在线程栈,但对象自身存储于堆。

一个对象可能包含方法,这些方法可能包含局部变量。这些局部变量仍然存储于线程栈,尽管方法所属对象存储于堆。

对象的成员变量和对象一起存储于堆。无论成员变量是基本类型还是引用类型。

静态变量也和类定义一起存储于堆。

堆上的对象可以被所有持有它引用的线程访问。当线程访问对象时,它也可以访问该对象的成员变量。如果两个线程同时调用相同对象的方法,它们都能访问到对象的成员变量,但每个线程会持有局部变量的拷贝。

下面这张图展示了上述情况:

threads-access-to-objects

两个线程都有自己的局部变量集合。其中一个局部变量(Local Variable 2)指向堆上的共享对象(Object 3)。两个线程对相同对象持有的引用不同。引用自身是局部变量,因此存储在各自的线程栈。但两个不同引用都指向堆上同一个对象。

注意到共享对象(Object 3)持有 Object 2 和 Object 4 的引用,这两个对象是 Object 3 的成员变量(Object 3 两侧的箭头指向了 Object 2 和 Object 4)。通过 Object 3 的成员变量引用,两个线程都能访问 Object 2 和 Object 4。

图中还显示了一个指向堆上两个不同对象的局部变量(Object 1 和 Object 5)。理论上,两个线程都可以访问 Object 1 和 Object 5,如果它们具有两个对象的引用。但图中每个线程只可以访问其中一个对象。

所以,什么样的 Java 代码会导致上图中的内存布局?实际上,代码就像下面这样简单:

public class MySharedObject {
    // static variable pointing to instance of MySharedObject

    public static final MySharedObject sharedInstance = new MySharedObject();

    // member variables pointing to two objects on the heap

    public Integer object2 = new Integer(22);
    public Integer object4 = new Integer(44);

    public long member1 = 12345;
    public long member2 = 67890;
}

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        methodOne();
    }

    public void methodOne() {
        int localVariable1 = 45;

        MySharedObject localVariable2 = MySharedObject.sharedInstance;

        // ... do more with local variable.

        methodTwo();
    }

    public void methodTwo() {
        Integer localVariable1 = new Integer(99);

        // ... do more with local variable.
    }
}

如果有两个线程执行 run() 方法,图表中的结果就出现了。run() 方法调用了 methodOne()methodOne() 调用了 methodTwo()

methodOne() 声明了一个基本类型局部变量(int 型的 localVariable1),和一个引用类型局部变量(localVariable2)。

每个执行 methodOne() 的线程都会在各自线程栈上创建 localVariable1localVariable2 拷贝。localVariable1 会完全独立,只存活于各自线程的栈中。一个线程对自己 localVariable1 的拷贝对另一个线程不可见。

执行 methodOne() 的每个线程也会创建自己的 localVariable2 拷贝。但这两份不同的拷贝最终都指向堆上的同一个对象。localVariable2 指向的是一个被静态变量引用的对象。静态变量的拷贝只有一份,它存储在堆上。因此,两份 localVariable2 拷贝最终都指向相同的实例 MySharedObject,该实例又被静态变量引用,并且也存储于堆。它对应上图中的 Object 3。

注意到,MySharedObject 类还包含两个成员变量。成员变量自身和对象一起存储于堆。两个成员变量指向两个其它 Integer 对象,对应上图中的 Object 2 和 Object 4。

注意到 methodTwo() 创建了一个名为 localVariable1 的局部变量。该变量指向一个 Integer 对象。localVariable1 的引用将被存储在每个线程的拷贝中。两个 Integer 对象会被存储到堆上,但因为方法每次都会创建一个新对象,两个线程创建独立的 Integer 实例,对应上图中的 Object 1 和 Object 5。

还注意到 MySharedObject 类还有两个类型为 long 的成员变量,它们是基本类型。由于它们是成员变量,因此也会和对象一起存储到堆中。只有局部变量会存储到线程栈。

Hardware Memory Architecture

现代硬件内存架构与 Java 内存模型有所不同。要搞懂两者如何搭配工作,理解硬件内存架构非常重要。本节介绍常见硬件内存架构,下节描述 Java 内存模型如何与其搭配工作。

下图简单展示了现代计算机硬件结构:

modern-computer-hardware-architecture

现代计算机通常包含 2 到多个 CPU,这些 CPU 有时还有多核。重点是,这种计算机允许同时运行多个线程。单个 CPU 能够每次执行一个线程。这意味着如果你的应用是多线程的,每个 CPU 上可能都有一个线程,这些线程在应用中同时执行。

每个 CPU 都包含一套寄存器,它们基本上是 CPU 内部的内存。CPU 在寄存器上执行操作比在内存中快。这是因为 CPU 访问寄存器的速度快于访问主存。

每个 CPU 还可以有一个 CPU 缓存层。事实上,大多数现代 CPU 都包含一定大小的缓存层。CPU 访问缓存的速度大于主存,但通常小于访问内部寄存器。所以,CPU 缓存的速度介于内部寄存器和主存之间。某些 CPU 可能有多个缓存层(Level 1 和 Level 2),但这对于理解 Java 内存模型如何与硬件内存交互并不重要。重要的是要知道 CPU 包含某种缓存层。

每台计算机还包含一个主内存区(RAM)。所有 CPU 都可以访问主存。主存尺寸通常远比 CPU 的缓存大。

通常,当 CPU 需要访问主存时,它会把主存中的部分数据读到 CPU 缓存。它甚至会把部分缓存读到内部寄存器,在寄存器上执行操作。当 CPU 需要把结果写回主存时,它会从内部寄存器将值刷新到缓存,并且在恰当时刻再把值刷回主存。

当 CPU 需要在缓存中储存其它东西,通常会将缓存中的值刷新回主存。CPU 可以一次写入和刷新部分缓存,不需要读写整个缓存。通常缓存以名为 “缓存行” 的小内存块更新。一到多个缓存行可能被读进缓存,或者被刷回主存。

Bridging The Gap Between The Java Memory Model And The Hardware Memory Architecture

就像之前提到的,Java 内存模型和硬件内存架构有所不同。硬件内存架构不区分线程栈和堆。硬件层面,线程栈和堆都位于主存。部分线程栈和堆有时可能存在于 CPU 缓存和 CPU 内部寄存器。下图阐述了这种情况:

hardware-memory-architecture-and-java-memory-model

由于对象和变量可以被存储在计算机上如此多不同的内存区域,特定问题可能发生。两个主要问题是:

  • 线程更新(写入)共享变量的可见性。
  • 读取,检测和写入共享变量时的竞态条件。

下面我将解释这些问题。

Visibility of Shared Objects

如果多个线程共享相同对象,在没有恰当使用 volatile 声明或同步的情况下,一个线程对共享变量的更新可能对其它线程不可见。

想象共享对象一开始存储在主存中。运行在一个 CPU 上线程把共享对象读进 CPU 缓存。在缓存中,线程对共享对象做了更新。只要 CPU 缓存没有被刷新回主存,改变版本的共享变量对运行在其它 CPU 的线程就不可见。这种情况下,每个线程最终持有自身拷贝版本的共享对象,每份拷贝存在于不同 CPU 缓存。

下图展示了大概情况。运行于左边 CPU 的线程把共享对象拷贝进自己的 CPU 缓存,并且把 count 更新为 2。这个改变对运行于右边 CPU 的线程不可见,因为对 count 的更新还没有被刷新回主存。

update-variables-in-cpu-cache

要解决这个问题,你可以使用 Java 的 volatile 关键字。该关键字可以确保特定变量直接从主存读取,并且更新时总是写回主存。

Race Conditions

如果多个线程共享相同对象,并且超过一个线程更新该变量,竞态条件 就会发生。

想象如果线程 A 把共享对象 count 读到自己的 CPU 缓存,线程 B 也这样做,但是读到另一个 CPU 缓存。现在线程 A 为 count 加 1,线程 B 也这样做。现在 count 被增加了两次,每次在各自的 CPU 缓存。

如果这些增加被顺序执行,那么变量 count 就被增加了两次,原始值 + 2 会被写回主存。

但是,这两次增加已经在没有恰当同步的情况下并发执行。无论线程 A 和 B 中的哪个把它更新版本的 count 写回主存,更新后的值都只会比原始值大 1,尽管有两次增加。

下图展示了上面描述的一个带有竞态条件的并发问题:

a-race-condition-example

要解决这个问题你可以使用 Java 同步块。同步块保证特定时间只有一个线程可以进入代码的临界区。同步块也保证从主存读取在同步块内部访问所有变量,当线程退出同步块时,所有变量更新会被刷新回主存,不管变量是否被声明为 volatile。

13. Java Happens Before Guarantee

Java happens before 保证是一系列控制 Java 虚拟机和 CPU 如何重排指令以提高性能的规则。该保证使线程可以得知变量何时与主存同步,同时,哪些变量已经被同步。该保证以 volatile 变量的访问和同步块内变量的访问为核心。

本章教程会提及由 Java volatilesynchronized 声明提供的 happens before 保证,但我不会对这两个声明详细展开,它们将在后面的章节介绍。

Instruction Reordering

现代 CPU 有能力并行执行指令,只要这些指令不存在互相依赖。例如下面这两条:

a = b + c
d = e + f

但是,下面这两条指令就不容易并行执行,因为第二条指令依赖第一条指令的结果:

a = b + c
d = a + e

想象上面两条指令是一条大指令集合的一部分,像下面这样:

a = b + c
d = a + e

l = m + n
y = x + z

这些指令可以被重排为以下序列。那么 CPU 至少可以并行执行前 3 条指令,它们执行完毕才会开始执行第 4 条。

a = b + c

l = m + n
y = x + z

d = a + e

如你所见,重排指令可以增加 CPU 执行指令的并行性,这意味着性能提升。

只要程序语义没有改变,JVM 和 CPU 就可以进行指令重排。最终结果就像这些指令是完全按照源码中的排列顺序执行的。

Instruction Reordering Problems in Multi CPU Computers

指令重排为多线程多 CPU 系统带来了一些挑战。我将试图通过代码示例阐释这些问题。记住,这些示例都是为了解释问题特意编写的。所以无论如何,它们都不是推荐写法。

想象两个线程合作在屏幕上尽可能快地绘制画面帧。其中一个线程负责帧生成,另一个负责屏幕绘制。

这两个线程需要通过某种通信机制交换画面帧。在接下来的代码示例中,我创建了一个 Java 类 FrameExchanger 作为通信工具。

帧产生线程尽可能快地产生画面,帧绘制线程尽可能快地绘制画面。

有时生产者线程在绘制者线程有时间绘制之前可能产生了两帧。这种情况下,只有最后一帧会被绘制。我们不想绘制线程落后于生产线程。如果生产线程在前一帧被绘制之前又产生了新帧,那么前一帧会被简单覆盖。换句话说,前一帧被丢弃了。

有时绘制线程可能准备好绘制下一帧了但生产线程还没有产生新帧。这种情况下我们想让绘制线程等待生产者。没有理由浪费 CPU 和 GPU 资源重复绘制相同的帧画面。这种情况屏幕不会刷新,用户也不会看到任何改变。

FrameExchanger 会统计保存的帧数和拿走的帧数,这样我们就能知道有多少帧丢失了。

下面是 FrameExchanger 的代码。注意:Frame 类定义被省略了,它对于理解 FrameExchanger 原理并不重要。生产线程会持续调用 storeFrame(),绘制线程会持续调用 takeFrame() 方法。

public class FrameExchanger {

    private long framesStoredCount = 0;
    private long framesTakenCount = 0;
    private boolean hasNewFrame = false;
    private Frame frame = null;

    // called by Frame producing thread
    public void storeFrame(Frame frame) {
        this.frame = frame;
        this.framesStoredCount++;
        this.hasNewFrame = true;
    }

    // called by Frame drawing thread
    public Frame takeFrame() {
        while (!hasNewFrame) {
            // busy wait util new frame arrives
        }

        Frame newFrame = this.frame;
        this.framesTakenCount++;
        this.hasNewFrame = false;
        return newFrame;
    }
}

注意到 storeFrame() 方法内部的指令似乎不存在互相依赖。这意味着,JVM 和 CPU 会认为对它们进行指令重排是可以的,假定它们判断重排能够带来好处。但是,想象如果指令被重排为以下顺序会发生什么呢:

public void storeFrame(Frame frame) {
    this.hasNewFrame = true;
    this.framesStoredCount++;
    this.frame = frame;
}

注意到 hasNewFrame 被赋值 true 的时间早于 frame 属性被赋值新帧对象引用的时间。这意味着,如果绘制线程正在 takeFrame() 内部的循环中等待,它会退出循环继而拿走旧的帧对象。这会导致旧帧重绘,也引起资源浪费。

显然,这一特殊示例中,旧帧重绘不会导致应用崩溃或功能紊乱。它只是浪费了 CPU 和 GPU 资源。但其它指令重排情况可能导致应用功能紊乱。

The Java volatile Visibility Guarantee

Java volatile 关键字提供了可见性保证,即变量的写入和读取都和主存同步。这种写入和读取的主存同步使变量值对其它线程可见,因此使用术语 可见性保证 (visibility guarantee)

本节我将简要介绍 Java volatile 可见性保证,并解释指令重排如何打破这种保证。这就是为什么我们还有 Java volatile happens before 保证,来对指令重排做出限制,以确保可见性保证不会被指令重排破坏。

The Java volatile Write Visibility Guarantee

当你写入 volatile 变量值时,JDK 保证它被直接写到主存。此外,当线程写入 volatile 变量时,对它可见的其它所有变量也会一并同步到主存。

要解释 volatile 写入可见性保证,看下面的示例:

this.nonVolatileVarA = 34;
this.nonVolatileVarB = new String("Text");
this.volatileVarC    = 300;

示例包含两个非 volatile 变量写入,以及一个 volatile 变量写入。因为没有显示展示变量声明,为了澄清,想象名称为 volatileVarC 的变量(属性)被声明为 volatile

当示例中的第三条指令写入 volatile 变量 volatileVarC 时,两个非 volatile 变量的值也会被同步到主存,因为此时这些变量对正在写入 volatile 变量的线程可见。

The Java volatile Read Visibility Guarantee

当你读取 volatile 变量时,它被保证直接从主存读取。此外,当线程读取 volatile 变量时,所有对其可见的变量的值也会从主存刷新。

要解释 volatile 读可见性保证,看下面的示例:

c = other.volatileVarC;
b = other.nonVolatileB;
a = other.nonVoaltileA;

注意到第一条指令是读取 volatile 变量。当该变量被从主存读取时,另外两个非 volatile 变量也会从主存同步。

The Java Volatile Happens Before Guarantee

Java volatile happens before 保证对 volatile 变量周围的指令重排设置了一些限制。要解释为何这些限制是必要的,让我们修改教程前面的 FrameExchanger 类,把 hasNewFrame 变量声明为 volatile

public class FrameExchanger {

    private long framesStoredCount = 0;
    private long framesTakenCount = 0;
    private volatile boolean hasNewFrame = false;
    private Frame frame = null;

    // called by Frame producing thread
    public void storeFrame(Frame frame) {
        this.frame = frame;
        this.framesStoredCount++;
        this.hasNewFrame = true;
    }

    // called by Frame drawing thread
    public Frame takeFrame() {
        while (!hasNewFrame) {
            // busy wait util new frame arrives
        }

        Frame newFrame = this.frame;
        this.framesTakenCount++;
        this.hasNewFrame = false;
        return newFrame;
    }
}

现在,每当 hasNewFrame 变量被设置为 trueframeframesStoredCount 也会被同步到主存。此外,每次绘制线程在循环中读取该变量时,frameframesTakenCount 也会从主存更新。

想象如果 JVM 把 storeFrame() 方法中的指令重排为以下顺序:

// called by Frame producing thread
public void storeFrame(Frame frame) {
    this.hasNewFrame = true;
    this.framesStoredCount++;
    this.frame = frame;
}

现在当第一条指令执行时(因为 hasNewFramevolatile 的), frameframesStoredCount 属性也会被同步到主存,但同步先于它们被赋予新值。

这意味着,在 frame 变量被赋予新值之前,执行 takeFrame() 方法的绘制线程可能会从循环退出。即使生产者线程会为 frame 赋值,但该值不会被保证同步到主存,继而对绘制线程可见。

Happens Before Guarantee for Writes to volatile Variables

如你所见,storeFrame() 方法的指令重排可能使应用功能紊乱。这就需要引入 volatile 写 happens before 保证了,即对写入 volatile 变量周围的指令重排设置一些限制。

在写入 volatile 变量之前对任何变量的写入,被保证发生在写入 volatile 变量之前。

storeFrame() 示例中,前两条指令不能重排到最后写入 hasNewFrame 指令之前,因为 hasNewFrame 是 volatile 变量。

// called by Frame producing thread
public void storeFrame(Frame frame) {
    this.frame = frame;
    this.framesStoredCount++;
    this.hasNewFrame = true;  // hasNewFrame is volatile
}

前两条指令不是写入 volatile 变量,所以 JVM 可以自由对它们重排。因此,下面的重排是允许的:

// called by Frame producing thread
public void storeFrame(Frame frame) {
    this.framesStoredCount++;
    this.frame = frame;
    this.hasNewFrame = true;  // hasNewFrame is volatile
}

这种重排没有破坏 takeFrame() 方法的代码语义,因为 frame 变量仍然先于 hasNewFrame 写入,整个程序可以按预期工作。

Happens Before Guarantee for Reads of volatile Variables

Java 中的 volatile 变量对读取存在一个类似的保证,不过方向是相反的。

读取 volatile 变量先于后续对任何变量的读取。

我说方向和写入不同是指,先于 volatile 变量写入的其它任何变量写入仍然先于 volatile 写入。而后于 volatile 变量读取的其它任何变量读取仍然后于 volatile 读取。

看下面的示例:

int a = this.volatileVarA;
int b = this.nonVolatileVarB;
int c = this.nonVolatileVarC;

指令 2 和 3 必须仍然落后于指令 1,因为指令 1 读取了 volatile 变量。换句话说,对 volatile 变量的读取被保证先于后续两次对非 volatile 变量的读取。

后面两条指令可以自由重排,这不违反第一条指令读取 volatile 变量的先发生保证。因此,下面的重排是允许的:

int a = this.volatileVarA;
int c = this.nonVolatileVarC;
int b = this.nonVolatileVarB;

因为 volatile 的读可见性保证,当 this.volatileVarA 被从主存读取时,此时对该线程可见的其它所有变量也会这样。因此,this.nonVolatileVarBthis.nonVolatileVarC 也会同时从主存读取。这意味着,读取 volatileVarA 的线程可以相信 nonVolatileVarBnonVolatileVarC 也同样和主存一致。

如果后两条指令有一条被重排到第一条读取 volatile 变量的指令前,那么此时期待的对该指令的保证就不能维持了。这就是为何后续读取不能重排到 volatile 读取之前。

至于 takeFrame() 方法,第一条对 volatile 变量的读取是读取循环内部的 hasNewFrame 属性。这意味着,没有任何读取指令可以重排到这之前。此例中,将任何其它读取操作代码移动到循环之前都会打破代码语义,所以这些重排无论如何都不被允许。

// called by Frame drawing thread
public Frame takeFrame() {
    while( !hasNewFrame) {
        //busy wait until new frame arrives
    }

    Frame newFrame = this.frame;
    this.framesTakenCount++;
    this.hasNewFrame = false;
    return newFrame;
}

The Java Synchronized Visibility Guarantee

Java synchronized 块提供的可见性保证与 volatile 变量类似,我将简要介绍它们。

Java Synchronized Entry Visibility Guarantee

当线程进入 synchronized 块时,所有对其可见的变量都会从主存刷新。

Java Synchronized Exit Visibility Guarantee

当线程退出 synchronized 块时,所有对其可见的变量都会写会主存。

Java Synchronized Visibility Example

看下面的 ValueExchanger 类:

class Values {
    int valA;
    int valB;
    int valC;
}

public class ValueExchanger {
    private int valA;
    private int valB;
    private int valC;

    public void set(Values v) {
        this.valA = v.valA;
        this.valB = v.valB;

        synchronized (this) {
            this.valC = v.valC;
        }
    }

    public void get(Values v) {
        synchronized (this) {
            v.valC = this.valC;
        }

        v.valB = this.valB;
        v.valA = this.valA;
    }
}

注意 set()get() 方法内的同步块,它们分别置于方法的首和尾。

位于 set() 方法结尾的同步块会强制被更新后的所有变量同步回主存。刷新动作发生在线程退出同步块时。这就是把它置于方法结尾的原因 —— 保证所有变量值更新刷新回主存。

位于 get() 方法开头的同步块会强制线程进入它时,所有变量从主存重读取。这就是把它置于方法开头的原因 —— 保证所有变量被读取前已经从主存刷新。

Java Synchronized Happens Before Guarantee

Java 同步块提供了两个先发生保证:其中之一与同步块开始有关,另一个与同步块结束有关。下面我将分别介绍。

Java Synchronized Block Beginning Happens Before Guarantee

先前提到,同步块开始提供了可见性保证,即线程进入同步块时,任何对其可见的变量都从主存刷新。

要维持该保证,有必要对指令重排做出限制。为了解释原因,我会使用之前 ValueExchanger 的 get() 方法:

public void get(Values v) {
    synchronized (this) {
        v.valC = this.valC;
    }

    v.valB = this.valB;
    v.valA = this.valA;
}

如你所见,方法开始的同步块会保证所有变量 this.valCthis.valBthis.valA 都从主存刷新(读入)。接下来对这些变量的读取都会使用最新值。

要确保上述保证,没有任何变量读取可以被重排到同步块开始之前。如果有个变量读取被重排到同步块开始之前,你将失去变量值会从主存刷新的保证。就像下例这样,一个不被允许的指令重排:

public void get(Values v) {
    v.valB = this.valB;
    v.valA = this.valA;

    synchronized (this) {
        v.valC = this.valC;
    }
}

Java Synchronized Block End Happens Before Guarantee

同步块结束提供了这样的可见性保证,当线程退出同步块时,所有变量更改被写回主存。

要保持该保证,必须对指令重排做一些限制。为了说明,我会使用之前 ValueExchanger 的 set() 方法:

public void set(Values v) {
    this.valA = v.valA;
    this.valB = v.valB;

    synchronized(this) {
        this.valC = v.valC;
    }
}

如你所见,方法结尾的同步块保证,执行该方法的线程退出同步块时,所有变量更改 this.valA, this.valBthis.valC 都被写回(刷新)主存。

要确保该可见性,没有任何变量写入可以被重排到同步块结束之后。如果这样做,你就会失去变量值会被写回主存的保证。就像下面这样,一个不被允许的指令重排:

public void set(Values v) {
    synchronized(this) {
        this.valC = v.valC;
    }

    this.valA = v.valA;
    this.valB = v.valB;
}

14. Java Synchronized Blocks

Java 同步块 (synchronized block) 标识一个方法或一块代码是 同步的 (synchronized)。同步块一次只能被一个线程执行(取决于你怎样使用它)。因此它可以被用来避免 竞态条件。本章将详细解释 Java synchronized 关键字如何工作。

Java Concurrency Utilities

synchronized 机制是 Java 为多线程同步访问共享对象设计的第一个机制。但它并不太高级,这就是为何 Java 5 添加了一整套 并发工具类 帮助开发者实现比 synchronized 更精细的并发控制。

The Java synchronized Keyword

Java 中的同步块使用 synchronized 关键字标识,用来对某些对象同步。对相同对象同步的同步块某个时刻只能让一个线程执行其内部代码。所有其它试图进入同步块的线程会阻塞直到同步块内部线程退出。

synchronized 可以标识四种不同类型的代码块:

  1. 实例方法
  2. 静态方法
  3. 实例方法中的代码块
  4. 静态方法中的代码块

这些同步块同步不同对象,要使用哪种取决于具体情况。我将对它们逐一解释。

Synchronized Instance Methods

下面是一个同步的实例方法:

public class MyCounter {

    private int count = 0;

    public synchronized void add(int value) {
        this.count += value;
    }
}

注意到 add() 方法声明上使用了 synchronized 关键字,这告诉 Java 该方法是同步的。

Java 中的同步实例方法对拥有该方法的实例(对象)同步。因此,每个实例让自己的同步方法同步不同对象:拥有方法的实例。

对于每个实例,只有一个线程可以在同步实例方法内部执行。如果存在多个实例,则每个实例同时可以有一个线程在同步实例方法内部执行。每个实例一个线程。

这个规则对于相同对象(实例)上的所有同步实例方法都是成立的。因此,下面的示例中,只有一个线程可以在两个同步方法之一的内部执行。每个实例加起来只有一个线程。

public class MyCounter {

    private int count = 0;

    public synchronized void add(int value) {
        this.count += value;
    }

    public synchronized void subtract(int value) {
        this.count -= value;
    }
}

Synchronized Static Methods

静态方法和实例方法一样,也使用 synchronized 关键字标识。下面是一个同步静态方法示例:

public static class MyStaticCounter {

    private static int count = 0;

    public static synchronized void add(int value) {
        count += value;
    }
}

这儿的 synchronized 关键字也是告诉 Java 同步 add() 方法。

同步静态方法同步它所属类的 class 对象。由于 JVM 中每个类只有一个 class 对象,所以同一个类上,只有一个线程可以在静态同步方法内部执行。

假如类上包含多个静态同步方法,那么同时只有一个线程可以在这些方法中任意一个的内部执行。看下面的例子:

public static class MyStaticCounter {

    private static int count = 0;

    public static synchronized void add(int value) {
        count += value;
    }

    public static synchronized void subtract(int value) {
        count -= value;
    }
}

特定时间,只有一个线程可以在 add()subtract() 内部执行。如果线程 A 在执行 add(),那么线程 B 直到线程 A 退出才能执行 add() 或者 subtract()

如果静态同步方法处于不同类,那么每个类上可以有一个线程在静态同步方法内部执行。每个类一个线程无论它调用哪个静态同步方法。

Synchronized Blocks in Instance Methods

你无需同步整个方法,有时更推荐同步方法的一部分。Java 方法内部的同步块使这成为可能。

下面是一个非同步方法内部的同步块示例:

public void add(int value) {
    synchronized (this) {
        count += value;
    }
}

注意到 Java 同步块结构的括号中接收一个对象。本例中使用了 this,它是调用 add 方法的实例。括号中被同步结构接收的对象叫做 监视对象 (monitor object)。同步块会同步监视对象。同步实例方法使用的监视对象也是 this

对于同一个受同步的监视对象,只有一个线程可以执行同步块中的代码。

下面两个方法同步了同一个调用它们的实例,因此它们对于同步而言是等价的。

public class MonitorObject {

    static class Log {
        void writeln(String msg) {
        }
    }

    private static final Log log = new Log();

    public synchronized void log1(String msg1, String msg2) {
        log.writeln(msg1);
        log.writeln(msg1);
    }

    public void log2(String msg1, String msg2) {
        synchronized (this) {
            log.writeln(msg1);
            log.writeln(msg2);
        }
    }
}

因此本例中,只有一个线程可以在两个同步块中任意一个内部执行。

如果第二个同步块同步的对象不是 this,那么每个同步方法同时都可以有一个线程执行。

Synchronized Blocks in Static Methods

同步块也可以在静态方法内部使用。下面的两个示例和上一节的示例相同,只不过是静态的。这些方法同步的对象是方法所属类的 class 对象:

class MonitorObjectOfStaticMethods {

    static class Log {
        void writeln(String msg) {
        }
    }

    private static final Log log = new Log();

    public static synchronized void log1(String msg1, String msg2) {
        log.writeln(msg1);
        log.writeln(msg1);
    }

    public static void log2(String msg1, String msg2) {
        synchronized (MonitorObjectOfStaticMethods.class) {
            log.writeln(msg1);
            log.writeln(msg2);
        }
    }
}

同时只能有一个线程在两个方法任意一个内部执行。

如果第二个同步块同步的对象不是 MonitorObjectOfStaticMethods.class,那么每个方法同时都允许一个线程执行。

Synchronized Blocks in Lambda Expressions

在 Java Lambda 表达式 或者匿名类内部使用同步块也是可能的。

下面是一个包含同步块的 Java lambda 表达式。注意到同步块同步的对象是包含表达式类的 class 对象。如果其它对象更有意义(取决于特定用例),它也可以同步其它对象,但这儿同步该对象是没有问题的。

class SynchronizedBlockInLambdaExpression {

    public void lambdaExample() {
        Consumer<String> func = (param) -> {
            synchronized (SynchronizedBlockInLambdaExpression.class) {
                String name = Thread.currentThread().getName();

                System.out.println(name + " step 1: " + param);

                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(name + " step 2: " + param);
            }
        };

        new Thread(() -> func.accept("Parameter"), "Thread 1")
                .start();

        new Thread(() -> func.accept("Parameter"), "Thread 2")
                .start();
    }
}

Java Synchronized Example

下面的示例启动了 2 个线程,它们都调用同一 Counter 实例的 add 方法。每次只有一个线程可以调用相同实例的 add 方法,因为方法同步了它所属的实例。

class Example {

    static class Counter {

        long count = 0;

        public synchronized void add(long value) {
            count += value;
        }
    }

    static class CounterThread extends Thread {

        protected Counter counter;

        public CounterThread(Counter counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                counter.add(i);
            }
        }
    }

    public void currentExample() {
        Counter counter = new Counter();
        new CounterThread(counter).start();
        new CounterThread(counter).start();
    }
}

示例创建了两个线程,同一个 Counter 实例被传给了它们的构造器。Counter.add() 方法同步该实例,因为它是实例方法,并且使用 synchronized 标识。因此每次只能有一个线程调用 add() 方法。其它线程只能等待直到第一个线程离开。

如果两个线程引用了两个独立的 Counter 实例,那么它们同时调用 add() 方法就没什么问题。这些调用的目标将是不同对象,所以被调方法同步的对象也不同(方法所属对象)。所以调用不会阻塞。下面是代码示例:

public void currentExample() {
    new CounterThread(new Counter().start();
    new CounterThread(new Counter().start();
}

注意到现在两个线程不再引用相同 counter 实例。两个实例的 add 方法同步的是各自所属对象。所以调用第一个实例的 add 不会阻塞第二个实例的 add。

Synchronized and Data Visibility

不使用 synchronized(或 volatile)关键字,就不能保证一个线程修改了共享变量的值(一个所有线程都能访问的变量),其它线程可以看到更改。就不能保证何时一个线程所在 CPU 寄存器内的变量被 提交 (committed) 到主存,就不能保证何时其它线程会从主存 刷新 (refresh) 位于 CPU 寄存器中的变量。

synchronized 关键字改变了这种状况。当线程进入同步块时,它会刷新所有可见变量的值。当线程退出同步块时,它会把所见可见变量的更改提交到主存。volatile 关键字的工作机制类似。

Synchronized and Instruction Reordering

Java 编译器和虚拟机允许对你的代码进行指令重排,目的是使它们执行更快,通常是多个 CPU 并行执行。

如果代码被多线程同时执行,那么指令重排可能导致潜在问题。例如,如果同步块中写入变量的代码被重排到同步块外部。

为了解决这些问题,同步关键字在同步块开始,内部和结束上设置了一些限制。volatile 关键字也有类似限制。

最终结果是,你可以放心代码会工作良好 —— 没有会导致代码最终行为与你编写时不一致的指令重排。

What Objects to Synchronize On

上文多次提及,同步块必须指定同步对象。你其实可以选择任何对象,不过推荐做法是不要同步 String 和基本类型包装类,因为编译器会优化它们,以至于你认为你在代码中的不同地方使用了不同对象,实际上它们是相同的。看下面的示例:

synchronized("Hey") {
    // do something in here.
}

如果你有多个同步块对字面量 “Hey” 同步,那么编译器很可能会使用相同的字符串对象。结果是这些同步块同步了相同对象,那应该不是你期望的行为。

使用基本类型包装类也是类似情况,看下面的代码:

synchronized(Integer.valueOf(1)) {
    // do something in here.
}

如果你多次调用 Integer.valueOf(1),对于相同的输入参数,它们可能返回相同的包装类对象实例。这意味着,如果你在多个同步块使用相同基本类型包装类(例如多次使用 Integer.valueOf(1) 作为监视对象),那么你就面临这些同步块同步了相同对象的风险。这应该也不是你期望的行为。

最安全的做法是,同步 thisnew Object()。它们不会被编译器,JVM 或 Java 库缓存。

Synchronized Block Limitations and Alternatives

同步块有一些限制。例如,它一次只允许一个线程进入。但是,如果恰巧两个线程都想读取共享值,并且不更新它呢?允许这种行为是安全的。作为替代,你可以使用 Read / Write Lock 保护那段代码,读写锁是比同步块高级的锁语法。Java 实际上内置了一个 ReadWriteLock 供你使用。

如果你想让 N 个线程进入同步块而不仅仅是一个呢?你可以使用 Semaphore 完成这一行为。Java 实际上内置了一个 Semaphore 类供你使用。

同步块不保证等待进入其内部线程的进入顺序。如果你需要保证这一顺序要怎么办呢?你需要自己实现 Fairness

如果你仅有一个线程写共享变量,其它线程仅仅读取它呢?那么你只需使用 volatile 变量而非同步块。

Synchronized Block Performance Overhead

进入和退出同步块有些许性能开销。虽然 Java 已经尽可能优化,但代价仍然存在。

如果你在循环或类似情况下多次进入和退出同步块,就需要考虑这种消耗。

并且,如无必要,不要编写大型同步块。换句话说,只同步确实必要的操作 —— 避免阻塞其它线程执行没有必要同步的代码。确保同步块内只有绝对必要的指令。这会增加代码的并发性。

Synchronized Block Reentrance

一旦线程进入了同步块,它就持有了同步块监视对象的锁。如果线程调用另一个方法,该方法回调了包含同步块的第一个方法,那么持有锁的线程可以重新进入同步块。线程不会阻塞,因为它自己持有同一监视对象的锁。只有其它线程持有锁它才会阻塞。看下面的示例:

public class Reentrancy {
    List<String> elements = new ArrayList<>();

    public int count() {
        if (elements.size() == 0) {
            return 0;
        }
        synchronized (this) {
            elements.remove(0);
            return 1 + count();
        }
    }
}

忽略以上对列表元素的计数方式,它完全没有意义。只关注同步块内部递归调用了 count() 方法。那么,调用 count 的线程最终会多次进入相同的同步块,这被允许也是可能的。

但请记住,如果你没有仔细设计代码,让线程进入多个同步块可能导致 nested monitor lockout

Synchronized Blocks in Cluster Setups

记住同步块只能阻塞相同 JVM 上进入代码块的线程。如果你在多个 JVM —— 集群中运行了多个相同应用,那么同一时刻,每个 JVM 上只允许一个线程进入同步块。

如果需要在集群中跨 JVM 同步,你不能只使用同步块,而是需要其它同步机制。

15. Java Volatile Keyword

Java volatile 关键字标识变量 被存储到主存中 (being stored in main memory)。更准确的表述是,每次都从计算机主存读取 volatile 变量,而非 CPU 缓存,每次都把 volatile 变量写回主存,而不仅仅是 CPU 缓存。

实际上,自从 Java 5,该关键字有了更强保证,我将在下面章节为你解释。

Variable Visibility Problems

volatile 关键字保证变量的修改跨线程可见。这听起来可能很抽象,让我详细展开。

在多线程应用中,线程操作非 volatile 变量时,会将它们从主存拷贝到 CPU 缓存来提高性能。如果你的计算机包含多个 CPU,每个线程可能运行于不同 CPU。这意味着,每个线程会将变量拷贝到不同 CPU 的缓存上,就像下图这样:

copy-variables-to-different-cpu-cahces

不使用 volatile 变量,不能保证何时 JVM 会把数据读到 CPU 缓存,何时会把数据写回主存。这会导致一些问题,接下来我会解释它们。

想象一个解决方案,存在某个被多线程访问的共享变量,该变量包含一个计数器变量,它的声明如下:

public class SharedObject {

    public int counter = 0;
}

再想象,只有线程 1 增加 counter 变量,但线程 1 和 2 都时刻读取它。

如果 counter 变量不是 volatile 的,那么就不能保证它的值何时会从 CPU 缓存写回主存。这意味着,CPU 缓存中的值可能和主存中不一致。下图展示了该方案:

non-volatile-variables

由于变量没有被写回主存,导致其它线程看不到它的最新值的情况叫做 可见性 (visibility) 问题。一个线程的更新对其它线程不可见。

The Java volatile Visibility Guarantee

Java volatile 旨在解决可见性问题。通过把变量 counter 声明为 volatile,所有对它的修改都会立刻写回主存。同样,所有对它的读取都会直接从主存进行。

下面是声明 volatile 变量的语法:

public class SharedObject {

    public volatile int counter = 0;
}

把变量声明为 volatile 能保证线程对变量修改的可见性。

以上场景中,线程 T1 修改计数器,线程 T2 读取计数器(永不修改它),把计数器声明为 volatile 就足以保证 T2 可以看到计数器的改变。

但如果 T1 和 T2 都增加计数器,那么仅把计数器声明为 volatile 就不行了。见下文。

Full volatile Visibility Guarantee

事实上,volatile 的可见性保证超越了 volatile 变量自身,下面是完整内容:

  • 如果线程 A 写入 volatile 变量,线程 B 随后读取相同变量,那么在线程 A 写入 volatile 变量前所有对线程 A 可见的变量,在线程 B 读取 volatile 变量后都对线程 B 可见。
  • 如果线程 A 读取 volatile 变量,那么此时所有对线程 A 可见的变量都会重新从主存读取。

让我用代码解释这些保证:

public class VolatileGuarantee {

    private int years;
    private int months;
    private volatile int days;

    public void update(int years, int months, int days) {
        this.years = years;
        this.months = months;
        this.days = days;
    }
}

update() 方法写入了三个变量,只有 days 是 volatile 的。

完整的 volatile 可见性保证指,当值被写入 days 时,所有对线程可见的变量都被写回主存。那意味着,当线程写入 days 时,yearsmonths 也会被写回主存。

当读取 years, monthsdays 时,你可以这样做:

public class VolatileGuarantee {

    private int years;
    private int months;
    private volatile int days;

    public int totalDays() {
        int total = this.days;
        total += months * 30;
        total += years * 365;
        return total;
    }

    public void update(int years, int months, int days) {
        this.years = years;
        this.months = months;
        this.days = days;
    }
}

注意到 totalDays() 方法以 days 的读取开始。当读取 days 时,monthsyears 的值也会从主存刷新到缓存。因此,使用以上读取顺序,你被保证能看到 3 个变量的最新值。

Instruction Reordering Challenges

出于性能考量,JVM 和 CPU 被允许重排程序中的指令,只要指令语义保持不变。看下面的指令:

int a = 1;
int b = 2;

a++;
b++;

这段指令可以被重排为以下顺序而不丢失它们的语义:

int a = 1;
a++;

int b = 2;
b++;

但是,当存在 volatile 变量时,指令重排可能带来挑战。让我们再看之前的 VolatileGuarantee 类:

public class VolatileGuarantee {

    private int years;
    private int months;
    private volatile int days;

    public void update(int years, int months, int days) {
        this.years = years;
        this.months = months;
        this.days = days;
    }
}

只要 update() 方法写入了 days 的值,新的 yearsmonths 值也会写回主存。但是,如果 JVM 把指令重排成下面这样呢:

public void update(int years, int months, int days){
    this.days   = days;
    this.months = months;
    this.years  = years;
}

days 变量被修改时,monthsyears 的值仍会被写入主存,但此时它们还没有被写入新值。因此新值不能被其它线程看到。指令重排后的语义改变了。

Java 对此提出了解决方案,继续往下看。

The Java volatile Happens-Before Guarantee

旨在应对指令重排挑战,除了可见性保证,volatile 关键字还有一个 先发生 (happens-before) 保证。先发生保证的具体内容是

  • 对其它变量的读写如果本来就发生在 volatile 变量写入之前,它们就不能重排到其之后。发生在 volatile 变量写入之前的读写被保证 发生在 (happen) volatile 变量写入 之前 (before)。注意下面的情况仍是可能的,例如位于 volatile 变量写入之后的其它变量读取被重排到 volatile 变量写入之前。只是不能反过来。之后到之前被允许,之前到之后不被允许。
  • 如果读写本来就发生在 volatile 变量读之后,它们就不能被重排到其读取之前。注意下面这种情况是可能的,即发生在 volatile 变量读取之前的其它变量读被重排到 volatile 变量读之后。只是不能反过来,从前到后是允许的,但从后到前不允许。

以上 先发生 (happens-before) 强制保证了 volatile 关键字的可见性保证。

volatile is Not Always Enough

尽管 volatile 关键字保证所有 volatile 变量都直接从主存读取,直接写入主存,但仍然存在仅声明 volatile 变量不足以满足需求的情况。

之前的情形中,只有线程 1 对共享变量 counter 写,把它声明为 volatile 足以确保线程 2 总是能看见最新修改的值。

事实上,多线程同时写入共享 volatile 变量有时也能让正确的值保存到主存,只要新值不依赖旧值。换句话说,如果向共享 volatile 变量写值的线程,不会首先读取它的值,再根据旧值计算新值。

只要线程需要首先读取 volatile 值,并根据它生成新值,volatile 变量就不足以保证正确的可见性。读取和写入新值之间短暂的时间间隔会产生 竞态条件,此时多线程可能读取到相同的旧值,根据它产生各自新值,再将新值写回主存时,覆盖彼此的结果。

多线程增加相同计数器的场景就是以上情形的完美示例。下面我会详细解释。

想象如果线程 1 把共享 counter 变量的值 0 读到 CPU 缓存,在它上面加 1,并且还没有把改变写回主存。线程 2 也从主存读取 counter 变量到 CPU 缓存,它仍是 0,线程 2 继而为计算器加 1,并且也未把更改写回主存。下面的图片展示了这种情况:

multi-threads-read-then-write-shared-count-variable

线程 1 和 2 现在实际上失去了同步。共享 counter 的值本该是 2,但每个线程自己的 CPU 缓存中都是 1,主存中它仍是 0,完全乱了。即使最终线程会把它们的值写回主存,但结果是错的。

When is volatile Enough?

就像我前面提及的,如果两个线程都读写共享变量,那么使用 volatile 是不够的。你需要 synchronized 来保证变量读写是原子的。volatile 变量的读写不会阻塞读写线程。要达到该目的,你必须在临界区周围使用 synchronized 关键字。

作为 synchronized 的替代,你可以使用 java.util.concurrent 包下的诸多原子数据类型。例如,AtomicLongAtomicReference 或其它。

假如只有一个线程读写 volatile 变量,其它线程只是读取它,那么可以保证读线程看到写线程的最新修改。不使用 volatile,这种可见性不能保证。

volatile 关键字支持 32 和 64 位变量。

Performance Considerations of volatile

读写 volatile 变量会导致变量与主存同步。操作主存比操作 CPU 缓存代价高。访问 volatile 变量也会阻止某些指令重排,它是一种性能增强技术。因此,只有确实需要增强变量可见性时,你才应该使用 volatile 变量。

16. Java ThreadLocal

Java ThreadLocal 类使你能创建只能被创建线程读写的变量。因此,即使两个线程正在执行相同代码,并且持有同一个 ThreadLocal 变量引用,它们也看不到各自的 ThreadLocal 值。因此,ThreadLocal 类提供了一种实现 线程安全 的简单方式。

Creating a ThreadLocal

你可以像创建其它任何 Java 对象一样,使用 new 操作符创建 ThreadLocal 实例。见如下代码:

private ThreadLocal threadLocal = new ThreadLocal();

每个线程只需初始化一次,多线程现在就能获得和设置 ThreadLocal 中的值,并且每个线程只能看到它自己设置的值。

Set ThreadLocal Value

一旦 ThreadLocal 被创建,你就可以通过 set() 方法把值存储到它里面。

threadLocal.set("A thread local value");

Get ThreadLocal Value

使用 get() 方法获取存储于其中的值。见下例:

String threadLocalValue = (String) threadLocal.get();

Remove ThreadLocal Value

使用 remove() 方法可以删除设置在 ThreadLocal 中的值:

threadLocal.remove();

Generic ThreadLocal

你可以创建一个泛型 ThreadLocal。使用泛型使得它只能存储指定的数据类型。此外,你不需要对 get() 的返回值做类型转换。下面是一个泛型 ThreadLocal 示例:

private ThreadLocal<String> myThreadLocal = new ThreadLocal<String>();

现在你只能把字符串存储到 ThreadLocal 实例,并且,从其内部取值时,你无需进行类型转换:

myThreadLocal.set("Hello ThreadLocal");
String threadLocalValue = myThreadLocal.get();

Initial ThreadLocal Value

可以为 ThreadLocal 设置一个初始值,它会在调用 set() 设置新值前,用于 get() 方法返回。你有两个选项指定初始值:

  • 创建一个 ThreadLocal 子类,重写其 initialValue() 方法。
  • 创建一个附带 Supplier 接口实现的 ThreadLocal。

我将逐一介绍。

Override initialValue()

ThreadLocal 变量指定初始值的第一种方式是创建子类重写 initialValue() 方法。创建子类的最简方式是在声明 ThreadLocal 变量时,创建匿名类,见如下代码:

private final ThreadLocal<String> threadLocal = new ThreadLocal<String>() {
    @Override
    protected String initialValue() {
        return String.valueOf(System.currentTimeMillis());
    }
};

注意,不同线程仍然会看到不同的初始值。每个线程会创建自己的初始值。只有 initialValue() 方法确实返回了相同对象,所有线程才会看到这个对象。但是,使用 ThreadLocal 的第一考量就是避免不同线程看到相同实例。

Provide a Supplier Implementation

ThreadLocal 指定初始值的第二种方式是使用它的静态工厂方法 withInitial(Supplier),为该方法传递一个 Supplier 接口实现。该供应商实现为 ThreadLocal 供应初始值。下面是一个简单示例:

private final ThreadLocal<String> threadLocal = ThreadLocal.withInitial(new Supplier<String>() {
    @Override
    public String get() {
        return String.valueOf(System.currentTimeMillis());
    }
});

由于 Supplier函数式接口,可以使用 Lambda 表达式 实现。下面就是使用 lambda 表达式实现 Supplier 作为 withInitial() 参数的例子:

private final ThreadLocal<String> threadLocal = ThreadLocal.withInitial(
        () -> String.valueOf(System.currentTimeMillis()));

Lazy Setting of ThreadLocal Value

某些情况下,你不能使用标准方式设置初始值。例如,也许你需要一些配置信息,它们在创建 ThreadLocal 变量时无法获取。此时,你可以实施初始值懒设置。见下例:

public class DateFormatter {

    private final ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<>();

    private SimpleDateFormat getFormatter() {
        SimpleDateFormat formatter = threadLocal.get();
        if (formatter == null) {
            formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            threadLocal.set(formatter);
        }
        return formatter;
    }

    public String format(Date date) {
        return getFormatter().format(date);
    }
}

注意 format() 方法如何调用 getFormatter 来获取一个 SimpleDateFormat 实例。如果 SimpleDateFormat 还没有被设置到 ThreadLocal 中,一个新的 SimpleDateFormat 会被创建并设置。一旦线程设置了自己的 SimpleDateFormat,后续该线程使用的都是该 SimpleDateFormat 对象,但是只针对此线程。每个线程会创建自己的 SimpleDateFormat 实例,因为它们看不到各自 ThreadLocal 上的值。

SimpleDateFormat 类不是线程安全的,所有不允许多线程同时使用它。为了解决该问题,上例为每个线程创建了自己的 SimpleDateFormat,所以每个调用 format() 方法的线程会使用自己的 SimpleDateFormat 实例。

Using a ThreadLocal with a Thread Pool or ExecutorService

如果你打算在任务内部使用 ThreadLocal,这些任务会被传递给 Thread PoolExecutorService,那么请记住你无法确保哪个线程会执行它们。但是,如果你的需求只是确保每个线程使用自己的对象实例,这就不是一个问题。此时你可以搭配使用 ThreadLocal 和线程池、执行服务。

Full ThreadLocal Example

public class ThreadLocalExample {
    private static class Task implements Runnable {

        private final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

        @Override
        public void run() {
            threadLocal.set((int) (Math.random() * 100D));

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(threadLocal.get());
        }
    }

    public static void main(String[] args) {
        Task sharedTask = new Task();

        new Thread(sharedTask).start();
        new Thread(sharedTask).start();
    }
}

本例创建了一个 Task 实例,它被传递给两个不同线程。两个线程都会执行 run() 方法,因此在 ThreadLocal 实例上设置不同值。如果对 set() 方法的调用被同步了,并且其所属对象类型不是 ThreadLocal,那么后执行的线程将会覆盖先执行线程设定的值。

但此处是 ThreadLocal,所以两个线程看不到各自的值,因此它们设置和得到了不同值。

InheritableThreadLocal

InheritableThreadLocalThreadLocal 的一个子类。持有 InheritableThreadLocal 的线程在创建子线程时,会将自己的 InheritableThreadLocal 值复制到子线程,作为它们 InheritableThreadLocal 的初始值。之后父子线程修改 InheritableThreadLocal 仍然是独立的。下面是一个完整的 InheritableThreadLocal 示例:

public class InheritableThreadLocalExample {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();

        new Thread(() -> {
            System.out.println("===== Thread 1 =====");
            threadLocal.set("Thread 1 - ThreadLocal");
            inheritableThreadLocal.set("Thread 1 - InheritableThreadLocal");

            System.out.println(threadLocal.get());
            System.out.println(inheritableThreadLocal.get());

            new Thread(() -> {
                System.out.println("===== ChildThread =====");
                System.out.println(threadLocal.get());
                System.out.println(inheritableThreadLocal.get());
            }).start();
        }).start();

        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("==== Thread 2 =====");
            System.out.println(threadLocal.get());
            System.out.println(inheritableThreadLocal.get());
        }).start();
    }
}

本例创建了一个常规 ThreadLocal 和一个 InheritableThreadLocal。又创建了一个线程,它为 ThreadLocalInheritableThreadLocal 设置了值,随后又创建了一个子线程,子线程访问了 ThreadLocalInheritableThreadLocal 的值。只有 InheritableThreadLocal 的值对子线程可见。

示例最后创建了第三个线程,它试图访问 ThreadLocalInheritableThreadLocal 的值,但它看不到第一个线程设置的任何值。

示例运行输出如下:

===== Thread 1 =====
Thread 1 - ThreadLocal
Thread 1 - InheritableThreadLocal
===== ChildThread =====
null
Thread 1 - InheritableThreadLocal
==== Thread 2 =====
null
null

17. Thread Signaling

线程可以互相发送信号,也能等待其它线程的信号。比如,线程 B 可能等待线程 A 的信号,告诉它数据准备好可以处理了。

Signaling via Shared Objects

线程互相发送信号的简单方式是设置保存在共享对象中的信号值。线程 A 可以在同步块里把 boolean 成员变量 hasDataToProcess 赋值为 true,线程 B 也可以在同步块里读取该成员变量的值。下面的简单示例是一个持有信号的对象,它提供了设置和检测信号的方法:

public class Signal {

    protected boolean hasDataToProcess = false;

    public synchronized boolean hasDataToProcess() {
        return hasDataToProcess;
    }

    public synchronized void setHasDataToProcess(boolean hasData) {
        hasDataToProcess = hasData;
    }
}

要能够正常发送和接收信号,线程 A 和 B 必须持有同一个 Signal 实例引用。否则,它们就无法检测彼此的信号。要被处理的数据可以位于一个独立于 Signal 的共享缓存中。

Busy Wait

处理数据的线程 B 时刻等待有数据处理。换句话说,它等待线程 A 发送的信号,该信号会使 hasDataToProcess() 返回 true。下例中,线程 B 运行于循环中等待信号:

public static void main(String[] args) {
    Signal sharedSignal = new Signal();

    new Thread(() -> {
        while (!sharedSignal.hasDataToProcess()) {
            // do nothing... busy waiting
        }
        System.out.println("processing data");
    }, "ThreadB").start();

    new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        sharedSignal.setHasDataToProcess(true);
    }, "ThreadA").start();
}

注意到循环会一直执行,直到 hasDataToProcess() 返回 true。这叫忙等,线程忙于等待。

wait(), notify() and notifyAll()

忙等不能充分利用执行等待线程的 CPU,除非平均等待时间很短。否则,在收到它等待的信息前,让线程休眠或不活跃更加明智。

Java 内置了等待机制让线程在等待信号时处于不活跃状态。java.lang.Object 对象定义了 3 个方法完成该机制,它们是 wait(),notify(),和 notifyAll()。

线程调用任何对象的 wait() 方法都会变得不活跃,直到其它线程调用了同一对象的 notify()。为了调用 wait() 或 notify(),线程必须首先获得方法所属对象的锁。换句话说,它必须处于同步块中。下面是一个使用 wait() 和 notify() 的修改版 Signal。

public class WaitNotify {
    private static class MonitorObject {
    }

    private final MonitorObject monitorObject = new MonitorObject();

    public void doWait() {
        synchronized (monitorObject) {
            try {
                monitorObject.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void doNotify() {
        synchronized (monitorObject) {
            monitorObject.notify();
        }
    }
}

等待线程会调用 doWait(),通知线程会调用 doNotify()。当线程调用一个对象的 notify() 时,某个等待该对象的线程会被唤醒并准备执行。还有一个 notifyAll() 方法,它会唤醒所有等待特定对象的线程。

如你所见,等待和通知线程都必须在同步块中调用 wait() 或 notify(),这是强制的。线程不能在没有获得对象锁的情况下,调用该对象的上述 3 个方法。否则会抛出 IllegalMonitorStateException。

但是,会发生这种情况吗?等待线程一直在同步块内执行,不释放监视对象的锁,它阻塞通知线程,永远不能进入 doNotify() 的同步块。答案是,否。一旦线程调用了 wait(),它就释放了监视对象的锁。这样其它线程才能调用 wait() 或 notify(),因为这些方法需要调用者处于同步块中。

直到调用 notify() 的线程离开同步块,被唤醒线程才能退出 wait() 方法。换句话说,被唤醒线程在退出 wait() 方法前,必须重新获得监视对象的锁,因为 wait 调用处于同步块内部。如果使用 notifyAll() 唤醒多个线程,每次只有一个可以退出 wait() 方法,因为每个线程在退出前必须获得监视对象的锁。

Missed Signals

如果没有等待线程,notify() 和 notifyAll() 的唤醒信号会丢失。因此,如果线程调用 wait() 方法前另一个线程调用了 notify(),信号不会被等待线程收到。这可能会也可能不会导致问题,但某些情况下等待线程会永远等待,无法醒来,因为它错过了唤醒信号。

为了避免丢失信号,应该把它存在类中。在 WaitNotify 示例中,唤醒信号应该被存储在实例的成员变量上。下面是一个修改版 WaitNotify:

public class WaitNotify {
    private static class MonitorObject {
    }

    private final MonitorObject monitorObject = new MonitorObject();
    private boolean wasSignalled = false;

    public void doWait() {
        synchronized (monitorObject) {
            if (!wasSignalled) {
                try {
                    monitorObject.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // clear signal and continue running.
            wasSignalled = false;
        }
    }

    public void doNotify() {
        synchronized (monitorObject) {
            wasSignalled = true;
            monitorObject.notify();
        }
    }
}

注意到 doNotify() 方法在调用 notify() 前把 wasSignalled 设置为 true,并且 doWait() 方法在调用 wait() 前会检测该变量。事实上,只有前一次 doWait() 调用和本次调用间没有收到信号,wait() 方法才会执行。

Spurious Wakeups

由于未知原因,线程会被唤醒,即使没有其它线程调用 notify() 或 notifyAll()。这种情况叫假唤醒。没有原因的醒来。

如果 WaitNotify 类的 doWait() 方法发生了假唤醒,等待线程可能会在没有收到适当信号的情形下继续处理任务。这会导致应用的严重问题。

为了防止假唤醒,信号成员变量必须在循环中检测,而非 if 声明。这种循环又称自旋锁 (spin lock)。唤醒的线程自旋直到自选锁(循环)的条件变成 false。下面是一个使用自旋锁的 WaitNotify 版本:

public class WaitNotify {
    private static class MonitorObject {
    }

    private final MonitorObject monitorObject = new MonitorObject();
    private boolean wasSignalled = false;

    public void doWait() {
        synchronized (monitorObject) {
            while (!wasSignalled) {
                try {
                    monitorObject.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // clear signal and continue running.
            wasSignalled = false;
        }
    }

    public void doNotify() {
        synchronized (monitorObject) {
            wasSignalled = true;
            monitorObject.notify();
        }
    }
}

注意到 wait() 调用现在处于循环而非 if 声明中。如果等待线程醒来后发现没有收到信号,即 wasSignalled 成员变量仍是 false,程序会进行下一次循环,使唤醒线程重新等待。

Multiple Threads Waiting for the Same Signals

循环也是解决多线程等待的好方法,即便使用 notifyAll() 唤醒所有线程,也只有其中之一被允许继续运行。一次只有一个线程可以获得监视对象的锁,意味着只有一个线程可以退出 wait() 调用,随后清除 wasSignalled 标志。一旦该线程退出同步块,其它线程便可退出 wait(),在循环中检测 wasSignalled 成员变量。但是,标志被第一个唤醒线程清空了,所有剩下的唤醒线程会再次等待,直到下个信号到达。

Don't call wait() on constant String's or global objects

之前章节使用了字符串常量作为监视对象,就像下面这样:

class WaitNotifyWrongMonitor {
    final String monitor = "";
    boolean wasSignalled = false;

    public void doWait() {
        synchronized (monitor) {
            while (!wasSignalled) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            wasSignalled = false;
        }
    }

    public void doNotify() {
        synchronized (monitor) {
            wasSignalled = true;
            monitor.notify();
        }
    }
}

使用字符串常量的 wait() 和 notify() 的问题在于,JVM 或编译器内部会把它们转译成相同对象。这意味着即使你有两个不同 WaitNotify 实例,它们可能引用着相同的字符串对象。这还意味着调用第一个 WaitNotify 实例 doWait() 方法的线程,有被调用第二个实例 doNotify() 方法线程唤醒的风险。

基本情形如下图:

string-constant-as-monitor

记住,即使 4 个线程调用了共享 String 实例的 wait() 和 notify(),它们的信号也会保存在各自的 WaitNotify 实例中。一个 WaitNotify 实例的 doNotify() 可能唤醒另一个 WaitNotify 实例,但信号只会保存在第一个实例。

初看这似乎不是个大问题,毕竟,第二个实例调用了 doNotify() 只会不小心唤醒线程 A 和 B。被唤醒线程(A 和 B)会在循环中检测信号,随后重新等待,因为没有调用第一个 WaitNotify 实例的 doNotify(),这才是它们等待的对象。这种情形等价于假唤醒。线程 A 和 B 在没有收到信号的情况下醒来。但代码可以处理它,所以线程会重新等待。

问题在于,因为调用的是 notify() 而非 notifyAll(),只有一个线程会被唤醒,即使存在 4 个等待相同字符串实例的线程。所以,如果线程 A 和 B 有一个被唤醒,但实际信号却是给 C 和 D 的,那么线程 A 或 B 会检测信号,发现没有收到,重新等待。C 和 D 都不会醒来检测实际上发给它们的信号,所以信号丢失了。这种情况等价于之前描述的信号丢失问题。C 和 D 被发送了信号,但没有对它做出响应。

如果调用了 notifyAll(),所有等待线程会被唤醒并检测信号。线程 A 和 B 会重新等待,但线程 C 和 D 中的一个会注意到该信号,退出 doWait() 方法。另一个会重新等待,因为发现信号的线程在离开 doWait() 时清空了信号。

你可能倾向于总是调用 notifyAll(),但出于性能考虑,这不是个好主意。没有理由总是唤醒所有等待线程,在只有一个线程可以响应该信号的情况下。

所以:不要使用全局对象,字符串常量等完成 wait(),notify() 机制。使用代码块唯一的对象,对于上例,让每个 WaitNotify 实例持有自己的监视对象,而非空字符。

18. Deadlock

Thread Deadlock

死锁指多个处于阻塞状态的线程,互相等待彼此持有的锁。当多个线程都需要几个相同的锁,但它们获取锁的顺序不同时就可能发送死锁。

Deadlock Example

下面是一个死锁示例:

如果线程 1 锁定 A,并且试图锁定 B,线程 2 已经锁定 B,试图锁定 A,死锁就发生了。线程 1 永远无法获得 B,线程 2 永远无法获得 A。此外,它们自己都不知道这种情况。它们会在自己持有的对象上永远阻塞。这就是死锁。

伪代码如下:

Thread 1 locks A, waits for B
Thread 2 locks B, waits for A

下面的 TreeNode 类调用了不同实例的同步方法:

public class TreeNode {
    TreeNode parent = null;
    List<TreeNode> children = new ArrayList<>();

    public synchronized void setParentOnly(TreeNode parent) {
        this.parent = parent;
    }

    public synchronized void addChildOnly(TreeNode child) {
        if (!children.contains(child)) {
            children.add(child);
        }
    }

    public synchronized void setParent(TreeNode parent) {
        this.parent = parent;
        parent.addChildOnly(this);
    }

    public synchronized void addChild(TreeNode child) {
        if (!children.contains(child)) {
            children.add(child);
            child.setParentOnly(this);
        }
    }
}

如果线程 1 调用 parent.addChild(child),同时线程 2 调用 child.setParent(parent),并且两个线程持有的 parent 和 child 相同,死锁就会发生。下面是伪代码:

Thread 1: parent.addChild(child); // locks parent
          --> child.setParentOnly(parent);
Thread 2: child.setParent(parent); // locks child
          --> parent.addChildOnly(child);

首先线程 1 调用 parent.addChild(child)。由于它是同步方法,线程 1 锁定 parent 对象,其它线程不能访问它。

随后线程 2 调用 child.setParent(parent)。因为它是同步方法,线程 2 锁定 child 对象,其它线程不能访问它。

现在 child 和 parent 对象被不同线程锁定。接下来线程 1 试图调用 child.setParentOnly(),但 child 对象被线程 2 锁定了,所以方法调用会阻塞。线程 2 也试图调用 parent.addChildOnly(),但 parent 被线程 1 锁定了,所以该调用也会阻塞。现在两个线程都阻塞了,等待获得对方持有的锁。

注意:就像上面描述的,两个线程必须同时调用 parent.addChild(child) 和 child.setParent(parent),并且两个 parent 和 child 实例必须相同,这样死锁才能发生。以上代码可能会正常执行很长时间,直到突然进入死锁。

线程需要同时请求锁。如果线程 1 稍微先于线程 2,因此锁定了 A 和 B,那么线程 2 请求 B 的锁时就会阻塞,就不会发生死锁。通常线程执行时机不可预测,所以不能知道死锁何时发生。只是它可以发生。

More Complicated Deadlocks

死锁可能包含超过 2 个线程,这使它很难探测。下面是一个包含 4 个线程的死锁示例:

Thread 1 locks A, waits for B
Thread 2 locks B, waits for C
Thread 3 locks C, waits for D
Thread 4 locks D, waits for A

4 个线程依次循环等待。

Database Deadlocks

更复杂的死锁情形是数据库事务。一个事务可能由多条 SQL 更新组成。事务更新某条记录时,该记录会锁定,阻塞其它事务更新相同记录。相同事务中的每条更新都会锁定一些记录。

如果多个事务同时更新多条相同记录,就有死锁风险。

例如:

Transaction 1, request 1, locks record 1 for update
Transaction 2, request 1, locks record 2 for update
Transaction 1, request 2, tries to lock record 2 for update
Transaction 2, request 2, tries to lock record 1 for update

由于锁被不同请求拿走,并且无法提前知道事务需要的所有锁,很难检测和预防数据库事务死锁。

19. Deadlock Prevention

某些情况下可以避免死锁,本章会介绍 3 种技术。

  1. Lock Ordering
  2. Lock Timeout
  3. Deadlock Detection

你可以在以下仓库找到代码示例:

https://github.com/jjenkov/java-examples

Lock Ordering

死锁发生的条件是多个线程需要多个相同的锁,但请求它们的顺序不同。

如果你能确保任何线程获取锁的顺序是相同的,死锁就不会发生,看下面的示例:

Thread 1:

    lock A
    lock B

Thread 2:

    wait for A
    lock C (when A locked)

Thread 3:
    
    wait for A
    wait for B
    wait for C

如果某个线程,例如线程 3,需要多个锁,它必须以特定顺序持有。只有获得了前一个锁,才能获取后续的锁。

比如,线程 2 和 3 都不能锁定 C,除非它们首先锁定了 A。因为线程 1 持有了锁 A,线程 2 和 3 必须首先等待锁 A 释放。随后它们再尝试锁定 B 和 C 前,也必须成功锁定 A。

顺序锁定是简单但有效的死锁预防机制。但只有提前知道所有需要的锁,你才能使用它。然而实际情况通常并非如此。

Lock Timeout

另一个避免死锁的机制是为锁申请设定超时时间,这意味着试图获得锁的线程在尝试指定时间后会放弃锁。如果一个线程在指定时间内没有获得所有必要的锁,它会备份,释放所有获得的锁,随机等待一段时间再重新尝试。等待随机时间是让其它需要相同锁的线程有机会获得所有锁,因此让应用继续运行,避免死锁。

下面的示例中。两个线程试图以不同顺序获得两个相同的锁,它们会备份并重试:

Thread 1 locks A
Thread 2 locks B

Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked

Thread 1's lock attempt on B times out
Thread 1 backs up and release A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.

Thread 2's lock attempt on A times out
Thread 2 backs up and release B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.

上例中,线程 2 重试的时间大约比线程 1 早 200 毫秒,因此可能会获得所有锁。线程 1 此时将等待获得锁 A。线程 2 完成时,线程 1 也就可以获得两个锁(除非线程 2 或其它线程期间又持有了锁)。

需要记住的是,仅仅因为锁超时不足以判断线程已经死锁。它也可能意味着持有锁的线程(让其它线程超时的线程)要花很长时间完成任务。

此外,如果竞争多个相同资源的线程足够多,它们仍有可能一次次同时执行,即使有超时和备份。这在两个 2 线程,彼此等待 0 到 500 毫秒时可能不会发生,但 10 到 20 个线程情况就会不同。此时两个线程等待相同时间(或近到足以导致问题)的可能性就会很高。

锁定超时机制的问题在于,无法为进入同步块设定超时时间。你必须自己创建锁类或使用 java.util.concurrency 包下的并发结构。编写自定义锁类不难,但将在后续章节介绍。

Deadlock Detection

死锁检测是在无法进行锁排序,且锁超时不可行的情况下实施的较重的死锁预防机制。

每次线程 持有申请 锁都会记录到线程和锁的数据结构中(map,graph 等)。

当线程申请锁但被否决时,它可以遍历锁图检测死锁。比如,如果线程 A 申请锁 7,但该锁已经被线程 B 持有,那么线程 A 可以检测线程 B 是否申请了自己持有的任何锁。如果线程 B 确实申请过,那么死锁就发生了(持有锁 1 的线程 A 申请锁 7,持有锁 7 的线程 B 申请锁 1)。

当然,死锁场景可能比两个线程互相持锁更加复杂。线程 A 可能等待线程 B,线程 B 等待线程 C,线程 C 等待线程 D,线程 D 等待线程 A。线程 A 要想检测死锁,它必须传递检测线程 B 申请的所有锁。从线程 B 申请的锁,线程 A 可以到达线程 C,继而到达线程 D,最终发现线程 D 申请了自己持有的锁。此时它才知道死锁的发生。

下面是 4 个线程申请和持有的锁图(A,B,C,D)。这样的数据结构可以用来检测死锁。

graph-of-locks

那么检测到死锁又该做什么呢?

可能的动作是释放所有锁,备份,等待一段随机时间,然后重试。这类似于简单的锁超时机制,区别在于只有真的发生死锁线程才会备份,不仅仅因为锁超时。然而,如果许多线程都在竞争相同的锁,即便备份等待,它们也会反复进入死锁状态。

更好的选项是为线程计算和设置优先级,只让部分线程备份,其它线程继续持锁就像死锁没有发生。如果优先级是固定的,那么某些线程总是优于其它线程,为了避免这种情况,检测到死锁时,你可以为线程赋予随机优先级。

20. Starvation and Fairness

饥饿 (starvation) 是指其它线程占用了所有 CPU,导致某些线程不被授予时间。因为时间都授予了其它线程,导致部分线程饿死。解决该问题的方法叫 公平 (fairness),即所有线程被公平地授予执行机会。

Causes of Starvation in Java

下面是 Java 中导致线程饥饿的 3 个常见原因:

Threads with high priority swallow all CPU time from threads with lower priority

你可以为每个线程单独设置优先级。更高的优先级意味着线程获得更多 CPU 时间。优先级的值在 1 到 10 之间。该值如何被转译取决于应用运行的操作系统。对于大多数应用,最好保持优先级不变。

Threads are blocked indefinitely waiting to enter a synchronized block

同步块是另一个导致饥饿的原因。同步块不保证进入它的顺序与线程等待顺序相同。理论上,线程可能永远阻塞于进入同步块的尝试中,因为其它线程总是先于它被授予进入权限。该问题叫做饥饿,即线程被饿死了,因为所有时间都授予了其它线程。

Threads waiting on an object (called wait() on it) remain waiting indefinitely

如果有多个调用 wait() 的线程,notify() 不保证哪个线程被唤醒,它可能是任意一个等待线程。因此等待特定对象的某个线程可能永远不被唤醒,因为唤醒的总是其它线程。

Implementing Fairness in Java

尽管无法实现 100% 的公平,我们仍能实现自己的同步结构来增加线程间的公平性。

首先,让我们研究一个简单的同步块:

public class Synchronizer{

    public synchronized void doSynchronized() {
        // do a lot of work which takes a long time
    }

}

如果多个线程调用 doSynchronized() 方法,它们中的某些会阻塞直到第一个获得进入权限的线程离开该方法。如果多个线程被阻塞等待进入,接下来,Java 不保证哪个线程被授予访问权。

Using Locks Instead of Synchronized Blocks

为了增加等待线程的公平性,首先我们会把保护代码块由同步块改为锁:

public class Synchronizer {
    Lock lock = new Lock();

    public void doSynchronized() throws InterruptedException {
        lock.lock();
        // critical section, do a lot of work which takes a long time
        lock.unlock();
    }

}

注意 doSynchronized() 方法不再声明为 synchronized。取而代之,临界区现在被 lock.lock() 和 lock.unlock() 保护。

下面是一个简单的 Lock 类实现:

public class Lock {
    private boolean isLocked      = false;
    private Thread  lockingThread = null;

    public synchronized void lock() throws InterruptedException {
        while(isLocked){
            wait();
        }
        isLocked      = true;
        lockingThread = Thread.currentThread();
    }

    public synchronized void unlock() {
        if(this.lockingThread != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                "Calling thread has not locked this lock");
        }

        isLocked      = false;
        lockingThread = null;
        notify();
    }
}

观察上面的 Synchronizer 以及 Lock 类实现,你会注意到,如果多个线程同时调用 lock(),它们大都会在试图进入 lock() 时阻塞。其次,如果 lock 被锁定了,线程会被阻塞在 lock() 方法内 while(isLocked) 循环的 wait() 方法调用中。记住,调用 wait() 的线程会释放 Lock 实例的同步锁,这样其它等待进入 lock() 方法的线程才能进入。结果是许多线程最终都进入了 lock() 调用了 wait() 方法。

如果再回看 doSynchronized() 方法,你会注意到 lock() 和 unlock() 之间的注释,它注明两个方法调用间的代码需要较长时间执行。让我们进一步假设本段代码的执行时间长于进入 lock() 后因锁被锁定调用 wait() 的时间。这意味着等待能够锁定锁并进入临界区的主要时间花在 lock() 内调用 wait() 后的等待,而非被阻塞无法进入 lock() 方法。

就如之前所说,同步块不保证多个等待进入其中的线程谁会获得进入权。wait() 方法也不保证当 notify() 被调用时,哪个线程被唤醒。所以,当前版本的锁类在保证公平性方面,和同步版本的 doSynchronized() 没有区别。

当前版本的 Lock 类调用自己的 wait() 方法。反之,如果每个线程调用独立对象的 wait(),那么每个对象的 wait() 只会被一个线程调用,Lock 类就能确定这些对象中的哪个去调用 notify(),因此能够唤醒确定的线程。

A Fair Lock

下面的 FairLock 类由之前的 Lock 类修改而来。你能注意到在同步和 wait()/notify() 上,它的实现存在些许变更。

从之前的 Lock 类到达最终设计的具体过程是一个很长的故事,期间包含多个增量步骤,每个步骤都修复了前一步的问题:Nested Monitor LockoutSlipped ConditionsMissed Signals。为了保持问题清晰,它们不在本章讨论,但每一步都有自己的话题(见以上链接)。重要的是,现在每个调用 lock() 方法的线程被排队了,并且只有队列中的第一个线程被允许锁定 FairLock 实例,如果它处于开锁状态的话。所有其它线程会被搁置直到它们到达队首。

public class FairLock {

    private static class QueueObject {

        private boolean isNotified = false;

        public synchronized void doWait() throws InterruptedException {
            while (!isNotified) wait();
            isNotified = false;
        }

        public synchronized void doNotify() {
            isNotified = true;
            notify();
        }

        @Override
        public boolean equals(Object obj) {
            return this == obj;
        }
    }

    private boolean isLocked = false;
    private Thread lockingThread = null;
    private final List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        QueueObject queueObject = new QueueObject();
        synchronized (this) {
            waitingThreads.add(queueObject);
        }

        while (true) {
            synchronized (this) {
                if (!isLocked && waitingThreads.get(0).equals(queueObject)) {
                    isLocked = true;
                    waitingThreads.remove(queueObject);
                    lockingThread = Thread.currentThread();
                    return;
                }
            }

            try {
                queueObject.doWait();
            } catch (InterruptedException e) {
                synchronized (this) {
                    waitingThreads.remove(queueObject);
                }
                throw e;
            }
        }
    }

    public synchronized void unlock() {
        if (lockingThread != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                    "Calling thread has not locked this lock"
            );
        }

        isLocked = false;
        lockingThread = null;
        if (waitingThreads.size() > 0) {
            waitingThreads.get(0).doNotify();
        }
    }
}

首先你会注意到 lock() 方法不再声明为 synchronized。反之只有必须同步的代码被嵌套到同步块中。

对于每个调用 lock() 的线程,FairLock 都会创建一个 QueueObject 对象,并把它入队。调用 unlock() 的线程会取出队首的 QueueObject,调用它的 doNotify() 方法,唤醒等待该对象的线程。这样每次只有一个等待线程被唤醒,而非所有线程。这部分代码管理着 FairLock 的公平。

注意,为了避免 滑动条件 (slipped conditions),锁的状态在同一个同步块内检测和设置。

还注意到 QueueObject 实际上是一个信号量,由 doWait()doNotify() 将信号存储于其内部。这样做的目的是解决以下情况,当一个线程调用 queueObject.doWait() 前被其它线程抢占,后者调用了 unlock() 方法,继而调用 queueObject.doNotify(),第一个线程就丢失了信号。queueObject.doWait() 调用位于 synchronized(this) 外部来避免 嵌套监视器锁定 (nested monitor lockout),这样当没有其它线程在 lock() 内部的同步块中时,另一线程就可以调用 unlock()。

最后,queueObject.doWait() 调用位于 try-catch 块内部。如果 InterruptedException 被抛出,线程会离开 lock() 方法,我们需要将监视对象出队。

A Note on Performance

比较 LockFairLock 类,你会发现 FairLock 的 lock() 和 unlock() 有更多代码。这些额外代码会导致它稍慢于 Lock 的同步机制。这对你的应用有多大影响取决于 FairLock 保护的临界区需要多长时间执行。它的执行时间越长,同步器增加的消耗就越不明显。当然这也取决于锁定和解锁被调用的频次。

21. Nested Monitor Lockout

How Nested Monitor Lockout Occurs

嵌套监视器锁定 (Nested monitor lockout) 是一个类似死锁的问题。它会像下面这样发生:

Thread 1 synchronizes on A
Thread 1 synchronizes on B (while synchronized on A)
Thread 1 decides to wait for a signal from another thread before continuing
Thread 1 calls B.wait() thereby releasing the lock on B, but not A.

Thread 2 needs to lock both A and B (in that sequence)
        to send Thread 1 the signal.
Thread 2 cannot lock A, since Thread 1 still holds the lock on A.
Thread 2 remain blocked indefinately waiting for Thread1
        to release the lock on A

Thread 1 remain blocked indefinately waiting for the signal from
        Thread 2, thereby
        never releasing the lock on A, that must be released to make
        it possible for Thread 2 to send the signal to Thread 1, etc.

这听起来像个完美的理论情形,但看下面这个不成熟的 Lock 实现:

// lock implementation with nested monitor lockout problem

public class Lock {
    protected static final class MonitorObject {
    }

    protected final MonitorObject monitorObject = new MonitorObject();
    protected boolean isLocked = false;

    public void lock() throws InterruptedException {
        synchronized (this) {
            while (isLocked) {
                synchronized (monitorObject) {
                    monitorObject.wait();
                }
            }
            isLocked = true;
        }
    }

    public void unlock() {
        synchronized (this) {
            isLocked = false;
            synchronized (monitorObject) {
                monitorObject.notify();
            }
        }
    }

}

注意到 lock() 方法首先同步了 this,随后同步了成员 monitorObject。如果 isLocked 是 false,这不会有什么问题,线程不会调用 monitorObject.wait()。但如果它是 true,那么调用 lock() 的线程就会停止于 monitorObject.wait() 调用的等待中。

问题在于,monitorObject.wait() 只会释放 monitorObject 的同步监视器,而不释放 this。换句话说,线程在持有 this 同步锁的情况下因等待暂停。

当之前锁定了 Lock 的线程试图通过调用 unlock() 解锁时,它会阻塞于进入 synchronized(this)。它将保持阻塞直到 lock() 离开了 synchronized(this)。但处于 lock() 方法中等待的线程,只有 isLocked 被设为 false,并且 monitorObject.notify() 被调用才能离开,而这发生在 unlock() 中。

简单来说,在 lock() 中等待的线程需要 unlock() 方法被成功执行才能退出同步块。但在 lock() 方法中的线程离开同步块之前,没有线程可以真正进入 unlock() 方法。

结果是任何调用 lock() 或 unlock() 的线程都将永远阻塞,这叫做嵌套监视器锁定。

A More Realistic Example

你可能会说你永远不会实现之前那样的锁,调用内部监视对象的 wait() 和 notify(),这可能是真的。但存在一些情形会出现以上设计。比如,如果你要在锁上实现 fairness。你想让每个线程调用自己队列对象的 wait(),这样就能每次唤醒一个线程。

看下面这个不成熟的公平锁实现:

// Fair Lock implementation with nested monitor lockout problem
public class FairLock {
    private static final class QueueObject {
    }

    private boolean isLocked = false;
    private Thread lockingThread = null;
    private final List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        QueueObject queueObject = new QueueObject();

        synchronized (this) {
            waitingThreads.add(queueObject);
            while (isLocked || waitingThreads.get(0) != queueObject) {
                synchronized (queueObject) {
                    try {
                        queueObject.wait();
                    } catch (InterruptedException e) {
                        waitingThreads.remove(queueObject);
                        throw e;
                    }
                }
            }

            waitingThreads.remove(queueObject);
            isLocked = true;
            lockingThread = Thread.currentThread();
        }
    }

    public synchronized void unlock() {
        if (lockingThread != Thread.currentThread()) {
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }

        isLocked = false;
        lockingThread = null;
        if (waitingThreads.size() > 0) {
            QueueObject queueObject = waitingThreads.get(0);
            synchronized (queueObject) {
                queueObject.notify();
            }
        }
    }
}

第一眼看过去这个实现可能还好,但注意 lock() 内部的 queueObject.wait() 位于两层同步块中。上层对 this 同步,下层对 queueObject 局部变量同步。但线程调用 queueObject.wait() 时,它释放了 queueObject 上的锁,但没有释放 this 上的锁。

再注意,unlock() 方法声明为 synchronized,它等同于一个 synchronized(this) 块。这意味着,如果线程在 lock() 方法的 this 监视对象上等待,所有调用 unlock() 的其它线程将无限阻塞,等待持锁线程释放 this 上的锁。但这永远不会发生,因为只有线程成功向它发送了信号它才会醒来,而这只能通过执行 unlock() 方法。

所以,以上公平锁实现会导致嵌套监视器锁定,更好的实现描述在 Starvation and Fairness 章节。

Nested Monitor Lockout vs. Deadlock

嵌套监视器锁定的结果和死锁及其相同:相关线程最终永远阻塞于互相等待。

但是这两种情形并不相同。正如 Deadlock 中的说明,当两个线程获得锁的顺序不同会发生死锁。线程 1 锁定 A,等待 B。线程 2 锁定 B,等待 A。在 Deadlock Prevention 章节,我解释了,如果总是以相同顺序持锁(锁排序),死锁就能避免。然而,嵌套监视器锁定发生时,两个线程持锁的顺序完全相同。线程 1 锁定 A 和 B,随后释放 B 等待线程 2 的信号。线程 2 需要 A 和 B 发信号给线程 1。所以,一个等待信号,另一个等待锁被释放。

不同之处总结如下:

In deadlock, two threads are waiting for each other to release locks.

In nested monitor lockout, Thread 1 is holding a lock A, and waits
for a signal from Thread 2. Thread 2 needs the lock A to send the
signal to Thread 1.

22. Slipped Conditions

What is Slipped Conditions?

滑动条件 (slipped conditions) 指从线程检测特定条件开始,到条件达成执行动作,条件已经被其它线程改变,以至于线程执行动作是错误的。下面是一个简单示例:

public class Lock {
    private boolean isLocked = true;

    public void lock() {
        synchronized (this) {
            while (isLocked) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // do nothing, keep waiting
                }
            }
        }

        synchronized (this) {
            isLocked = true;
        }
    }

    public synchronized void unlock() {
        isLocked = false;
        notify();
    }
}

注意到 lock() 方法包含两个同步块,第一个会等待直到 isLocked 为 false,第二个将 isLocked 设置为 true 锁定 Lock 实例。

想象 isLocked 为 false,两个线程同时调用 lock() 方法。第一个线程进入第一个同步块检测到 isLocked 为 false,退出同步块。此时第二个线程立即进入第一个同步块,也检测到 isLocked 为 false,随后它们先后进入第二个同步块,把 isLocked 设置为 true,并继续执行。

这种情况便是滑动条件。两个线程都检测条件,随后退出同步块,在任一线程为后续线程改变条件之前,其它线程已被允许检测条件。换句话说,条件从它被检测,到为后续线程更改之间划走了。

要避免滑动条件,它的检测和设值必须被线程原子执行,即在第一个线程检测和设置条件之前,其它线程不能检测条件。

上例的解决方案十分简单,只要把 isLocked = true; 移动到上面的同步块中,刚好位于 while 循环之后。见如下代码:

public class Lock {
    private boolean isLocked = true;

    public void lock() {
        synchronized (this) {
            while (isLocked) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // do nothing, keep waiting
                }
            }

            isLocked = true;
        }
    }

    public synchronized void unlock() {
        isLocked = false;
        notify();
    }
}

现在,isLocked 条件的检测和设值位于相同同步块中原子完成。

A More Realistic Example

你有理由认为永远不会实现上面那样的 Lock,进而得出滑动条件是一个相当理论的问题。但上面的示例非常简单,只是为了表述滑动条件的概念。

一个更加现实的示例是,如我们在 Starvation and Fairness 讨论的,在实现公平锁的过程中,我们首先实现了一个不成熟版本,见 Nested Monitor Lockout,在尝试解决嵌套监视器锁定之后,很容易遭受滑动条件问题。首先我将展示之前的示例代码:

// Fair Lock implementation with nested monitor lockout problem
public class FairLock {
    private static final class QueueObject {
    }

    private boolean isLocked = false;
    private Thread lockingThread = null;
    private final List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        QueueObject queueObject = new QueueObject();

        synchronized (this) {
            waitingThreads.add(queueObject);
            while (isLocked || waitingThreads.get(0) != queueObject) {
                synchronized (queueObject) {
                    try {
                        queueObject.wait();
                    } catch (InterruptedException e) {
                        waitingThreads.remove(queueObject);
                        throw e;
                    }
                }
            }

            waitingThreads.remove(queueObject);
            isLocked = true;
            lockingThread = Thread.currentThread();
        }
    }

    public synchronized void unlock() {
        if (lockingThread != Thread.currentThread()) {
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }

        isLocked = false;
        lockingThread = null;
        if (waitingThreads.size() > 0) {
            QueueObject queueObject = waitingThreads.get(0);
            synchronized (queueObject) {
                queueObject.notify();
            }
        }
    }
}

注意到 synchronized(queueObject) 中的 queueObject.wait(),它位于 synchronized(this) 块中,这就导致了嵌套监视器锁定问题。要解决它,synchronized(queueObject) 需要移动到 synchronized(this) 外部,见以下代码:

// Fair Lock implementation with slipped conditions problem
public class FairLock {
    private static final class QueueObject {
    }

    private boolean isLocked = false;
    private Thread lockingThread = null;
    private final List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        QueueObject queueObject = new QueueObject();

        synchronized (this) {
            waitingThreads.add(queueObject);
        }

        boolean mustWait = true;
        while (mustWait) {
            synchronized (this) {
                mustWait = isLocked || waitingThreads.get(0) != queueObject;
            }

            synchronized (queueObject) {
                if (mustWait) {
                    try {
                        queueObject.wait();
                    } catch (InterruptedException e) {
                        waitingThreads.remove(queueObject);
                        throw e;
                    }
                }
            }
        }

        synchronized (this) {
            waitingThreads.remove(queueObject);
            isLocked = true;
            lockingThread = Thread.currentThread();
        }
    }
}

注意:以上代码仅展示了 lock() 方法,因为它是我唯一修改的方法。

现在 lock() 方法包含 3 个同步块。

第一个同步块 synchronized(this) 通过设定 mustWait = isLocked || waitingThreads.get(0) != queueObject 检测条件。

第二个同步块 synchronized(queueObject) 检查线程是否需要等待。

第三个同步块 synchronized(this) 只有在 mustWait = false 时才会执行。它改变几个变量,包括将 isLocked 条件设值为 true,随后离开 lock() 方法。

想象如果两个线程在开锁条件下同时调用 lock() 方法,第一个线程将会检测到 isLocked 为 false,第二个线程同样如此。那么它们都不会等待,而是将 isLocked 设置为 true。这就是滑动条件的基本示例。

译者注:这里应该不存在滑动条件,因为 waitingThreads.get(0) != queueObject 处于同步块中,要被顺序检测。除了第一个检测的线程,后续线程检测时,waitingThreads 元素数量大于 1,waitingThreads.get(0) != queueObject 返回 true,它们仍会等待。

Removing the Slipped Conditions Problem

要从上例中移除滑动条件,最后一个 synchronized(this) 必须移动到第一个同步块中。看下面我是如何修改它的:

public class FairLock {
    private static final class QueueObject {
    }

    private boolean isLocked = false;
    private Thread lockingThread = null;
    private final List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        QueueObject queueObject = new QueueObject();

        synchronized (this) {
            waitingThreads.add(queueObject);
        }

        while (true) {
            synchronized (this) {
                if (!isLocked && waitingThreads.get(0) == queueObject) {
                    waitingThreads.remove(queueObject);
                    isLocked = true;
                    lockingThread = Thread.currentThread();
                    return;
                }
            }

            synchronized (queueObject) {
                try {
                    queueObject.wait();
                } catch (InterruptedException e) {
                    waitingThreads.remove(queueObject);
                    throw e;
                }
            }
        }
    }
}

现在条件检测和变量设置代码处于同一同步块中,如果条件满足线程会改变内部条件 (isLocked),其它任何线程再检测时就会得到 false。

当线程获得锁时,return 语句确保它退出 lock() 方法。

细心的读者会注意到以上公平锁实现仍会遭受信号丢失问题。想象线程调用 lock() 方法时,FairLock 实例已经被锁定。它会退出第一个同步块,如果此时之前的持锁线程调用了 unlock(),它会调用 queueObject.notify()。但由于第一个线程还没有调用 queueObject.wait(),唤醒信号就被淹没了。当它调用 wait 阻塞后可能不会再有其它线程调用 unlock() 方法。

信号丢失问题就是 Starvation and Fairness 章节公平锁的实现中,把 QueueObject 改成包含两个方法的信号量的原因:doWait() 和 doNotify()。这些方法在 QueueObject 内部存储和响应信号。这样即使 doNotify() 早于 doWait() 调用,信号也不会丢失。

23. Locks in Java

锁是和同步块类似的线程同步机制,不同之处在于它更精细。Locks (和更多其它同步机制)都由同步块创建,所以我们不可能完全消除 synchronized 关键字。

从 Java 5 开始,java.util.concurrent.locks package 下包含了若干锁实现,所以你可能不需要实现自己的锁。但懂得如何使用它们,并了解其实现背后的理论很有帮助。更多信息见我的 java.util.concurrent.locks.Lock 接口教程。

A Simple Lock

让我们从观察 Java 同步块开始:

public class Counter {
    
    private int count = 0;

    public int inc() {
        synchronized (this) {
            return ++count;
        }
    }
}

注意 inc() 方法中的 synchronized(this)。该同步块确保每次只有一个线程可以执行 return ++count。同步块中的代码可以更复杂,但简单的 ++count 足以说明问题。

Counter 类可以使用 Lock 代替同步块实现:

public class Counter {

    private final Lock lock = new Lock();
    private int count = 0;

    public int inc() {
        lock.lock();
        int newCount = ++count;
        lock.unlock();
        return newCount;
    }
}

lock() 方法锁定了 Lock 实例,这样其它调用 lock() 方法的线程都会阻塞直到 unlock() 被执行。

下面是一个简单的 Lock 实现:

public class Lock {
    private boolean isLocked = false;

    public synchronized void lock()
            throws InterruptedException {
        while (isLocked) {
            wait();
        }

        isLocked = true;
    }

    public synchronized void unlock() {
        isLocked = false;
        notify();
    }
}

注意 while(isLocked) 循环,它也叫做自旋锁(spin lock)。自旋锁和 wait() 及 notity() 方法的详细介绍见 Thread Signaling。当 isLocked 是 true,调用 lock() 的线程会因调用 wait() 而等待。如果线程被唤醒,它假定自己不可以往下安全执行,而是重新检测 isLocked 条件,这是为了避免未收到 notify 的情况下 wait 异常结束(又叫 Spurious Wakeup)。如果 isLocked 为 false,它会退出 while(isLocked) 循环,将 isLocked 设回 true,以此锁定其它调用 lock() 的线程。

当线程完成 critical section (lock() 和 unlock() 之间的代码)代码后,它会调用 unlock()。解锁方法会将 isLocked 设置为 false,并通知(唤醒)其中一个处于 wait() 调用中的等待线程,如果有的话。

Lock Reentrance

Java 同步块是可重入的。即如果线程进入了一个同步块,它就获得了块上监视对象的锁,那么该线程可以进入其它对相同监视对象同步的代码块。下面是一个示例:

public class Reentrant {
    public synchronized void outer() {
        inner();
    }

    public synchronized void inner() {
    }
}

注意到 outer() 和 inner() 都声明为 synchronized,这等价于 synchronized(this) 块。如果线程调用了 outer(),它可以在内部调用 inner(),因为两个方法(或块)同步了相同监视对象(this)。如果线程已经持有了监视对象的锁,那么它有权进入所有对相同监视对象同步的同步块。这叫做重入,线程可以再次进入任何对它已持有锁同步的代码块。

然而,虽然同步块可重入,但之前的 Lock 类并不能。如果我们把它改成下面这样,调用 outer() 的线程将阻塞于 inner() 方法的 lock.lock() 内部。

public class Reentrant {

    Lock lock = new Lock();

    public void outer() throws InterruptedException {
        lock.lock();
        inner();
        lock.unlock();
    }

    public void inner() throws InterruptedException {
        lock.lock();
        // do something
        lock.unlock();
    }
}

调用 outer() 的线程首先锁定 Lock 实例,随后它调用 inner()。在 inner() 内部,它试图重新锁定 Lock,但这不会成功(指线程会阻塞),因为 Lock 实例已经被 outer() 方法锁定了。

通过观察 lock() 方法的实现,我们能很明显地看出,在 unlock() 没有被调用前,第二次调用 lock() 时线程阻塞的原因:

public class Lock {
    private boolean isLocked = false;

    public synchronized void lock()
            throws InterruptedException {
        while (isLocked) {
            wait();
        }

        isLocked = true;
    }

    // ...
}

while 循环(自旋锁)中的条件决定线程是否被允许退出 lock() 方法。当前条件是 isLocked 必须为 false 线程才被允许,不管哪个线程锁定了它。

要使 Lock 类可重入我们需要做如下微小改变:

public class Lock {
    private boolean isLocked = false;
    Thread lockedBy = null;
    int lockedCount = 0;

    public synchronized void lock()
            throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (isLocked && lockedBy != callingThread) {
            wait();
        }

        isLocked = true;
        lockedCount++;
        lockedBy = callingThread;
    }

    public synchronized void unlock() {
        if (Thread.currentThread() == lockedBy) {
            lockedCount--;
            if (lockedCount == 0) {
                isLocked = false;
                notify();
            }
        }
    }
}

注意到 while 循环(自旋锁)现在也把锁定 Lock 实例的线程考虑在内。如果锁没有被锁定(isLocked = false)或者调用线程是锁定了 Lock 实例的线程,while 循环将不会执行,调用 lock() 的线程将被允许退出方法。

此外,我们需要统计锁被相同线程锁定的次数。否则,unlock() 的一次调用就会解锁锁实例,即使锁被多次锁定。在持锁线程执行与 lock() 相同次数的 unlock() 之前,我们不想锁被解锁。

现在的 Lock 类变得可重入了。

Lock Fairness

Java 的同步块不保证授予线程访问权限的顺序与它们试图进入的顺序相同。因此,如果多个线程持续竞争相同同步块的访问权,就存在一到多个线程永远不能获得权限的风险 —— 访问权总是被授予其它线程,这叫做饥饿。为了解决该问题,Lock 应该公平。由于本节的锁实现内部使用了同步块,它不保证公平。阅读 Starvation and Fairness 了解更多饥饿和公平的细节。

Calling unlock() From a finally-clause

当使用 Lock 保护临界区时,关键代码可能抛出异常,从 finally 子句内部调用 unlock() 就非常重要。这保证 Lock 被解锁,之后其它线程可以锁定它。见下例:

lock.lock();
try {
    // do critical section code, which may throw exception
} finally {
    lock.unlock();
}

这个微小结构确保临界区抛出异常 Lock 也能被解锁。如果 unlock() 不放到 finally 子句里面,当临界区代码抛出异常时,Lock 会永远保持锁定状态,导致所有调用 Lock 实例上 lock() 方法的线程无限停止。

24. Read / Write Locks in Java

读写锁是比 Locks in Java 章节中的 Lock 实现更精细的锁。想象你有一个应用,它读写某些资源,但写的情况少于读。读取相同资源的两个线程彼此不会造成问题,所以读取资源的多个线程可以同时被授予权限,重叠访问。但是,如果有一个线程想写资源,就不能存在任何其它读、写线程。要解决允许多个读者但只允许一个写者的问题,需要一个读写锁。

Java 5 带来了读写锁实现,位于 java.util.concurrent 包。即便如此,了解理论背后的实现仍然很有帮助。

Read / Write Lock Java Implementation

让我们首先总结赋予资源读写权限的条件:

  • Read Access:没有线程写,也没有线程申请写操作
  • Write Access:没有线程读或写

如果线程想读取资源,只要没有线程正在写,也没有线程发出写请求。优先处理写请求的原因是,假设写请求比读请求更重要。此外,读经常发生时,如果我们不优先处理写,饥饿 就会发生,发送写请求的线程将会阻塞直到所有读者释放了 ReadWriteLock。如果新线程不断被授予读权限,等待写的线程就会无限期阻塞,导致 饥饿。因此只有当前不存在写线程,且不存在写请求线程时,才能授予读线程访问。

当不存在读线程,也不存在写线程时,想要写的线程可以被授予权限。有多少线程申请了写访问,以及它们的顺序并不重要,除非你想保证请求锁的线程间的公平。

有了这些简单规则,我们可以实现下面这样的 ReadWriteLock

public class ReadWriteLock {

    private int readers = 0;
    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException {
        while (writers > 0 || writeRequests > 0) {
            wait();
        }
        readers++;
    }

    public synchronized void unlockRead() {
        readers--;
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;

        while (readers > 0 || writers > 0) {
            wait();
        }

        writeRequests--;
        writers++;
    }

    public synchronized void unlockWrite() {
        writers--;
        notifyAll();
    }
}

ReadWriteLock 有两对 lock 和 unlock 方法。一对用于读访问,另一对用于写访问。

读访问规则实现于 lockRead() 方法。线程获得读权限的条件是,没有任何写线程,也没有任何线程申请过写操作。

写访问规则实现于 lockWrite() 方法。想要获得写访问的线程首先发起写申请(writeRequest++)。接着检测自己是否真的可以进行写操作。线程能够获得写权限的条件是,没有任何读线程,也没有任何写线程。有多少线程申请过写访问不重要。

值得注意的是,unlockRead()unlockWrite() 方法都调用了 notifyAll() 而非 notify()。要解释为什么,想象以下情形:

ReadWriteLock 内部存在多个等待读和等待写的线程。如果被 notify() 唤醒的是一个读线程,它将重新等待,因为存在等待写操作的线程。但是,没有任何等待写操作的线程被唤醒,所以什么都做不了,读写线程都没有获得访问权限。通过调用 notifyAll(),所有等待线程都会被唤醒来检测是否能获得自己想要的锁。

调用 notifyAll() 还有另外一个好处。如果存在多个等待读操作的线程,但没有等待写操作的线程,unlockWrite() 被调用后,所有读线程会同时获得访问权限,而非一个接一个。

Read / Write Lock Reentrance

之前展示的 ReadWriteLock 不是 可重入的。如果获得写权限的线程再次申请写锁,它将阻塞,因为已经存在写线程,即它自身。此外,考虑以下情形:

  1. 线程 1 获得读权限。
  2. 线程 2 请求写权限但因存在读线程而阻塞。
  3. 线程 1 重新请求读权限(重入锁),但因存在写请求线程而阻塞。

这种情形在之前的 ReadWriteLock 中就会锁定 —— 类似于死锁。读写请求线程都不能获得访问权。

Read Reentrance

要使 ReadWriteLock 对于读线程可重入,我们首先建立读重入规则:

  • 线程可以重入读锁的条件是,它本可以获得读权限(没有写线程,也没有写申请),或已经获得读权限(此时忽略写申请)。

要确定线程是否已经具有读权限,需要一个 Map 存储每个被授予读权限线程的引用,以及它获得读锁的次数。当判断读线程是否可以被授予权限时,会使用调用线程的引用检测 Map。下面是改变后 lockRead()unlockRead() 的样子:

public class ReentrantReadLock {

    private final Map<Thread, Integer> readingThreads = new HashMap<>();
    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (!canGrantReadAccess(callingThread)) {
            wait();
        }

        readingThreads.put(callingThread, (getReadAccessCount(callingThread)) + 1);
    }

    public synchronized void unlockRead() {
        Thread callingThread = Thread.currentThread();
        int count = getReadAccessCount(callingThread);

        if (count == 1) {
            readingThreads.remove(callingThread);
        } else {
            readingThreads.put(callingThread, count - 1);
        }

        /*
          在当前读线程没有从 map 移除前,
           1. 被唤醒的其它读线程只有 writeRequests == 0 时才可以一起读
           2. 被唤醒的写线程由于存在读线程只能重新等待
         */
        notifyAll();
    }

    private int getReadAccessCount(Thread callingThread) {
        Integer count = readingThreads.get(callingThread);
        if (count == null) {
            count = 0;
        }
        return count;
    }

    private boolean canGrantReadAccess(Thread callingThread) {
        return writers == 0 &&
                (isReader(callingThread) || writeRequests == 0);
    }

    private boolean isReader(Thread callingThread) {
        return readingThreads.get(callingThread) != null;
    }
}

如你所见,只有不存在写线程时才可以读重入。此外,如果调用线程已经拥有读权限,它的优先级高于写请求。

Write Reentrance

只有线程已经拥有写权限,它才可以写重入。下面是更改后的 lockWrite()unlockWrite()

public class ReentrantWriteLock {

    private final Map<Thread, Integer> readingThreads = new HashMap<>();
    private int writeAccesses = 0;
    private int writeRequests = 0;
    private Thread writingThread = null;

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGrantWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() {
        writeAccesses--;
        if (writeAccesses == 0) {
            writingThread = null;
            notifyAll();
        }
        /*
            在当前写线程没有完全释放写锁时,
            1. 被唤醒的读线程只能等待
            2. 被唤醒的其它写线程也只能等待
         */
    }

    private boolean canGrantWriteAccess(Thread callingThread) {
        return isWriter(callingThread) ||
                (writingThread == null && !hasReaders());
    }

    private boolean hasReaders() {
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }
}

注意现在判断调用线程是否能获得写权限时,考虑了它是否已经持有写锁。

Read to Write Reentrance

有时,有必要为已经获得读权限的线程赋予写权限。要允许这样做,读线程必须是唯一读者。实现该效果,需要对 writeLock() 方法做少许改动。下面是示例代码:

public class ReadToWriteLock {

    private final Map<Thread, Integer> readingThreads = new HashMap<>();
    private int writeAccesses = 0;
    private int writeRequests = 0;
    private Thread writingThread = null;

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGetWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() {
        writeAccesses--;
        if (writeAccesses == 0) {
            writingThread = null;
            notifyAll();
        }
    }

    private boolean canGetWriteAccess(Thread callingThread) {
        return isWriter(callingThread) ||
                isOnlyReader(callingThread) ||
                noReaderOrWriter();
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }

    private boolean isOnlyReader(Thread callingThread) {
        return readingThreads.size() == 1
                && readingThreads.get(callingThread) != null;
    }

    private boolean noReaderOrWriter() {
        return readingThreads.size() == 0
                && writingThread == null;
    }
}

现在读写锁类对于读到写的访问就是可重入的了。

Write to Read Reentrance

有时具有写权限的线程也需要读权限。写者请求读权限总是应该被授予。如果线程具有写权限,其它任何线程都不能读,也不能写,所以这不危险。下面是改变后的 canGrantReadAccess() 方法。

private boolean canGrantReadAccess(Thread callingThread) {
    return isWriter(callingThread) ||
            (noWriter() && isReader(callingThread)) ||
            noWriterOrWriteRequest();
            // 第二个条件中的 noWriter() 可以不加,如果它是读线程,一定不存在写线程
}

private boolean isWriter(Thread callingThread) {
    return writingThread == callingThread;
}

private boolean noWriter() {
    return writingThread == null;
}

private boolean isReader(Thread callingThread) {
    return readingThreads.get(callingThread) != null;
}

private boolean noWriterOrWriteRequest() {
    return writingThread == null
            && writeRequests == 0;
}

Fully Reentant ReadWriteLock

下面是一个完全可重入的 ReadWriteLock 实现类。我做了一些重构让访问条件更易读,因此更容易说服自己它们是正确的。

public class ReentrantReadWriteLock {
    private final Map<Thread, Integer> readingThreads = new HashMap<>();
    private int writeAccesses = 0;
    private int writeRequests = 0;
    private Thread writingThread = null;

    /**
     * 当前线程可读,仅当以下条件之一成立:
     * 1. 自身有写权限(write-to-read, single thread)
     * 2. 自身有读权限
     * 3. 没有写线程,也没有写请求
     */
    public synchronized void lockRead() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (!canGrantReadAccess(callingThread)) {
            wait();
        }

        readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
    }

    public synchronized void unlockRead() {
        Thread callingThread = Thread.currentThread();
        if (!isReader(callingThread)) {
            throw new IllegalMonitorStateException(
                    "Calling Thread does not hold a read lock on this ReadWriteLock"
            );
        }

        int count = getReadAccessCount(callingThread);
        if (count == 1) {
            readingThreads.remove(callingThread);
        } else {
            readingThreads.put(callingThread, count - 1);
        }
        // 没有写请求时,等待的读线程可以获得权限
        notifyAll();
    }

    /**
     * 当前线程可写,仅当以下条件之一成立:
     * 1. 自身有写权限
     * 2. 自身是唯一读线程(read-to-write, single thread)
     * 3. 没有读线程也没有写线程
     */
    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGetWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() {
        if (!isWriter(Thread.currentThread())) {
            throw new IllegalMonitorStateException(
                    "Calling Thread does not hold the write lock on this ReadWriteLock"
            );
        }

        writeAccesses--;
        if (writeAccesses == 0) {
            writingThread = null;
            notifyAll();
        }
        // 仍持有写锁时,其它线程不能得到访问权限
    }

    private boolean canGrantReadAccess(Thread callingThread) {
        return isWriter(callingThread) ||
                isReader(callingThread) ||
                noWriterOrWriteRequest();
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }

    private boolean isReader(Thread callingThread) {
        return readingThreads.get(callingThread) != null;
    }

    private boolean noWriterOrWriteRequest() {
        return writingThread == null
                && writeRequests == 0;
    }

    private int getReadAccessCount(Thread callingThread) {
        Integer count = readingThreads.get(callingThread);
        if (count == null) {
            count = 0;
        }
        return count;
    }

    private boolean canGetWriteAccess(Thread callingThread) {
        return isWriter(callingThread) ||
                isOnlyReader(callingThread) ||
                noReaderOrWriter();
    }

    private boolean isOnlyReader(Thread callingThread) {
        return readingThreads.size() == 1 &&
                readingThreads.get(callingThread) != null;
    }

    private boolean noReaderOrWriter() {
        return readingThreads.size() == 0 &&
                writingThread == null;
    }
}

Calling unlock() From a finally-clause

使用 ReadWriteLock 保护临界区时,区内代码可能抛出异常,把 unlockRead()unlockWrite() 放到 finally 子句调用是很重要的。这确保 ReadWriteLock 一定被释放,其它线程才能锁定它。下面是一个示例:

lock.lockWrite();
try {
    // do critical section code, which may throw exception
} finally {
    lock.unlockWrite();
}

这一微小结构确保临界区代码抛出异常 ReadWriteLock 也能被解锁。如果 unlockWrite() 没有处于 finally 子句,并且临界区抛出了异常,ReadWriteLock 将永远锁定,导致其它调用 lockRead()lockWrite() 的线程无限期停摆。唯一能重新解锁的方法是,ReadWriteLock 是可重入的,并且持有锁抛出异常的代码,后来又成功获得锁、执行临界区代码、调用 unlockWrite() 方法。但事情可以现在完成时,为何等待它将来发生?在 finally 子句调用 unlockWrite() 是更加健壮的方案。

25. Reentrance Lockout

重入锁定是一种类似于 死锁嵌套监视器锁定 的情形。它在 Locks in JavaRead / Write Locks 的部分小节也有介绍。

当线程重新进入 LockReadWriteLock 或其它不支持重入的同步器时,重入锁定就会发生。重入指已经持有锁的线程可以重新持有。Java 的同步块支持重入,所以以下代码没有问题:

public class Reentrant {
    public synchronized void outer() {
        inner();
    }

    private synchronized void inner() {
        // do something
    }
}

注意,outer()inner() 都被声明为同步方法,这等同于 synchronized(this) 块。如果线程调用了 outer(),它可以在内部正常调用 inner(),因为两个方法(或块)同步了相同监视对象(this)。如果线程已经持有一个监视对象的锁,它就可以进入所有对同一监视对象同步的代码块。这叫做重入。线程可以再次进入任何它已经持有监视对象锁的代码块。

以下锁实现是不可重入的:

public class Lock {
    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) {
            wait();
        }
        isLocked = true;
    }

    public synchronized void unlock() {
        isLocked = false;
        notify();
    }
}

如果线程两次调用 lock(),期间没有调用 unlock(),那么第二次调用将会阻塞,重入锁定就发生了。

要避免该问题你有两个选择:

  1. 避免编写锁重入代码
  2. 使用可重入锁

哪个选项最适合你的项目取决于具体情况。重入锁的性能通常弱于不可重入锁,并且更难实现,但对于你来说,这可能并非问题。代码是否易于使用锁重入实现,不能一概而论。

26. Semaphores

信号量是一个线程同步结构,它可以用来在线程间发送信号避免 信号丢失,也可以像 一样用来保护 临界区。Java 5 在 java.util.concurrent 包下引入了信号量类,所以你无需自己实现,但理解理论背后的细节和用法仍然很有帮助。

你可以在 java.util.concurrent.Semaphore 章节了解 Java 的内置实现。

Simple Semaphore

下面是一个简单的 Semaphore

public class Semaphore {
    private boolean signal = false;

    public synchronized void take() {
        signal = true;
        notify();
    }

    public synchronized void release() throws InterruptedException {
        while (!signal) wait();
        signal = false;
    }
}

take() 方法发送信号,该信号存储在 Semaphore 内部。release() 方法等待信号,它会在退出前清除接收到的信号。

这样使用 semaphore 可以避免信号丢失。你调用的是 take() 而非 notify()release() 而非 wait()。如果 take() 先于 release() 调用,调用 release() 的线程也能知道 take() 被调用,因为信号储存在信标内部的 signal 变量。使用 wait()notify() 没有这种效果。

使用信标通信时,方法名 take()release() 看上去有点奇怪。这是因为信标起初用于锁,我将在后文解释,此时这些名称就比较合理。

Using Semaphores for Signaling

下面是两个线程使用 Semaphore 通信的简单示例:

public class Signaling {
    private static class SendingThread extends Thread {
        final Semaphore semaphore;

        public SendingThread(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            while (true) {
                // do something, then signal
                semaphore.take();
            }
        }
    }

    private static class ReceivingThread extends Thread {
        final Semaphore semaphore;

        public ReceivingThread(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    semaphore.release();
                    // receive signal, then do something...
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore();
        SendingThread sender = new SendingThread(semaphore);
        ReceivingThread receiver = new ReceivingThread(semaphore);
        receiver.start();
        sender.start();
    }
}

Counting Semaphore

之前的 Semaphore 实现没有统计 take() 调用发送的信号次数。我们可以把它改成一个计数信标,下面是一个简单实现:

public class CountingSemaphore {
    private int signals;

    public synchronized void take() {
        signals++;
        notify();
    }

    public synchronized void release() throws InterruptedException {
        while (signals == 0) wait();
        signals--;
    }
}

Bounded Semaphore

CountingSemaphore 对它能存储多少信号没有设界,我们可以把它修改它成下面这样的有界计数信标:

public class BoundedSemaphore {
    private int signals;
    private final int bound;

    public BoundedSemaphore(int upperBound) {
        this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException {
        while (signals == bound) wait();
        signals++;
        notify();
    }

    public synchronized void release() throws InterruptedException {
        while (signals == 0) wait();
        signals--;

        // wake up sender
        notify();
    }
}

注意到如果发送信号的数量达到上界,take() 方法会阻塞。此时,只有线程调用了 release() 方法,调用 take() 方法的线程才会被允许发送信号。

Using Semaphores as Locks

有界信标可以作为锁使用。要这样做,把上界设为 1,在临界区前后调用 take()release() 保护代码。下面是一个示例:

BoundedSemaphore semaphore = new BoundedSemaphore(1);

semaphore.take();
try {
    // critical section
} finally {
    semaphore.release();
}

不同于信号发送用例,现在 take()release() 方法被同一线程调用。由于只有一个线程被允许获得信标,所有其它调用 take() 的线程都将阻塞,直到 release() 方法被调用。release() 调用永远不会阻塞,因为 take() 总是在它之前调用。

你也可以使用有界信标限制允许进入代码段的线程数量。例如,上例中,如果把 BoundedSemaphore 的上界设为 5 会怎样呢?5 个线程会被允许同时进入临界区。然而,你将不得不确保这 5 个线程间的操作不会冲突,否则你的应用将出现问题。

在 finally 块中调用 release() 方法确保即使临界区抛出异常,它也能成功调用。

27. Blocking Queues

阻塞队列是,当队列为空而你试图出队,或队列已满而你试图入队时,会阻塞的队列。试图对空队列出队的线程会阻塞,直到其它线程向队列中插入了元素。试图对满队列入队的线程会阻塞,直到其它线程从队列中腾出了空间,可以是出队,也可以是完全清空。

下图显示了通过阻塞队列协作的两个线程:

blocking-queue

Java 5 在 java.util.concurrent 包下内置了阻塞队列实现,你可以阅读我的 java.util.concurrent.BlockingQueue 文章了解更多信息。即便如此,了解理论背后的细节仍然十分重要。

Blocking Queue Implementation

阻塞队列的实现类似于 有界信标。下面是一个简单示例:

public class BlockingQueue<T> {
    private final List<T> queue = new LinkedList<>();
    private int limit = 10;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    public synchronized void enqueue(T item) throws InterruptedException {
        while (queue.size() == limit) wait();

        queue.add(item);
        if (queue.size() == 1) notifyAll();
    }

    public synchronized T dequeue() throws InterruptedException {
        while (queue.size() == 0) wait();

        if (queue.size() == limit) notifyAll();
        return queue.remove(0);
    }
}

注意 enqueue()dequeue() 中的 notifyAll() 只有在队列大小等于边界(0 或上界)时才被调用。否则可能不存在等待出入对的线程。

28. The Producer Consumer Pattern

生产者消费者 是这样一种并发模型,即存在多个生产线程将产生的对象入队,多个消费线程从队列消费产品。入队对象通常代表一些需要完成的工作。将任务检测和任务执行代码解耦意味着你可以控制同时使用多少线程检测或执行任务。

producer-consumer

Use Cases

生产者消费者模型有许多不同用例,最常见是以下几个:

  • 减少前端线程延迟。
  • 不同线程间的负载均衡。
  • 反向压力管理。

Reduce Foreground Thread Latency

一些系统中,前台是一个单独线程,它负责与外界交互。在服务端,它可以是接收客户端连接的线程。在桌面端,它可以是界面线程。

为了让前台线程免于忙等 —— 任何从外界接收到的任务都将分发给多个后台线程。在服务端,存在连接监听线程和数据处理线程。

producer-consumer

对于桌面应用,前台(界面)线程可以响应用户事件 —— 如打开文件,下载文件,或保存文件。为了避免界面线程阻塞导致整个应用失去响应,界面线程可以把长期任务分发给后台工作线程。

producer-consumer

Load Balance Work Between Threads

另一类用例是实现工作线程集合上的负载均衡。事实上,只要保证工作线程一有时间,就立即从队列领取任务,负载均衡会非常自动地发生。

Backpressure Management

如果生产者和消费者使用的是 阻塞队列,你就能实现反向压力管理。这是该模型的又一天生特性。

反向施压指,如果生产线程产生的任务超过消费线程的处理能力 —— 任务会进入队列。在某个时间,阻塞队列会被填满,此时生产线程在试图插入新任务 / 工作对象时就会阻塞。这种现象叫反向施压。系统反过来向生产者施压 —— 控制流水线的启动 —— 阻止更多负载进入。

反向压力会 “溢出” 队列,减慢生产线程。因此,整个流水线上如果存在任何上游步骤,反向压力能够溯流而上。

29. Thread Pools

线程池 是一池线程,它可以被重用来执行任务,所以每个线程可以执行多个任务。线程池是为每个需要执行的任务创建新线程的替代手段。

创建新线程比重用已有线程需要更大性能开销。这就是为何重用已有线程执行任务比为每个任务创建新线程有更高的吞吐量。

此外,使用线程池可以更容易地控制同时存在多少活跃线程。每个线程都会消耗特定数量的计算机资源,如内存,如果同时激活太多线程,过大资源消耗可能导致计算机减速 —— 例如,如果太多内存被消耗,操作系统会开始将内存页交换到硬盘。

本章我将解释线程池原理,如何使用,以及如何实现它。记住,Java 已经内置了线程池 —— ExecutorService —— 所以你无需自行实现。但可能某天你会想要自己实现 —— 为了增加执行服务不支持的功能,或者为了学习经验。

How a Thread Pool Works

不是为每个任务创建新线程同时执行,取而代之,任务可以被传递给线程池。只要池中存在任何空闲线程,任务就会分配给其中之一执行。线程池内部,任务被插入 Blocking Queue,池中线程会从队列领取任务。当新任务插入,其中某个空闲线程会成功将其出队并执行。剩下的空闲线程会等待任务出队而阻塞。

thread-pools.png

Thread Pool Use Cases

线程池经常用于多线程服务器。由网络到达服务端的每个连接被包装为一个任务,传递给线程池。线程池中的线程将并发处理这些请求。我将在后文详细介绍 Java 多线程服务的实现。

Built-in Java Thread Pool

Java 在 java.util.concurrent 包下内置了线程池类,所以你不必自己实现它。阅读 java.util.concurrent.ExecutorService 了解更多信息,无论如何,理解一点线程池的实现细节有益无害。

Java Thread Pool Implementation

下面是一个简单的线程池实现。它使用了 Java 5 引入的标准 BlockingQueue

public class ThreadPool {

    private static final class PoolThreadRunnable implements Runnable {

        private Thread thread;
        private boolean isIdle;
        private boolean isStopped;
        private final BlockingQueue<Runnable> taskQueue;

        public PoolThreadRunnable(BlockingQueue<Runnable> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            thread = Thread.currentThread();
            while (!isStopped) {
                try {
                    Runnable task = taskQueue.take();
                    isIdle = false;
                    task.run();
                } catch (InterruptedException e) {
                    // log or otherwise report exception,
                    // but keep pool thread alive.
                } finally {
                    isIdle = true;
                }
            }

        }

        public synchronized void doStop() {
            isStopped = true;
            // break pool thread out of dequeue() call.
            thread.interrupt();
        }

        public synchronized boolean isIdle() {
            return isIdle;
        }
    }

    private final BlockingQueue<Runnable> taskQueue;
    private final List<PoolThreadRunnable> runners = new ArrayList<>();
    private boolean isStopped;

    public ThreadPool(int noOfThreads, int maxNoOfTasks) {
        this.taskQueue = new ArrayBlockingQueue<>(maxNoOfTasks);

        for (int i = 0; i < noOfThreads; i++) {
            runners.add(new PoolThreadRunnable(taskQueue));
        }

        runners.forEach(runnable -> new Thread(runnable).start());
    }

    public synchronized void execute(Runnable task) {
        if (isStopped) {
            throw new IllegalStateException("ThreadPool is stopped");
        }

        taskQueue.offer(task);
    }

    public synchronized void stop() {
        isStopped = true;
        runners.forEach(PoolThreadRunnable::doStop);
    }

    @SuppressWarnings("BusyWait")
    public synchronized void waitUtilAllTasksFinished() {
        // 任务都被领取,线程都处于空闲状态才能退出
        while (taskQueue.size() > 0 ||
                runners.size() != runners
                        .stream()
                        .filter(PoolThreadRunnable::isIdle)
                        .count()) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(3, 10);

        for (int i = 0; i < 10; i++) {
            final int taskNo = i;
            pool.execute(() -> System.out.println(
                    Thread.currentThread().getName() + ": Task " + taskNo
            ));
        }

        pool.waitUtilAllTasksFinished();
        pool.stop();
    }
}

线程池由两部分组成,ThreadPool 类是线程池的公共接口,PoolThread 类是执行任务的线程。

要执行任务,使用 Runnable 作为参数调用 ThreadPool.execute(Runnable r)Runnable 被放到内部的 阻塞队列,等待线程领取。

Runnable 会被空闲的 PoolThread 领取并执行。你可以在 PoolThread.run() 看到该操作。执行完一个任务,PoolThread 会再次循环领取新任务,直到它被停止。

要停止 ThreadPool,调用 ThreadPool.stop() 方法。内部成员变量 isStopped 记录了 stop() 是否被调用。随后通过调用每个线程的 doStop() 停止池内所有线程。注意在 stop() 被调用后,execute() 再次被调用时会抛出 IllegalStateException

线程在完成它们的任务后会停止。注意 PoolThread.doStop() 方法内的 thread.interrupt() 调用,它确保阻塞在 taskQueue.take() 方法内 wait() 调用的线程,打破等待,通过抛出 InterruptedException 离开该方法。该异常会被 PoolThread.run() 捕获和报告,随后 isStopped 变量被检测。因为该变量现在为真,所以 PoolThread.run() 会退出,线程也随之死亡。

30. Compare and Swap

比较和交换 是一种用于设计并发算法的技术。大体上,它将变量的值与预期值比较,如果相等,那么使用新值替换旧值。比较和交换听起来有点复杂,但一旦理解就相当简单,所以让我来深入阐释该话题。

顺便说一句,比较和交换有时缩写为 CAS,所以如果你在某些并发文章或视频中看到 CAS,它很大可能指比较和交换。

Compare and Swap for Check Then Act Cases

并发算法中的常见模式是 检查并执行,即代码先检查变量的值,如果满足条件,执行动作。下面是一个简单示例:

public class ProblematicLock {

    private volatile boolean locked;

    public void lock() {
        while (locked) {
            // busy wait - util locked == false
        }
        locked = true;
    }

    public void unlock() {
        locked = false;
    }
}

上述代码不是百分百正确的多线程锁实现。这就是为何我使用 Problematic 命名该类。但是,创建这一错误实现的目的是阐述如何使用比较和交换功能修复它。

lock() 方法首先 检查 成员变量 locked 是否为 false,这处于 while 循环中。如果该变量为 false,方法会离开循环并将 locked 设置为 true。换句话说,lock() 方法首先检查 locked 的值,随后基于结果执行动作。检查,随后执行。

如果多个线程访问相同 ProblematicLock 实例,上述 lock() 方法将无法正确工作。例如:

如果线程 A 检测到 lockedfalse(预期值),它将退出 while 基于检查执行动作。如果在线程 A 将 locked 设为 true 之前,线程 B 也检测了 locked 的值,那么它也将退出循环,基于它的检查执行动作。这就是一个经典的 race condition

Check Then Act - Must Be Atomic

要使多线程应用工作正常(避免竞态条件),检查和执行 必须是原子的。这意味着,检测和交换必须作为原子代码块(非分离)执行。任何线程开始执行这一代码块,都要执行块内所有代码,期间不能被其它线程打断。其它线程不能同时执行原子块代码。

让 Java 代码块成为原子操作的简单方式是使用 synchronized 关键字,阅读 Java synchronized tutorial 了解更多细节。下面是使用该关键字修改 lock() 方法后的 ProblematicLock

public class ProblematicLock {

    private volatile boolean locked;

    public synchronized void lock() {
        while (locked) {
            // busy wait - util locked == false
        }
        locked = true;
    }

    public void unlock() {
        locked = false;
    }
}

现在 lock() 成为同步方法,所以同一时刻,相同 Lock 实例,只能被一个线程执行。lock() 方法实际上是原子的。

Blocking Thread is Expensive

当两个线程试图同时进入同步块时,其中之一会阻塞,另一个会被允许进入。当进入同步块的线程退出后,等待线程才被允许进入。

如果线程被允许进入同步块,它的代价并不高。但如果它因其它线程已经在执行而阻塞,那么阻塞动作就比较昂贵。

此外,你无法保证同步块自由后,阻塞线程何时被解锁。通常 OS 或执行平台负责协调阻塞线程的解锁,虽然这不会花费几秒或几分,但总会浪费一定时间,在此之前线程本可以访问共享数据结构。见下图说明:

compare-and-swap

Hardware Provided Atomic Compare And Swap Operations

现代 CPU 内置了原子的比较和交换操作。该操作在部分情况下可以取代同步块或其它阻塞数据结构。CPU 保证同时只有一个线程执行比较和交换动作 —— 甚至跨越 CPU 核。后文你会看到它们在代码中的样子。

使用硬件 / CPU 提供的比较和交换功能,而非操作系统或执行平台提供的同步块,锁,互斥量等,系统或执行平台无需处理线程的阻塞和解锁。这能使线程执行比较和交换动作前的等待时间更短,因此带来更少的拥塞和更高的吞吐量。见下图说明:

compare-and-swap

如你所见,试图进入共享数据结构的线程没有完全阻塞。它在持续尝试比较和交换动作,直到成功后被允许访问共享数据结构。这样线程进入共享数据结构前的延迟得以最小化。

当然,如果线程处于长时间重复执行比较和交换的等待中,这会浪费许多 CPU 周期,它们本可被其它任务(其它线程)使用。不过,大多数案例不会发生这种情况,取决于共享数据结构被其它线程使用多长时间。实践中,共享数据不会长期使用,所以上述情况不会经常发生。但再次强调 —— 这取决于具体情况,代码,数据结构,尝试访问数据结构的线程数,系统负载等。相反,阻塞线程完全不使用 CPU。

Compare and Swap in Java

从 Java 5 开始,你可以通过 java.util.concurrent.atomic 包下的类访问 CPU 级别的比较和交换功能。这些类是:

使用 Java 5+ 内置的比较和交换而非自己实现的优点是,它们可以让你利用 CPU 级别特性。这让你的代码更块。

Compare and Swap as Guard

比较和交换功能可以用来保护临界区 —— 因此阻止多线程同时执行关键代码。

下面的示例展示了如何通过 AtomicBoolean 类上的比较和交换功能实现之前的 lock() 方法,让它作为一个守护函数(一次只有一个线程可以退出 lock() 方法)。

public class CompareAndSwapLock {

    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void unlock() {
        locked.set(false);
    }

    public void lock() {
        while (!locked.compareAndSet(false, true)) {
            // busy wait - util compareAndSet() succeeds
        }
    }
}

注意 locked 变量不再是 boolean 而是 AtomicBoolean。该类有一个 compareAndSet() 函数,它会比较 AtomicBoolean 实例的值和预期值,如果与预期值相等,它会使用新值替换旧值。如果替换成功,该方法返回 true,否则返回 false

上例中,compareAndSet() 调用检测 locked 是否为 false,如果是,则将新值设为 true

由于同时只允许一个线程执行 compareAndSet() 方法,只有一个线程会看到它的值是 false,因此将其交换为 true。同时只有一个线程可以退出 while 循环 —— 每次只有一个线程因为 unlock() 调用执行 locked.set(false) 解除锁定。

Compare and Swap as Optimistic Locking Mechanism

可以把比较和交换功能作为乐观锁机制。乐观锁允许多个线程同时进入临界区,但只有一个线程可以在临界区结尾提交其工作。

下面是一个使用乐观锁策略的并发计数器类:

public class OptimisticLockCounter {

    private final AtomicLong count = new AtomicLong();

    public void inc() {
        boolean incSuccessful = false;
        while (!incSuccessful) {
            long value = count.get();
            long newValue = value + 1;

            incSuccessful = count.compareAndSet(value, newValue);
        }
    }

    public long getCount() {
        return count.get();
    }

    @SuppressWarnings("BusyWait")
    public static void main(String[] args) throws InterruptedException {
        OptimisticLockCounter counter = new OptimisticLockCounter();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Thread t = new Thread(counter::inc);
            t.start();
            threads.add(t);
        }

        while (threads.stream().anyMatch(Thread::isAlive)) {
            Thread.sleep(100);
        }

        System.out.println(counter.getCount());
    }
}

注意 inc() 方法从 count 变量(一个 AtomicLong 实例)中取得已有的数量。随后基于旧值计算新值。最后尝试通过 compareAndSet 设置新值。

如果 AtomicLong 仍然保持着最后一次被获取时的值,那么 compareAndSet 将会成功。但如果另一个线程在此期间增加了它的值,该方法将会失败,因为预期值不再是 AtomicLong 内部的值。此时,inc() 会进行下一轮循环迭代,再次尝试增加 AtomicLong

31. Anatomy of a Synchronizer

即便许多同步结构(锁,信号量,阻塞队列等)功能各异,但通常它们的内部设计并非如此不同。换句话说,它们的内部都由相同(或类似)的基本单元构成。理解这些基本单元对设计同步结构大有裨益。本章将着重研究这些部件。

注:本文是一个学生项目的部分结果。该项目由 Jakob Jenkov(作者), Toke Johansen and Lars Bjørn 在 2004 年春于 the IT University of Copenhagen 完成。期间,我们询问了 Doug Lea 是否了解类似工作。有趣的是,他在开发 Java 5 并发工具时也曾独立得出类似结论。我想 Doug Lea 的工作可以在 "Java Concurrency in Practice" 中见到。该书也包含题为 “Anatomy of a Synchronizer” 的章节,内容与本文类似,虽然并不完全相同。

同步结构的大部分(如果不是全部)目的是保护一些代码块(临界区)不被线程同时访问。要实现该效果,同步结构通常需要以下部件:

并非所有同步结构都拥有上述全部构件,即便有也不完全与下文描述相同。但你通常能见到一到多个类似部件。

State

同步结构的状态被用做访问条件,决定线程是否可以被授予访问权限。在 Lock 中,状态被保存到 boolean 变量,标识自身是否被锁定。在 Bounded Semaphore 中,内部状态被保持在计数器(int)和上界(int)中,分别表示当前 “take” 的次数和 “take" 的最大值。在 Blocking Queue 中,状态是队列的 List 元素和最大队列大小(int)(如果存在的话)。

下面是两段分别摘自 LockBoundedSemaphore 的代码,状态变量见注释说明。

public class Lock {

    // state is kept here
    private boolean isLocked;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) wait();
        isLocked = true;
    }
}
public class BoundedSemaphore {

    // state is kept here
    private int signals;
    private final int bound;

    public BoundedSemaphore(int upperBound) {
        this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException {
        while (signals == bound) wait();
        signals++;
        notify();
    }
}

Access Condition

访问条件决定调用 “测试并设置状态” 方法的线程是否被允许设置状态。访问条件通常基于同步结构的 state。通常在循环中检测访问条件,以防 意外唤醒。条件求值的结果要么是真,要么是假。

Lock 上的访问条件只简单地检查成员变量 isLocked 的值。在 Bounded Semaphore 中,实际上存在两个访问条件,取决于你是拿走还是释放信标。如果线程试图拿走信标,signals 会和上界比较。如果线程试图释放信标,signals 会和 0 比较。

下面两段代码分别从 LockBoundedSemaphore 中摘录,访问条件见注释标注,条件总是在循环中检测。

public class Lock {

    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException {
        // access condition
        while (isLocked) wait();
        isLocked = true;
    }
}
public class BoundedSemaphore {

    private int signals;
    private final int bound;

    public BoundedSemaphore(int upperBound) {
        this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException {
        // access condition
        while (signals == bound) wait();
        signals++;
        notify();
    }

    public synchronized void release() throws InterruptedException {
        // access condition
        while (signals == 0) wait();
        signals--;
        notify();
    }
}

State Changes

一旦线程获得临界区访问权,它必须改变同步结构状态来(可能)阻塞其它线程进入。换句话说,状态需要反映线程现在正在执行临界区代码。它应该影响其它试图获得权限线程的访问状态。

Lock 中,状态改变是设置 isLocked = true。在 Semaphore 中,它要么是 signal--,要么是 signal++

下面是两段状态改变代码,见注释标注:

public class Lock {

    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) wait();

        // state change
        isLocked = true;
    }

    public synchronized void unlock() {
        // state change
        isLocked = false;
        notify();
    }
}
public class BoundedSemaphore {

    private int signals;
    private final int bound;

    public BoundedSemaphore(int upperBound) {
        this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException {
        while (signals == bound) wait();
        // state change
        signals++;
        notify();
    }

    public synchronized void release() throws InterruptedException {
        while (signals == 0) wait();
        // state change
        signals--;
        notify();
    }
}

Notification Strategy

一旦线程改变了同步结构状态,它有时需要通知其它等待线程状态已经改变。也许这个改变可能将其它线程的访问条件置真。

通知策略通常分以下三种:

  1. 通知所有等待线程
  2. 随机通知一个等待线程
  3. 通知一个特定等待线程

通知所有等待线程相当简单。让所有等待线程调用同一对象的 wait() 方法,一旦想通知它们,其它线程可以调用同一对象的 notifyAll() 方法。

随机通知一个线程同样非常简单。只要让通知线程调用 notify(),此方法所属对象应该和等待线程调用的 wait() 所属对象相同。调用 notify() 不保证哪个等待线程被通知,因此使用术语 “随机等待线程”。

有时你可能需要通知一个特定而非随机的等待线程。例如你想保证等待线程以特定顺序被通知,可以是调用同步结构的顺序,或者其它优先级顺序。要取得该效果,每个等待线程必须调用不同对象的 wait() 方法。当通知线程想要通知特定等待线程时,它会调用该线程持有的那个包含 wait() 方法对象的 notify() 方法。这一示例可以在 Starvation and Fairness 中找到。

下面是一段通知策略代码(随机通知 1 个等待线程),留意注释标注:

public class Lock {

    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) {
            // wait strategy - related to notification strategy
            wait();
        }
        isLocked = true;
    }

    public synchronized void unlock() {
        isLocked = false;
        // notification strategy
        notify();
    }
}

Test and Set Method

同步结构大都包含两类方法,第一类是 test-and-set,另一类是 set。测试并设置指调用该方法的线程会检测同步结构内部状态是否满足访问条件。如果条件满足则改变同步结构内部状态来反映线程已经获得访问权。

状态转变通常导致其它试图获得权限线程的访问条件变为假,但并非总是这样。例如,在 Read Write Lock 中,获得读权限的线程会更新读写锁状态来反映该操作,但其它请求读访问的线程仍然能获得访问权,只要没有线程申请过写访问。

测试并设置操作被原子执行非常重要,这意味着在测试和状态设置期间,没有其它线程被允许进入测试并设置方法。

测试并设置的程序流通常是以下几步:

  1. 如有必要检测前设置状态
  2. 检测状态是否满足访问条件
  3. 如果不满足访问条件,等待
  4. 如果满足访问条件,设置状态,如有必要通知等待线程

下面展示的 ReadWriteLock 类的 lockWrite() 方法就是一个检测并设置的例子。调用 lockWrite() 方法的线程首先设置状态(writeRequests++)。随后在 canGrantWriteAccess() 方法中检测状态是否满足访问条件。如果测试成功,内部状态会在方法退出前再次设置。注意该方法没有通知等待线程。

public class ReadWriteLock {

    private final Map<Thread, Integer> readingThreads = new HashMap<>();

    private int writeAccesses;
    private int writeRequests;
    private Thread writingThread;

    // test-and-set
    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGrantWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }
    
    /**
     * 线程可以获得写权限的前提是以下条件之一成立:
     * 1. 没有读线程,也没有写线程
     * 2. 已经获得写权限(写重入,单线程)
     * 3. 是唯一读者(read to write,单线程)
     *
     * @param callingThread 调用线程
     * @return true 如果能获得写权限
     */
    private boolean canGrantWriteAccess(Thread callingThread) {
        return noReaderOrWriter() ||
                isWriter(callingThread) ||
                isOnlyReader(callingThread);
    }

    private boolean noReaderOrWriter() {
        return readingThreads.size() == 0 &&
                writingThread == null;
    }

    private boolean isWriter(Thread callingThread) {
        return callingThread == writingThread;
    }

    private boolean isOnlyReader(Thread callingThread) {
        return readingThreads.size() == 1 &&
                readingThreads.get(callingThread) != null;
    }
}

下面展示的 BoundedSemaphore 类有两个 test-and-set 方法,take()release(),它们都会检测并设置内部状态。

public class BoundedSemaphore {

    private int signals;
    private final int bound;

    public BoundedSemaphore(int upperBound) {
        this.bound = upperBound;
    }

    // test-and-set
    public synchronized void take() throws InterruptedException {
        while (signals == bound) wait();
        signals++;
        notify();
    }

    // test-and-set
    public synchronized void release() throws InterruptedException {
        while (signals == 0) wait();
        signals--;
        notify();
    }
}

Set Method

设置方法是第二类常见的同步结构方法。它仅设置同步结构内部状态,在此之前不会检测状态。一个典型示例是 Lock 类的 unlock() 方法。持锁线程总是可以解锁它,不用检测锁是否已经解锁。

设置方法的过程流通常如下:

  1. 设置内部状态
  2. 通知等待线程

下面是 unlock() 方法示例:

public class Lock {

    private boolean isLocked = false;

    // set method
    public synchronized void unlock() {
        isLocked = false;
        notify();
    }
}

32. Non-blocking Algorithms

并发语境下的非阻塞算法指允许线程不被相关线程阻塞的情况下访问共享状态(或协作,通信)的算法。更宽泛地说,如果其中一个线程暂停不会导致算法中其它线程暂停,这种算法就是非阻塞的。

为了更好理解阻塞与非阻塞并发算法的不同,我会先解释阻塞算法,接着介绍非阻塞。

Blocking Concurrency Algorithms

阻塞并发算法具有以下特征:

  • A: 要么执行线程请求的动作 —— 要么
  • B: 阻塞线程直到动作可以安全执行

许多类型的算法和并发数据结构都是阻塞的。例如,java.util.concurrent.BlockingQueue 接口的不同实现都是阻塞数据结构。如果线程试图向阻塞队列插入元素,而队列没有空间,插入线程会阻塞(暂停)直到队列空出空间。

下图展示了阻塞算法保护共享数据结构时的行为:

blocking-algorithms

Non-blocking Concurency Algorithms

非阻塞算法具有以下特征:

  • A: 要么执行线程请求的动作 —— 要么
  • B: 通知请求线程动作无法完成

Java 同样包含许多非阻塞数据结构。AtomicBooleanAtomicIntegerAtomicLongAtomicReference 都是非阻塞数据结构的示例。

下图展示了非阻塞算法保护共享数据结构时的行为:

non-blocking-algorithms

Non-blocking vs Blocking Algorithms

阻塞与非阻塞算法的主要区别在于上述行为的第二步。换句话说,当请求动作无法执行时,两者的行为不同。

阻塞算法会阻塞线程直到请求动作可以执行,而非阻塞算法则通知线程请求动作无法执行。

使用阻塞算法,线程会在动作可以执行前一直阻塞。通常是其它线程的动作使第二个线程可以执行请求动作。如果由于某些原因,其它线程在应用的其它部分暂停了(阻塞),无法执行动作让第一个线程请求的动作可以执行,第一个线程就会保持阻塞 —— 可能是无限期,也可能直到其它线程最终执行了必要动作。

例如,如果线程试图向已经占满的阻塞队列插入元素,它将阻塞直到其它线程从队列取出元素。如果由于某些原因,应该取走元素的线程在应用其它地方阻塞(暂停),试图插入元素的线程将保持阻塞 —— 可能是无限期,也可能要等到消费线程最终从队列取走元素。

Non-blocking Concurrent Data Structures

在多线程系统中,线程通常通过某种数据结构通信。这些数据结构可以是简单变量,或者更高级的数据结构,如队列,散列,栈等。为了实现多线程正确、并发访问数据结构,这些数据必须被并发算法保护。正是这些算法才使它们成为并发数据结构。

如果保护并发数据结构的算法是阻塞的(使用线程暂停),就称其为阻塞算法,被保护的数据称阻塞并发数据结构。

如果保护并发数据结构的算法是非阻塞的,就称其为非阻塞算法,被保护的数据称非阻塞并发数据结构。

每种并发数据结构被设计来支持特定通信方法。要使用哪种取决于你的通信需求。接下来我会介绍一些非阻塞并发数据结构,解释什么情况下使用它们。对这些结构原理的解释将帮助你理解如何设计和实现非阻塞数据结构。

Volatile Variables

Java volatile 变量总是从主存读取。当新值被赋予 volatile 变量时,值也总是立即写回主存。这保证了 volatile 变量的最新值总是对运行于不同 CPU 上的其它线程可见。其它线程将每次从主存读取 volatile 变量的值,而非例如从它们运行的 CPU 缓存上读取。

volatile 变量是非阻塞的。对 volatile 变量的写是原子操作,不会被打断。但是,读更写并非原子操作序列。因此,以下代码如果被多线程执行仍可能导致 race conditions

public class Volatile {
    volatile int myVar = 0;

    public void inc() {
        int temp = myVar;
        temp++;
        myVar = temp;
    }
}

首先 volatile 变量 myVar 的值被从主存读入临时变量。随后,临时变量执行自增。最后临时变量的值被赋回 myVar,这意味着它将被写回主存。

如果两个线程执行这段代码,它们都读取了 myVar 的值,在值上加一,随后将结果写回主存,此时你就有结果只加了 1 而非 2 的风险(例如,两个线程读到的值都是 19,将其增加到 20,随后 20 被写回)。

你可能认为你不会像上面那样编写代码,但实际上 inc 方法中的代码与 myVar++ 等价。

当被执行时,myVar 的值被读到 CPU 寄存器或本地 CPU 缓存,加上一,随后将寄存器或缓存的结果写回主存。

The Single Writer Case

某些情况下,你只有一个线程对共享变量写入,其它线程都只读取变量的值。此时不会发生竞态条件,无论多少线程读取变量。因此,如果你只需要一个线程更新共享变量,你就可以使用 volatile 变量。

当多个线程对共享变量执行读更写操作序列,竞态条件就会发生。如果你只有一个线程执行读更写,其它线程只读取变量,竞态条件不会发生。

下面是只有一个写者的计数器类,它可以在不使用同步的情况下实现并发:

public class SingleWriterCounter {
    private volatile long count;

    /**
     * Only one thread my ever call this method,
     * or it will lead to race conditions.
     */
    public void inc() {
        //noinspection NonAtomicOperationOnVolatileField
        count++;
    }

    /**
     * Many reading threads may call this methos
     *
     * @return count
     */
    public long count() {
        return count;
    }
}

多个线程可以访问同一 counter 实例,只要只有一个线程调用 inc()。我不是说一次一个线程,而是只有一个调用 inc() 方法的线程,可以有多个调用 count() 方法的线程。这不会导致竞态条件。

下图展示了线程如何访问 volatile count 变量:

non-blocking-algorithms

More Advanced Data Structures Based on Volatile Variables

可以创建一个使用多个 volatile 变量集合的数据结构,每个 volatile 变量只被单线程写入,可被多线程读取。每个 volatile 变量可以被不同线程(但每次只能有一个)写入。使用这种 volatile 变量组成的数据结构,多线程能以非阻塞的方式互相发送信息。

下面这个简单的双写者计数器展示了上述结构的工作方式:

@SuppressWarnings("NonAtomicOperationOnVolatileField")
public class DoubleWriterCounter {

    private volatile long countA;
    private volatile long countB;

    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incA() {
        countA++;
    }

    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incB() {
        countB++;
    }

    /**
     * Many reading threads may call this method
     *
     * @return countA
     */
    public long countA() {
        return countA;
    }

    /**
     * Many reading threads may call this method
     *
     * @return countB
     */
    public long countB() {
        return countB;
    }
}

如你所见,DoubleWriterCounter 现在包含两个 volatile 变量和两对自增和读取方法。只有一个线程可以调用 incA()incB() 同样如此,但是可以允许不同线程不同时刻调用它们。允许多个线程调用 countA()countB(),这不会导致竞态条件。

DoubleWriterCounter 可以用于例如两个线程间的通信。两个计数器可以分别代表生产和消费的任务数。下图展示了两个线程通过类似上述类的数据结构进行通信:

non-blocking-algorithms.png

聪明的读者可能已经意识到,可以使用两个 SingleWriterCounter 实现 DoubleWriterCounter 的效果。如果需要,你甚至可以使用更多线程和更多 SingleWriterCounter 实例。

Optimistic Locking with Compare and Swap

如果你确实需要多线程对同一共享变量写入,volatile 变量将无法满足需求,你需要某种排它访问变量。这就是使用 Java 同步块 实现的排它访问的情形:

public class SynchronizedCounter {
    long count;

    public void inc() {
        synchronized (this) {
            count++;
        }
    }

    public long count() {
        synchronized (this) {
            return count;
        }
    }
}

注意 inc()count() 方法都包含同步块。这就是我们想避免的 —— 同步块和 wait() - notify() 调用等。

不是使用同步块,我们可以使用 Java 的原子变量,本例中,使用 AtomicLong。下面就是使用 AtomicLong 实现的相同效果的计数器类:

public class AtomicCounter {
    private final AtomicLong count = new AtomicLong(0);

    public void inc() {
        boolean updated = false;
        while (!updated) {
            long prevCount = count.get();
            updated = count.compareAndSet(prevCount, prevCount + 1);
        }
    }

    public long count() {
        return count.get();
    }
}

这一版本和前一版一样,都是线程安全的。有趣的是,此版 inc() 方法的实现。它不包含同步块,取代的是如下几行代码:

boolean updated = false;
while (!updated) {
    long prevCount = count.get();
    updated = count.compareAndSet(prevCount, prevCount + 1);
}

这几行并非原子操作。这意味着,可能存在两个线程调用 inc() 方法,都执行了 long prevCount = this.count.get() 声明,因此获得了前一个计数器值。但是,上述代码的确不包含任何竞态条件。

秘密是 while 循环中的第二行,compareAndSet() 调用是一个原子操作。它将 AtomicLong 内部的值与预期值比较,如果相等,把内部值设为新值。compareAndSet() 方法通常直接由 CPU 上的比较和交换指令支持。因此不需要同步,也就没有线程暂停。这节省了线程暂停开销。

想象 AtomicLong 内部的值是 20。两个线程读取该变量,都尝试调用 compareAndSet(20, 20 + 1),因为是原子操作,线程会顺序执行(一次一个线程)。

第一个线程比较预期值 20(计数器之前的值)和变量内部值,它们相等,随后内部值被更新为 21 (20 + 1)。updated 变量会被设置为 truewhile 循环终止。

现在,第二个线程调用 compareAndSet(20, 20 + 1)。由于内部值不再是 20,这次调用会失败,内部值不会被设为 21。updated 变量会被设为 false,线程会在循环中自旋一次。这次它会读到 21,试图把它更新为 22。如果没有其它线程同时调用 inc() 方法,这次迭代会成功把 AtomicLong 更新到 22。

Why is it Called Optimistic Locking?

之前展示的代码又叫 乐观锁。它不同于传统锁定,后者有时也叫悲观锁。传统锁定使用同步块或其它锁,阻塞共享变量的访问。同步块或锁可能导致线程暂停。

乐观锁允许多个线程创建共享内存拷贝,不需要阻塞。线程会修改它的拷贝,并尝试将修改版本写回共享内存。如果没有其它线程对共享内存做任何修改,比较和交换操作允许线程将改变写回共享内存。如果另一线程改变了共享内存,前一线程不得不重新拷贝,做更改,尝试再次写共享内存。

之所以叫乐观锁,是因为获得数据拷贝的线程想把改变同步回去,它们乐观地假定没有其它线程同时对内存做更改。如果乐观假设成立,线程就可以成功更新共享内存,不需要锁定。如果假定失败,工作就浪费了,但仍然不需要锁定。

乐观锁更适合对共享内存存在低中级竞争的情形。如果竞争非常高,线程将浪费许多 CPU 周期拷贝和修改,最终也无法将改变写回共享内存。然而,如果存在非常多对共享内存的竞争,你应该考虑重新设计代码,将竞争降低。

Optimistic Locking is Non-blocking

我这儿展示的乐观锁机制是非阻塞的。如果线程获得共享内存的拷贝后,尝试修改它时被阻塞(不知什么原因),其它访问共享内存的线程不会被阻塞。

使用传统锁定、解锁机制,当线程锁定一个锁,其它线程会被阻塞直到持锁线程再次解锁。如果持锁线程在其它地方阻塞,锁定会保持很长时间,甚至是无限期。

Non-swappable Data Structures

简单的比较和交换乐观锁适用于整个数据结构可以通过一次比较和交换操作更新的情形。但是,使用拷贝交换整个数据结构并不总是可能或可行的。

想象如果共享数据结构是队列。每个试图从队列插入或拿取元素的线程必须拷贝整个队列,对拷贝做预定修改。这可以通过 AtomicReference 完成,拷贝引用,复制和修改队列,尝试使用新创建的队列交换 AtomicReference 的引用。

然而,大数据结构可能需要许多内存和 CPU 周期拷贝。这会使你的应用消耗太多内存和时间。它将影响你的应用性能,尤其是竞争比较高的时候。此外,拷贝和修改数据的时间越长,其它线程在此期间更改了共享数据的可能性就越大。我们知道,如果另一线程修改了共享数据,其它线程必须重新执行拷贝修改操作,这将消耗更多时间和内存。

接下来我将解释一种实现非阻塞数据结构的方法,它不需要拷贝和修改整个共享内存,支持并发更新。

Sharing Intended Modifications

不是拷贝和修改整个共享数据结构,线程可以共享它们对共享数据的 修改意向 (intended modification)。想要修改共享数据结构的过程变为:

  1. 检查其它线程是否已经提交了对数据的修改意向。
  2. 如果没有其它线程提交,创建一个修改意向(由一个对象代表),将其提交(使用比较和交换操作)。
  3. 执行对共享数据结构的修改。
  4. 移除对修改意向的引用,让其它线程知道修改意向已经执行。

如你所见,第二步会阻塞其它线程提交修改意向。因此,第二步等同于对共享数据结构加锁。如果一个线程成功提交了修改意向,没有其它线程可以提交意向直到第一个意向被执行。

如果线程提交意向后由于其它工作被阻塞,共享数据结构就阻塞了。它不会直接阻塞其它使用数据的线程,其它线程可以检测到无法提交修改意向,所以可以选择执行其它任务。显然,我们需要修复它。

Completable Intended Modifications

为了避免已提交的修改意向阻塞共享数据结构,被提交的对象必须包含足够信息让其它线程能完成修改。这样,如果提交意向的线程没有完成修改,另一线程可以自己完成修改,让共享数据结构可以被其它线程使用。

下图展示了上面描述的非阻塞算法的原型:

non-blocking-algorithms

修改必须通过一到多个比较和交换操作完成。因此,如果两个线程尝试完成修改意向,只有一个线程可以执行任何一个比较和交换操作。只要一个比较和交换操作被完成,后续完成同一比较和交换操作的尝试都将失败。

The A-B-A Problem

上述算法可能遭受 A-B-A 问题。该问题指某个解决方案中,变量从 A 变为 B,又被改回 A。另一线程无法检测变量是否被改变。

如果线程 A 检测了正在进行的更新,拷贝数据,随后被线程调度器暂停,线程 B 可能同时访问了共享数据结构。如果线程 B 执行了一次完整更新,随后移除修改意向,在线程 A 看来,就像修改没有发生。但是,修改确实发生了。当线程 A 现在基于过期拷贝执行更新后,共享数据结构上,线程 B 的修改就会丢失。

下图展示了 A-B-A 问题:

non-blocking-algorithms

A-B-A Solutions

解决 A-B-A 问题的常见方案是,不仅仅交换修改意向指针,而是组合指针和计数器,使用一个比较和交换操作更新组合体。这在支持指针的语言,如 C 和 C++ 中可以实现。因此,即使当前修改指针被设回 “没有正在进行的修改”,计数器也会增加,使更新对其它线程可见。

在 Java 中,你不能合并引用和计数器到单独变量。但它提供了 AtomicStampedReference 类,可以使用一次比较和交换操作原子更新引用和标记。

A Non-blocking Algorithm Template

以下是一个代码模板,意在告诉你非阻塞算法的实现思路,它基于上文描述。

注意:我并非非阻塞算法专家,所以下面的模板很大可能存在错误。不要使用它实现你的非阻塞算法,该模板旨在给与你非阻塞算法的思路。如果你想实现自己的非阻塞算法,研究一些真实的,正常工作的算法实现。

public class NonblockingTemplate {

    public static final class IntendedModification {
        public AtomicBoolean completed = new AtomicBoolean(false);
    }

    private final AtomicStampedReference<IntendedModification>
            ongoingMod = new AtomicStampedReference<>(null, 0);

    // declare the state of the data structure here.

    public void modify() {
        //noinspection StatementWithEmptyBody
        while (!attemptModifyASR()) ;
    }

    private boolean attemptModifyASR() {
        boolean modified = false;

        IntendedModification currentlyOngoingMod = ongoingMod.getReference();
        int stamp = ongoingMod.getStamp();

        if (currentlyOngoingMod == null) {
            // copy data structure state - for use
            // in intended modification

            // prepare intended modification
            IntendedModification newMod = new IntendedModification();

            boolean modSubmitted =
                    ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);

            if (modSubmitted) {
                // complete modification via a series of compare-and-swap operations.
                // note: other threads may assist in completing the compare-and-swap
                // operations, so some CAS may fail
                modified = true;
            }
        } else {
            // attempt to complete ongoing modification, so the data structure is freed up
            // to allow access from this thread.
        }

        return modified;
    }

}

Non-blocking Algorithms are Difficult to Implement

非阻塞算法不容易正确设计和实现。尝试自己实现前,寻找是否存在满足需求的他人实现。

Java 已经内置了一些非阻塞实现(如 ConcurrentLinkedQueue),将来很大可能会引入更多。

除了 Java 内置的非阻塞数据结构,还存在许多开源非阻塞结构供你使用。例如,LMAX Disrupter(类队列)和来自 Cliff Click 的非阻塞 HashMap。在 Java concurrency references page 查看更多资源链接。

The Benefit of Non-blocking Algorithms

相比阻塞算法,非阻塞算法存在许多优势,接下来我将介绍几个。

Choice

非阻塞的第一个优点是,线程在请求动作无法完成时可以选择接下来要干什么。有时线程什么也做不了,它可以选择阻塞或使自己等待,为其它任务释放 CPU。但至少请求线程拥有自主选择权。

单 CPU 系统也许适合阻塞无法执行动作的线程,让其它线程使用 CPU 执行工作。但即便如此,阻塞算法也可能导致 死锁饥饿 和其它并发问题。

No Deadlocks

非阻塞的第二个优势是,一个线程的暂停不会导致另一个也暂停,这意味着死锁不会发生。两个线程不会由于互相等待对方释放它们需要的锁而阻塞。由于线程不会在无法执行请求动作时阻塞,它们就不会因互相等待死锁。非阻塞算法可能仍会导致活锁,即线程持续尝试动作,但持续被告知动作无法完成(由于其它线程的动作)。

No Thread Suspension

暂停和恢复对于线程来说是昂贵的。不错,随着操作系统和线程库越来越高效,暂停和激活的代价在降低,但总体来说,代价仍然很高。

线程被阻塞就会暂停,因而带来暂停和激活开销。由于非阻塞算法不会暂停线程,这些消耗不会发生。这意味着 CPU 可以把更多潜在时间花在执行实际业务逻辑,而非上下文切换。

在多 CPU 系统中,阻塞算法对总体性能的影响更大。运行在 CPU A 上的线程会因等待运行于 CPU B 上的线程而阻塞。这会降低应用可以达到的并行等级。当然,CPU A 可以调度其它线程运行,但暂停和恢复线程(上下文切换)是昂贵的,线程暂停越少越好。

Reduced Thread Latency

此处说的延迟指请求动作可以执行到线程真正开始执行它的时间。在非阻塞算法中,由于线程不会暂停,它们不必花费高昂的,缓慢的恢复开销。这意味着当请求动作可以执行时,线程会更快响应,因此降低响应延迟。

非阻塞算法通常通过忙等请求动作满足执行条件来获得低延迟。当然,在对非阻塞数据结构存在高线程竞争的系统中,CPU 最终可能消耗许多周期在忙等上,这是需要考虑的问题。如果你的数据结构存在高线程竞争,非阻塞算法可能不是最好的。然而,总是有方法重构应用来降低线程竞争。

33. Amdahl's Law

阿姆达尔定律可以用来衡量系统的部分并行能对计算带来多少加速。该定律由 Gene Amdahl 在 1967 年提出。即便不知道该定律,大多数并行或并发系统开发者也对潜在加速存在直观感受。但不管怎么说,了解该定理仍然非常有用。

我会先用数学解释阿姆达尔定律,随后辅以图片讲解。

Amdahl's Law Defined

能够并行的程序或算法能够拆分成两部分:

  • 无法并行的部分
  • 可以并行的部分

想象处理磁盘文件的程序。程序的一部分代码会扫描目录,在内存中创建文件列表。随后,每个文件会被传给单独的线程处理。那么扫描目录和创建文件列表的部分不能并行,但文件处理部分可以。

把程序总的串行(非并行)执行时间记为 T。它包括并行和非并行部分的时间。非并行部分时间记作 B,则并行部分时间为 T - B。下面的列表总结了这些定义:

  • T = 总串行执行时间
  • B = 总非并行部分执行时间
  • T - B = 总并行部分执行时间(当被串行执行时,而非并行)

有以下公式:

T = B + (T - B)

初看上去有点奇怪,程序可并行执行的部分在方程中没有自己的符号。但是,因为它可以用总时间 T 和不可并行执行部分 B 表示,实际上在概念上简化了方程,让方程包含的变量更少。

可并行执行的部分 T - B 通过并行执行可以获得加速。能够加速多少取决于使用多少线程或 CPU 执行它。线程或 CPU 的数量如果记作 N,可并行执行部分能达到的最大加速倍数是:

(T - B) / N

可以换种方式表达:

(1 / N) * (T - B)

维基百科使用这一版本,如果你在那阅读阿姆达尔定律的话。

根据定律,当可被并行执行部分使用 N 个线程或 CPU 执行时,程序总的执行时间是:

T(N) = B + (T - B) / N

T(N) 指使用并发因子 N 的总执行时间。因此,T 可以写作 T(1),代表使用并发因子 1 的总执行时间。使用 T(1) 取代 T,阿姆达尔定律看上去像这样:

T(N) = B + (T(1) - B) / N

虽然意义是相同的。

A Calculation Example

为了更好理解阿姆达尔定律,让我们计算一个例子。假定程序总的执行时间是 1。不可并行部分占 40%,乘以总时间 1 得到具体时间为 0.4。因此可并行部分的执行时间是 1 - 0.4 = 0.6。

使用并行因子 2(2 个线程或 CPU 执行可并行部分,所以 N 是 2)的程序执行时间为:

T(2) = 0.4 + (1 - 0.4) / 2
     = 0.4 + 0.6 / 2
     = 0.4 + 0.3
     = 0.7

使用并行因子 5 的总执行时间是:

T(5) = 0.4 + (1 - 0.4) / 5
     = 0.4 + 0.6 / 5
     = 0.4 + 0.12
     = 0.52

Amdahl's Law Illustrated

为了进一步理解阿姆达尔定律,我将试图使用图片解释它是如何派生的。

首先,程序可以被拆分为不可并行的部分 B,和可并行的部分 1 - B,如下图所示:

amdahls-law

顶部带有分界线的线段代替总时间 T(1)。

下面是具有并行因子 2 的执行时间:

amdahls-law

下面是具有并行因子 3 的执行时间:

amdahls-law.png

Optimizing Algorithms

从阿姆达尔定律能自然得出,要加速可并行部分,只需要提供更多硬件,线程、CPU 等。但是非并行部分,只可以通过优化代码加速。因此,通过优化不可并行部分的代码,也可以提高程序的速度和并发性。如果可能的话,你甚至可以通过再提取一些可并行任务,让不可并行比例减少。

Optimizing the Sequential Part

优化了程序的串行部分后,你可以使用阿姆达尔定律再次计算程序的执行时间。如果不可并行部分 B 的优化因子是 O,那么:

T(O, N) = B / O + (1 - B / O) / N

记住,程序不可并行部分现在消耗 B / O 的时间,所以可并行部分是 1 - B / O。

如果 B 是 0.4,O 是 2,N 是 5,那么:

T(2, 5) = 0.4 / 2 + (1 - 0.4 / 2) / 5
        = 0.2 + 0.8 / 5
        = 0.2 + 0.16
        = 0.36

Execution Time vs. Speedup

目前,我们仅用阿姆达尔定律计算了优化或并行后,程序或算法的执行时间。我们也可以进一步计算加速比,即优化后的算法比之前快了多少。

如果起初程序或算法消耗的时间是 T,那么加速比为

Speedup = T / T(O, N)

我们通常把 T 看成 1,那么:

Speedup = 1 / T(O, N)

使用阿姆达尔定律替换 T(O, N),得到以下公式:

Speedup = 1 / (B / O + (1 - B / O) / N )

取 B = 0.4,O = 2,N = 5,则:

Speedup = 1 / (0.4 / 2 + (1 - 0.4 / 2) / 5)
        = 1 / (0.2 + 0.8 / 5)
        = 1 / (0.2 + 0.16)
        = 1 / 0.36
        = 2.77777 ...

这意味着,如果你使用因子 2 优化不可并行部分,因子 5 优化可并行部分,那么优化版本的程序或算法相比初始版本,最快能提速 2.77777 倍。

Measure, Don't Just Calculate

尽管阿姆达尔定律能让你计算并行算法的理论加速比,但不要太依赖它。现实中,当你优化或并行化算法时,可能需要考虑许多其它因素。

内存,CPU 缓存,磁盘,网卡等的速度(如果使用硬盘或网络的话)可能也是一个限制因素。如果并行化后的算法导致过多的 CPU 缓存丢失,你可能无法获得等同于 CPU 数量的加速倍数。对于内存带宽,磁盘和网络或网络连接的过度占用也是如此。

我的推荐是使用阿姆达尔定律分析哪些部分需要优化,但使用测量手段决定真实的优化加速比。记住,有时,一个高度串行(单 CPU)算法可能比并行更快,仅仅因为串行版本没有协作开销(任务拆分和合并),并且单 CPU 算法可能更符合底层硬件的工作机制(CPU 流水线、缓存等)。

34. Java Concurrency References

时不时有人问,我写这篇文章前看了哪些书籍和文章。并发非常微妙,所以人们总是希望可以检测我的写作和其它资源间的差异。因此我列出了编写这篇教程时使用过的并发相关书籍和文章列表。

Books

Java Concurrency in Practice

这是 Java 并发的最新书籍。它在 2006 出版所以可能有些过时。例如,它没有介绍异步架构(该架构在 2015 年的今天很火)。这是一本 Java 并发方面的好书,是介绍 Java 5 及之后版本 java.util.concurrent 包的最佳书籍。

Seven Concurrency Models in Seven Weeks

这本书很新(2014 年出版),结合了不同种类的并发模型 —— 不仅仅是传统的线程,共享内存和锁模型。

Concurrent Programming in Java - 1st + 2nd Ed.

这本(有两个版本)是当时 Java 并发的入门书籍。它在 1999 年出版,自那之后 Java 和并发模型发展了很多。

Taming Java Threads

这在当时也是一本优良的多线程入门读物。它出版于 2000 年,所以和 Doug Lea 的书一样,不包含之后的改变。

Java Threads

2004 年出版的 Java 多线程好书。但是书中的大多数材料在其它书籍都有包含,所以我没有仔细读过。

Articles

https://www.cise.ufl.edu/tr/DOC/REP-1991-12.pdf

关于非阻塞并发算法的好文章(写于 1991)。

Ohter Resources

https://lmax-exchange.github.io/disruptor/

LMAX Disrupter 并发数据结构(一个高并发的单读,单写类队列结构)。

本文译自 Java Concurrency,译者 LOGI,相关 Concurrency,转载请附原、译文地址。
TG 大佬群 QQ 大佬群

返回文章列表 文章二维码
本页链接的二维码
打赏二维码
添加新评论

Loading captcha...

已有 1 条评论
  1. wind wind   Mac OS X 10.15.7  Safari 14.1.1

    很赞!!!!