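I. Preface

The previous article analyzed DLedgerCommitLog, the CommitLog management component behind broker high availability in the RocketMQ source code. This article analyzes the mmap memory-mapped file storage component used inside DLedgerCommitLog: DLedgerMmapFileStore.

II. Source Code Analysis

1. DLedgerStore, the abstract parent class of DLedgerMmapFileStore;
2. DLedgerMmapFileStore member variables;
3. DLedgerMmapFileStore constructor;
4. The flush data service thread;
5. The clean space service thread;
6. Startup of the memory-mapped file store;
7. Appending data;
8. Truncating the data files;
9. Appending data as a follower;
10. Looking up an entry by index;
11. Data storage format and encoding.

1. DLedgerStore, the abstract parent class of DLedgerMmapFileStore

    // Abstract class for storage components
    public abstract class DLedgerStore {

        // Get the member state of the current server
        public MemberState getMemberState() {
            return null;
        }

        // As leader, append an entry to disk
        public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry);

        /**
         * As follower, append an entry to disk.
         * When appending, the follower needs to know which leader replicated this entry to it.
         *
         * @param entry      log entry
         * @param leaderTerm election term of the leader
         * @param leaderId   leader id
         * @return the appended entry
         */
        public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId);

        // Get a log entry by index
        public abstract DLedgerEntry get(Long index);

        // Get the committed index
        public abstract long getCommittedIndex();

        // Update the committed index within a term
        public void updateCommittedIndex(long term, long committedIndex) {
        }

        // Get the end term
        public abstract long getLedgerEndTerm();

        // Get the end index
        public abstract long getLedgerEndIndex();

        // Get the begin index
        public abstract long getLedgerBeginIndex();

        // Update the end index and term
        protected void updateLedgerEndIndexAndTerm() {
            if (getMemberState() != null) {
                getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
            }
        }

        // Trigger a flush in the storage component
        public void flush() {
        }

        // Truncate at an entry for the given leader term and id
        public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
            return -1;
        }

        // Start the storage component
        public void startup() {
        }

        // Stop the storage component
        public void shutdown() {
        }
    }

2. DLedgerMmapFileStore member variables

    // Storage component implementation based on mmap memory-mapped files; this is the class to study closely
    public class DLedgerMmapFileStore extends DLedgerStore {

        public static final String CHECK_POINT_FILE = "checkpoint";
        public static final String END_INDEX_KEY = "endIndex";
        public static final String COMMITTED_INDEX_KEY = "committedIndex";
        public static final int MAGIC_1 = 1;
        public static final int CURRENT_MAGIC = MAGIC_1;
        public static final int INDEX_UNIT_SIZE = 32;

        private static Logger logger = LoggerFactory.getLogger(DLedgerMmapFileStore.class);

        // Hooks invoked when entries are appended
        public List<AppendHook> appendHooks = new ArrayList<>();

        // Begin index
        private long ledgerBeginIndex = -1;
        // End index
        private long ledgerEndIndex = -1;
        // Committed index
        private long committedIndex = -1;
        // Committed position
        private long committedPos = -1;
        // End term
        private long ledgerEndTerm;
        // Core DLedger configuration
        private DLedgerConfig dLedgerConfig;
        // Member state of this server node
        private MemberState memberState;
        // List of mmap memory-mapped data files
        private MmapFileList dataFileList;
        // List of mmap memory-mapped index files
        private MmapFileList indexFileList;
        // Thread-local entry buffer
        private ThreadLocal<ByteBuffer> localEntryBuffer;
        // Thread-local index buffer
        private ThreadLocal<ByteBuffer> localIndexBuffer;
        // Flush data service
        private FlushDataService flushDataService;
        // Clean space service
        private CleanSpaceService cleanSpaceService;
        // Flag: is the disk full
        private volatile boolean isDiskFull = false;
        // Timestamp of the most recent checkpoint
        private long lastCheckPointTimeMs = System.currentTimeMillis();
        // Flag: has load() already run
        private AtomicBoolean hasLoaded = new AtomicBoolean(false);
        // Flag: has recovery already completed
        private AtomicBoolean hasRecovered = new AtomicBoolean(false);
        // Set of store paths that are full
        private volatile Set<String> fullStorePaths = Collections.emptySet();
    }

3. DLedgerMmapFileStore constructor

    public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) {
        // Assign the DLedger configuration
        this.dLedgerConfig = dLedgerConfig;
        // Assign the server member state
        this.memberState = memberState;
        // If the data store path contains the multi-path separator
        if (dLedgerConfig.getDataStorePath().contains(DLedgerConfig.MULTI_PATH_SPLITTER)) {
            // Wrap the data files in a multi-path mmap file list
            this.dataFileList = new MultiPathMmapFileList(dLedgerConfig,
                dLedgerConfig.getMappedFileSizeForEntryData(), this::getFullStorePaths);
        } else {
            // Wrap the data files in a plain mmap file list
            this.dataFileList = new MmapFileList(dLedgerConfig.getDataStorePath(),
                dLedgerConfig.getMappedFileSizeForEntryData());
        }
        // Wrap the index files in an mmap file list
        this.indexFileList = new MmapFileList(dLedgerConfig.getIndexStorePath(),
            dLedgerConfig.getMappedFileSizeForEntryIndex());
        // Per-thread entry buffer, 4 MB
        localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));
        // Per-thread index buffer, twice the size of one index unit (32 bytes)
        localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2));
        // Service that flushes data from memory to the files on disk
        flushDataService = new FlushDataService("DLedgerFlushDataService", logger);
        // Service that cleans up disk space
        cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger);
    }

4. The flush data service thread
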
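Before reading the service code in this section, it may help to isolate its decision logic: every iteration flushes, and a checkpoint is persisted only once the checkpoint interval has elapsed. The following is a minimal sketch under stated assumptions, not the DLedger implementation. The class `FlushTickSketch`, its counters, and the explicit `nowMs` parameter are hypothetical and exist only so the logic can be run without a thread or real files.

```java
public class FlushTickSketch {
    // Hypothetical stand-in for dLedgerConfig.getCheckPointInterval()
    private final long checkPointIntervalMs;
    private long lastCheckPointTimeMs;
    private int flushCount;
    private int checkPointCount;

    public FlushTickSketch(long checkPointIntervalMs, long startMs) {
        this.checkPointIntervalMs = checkPointIntervalMs;
        this.lastCheckPointTimeMs = startMs;
    }

    /** One iteration of the loop body; the clock is passed in to keep the sketch deterministic. */
    public void tick(long nowMs) {
        // Stands in for flushing the data file list and the index file list
        flushCount++;
        // Persist a checkpoint only when more than one interval has passed since the last one
        if (nowMs - lastCheckPointTimeMs > checkPointIntervalMs) {
            checkPointCount++;
            lastCheckPointTimeMs = nowMs;
        }
    }

    public int getFlushCount() {
        return flushCount;
    }

    public int getCheckPointCount() {
        return checkPointCount;
    }
}
```

In the real service the iteration is driven by a stoppable thread that sleeps for the flush interval between iterations; the sketch leaves that part out.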
The actual flush is performed by the flush method of MmapFileList, which was covered in the earlier article "RocketMQ source analysis: core methods of MappedFile, the memory-mapped file object layer", so it is not repeated here.

    // Flush data service
    class FlushDataService extends ShutdownAbleThread {

        public FlushDataService(String name, Logger logger) {
            super(name, logger);
        }

        // Runs periodically in a loop, and can be shut down
        @Override
        public void doWork() {
            try {
                long start = System.currentTimeMillis();
                // Periodically trigger a flush of the data files
                DLedgerMmapFileStore.this.dataFileList.flush(0);
                // Periodically trigger a flush of the index files
                DLedgerMmapFileStore.this.indexFileList.flush(0);
                long elapsed;
                if ((elapsed = DLedgerUtils.elapsed(start)) > 500) {
                    logger.info("Flush data cost={} ms", elapsed);
                }
                // Once a checkpoint interval has passed, also persist a checkpoint
                if (DLedgerUtils.elapsed(lastCheckPointTimeMs) > dLedgerConfig.getCheckPointInterval()) {
                    persistCheckPoint();
                    lastCheckPointTimeMs = System.currentTimeMillis();
                }
                // Sleep for the flush interval
                waitForRunning(dLedgerConfig.getFlushFileInterval());
            } catch (Throwable t) {
                logger.info("Error in {}", getName(), t);
                DLedgerUtils.sleep(200);
            }
        }
    }

5. The clean space service thread

    // Clean space service thread
    class CleanSpaceService extends ShutdownAbleThread {

        // Used ratio of the disk partition that holds the store base directory
        double storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
        // Physical usage ratio of the data store path
        double dataRatio = calcDataStorePathPhysicRatio();

        public CleanSpaceService(String name, Logger logger) {
            super(name, logger);
        }

        @Override
        public void doWork() {
            try {
                storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
                dataRatio = calcDataStorePathPhysicRatio();
                long hourOfMs = 3600L * 1000L;
                long fileReservedTimeMs = dLedgerConfig.getFileReservedHours() * hourOfMs;
                if (fileReservedTimeMs < hourOfMs) {
                    logger.warn("The fileReservedTimeMs={} is smaller than hourOfMs={}", fileReservedTimeMs, hourOfMs);
                    fileReservedTimeMs = hourOfMs;
                }
                // If the disk is full, should prevent more data to get in
                DLedgerMmapFileStore.this.isDiskFull = isNeedForbiddenWrite();
                boolean timeUp = isTimeToDelete();
                boolean checkExpired = isNeedCheckExpired();
                boolean forceClean = isNeedForceClean();
                boolean enableForceClean = dLedgerConfig.isEnableDiskForceClean();
                int intervalForcibly = 120 * 1000;
                if (timeUp || checkExpired) {
                    int count = getDataFileList().deleteExpiredFileByTime(fileReservedTimeMs, 100,
                        intervalForcibly, forceClean && enableForceClean);
                    if (count > 0 || (forceClean && enableForceClean) || isDiskFull) {
                        logger.info("Clean space count={} timeUp={} checkExpired={} forceClean={} enableForceClean={} diskFull={} storeBaseRatio={} dataRatio={}",
                            count, timeUp, checkExpired, forceClean, enableForceClean, isDiskFull, storeBaseRatio, dataRatio);
                    }
                    if (count > 0) {
                        DLedgerMmapFileStore.this.reviseLedgerBeginIndex();
                    }
                }
                getDataFileList().retryDeleteFirstFile(intervalForcibly);
                waitForRunning(100);
            } catch (Throwable t) {
                logger.info("Error in {}", getName(), t);
                DLedgerUtils.sleep(200);
            }
        }

        private boolean isTimeToDelete() {
            String when = DLedgerMmapFileStore.this.dLedgerConfig.getDeleteWhen();
            if (DLedgerUtils.isItTimeToDo(when)) {
                return true;
            }
            return false;
        }

        private boolean isNeedCheckExpired() {
            if (storeBaseRatio > dLedgerConfig.getDiskSpaceRatioToCheckExpired()
                || dataRatio > dLedgerConfig.getDiskSpaceRatioToCheckExpired()) {
                return true;
            }
            return false;
        }

        private boolean isNeedForceClean() {
            if (storeBaseRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()
                || dataRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()) {
                return true;
            }
            return false;
        }

        private boolean isNeedForbiddenWrite() {
            if (storeBaseRatio > dLedgerConfig.getDiskFullRatio()
                || dataRatio > dLedgerConfig.getDiskFullRatio()) {
                return true;
            }
            return false;
        }

        // Compute the physical usage ratio of the data store paths
        public double calcDataStorePathPhysicRatio() {
            Set<String> fullStorePath = new HashSet<>();
            String storePath = dLedgerConfig.getDataStorePath();
            String[] paths = storePath.trim().split(DLedgerConfig.MULTI_PATH_SPLITTER);
            double minPhysicRatio = 100;
            // Iterate over every path
            for (String path : paths) {
                double physicRatio = DLedgerUtils.isPathExists(path)
                    ? DLedgerUtils.getDiskPartitionSpaceUsedPercent(path) : -1;
                minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
                if (physicRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()) {
                    fullStorePath.add(path);
                }
            }
            DLedgerMmapFileStore.this.setFullStorePaths(fullStorePath);
            return minPhysicRatio;
        }
    }

6. Startup of the memory-mapped file store

    // The storage component is started through its startup method
    @Override
    public void startup() {
        // Load the data files and index files
        load();
        // Recover data
        recover();
        // Start the flush data service
        flushDataService.start();
        // Start the clean space service
        cleanSpaceService.start();
    }

    public void load() {
        if (!hasLoaded.compareAndSet(false, true)) {
            return;
        }
        // Load the mmap data files and the mmap index files
        if (!this.dataFileList.load() || !this.indexFileList.load()) {
            logger.error("Load file failed, this usually indicates fatal error, you should check it manually");
            System.exit(-1);
        }
    }

7. Appending data

    // Append one entry to the storage component
    @Override
    public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
        // The current node must be the leader; otherwise an exception is thrown
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
        // The disk must not be full. The disk counts as full when the usage of the DLedger root
        // directory or of the data file directory exceeds the allowed maximum (default 85%)
        PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
        // Take this thread's data buffer and index buffer from the thread-local copies
        ByteBuffer dataBuffer = localEntryBuffer.get();
        ByteBuffer indexBuffer = localIndexBuffer.get();
        // Encode the entry into the data buffer
        DLedgerEntryCoder.encode(entry, dataBuffer);
        // remaining() of the data buffer gives the entry size
        int entrySize = dataBuffer.remaining();
        // Lock on the server member state
        synchronized (memberState) {
            PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
            PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
            // ledgerEndIndex starts at -1, so the first index is 0; it grows with every append
            long nextIndex = ledgerEndIndex + 1;
            // Set the index
            entry.setIndex(nextIndex);
            // Take the current term from the member state
            entry.setTerm(memberState.currTerm());
            // Set the magic number
            entry.setMagic(CURRENT_MAGIC);
            // Write the index, the current term, and the magic number into the data buffer
            DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
            // Pre-append: reserve room for this entry in the mmap data files
            long prePos = dataFileList.preAppend(dataBuffer.remaining());
            entry.setPos(prePos);
            PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
            DLedgerEntryCoder.setPos(dataBuffer, prePos);
            // Invoke the append hooks before the data is actually written
            for (AppendHook writeHook : appendHooks) {
                writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
            }
            // Append the data to the mmap data files
            long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
            PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
            PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
            // Encode the DLedger index record
            DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
            // Append one index record to the mmap index files
            long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
            PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
            if (logger.isDebugEnabled()) {
                logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
            }
            // After each append completes, the end index is incremented
            ledgerEndIndex++;
            // Take the current term from the member state
            ledgerEndTerm = memberState.currTerm();
            if (ledgerBeginIndex == -1) {
                ledgerBeginIndex = ledgerEndIndex;
            }
            updateLedgerEndIndexAndTerm();
            return entry;
        }
    }

8. Truncating the data files

    // Truncate the data files: part of the data is simply discarded
    @Override
    public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
        PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, null);
        // Take the thread-local data buffer and index buffer
        ByteBuffer dataBuffer = localEntryBuffer.get();
        ByteBuffer indexBuffer = localIndexBuffer.get();
        DLedgerEntryCoder.encode(entry, dataBuffer);
        int entrySize = dataBuffer.remaining();
        synchronized (memberState) {
            PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());
            PreConditions.check(leaderTerm == memberState.currTerm(), DLedgerResponseCode.INCONSISTENT_TERM, "term %d != %d", leaderTerm, memberState.currTerm());
            PreConditions.check(leaderId.equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, "leaderId %s != %s", leaderId, memberState.getLeaderId());
            // Read the local entry at this index and compare it with the incoming one
            boolean existedEntry;
            try {
                DLedgerEntry tmp = get(entry.getIndex());
                existedEntry = entry.equals(tmp);
            } catch (Throwable ignored) {
                existedEntry = false;
            }
            long truncatePos = existedEntry ? entry.getPos() + entry.getSize() : entry.getPos();
            if (truncatePos != dataFileList.getMaxWrotePosition()) {
                logger.warn("[TRUNCATE] leaderId={} index={} truncatePos={} != maxPos={}, this is usually happened on the old leader",
                    leaderId, entry.getIndex(), truncatePos, dataFileList.getMaxWrotePosition());
            }
            // Truncate the data starting at this position
            dataFileList.truncateOffset(truncatePos);
            if (dataFileList.getMaxWrotePosition() != truncatePos) {
                logger.warn("[TRUNCATE] rebuild for data wrotePos: {} != truncatePos: {}", dataFileList.getMaxWrotePosition(), truncatePos);
                PreConditions.check(dataFileList.rebuildWithPos(truncatePos), DLedgerResponseCode.DISK_ERROR, "rebuild data truncatePos=%d", truncatePos);
            }
            // Revise the flushed position of the mmap data files
            reviseDataFileListFlushedWhere(truncatePos);
            if (!existedEntry) {
                long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
                PreConditions.check(dataPos == entry.getPos(), DLedgerResponseCode.DISK_ERROR, " %d != %d", dataPos, entry.getPos());
            }
            // The data files were truncated, so the index files must be truncated as well
            long truncateIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE;
            indexFileList.truncateOffset(truncateIndexOffset);
            if (indexFileList.getMaxWrotePosition() != truncateIndexOffset) {
                logger.warn("[TRUNCATE] rebuild for index wrotePos: {} != truncatePos: {}", indexFileList.getMaxWrotePosition(), truncateIndexOffset);
                PreConditions.check(indexFileList.rebuildWithPos(truncateIndexOffset), DLedgerResponseCode.DISK_ERROR, "rebuild index truncatePos=%d", truncateIndexOffset);
            }
            reviseIndexFileListFlushedWhere(truncateIndexOffset);
            DLedgerEntryCoder.encodeIndex(entry.getPos(), entrySize, entry.getMagic(), entry.getIndex(), entry.getTerm(), indexBuffer);
            long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
            PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
            ledgerEndTerm = entry.getTerm();
            ledgerEndIndex = entry.getIndex();
            reviseLedgerBeginIndex();
            updateLedgerEndIndexAndTerm();
            return entry.getIndex();
        }
    }

9. Appending data as a follower

    // Append data as a follower
    @Override
    public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) {
        PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());
        PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
        // Data buffer and index buffer
        ByteBuffer dataBuffer = localEntryBuffer.get();
        ByteBuffer indexBuffer = localIndexBuffer.get();
        DLedgerEntryCoder.encode(entry, dataBuffer);
        int entrySize = dataBuffer.remaining();
        synchronized (memberState) {
            PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());
            long nextIndex = ledgerEndIndex + 1;
            PreConditions.check(nextIndex == entry.getIndex(), DLedgerResponseCode.INCONSISTENT_INDEX, null);
            PreConditions.check(leaderTerm == memberState.currTerm(), DLedgerResponseCode.INCONSISTENT_TERM, null);
            PreConditions.check(leaderId.equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, null);
            // Append the data; it must land at the position recorded in the entry
            long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
            PreConditions.check(dataPos == entry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", dataPos, entry.getPos());
            DLedgerEntryCoder.encodeIndex(dataPos, entrySize, entry.getMagic(), entry.getIndex(), entry.getTerm(), indexBuffer);
            // Append the index record
            long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
            PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
            ledgerEndTerm = entry.getTerm();
            ledgerEndIndex = entry.getIndex();
            if (ledgerBeginIndex == -1) {
                ledgerBeginIndex = ledgerEndIndex;
            }
            // Update the end index and term
            updateLedgerEndIndexAndTerm();
            return entry;
        }
    }

10. Looking up an entry by index

    // Look up an entry by index
    @Override
    public DLedgerEntry get(Long index) {
        indexCheck(index);
        // Buffer slices for the index record and for the data
        SelectMmapBufferResult indexSbr = null;
        SelectMmapBufferResult dataSbr = null;
        try {
            // Query the mmap index files directly. index is the ordinal of the index record, and every
            // record is a fixed-size unit, so its offset is index * INDEX_UNIT_SIZE; one unit of
            // INDEX_UNIT_SIZE bytes is read from that offset
            indexSbr = indexFileList.getData(index * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE);
            PreConditions.check(indexSbr != null && indexSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null index for %d", index);
            indexSbr.getByteBuffer().getInt(); // magic
            long pos = indexSbr.getByteBuffer().getLong();
            int size = indexSbr.getByteBuffer().getInt();
            // With the position and size, read the entry itself from the mmap data files
            dataSbr = dataFileList.getData(pos, size);
            PreConditions.check(dataSbr != null && dataSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null data for %d", index);
            // Decode the data
            DLedgerEntry dLedgerEntry = DLedgerEntryCoder.decode(dataSbr.getByteBuffer());
            PreConditions.check(pos == dLedgerEntry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", pos, dLedgerEntry.getPos());
            return dLedgerEntry;
        } finally {
            // Release the index and data buffer slices read above
            SelectMmapBufferResult.release(indexSbr);
            SelectMmapBufferResult.release(dataSbr);
        }
    }

11. Data storage format and encoding

Log entry

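DLedgerEntryCoder#encode()

    /**
     * Encode a log entry.
     *
     * @param entry      log entry
     * @param byteBuffer buffer
     */
    public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
        byteBuffer.clear();
        int size = entry.computeSizeInBytes();
        // always put magic on the first position
        // Magic number, 4 bytes
        byteBuffer.putInt(entry.getMagic());
        // Total length of the entry, header plus body, 4 bytes
        byteBuffer.putInt(size);
        // Index of this entry, 8 bytes
        byteBuffer.putLong(entry.getIndex());
        // Election term this entry belongs to, 8 bytes
        byteBuffer.putLong(entry.getTerm());
        // Physical offset of this entry, similar to the physical offset in a commitlog file, 8 bytes
        byteBuffer.putLong(entry.getPos());
        // Reserved field, unused in the current version, 4 bytes
        byteBuffer.putInt(entry.getChannel());
        // Unused in the current version, 4 bytes
        byteBuffer.putInt(entry.getChainCrc());
        // CRC checksum of the body, used to detect corrupted data, 4 bytes
        byteBuffer.putInt(entry.getBodyCrc());
        // Length of the body, 4 bytes
        byteBuffer.putInt(entry.getBody().length);
        // The message content itself
        byteBuffer.put(entry.getBody());
        byteBuffer.flip();
    }

Log index

DLedgerEntryCoder#encodeIndex()

    /**
     * Encode a log index record.
     *
     * @param pos        offset of the log entry in the data file
     * @param size       entry size
     * @param magic      magic number
     * @param index      index
     * @param term       election term
     * @param byteBuffer buffer
     */
    public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) {
        byteBuffer.clear();
        // Magic number, 4 bytes
        byteBuffer.putInt(magic);
        // Offset of the log entry in the data file, 8 bytes
        byteBuffer.putLong(pos);
        // Entry size, 4 bytes
        byteBuffer.putInt(size);
        // Log entry index, 8 bytes
        byteBuffer.putLong(index);
        // Election term, 8 bytes
        byteBuffer.putLong(term);
        byteBuffer.flip();
    }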
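
The index layout above adds up to 4 + 8 + 4 + 8 + 8 = 32 bytes, which matches the 32-byte index unit size and is why a lookup can locate a record by multiplying the index by the unit size. The following is a minimal, standalone sketch of that arithmetic using only java.nio.ByteBuffer. The class `IndexUnitSketch` and its helpers `offsetOf` and `decodePosAndSize` are hypothetical names introduced for illustration and are not part of DLedger.

```java
import java.nio.ByteBuffer;

public class IndexUnitSketch {
    // Same value as the index unit size described in the article
    static final int INDEX_UNIT_SIZE = 32;

    /** Writes the five fields in the order described above and returns a buffer ready for reading. */
    static ByteBuffer encodeIndex(long pos, int size, int magic, long index, long term) {
        ByteBuffer buf = ByteBuffer.allocate(INDEX_UNIT_SIZE);
        buf.putInt(magic);   // 4 bytes
        buf.putLong(pos);    // 8 bytes
        buf.putInt(size);    // 4 bytes
        buf.putLong(index);  // 8 bytes
        buf.putLong(term);   // 8 bytes
        buf.flip();
        return buf;
    }

    /** Byte offset of the index record with the given ordinal (hypothetical helper). */
    static long offsetOf(long index) {
        return index * INDEX_UNIT_SIZE;
    }

    /** Reads pos and size back in the same order a lookup would: skip magic, then pos, then size. */
    static long[] decodePosAndSize(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position untouched
        view.getInt();                     // magic, ignored here
        long pos = view.getLong();
        int size = view.getInt();
        return new long[] {pos, size};
    }
}
```

For example, the record for index 7 starts at byte offset 224 of the index file list, and decoding it yields the position and size needed to read the entry from the data files.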