代码编织梦想

同步工具类

在容器类中,阻塞队列是一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流,因为take 和put等方法将阻塞,直到队列达到期望的状态(队列既非空,也非满)。

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。在平台类库中还包含其他一些同步工具类的类,如果这些类还无法满足需要,那么可以按照第14章中给出的机制来创建自己的同步工具类。

所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

  闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态[CPJ 3.4.2]。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

·确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。

·确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。

·等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await 方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

在程序清单5-11 的TestHarness中给出了闭锁的两种常见用法。TestHarness创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示“起始门(Starting Gate)”和“结束门(Ending Gate)”。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的值就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。

public class TestHarness {

public long timeTasks(int nThreads, final Runnable task)

throws InterruptedException {

final CountDownLatch startGate =new CountDownLatch(1);

final CountDownLatch endGate =new CountDownLatch(nThreads);

for   ( int   i  = 0;i   < nThreads;i++)  {

Thread t =new Thread(){

public void run(){

try {

startGate. await();

try  {.

task. run();

}finally {

endGate. countDown();

}

}  catch (InterruptedException ignored){}

}

} ;

t. start();

}

long start =System. nanoTime();

startGate. countDown();

endGate. await();

long end =System. nanoTime();

return end-start;

}

}

为什么要在TestHarness中使用闭锁,而不是在线程创建后就立即启动?或许,我们希望测试n个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将“领先”后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够同时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。

  FutureTask

FutureTask也可以用做闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算[CPJ 4.3.3])。FutureTask 表示的计算是通过Callable 来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask 进入完成状态后,它会永远停止在这个状态上。

Future. get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。程序清单5-12 中的Preloader 就使用了FutureTask来执行一个高开销的计算,并且计算结果将在稍后使用。通过提前启动计算,可以减少在等待结果时需要的时间。

