`
kirayuan
  • 浏览: 38644 次
文章分类
社区版块
存档分类
最新评论

mapreduce中控制mapper的数量

 
阅读更多

很多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定。在默认情况下,最终input占据了多少block,就应该启动多少个Mapper。如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃。这些逻辑确实是正确的,但都是在默认情况下的逻辑。其实如果进行一些客户化的设置,就可以控制了。

<wbr><wbr><wbr><wbr>在Hadoop中,设置Map task的数量不像设置Reduce task数量那样直接,即:不能够通过API直接精确的告诉Hadoop应该启动多少个Map task。</wbr></wbr></wbr></wbr>

<wbr><wbr><wbr><wbr>你也许奇怪了,在API中不是提供了接口org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)吗?这个值难道不可以设置Map task的数量吗?这个API的确没错,在文档上解释”Note: This is only a hint to the framework.“,即这个值对Hadoop的框架来说仅仅是个提示,不起决定性的作用。也就是说,即便你设置了,也不一定得到你想要的效果。</wbr></wbr></wbr></wbr>

1. InputFormat介绍

在具体设置Map task数量之前,非常有必要了解一下与Map-Reduce输入相关的基础知识。

这个接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的输入规格说明(input-specification),它将所有的输入文件分割成逻辑上的InputSplit,每一个InputSplit将会分给一个单独的mapper;它还提供RecordReader的具体实现,这个Reader从逻辑的InputSplit上获取input records并传给Mapper处理。

InputFormat有多种具体实现,诸如FileInputFormat(处理基于文件的输入的基础抽象类),<wbr><strong>DBInputFormat</strong>(处理基于数据库的输入,数据来自于一个能用SQL查询的表),<strong>KeyValueTextInputFormat</strong>(特殊的FineInputFormat,处理Plain Text File,文件由回车或者回车换行符分割成行,每一行由key.value.separator.in.input.line分割成Key和Value),CompositeInputFormat,DelegatingInputFormat等。在绝大多数应用场景中都会使用FileInputFormat及其子类型。</wbr>

通过以上的简单介绍,我们知道InputFormat决定着InputSplit,每个InputSplit会分配给一个单独的Mapper,因此InputFormat决定了具体的Map task数量


2. FileInputFormat中影响Map数量的因素

在日常使用中,FileInputFormat是最常用的InputFormat,它有很多具体的实现。以下分析的影响Map数量的因素仅对FileInputFormat及其子类有效,其他非FileInputFormat可以去查看相应的<wbr>getSplits(JobConf job, int numSplits) 具体实现即可。</wbr>

请看如下代码段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代码):

<wbr></wbr>

[java]<wbr><a title="view plain" href="http://blog.csdn.net/strongerbit/article/details/7440111#" style="text-decoration:none; color:rgb(82,102,115)">view plain</a><a title="copy" href="http://blog.csdn.net/strongerbit/article/details/7440111#" style="text-decoration:none; color:rgb(82,102,115)">copy</a></wbr>
  1. long<wbr>goalSize<wbr>=<wbr>totalSize<wbr>/<wbr>(numSplits<wbr>==<wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>0<wbr>?<wbr></wbr></wbr>1<wbr>:<wbr>numSplits);<wbr><wbr></wbr></wbr></wbr></wbr>
  2. long<wbr>minSize<wbr>=<wbr>Math.max(job.getLong(</wbr></wbr></wbr>"mapred.min.split.size",<wbr></wbr>1),<wbr>minSplitSize);<wbr><wbr></wbr></wbr></wbr>
  3. <wbr><wbr></wbr></wbr>
  4. for<wbr>(FileStatus<wbr>file:<wbr>files)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  5. <wbr><wbr>Path<wbr>path<wbr>=<wbr>file.getPath();<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  6. <wbr><wbr>FileSystem<wbr>fs<wbr>=<wbr>path.getFileSystem(job);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  7. <wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>((length<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>&amp;&amp;<wbr>isSplitable(fs,<wbr>path))<wbr>{<wbr><wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr>
  8. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>blockSize<wbr>=<wbr>file.getBlockSize();<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  9. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>splitSize<wbr>=<wbr>computeSplitSize(goalSize,<wbr>minSize,<wbr>blockSize);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  10. <wbr><wbr><wbr><wbr><wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  11. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>bytesRemaining<wbr>=<wbr>length;<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  12. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>while</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(((</wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>double</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>bytesRemaining)/splitSize<wbr>&gt;<wbr>SPLIT_SLOP)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  13. <wbr><wbr><wbr><wbr><wbr><wbr>String[]<wbr>splitHosts<wbr>=<wbr>getSplitHosts(blkLocations,length-bytesRemaining,<wbr>splitSize,<wbr>clusterMap);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  14. <wbr><wbr><wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr>length-bytesRemaining,<wbr>splitSize,<wbr>splitHosts));<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr>
  15. <wbr><wbr><wbr><wbr><wbr><wbr>bytesRemaining<wbr>-=<wbr>splitSize;<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  16. <wbr><wbr><wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  17. <wbr><wbr></wbr></wbr>
  18. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(bytesRemaining<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>{<wbr><wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  19. <wbr><wbr><wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr>length-bytesRemaining,<wbr>bytesRemaining,<wbr>blkLocations[blkLocations.length-</wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">1</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">].getHosts()));<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr>
  20. <wbr><wbr><wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  21. <wbr><wbr>}<wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>else</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(length<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>{<wbr><wbr></wbr></wbr></wbr></span></wbr></wbr></wbr>
  22. <wbr><wbr><wbr><wbr>String[]<wbr>splitHosts<wbr>=<wbr>getSplitHosts(blkLocations,<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,length,clusterMap);<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
  23. <wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,<wbr>length,<wbr>splitHosts));<wbr><wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  24. <wbr><wbr>}<wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>else</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>{<wbr><wbr><wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr>
  25. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,130,0)">//Create<wbr>empty<wbr>hosts<wbr>array<wbr>for<wbr>zero<wbr>length<wbr>files</wbr></wbr></wbr></wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  26. <wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,<wbr>length,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>String[</wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">]));<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  27. <wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr>
  28. }<wbr><wbr></wbr></wbr>
  29. <wbr><wbr></wbr></wbr>
  30. return<wbr>splits.toArray(</wbr>new<wbr>FileSplit[splits.size()]);<wbr><wbr></wbr></wbr></wbr>
  31. <wbr><wbr></wbr></wbr>
  32. protected<wbr></wbr>long<wbr>computeSplitSize(</wbr>long<wbr>goalSize,<wbr></wbr></wbr>long<wbr>minSize,<wbr></wbr></wbr>long<wbr>blockSize)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr>
  33. <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>return</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>Math.max(minSize,<wbr>Math.min(goalSize,<wbr>blockSize));<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
  34. }<wbr><wbr></wbr></wbr>

totalSize:是整个Map-Reduce job所有输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。

splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的


3. 如何调整Map的数量

有了2的分析,下面调整Map的数量就很容易了。


3.1 减小Map-Reduce job 启动时创建的Mapper数量

当处理大批量的大数据时,一种常见的情况是job启动的mapper数量太多而超出了系统限制,导致Hadoop抛出异常终止执行。解决这种异常的思路是减少mapper的数量。具体如下:

3.1.1 输入文件size巨大,但不是小文件

这种情况可以通过增大每个mapper的input size,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blockSize通常不可行,因为当HDFS被hadoop namenode -format之后,blockSize就已经确定了(由格式化时dfs.block.size决定),如果要更改blockSize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能通过增大minSize,即增大mapred.min.split.size的值


3.1.2 输入文件数量巨大,且都是小文件

所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。具体细节稍后会更新并展开。


3.2 增加Map-Reduce job 启动时创建的Mapper数量

增加mapper的数量,可以通过减小每个mapper的输入做到,即减小blockSize或者减小mapred.min.split.size的值。


参考资料

http://yaseminavcular.blogspot.com/2011/06/how-to-set-number-of-maps-with-hadoop.html

http://svn.apache.org/repos/asf/hadoop/common/tags/release-0.20.205.0

分享到:
评论

相关推荐

    Hadoop中MapReduce基本案例及代码(五)

    前四节提供了几个小案例 下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。...注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    mapreduce mapreduce mapreduce

    mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...

    hadoop-mapreduce-examples-python:python中的所有Hadoop Mapreduce示例!

    Python 中的 Hadoop Mapreduce 示例 python 中的几个 Mapreduce 示例以及有关运行它们的文档! 运行代码的步骤 文件夹结构 假定文件存储在 Linux 操作系统中的给定位置。 这只是一个示例说明,实际上位置并不重要。 ...

    MapReduce中文文档翻译

    对Google第一版的mapreduce相关文献进行的翻译。结合了的知秋的相关文章翻译的,不收费

    MapReduce中英文 (Word)

    MapReduce中英文,MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,中文和英文,两个文件。Word

    论文研究-MapReduce中Shuffle优化与重构 .pdf

    MapReduce中Shuffle优化与重构,彭辅权,金苍宏,如今Hadoop已成为目前最主流的云计算平台,在Hadoop分布式计算平台中,如何优化MapReduce计算性能是目前研究的一个热点问题。除了编写高

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    mapreduce在hadoop实现词统计和列式统计

    mapreduce在hadoop实现词统计和列式统计,mrwordcount工程是统计hadoop文件中的词数,mrflowcount工程是统hadoop文件中的列表

    实验项目 MapReduce 编程

    4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。

    在Hadoop的MapReduce任务中使用C程序的三种方法

    但是有一些时候,我们需要在MapReduce程序中使用C语言、C++以及其他的语言,比如项目的开发人员更熟悉Java之外的语言,或者项目已经有部分功能用其他语言实现等。针对这些情况,我们需要研究如何在基于Java的...

    MapReduce简介

    MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型 上升到构架:统一构架,为程序员隐藏系统层细节 MPI等...

    中文分词mapreduce程序

    对中文进行分词的java代码,分别在map reduce中实现。

    Hadoop中MapReduce基本案例及代码(一)

    MapReduce意味着在计算过程中实际分为两大步,Map过程和Reduce过程。 下面以一个统计单词次数简单案例为例: 数据源 Map类 import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org....

    mapreduce中文版论文

    google云计算三大论文之MapReduce

    基于MapReduce的学生平均成绩统计

    利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。

    MapReduce求行平均值--标准差--迭代器处理--MapReduce案例

    MapReduce求行平均值--标准差--迭代器处理--MapReduce案例

    Mapreduce实验报告.doc

    在整个过程中,Mapreduce库函数 负责原始数据的切割,中间key/value对集的聚合,以及任务的调度,容错、通 信控制等基础工作。而用户定义的map和reduce函数则根据实际问题确定具体操 作。 2. 框架的基本结构和执行...

    MapReduce发明人关于MapReduce的介绍

    MapReduce发明人关于MapReduce的介绍

    论文研究-基于SDN的MapReduce带宽优化设计.pdf

    在实际Hadoop系统中,如何使作业完成时间最短成为了一个NP完全问题,导致这个问题的主要原因是MapReduce计算过程中大量的数据从Map节点向Reduce节点进行迁移,容易造成网络拥塞,使得数据迁移时间过长。软件定义网络...

    MapReduce求行平均值--MapReduce案例

    MapReduce求取行平均值 MapReduce小实例 数据有经过处理已经添加行号的 也有未添加的 行平均值的四种求法

Global site tag (gtag.js) - Google Analytics