重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇内容介绍了“Shuffle流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都做网站、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的云岩网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程 --context.write(outK,outV); 进入TaskInputOutputContext中的write()方法 --看下就过 进入WrappedMapper.java中的mapContext.write(key, value);方法 //112行 进入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行 最终定位到MapTask的write()方法内, //726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来. collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入 //1082行 bufferRemaining -= METASIZE; //计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断 如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行 //只有当溢写数据大小满足80%时,才会触发该操作 WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部 --MapTask类1590行 spillReady.signal(); //1602行 --线程通信, 通知溢写线程开始干活 //执行溢写线程(MapTask内部类SpillThread)的run方法 //run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法 直接执行下面的排序和溢写方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行 final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象 --排序按照分区排序,溢写按照分区溢写 final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录 out = rfs.create(filename); //创建执行改步后,在上述的目录下生成溢写文件spill0.out文件
7、继续向下走,定位到MapTask类的1625行 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写前排序 8、定位到1629行,进入for循环 --按照分区进行溢写 9、分析for循环内代码,看具体溢写过程 9.1 先有一个writer对象,通过该对象来完成数据溢写 writer = new Writer(job, partitionOut, keyClass, valClass, codec, 9.2 判断是否有设置combinerRunner对象 如果有,则按照设置的combinerRunner业务去处理; 如果没有,则走默认的溢写规则 10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {} //MapTask类的1685行 // 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)
12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处 --说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足 80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足 80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。 最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候. 即在该行代码处发生 output.close(mapperContext); --MapTask的805行 14、进入output.close(mapperContext);方法内 --MapTask的732行 定位到collector.flush();方法 // 735行 -->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)
上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的) 最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作 --MapTask的1527行 mergeParts();方法,进入该方法,定位到MapTask的1844行 filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local 1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、继续向下走,定位到MapTask的1880行 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --归并后,最终输出的文件路径 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out 17、继续向下走,定位到MapTask的1882行 Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --归并后,最终输出文件的索引文件 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out.index 18、创建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); 19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分区进行归并排序 20、for循环内具体的归并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);
21、归并后的数据写出到文件 Writerwriter = new Writer (job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行 //归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } 22、归并完成 writer.close(); //1972行
23、写出索引文件 spillRec.writeToFile(finalIndexFile, job); //1986行 24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }
“Shuffle流程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!