SnapshotSplitReader。submitSplit方法publicvoidsubmitSplit(MySqlSplitmySqlSplit){this。currentSnapshotSplitmySqlSplit。asSnapshotSplit();statefulTaskContext。configure(currentSnapshotSplit);拿到context的queue,在pollSplitSrecords的时候需要this。queuestatefulTaskContext。getQueue();this。nameAdjusterstatefulTaskContext。getSchemaNameAdjuster();this。hasNextElement。set(true);this。reachEnd。set(false);主要读取逻辑在readTask中this。splitSnapshotReadTasknewMySqlSnapshotSplitReadTask(statefulTaskContext。getConnectorConfig(),statefulTaskContext。getOffsetContext(),statefulTaskContext。getSnapshotChangeEventSourceMetrics(),statefulTaskContext。getDatabaseSchema(),statefulTaskContext。getConnection(),statefulTaskContext。getDispatcher(),statefulTaskContext。getTopicSelector(),StatefulTaskContext。getClock(),currentSnapshotSplit);提交一个runnable到线程中,主要是执行readTask的execute方法executor。submit((){try{currentTaskR自己实现的contextImpl主要记录高水位和低水位用finalSnapshotSplitChangeEventSourceContextImplsourceContextnewSnapshotSplitChangeEventSourceContextImpl();执行readTaskSnapshotResultsnapshotResultsplitSnapshotReadTask。execute(sourceContext);finalMySqlBinlogSplitbackfillBinlogSplitcreateBackfillBinlogSplit(sourceContext);optimizationthatskipthebinlogreadwhenthelowwatermarkequalshighwatermark如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的readfinalbooleanbinlogBackfillRequiredbackfillBinlogSplit。getEndingOffset()。isAfter(backfillBinlogSplit。getStartingOffset());if(!binlogBackfillRequired){dispatchHighWatermark(backfillBinlogSplit);currentTaskR}snapshot执行完成后,开始binlogReadTask的读取操作if(snapshotResult。isCompletedOrSkipped()){根据snapshotreadtask读取结束后,会记录高低水位,水位线作为参数构建binlogreadtaskfinalMySqlBinlogSplitReadTaskbackfillBinlogReadTaskcreateBackfillBinlogReadTask(backfillBinlogSplit);执行binlogreadtask,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlogreadtask中,会以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游backfillBinlogReadTask。execute(newSnapshotBinlogSplitChangeEventSourceContextImpl());}else{readExceptionnewIllegalStateException(String。format(Readsnapshotformysqlsplitsfail,currentSnapshotSplit));}}catch(Exceptione){currentTaskRLOG。error(String。format(Executesnapshotreadtaskformysqlsplitsfail,currentSnapshotSplit),e);readE}});}MySqlSnapshotSplitReadTask。execute(sourceContext)方法OverridepublicSnapshotResultexecute(ChangeEventSourceContextcontext)throwsInterruptedException{SnapshottingTasksnapshottingTaskgetSnapshottingTask(previousOffset);就是new了一个finalSnapshotCtry{ctxprepare(context);重新new了一个context对象,比较无用}catch(Exceptione){LOG。error(Failedtoinitializesnapshotcontext。,e);thrownewRuntimeException(e);}try{上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可returndoExecute(context,ctx,snapshottingTask);}catch(InterruptedExceptione){LOG。warn(Snapshotwasinterruptedbeforecompletion);}catch(Exceptiont){thrownewDebeziumException(t);}}MySqlSnapshotSplitReadTask。doExecute(sourceContext)方法OverrideprotectedSnapshotResultdoExecute(ChangeEventSourceContextcontext,SnapshotContextsnapshotContext,SnapshottingTasksnapshottingTask)throwsException{finalRelationalSnapshotChangeEventSource。RelationalSnapshotContextctx(RelationalSnapshotChangeEventSource。RelationalSnapshotContext)snapshotCctx。offsetoffsetC一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了finalSignalEventDispatchersignalEventDispatchernewSignalEventDispatcher(offsetContext。getPartition(),topicSelector。topicNameFor(snapshotSplit。getTableId()),dispatcher。getQueue());其实log输出的ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a就已经很清晰了记录低水位finalBinlogOffsetlowWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep1Determininglowwatermark{}forsplit{},lowWatermark,snapshotSplit);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setLowWatermark(lowWatermark);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,lowWatermark,SignalEventDispatcher。WatermarkKind。LOW);LOG。info(Snapshotstep2Snapshottingdata);读取数据主要方法重点介绍的地方createDataEvents(ctx,snapshotSplit。getTableId());记录高水位finalBinlogOffsethighWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep3Determininghighwatermark{}forsplit{},highWatermark,snapshotSplit);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,highWatermark,SignalEventDispatcher。WatermarkKind。HIGH);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setHighWatermark(highWatermark);returnSnapshotResult。completed(ctx。offset);}我们看看createDataEvents调用过程privatevoidcreateDataEvents(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,TableIdtableId)throwsException{EventDispatcher。SnapshotReceiversnapshotReceiverdispatcher。getSnapshotChangeEventReceiver();LOG。debug(Snapshottingtable{},tableId);createDataEventsForTable(snapshotContext,snapshotReceiver,databaseSchema。tableFor(tableId));receiver的逻辑我们就不看了,我这里介绍一下就好receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻snapshotReceiver。completeSnapshot();}createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑privatevoidcreateDataEventsForTable(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,EventDispatcher。SnapshotReceiversnapshotReceiver,Tabletable)throwsInterruptedException{longexportStartclock。currentTimeInMillis();LOG。info(Exportingdatafromsplit{}oftable{},snapshotSplit。splitId(),table。id());构建sqlfinalStringselectSqlStatementUtils。buildSplitScanQuery(snapshotSplit。getTableId(),snapshotSplit。getSplitKeyType(),snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null);LOG。info(Forsplit{}oftable{}usingselectstatement:{},snapshotSplit。splitId(),table。id(),selectSql);try(PreparedStatementselectStatementStatementUtils。readTableSplitDataStatement(创建statement,然后查询sqljdbcConnection,selectSql,snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null,snapshotSplit。getSplitStart(),snapshotSplit。getSplitEnd(),snapshotSplit。getSplitKeyType()。getFieldCount(),connectorConfig。getQueryFetchSize());然后对查询出来的数据进行封装成sourceRecord发送下游ResultSetrsselectStatement。executeQuery()){ColumnUtils。ColumnArraycolumnArrayColumnUtils。toArray(rs,table);longrows0;Threads。TimerlogTimergetTableScanLogTimer();while(rs。next()){finalObject〔〕rownewObject〔columnArray。getGreatestColumnPosition()〕;for(inti0;icolumnArray。getColumns()。i){ColumnactualColumntable。columns()。get(i);row〔columnArray。getColumns()〔i〕。position()1〕readField(rs,i1,actualColumn,table);}if(logTimer。expired()){longstopclock。currentTimeInMillis();LOG。info(Exported{}recordsforsplit{}after{},rows,snapshotSplit。splitId(),Strings。duration(stopexportStart));snapshotProgressListener。rowsScanned(table。id(),rows);logTimergetTableScanLogTimer();}这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解dispatcher。dispatchSnapshotEvent(table。id(),getChangeRecordEmitter(snapshotContext,table。id(),row),就是new了一个snapshotReceiver);}LOG。info(Finishedexporting{}recordsforsplit{},totalduration{},rows,snapshotSplit。splitId(),Strings。duration(clock。currentTimeInMillis()exportStart));}catch(SQLExceptione){thrownewConnectException(Snapshottingoftabletable。id()failed,e);}}dispatcher。dispatchSnapshotEvent方法之后的流程进入evnentDisptcher。dispatchSnapshotEvent方法publicvoiddispatchSnapshotEvent(TdataCollectionId,ChangeRecordEmitterchangeRecordEmitter,SnapshotReceiverreceiver)throwsInterruptedException{DataCollectionSchemadataCollectionSchemaschema。schemaFor(dataCollectionId);if(dataCollectionSchemanull){errorOnMissingSchema(dataCollectionId,changeRecordEmitter);}changeRecordEmitter。emitChangeRecords(dataCollectionSchema,newReceiver(){OverridepublicvoidchangeRecord(DataCollectionSchemaschema,Operationoperation,Objectkey,Structvalue,OffsetContextoffset,ConnectHeadersheaders)throwsInterruptedException{eventListener。onEvent(dataCollectionSchema。id(),offset,key,value);真正的放入队列的逻辑在这里调用receiver使我们传入的对应BufferingSnapshotChangeRecordReceiver类receiver。changeRecord(dataCollectionSchema,operation,key,value,offset,headers);}});}BufferingSnapshotChangeRecordReceiver的changeRecord方法前面简单介绍过他的处理逻辑了,就不必多做介绍了OverridepublicvoidchangeRecord(DataCollectionSchemadataCollectionSchema,Operationoperation,Objectkey,Structvalue,OffsetContextoffsetContext,ConnectHeadersheaders)throwsInterruptedException{Objects。requireNonNull(value,valuemustnotbenull);LOGGER。trace(Receivedchangerecordfor{}operationonkey{},operation,key);if(bufferedEvent!null){queue。enqueue(bufferedEvent。get());}SchemakeySchemadataCollectionSchema。keySchema();StringtopicNametopicSelector。topicNameFor((T)dataCollectionSchema。id());therecordisproducedlazily,sotohavethecorrectoffsetaspertheprepostcompletioncallbacksbufferedEvent(){SourceRecordrecordnewSourceRecord(offsetContext。getPartition(),offsetContext。getOffset(),topicName,null,keySchema,key,dataCollectionSchema。getEnvelopeSchema()。schema(),value,null,headers);returnchangeEventCreator。createDataChangeEvent(record);};}