01背景 随着欧加集团大数据业务的发展,现阶段公司大数据平台20个组件,1EB级别数据量,平台1000人均日活,服务已经有相当大的规模。在这样的业务背景下,越来越多的用户在使用大数据平台时,发现难以定位问题。基于此,我们设计大数据诊断平台,旨在提升用户解决问题效率,降低用户异常成本。代号罗盘,意为用户定位问题,给出优化方案。 此前业务存在问题现状总结如下: 1、问题定位效率相对低。平台组件多,从上层调度器、Livy客户端到中层计算引擎Spark,最后底层Hadoop系统;用户作业日志量大,没法串联一起,问题上下文关联难;用户人员角色非单一研发角色人员,自行分析能力有限,需平台方提供协助解决,沟通与定位让双方工作量只增不减;缺乏自动化工具定位问题等等。各种因素说明,海量作业调度,多种类型运行环境,TB级别日志量,依靠人力盘查作业问题是非常耗的。 2、异常问题类型多,缺乏有效知识库,高效重复利用已有的解决方案。从作业调度任务系统到计算引擎层,常见的业务问题常见如:晚点溯源、高频失败、运行耗时长、数据倾斜、暴力扫描、shuffle失败、CPU浪费、内存浪费、内存溢出等,需将问题数量降低收敛。 3、异常任务、不合理任务成本多。用户任务在执行周期内发生异常或者配置不合理,将导致任务浪费资源,产生许多额外的成本,需将此类问题成本损失降至最低。 总体上希望,从问题出发、经过快速定位、优化方案、问题收敛环节,最后达到降本增效目的。 02业界产品 基于以上问题,我们调研了业界有关大数据诊断系统,目前比较类似的是Dr。Elephant开源系统,Dr。Elephant一个Hadoop和Spark的性能监控调优工具。它能自动采集Airflow、Azkaban、Oozie等调度系统作业流及计算引擎Spark和HadoopMR的运行指标,分析作业的异常和性能结果,指导开发者进行作业调优,从而提升开发者工作效率和集群资源利用率。 工作原理: Dr。Elephant定期从Yarn资源管理中心拉取近期成功和失败的作业列表。每个作业会实时从历史服务器中获取到元数据、配置及调度器作业信息以及监控数据。一旦获取到所有的元数据信息,Dr。Elephant就基于这些元数据运行启发式算法,并生成一份该作业的诊断报告。对该作业报告,进行标记和评级,分为五个级别来评定作业存在新能问题严重程度。 核心功能: 集成多个调度器框架如Azkaban、Airflow、Oozie等;统计历史作业和工作流的性能指标;Job级别工作流对比;支持多个计算引擎框架性能诊断(Spark、Tez、MapReduce、TonY);基于自定义规则的可配置启发式插件,用户诊断作业;提供RESTAPI,用户能通过API获取所有信息; 欠缺功能: 支持Spark,Hadoop系统版本比较低,对于新版本Spark,Hadoop兼容性不友好;不支持Spark,Hadoop新版本的特性的诊断;诊断指标比较少,其中Spark相关指标仅4个,对于高度依赖Spark引擎是非常欠缺的;不支持日志级别问题诊断,不能够诊断调度器运行任务或者App应用程序的出现的异常;调度器和作业App元数据的关联在一些场景下不支持;不支持异常资源的管理,达到降本增效指引目的;对SparkHistory服务接口频繁调用影响History服务的稳定性;缺乏有效的降本增效流程辅组工具; 综上所述,结合我们有大规模Spark集群调度特点,业界产品对我们解决业务痛点效果不佳,我们决定自研诊断系统来解决业务带来的挑战。 03技术方案 由上述可知,系统在业务层面既能快速定位解决用户问题,又能帮助用户管理异常资源;架构层面支持Spark,Hadoop多指标诊断又不影响第三方系统性能问题,我们采用非入侵的方式设计诊断系统。 架构层主要由同步工作流层任务元数据模块、同步YarnSparkApp元数据模块、关联工作流层引擎层App元数据模块、工作流任务异常检测模块,引擎层异常检测模块,Portal展示模块组成。同时调度器(SchedulerServer)可以适配多个开源调度器项目,如内部系统Oflow、Airflow、DolphinScheduler等。 整体架构图 整体架构分3层:第一层为对接外部系统,调度器、Yarn、HistoryServer、HDFS等系统,同步元数据、集群状态、运行环境状态、日志等到诊断系统分析;第二层为架构层,包括数据采集、元数据关联模型标准化、异常检测、诊断Portal模块;第三层为基础组件层,包括MySQL、Elasticsearch、Kafka、Redis等组件。 具体模块流程阶段: (1)数据采集阶段:从调度系统将用户、DAG、作业、执行记录等工作流元数据同步至诊断系统;定时同步YarnResourceManager、SparkHistoryServerApp元数据至诊断系统,标志作业运行指标存储路径,为后续数据处理阶段作基础; (2)数据关联模型标准化阶段:将分步采集的工作流执行记录、SparkApp、YarnApp、集群运行环境配置等数据通过ApplicationID介质进行关联,此时,工作流层与引擎层元数据已关联完毕,得到数据标准模型(user,dag,task,application,clusterConfig,time); (3)工作流层引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步Workflow异常检测流程,同时平台维护着一套沉淀多年的数据治理知识库,加载知识库到标准模型,通过启发式规则,对标准模型的指标数据、日志同时进行异常挖掘,结合集群状态及运行是环境状态,分析得出工作流层、引擎层异常结果; (4)业务视图:存储、分析数据,提供给用户任务概览、工作流层任务诊断、引擎层作业Application诊断,工作流层展示调度器执行任务引发的异常,如任务失败、回环任务、基线偏离任务等问题,计算引擎层展示Spark作业执行引发的耗时、资源使用、运行时问题; 04实践效果 我们从四个方面简述诊断平台带来的效果:诊断平台UI、效率分析、成本分析、稳定性分析、降本增效分析。 (1)诊断平台UI 引擎层分析主要展示Spark计算过程中异常、不合理的作业,并给作业记录异常标签,如CPU浪费、数据倾斜、Task长尾、大表扫描等异常类型标签,这些标签是数据标准模型经过工作流层、引擎层异常检测得出,同时可以让用户清楚作业的问题原因。 (2)效率分析 长尾Task分析 原因:长尾任务是由于作业运行过程中,一个Task或多个Task单元执行时间过长,拖延整个任务运行时间。 危害:作业执行时间过长,资源浪费 诊断:从时间角度计算,执行时间过长原因在于Task读取数据量多或者数据读取慢。如果读取数据过多,那么将出现数据倾斜,按数据倾斜方式处理;如果读取数据过慢,那么Hadoop集群的节点负载高或者有网络丢包问题等,导致数据读取慢,可以联系运维处理。 HDFS卡顿分析 原因:HDFS卡顿是Spark作业中Task最小执行单元读取数据速率比其他Task慢,低于阈值; 危害:作业执行时间过长,浪费资源; 诊断:作业数据所在机器网络IO问题或者集群配置不一致问题,导致Task从Hadoop读取数据速率低下。这种情况一般伴随着长尾Task出现,同时表现Task执行时间过长、读取数据量少,导致整个数据处理Task无法高效利用回收。这种情况需排查数据在节点配置及机器硬件配置; 推测执行过多分析 原因:推测执行(speculative)是指作业执行单元Task在同一个Stage中的执行时间相比其他Task执行时间长,在其他Executor发起相同Task执行,先完成的Task将Kill另个Task,并取得结果。这样情况下如果作业大部分Task都发起推测执行,超过一定比例,就是推测执行过多的表现; 危害:任务执行时间长,资源浪费恶化; 诊断:机器配置不同、网络波动、集群负载高、作业数据倾斜等都会引起推测执行,过多的慢任务执行推测将会导致资源恶化,推测执行其实是对资源的压榨、用空间换取时间的做法。解决执行推测要从多方面入手,结合集群状态环境。 全局排序异常分析 原因:SparkStage中的Task只有一个时,而且处理的数量级别大,Stage中的所有数据都集中在一个Task中,这种情况即发生全局排序异常。 危害:任务处理时间长、消耗资源大 诊断:全局排序异常并没有发挥Spark并发计算特性,Task处理数据漫长,非常消耗资源,解决这个问题需要对作业进行重新分区,并发计算数据。 (3)成本分析 CPU浪费分析 原因:SparkDriverExecutorcores参数配置不合理导致CPU空闲浪费 危害:没用高效利用资源 诊断:通过SparkApplication采集指标,分析SparkDriver、SparkExecutor执行过程中的CPU的运行时间(单位:vcoresecond)占比,如果空闲时间超过一定的比例,判定为浪费,用户根据比例降低启用CPU数量。 计算ApplicationCPU浪费过程中,采集到Executor执行开始和结束时间、Executor执行所有Job开始和结束时间、Job内部真正执行TaskCPU时间,最终获得以下指标: 所有Executor的并发个数Count,每个Executor固定核数ExecutorCores所有Executor内Job真正执行时间和JobTime(计算Job开始结束时间交叉和)所有Executor内Task个数TaskCount及每个Task执行CPU时间 总CPU计算时间估算为: 实际使用CPU计算时间为: CPU浪费百分比: 如果空闲比很大,可以适当降低参数spark。executor。cores的值,降低并发度,或者减少RDD分区数和Shuffle参数spark。sql。shuffle。partitions。 内存浪费分析 原因:分析DriverExecutor内存使用峰值占总内存比例,当空闲比例值超过阈值,为内存浪费 危害:没用高效利用资源 诊断:采集SparkApplicationDriverExecutor的相关内存指标,与CPU浪费计算同理,获得Executor指标如下: 所有Executor个数Count,每个Executor内存ExecutorMemory每个Executor执行时间每个Executor执行过程内存峰值 总的内存时间估算为: 实际内存时间为: 浪费内存百分比: 如果空闲比很大,可以适当降低参数spark。executor。memory的值; (4)稳定性分析 全表扫描问题 原因:SparkSQL查询大表数据时,没有进行分区条件筛选,或者SQL比较复杂时,发生了全表扫描; 危害:作业执行时间长,集群负载高,影响其他作业执行 诊断:SparkSQL扫描数据表时,尽管现在Spark对优化器已经有不少的优化,如谓词下推、列裁剪、常量合并等,但都相对简单,在没分区的大表或者用户Join大表和小表时,会出现全表扫描或者分区不合理暴力扫描情况。一旦执行了这种作业,一方面用户长时间才能得到数据结果,另一方面平台方承载作业扫描全表的压力,作业会占用集群主要资源,拖慢其他作业。因此用户需要根据具体业务做条件限制,调整SparkSQL以及对表分区等。 数据倾斜分析 原因:数据倾斜是Task计算过程中Key分布不均造成的,个别Key的数据特别多,超出计算节点的计算能力; 危害:会导致任务内存溢出、计算资源利用率低、作业执行时间超出预期; 诊断:数据倾斜发生时,大量的MapStage数据发送到ReduceStage,ReduceStage节点需要处理大量数据,其他依赖该节点将处于长时间等待状态。比如Stage1依赖Stage0的执行执行结果,如果Stage0发生数据倾斜,导致执行过长或者直接挂起,Stage1将处于等待状态,整个作业也一直挂起,这是资源将被这个作业占有,但只有极少数Task在执行,造成计算资源浪费,利用率低;大量数据将集中在少数计算节点上,当数据量超出单个节点的内存范围,最终内存溢出,导致任务失败。一般出现在SQL字段:joinon,groupby,partitionby,countdistinct等,解决数据倾斜常用方式有: 增大并行度spark。sql。shuffle。partitions,使得数据再次分配到不同T过滤异常值的数据,过多冗余值也会导致数据倾斜;SQL中groupby或者RDD的reduceByKey添加key的随机数打散Map,Reduce两个阶段数据,最后在Reduce阶段将随机数去掉;表Join关联时,可以使用Broadcast方式广播小表数据,避免shuffle,就不会发生数据倾斜; Shuffle失败分析 原因:由于作业配置、网络、操作系统、硬件多个因素,Shuffle在节点之间传输数据会失败 危害:作业异常退出,资源浪费 诊断:作业计算过程中,Shuffle作为SparkMapReduce框架中的数据纽带,经常出现失败问题,问题可以分ShuffleRead和ShuffleWrite两部分。 由图看出,ShuffleWrite的分区(partition)数量跟MapTask(RDD)的数量一致,文件被分割后,经算子计算的中间排序结果临时存放在各个Executor所在的本地磁盘,可以理解为ShuffleWrite做了本地磁盘保存文件操作。ShuffleRead的分区数有Spark提供的一些参数控制,参数不合理将会导致ReduceTask异常,如数据倾斜,甚至OOM造成Executor退出,下游网络连接不上。由诊断抓取异常了解到原因后,从Shuffle的数据量和处理Shuffle数据的分区数两个角度给出方案: 减少shuffle数据量,使用BroadcastJoin或者去掉不必要字段等;有groupby、Join、reduceby、partitionby等算子操作可以通过shuffle的partitions参数,根据数据量或计算复杂度提高参数值,另外控制好并行度以及运行任务的总核数,官方推荐运行Task为核数的23倍;提高Executor的内存,防止内存溢出或者JVMC提高Spark网络RPC通信时间配置,可以让数据处理完成等; 内存溢出 原因:Spark内存使用超出了容量造成内存溢出 危害:作业异常退出,资源浪费 诊断:按照Spark内存模型,用户实际使用内存如下 用户作业内存溢出分堆内和堆外两种方式:堆外内存溢出:表现为作业被Yarn节点Kill,主要原因是MonitorMemory超出申请内存限制堆内内存溢出:表现为JVM内存空间不足或者GC超出限制,任务内的数据量过多导致 定位到原因后,可以有多种处理方式:提高executorMemory,堆内内存增大;降低executorCores,减少并行度,处理数据量变少;重新分配分区(repartition),对每个Task产生的RDD、Dataframe数据量减少等;提高executorMemoryOverhead参数,堆外内存增大;处理数据倾斜,如groupby、reduceby等热点key打散; SQL其他常见问题分析 原因:SQL执行过程中没权限、表不存在、语法错误等; 危害:任务执行异常退出,浪费资源 诊断:具有SQL失败特征从指标数据或者日志提取,用户根据问题去申请相应权限、创建表或者修正语法问题,能快速解决问题。 (5)降本增效 以上讲述了常见的问题案例场景,这里不再多介绍,接下来我们分析下降本增效。 通过作业层和引擎层分析识别异常、不合理任务,累计识别任务的内存、CPU资源,转化为相应的成本,通过任务元数据关联,按个人、业务、部门三个维度汇总给用户,并设置排名等机制,推进数据治理。 以下通过长期推进治理,可以看成本趋势,用户聚焦的任务问题得以改善。 05总结与规划 OPPO大数据任务诊断平台主要围绕离线调度任务、计算引擎两个方面对问题进行定位分析,使用丰富的知识库,提供给用户解决优化方案,同时达到降本增效的目的。技术方面采用非入侵方案对接其他系统,保证了其他系统的安全性。系统架构基于启发式规则定位、分析问题方式,但知识库比较依赖人员经验的积累,更深层次问题需要数据挖掘算法扩大检测范围,智能化诊断。另外,除了对Spark任务问题诊断,OPPO大数据诊断平台还针对Flink任务进行异常、资源问题诊断,整体平台包含Spark、Flink两种计算引擎诊断,届时将会对平台(罗盘)进行开源。 作者简介 BobZhuangOPPO高级数据平台工程师 专注大数据分布式系统研发,曾就职于Kingsoft公司。 XiaoyouWangOPPO数据平台工程师 2019年加入OPPO,负责大数据系统相关设计和开发工作,拥有丰富的后端研发经验。 来源:微信公众号:安第斯智能云 出处:https:mp。weixin。qq。comsBkwpNCSIOtepN2LrjcLQ