public class Preloader {

private final FutureTask<Product Info>future =

new FutureTask<ProductInfo>(new Callable<ProductInfo>(){

public ProductInfo call() throws DataLoadException {

return loadProductInfo();


private final Thread thread =new Thread(future);

public void start(){thread. start();}

public ProductInfo get()

throws DataLoadException,InterruptedException {

try {

return future. get();

}catch (ExecutionException e){

Throwable cause =·e. getCause();

if (cause instanceof DataLoadException)

throw (DataLoadException) cause;

else

throw launderThrowable(cause);

}

}

}

Preloader 创建了一个FutureTask,其中包含从数据库加载产品信息的任务,以及一个执行运算的线程。由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一个start 方法来启动线程。当程序随后需要ProductInfo时,可以调用get 方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。

Callable 表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future. get 中被重新抛出。这将使调用get的代码变得复杂,因为它不仅需要处理可能出现的ExecutionException(以及未检查的CancellationException),而且还由于ExecutionException是作为一个Throwable 类返回的,因此处理起来并不容易。

在Preloader 中,当get方法抛出ExecutionException时,可能是以下三种情况之一:Callable 抛出的受检查异常,RuntimeException,以及Error。我们必须对每种情况进行单独处理,但我们将使用程序清单5-13 中的launderThrowable辅助方法来封装一些复杂的异常处理逻辑。在调用launderThrowable之前, Preloader 会首先检查已知的受检查异常,并重新抛出它们。剩下的是未检查异常, Preloader 将调用launderThrowable 并抛出结果。如果Throwable传递给

launderThrowable 的是一个Error,那么launderThrowable将直接再次抛出它;如果不是RuntimeException,那么将抛出一个IllegalStateException 表示这是一个逻辑错误。剩下的RuntimeException,launderThrowable 将把它们返回给调用者,而调用者通常会重新抛出它们。

/**如果Throwable 是Error,那么抛出它;如果是RuntimeException,那么返回它,否则抛出IllegalStateException。*/

public static RuntimeException launderThrowable(Throwable t){

if (t instanceof RuntimeException)

return (RuntimeException)t;

else if (t instanceof Error)

throw (Error)t;

else

throw new IllegalStateException("Not unchecked",t);

}

  信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量[CPJ 3.4.1]。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

Semaphore 可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将Semaphore的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用acquire 方法获取一个许可,在将资源返回给池之后调用release 释放许可,那么acquire 将一直阻塞直到资源池不为空。在第12章的有界缓冲类中将使用这项技术。(在构造阻塞对象池时,一种更简单的方法是使用BlockingQueue来保存池的资源。)

同样,你也可以使用Semaphore将任何一种容器变成有界阻塞容器,如程序清单5-14中的BoundedHashSet所示。信号量的计数值会初始化为容器容量的最大值。add操作在向底层容器中添加一个元素之前,首先要获取一个许可。如果add 操作没有添加任何元素,那么会立刻在这种实现中不包含真正的许可对象,并且Semaphore也不会将许可与线程关联起来,因此在一个线程中获得的许可可以在另一个线程中释放。可以将acquire操作视为是消费一个许可,而release 操作是创建一个许可, Semaphore并不受限于它在创建时的初始许可数量。

释放许可。同样, remove操作释放一个许可,使更多的元素能够添加到容器中。底层的Set实现并不知道关于边界的任何信息,这是由BoundedHashSet来处理的。

public class BoundedHashSet<T>{

private final Set<T>set;

private final Semaphore sem;

public BoundedHashSet(int bound){

this. set =Collections. synchronizedSet(new HashSet<T>());

sem =new Semaphore(bound);

}

public boolean add (To) throws InterruptedException {

sem. acquire();

boolean wasAdded =false;

try {

wasAdded =set. add(o);

return wasAdded;

}

finally {

if (!wasAdded)

sem. release();

}

}

public boolean remove(Object o){

boolean wasRemoved =set. remove(o);

if (wasRemoved)

sem. release();

return wasRemoved;

}

}

  栅栏

我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。

栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生[CPJ 4,4.3]。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事情。”

CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await 的调用超时,或者await 阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以并行执行,但必须等到该步骤中的所有计算都执行完毕才能进入下一个步骤。例如,在n-body粒子模拟系统中,每个步骤都根据其他粒子的位置和属性来计算各个粒子的新位置。通过在每两次更新之间等待栅栏,能够确保在第k步中的所有更新操作都已经计算完毕,才进入第k+l步。

在程序清单5-15 的CellularAutomata中给出了如何通过栅栏来计算细胞的自动化模拟;例如Conway 的生命游戏(Gardner,1970)。在把模拟过程并行化时,为每个元素(在这个示例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。合理的做法是,将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。CellularAutomata将问题分解为Nepo个子问题,其中Nept等于可用CPU的数量,并将每个子问题分配给一个线程。在每个步骤中,工作线程都为各自子问题中的所有细胞计算新值。当所有工作线程都到达栅栏时,栅栏会把这些新值提交给数据模型。在栅栏的操作执行完以后,工作线程将开始下一步的计算,包括调用isDone方法来判断是否需要进行下一次迭代。

public class CellularAutomata {

private final Board mainBoard;

private final CyclicBarrier barrier;

private final worker[]workers;

public CellularAutomata(Board board){

this. mainBoard =board;

int count='Runtime. getRuntime(). availableProcessors();

this. barrier =new CyclicBarrier(count,

new Runnable(){

public void run(){

mainBoard. commitNewValues();

} } ) ;

this. workers =new Worker[count];

for  ( int  i  =0;i  < count;i++)

workers[i]=new Worker(mainBoard. getSubBoard(count,i));

}

private class worker implements Runnable {

private final Board board;

⊖   在这种不涉及I/O操作或共享数据访问的计算问题中,当线程数量为      Nepu或Nequ+11时将获得最优的吞吐量。更多的线程并不会带来任何帮助,甚至在某种程度上会降低性能,因为多个线程将会在CPU和内存等资源上发生竞争。

在程序清单5-16 的Computable<A,V>接口中声明了一个函数Computable,其输入类型为A,输出类型为V。在ExpensiveFunction 中实现的Computable,需要很长的时间来计算结果,我们将创建一个Computable 包装器,帮助记住之前的计算结果,并将缓存过程封装起来。(这项技术被称为“记忆[Memoization]”。)

public interface Computable<A,V>{


implements Computable<String,BigInteger>{

public BigInteger compute(String arg){

//在经过长时间的计算后

return new BigInteger(arg);

}

public class Memoizerl<A,V>implements Computable<A,V>{

@GuardedBy("this")

private final Map<A,V>cache=new HashMap<A,V>();

private final Computable<A,V>c;

public Memoizerl(Computable<A,V>c){

this. c  = c;

}

public synchronized V compute(A arg) throws InterruptedException {

V result =cache. get(arg);

if (result==null){

result =c. compute(arg);

cache. put(arg, result);

}

return result;

}

}

在程序清单5-16 中的Memoizerl 给出了第一种尝试:使用HashMap来保存之前计算的结果。compute 方法将首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算的值。否则,将把计算结果缓存在HashMap中,然后再返回。

HashMap不是线程安全的,因此要确保两个线程不会同时访问HashMap, Memoizerl采用了一种保守的方法,即对整个compute 方法进行同步。这种方法能确保线程安全性,但会带来一个明显的可伸缩性问题:每次只有一个线程能够执行compute。如果另一个线程正在计算结果,那么其他调用compute 的线程可能被阻塞很长时间。如果有多个线程在排队等待还未计算出的结果,那么compute 方法的计算时间可能比没有“记忆”操作的计算时间更长。在图5-2中给出了当多个线程使用这种方法中的“记忆”操作时发生的情况,这显然不是我们希望通过缓存获得的性能提升结果。

程序清单5-17 中的Memoizer2用ConcurrentHashMap 代替HashMap来改进Memoizerl中糟糕的并发行为。由于ConcurrentHashMap 是线程安全的,因此在访问底层Map时就不需要进行同步,因而避免了在对Memoizerl中的compute 方法进行同步时带来的串行性。

Memoizer2 比Memoizerl有着更好的并发行为:多线程可以并发地使用它。但它在作为缓存时仍然存在一些不足——当两个线程同时调用compute 时存在一个漏洞,可能会导致计算得到相同的值。在使用memoization 的情况下,这只会带来低效,因为缓存的作用是避免相同的数据被计算多次。但对于更通用的缓存机制来说,这种情况将更为糟糕。对于只提供单次初始化的对象缓存来说,这个漏洞就会带来安全风险。.

public V compute(A arg) throws InterruptedException {

V result =cache. get(arg);

if (result ==null){

result =c. compute(arg);

cache. put(arg, result);

}

return result;

}

}

Memoizer2 的问题在于,如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复这个计算,如图5-3所示。我们希望通过某种方法来表达“线程X正在计算f(27)”这种情况,这样当另一个线程查找f(27)时,它能够知道最高效的方法是等待线程X计算结束,然后再去查询缓存“f(27)的结果是多少?”。

我们已经知道有一个类能基本实现这个功能:FutureTask。FutureTask表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么FutureTask. get将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。

程序清单5-18 中的Memoizer3 将用于缓存值的Map 重新定义为ConcurrentHashMap<A,Future<V>>,替换原来的ConcurrentHashMap<A,V>。Memoizer3 首先检查某个相应的计算是否已经开始(Memoizer2与之相反,它首先判断某个计算是否已经完成)。如果还没有启动,那么就创建一个FutureTask,并注册到Map 中,然后启动计算:如果已经启动,那么等待现有计算的结果。结果可能很快会得到,也可能还在运算过程中,但这对于Future. get 的调用者来说是透明的。

            序清单5-18基于FutureTask的 Memoizing 封装器                                    

public class Memoizer3<A,V>implements Computable<A,V>{

private final Map<A, Future<V>>cache

=new ConcurrentHashMap<A, Future<V>>();

private final Computable<A,V>c;

public Memoizer3(Computable<A,V>c){this. c =c;}

public V compute (final A arg) throws InterruptedException {

Future<V>f =cache. get(arg);

if   (f  == null) {

Callable<V>eval =new Callable<V>(){

public V call() throws InterruptedException {

return c. compute(arg);

}

}  ;

FutureTask<V>ft =new FutureTask<V>(eval);

f  =  ft;

cache. put(arg, ft);

ft. run();//   在这里将调用c. compute

}

try {

return  f. get();.

}catch (ExecutionException e){

throw launderThrowable(e. getCause());

}

}

}

Memoizer3的实现几乎是完美的:它表现出了非常好的并发性(基本上是源于ConcurrentHashMap 高效的并发性),若结果已经计算出来,那么将立即返回。如果其他线程正在计算该结果,那么新到的线程将一直等待这个结果被计算出来。它只有一个缺陷,即仍然存在两个线程计算出相同值的漏洞。这个漏洞的发生概率要远小于Memoizer2 中发生的概率,但

由于compute 方法中的if代码块仍然是非原子(nonatomic)的“先检查再执行”操作,因此两个线程仍有可能在同一时间内调用compute 来计算相同的值,即二者都没有在缓存中找到期望的值,因此都开始计算。

Memoizer3 中存在这个问题的原因是,复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来确保原子性。程序清单5-19中的Memoizer 使用了ConcurrentMap 中的原子方法putIfAbsent,避免了Memoizer3的漏洞。

public class Memoizer<A,V>implements Computable<A,V>{

private final ConcurrentMap<A, Future<V>>cache

=new ConcurrentHashMap<A, Future<V>>();

private final Computable<A,V>c;

public Memoizer(Computable<A,V>c){this. c =c;}

public V compute(final A arg) throws InterruptedException {

while (true){

Future<V>f =cache. get(arg);

if   (f  ==null) {

Callable<V>eval =new Callable<V>(){

public V call() throws InterruptedException {

return c. compute(arg);

}

} ;

FutureTask<V>ft =new FutureTask<V>(eval);

f =cache. putIfAbsent(arg, ft);

if   ( f  == null)  { f  = ft;ft. run( ) ;}

}

try   {

return f. get();

}catch {CancellationException e}{

cache. remove(arg,f);

}catch (ExecutionException e){

throw launderThrowable(e. getCause());

}

}

}

}

当缓存的是Future而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个

计算被取消或者失败,那么在计算这个结果时将指明计算过程被取消或者失败。为了避免这种情况,如果Memoizer发现计算被取消,那么将把Future 从缓存中移除。如果检测到RuntimeException,那么也会移除Future,这样将来的计算才可能成功。Memoizer 同样没有解决缓存逾期的问题,但它可以通过使用FutureTask的子类来解决,在子类中为每个结果指定一个逾期时间,并定期扫描缓存中逾期的元素。(同样,它也没有解决缓存清理的问题,即移除旧的计算结果以便为新的计算结果腾出空间,从而使缓存不会消耗过多的内存。).

在完成并发缓存的实现后,就可以为第2章中因式分解servlet添加结果缓存。程序清单5-20 中的Factorizer 使用Memoizer 来缓存之前的计算结果,这种方式不仅高效,而且可扩展性也更高。

public class Factorizer implements Servlet {

private final Computable<BigInteger,BigInteger[]>c =

new Computable<BigInteger,BigInteger[]>(){

public BigInteger[]compute(BigInteger arg){

return factor(arg);

}

} ;

private final Computable<BigInteger,BigInteger[]>cache

=new Memoizer<BigInteger,BigInteger[]>(c);

public void service(ServletRequest req,

ServletResponse resp){

try {

BigInteger i =extractFromRequest(req);

encodeIntoResponse(resp, cache. compute(i));

}catch (InterruptedException e){

encodeError(resp,"factorization in. errupted");

}

}

}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/2301_77783312/article/details/131027026