安庆大理运城常德铜陵江西
投稿投诉
江西南阳
嘉兴昆明
铜陵滨州
广东西昌
常德梅州
兰州阳江
运城金华
广西萍乡
大理重庆
诸暨泉州
安庆南充
武汉辽宁

万字长文Flinkcdc源码精讲(推荐收藏)(四)

5月21日 渡缘祠投稿
  SnapshotSplitReader。submitSplit方法publicvoidsubmitSplit(MySqlSplitmySqlSplit){this。currentSnapshotSplitmySqlSplit。asSnapshotSplit();statefulTaskContext。configure(currentSnapshotSplit);拿到context的queue,在pollSplitSrecords的时候需要this。queuestatefulTaskContext。getQueue();this。nameAdjusterstatefulTaskContext。getSchemaNameAdjuster();this。hasNextElement。set(true);this。reachEnd。set(false);主要读取逻辑在readTask中this。splitSnapshotReadTasknewMySqlSnapshotSplitReadTask(statefulTaskContext。getConnectorConfig(),statefulTaskContext。getOffsetContext(),statefulTaskContext。getSnapshotChangeEventSourceMetrics(),statefulTaskContext。getDatabaseSchema(),statefulTaskContext。getConnection(),statefulTaskContext。getDispatcher(),statefulTaskContext。getTopicSelector(),StatefulTaskContext。getClock(),currentSnapshotSplit);提交一个runnable到线程中,主要是执行readTask的execute方法executor。submit((){try{currentTaskR自己实现的contextImpl主要记录高水位和低水位用finalSnapshotSplitChangeEventSourceContextImplsourceContextnewSnapshotSplitChangeEventSourceContextImpl();执行readTaskSnapshotResultsnapshotResultsplitSnapshotReadTask。execute(sourceContext);finalMySqlBinlogSplitbackfillBinlogSplitcreateBackfillBinlogSplit(sourceContext);optimizationthatskipthebinlogreadwhenthelowwatermarkequalshighwatermark如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的readfinalbooleanbinlogBackfillRequiredbackfillBinlogSplit。getEndingOffset()。isAfter(backfillBinlogSplit。getStartingOffset());if(!binlogBackfillRequired){dispatchHighWatermark(backfillBinlogSplit);currentTaskR}snapshot执行完成后,开始binlogReadTask的读取操作if(snapshotResult。isCompletedOrSkipped()){根据snapshotreadtask读取结束后,会记录高低水位,水位线作为参数构建binlogreadtaskfinalMySqlBinlogSplitReadTaskbackfillBinlogReadTaskcreateBackfillBinlogReadTask(backfillBinlogSplit);执行binlogreadtask,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlogreadtask中,会以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游backfillBinlogReadTask。execute(newSnapshotBinlogSplitChangeEventSourceContextImpl());}else{readExceptionnewIllegalStateException(String。format(Readsnapshotformysqlsplitsfail,currentSnapshotSplit));}}catch(Exceptione){currentTaskRLOG。error(String。format(Executesnapshotreadtaskformysqlsplitsfail,currentSnapshotSplit),e);readE}});}MySqlSnapshotSplitReadTask。execute(sourceContext)方法OverridepublicSnapshotResultexecute(ChangeEventSourceContextcontext)throwsInterruptedException{SnapshottingTasksnapshottingTaskgetSnapshottingTask(previousOffset);就是new了一个finalSnapshotCtry{ctxprepare(context);重新new了一个context对象,比较无用}catch(Exceptione){LOG。error(Failedtoinitializesnapshotcontext。,e);thrownewRuntimeException(e);}try{上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可returndoExecute(context,ctx,snapshottingTask);}catch(InterruptedExceptione){LOG。warn(Snapshotwasinterruptedbeforecompletion);}catch(Exceptiont){thrownewDebeziumException(t);}}MySqlSnapshotSplitReadTask。doExecute(sourceContext)方法OverrideprotectedSnapshotResultdoExecute(ChangeEventSourceContextcontext,SnapshotContextsnapshotContext,SnapshottingTasksnapshottingTask)throwsException{finalRelationalSnapshotChangeEventSource。RelationalSnapshotContextctx(RelationalSnapshotChangeEventSource。RelationalSnapshotContext)snapshotCctx。offsetoffsetC一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了finalSignalEventDispatchersignalEventDispatchernewSignalEventDispatcher(offsetContext。getPartition(),topicSelector。topicNameFor(snapshotSplit。getTableId()),dispatcher。getQueue());其实log输出的ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a就已经很清晰了记录低水位finalBinlogOffsetlowWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep1Determininglowwatermark{}forsplit{},lowWatermark,snapshotSplit);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setLowWatermark(lowWatermark);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,lowWatermark,SignalEventDispatcher。WatermarkKind。LOW);LOG。info(Snapshotstep2Snapshottingdata);读取数据主要方法重点介绍的地方createDataEvents(ctx,snapshotSplit。getTableId());记录高水位finalBinlogOffsethighWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep3Determininghighwatermark{}forsplit{},highWatermark,snapshotSplit);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,highWatermark,SignalEventDispatcher。WatermarkKind。HIGH);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setHighWatermark(highWatermark);returnSnapshotResult。completed(ctx。offset);}我们看看createDataEvents调用过程privatevoidcreateDataEvents(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,TableIdtableId)throwsException{EventDispatcher。SnapshotReceiversnapshotReceiverdispatcher。getSnapshotChangeEventReceiver();LOG。debug(Snapshottingtable{},tableId);createDataEventsForTable(snapshotContext,snapshotReceiver,databaseSchema。tableFor(tableId));receiver的逻辑我们就不看了,我这里介绍一下就好receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻snapshotReceiver。completeSnapshot();}createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑privatevoidcreateDataEventsForTable(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,EventDispatcher。SnapshotReceiversnapshotReceiver,Tabletable)throwsInterruptedException{longexportStartclock。currentTimeInMillis();LOG。info(Exportingdatafromsplit{}oftable{},snapshotSplit。splitId(),table。id());构建sqlfinalStringselectSqlStatementUtils。buildSplitScanQuery(snapshotSplit。getTableId(),snapshotSplit。getSplitKeyType(),snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null);LOG。info(Forsplit{}oftable{}usingselectstatement:{},snapshotSplit。splitId(),table。id(),selectSql);try(PreparedStatementselectStatementStatementUtils。readTableSplitDataStatement(创建statement,然后查询sqljdbcConnection,selectSql,snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null,snapshotSplit。getSplitStart(),snapshotSplit。getSplitEnd(),snapshotSplit。getSplitKeyType()。getFieldCount(),connectorConfig。getQueryFetchSize());然后对查询出来的数据进行封装成sourceRecord发送下游ResultSetrsselectStatement。executeQuery()){ColumnUtils。ColumnArraycolumnArrayColumnUtils。toArray(rs,table);longrows0;Threads。TimerlogTimergetTableScanLogTimer();while(rs。next()){finalObject〔〕rownewObject〔columnArray。getGreatestColumnPosition()〕;for(inti0;icolumnArray。getColumns()。i){ColumnactualColumntable。columns()。get(i);row〔columnArray。getColumns()〔i〕。position()1〕readField(rs,i1,actualColumn,table);}if(logTimer。expired()){longstopclock。currentTimeInMillis();LOG。info(Exported{}recordsforsplit{}after{},rows,snapshotSplit。splitId(),Strings。duration(stopexportStart));snapshotProgressListener。rowsScanned(table。id(),rows);logTimergetTableScanLogTimer();}这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解dispatcher。dispatchSnapshotEvent(table。id(),getChangeRecordEmitter(snapshotContext,table。id(),row),就是new了一个snapshotReceiver);}LOG。info(Finishedexporting{}recordsforsplit{},totalduration{},rows,snapshotSplit。splitId(),Strings。duration(clock。currentTimeInMillis()exportStart));}catch(SQLExceptione){thrownewConnectException(Snapshottingoftabletable。id()failed,e);}}dispatcher。dispatchSnapshotEvent方法之后的流程进入evnentDisptcher。dispatchSnapshotEvent方法publicvoiddispatchSnapshotEvent(TdataCollectionId,ChangeRecordEmitterchangeRecordEmitter,SnapshotReceiverreceiver)throwsInterruptedException{DataCollectionSchemadataCollectionSchemaschema。schemaFor(dataCollectionId);if(dataCollectionSchemanull){errorOnMissingSchema(dataCollectionId,changeRecordEmitter);}changeRecordEmitter。emitChangeRecords(dataCollectionSchema,newReceiver(){OverridepublicvoidchangeRecord(DataCollectionSchemaschema,Operationoperation,Objectkey,Structvalue,OffsetContextoffset,ConnectHeadersheaders)throwsInterruptedException{eventListener。onEvent(dataCollectionSchema。id(),offset,key,value);真正的放入队列的逻辑在这里调用receiver使我们传入的对应BufferingSnapshotChangeRecordReceiver类receiver。changeRecord(dataCollectionSchema,operation,key,value,offset,headers);}});}BufferingSnapshotChangeRecordReceiver的changeRecord方法前面简单介绍过他的处理逻辑了,就不必多做介绍了OverridepublicvoidchangeRecord(DataCollectionSchemadataCollectionSchema,Operationoperation,Objectkey,Structvalue,OffsetContextoffsetContext,ConnectHeadersheaders)throwsInterruptedException{Objects。requireNonNull(value,valuemustnotbenull);LOGGER。trace(Receivedchangerecordfor{}operationonkey{},operation,key);if(bufferedEvent!null){queue。enqueue(bufferedEvent。get());}SchemakeySchemadataCollectionSchema。keySchema();StringtopicNametopicSelector。topicNameFor((T)dataCollectionSchema。id());therecordisproducedlazily,sotohavethecorrectoffsetaspertheprepostcompletioncallbacksbufferedEvent(){SourceRecordrecordnewSourceRecord(offsetContext。getPartition(),offsetContext。getOffset(),topicName,null,keySchema,key,dataCollectionSchema。getEnvelopeSchema()。schema(),value,null,headers);returnchangeEventCreator。createDataChangeEvent(record);};}
