This repository has been archived by the owner on Jan 13, 2022. It is now read-only.

Support nosort dataflow #5

Open · wants to merge 2 commits into master
92 changes: 92 additions & 0 deletions src/core/org/apache/hadoop/util/IndexedCountingSortable.java
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

/**
 * Linear-time, in-place counting sort.
 */
public abstract class IndexedCountingSortable {

  /** Returns the integer key (e.g. partition id) of the record at index i. */
  abstract public int getKey(int i);

  /** Returns the record (e.g. a buffer offset) stored at index i. */
  abstract public int get(int i);

  /** Stores record v at index i. */
  abstract public void put(int i, int v);

  final int[] counts;
  final int[] starts;
  final int total;

  public IndexedCountingSortable(int[] counts, int total) {
    this.total = total;
    this.counts = counts;
    // Prefix sums: starts[i] is the first destination slot for key i.
    this.starts = new int[counts.length];
    for (int i = 1; i < counts.length; i++) {
      starts[i] = starts[i - 1] + counts[i - 1];
    }
    assert (starts[counts.length - 1] + counts[counts.length - 1] == total);
  }

  /** Counting sort into a temporary array, then copy back. */
  public void sort() {
    int[] dest = new int[total];
    for (int i = 0; i < total; i++) {
      int p = getKey(i);
      dest[starts[p]++] = get(i);
    }
    for (int i = 0; i < total; i++) {
      put(i, dest[i]);
    }
  }

  /**
   * Scans the unprocessed tail of the given partition, decrementing
   * counts[partition] past records that are already in place. Returns the
   * partition that the first misplaced record belongs to, or -1 if this
   * partition is fully processed.
   */
  private int findSwapPosition(int partition) {
    while (counts[partition] > 0) {
      counts[partition]--;
      int pos = starts[partition] + counts[partition];
      int part = getKey(pos);
      if (part != partition) {
        return part;
      }
    }
    return -1;
  }

  /** Counting sort without the temporary array, by following swap cycles. */
  public void sortInplace() {
    for (int i = 0; i < counts.length; i++) {
      while (true) {
        int part = findSwapPosition(i);
        if (part < 0) {
          break;
        }
        // Lift the misplaced record out, leaving a hole, then chase the
        // displacement cycle until it returns to partition i.
        int hole = starts[i] + counts[i];
        int tempOffset = get(hole);
        while (true) {
          int next = findSwapPosition(part);
          int pos = starts[part] + counts[part];
          int temp = get(pos);
          put(pos, tempOffset);
          tempOffset = temp;
          if (i == next) {
            put(hole, tempOffset);
            break;
          }
          part = next;
        }
      }
    }
  }
}
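For context, a minimal usage sketch (not part of the patch): a subclass backed by a plain array, where each record's key is derived from the stored value itself, a property sortInplace() relies on since it re-reads keys while records move. The class name, data, and key encoding below are illustrative assumptions.

import org.apache.hadoop.util.IndexedCountingSortable;

public class CountingSortExample {
  public static void main(String[] args) {
    // Each record encodes its partition implicitly: partition = value / 10.
    final int[] values = { 20, 0, 10, 1, 21, 11 };
    int numPartitions = 3;

    // Per-partition record counts; the constructor turns these into
    // prefix-sum start offsets.
    int[] counts = new int[numPartitions];
    for (int v : values) {
      counts[v / 10]++;
    }

    new IndexedCountingSortable(counts, values.length) {
      @Override
      public int getKey(int i) {
        return values[i] / 10; // key must track the current contents
      }

      @Override
      public int get(int i) {
        return values[i];
      }

      @Override
      public void put(int i, int v) {
        values[i] = v;
      }
    }.sortInplace();

    // values is now grouped by partition, e.g. { 1, 0, 10, 11, 21, 20 };
    // records are grouped, not fully ordered, within each partition.
    System.out.println(java.util.Arrays.toString(values));
  }
}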
@@ -135,7 +135,7 @@ public int getCollectedBytesSize() {
return collectedBytesSize;
}

abstract void groupOrSort();
abstract void groupOrSort(boolean sort);

public abstract KeyValueSpillIterator getKeyValueSpillIterator();

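The new boolean presumably selects between the classic full sort and the cheaper partition-grouping pass. A hedged sketch of the intended contract; sortRecordsByKey and groupRecordsByPartition are hypothetical helper names, not APIs from this patch:

// Hypothetical sketch of the contract, not code from this patch.
abstract class ReducePartitionSketch {
  abstract void sortRecordsByKey();        // full key order (assumed helper)
  abstract void groupRecordsByPartition(); // grouping only (assumed helper)

  void groupOrSort(boolean sort) {
    if (sort) {
      sortRecordsByKey();        // classic dataflow: reducers see sorted keys
    } else {
      groupRecordsByPartition(); // nosort dataflow: grouping alone suffices
    }
  }
}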
110 changes: 86 additions & 24 deletions src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
@@ -31,12 +31,14 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;

@@ -57,6 +59,7 @@ public class BlockMapOutputBuffer<K extends BytesWritable, V extends BytesWritab
// main output buffer
private byte[] kvbuffer;
private int kvBufferSize;
private boolean sort = true;
// spill accounting
private volatile int numSpills = 0;

@@ -103,6 +106,7 @@ public void configure(JobConf job) {
}
};
}
this.sort = job.getBoolean("mapred.map.output.sort", true);
rfs = ((LocalFileSystem) localFs).getRaw();

float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
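Since the knob is read here, a job opting into the nosort dataflow would presumably set it explicitly. A sketch, assuming only the property name and default visible above; only sensible for jobs whose reducers do not depend on sorted input:

import org.apache.hadoop.mapred.JobConf;

public class NoSortJobSetup {
  public static JobConf newNoSortConf(Class<?> jobClass) {
    JobConf job = new JobConf(jobClass);
    // "mapred.map.output.sort" defaults to true (classic sorted dataflow).
    job.setBoolean("mapred.map.output.sort", false);
    return job;
  }
}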
@@ -187,13 +191,12 @@ public void collect(K key, V value) throws IOException {
/*
 * Return the ProcResourceValues snapshot for later use.
 */
protected ProcResourceValues sortReduceParts() {
protected ProcResourceValues sortReduceParts(boolean sort) {
long sortStartMilli = System.currentTimeMillis();
ProcResourceValues sortStartProcVals =
task.getCurrentProcResourceValues();
// sort
for (int i = 0; i < reducePartitions.length; i++) {
reducePartitions[i].groupOrSort();
reducePartitions[i].groupOrSort(sort);
}
long sortEndMilli = System.currentTimeMillis();
ProcResourceValues sortEndProcVals =
@@ -205,7 +208,7 @@

@Override
public void sortAndSpill() throws IOException {
ProcResourceValues sortEndProcVals = sortReduceParts();
ProcResourceValues sortEndProcVals = sortReduceParts(sort);
long sortEndMilli = System.currentTimeMillis();
// spill
FSDataOutputStream out = null;
@@ -262,7 +265,7 @@ public synchronized void flush() throws IOException, ClassNotFoundException,
if (numSpills > 0 && lastSpillInMem) {
// if there is already one spill, we can try to hold this last spill in
// memory.
sortReduceParts();
sortReduceParts(sort);
for (int i = 0; i < partitions; i++) {
this.inMemorySegments[i] =
new Segment<K, V>(this.reducePartitions[i].getIReader(),
@@ -281,6 +284,61 @@
mergeEndMilli - mergeStartMilli);
}

private RawKeyValueIterator getNoSortRawKeyValueIterator(
    final List<Segment<K, V>> segmentList)
    throws IOException {
  // With sorting disabled there is no order to restore, so instead of
  // merging we chain the spill segments and stream them back to back.
  return new RawKeyValueIterator() {
    private Progress progress = new Progress();
    Segment<K, V> currentSegment = null;
    int currentIndex = -1;

    @Override
    public boolean next() throws IOException {
      while (true) {
        if (currentSegment == null) {
          // Advance to the next segment, or report completion.
          currentIndex++;
          if (currentIndex < segmentList.size()) {
            currentSegment = segmentList.get(currentIndex);
            currentSegment.init(null);
            // Float division; integer division would pin progress at 0.
            progress.set((float) currentIndex / segmentList.size());
          } else {
            progress.set(1.0f);
            return false;
          }
        } else {
          if (currentSegment.next()) {
            return true;
          }
          // Segment exhausted; loop around to open the next one.
          currentSegment = null;
        }
      }
    }

    @Override
    public DataInputBuffer getKey() throws IOException {
      return currentSegment.getKey();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return currentSegment.getValue();
    }

    @Override
    public Progress getProgress() {
      return progress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public long getTotalBytesProcessed() {
      return 0;
    }
  };
}

private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
@@ -377,25 +435,29 @@ private void mergeParts() throws IOException, InterruptedException,
segmentList.add(numSpills, this.inMemorySegments[parts]);
}

// merge
RawKeyValueIterator kvIter =
Merger.merge(job, rfs, keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),
new Path(mapId.toString()), new RawComparator<K>() {
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return LexicographicalComparerHolder.BEST_COMPARER
.compareTo(b1, s1, l1, b2, s2, l2);
}

@Override
public int compare(K o1, K o2) {
return LexicographicalComparerHolder.BEST_COMPARER
.compareTo(o1.getBytes(), 0, o1.getLength(),
o2.getBytes(), 0, o2.getLength());
}
}, reporter, null, task.spilledRecordsCounter);
RawKeyValueIterator kvIter = null;
if (sort) {
// merge
kvIter = Merger.merge(job, rfs, keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),
new Path(mapId.toString()), new RawComparator<K>() {
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return LexicographicalComparerHolder.BEST_COMPARER
.compareTo(b1, s1, l1, b2, s2, l2);
}

@Override
public int compare(K o1, K o2) {
return LexicographicalComparerHolder.BEST_COMPARER
.compareTo(o1.getBytes(), 0, o1.getLength(),
o2.getBytes(), 0, o2.getLength());
}
}, reporter, null, task.spilledRecordsCounter);
} else {
kvIter = getNoSortRawKeyValueIterator(segmentList);
}

// write merged output to disk
long segmentStart = finalOut.getPos();