农业资讯网
当前位置: 首页 农业百科

如何通俗理解mapreduce(MapReduce过程详解及其性能优化)

时间:2023-05-24 作者: 小编 阅读量: 1 栏目名: 农业百科

减少Mapper的个数的话,就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reducetask数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

从JVM的角度看Map和Reduce

Map阶段包括:

第一读数据:从HDFS读取数据

1、问题:读取数据产生多少个Mapper??

Mapper数据过大的话,会产生大量的小文件,由于Mapper是基于虚拟机的,过多的Mapper创建和初始化及关闭虚拟机都会消耗大量的硬件资源;

Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源;

2、Mapper数量由什么决定??

(1)输入文件数目

(2)输入文件的大小

(3)配置参数

这三个因素决定的。

涉及参数:

mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0

mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M

dfs.block.size//block块大小,默认64M

计算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

例如默认情况下:例如一个文件800M,Block大小是128M,那么Mapper数目就是7个。6个Mapper处理的数据是128M,1个Mapper处理的数据是32M;

再例如一个目录下有三个文件大小分别为:5M10M 150M 这个时候其实会产生四个Mapper处理的数据分别是5M,10M,128M,22M。

Mapper是基于文件自动产生的,如果想要自己控制Mapper的个数???

就如上面,5M,10M的数据很快处理完了,128M要很长时间;这个就需要通过参数的控制来调节Mapper的个数。

减少Mapper的个数的话,就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。

设置合并器:(set都是在hive脚本,也可以配置Hadoop)

设置合并器本身:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;set hive.merge.mapFiles=true;set hive.merge.mapredFiles=true;set hive.merge.size.per.task=256000000;//每个Mapper要处理的数据,就把上面的5M10M……合并成为一个

一般还要配合一个参数:

set mapred.max.split.size=256000000 // mapred切分的大小set mapred.min.split.size.per.node=128000000//低于128M就算小文件,数据在一个节点会合并,在多个不同的节点会把数据抓过来进行合并。

Hadoop中的参数:可以通过控制文件的数量控制mapper数量

mapreduce.input.fileinputformat.split.minsize(default:0),小于这个值会合并mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分

第二处理数据:

partition说明

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

自定义partitioner的情况:

1、我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

2、解决数据倾斜:另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

3、自定义的Key包含了好几个字段,比如自定义key是一个对象,包括type1,type2,type3,只需要根据type1去分发数据,其他字段用作二次排序。

环形缓冲区

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex 0)的位置存放value的起始位置、(Kvindex 1)的位置存放key的起始位置、(Kvindex 2)的位置存放partition的值、(Kvindex 3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

第三写数据到磁盘

Mapper中的Kvbuffer的大小默认100M,可以通过mapreduce.task.io.sort.mb(default:100)参数来调整。可以根据不同的硬件尤其是内存的大小来调整,调大的话,会减少磁盘spill的次数此时如果内存足够的话,一般都会显著提升性能。spill一般会在Buffer空间大小的80%开始进行spill(因为spill的时候还有可能别的线程在往里写数据,因为还预留空间,有可能有正在写到Buffer中的数据),可以通过mapreduce.map.sort.spill.percent(default:0.80)进行调整,Map Task在计算的时候会不断产生很多spill文件,在Map Task结束前会对这些spill文件进行合并,这个过程就是merge的过程。mapreduce.task.io.sort.factor(default:10),代表进行merge的时候最多能同时merge多少spill,如果有100个spill个文件,此时就无法一次完成整个merge的过程,这个时候需要调大mapreduce.task.io.sort.factor(default:10)来减少merge的次数,从而减少磁盘的操作;

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Combiner存在的时候,此时会根据Combiner定义的函数对map的结果进行合并,什么时候进行Combiner操作呢???和Map在一个JVM中,是由min.num.spill.for.combine的参数决定的,默认是3,也就是说spill的文件数在默认情况下由三个的时候就要进行combine操作,最终减少磁盘数据;

减少磁盘IO和网络IO还可以进行:压缩,对spill,merge文件都可以进行压缩。中间结果非常的大,IO成为瓶颈的时候压缩就非常有用,可以通过mapreduce.map.output.compress(default:false)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压,在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU,压缩一般可以10倍的减少IO操作,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)参数设置。但这个过程会消耗CPU,适合IO瓶颈比较大。