投诉 评论 转载

万字长文Flinkcdc源码精讲(推荐收藏)(四)SnapshotSplitReader。submitSplit方法publicvoidsubmitSplit(MySqlSplitmySqlSplit){this。curren……大张侃球今日世界杯比赛推荐实单周一053世界杯日本VS克罗地亚日本上一场击败西班牙获得小组第一,积6分小组第一出线,谁能想到死亡之组居然是日本第一,德国出局,西班牙惊险排第二呢。如果不是森保一大意,对……早安只要用心生活,你想要的都会如期来临〔太阳〕早安今天〔太阳〕今天是2022年10月12日,星期三,农历九月十七每一个最美好的日子就是今天,不虚度时光,不为未来感到焦虑,过好每一天的日子,……刘强东三个孩子四个女人,每一个都影响了他的人生说到刘强东想必大家都不陌生,这位坐拥京东商业帝国的企业家,前几年选择了比他年轻19岁的奶茶妹妹章泽天作为人生伴侣,引发了不少热议。其实除了章泽天,刘强东生命中还有另外3个女人和……为什么男生喜欢看女生穿裙子?网友这些理由或许足够了前言:对于大多数女生来说,总觉得自己的衣橱里缺少一件衣服,尤其是出门的时候,挑一件合适的衣服需要浪费很长的时间,但是在一些大部分场合,女生都可以穿上裙子,看起来特别的优雅,又能……苹果要做AirPods青春版?复制iPhoneSE的案例在2016年秋季的苹果新品发布会上,除了万众瞩目的iPhone7系列外,当时同台亮相的AirPods似乎并不起眼。但在随后的日子里,AirPods让TrueWirelessSt……12月18最新漂亮的早上好图片,温馨早晨问候祝福语,朋友圈早早安只要来日可期,今天就值得欣喜早上好!祝你,不管晴天阴天雨天,开心就是晴朗的一天;不管昨天今天明天,能豁然开朗的就是美好的一天!生活不简单,尽量简单过,早安……选购洗碗机,不能忽视这7个顾虑,不是唬人,是的确存在头条创作挑战赛随着人们对生活品质追求得越来越高,一些新型的电器也逐渐流行起来。其中,家用电器中的洗碗机,就是一个典型的例子。洗碗机,可以解放我们的双手,没有了……手机连上WiFi后,该不该关闭移动数据?中国移动给出了答案自从手机走进了我们的生活,改变了人们的沟通方式,从最初的打电话、发短信发展到现在,手机的功能已经发生了重大的变化,不但可以购物、追剧、上网、还可以视频电话、手机支付等等,这是以……龙赛罗皇马球迷想抽到利物浦或巴黎我们可以梦想夺第15冠直播吧11月3日讯近日,阿斯主编同时也是皇马死忠龙赛罗撰文表示,皇马球迷希望在欧冠16强比赛的抽签中抽到巴黎或利物浦。关于皇马晋级淘汰赛在迄今为止的31届欧冠比赛中……杨紫真好看越来越漂亮了杨紫这小姐姐一路走来不容易,现在越来越漂亮了,演技也是不断提高之中,是个低调的小仙女,非常的励志,加油杨紫!红毯上最靓的妞非你莫属!穿古装还是非常的好看的,真的是惊……NBA记者大爆料,因为杜兰特逼宫行为,NBA老板考虑停摆2022年休赛期没有预期中那般热闹,戈贝尔1换10略显荒唐,虽然交易震惊全联盟,但不足以影响整个NBA格局,唐斯爱德华兹戈贝尔的新三巨头组合,可以帮助狼队打进季后赛,但距离夺冠……
欧文19岁女儿不顾其父反对,断然参加爱情岛,采访对话很有趣vivoX90和vivoX80有什么区别,哪个更值得?消息称小鹏汽车计划自研电池,前宝马电池人才加盟负责又将是个冷门狂欢夜天赋异禀!美记在火箭队内一对一训练中,全队没人可以阻挡格林暨大光子技术研究院丁伟孙一之联合北京理工大学路翠翠北京大学胡马斯克再夸中国!电动汽车领先世界这是事实塞雷娜威廉姆斯穿着巴黎世家的披风指挥Vogue世界时装秀旭辉控股暴雷,早有征兆?美国多家大型科技公司宣布裁员区块链元宇宙等新科技概念成为养老诈骗的幌子王晗上任山东队迎重大改变,顶级外援顶替哈德森,陶汉林或将离队

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找七猫云易事利