diff --git a/src/core/org/apache/hadoop/util/IndexedCountingSortable.java b/src/core/org/apache/hadoop/util/IndexedCountingSortable.java
new file mode 100644
index 00000000..835c5781
--- /dev/null
+++ b/src/core/org/apache/hadoop/util/IndexedCountingSortable.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+/**
+ * Linear time in-place counting sort.
+ */
+public abstract class IndexedCountingSortable {
+  abstract public int getKey(int i);
+
+  abstract public int get(int i);
+
+  abstract public void put(int i, int v);
+
+  final int[] counts;
+  final int[] starts;
+  final int total;
+
+  public IndexedCountingSortable(int[] counts, int total) {
+    this.total = total;
+    this.counts = counts;
+    this.starts = new int[counts.length];
+    for (int i = 1; i < counts.length; i++) {
+      starts[i] = starts[i - 1] + counts[i - 1];
+    }
+    assert (starts[counts.length - 1] + counts[counts.length - 1] == total);
+  }
+
+  public void sort() {
+    int[] dest = new int[total];
+    for (int i = 0; i < total; i++) {
+      int p = getKey(i);
+      dest[starts[p]++] = get(i);
+    }
+    for (int i = 0; i < total; i++) {
+      put(i, dest[i]);
+    }
+  }
+
+  private int findSwapPosition(int partition) {
+    while (counts[partition] > 0) {
+      counts[partition]--;
+      int pos = starts[partition] + counts[partition];
+      int part = getKey(pos);
+      if (part != partition) {
+        return part;
+      }
+    }
+    return -1;
+  }
+
+  public void sortInplace() {
+    for (int i = 0; i < counts.length; i++) {
+      while (true) {
+        int part = findSwapPosition(i);
+        if (part < 0) {
+          break;
+        }
+        int hole = starts[i] + counts[i];
+        int tempOffset = get(hole);
+        while (true) {
+          int next = findSwapPosition(part);
+          int pos = starts[part] + counts[part];
+          int temp = get(pos);
+          put(pos, tempOffset);
+          tempOffset = temp;
+          if (i == next) {
+            put(hole, tempOffset);
+            break;
+          }
+          part = next;
+        }
+      }
+    }
+  }
+}
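Reviewer aid, not part of the patch: a minimal sketch of how the IndexedCountingSortable contract is meant to be used. It mirrors what MapOutputBuffer.sortPartitionsOnly() later in this patch does with kvoffsets/kvindices, but on plain arrays; CountingSortSketch and all variable names below are invented for the example.

    import java.util.Arrays;
    import org.apache.hadoop.util.IndexedCountingSortable;

    public class CountingSortSketch {
      public static void main(String[] args) {
        // Each record 0..4 is assigned a partition in [0, 3).
        final int[] partitionOf = { 2, 0, 1, 0, 2 };
        // Logical order of the records; this is what gets permuted.
        final int[] order = { 0, 1, 2, 3, 4 };

        // Histogram of partitions; the utility derives the start offsets from it.
        int[] counts = new int[3];
        for (int p : partitionOf) {
          counts[p]++;
        }

        new IndexedCountingSortable(counts, order.length) {
          @Override public int get(int i)         { return order[i]; }
          @Override public void put(int i, int v) { order[i] = v; }
          @Override public int getKey(int i)      { return partitionOf[get(i)]; }
        }.sort();

        // Records are now grouped (stably) by partition: [1, 3, 2, 0, 4]
        System.out.println(Arrays.toString(order));
      }
    }

The map side only needs records grouped by reduce partition, not ordered by key, which is why this linear-time pass can replace the quicksort in the no-sort code path.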
diff --git a/src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java b/src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
index 8646cf7b..53157c52 100644
--- a/src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
+++ b/src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
@@ -135,7 +135,7 @@ public int getCollectedBytesSize() {
     return collectedBytesSize;
   }
 
-  abstract void groupOrSort();
+  abstract void groupOrSort(boolean sort);
 
   public abstract KeyValueSpillIterator getKeyValueSpillIterator();
 
diff --git a/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java b/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
index e0b1b3f7..7b0215fc 100644
--- a/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
+++ b/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
@@ -31,12 +31,14 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
 
@@ -57,6 +59,7 @@ public class BlockMapOutputBuffer
     if (numSpills > 0 && lastSpillInMem) {
       // if there is already one spills, we can try to hold this last spill in
       // memory.
-      sortReduceParts();
+      sortReduceParts(sort);
       for (int i = 0; i < partitions; i++) {
         this.inMemorySegments[i] =
             new Segment(this.reducePartitions[i].getIReader(),
@@ -281,6 +284,61 @@ public synchronized void flush() throws IOException, ClassNotFoundException,
         mergeEndMilli - mergeStartMilli);
   }
 
+  private RawKeyValueIterator getNoSortRawKeyValueIterator(
+      final List<Segment<K, V>> segmentList)
+      throws IOException {
+    return new RawKeyValueIterator() {
+      private Progress progress = new Progress();
+      Segment currentSegment = null;
+      int currentIndex = -1;
+      @Override
+      public boolean next() throws IOException {
+        while (true) {
+          if (currentSegment==null) {
+            currentIndex++;
+            if (currentIndex < segmentList.size()) {
+              currentSegment = segmentList.get(currentIndex);
+              currentSegment.init(null);
+            } else {
+              return false;
+            }
+          } else {
+            if (currentSegment.next()) {
+              return true;
+            }
+            currentSegment.close();
+            currentSegment = null;
+          }
+        }
+      }
+      @Override
+      public DataInputBuffer getKey() throws IOException {
+        return currentSegment.getKey();
+      }
+      @Override
+      public DataInputBuffer getValue() throws IOException {
+        return currentSegment.getValue();
+      }
+      @Override
+      public Progress getProgress() {
+        return progress;
+      }
+      @Override
+      public void close() throws IOException {
+      }
+      @Override
+      public long getTotalBytesProcessed() {
+        return 0;
+      }
+    };
+  }
+
-    RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec,
-        segmentList, job.getInt("io.sort.factor", 100),
-        new Path(mapId.toString()), new RawComparator<K>() {
-          @Override
-          public int compare(byte[] b1, int s1, int l1,
-              byte[] b2, int s2, int l2) {
-            return LexicographicalComparerHolder.BEST_COMPARER
-                .compareTo(b1, s1, l1, b2, s2, l2);
-          }
-
-          @Override
-          public int compare(K o1, K o2) {
-            return LexicographicalComparerHolder.BEST_COMPARER
-                .compareTo(o1.getBytes(), 0, o1.getLength(),
-                    o2.getBytes(), 0, o2.getLength());
-          }
-        }, reporter, null, task.spilledRecordsCounter);
+    RawKeyValueIterator kvIter = null;
+    if (sort) {
+      // merge
+      kvIter = Merger.merge(job, rfs, keyClass, valClass, codec,
+          segmentList, job.getInt("io.sort.factor", 100),
+          new Path(mapId.toString()), new RawComparator<K>() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1,
+                byte[] b2, int s2, int l2) {
+              return LexicographicalComparerHolder.BEST_COMPARER
+                  .compareTo(b1, s1, l1, b2, s2, l2);
+            }
+
+            @Override
+            public int compare(K o1, K o2) {
+              return LexicographicalComparerHolder.BEST_COMPARER
+                  .compareTo(o1.getBytes(), 0, o1.getLength(),
+                      o2.getBytes(), 0, o2.getLength());
+            }
+          }, reporter, null, task.spilledRecordsCounter);
+    } else {
+      kvIter = getNoSortRawKeyValueIterator(segmentList);
+    }
 
     // write merged output to disk
     long segmentStart = finalOut.getPos();
diff --git a/src/mapred/org/apache/hadoop/mapred/MapTask.java b/src/mapred/org/apache/hadoop/mapred/MapTask.java
index 793e8c0f..74574524 100644
--- a/src/mapred/org/apache/hadoop/mapred/MapTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/MapTask.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.IndexedCountingSortable;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -532,6 +533,7 @@ private class NewOutputCollector
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
+      boolean sort = job.getBoolean("mapred.map.output.sort", true);
      collector = new MapOutputBuffer(umbilical, job, reporter);
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 0) {
@@ -840,6 +842,9 @@ class MapOutputBuffer
    private ArrayList indexCacheList;
    private int totalIndexCacheMemory;
    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+
+    private final boolean sort;
+    private final int [] kvCounts;
 
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
@@ -856,6 +861,9 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
 
       spillSortCounters = new MapSpillSortCounters(reporter);
 
+      sort = job.getBoolean("mapred.map.output.sort", true);
+      kvCounts = sort ? null : new int[partitions];
+
       //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
@@ -911,6 +919,9 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                                              combineInputCounter,
                                              reporter, null);
       if (combinerRunner != null) {
+        if (!sort) {
+          throw new IOException("Combiner not supported in no sort mode");
+        }
         combineCollector= new CombineOutputCollector(combineOutputCounter);
       } else {
         combineCollector = null;
@@ -1015,6 +1026,9 @@ public synchronized void collect(K key, V value, int partition
         kvindices[ind + KEYSTART] = keystart;
         kvindices[ind + VALSTART] = valstart;
         kvindex = kvnext;
+        if (!sort) {
+          kvCounts[partition]++;
+        }
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         spillSingleRecord(key, value, partition);
@@ -1044,6 +1058,12 @@ public int compare(int i, int j) {
                          kvindices[ij + KEYSTART],
                          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
     }
+
+    public int comparePartition(int i, int j) {
+      final int ii = kvoffsets[i % kvoffsets.length];
+      final int ij = kvoffsets[j % kvoffsets.length];
+      return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
+    }
 
     /**
      * Swap logical indices st i, j MOD offset capacity.
@@ -1337,7 +1357,11 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
       long sortStartMilli = System.currentTimeMillis();
       ProcResourceValues sortStartProcVals = getCurrentProcResourceValues();
       //do the sort
-      sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
+      if (sort) {
+        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
+      } else {
+        sortPartitionsOnly(kvstart, endPosition);
+      }
       // get the cumulative resources used after the sort, and use the diff as
       // resources/wallclock consumed by the sort.
      long sortEndMilli = System.currentTimeMillis();
@@ -1573,6 +1597,95 @@ public long getTotalBytesProcessed() {
       }
     }
 
+    private void sortPartitionsOnly(final int l, final int r) {
+      new IndexedCountingSortable(kvCounts, r-l) {
+        @Override
+        public int get(int i) {
+          return kvoffsets[(l+i) % kvoffsets.length];
+        }
+        @Override
+        public void put(int i, int v) {
+          kvoffsets[(l+i) % kvoffsets.length] = v;
+        }
+        @Override
+        public int getKey(int i) {
+          return kvindices[get(i) + PARTITION];
+        }
+      }.sort();
+      // clear kvCounts for next sort&spill
+      for (int i = 0; i < kvCounts.length; i++) {
+        kvCounts[i] = 0;
+      }
+    }
+
+    private RawKeyValueIterator getNoSortRawKeyValueIterator(
+        final List<Segment<K, V>> segmentList)
+        throws IOException {
+      return new RawKeyValueIterator() {
+        private Progress progress = new Progress();
+        Segment currentSegment = null;
+        int currentIndex = -1;
+        @Override
+        public boolean next() throws IOException {
+          while (true) {
+            if (currentSegment==null) {
+              currentIndex++;
+              if (currentIndex < segmentList.size()) {
+                currentSegment = segmentList.get(currentIndex);
+                currentSegment.init(null);
+              } else {
+                return false;
+              }
+            } else {
+              if (currentSegment.next()) {
+                return true;
+              }
+              currentSegment.close();
+              currentSegment = null;
+            }
+          }
+        }
+        @Override
+        public DataInputBuffer getKey() throws IOException {
+          return currentSegment.getKey();
+        }
+        @Override
+        public DataInputBuffer getValue() throws IOException {
+          return currentSegment.getValue();
+        }
+        @Override
+        public Progress getProgress() {
+          return progress;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public long getTotalBytesProcessed() {
+          return 0;
+        }
+      };
+    }
+
+        List<Segment<K, V>> segmentList, TaskAttemptID mapId) throws IOException {
+      //merge
+      return Merger.merge(job, rfs,
+                   keyClass, valClass, codec,
+                   segmentList, job.getInt("io.sort.factor", 100),
+                   new Path(mapId.toString()),
+                   job.getOutputKeyComparator(), reporter,
+                   null, spilledRecordsCounter);
+    }
+
     @SuppressWarnings("unchecked")
     private void mergeParts() throws IOException, InterruptedException,
                                      ClassNotFoundException {
       // get the approximate size of the final output/index files
@@ -1659,14 +1772,17 @@ private void mergeParts() throws IOException, InterruptedException,
         }
       }
 
-      //merge
-      @SuppressWarnings("unchecked")
-      RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                     keyClass, valClass, codec,
-                     segmentList, job.getInt("io.sort.factor", 100),
-                     new Path(mapId.toString()),
-                     job.getOutputKeyComparator(), reporter,
-                     null, spilledRecordsCounter);
+      RawKeyValueIterator kvIter = null;
+      if (sort) {
+        kvIter = Merger.merge(job, rfs,
+                     keyClass, valClass, codec,
+                     segmentList, job.getInt("io.sort.factor", 100),
+                     new Path(mapId.toString()),
+                     job.getOutputKeyComparator(), reporter,
+                     null, spilledRecordsCounter);
+      } else {
+        kvIter = getNoSortRawKeyValueIterator(segmentList);
+      }
 
       //write merged output to disk
       long segmentStart = finalOut.getPos();
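For context when reviewing the MapTask changes above: the whole feature is gated on the mapred.map.output.sort job property (default true), read in the collector and buffer constructors. A job that opts out of map-output sorting would presumably be set up roughly like this; the sketch is not part of the patch, and NoSortJobSketch plus its input/output paths are invented for illustration.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.IdentityReducer;

    public class NoSortJobSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(NoSortJobSketch.class);
        job.setJobName("no-sort passthrough");
        // The switch added by this patch; it defaults to true (sorted map output).
        job.setBoolean("mapred.map.output.sort", false);
        job.setMapperClass(IdentityMapper.class);
        // The reducer sees records grouped per map segment, not sorted by key.
        job.setReducerClass(IdentityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Note: the patch rejects combiners when sorting is disabled
        // ("Combiner not supported in no sort mode"), so none is set here.
        JobClient.runJob(job);
      }
    }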
diff --git a/src/mapred/org/apache/hadoop/mapred/Merger.java b/src/mapred/org/apache/hadoop/mapred/Merger.java
index 2df89b2c..230edcf4 100644
--- a/src/mapred/org/apache/hadoop/mapred/Merger.java
+++ b/src/mapred/org/apache/hadoop/mapred/Merger.java
@@ -200,7 +200,7 @@ public Segment(Reader reader, boolean preserve) {
       this.segmentLength = reader.getLength();
     }
 
-    private void init(Counters.Counter readsCounter) throws IOException {
+    void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
         in.seek(segmentOffset);
diff --git a/src/mapred/org/apache/hadoop/mapred/ReducePartition.java b/src/mapred/org/apache/hadoop/mapred/ReducePartition.java
index c0ca096c..a98837cc 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReducePartition.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReducePartition.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,47 @@ class ReducePartition extends BasicReducePartition {
 
+  class NoSortKeyValueSpillIterator implements KeyValueSpillIterator {
+    int totalRecordsNum = 0;
+    Iterator blockIterator;
+    MemoryBlock currentBlock;
+    int recordIndex;
+    MemoryBlockIndex memoryBlockIndex;
+
+    public NoSortKeyValueSpillIterator(List memBlks,
+        int collectedRecordsNum) {
+      totalRecordsNum = collectedRecordsNum;
+      blockIterator = memBlks.iterator();
+      memoryBlockIndex = new MemoryBlockIndex();
+    }
+
+    public void reset(List memBlks, int collectedRecordsNum) {
+      totalRecordsNum = collectedRecordsNum;
+      blockIterator = memBlks.iterator();
+      currentBlock = null;
+      recordIndex = 0;
+    }
+
+    @Override
+    public int getRecordNumber() {
+      return totalRecordsNum;
+    }
+
+    @Override
+    public MemoryBlockIndex next() {
+      while (currentBlock == null || recordIndex >= currentBlock.currentPtr) {
+        if (!blockIterator.hasNext()) {
+          return null;
+        } else {
+          currentBlock = blockIterator.next();
+          recordIndex = 0;
+        }
+      }
+      memoryBlockIndex.setMemoryBlockIndex(currentBlock, recordIndex++);
+      return memoryBlockIndex;
+    }
+  }
+
   class KeyValueSortedArray extends PriorityQueue implements
       KeyValueSpillIterator {
 
@@ -109,7 +151,7 @@ public int getRecordNumber() {
     }
   }
 
-  protected KeyValueSortedArray keyValueSortArray;
+  protected KeyValueSpillIterator keyValueSpillIterator;
 
   public ReducePartition(int reduceNum,
       MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
@@ -145,7 +187,7 @@ public int collect(K key, V value) throws IOException {
   }
 
   public KeyValueSpillIterator getKeyValueSpillIterator() {
-    return keyValueSortArray;
+    return keyValueSpillIterator;
   }
 
   public IndexRecord spill(JobConf job, FSDataOutputStream out,
@@ -179,19 +221,29 @@ public IndexRecord spill(JobConf job, FSDataOutputStream out,
     return rec;
   }
 
-  public void groupOrSort() {
+  public void groupOrSort(boolean sort) {
     reporter.progress();
     List memBlks = snapShot();
     for (int i = 0; i < memBlks.size(); i++) {
       MemoryBlock memBlk = memBlks.get(i);
-      sortMemBlock(memBlk);
+      if (sort) {
+        sortMemBlock(memBlk);
+      }
     }
     // now do a merge sort on the list of memory blocks
-    if (keyValueSortArray == null) {
-      keyValueSortArray = new KeyValueSortedArray(memBlks,
-          getCollectedRecordsNum());
+    if (sort) {
+      if (keyValueSpillIterator == null) {
+        keyValueSpillIterator = new KeyValueSortedArray(memBlks,
+            getCollectedRecordsNum());
+      } else {
+        ((KeyValueSortedArray)keyValueSpillIterator).reset(memBlks, getCollectedRecordsNum());
+      }
     } else {
-      keyValueSortArray.reset(memBlks, getCollectedRecordsNum());
+      if (keyValueSpillIterator == null) {
+        keyValueSpillIterator = new NoSortKeyValueSpillIterator(memBlks, getCollectedRecordsNum());
+      } else {
+        ((NoSortKeyValueSpillIterator)keyValueSpillIterator).reset(memBlks, getCollectedRecordsNum());
+      }
     }
     this.collectedRecordsNum = 0;
     this.collectedBytesSize = 0;
diff --git a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
index 7f514f1d..84f3eabe 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -102,6 +102,7 @@ class ReduceTask extends Task {
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
   private ReduceCopier reduceCopier;
+  private Exception reduceCopierException;
 
   private CompressionCodec codec;
 
@@ -226,6 +227,87 @@ private Path[] getMapFiles(FileSystem fs, boolean isLocal)
     return fileList.toArray(new Path[0]);
   }
 
+  private class NoSortKVIterator implements RawKeyValueIterator {
+    private Progress progress = new Progress();
+    List<Segment> segments = new ArrayList<Segment>();
+    int currentSegmentIndex = -1;
+    Segment currentSegment = null;
+    int usedSegments=0;
+
+    public NoSortKVIterator() {
+    }
+
+    /**
+     * constructor for local runner
+     */
+    public NoSortKVIterator(FileSystem rfs, Path [] files) throws IOException {
+      for (Path file : files) {
+        segments.add(new Segment(conf, rfs, file, codec, false));
+      }
+    }
+
+    protected boolean getNewSegments() throws IOException {
+      return false;
+    }
+
+    private boolean nextSegment() throws IOException {
+      currentSegmentIndex++;
+      if (currentSegmentIndex>=segments.size()) {
+        if (!getNewSegments()) {
+          return false;
+        }
+        currentSegmentIndex = 0;
+      }
+      currentSegment = segments.get(currentSegmentIndex);
+      currentSegment.init(spilledRecordsCounter);
+      usedSegments++;
+      progress.set(usedSegments/(float)numMaps);
+      return true;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      while (true) {
+        if (currentSegment==null) {
+          if (nextSegment()==false) {
+            return false;
+          }
+        } else {
+          if (currentSegment.next()) {
+            return true;
+          } else {
+            currentSegment.close();
+            currentSegment = null;
+          }
+        }
+      }
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      return currentSegment.getKey();
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      return currentSegment.getValue();
+    }
+
+    @Override
+    public Progress getProgress() {
+      return progress;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public long getTotalBytesProcessed() {
+      return 0;
+    }
+  }
+
   private class ReduceValuesIterator extends ValuesIterator {
     public ReduceValuesIterator (RawKeyValueIterator in,
@@ -351,6 +433,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, InterruptedException, ClassNotFoundException {
     this.umbilical = umbilical;
     job.setBoolean("mapred.skip.on", isSkipping());
+    boolean sort = job.getBoolean("mapred.map.output.sort", true);
 
     taskStartTime = System.currentTimeMillis();
     if (isMapOrReduce()) {
@@ -386,12 +469,36 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     ProcResourceValues copyStartProcVals = getCurrentProcResourceValues();
     if (!isLocal) {
       reduceCopier = new ReduceCopier(umbilical, job, reporter);
-      if (!reduceCopier.fetchOutputs()) {
-        if(reduceCopier.mergeThrowable instanceof FSError) {
-          throw (FSError)reduceCopier.mergeThrowable;
+      if (sort) {
+        if (!reduceCopier.fetchOutputs()) {
+          if(reduceCopier.mergeThrowable instanceof FSError) {
+            throw (FSError)reduceCopier.mergeThrowable;
+          }
+          throw new IOException("Task: " + getTaskID() +
+              " - The reduce copier failed", reduceCopier.mergeThrowable);
         }
-        throw new IOException("Task: " + getTaskID() +
-            " - The reduce copier failed", reduceCopier.mergeThrowable);
+      } else {
+        final Thread mainThread = Thread.currentThread();
+        Thread copierThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              if (!reduceCopier.fetchOutputs()) {
+                if(reduceCopier.mergeThrowable instanceof FSError) {
+                  throw (FSError)reduceCopier.mergeThrowable;
+                }
+                throw new IOException("Task: " + getTaskID() +
+                    " - The reduce copier failed", reduceCopier.mergeThrowable);
+              }
+            } catch (Exception e) {
+              reduceCopierException = e;
+              LOG.warn(e);
+              mainThread.interrupt();
+            }
+          }
+        });
+        copierThread.start();
+        copierThread.setName("ReduceCopier");
       }
     }
     long reducerCopyEndMilli = System.currentTimeMillis();
@@ -402,16 +509,24 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     statusUpdate(umbilical);
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    RawKeyValueIterator rIter = isLocal
-      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
-          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
-          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
-          reporter, spilledRecordsCounter, null)
-      : reduceCopier.createKVIterator(job, rfs, reporter);
-
-    // free up the data structures
-    mapOutputFilesOnDisk.clear();
+    RawKeyValueIterator rIter = null;
+    if (sort) {
+      rIter = isLocal
+        ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+            job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
+            !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
+            new Path(getTaskID().toString()), job.getOutputKeyComparator(),
+            reporter, spilledRecordsCounter, null)
+        : reduceCopier.createKVIterator(job, rfs, reporter);
+      // free up the data structures
+      mapOutputFilesOnDisk.clear();
+    } else {
+      rIter = isLocal ? new NoSortKVIterator(rfs, getMapFiles(rfs, true))
+          : reduceCopier.getNoSortRawKVIterator();
+      if (isLocal) {
+        mapOutputFilesOnDisk.clear();
+      }
+    }
 
     long sortEndMilli = System.currentTimeMillis();
     ProcResourceValues sortEndProcVals = getCurrentProcResourceValues();
@@ -421,7 +536,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     statusUpdate(umbilical);
     Class keyClass = job.getMapOutputKeyClass();
     Class valueClass = job.getMapOutputValueClass();
-    RawComparator comparator = job.getOutputValueGroupingComparator();
+    RawComparator comparator = sort ? job.getOutputValueGroupingComparator() : null;
 
     if (useNewApi) {
       runNewReducer(job, umbilical, reporter, rIter, comparator,
@@ -506,7 +621,7 @@ public void collect(OUTKEY key, OUTVALUE value)
             comparator, keyClass, valueClass,
             job, reporter, umbilical) :
         new ReduceValuesIterator(rIter,
-          job.getOutputValueGroupingComparator(), keyClass, valueClass,
+          comparator, keyClass, valueClass,
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
@@ -650,6 +765,11 @@ class ReduceCopier implements MRConstants {
      */
    private ReduceTask reduceTask;
 
+    /**
+     * whether the map outputs are sorted & need a merge on the reduce side
+     */
+    private boolean sort;
+
     /**
      * the list of map outputs currently being copied
      */
@@ -866,6 +986,11 @@ class ReduceCopier implements MRConstants {
    private final List<MapOutput> mapOutputsFilesInMemory =
      Collections.synchronizedList(new LinkedList<MapOutput>());
 
+    /**
+     * Condition for new data arriving, used by the no-sort code path
+     */
+    private Object newDataComming = new Object();
+
     /**
      * The map for (Hosts, List of MapIds from this Host) maintaining
      * map output locations
@@ -1184,10 +1309,13 @@ public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
       }
 
       public void close() {
-        synchronized (dataAvailable) {
-          closed = true;
-          LOG.info("Closed ram manager");
-          dataAvailable.notify();
+        synchronized (newDataComming) {
+          synchronized (dataAvailable) {
+            closed = true;
+            LOG.info("Closed ram manager");
+            dataAvailable.notify();
+          }
+          newDataComming.notify();
         }
       }
 
@@ -1406,7 +1534,10 @@ private long copyOutput(MapOutputLocation loc
        // Process map-output
        if (mapOutput.inMemory) {
          // Save it in the synchronized list of map-outputs
-          mapOutputsFilesInMemory.add(mapOutput);
+          synchronized (newDataComming) {
+            mapOutputsFilesInMemory.add(mapOutput);
+            newDataComming.notify();
+          }
        } else {
          // Rename the temporary file to the final file;
          // ensure it is on the same partition
@@ -1419,8 +1550,11 @@ private long copyOutput(MapOutputLocation loc
                                  tmpMapOutput + " to " + filename);
          }
 
-          synchronized (mapOutputFilesOnDisk) {
-            addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+          synchronized (newDataComming) {
+            synchronized (mapOutputFilesOnDisk) {
+              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+              newDataComming.notify();
+            }
          }
        }
 
@@ -1872,7 +2006,11 @@ private void configureClasspath(JobConf conf)
         URLClassLoader loader = new URLClassLoader(urls, parent);
         conf.setClassLoader(loader);
       }
-
+
+    public RawKeyValueIterator getNoSortRawKVIterator() {
+      return new ShufflingNoSortKVIterator();
+    }
+
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
                         TaskReporter reporter
                         )throws ClassNotFoundException, IOException {
@@ -1882,6 +2020,7 @@ public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;
       this.reduceTask = ReduceTask.this;
+      this.sort = conf.getBoolean("mapred.map.output.sort", true);
 
       this.scheduledCopies = new ArrayList(100);
       this.copyResults = new ArrayList(100);
@@ -1892,7 +2031,7 @@ public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
       this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
                                                    combineInputCounter,
                                                    reporter, null);
-      if (combinerRunner != null) {
+      if (combinerRunner != null && sort) {
         combineCollector = 
           new CombineOutputCollector(reduceCombineOutputCounter);
       }
@@ -1970,12 +2109,14 @@ public boolean fetchOutputs() throws IOException {
         copier.start();
       }
 
-      //start the on-disk-merge thread
-      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
-      //start the in memory merger thread
-      inMemFSMergeThread = new InMemFSMergeThread();
-      localFSMergerThread.start();
-      inMemFSMergeThread.start();
+      if (sort) {
+        //start the on-disk-merge thread
+        localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
+        //start the in memory merger thread
+        inMemFSMergeThread = new InMemFSMergeThread();
+        localFSMergerThread.start();
+        inMemFSMergeThread.start();
+      }
 
       // start the map events thread
       getMapEventsThread = new GetMapEventsThread();
@@ -2301,7 +2442,7 @@ public boolean fetchOutputs() throws IOException {
       ramManager.close();
 
       //Do a merge of in-memory files (if there are any)
-      if (mergeThrowable == null) {
+      if (mergeThrowable == null && sort) {
        try {
          // Wait for the on-disk merge to complete
          localFSMergerThread.join();
@@ -2547,7 +2688,51 @@ private void addToMapOutputFilesOnDisk(FileStatus status) {
       }
     }
-
+
+    private class ShufflingNoSortKVIterator extends NoSortKVIterator {
+      public ShufflingNoSortKVIterator() {
+        super();
+      }
+
+      @Override
+      protected boolean getNewSegments() throws IOException {
+        synchronized (newDataComming) {
+          while (mapOutputsFilesInMemory.size() == 0
+              && mapOutputFilesOnDisk.size() == 0) {
+            if (ramManager.closed) {
+              return false;
+            } else {
+              try {
+                newDataComming.wait();
+              } catch (InterruptedException e) {
+                throw new IOException(e);
+              }
+            }
+          }
+          segments.clear();
+          synchronized (mapOutputsFilesInMemory) {
+            for (MapOutput mo : mapOutputsFilesInMemory) {
+              Reader reader =
+                  new InMemoryReader(ramManager, mo.mapAttemptId,
+                      mo.data, 0, mo.data.length);
+              Segment segment = new Segment(reader, true);
+              segments.add(segment);
+            }
+            mapOutputsFilesInMemory.clear();
+          }
+          ArrayList<Path> mapOutputFiles = new ArrayList<Path>();
+          synchronized (mapOutputFilesOnDisk) {
+            for (FileStatus st : mapOutputFilesOnDisk) {
+              mapOutputFiles.add(st.getPath());
+            }
+            mapOutputFilesOnDisk.clear();
+          }
+          for (Path file : mapOutputFiles) {
+            segments.add(new Segment(conf, rfs, file, codec, false));
+          }
+        }
+        return segments.size()>0;
+      }
+    }
 
    /** Starts merging the local copy (on disk) of the map's output so that
     * most of the reducer's input is sorted i.e overlapping shuffle
diff --git a/src/mapred/org/apache/hadoop/mapred/Task.java b/src/mapred/org/apache/hadoop/mapred/Task.java
index e181b5fd..ed641199 100644
--- a/src/mapred/org/apache/hadoop/mapred/Task.java
+++ b/src/mapred/org/apache/hadoop/mapred/Task.java
@@ -1113,7 +1113,7 @@ private void readNextKey() throws IOException {
         DataInputBuffer nextKeyBytes = in.getKey();
         keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
         nextKey = keyDeserializer.deserialize(nextKey);
-        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+        hasNext = key != null && (comparator != null) && (comparator.compare(key, nextKey) == 0);
       } else {
         hasNext = false;
       }
diff --git a/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java b/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
index 8f08f0a4..f02cc54f 100644
--- a/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
+++ b/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
@@ -117,12 +117,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
     hasMore = input.next();
     if (hasMore) {
       next = input.getKey();
-      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
-                                     currentRawKey.getLength(),
-                                     next.getData(),
-                                     next.getPosition(),
-                                     next.getLength() - next.getPosition()
-                                         ) == 0;
+      nextKeyIsSame = (comparator == null) ? false :
+          (comparator.compare(currentRawKey.getBytes(), 0,
+                                     currentRawKey.getLength(),
+                                     next.getData(),
+                                     next.getPosition(),
+                                     next.getLength() - next.getPosition()
+                                         ) == 0);
     } else {
       nextKeyIsSame = false;
     }
diff --git a/src/test/org/apache/hadoop/mapred/TestNoSort.java b/src/test/org/apache/hadoop/mapred/TestNoSort.java
new file mode 100644
index 00000000..fa165111
--- /dev/null
+++ b/src/test/org/apache/hadoop/mapred/TestNoSort.java
@@ -0,0 +1,77 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestNoSort extends HadoopTestCase {
+  public TestNoSort() throws IOException {
+    super(CLUSTER_MR, LOCAL_FS, 2, 1);
+  }
+
+  private static final Path ROOT_DIR = new Path("testing/nosort");
+  private static final Path IN_DIR = new Path(ROOT_DIR, "input");
+  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+  private Path getDir(Path dir) {
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+          .replace(' ', '+');
+      dir = new Path(localPathRoot, dir);
+    }
+    return dir;
+  }
+
+  private JobConf conf;
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = createJobConf();
+    fs = FileSystem.get(conf);
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    fs.delete(getDir(ROOT_DIR), true);
+    super.tearDown();
+  }
+
+  void prepareInput(int size, FileSystem fs) throws IOException {
+    FSDataOutputStream out = fs.create(getDir(new Path(IN_DIR, "part-00000")));
+    for (int i=0;i
+      if (i>0) {
+        int l = values[indexes[i]];
+        int r = values[indexes[i-1]];
+        assertTrue(l>=r);
+      }
+      has[indexes[i]]++;
+    }
+    for (int i=0;i