Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Nov 12, 2024
1 parent 805ac34 commit 02c82d2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,18 @@ public long getPeakMemoryUsedBytes() {
private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
activeSpillSorter.checkArray("0");

if (isAsync) {
System.out.println("isAsync");
for (SpillSorter sorter : spillingSorters) {
memoryFreed += sorter.freeMemory();
sorter.freeArray();
}
}
activeSpillSorter.checkArray("1");
memoryFreed += activeSpillSorter.freeMemory();
activeSpillSorter.checkArray("2");
activeSpillSorter.freeArray();

return memoryFreed;
Expand Down Expand Up @@ -370,6 +375,10 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
activeSpillSorter.insertRecord(recordBase, recordOffset, length, partitionId);
}

/**
 * Debug hook: forwards a pointer-array sanity check to the active spill sorter,
 * which prints the page number of the memory block backing its pointer array.
 *
 * @param msg label printed with the check so call sites can be told apart in the output
 */
public void checkArray(String msg) {
activeSpillSorter.checkArray(msg);
}

/**
* Close the sorter, causing any buffered data to be sorted and written out to disk.
*
Expand All @@ -378,6 +387,7 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
*/
public SpillInfo[] closeAndGetSpills() throws IOException {
if (activeSpillSorter != null) {
activeSpillSorter.checkArray("closeAndGetSpills");
// Do not count the final file towards the spill count.
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempShuffleBlock();
Expand Down Expand Up @@ -441,6 +451,8 @@ class SpillSorter extends SpillWriter {
sorterArray = allocator.allocateArray(initialSize);
this.inMemSorter.expandPointerArray(sorterArray);

System.out.println("sorterArray: " + sorterArray.memoryBlock().pageNumber);

this.allocatedPages = new LinkedList<>();

this.nativeLib = new Native();
Expand Down Expand Up @@ -477,11 +489,19 @@ protected void spill(int required) throws IOException {
CometShuffleExternalSorter.this.spill();
}

/**
 * Debug hook: prints {@code msg} together with the page number of the memory
 * block currently backing {@code sorterArray}, so the pointer array's identity
 * can be traced across spill, free, and reset operations.
 *
 * @param msg label identifying the call site in the printed output
 */
public void checkArray(String msg) {
String line = "checkArray " + msg + ": " + sorterArray.memoryBlock().pageNumber;
System.out.println(line);
}

/** Free the pointer array held by this sorter. */
// NOTE(review): this span appears to interleave pre-change and post-change lines
// of a rendered diff — the unguarded `inMemSorter.free(); freed = true;` pair
// (the removed lines) precedes the added `if (!freed)` guard. As written, `freed`
// is set to true before the guard is evaluated, so the guarded branch is dead.
// Verify against the actual post-commit file before relying on this code.
public void freeArray() {
// synchronized on this sorter: `freed` is read and written under the lock.
synchronized (this) {
inMemSorter.free();
freed = true;
// Dead as rendered: `freed` was just set to true above.
if (!freed) {
System.out.println("freeArray: " + sorterArray.memoryBlock().pageNumber);

// inMemSorter.free();
freed = true;
}
}
}

Expand All @@ -495,6 +515,8 @@ public void reset() {
inMemSorter.reset();
sorterArray = allocator.allocateArray(initialSize);
inMemSorter.expandPointerArray(sorterArray);

System.out.println("reset: " + sorterArray.memoryBlock().pageNumber);
}

void setSpillInfo(SpillInfo spillInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,14 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
// generic throwables.
boolean success = false;
try {
sorter.checkArray("before write");

while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}

sorter.checkArray("after write");

closeAndWriteOutput();
success = true;
} finally {
Expand Down Expand Up @@ -237,6 +242,8 @@ private void open() {
sparkConf,
writeMetrics,
schema);
sorter.checkArray("open");

serBuffer = new ExposedByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
serOutputStream = serializer.serializeStream(serBuffer);
}
Expand All @@ -247,6 +254,7 @@ void closeAndWriteOutput() throws IOException {
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
sorter.checkArray("closeAndWriteOutput");
final SpillInfo[] spills = sorter.closeAndGetSpills();
try {
partitionLengths = mergeSpills(spills);
Expand Down

0 comments on commit 02c82d2

Please sign in to comment.