安庆大理运城常德铜陵江西
投稿投诉
江西南阳
嘉兴昆明
铜陵滨州
广东西昌
常德梅州
兰州阳江
运城金华
广西萍乡
大理重庆
诸暨泉州
安庆南充
武汉辽宁

(3)sparkstreaming从kafka接入实时数据流

10月2日 夜如影投稿
  (1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:
  (2)方案说明:
  1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到
  2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;
  3)将结果数据写入到
  4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;
  5)在平台上通过拖拽式构建各种数据应用,数据展示;
  (3)代码演示:
  定义一个kafka生产者,模拟数据源packagecom。importcom。alibaba。fastjson。JSONOimportcom。pojo。WaterSimportorg。apache。kafka。clients。producer。KafkaPimportorg。apache。kafka。clients。producer。ProducerRimportorg。apache。kafka。clients。producer。RecordMimportjava。util。Pimportjava。util。RCreatedbyljon20220718。publicclassKafakaProducer{publicfinalstaticStringbootstrapServers127。0。0。1:9092;publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();设置Kafka服务器地址props。put(bootstrap。servers,bootstrapServers);设置数据key的序列化处理类props。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);设置数据value的序列化处理类props。put(value。serializer,org。apache。kafka。common。serialization。StringSerializer);KafkaProducerString,StringproducernewKafkaProducer(props);try{inti0;RandomrnewRandom();String〔〕lang{flink,spark,hadoop,hive,hbase,impala,presto,superset,nbi};while(true){Thread。sleep(2000);WaterSensorwaterSensornewWaterSensor(lang〔r。nextInt(lang。length)〕kafka,i,i);i;StringmsgJSONObject。toJSONString(waterSensor);System。out。println(msg);RecordMetadatarecordMetadataproducer。send(newProducerRecord(kafkadatawaterSensor,null,null,msg))。get();System。out。println(recordMetadata:{recordMetadata});}}catch(Exceptione){System。out。println(e。getMessage());}}}
  根据业务需要,定义各种消息对象packagecom。importjava。io。Simportjava。util。DCreatedbyljon20220713。publicclassWaterSensorimplementsSerializable{publicSpublicWaterSensor(){}publicWaterSensor(Stringid,longts,intvc){this。this。this。}publicintgetVc(){}publicvoidsetVc(intvc){this。}publicStringgetId(){}publicvoidsetId(Stringid){this。}publiclonggetTs(){}publicvoidsetTs(longts){this。}}
  sparkstreaming数据流计算packagecom。importcom。alibaba。fastjson。JSONOimportcom。pojo。WaterSimportorg。apache。kafka。clients。consumer。ConsumerRimportorg。apache。kafka。common。TopicPimportorg。apache。spark。SparkCimportorg。apache。spark。api。java。JavaRDD;importorg。apache。spark。api。java。function。Fimportorg。apache。spark。api。java。function。VoidFunction2;importorg。apache。spark。sql。Dimportorg。apache。spark。sql。Rimportorg。apache。spark。sql。SparkSimportorg。apache。spark。streaming。Dimportorg。apache。spark。streaming。Timportorg。apache。spark。streaming。api。java。JavaDSimportorg。apache。spark。streaming。api。java。JavaInputDSimportorg。apache。spark。streaming。api。java。JavaReceiverInputDSimportorg。apache。spark。streaming。api。java。JavaStreamingCimportorg。apache。spark。streaming。kafka010。ConsumerSimportorg。apache。spark。streaming。kafka010。KafkaUimportorg。apache。spark。streaming。kafka010。LocationSimportjava。util。;Createdbyljon20220718。publicclassSparkSqlKafka{privatestaticStringappNamespark。streaming。privatestaticStringmasterlocal〔〕;privatestaticStringtopicskafkadatawaterSprivatestaticStringbrokers127。0。0。1:9092;publicstaticvoidmain(String〔〕args){初始化sparkConfSparkConfsparkConfnewSparkConf()。setMaster(master)。setAppName(appName);获得JavaStreamingContextJavaStreamingContextsscnewJavaStreamingContext(sparkConf,Durations。minutes(3));设置ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a的级别:避免ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a重复ssc。sparkContext()。setLogLevel(ERROR);CollectionStringtopicsSetnewHashSet(Arrays。asList(topics。split(,)));kafka相关参数,必要!缺了会报错MapString,ObjectkafkaParamsnewHashMap();kafkaParams。put(metadata。broker。list,brokers);kafkaParams。put(bootstrap。servers,brokers);kafkaParams。put(group。id,group1);kafkaParams。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);kafkaParams。put(key。deserializer,org。apache。kafka。common。serialization。StringDeserializer);kafkaParams。put(value。deserializer,org。apache。kafka。common。serialization。StringDeserializer);通过KafkaUtils。createDirectStream(。。。)获得kafka数据,kafka相关参数由kafkaParams指定JavaInputDStreamConsumerRecordObject,ObjectlinesKafkaUtils。createDirectStream(ssc,LocationStrategies。PreferConsistent(),ConsumerStrategies。Subscribe(topicsSet,kafkaParams));JavaDStreamWaterSensormapDStreamlines。map(newFunctionConsumerRecordObject,Object,WaterSensor(){OverridepublicWaterSensorcall(ConsumerRecordObject,Objects)throwsException{WaterSensorwaterSensorJSONObject。parseObject(s。value()。toString(),WaterSensor。class);returnwaterS}})。window(Durations。minutes(9),Durations。minutes(6));指定窗口大小和滑动频率必须是批处理时间的整数倍;mapDStream。foreachRDD(newVoidFunction2JavaRDDWaterSensor,Time(){Overridepublicvoidcall(JavaRDDWaterSensorwaterSensorJavaRDD,Timetime)throwsException{SparkSessionsparkJavaSparkSessionSingleton。getInstance(waterSensorJavaRDD。context()。getConf());DatasetRowdataFramespark。createDataFrame(waterSensorJavaRDD,WaterSensor。class);创建临时表dataFrame。createOrReplaceTempView(log);DatasetRowresultspark。sql(selectfromlog);System。out。println(time);输出前20条数据result。show();数据写入mysqlwriteDataToMysql(result);}});开始作业ssc。start();try{ssc。awaitTermination();}catch(Exceptione){e。printStackTrace();}finally{ssc。close();}}}
  NBI大数据可视化构建平台对接mysql,构建数据应用:
  NBI可视化
