CountDownLatch简介 CountDownLatch是JDK并发包中提供的一个同步工具类。官方文档对这个同步工具的介绍是: Asynchronizationaidthatallowsoneormorethreadstowaituntilasetofoperationsbeingperformedinotherthreadscompletes。 上面的英文介绍大致意思是:CountDownLatch的主要功能是让一个或者多个线程等待直到一组在其他线程中执行的操作完成。 使用列子 观看上面的解释可能并不能直观地说明CountDownLatch的作用,下面我们通过一个简单的列子看下CountDownLatch的使用。 场景:主人(主线程)请客人(子线程)吃晚饭,需要等待所有客人都到了之后才开饭。我们用CountDownLatch来模拟下这个场景。publicclassCountDownLatchDemo{privatestaticfinalintPERSONCOUNT5;privatestaticfinalCountDownLatchcnewCountDownLatch(PERSONCOUNT);publicstaticvoidmain(String〔〕args)throwsInterruptedException{System。out。println(lammaster,waitingguests。。。);for(inti0;iPERSONCOUNT;i){intfinalIi;newThread(newRunnable(){SneakyThrowsOverridepublicvoidrun(){System。out。println(Thread。currentThread()。getName()lamperson〔finalI〕);TimeUnit。MILLISECONDS。sleep(500);System。out。println(Thread。currentThread()。getName()count:c。getCount());c。countDown();}})。start();}c。await();System。out。println(allguestsget,begindinner。。。);}} 上面的列子中,主人(master线程)请了5个客人吃饭,每个客人到了之后会将CountDownLatch的值减一,主人(master)会一直等待所有客人到来,最后输出开饭。 CountDownLatch的使用方式很简单,下面来看下它的实现原理。原理剖析 首先我们先看下CountDownLatch重要的APIgetCount():获取当前count的值。wait():让当前线程在此CountDownLatch对象上等待,可以中断。与notify()、notifyAll()方法对应。await():让当前线程等待此CountDownLatch对象的count变为0,可以中断。await(timeout,TimeUnit):让当前线程等待此CountDownLatch对象的count变为0,可以超时、可以中断。countDown():使此CountDownLatch对象的count值减1(无论执行多少次,count最小值为0)。 下面我们看下具体API的源代码构造函数publicCountDownLatch(intcount){if(count0)thrownewIllegalArgumentException(count0);this。syncnewSync(count);} 在构建CountDownLatch对象时需要传入一个int型的初始值,这个值就是计数器的初始值。从上面的代码中可以看出,创建CountDownLatch是new了一个Sync对象。privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID4982264981922014374L;Sync(intcount){setState(count);}intgetCount(){returngetState();}protectedinttryAcquireShared(intacquires){return(getState()0)?1:1;}protectedbooleantryReleaseShared(intreleases){Dsignalwhentransitiontozerofor(;;){intcgetState();if(c0)intnextcc1;if(compareAndSetState(c,nextc))returnnextc0;}}} Sync对象是基于AQS机制实现的,自己实现了tryAcquireShared和tryReleaseShared方法。await方法publicvoidawait()throwsInterruptedException{sync。acquireSharedInterruptibly(1);} 调用await方法其实是调用了AQS的acquireSharedInterruptibly方法。publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread。interrupted())thrownewInterruptedException();if(tryAcquireShared(arg)0)doAcquireSharedInterruptibly(arg);} 在acquireSharedInterruptibly中先判断了下当前线程有没有被中断,假如线程已经被中断了,直接抛出中断异常。否则进入doAcquireSharedInterruptibly。privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{finalNodenodeaddWaiter(Node。SHARED);try{for(;;){finalNodepnode。predecessor();if(phead){intrtryAcquireShared(arg);if(r0){setHeadAndPropagate(node,r);p。helpGC}}if(shouldParkAfterFailedAcquire(p,node)parkAndCheckInterrupt())thrownewInterruptedException();}}finally{if(failed)cancelAcquire(node);}} doAcquireSharedInterruptibly的处理逻辑是先判断队列中是否只有当前线程,如果只有当前线程的先尝试获取下资源,如果获取资源成功就直接返回了。获取资源不成功就判断下是否要park当前线程,如果需要park当前线程, 那么当前线程就进入waiting状态。否则在for循环中一直执行上面的逻辑。countDown方法publicvoidcountDown(){sync。releaseShared(1);} 熟悉AQS机制的会知道上面的代码其实也是调的AQS的releaseShared。releaseShared的方法会调到Sync中的tryReleaseShared。protectedbooleantryReleaseShared(intreleases){Dsignalwhentransitiontozerofor(;;){intcgetState();if(c0)intnextcc1;if(compareAndSetState(c,nextc))returnnextc0;}} 上面的代码逻辑很简单:status的值是0的话就返回true,否则返回false。返回true的话,就会唤醒AQS队列中所有阻塞的线程。使用场景场景一:将任务分割成多个子任务,每个子任务由单个线程去完成,等所有线程完成后再将结果汇总。(MapReduce)这种场景下,CountDoenLatch作为一个完成信号来使用。场景二:多个线程等待,一直等到某个条件发生。比如多个赛跑运动员都做好了准备,就等待裁判手中的发令枪响。这种场景下,就可以将CountdownLatch的初始值设置成1。 简单总结CountDownLatch的初始值不能重置,只能减少不能增加,最多减少到0;当CountDownLatch计数值没减少到0之前,调用await方法可能会让调用线程进组一个阻塞队列,直到计数值减小到0;调用countDown方法会让计数值每次都减小1,但是最多减少到0。当CountDownLatch的计数值减少到0的时候,会唤醒所有在阻塞队列中的线程。