## Background

Collect data from Kafka with Flume and store it in Hive.

## Environment setup

This deployment uses the Apache community editions rather than CDH, HDP, or similar distributions; follow the official documentation to set up the environment first.

## Configuring Hive

Edit hive-site.xml and add the following properties:

```xml
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
  <description>Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
    transactions, which also requires appropriate settings for hive.compactor.initiator.on,
    hive.compactor.worker.threads, hive.support.concurrency (true), and
    hive.exec.dynamic.partition.mode (nonstrict). The default DummyTxnManager replicates
    pre-Hive-0.13 behavior and provides no transactions.</description>
</property>
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
  <description>Whether Hive supports concurrency control or not. A ZooKeeper instance must be up
    and running when using zookeeper Hive lock manager</description>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://localhost:9083</value>
  <description>Thrift URI for the remote metastore. Used by metastore client to connect to
    remote metastore.</description>
</property>
```

Create the Hive database and table. Note that the table must be partitioned, bucketed, and transactional, since the Flume Hive sink writes through the Hive streaming API:

```sql
CREATE DATABASE test_db;

CREATE TABLE test_kafka_to_hive (
  time  string,
  type  string,
  value string,
  code  string
)
PARTITIONED BY (partition_time string)
CLUSTERED BY (time) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
```

## Configuring Flume

Create a new configuration file kafka2hive.conf with the content below. For more options, see the official documentation for the Kafka source and the Hive sink.

```properties
# Name the source, channel, and sink
a.sources  = kafka-source
a.channels = memory-channel
a.sinks    = hive-sink

# Kafka source configuration
a.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
# zookeeperConnect is deprecated since Flume 1.7; newer releases rely on
# kafka.bootstrap.servers instead
a.sources.kafka-source.zookeeperConnect = localhost:2181
a.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
a.sources.kafka-source.kafka.topics = processed_realtimeData
a.sources.kafka-source.kafka.consumer.timeout.ms = 1000
a.sources.kafka-source.batchSize = 10

# Hive sink configuration
a.sinks.hive-sink.type = hive
a.sinks.hive-sink.hive.metastore = thrift://host-hive-metadata-store:9083
a.sinks.hive-sink.hive.database = test_db
a.sinks.hive-sink.hive.table = test_kafka_to_hive
a.sinks.hive-sink.hive.partition = %Y-%m-%d
a.sinks.hive-sink.hive.txnsPerBatchAsk = 2
a.sinks.hive-sink.batchSize = 1
a.sinks.hive-sink.serializer = JSON
a.sinks.hive-sink.serializer.fieldnames = time,type,value,code

# Channel configuration (a custom memory channel that parses JSON)
a.channels.memory-channel.type = com.my.flume.channel.JsonParsedMemoryChannel
a.channels.memory-channel.capacity = 1000
a.channels.memory-channel.transactionCapacity = 100

# Wire the three together
a.sources.kafka-source.channels = memory-channel
a.sinks.hive-sink.channel = memory-channel
```

Copy Hadoop's configuration files core-site.xml and hdfs-site.xml into Flume's conf directory, then edit core-site.xml and add:

```xml
<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
```

## Running Flume

```
bin/flume-ng agent -c ./conf -f ./conf/kafka2hive.conf -n a -Dflume.root.logger=INFO,console
```

## Caveats

While running, you may hit many ClassNotFoundException / NoClassDefFoundError errors. The fix is to copy the relevant Hive and Hadoop jars into Flume's lib directory. The jars below are the ones this deployment ran into; adjust the list to your own situation.

Hive (paths relative to the Hive installation root):

```
hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-core-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-server-extensions-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-3.1.0.jar
lib/hive-jdbc-3.1.0.jar
lib/log4j-1.2-api-2.10.0.jar
lib/log4j-api-2.10.0.jar
lib/log4j-core-2.10.0.jar
lib/log4j-slf4j-impl-2.10.0.jar
lib/log4j-web-2.10.0.jar
lib/hive-standalone-metastore-3.1.0.jar
lib/hive-contrib-3.1.0.jar
lib/hive-exec-3.1.0.jar
lib/libfb303-0.9.3.jar
lib/calcite-core-1.16.0.jar
jdbc/hive-jdbc-3.1.0-standalone.jar
```

Hadoop (paths relative to the Hadoop installation root):

```
share/hadoop/common/hadoop-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.7.jar
share/hadoop/common/lib/commons-configuration-1.6.jar
share/hadoop/common/lib/hadoop-auth-2.7.7.jar
share/hadoop/hdfs/hadoop-hdfs-2.7.7.jar
share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar
```
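Once the agent starts cleanly, a quick smoke test is to publish one JSON record to the topic and then query the table. The sketch below is a minimal example, not part of the original walkthrough: it assumes the Kafka CLI tools are run from the Kafka installation root, the Flume agent above is already running, the topic and table names match the configuration, and the sample record values are made up for illustration.

```bash
# Smoke test for the Kafka -> Flume -> Hive pipeline (hypothetical sample data).

# 1. Publish one JSON event whose keys match serializer.fieldnames.
#    (--broker-list is the older flag; recent Kafka releases use --bootstrap-server.)
echo '{"time":"2018-10-01 12:00:00","type":"test","value":"42","code":"200"}' | \
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic processed_realtimeData

# 2. Query the table once the Flume batch has been committed; Hive only
#    exposes rows from committed transactions, so allow a few seconds.
bin/hive -e "SELECT * FROM test_db.test_kafka_to_hive LIMIT 10;"
```

If the row never appears, check the Flume console log for serializer or metastore errors before looking anywhere else.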