投诉 评论

一只令我心生连串问号的鸟儿我家门外有一棵树,树顶刚好比我二楼房间的窗台略高一些。此时正值暮春,树似乎生怕辜负了春的热情和希望,所以长得枝繁叶茂。花若盛开,蝴蝶自来。也许是树的繁茂华美,每年春天的清……关于睡眠的诸多解答,睡眠不好的一定要看看人的一生有三分之一的时间在睡眠中度过,可见睡眠这件事情有多重要。良好的睡眠是心身健康的保障,可以说,其他的任何一种方式都不能代替睡眠。但是,面对睡眠一些常见的问题,却都很……甘肃省十五运群众组中国象棋围棋桥牌比赛第二日赛况集锦6月3日是甘肃省第十五届运动会群众组中国象棋、围棋、桥牌比赛的第二天,赛场上依旧战况激烈焦灼。中国象棋进行了第三、四、五轮的比赛;围棋进行了第三、四轮的比拼。桥牌则进行男子团体……嘉兴博物馆与班布里市博物馆线上文化交流活动圆满完成【来源:嘉兴市文化广电旅游局文旅要闻】12月17日下午,嘉兴博物馆会议室很是热闹,中英文自如切换,还不时有笑声响起。原来,现场正在举行一场连线直播活动,连接的另一端是远在……1400万人次围观热议,近500部魅力短视频参赛!2022东近日,由东营市文化和旅游局主办,东营周刊承办,东营区文化和旅游局、河口区文化和旅游局、垦利区文化和旅游局、广饶县文化和旅游局、利津县文化和旅游局、东营经济技术开发区管委会综合部……(3)sparkstreaming从kafka接入实时数据流(1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:(2)方案说明:1)我们通过kafka与各个业务系统的数……超越苦难,莫辜负父母给予的生命超越苦难,莫辜负父母给予的生命董连辉启一元复始,待四序更新。在这个悲风呼啸的寒冬,元旦暖阳无法带来温暖,随着慈母去世,我彻底成为没有根的孤儿游子。长城脚下,那座拥有……换新电池的特斯拉ModelY在美国上市,真比较好?根据外媒报道,日前美国特斯拉官网被发现有着疑似被称为StandardRangeAWD的ModelY车型代码,而且外媒指出该车型会采用4680电池组,等于搭载4680电池车型不再……虚增超9成利润模塑科技和高管集体收行政处罚来源:中国经济网模塑科技的信披违规案终于等来了处罚落地,12月27日晚间,模塑科技(000700。SZ)公告收到了行政处罚事先告知书。截至12月28日收盘,模塑科技股价为……肾病综合征,想要病情不反复,就要注意三点肾病综合征具有一定的复发率,有的病人复发,是因为疾病本身引起的,但也有部分患者的复发,是因为患者自身因素所导致的,肾病综合征反复发作,可能会加快疾病的进展,这也是很多患者所焦虑……电脑超简单之电脑如何远程开机ampampamp访问远程文件很多人肯定遇到过这样的问题,有时候需要访问家里电脑上的文件资料或者在家办公时需要访问公司电脑上的文件资料。这个时候如果可以远程开机并且访问到电脑上的文件资料那是一件多么好的事情……30万亿资产,迎来历史巨变,像极了2014年大牛市前夜前不久,高层对央企估值偏低的情况,发表了看法,认为这是市场的偏见,应该建立中国特色的估值体系。这番表态之后,中字头的央企股价大幅飙升,甚至连中国建筑这样万年乌龟行情的,都……
2022年夜饭菜单来了,精选12道家常菜,荤素搭配,家人都喜邓颖超去世,聂帅女儿聂力哭着扑向骨灰盒邓妈妈再让我吻你一次您想体验慢节奏的生活吗?您是否一直处于快节奏的生活里呢?十种机器学习算法的预测分析伏明霞父母陪女儿走过跳水生涯有欢乐有泪水,她的婚姻最不放心人间清醒悲喜自渡带上准考证!中高考生到北京这些景区门票减免!全国多地热门景区曼城成欧洲黑店!送走3位二流前锋,狂赚1。6亿,巴萨第一个喊人类有可能穿越未来?这位90后物理博士后冲上热搜中兴Axon40SE真机谍照偷跑屏下摄像头硬朗外观设计帮你准确了解什么是白大衣高血压博尔特的实力究竟到了哪一个层面?韩寒1988经典语录,韩寒青春语录口罩小姐女版193张晋雅魔音牧童笛喜感洋溢想找多点可能性天猫魔盒还有人买吗?改朝换代配置还香吗?来看看这几款吧优秀学生代表发言模板十二(最实用)支付宝怎么退哈罗单车的押金教你四招“智取”婆婆心植物在室内怎么增加光照彩虹摩天轮四年级作文羊奶沐浴露有哪些品牌柑橘鲜奶是孕期补钙的好食谱不妨试试吧比炒鞋还烧钱的盲盒,如何让人“中毒”1年花70万?水务局节水型机关建设工作总结

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找七猫云易事利