Shuffle和Reduce阶段包括:

一、Copy

1、由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的,其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取(下载)它所需要的数据(网络传输),Reducer是如何知道要去哪些机器取数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。

2、reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5)调整。默认情况下,每个Reducer只会有5个map端并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下,可以调大该参数;

3、reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数,这是企业级最佳实战。

二、MergeSort

1、这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是说,如果该reduce task的最大heap使用量(通常通过mapreduce.admin.reduce.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7,reducetask的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷(刷磁盘前会先做sortMerge)。这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percent(default0.66)来设定。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束,然后启动磁盘到磁盘的merge方式生成最终的那个文件。

这里需要强调的是,merge有三种形式:1)内存到内存(memToMemMerger)2)内存中Merge(inMemoryMerger)3)磁盘上的Merge(onDiskMerger)具体包括两个:(一)Copy过程中磁盘合并(二)磁盘到磁盘。

(1)内存到内存Merge(memToMemMerger) Hadoop定义了一种MemToMem合并,这种合并将内存中的map输出合并,然后再写入内存。这种合并默认关闭,可以通过mapreduce.reduce.merge.memtomem.enabled(default:false)

打开,当map输出文件达到mapreduce.reduce.merge.memtomem.threshold时,触发这种合并。

(2)内存中Merge(inMemoryMerger):当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:

配置内存比例:前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merge.percent配置为0.66,则当内存中的数据达到330M的时候,会触发合并写入。

配置map输出数量: 通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。

(3)磁盘上的Merge(onDiskMerger):

(3.1)Copy过程中磁盘Merge:在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。

(3.2)最终磁盘中Merge:当所有map输出都拷贝完毕之后,所有数据被最后合并成一个整体有序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。最后(所以map输出都拷贝到reduce之后)进行合并的map输出可能来自合并后写入磁盘的文件,也可能来及内存缓冲,在最后写入内存的map输出可能没有达到阈值触发合并,所以还留在内存中。

每一轮合并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factor(default:10)来配置。

如上图:Reduce阶段中一个Reduce过程 可能的合并方式为:假设现在有20个map输出文件,合并因子配置为5,则需要4轮的合并。最终的一轮确保合并5个文件,其中包括2个来自前2轮的合并结果,因此原始的20个中,再留出3个给最终一轮。

三、Reduce函数调用(用户自定义业务逻辑)

1、当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段。reducetask真正进入reduce函数的计算阶段,由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,这个参数是控制,reducer需要多少的内存百分比来作为reduce读已经sort好的数据的buffer大小??默认用多大内存呢??默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读处理数据。可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉;

2、Reduce在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。

性能调优

如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助。相关的参数配置列在后面的表格中。

一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加。

运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置,默认都是1024M。

map优化

在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb。

map端相关的属性如下表:

reduce优化

在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升。在2008年的TB级别数据排序性能测试中,Hadoop就是通过将reduce的中间数据都保存在内存中胜利的。

reduce端相关属性:

通用优化

Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小。


