安庆大理运城常德铜陵江西
投稿投诉
江西南阳
嘉兴昆明
铜陵滨州
广东西昌
常德梅州
兰州阳江
运城金华
广西萍乡
大理重庆
诸暨泉州
安庆南充
武汉辽宁

Java并发编程的艺术线程间交换数据的Exchanger

12月8日 多上心投稿
  Exchanger是用于线程协作的工具类,主要用于两个线程之间的数据交换。带着BAT大厂的面试问题去理解Exchanger
  请带着这些问题继续后文,会很大程度上帮助你更好的理解Exchanger。Exchanger主要解决什么问题?对比SynchronousQueue,为什么说Exchanger可被视为SynchronousQueue的双向形式?Exchanger在不同的JDK版本中实现有什么差别?Exchanger实现机制?Exchanger已经有了slot单节点,为什么会加入arenanode数组?什么时候会用到数组?arena可以确保不同的slot在arena中是不会相冲突的,那么是怎么保证的呢?什么是伪共享,Exchanger中如何体现的?Exchanger实现举例Exchanger简介
  Exchanger用于进行两个线程之间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了。Exchanger实现机制for(;;){if(slotisempty){offerslot为空时,将item设置到Node中placeiteminaNif(canCASslotfromemptytonode){当将node通过CAS交换到slot中时,挂起线程等待被唤醒被唤醒后返回node中匹配到的}}elseif(canCASslotfromnodetoempty){release将slot设置为空获取node中的item,将需要交换的数据设置到匹配的唤醒等待的线程}elseretryonCASfailure}
  比如有2条线程A和B,A线程交换数据时,发现slot为空,则将需要交换的数据放在slot中等待其它线程进来交换数据,等线程B进来,读取A设置的数据,然后设置线程B需要交换的数据,然后唤醒A线程,原理就是这么简单。但是当多个线程之间进行交换数据时就会出现问题,所以Exchanger加入了slot数组。Exchanger源码解析内部类ParticipantstaticfinalclassParticipantextendsThreadLocalNode{publicNodeinitialValue(){returnnewNode();}}
  Participant的作用是为每个线程保留唯一的一个Node节点,它继承ThreadLocal,说明每个线程具有不同的状态。内部类Nodesun。misc。ContendedstaticfinalclassNode{arena的下标,多个槽位的时候利用上一次记录的Exchanger。在当前bound下CAS失败的次数;用于自旋;这个线程的当前项,也就是需要交换的数据;O做releasing操作的线程传递的项;volatileO挂起时设置线程值,其他情况下为volatileT}
  在Node定义中有两个变量值得思考:bound以及collides。前面提到了数组area是为了避免竞争而产生的,如果系统不存在竞争问题,那么完全没有必要开辟一个高效的arena来徒增系统的复杂性。首先通过单个slot的exchanger来交换数据,当探测到竞争时将安排不同的位置的slot来保存线程Node,并且可以确保没有slot会在同一个缓存行上。如何来判断会有竞争呢?CAS替换slot失败,如果失败,则通过记录冲突次数来扩展arena的尺寸,我们在记录冲突的过程中会跟踪bound的值,以及会重新计算冲突次数在bound的值被改变时。核心属性privatefinalPprivatevolatileNode〔〕privatevolatileN为什么会有arena数组槽?
  slot为单个槽,arena为数组槽,他们都是Node类型。在这里可能会感觉到疑惑,slot作为Exchanger交换数据的场景,应该只需要一个就可以了啊?为何还多了一个Participant和数组类型的arena呢?一个slot交换场所原则上来说应该是可以的,但实际情况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组。那么怎么将Node与当前线程绑定呢?
  Participant,Participant的作用就是为每个线程保留唯一的一个Node节点,它继承ThreadLocal,同时在Node节点中记录在arena中的下标index。构造函数CreatesanewExchanger。publicExchanger(){participantnewParticipant();}
  初始化participant对象。核心方法exchange(Vx)
  等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。publicVexchange(Vx)throwsInterruptedException{O当参数为null时需要将item设置为空的对象Objectitem(xnull)?NULLITEM:x;translatenullargs注意到这里的这个表达式是整个方法的核心if((arena!null(vslotExchange(item,false,0L))null)((Thread。interrupted()disambiguatesnullreturn(varenaExchange(item,false,0L))null)))thrownewInterruptedException();return(vNULLITEM)?null:(V)v;}
  这个方法比较好理解:arena为数组槽,如果为null,则执行slotExchange()方法,否则判断线程是否中断,如果中断值抛出InterruptedException异常,没有中断则执行arenaExchange()方法。整套逻辑就是:如果slotExchange(Objectitem,booleantimed,longns)方法执行失败了就执行arenaExchange(Objectitem,booleantimed,longns)方法,最后返回结果V。
  NULLITEM为一个空节点,其实就是一个Object对象而已,slotExchange()为单个slot交换。slotExchange(Objectitem,booleantimed,longns)privatefinalObjectslotExchange(Objectitem,booleantimed,longns){获取当前线程node对象Nodepparticipant。get();当前线程ThreadtThread。currentThread();若果线程被中断,就直接返回nullif(t。isInterrupted())自旋for(N;){将slot值赋给qif((qslot)!null){slot不为null,即表示已有线程已经把需要交换的数据设置在slot中了通过CAS将slot设置成nullif(U。compareAndSwapObject(this,SLOT,q,null)){CAS操作成功后,将slot中的item赋值给对象v,以便返回。这里也是就读取之前线程要交换的数据Objectvq。将当前线程需要交给的数据设置在q中的matchq。获取被挂起的线程Threadwq。if(w!null)如果线程不为null,唤醒它U。unpark(w);返回其他线程给的V}createarenaoncontention,butcontinueuntilslotnullCAS操作失败,表示有其它线程竞争,在此线程之前将数据已取走NCPU:CPU的核数bound0表示arena数组未初始化过,CAS操作bound将其增加SEQif(NCPU1bound0U。compareAndSwapInt(this,BOUND,0,SEQ))初始化arena数组arenanewNode〔(FULL2)ASHIFT〕;}上面分析过,只有当arena不为空才会执行slotExchange方法的所以表示刚好已有其它线程加入进来将arena初始化elseif(arena!null)这里就需要去执行arenaEcallermustreroutetoarenaExchangeelse{这里表示当前线程是以第一个线程进来交换数据或者表示之前的数据交换已进行完毕,这里可以看作是第一个线程将需要交换的数据先存放在当前线程变量p中p。将需要交换的数据通过CAS设置到交换区slotif(U。compareAndSwapObject(this,SLOT,null,p))交换成功后跳出自旋CAS操作失败,表示有其它线程刚好先于当前线程将数据设置到交换区slot将当前线程变量中的item设置为null,然后自旋获取其它线程存放在交换区slot的数据p。}}awaitrelease执行到这里表示当前线程已将需要的交换的数据放置于交换区slot中了,等待其它线程交换数据然后唤醒当前线程inthp。longendtimed?System。nanoTime()ns:0L;自旋次数intspins(NCPU1)?SPINS:1;O自旋等待直到p。match不为null,也就是说等待其它线程将需要交换的数据放置于交换区slotwhile((vp。match)null){下面的逻辑主要是自旋等待,直到spins递减到0为止if(spins0){hh1;hh3;hh10;if(h0)hSPINS(int)t。getId();elseif(h0(spins((SPINS1)1))0)Thread。yield();}elseif(slot!p)spinsSPINS;此处表示未设置超时或者时间未超时elseif(!t。isInterrupted()arenanull(!timed(nsendSystem。nanoTime())0L)){设置线程t被当前对象阻塞U。putObject(t,BLOCKER,this);给p挂机线程的值赋值p。if(slotp)如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起U。park(false,ns);线程被唤醒,将被挂起的线程设置为nullp。设置线程t未被任何对象阻塞U。putObject(t,BLOCKER,null);不是以上条件时(可能是arena已不为null或者超时)}elseif(U。compareAndSwapObject(this,SLOT,p,null)){arena不为null则v为null,其它为超时则v为超市对象TIMEDOUT,并且跳出循环vtimedns0L!t。isInterrupted()?TIMEDOUT:}}取走match值,并将p中的match置为nullU。putOrderedObject(p,MATCH,null);设置item为nullp。p。返回交换值}
  程序首先通过participant获取当前线程节点Node。检测是否中断,如果中断returnnull,等待后续抛出InterruptedException异常。如果slot不为null,则进行slot消除,成功直接返回数据V,否则失败,则创建arena消除数组。如果slot为null,但arena不为null,则返回null,进入arenaExchange逻辑。如果slot为null,且arena也为null,则尝试占领该slot,失败重试,成功则跳出循环进入spinblock(自旋阻塞)模式。
  在自旋阻塞模式中,首先取得结束时间和自旋次数。如果match(做releasing操作的线程传递的项)为null,其首先尝试spins随机次自旋(改自旋使用当前节点中的hash,并改变之)和退让。当自旋数为0后,假如slot发生了改变(slot!p)则重置自旋数并重试。否则假如:当前未中断arena为null(当前不是限时版本或者限时版本当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本时间已经结束:不限时版本:置v为限时版本:如果时间结束以及未中断则TIMEDOUT;否则给出null(原因是探测到arena非空或者当前线程中断)。
  match不为空时跳出循环。arenaExchange(Objectitem,booleantimed,longns)
  此方法被执行时表示多个线程进入交换区交换数据,arena数组已被初始化,此方法中的一些处理方式和slotExchange比较类似,它是通过遍历arena数组找到需要交换的数据。timed为true表示设置了超时时间,ns为0的值,反之没有设置超时时间privatefinalObjectarenaExchange(Objectitem,booleantimed,longns){Node〔〕获取当前线程中的存放的nodeNodepparticipant。get();index初始值0for(intip。;){accessslotati遍历,如果在数组中找到数据则直接交换并唤醒线程,如未找到则将需要交换给其它线程的数据放置于数组中intb,m,c;jisrawarrayoffset其实这里就是向右遍历数组,只是用到了元素在内存偏移的偏移量q实际为arena数组偏移(i1)128个地址位上的nodeNodeq(Node)U。getObjectVolatile(a,j(iASHIFT)ABASE);如果q不为null,并且CAS操作成功,将下标j的元素置为nullif(q!nullU。compareAndSwapObject(a,j,q,null)){表示当前线程已发现有交换的数据,然后获取数据,唤醒等待的线程Objectvq。releaseq。Threadwq。if(w!null)U。unpark(w);q为null并且i未超过数组边界}elseif(i(m(bbound)MMASK)qnull){将需要给其它线程的item赋予给p中的itemp。offerif(U。compareAndSwapObject(a,j,null,p)){交换成功longend(timedm0)?System。nanoTime()ns:0L;ThreadtThread。currentThread();wait自旋直到有其它线程进入,遍历到该元素并与其交换,同时当前线程被唤醒for(inthp。hash,spinsSPINS;;){Objectvp。if(v!null){其它线程设置的需要交换的数据match不为null将match设置null,item设置为nullU。putOrderedObject(p,MATCH,null);p。clearfornextusep。}elseif(spins0){hh1;hh3;hh10;xorshiftif(h0)initializehashhSPINS(int)t。getId();elseif(h0approx50true(spins((SPINS1)1))0)Thread。yield();twoyieldsperwait}elseif(U。getObjectVolatile(a,j)!p)和slotExchange方法中的类似,arena数组中的数据已被CAS设置match值还未设置,让其再自旋等待match被设置spinsSPINS;releaserhasntsetmatchyetelseif(!t。isInterrupted()m0(!timed(nsendSystem。nanoTime())0L)){设置线程t被当前对象阻塞U。putObject(t,BLOCKER,this);emulateLockSupport线程t赋值p。minimizewindowif(U。getObjectVolatile(a,j)p)数组中对象还相等,表示线程还未被唤醒,唤醒线程U。park(false,ns);p。设置线程t未被任何对象阻塞U。putObject(t,BLOCKER,null);}elseif(U。getObjectVolatile(a,j)pU。compareAndSwapObject(a,j,p,null)){这里给bound增加加一个SEQif(m!0)trytoshrinkU。compareAndSwapInt(this,BOUND,b,bSEQ1);p。p。ip。index1;descendif(Thread。interrupted())if(timedm0ns0L)returnTIMEDOUT;restart}}}else交换失败,表示有其它线程更改了arena数组中下标i的元素p。clearoffer}else{此时表示下标不在boundMMASK或q不为null但CAS操作失败需要更新bound变化后的值if(p。bound!b){resetp。p。collides0;反向遍历i(i!mm0)?m:m1;}elseif((cp。collides)mmFULL!U。compareAndSwapInt(this,BOUND,b,bSEQ1)){记录CAS失败的次数p。collidesc1;循环遍历i(i0)?m:i1;cyclicallytraverse}else此时表示bound值增加了SEQ1im1;grow设置下标p。}}}
  首先通过participant取得当前节点Node,然后根据当前节点Node的index去取arena中相对应的节点node。前面提到过arena可以确保不同的slot在arena中是不会相冲突的,那么是怎么保证的呢?arenanewNode〔(FULL2)ASHIFT〕;这个arena到底有多大呢?我们先看FULL和ASHIFT的定义:staticfinalintFULL(NCPU(MMASK1))?MMASK:NCPU1;privatestaticfinalintASHIFT7;privatestaticfinalintNCPURuntime。getRuntime()。availableProcessors();privatestaticfinalintMMASK0255假如我的机器NCPU8,则得到的是768大小的arena数组。然后通过以下代码取得在arena中的节点:Nodeq(Node)U。getObjectVolatile(a,j(iASHIFT)ABASE);它仍然是通过右移ASHIFT位来取得Node的,ABASE定义如下:C?akNode〔〕。ABASEU。arrayBaseOffset(ak)(1ASHIFT);U。arrayBaseOffset获取对象头长度,数组元素的大小可以通过unsafe。arrayIndexScale(T〔〕。class)方法获取到。这也就是说要访问类型为T的第N个元素的话,你的偏移量offset应该是arrayOffsetNarrayScale。也就是说BASEarrayOffset128。用sun。misc。Contended来规避伪共享?
  伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpucacheline里面。并发情况下,如果一个线程修改了a,会导致整个cacheline失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。
  我们再看Node节点的定义,在Java8中我们是可以利用sun。misc。Contended来规避伪共享的。所以说通过ASHIFT方式加上sun。misc。Contended,所以使得任意两个可用Node不会再同一个缓存行中。sun。misc。ContendedstaticfinalclassNode{。。。。}
  我们再次回到arenaExchange()。取得arena中的node节点后,如果定位的节点q不为空,且CAS操作成功,则交换数据,返回交换的数据,唤醒等待的线程。如果q等于null且下标在boundMMASK范围之内,则尝试占领该位置,如果成功,则采用自旋阻塞的方式进行等待交换数据。如果下标不在boundMMASK范围之内获取由于q不为null但是竞争失败的时候:消除p。加入bound不等于当前节点的bond(b!p。bound),则更新p。boundb,collides0,im或者m1。如果冲突的次数不到m获取m已经为最大值或者修改当前bound的值失败,则通过增加一次collides以及循环递减下标i的值;否则更新当前bound的值成功:我们令i为m1即为此时最大的下标。最后更新当前index的值。更深入理解SynchronousQueue对比?
  Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。不同JDK实现有何差别?在JDK5中Exchanger被设计成一个容量为1的容器,存放一个等待线程,直到有另外线程到来就会发生数据交换,然后清空容器,等到下一个到来的线程。从JDK6开始,Exchanger用了类似ConcurrentMap的分段思想,提供了多个slot,增加了并发执行时的吞吐量。
  JDK1。6实现可以参考这里(opensnewwindow)Exchanger示例
  来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。publicclassTest{staticclassProducerextendsThread{privateExchangerIprivatestaticintdata0;Producer(Stringname,ExchangerIntegerexchanger){super(Producername);this。}Overridepublicvoidrun(){for(inti1;i5;i){try{TimeUnit。SECONDS。sleep(1);System。out。println(getName()交换前:data);dataexchanger。exchange(data);System。out。println(getName()交换后:data);}catch(InterruptedExceptione){e。printStackTrace();}}}}staticclassConsumerextendsThread{privateExchangerIprivatestaticintdata0;Consumer(Stringname,ExchangerIntegerexchanger){super(Consumername);this。}Overridepublicvoidrun(){while(true){data0;System。out。println(getName()交换前:data);try{TimeUnit。SECONDS。sleep(1);dataexchanger。exchange(data);}catch(InterruptedExceptione){e。printStackTrace();}System。out。println(getName()交换后:data);}}}publicstaticvoidmain(String〔〕args)throwsInterruptedException{ExchangerIntegerexchangernewExchangerInteger();newProducer(,exchanger)。start();newConsumer(,exchanger)。start();TimeUnit。SECONDS。sleep(7);System。exit(1);}}pdai:代码已经复制到剪贴板
  可以看到,其结果可能如下:Consumer交换前:0Producer交换前:1Consumer交换后:1Consumer交换前:0Producer交换后:0Producer交换前:2Producer交换后:0Consumer交换后:2Consumer交换前:0Producer交换前:3Producer交换后:0Consumer交换后:3Consumer交换前:0Producer交换前:4Producer交换后:0Consumer交换后:4Consumer交换前:0
投诉 评论 转载

刘亦菲曾轶可登上热搜,穿情侣鞋戴同款项链,多次约会疑似恋爱不知道从什么时候开始,内娱开始考古了,尤其是当某个人再次爆红后,过往的事情都会被网友扒个底朝天。这不,我们的神仙姐姐刘亦菲和曾轶可又登上热搜了,这一对CP当年曾有不少人磕过,但……整个A股市场估值最低的100股(低估值)更新日期:2023年1月8日10:06:23一、低估值名单二、融资融券融资净最大买入的100股序号代码名称昨日涨跌幅融资净买入(……自主游戏,让幼儿挑战自我获得成长来源:【长沙晚报网】长沙晚报掌上长沙12月18日讯(全媒体记者陈良通讯员陈雅斯)今天我和小朋友们一起建了一个‘汪汪队基地’,太好玩了!近日,望城区教育局万润滨江幼儿园小三……WHL深圳昆仑鸿星3比4负白熊,联赛第一阶段比赛暂列第五名北京时间11月25日,20222023赛季俄罗斯女子冰球联赛(WHL)展开常规赛第22轮较量,由中国国际文化传播中心主管的深圳昆仑鸿星女子冰球队主场以3比4憾负白熊队,七连胜被……来月经这几天需要注意什么,男的女的看了收藏起来以后也用的着女人的月经期这几天需要注意些什么?女的的月经周期都时间一般在37天左右。有的时间短,有的时间长。1。在来月经的时间要注意饮食清淡,不要吃冰冷寒凉的食物和饮料。保暖,……生产环境定位日志太麻烦怎么办?建议了解一下日志框架的MDC功对于每一个开发者来说,查询接口的执行日志都是一个高频率的操作,每当测试说接口有问题时,我们都需要去服务器或者日志系统上查报错的原因。一般情况下,我们会通过对应的关键字或者……俗语人睡三觉,命比纸薄,是什么意思?并非迷信,建议了解人生有13的时间是在睡眠中度过的,有一个良好的睡眠对身体以及精神有着重要作用。但当下年轻人的昼夜颠式,晚睡晚起式睡眠方式正在摧毁你的身。只有保证晚上充足的睡眠,白天才有精……Java并发编程的艺术线程间交换数据的ExchangerExchanger是用于线程协作的工具类,主要用于两个线程之间的数据交换。带着BAT大厂的面试问题去理解Exchanger请带着这些问题继续后文,会很大程度上帮助你更好的……美成童话秘境!金山这些赏红叶宝藏地,许你一场秋日盛宴又是一年红叶季,想不想好好感受一下金山红叶的魅力?这份红叶观赏攻略送给你。秋末冬初,微凉的风吹拂着花开海上生态园,酝酿而成璀璨耀眼的枫叶飘红景观。鸡爪槭、美国红枫、……T1新赛季大名单后天公布,此志无双第二集上线T1新赛季大名单后天公布T1首席执行官Joe在推特上表示2023年选手和教练组大名单将于北京时间11月28日9:00公布。UP下路Elk离队UP官宣下路Elk……今天,我县4个重大项目集中开工9月15日,我县举行重大项目集中开(竣)工活动。市政协副主席李志珍出席活动并宣布项目开(竣)工,县委书记王洪灿,县委副书记、县长黄力,县领导蒋尚庭、贺冬林、胡江海、李晓峰、李敏……西甲巴伦西亚对阵巴萨,结果如何?莱万后悔加盟巴萨?西甲足球本轮巴伦西亚对阵巴萨,巴萨是否能取胜?上周巴萨欧冠被拜仁淘汰,小组赛遗憾不能出线,主场竟然0:3告负,现在只能寄托于打好国内联赛了。但赛后西媒报……
米兰看上19岁天才妖童,中场未来指挥官,先租后买开价1300银行短信服务费,你每月还在扣款吗?这些银行不收了,望周知日常生活日记第43天亲近大自然老狐狸心经(十)sql强化演练超详尽(带数据)中国游客吃日本拉面,叮嘱加多点肉,上桌后嘀咕兰州拉面学着点特朗普73岁的原配去世!梳了一辈子贵妇头发型,年轻时确实美长期喝电热水壶烧的水,会对身体产生什么影响吗?提前了解奥运冠军张继科赌博成瘾与前女友分手有隐情以买房为由借钱还赌债别只盯着iPhone14了,安卓一大波新机可能更适合你科学家尝试在碎片盘中寻找系外行星邦德自强之路04汉源轿顶山

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找七猫云易事利