This article is part 2 of a source-code analysis of the Shuffle phase in Hadoop 1.x. Hopefully it serves as a useful reference for developers working through the Hadoop codebase.
The method copyOutput in MapOutputCopier (a class nested inside ReduceCopier, which is in turn nested inside ReduceTask) is the most important link in the shuffle: it fetches map output from a remote host over HTTP. It creates a temporary file name, reads the data over HTTP, and then saves it either to the in-memory file system or to the local file system. The method that actually reads the remote file is getMapOutput.
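Before reading the real method, it may help to see the overall pattern it implements, fetch over HTTP, validate the response, then either keep the payload in RAM or spill it to disk, in isolation. Below is a minimal, JDK-only sketch of that pattern; every name in it (ShuffleFetchSketch, FetchResult, MAX_IN_MEMORY, fetch) is hypothetical, and none of this is Hadoop code:

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ShuffleFetchSketch {

    // Hypothetical threshold: payloads smaller than this stay in RAM.
    static final long MAX_IN_MEMORY = 16L * 1024 * 1024;

    // Holds the result: exactly one of the two fields is set.
    static class FetchResult {
        byte[] inMemory;  // non-null if the payload was buffered in RAM
        File onDisk;      // non-null if the payload was spilled to disk
    }

    static FetchResult fetch(URL url, File spillFile) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
            throw new IOException("Got invalid response code "
                + connection.getResponseCode() + " from " + url);
        }
        long length = connection.getContentLengthLong();
        FetchResult result = new FetchResult();
        try (InputStream in = connection.getInputStream()) {
            if (length >= 0 && length < MAX_IN_MEMORY) {
                // Small enough: buffer the whole payload in memory.
                ByteArrayOutputStream buf = new ByteArrayOutputStream((int) length);
                copy(in, buf);
                result.inMemory = buf.toByteArray();
            } else {
                // Too big (or unknown length): stream it to a local file.
                try (OutputStream out = new FileOutputStream(spillFile)) {
                    copy(in, out);
                }
                result.onDisk = spillFile;
            }
        }
        return result;
    }

    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] chunk = new byte[64 * 1024];
        for (int n; (n = in.read(chunk)) != -1; ) {
            out.write(chunk, 0, n);
        }
    }
}

The real getMapOutput follows the same shape, except that the lengths and task IDs come from custom HTTP headers rather than Content-Length, and the memory/disk decision is delegated to a RAM manager.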
The getMapOutput method is as follows:
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, Path filename,
                               int reduce)
throws IOException, InterruptedException {
    // Establish the HTTP connection
    URL url = mapOutputLoc.getOutputLocation();
    HttpURLConnection connection = (HttpURLConnection) url.openConnection();

    // Create the input stream
    InputStream input = setupSecureConnection(mapOutputLoc, connection);

    // Validate the HTTP response code
    int rc = connection.getResponseCode();
    if (rc != HttpURLConnection.HTTP_OK) {
        throw new IOException("Got invalid response code " + rc + " from " + url +
                              ": " + connection.getResponseMessage());
    }

    // Get the map id from the HTTP header
    TaskAttemptID mapId = null;
    try {
        mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
    } catch (IllegalArgumentException ia) {
        LOG.warn("Invalid map id ", ia);
        return null;
    }

    // Check that the map id matches the one we expect
    TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
    if (!mapId.equals(expectedMapId)) {
        LOG.warn("data from wrong map: " + mapId +
                 " arrived to reduce task " + reduce +
                 ", where as expected map output should be from " + expectedMapId);
        return null;
    }

    // Read the decompressed and compressed lengths from the headers
    // (the data may be compressed)
    long decompressedLength =
        Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
    long compressedLength =
        Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
    if (compressedLength < 0 || decompressedLength < 0) {
        LOG.warn(getName() + " invalid lengths in map output header: id: " +
                 mapId + " compressed len: " + compressedLength +
                 ", decompressed len: " + decompressedLength);
        return null;
    }

    // Check that the output is intended for this reduce task
    int forReduce = (int) Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
    if (forReduce != reduce) {
        LOG.warn("data for the wrong reduce: " + forReduce +
                 " with compressed len: " + compressedLength +
                 ", decompressed len: " + decompressedLength +
                 " arrived to reduce task " + reduce);
        return null;
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
                  ", decompressed len: " + decompressedLength);
    }

    // We will put a file in memory if it meets certain criteria:
    // 1. The size of the (decompressed) file should be less than 25% of
    //    the total inmem fs
    // 2. There is space available in the inmem fs

    // Check if this map-output can be saved in-memory
    boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);

    // Shuffle
    MapOutput mapOutput = null;
    if (shuffleInMemory) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" +
                      compressedLength + " raw bytes) " +
                      "into RAM from " + mapOutputLoc.getTaskAttemptId());
        }
        // Shuffle in memory
        mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                    (int) decompressedLength,
                                    (int) compressedLength);
    } else {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" +
                      compressedLength + " raw bytes) " +
                      "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
        }
        // Shuffle to the local file system
        mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
    }
    mapOutput.decompressedSize = decompressedLength;

    return mapOutput;
}
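The in-memory decision comes down to ramManager.canFitInMemory, which implements criterion 1 from the comment above: a single decompressed output must stay under 25% of the in-memory budget. Criterion 2 (space actually being available) is enforced when memory is reserved, which in Hadoop 1.x happens inside shuffleInMemory and can block until the in-memory merger frees space. Here is a minimal sketch of such a RAM manager, assuming a fixed budget and a 0.25 single-segment fraction; the class and field names are illustrative, not the exact Hadoop ones:

public class RamManagerSketch {
    private final long memoryLimit;            // total budget for in-memory shuffle
    private final long maxSingleShuffleLimit;  // 25% of the budget, per criterion 1
    private long usedMemory = 0;               // bytes currently reserved

    public RamManagerSketch(long memoryLimit) {
        this.memoryLimit = memoryLimit;
        this.maxSingleShuffleLimit = (long) (memoryLimit * 0.25f);
    }

    // Criterion 1: a single (decompressed) output must be under 25% of
    // the budget, and it must fit in a byte[].
    public synchronized boolean canFitInMemory(long requestedSize) {
        return requestedSize < Integer.MAX_VALUE
            && requestedSize < maxSingleShuffleLimit;
    }

    // Criterion 2: there must actually be room left right now.
    public synchronized boolean reserve(long requestedSize) {
        if (usedMemory + requestedSize > memoryLimit) {
            return false; // no space in the in-memory fs at the moment
        }
        usedMemory += requestedSize;
        return true;
    }

    public synchronized void unreserve(long requestedSize) {
        usedMemory -= requestedSize;
    }
}

In this sketch a copier thread would call canFitInMemory first, reserve before reading the stream into a byte array, and unreserve once the segment has been merged and released.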
That wraps up part 2 of the Hadoop 1.x Shuffle source-code analysis; hopefully it is of some help to fellow programmers.