(1) Spark Streaming ingests a real-time data stream from Kafka and ultimately drives a data visualization. Let's first look at the overall architecture:

[Architecture diagram]

(2) Solution overview:

1) Kafka is wired up to each business system, so the data in those systems is ingested into Kafka in real time;
2) Spark Streaming consumes the Kafka stream; we define the batch interval and the window size, and apply the business computation logic;
3) the result data is written to MySQL;
4) a visualization platform connects to the MySQL database; here we use the NBI big-data visualization builder platform;
5) on the platform, data applications and displays are assembled by drag and drop.

(3) Code walkthrough:

Define a Kafka producer to simulate the data source:

```java
package com.producers;   // package name is truncated in the original post

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-18.
 */
public class KafakaProducer {
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", bootstrapServers);
        // serializer class for message keys
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // serializer class for message values
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            int i = 0;
            Random r = new Random();
            String[] lang = {"flink", "spark", "hadoop", "hive", "hbase", "impala", "presto", "superset", "nbi"};
            while (true) {
                // emit one simulated sensor reading every 2 seconds
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)] + "_kafka", i, i);
                i++;
                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                RecordMetadata recordMetadata = producer.send(
                        new ProducerRecord<>("kafkadatawaterSensor", null, null, msg)).get();
                System.out.println("recordMetadata: {" + recordMetadata + "}");
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Define the message objects the business needs:

```java
package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-13.
 */
public class WaterSensor implements Serializable {
    public String id;
    public long ts;
    public int vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}
```
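The producer and the streaming job agree only on the JSON text flowing through the topic. As a quick sanity check of that contract (our addition, not part of the original post; the class name WaterSensorJsonCheck and the sample values are illustrative), the round trip can be exercised with fastjson directly:

```java
package com.pojo;   // hypothetical helper, placed alongside the POJO for convenience

import com.alibaba.fastjson.JSONObject;

public class WaterSensorJsonCheck {
    public static void main(String[] args) {
        // serialize exactly as the producer does
        WaterSensor in = new WaterSensor("spark_kafka", 3L, 3);
        String msg = JSONObject.toJSONString(in);
        System.out.println(msg);   // e.g. {"id":"spark_kafka","ts":3,"vc":3}

        // parse exactly as the streaming job does
        WaterSensor out = JSONObject.parseObject(msg, WaterSensor.class);
        System.out.println(out.getId() + " / " + out.getTs() + " / " + out.getVc());
    }
}
```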
The Spark Streaming computation over the stream:

```java
package com.examples;   // package name is truncated in the original post

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

/**
 * Created by lj on 2022-07-18.
 */
public class SparkSqlKafka {
    private static String appName = "spark.streaming.demo";   // app name partly truncated in the original post
    private static String master = "local[*]";                // bracket contents truncated in the original post
    private static String topics = "kafkadatawaterSensor";
    private static String brokers = "127.0.0.1:9092";

    public static void main(String[] args) {
        // initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        // create the JavaStreamingContext with a 3-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));

        // set the log level to avoid duplicate, noisy logging
        ssc.sparkContext().setLogLevel("ERROR");

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        // Kafka parameters: required, the job fails without them
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // obtain the Kafka stream via KafkaUtils.createDirectStream(...), configured by kafkaParams
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // window length and slide interval must each be an integer multiple of the batch interval
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<ConsumerRecord<Object, Object>, WaterSensor>() {
            @Override
            public WaterSensor call(ConsumerRecord<Object, Object> s) throws Exception {
                WaterSensor waterSensor = JSONObject.parseObject(s.value().toString(), WaterSensor.class);
                return waterSensor;
            }
        }).window(Durations.minutes(9), Durations.minutes(6));

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                // lazy SparkSession singleton (helper not shown in the post; a sketch follows below)
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);

                // register a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println(time);

                // print the first 20 rows
                result.show();

                // write the window's result to MySQL (helper not shown in the post; a sketch follows below)
                writeDataToMysql(result);
            }
        });

        // start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
```

Finally, connect the NBI big-data visualization builder platform to MySQL and assemble data applications:

[Screenshots: NBI visualization]
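The job above calls JavaSparkSessionSingleton.getInstance(...), which the post does not include. A minimal sketch, assuming the lazy-singleton pattern used in Spark's own streaming examples, so that every micro-batch reuses one SparkSession instead of building a new one:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    // create the SparkSession on first use, then hand back the same instance
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
```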
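writeDataToMysql(result) is likewise referenced but not shown. Below is a minimal sketch using Spark's built-in JDBC writer; the class name MysqlSink, the JDBC URL, database, table name, and credentials are placeholder assumptions to adapt. NBI would then query this table:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

import java.util.Properties;

public class MysqlSink {
    public static void writeDataToMysql(Dataset<Row> result) {
        Properties connProps = new Properties();
        connProps.put("user", "root");                         // placeholder credentials
        connProps.put("password", "123456");
        connProps.put("driver", "com.mysql.cj.jdbc.Driver");   // MySQL Connector/J must be on the classpath

        // append each window's result to the table the visualization platform reads
        result.write()
              .mode(SaveMode.Append)
              .jdbc("jdbc:mysql://127.0.0.1:3306/demo", "water_sensor", connProps);
    }
}
```

SaveMode.Append keeps accumulating one batch of rows per window; SaveMode.Overwrite would retain only the latest window, which can be preferable when the dashboard should show the current state only.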