原文链接:https://blog.csdn.net/aijiudu/article/details/72353510

    推荐阅读
  • 万词霸屏企业(万词霸屏适合什么行业)

    现如今的社会是信息产生价值的时代,网络产品更新迭代迅速,从线下销售转到线上营销的企业数不胜数,是企业发展的大趋势,但是目前网络市场鱼龙混杂,而企业没有自己的网络运营团队,导致无从下手,也有很多企业对万词霸屏系统进行咨询,那么到底那些行业更适合做霸屏推广呢?

  • 我国新能源乘用车市场现状(全国60余家新能源乘用车企业及其新能源战略盘点)

    全国60余家新能源乘用车企业及其新能源战略盘点近日,电动汽车资源网对汽车行业中涉足新能源乘用车的近60家企业进行了大盘点,以下企业的法人代表及注册资本等基本信息源于政府信息,新能源乘用车的款数以上公告的数量为准(截止到第289批《道路机动车辆生产。

  • 高血压治疗出了新指南(高血压治疗及误区)

    高血压治疗出了新指南来源:高血压治疗的根本目的是减少心脑肾以及血管并发症的出现,提高患者的生活质量,降低患者的死亡率。第一是急性期的治疗。第三是药物治疗。高血压是一种长期的疾病状态,一般是由于复杂的神经内分泌调节等因素所导致的。湘潭市第五人民医院老年病科张瑞英本文来自,仅代表作者观点。全国党媒信息公共平台提供信息发布传播服务。

  • 灭火器如何操作(灭火器怎么操作)

    接下来我们就一起去了解一下吧!灭火器如何操作一提,首先手提提把,保持水平垂直,再把灭火器瓶体上下颠倒摇晃几次,让干粉松动。二拔,拔掉灭火器保险销,在灭火器提拔下的环状金属物拔掉。三瞄,将灭火器的喷管瞄准火源,距离火焰3~5米处瞄准,一手握住喷管的最前端,控制好方向,另一只手提起灭火器提把。四压是指压住灭火器的开关,喷出干粉灭火。

  • qq电子邮件格式怎么写(qq电子邮件格式的填写方法)

    qq号码@qq.com,比如是你的qq号是12345678,则qq邮箱是12345678@qq.com;,下面我们就来说一说关于qq电子邮件格式怎么写?我们一起去了解并探讨一下这个问题吧!qq电子邮件格式怎么写qq号码@qq.com,比如是你的qq号是12345678,则qq邮箱是12345678@qq.com;英文格式,这个格式需要用户在邮箱设置里进行注册,格式为“英文@qq.com“;Foxmail格式,这个格式为“英文或数字@foxmail.com”。

  • 埃克塞特大学简介(埃克塞特大学简介介绍)

    埃克塞特大学简介埃克塞特大学,又名埃克斯特大学,是一所位于英国西南部的研究型大学,起源于十九世纪中叶,于1955年受皇家特许正式创建大学。现为英国罗素大学集团、英联邦大学协会、英国大学联盟和GW4联盟成员。学校位于英国西南部德文郡首府埃克塞特和康沃尔郡首府法尔茅斯,该地美丽的城市和海滨风光一同构成了大学优美的校园环境,被泰晤士报称为英国最美的花园式校园。学校位列2022QS世界大学排名第149名。

  • 什么品种的鸡有五个脚趾,鸡脚趾肿大是什么情况

    乌骨鸡一般有5个脚趾,前面有3个脚趾,后面有2个脚趾。如果是典型的乌骨鸡,那么它便会有丛冠、缨头、绿耳、胡须、丝毛、五爪、毛脚、乌皮、乌肉以及乌骨这10大特征,而在乌骨鸡的品种中,泰和乌骨鸡便满足了这个要求。乌骨鸡不光体型比较小,而且头部也较小,其颈部比较短,腿也比较矮,看起来较为小巧玲珑。

  • 大众途观l后排能放平吗(具体怎么操作呢)

    大众途观l后排能放平吗?以下内容大家不妨参考一二希望能帮到您!大众途观l后排能放平吗大众途观l后排能放平,具体的操作步骤如下:首先在汽车的后排,找到后背座椅的卡扣,这个卡扣是用来调节座椅放倒比例的。然后向上抬起卡扣,稍微用力就可以。然后先向前放倒左手边的座椅。

  • 热的组词(热什么意思)

    以下内容希望对你有帮助!热的组词热的组词:冷热、热心、热烈、热水、热爱、炎热、热天、热闹、闷热、热带、地热、热门、热量、燥热、酷热、热浪、热衷、郁热、热忱、白热、热乎、热度、热孝、内热、沸热、热血、热机、热切、热狗、热辣;热拼音为rè,基本含义是温度高,可引申为喧闹,热闹,情意深厚,很受人关注或欢迎的等意思。出自《素问·五常变大论》等。

  • 什么是老鼠仓(老鼠仓的解释)

    什么是老鼠仓老鼠仓是指庄家在用公有资金在拉升股价之前,先用自己个人的资金在低位建仓,待用公有资金拉升到高位后个人仓位率先卖出获利。大家都知道,中国股市的特色就是无庄不成股,而老鼠仓就存在于这些大大小小的庄股当中。这样的结果就是券商亏损累累,老鼠仓赚个钵满盆满。其实,老鼠仓就是一种财富转移的方式,是券商中某些人化公家资金为私人资金的一种方式,本质上与贪污、盗窃没有区别。