Skip to content

Commit

Permalink
Steven's 2nd round of comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Dec 4, 2024
1 parent 05bcf7e commit 997304c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
Expand All @@ -35,7 +36,6 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
Expand Down Expand Up @@ -128,7 +128,8 @@ public void initializeState(StateInitializationContext context) throws Exception
}
}

this.inProgress = Sets.newHashSet();
// Has to be concurrent since it is accessed by the CommitService from another thread
this.inProgress = Sets.newConcurrentHashSet();
Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
if (inProgressIterable != null) {
for (RewriteFileGroup group : inProgressIterable) {
Expand Down Expand Up @@ -161,21 +162,22 @@ public void processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> s
DataFileRewriteExecutor.ExecutedGroup executedGroup = streamRecord.getValue();
try {
if (commitService == null) {
this.commitService = createCommitService(executedGroup, streamRecord.getTimestamp());
this.startingSnapshotId = executedGroup.snapshotId();
this.commitService =
createCommitService(streamRecord.getTimestamp(), executedGroup.groupsPerCommit());
}

commitService.offer(executedGroup.group());
inProgress.add(executedGroup.group());
++processed;
} catch (Exception e) {
LOG.info(
"Exception processing {} for table {} with {}[{}] at {}",
executedGroup,
LogUtil.MESSAGE_PREFIX + "Exception processing {}",
tableName,
taskName,
taskIndex,
streamRecord.getTimestamp(),
executedGroup,
e);
output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
errorCounter.inc();
Expand All @@ -190,27 +192,28 @@ public void processWatermark(Watermark mark) throws Exception {
if (processed != commitService.results().size()) {
throw new RuntimeException(
String.format(
"From %d commits only %d were unsuccessful for table %s with %s[%d] at %d",
processed,
commitService.results().size(),
Locale.ROOT,
LogUtil.MESSAGE_FORMAT_PREFIX + "From %d commits only %d were unsuccessful",
tableName,
taskName,
taskIndex,
mark.getTimestamp()));
mark.getTimestamp(),
processed,
commitService.results().size()));
}
}

table.refresh();
LOG.info(
"Successfully completed data file compaction to {} for table {} with {}[{}] at {}",
table.currentSnapshot().snapshotId(),
LogUtil.MESSAGE_PREFIX + "Successfully completed data file compaction to {}",
tableName,
taskName,
taskIndex,
mark.getTimestamp());
mark.getTimestamp(),
table.currentSnapshot().snapshotId());
} catch (Exception e) {
LOG.info(
"Exception closing commit service for table {} with {}[{}] at {}",
LogUtil.MESSAGE_PREFIX + "Exception closing commit service",
tableName,
taskName,
taskIndex,
Expand All @@ -236,34 +239,44 @@ public void close() throws IOException {
}
}

private CommitService createCommitService(
DataFileRewriteExecutor.ExecutedGroup element, long timestamp) {
private CommitService createCommitService(long timestamp, int groupsPerCommit) {
FlinkRewriteDataFilesCommitManager commitManager =
new FlinkRewriteDataFilesCommitManager(table, element.snapshotId(), timestamp);
CommitService service = commitManager.service(element.groupsPerCommit());
new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, timestamp);
CommitService service = commitManager.service(groupsPerCommit);
service.start();

return service;
}

private void commitInProgress(long timestamp) {
if (!inProgress.isEmpty()) {
CommitService service = null;
try {
FlinkRewriteDataFilesCommitManager manager =
new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, timestamp);
CommitService service = manager.service(Integer.MAX_VALUE);
service.start();
manager.commitOrClean(inProgress);
service.close();
inProgress.clear();
service = createCommitService(timestamp, inProgress.size());
inProgress.forEach(service::offer);
} catch (Exception e) {
LOG.info(
"Failed committing pending groups {} for table {} with {}[{}], so skipping.",
inProgress,
LogUtil.MESSAGE_PREFIX + "Failed committing pending groups {}",
tableName,
taskName,
taskIndex,
timestamp,
inProgress,
e);
} finally {
inProgress.clear();
if (service != null) {
try {
service.close();
} catch (Exception e) {
LOG.warn(
LogUtil.MESSAGE_PREFIX + "Failed close pending groups committer",
tableName,
taskName,
taskIndex,
timestamp,
e);
}
}
}
}
}
Expand All @@ -280,21 +293,14 @@ private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitM
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
super.commitFileGroups(fileGroups);
LOG.debug(
"Committed {} for table {} with {}[{}] at {}",
fileGroups,
LogUtil.MESSAGE_PREFIX + "Committed {}",
tableName,
taskName,
taskIndex,
timestamp);
timestamp,
fileGroups);
updateMetrics(fileGroups);
inProgress.removeAll(fileGroups);
LOG.debug(
"Remaining {} for table {} with {}[{}] at {}",
inProgress,
tableName,
taskName,
taskIndex,
timestamp);
}

private void updateMetrics(Set<RewriteFileGroup> fileGroups) {
Expand All @@ -305,10 +311,6 @@ private void updateMetrics(Set<RewriteFileGroup> fileGroups) {
}

for (DataFile rewritten : fileGroup.rewrittenFiles()) {
Preconditions.checkArgument(
FileContent.DATA.equals(rewritten.content()),
"%s is not supported for metrics collection",
rewritten);
removedDataFileNumCounter.inc();
removedDataFileSizeCounter.inc(rewritten.fileSizeInBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,21 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Rewriting files {} from {} for table {} with {}[{}] at {}",
value.group().info(),
value.group().rewrittenFiles(),
LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
value.group().info(),
value.group().rewrittenFiles());
} else {
LOG.info(
"Rewriting {} files for table {} with {}[{}] at {}",
value.group().rewrittenFiles().size(),
LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
value.group().rewrittenFiles().size());
}

try (TaskWriter<RowData> writer = writerFor(value)) {
Expand All @@ -121,45 +121,45 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
value.group()));
if (LOG.isDebugEnabled()) {
LOG.debug(
"Rewritten files {} from {} to {} for table {} with {}[{}] at {}",
value.group().info(),
value.group().rewrittenFiles(),
value.group().addedFiles(),
LogUtil.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
value.group().info(),
value.group().rewrittenFiles(),
value.group().addedFiles());
} else {
LOG.info(
"Rewritten {} files to {} files for table {} with {}[{}] at {}",
value.group().rewrittenFiles().size(),
value.group().addedFiles().size(),
LogUtil.MESSAGE_PREFIX + "Rewritten {} files to {} files",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
value.group().rewrittenFiles().size(),
value.group().addedFiles().size());
}
} catch (Exception ex) {
LOG.info(
"Exception rewriting datafile group {} for table {} with {}[{}] at {}",
value.group(),
LogUtil.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
errorCounter.inc();
abort(writer, ctx.timestamp());
}
} catch (Exception ex) {
LOG.info(
"Exception creating compaction writer for group {} for table {} with {}[{}] at {}",
value.group(),
LogUtil.MESSAGE_PREFIX + "Exception creating compaction writer for group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
errorCounter.inc();
Expand Down Expand Up @@ -203,25 +203,25 @@ private DataIterator<RowData> readerFor(PlannedGroup value) {
private void abort(TaskWriter<RowData> writer, long timestamp) {
try {
LOG.info(
"Aborting rewrite for (subTaskId {}, attemptId {}) for table {} with {}[{}] at {}",
subTaskId,
attemptId,
LogUtil.MESSAGE_PREFIX + "Aborting rewrite for (subTaskId {}, attemptId {})",
tableName,
taskName,
taskIndex,
timestamp);
timestamp,
subTaskId,
attemptId);
writer.abort();
LOG.info(
"Aborted rewrite for (subTaskId {}, attemptId {}) for table {} with {}[{}] at {}",
subTaskId,
attemptId,
LogUtil.MESSAGE_PREFIX + "Aborted rewrite for (subTaskId {}, attemptId {})",
tableName,
taskName,
taskIndex,
timestamp);
timestamp,
subTaskId,
attemptId);
} catch (Exception inner) {
LOG.info(
"Exception in abort for table {} with {}[{}] at {}",
LogUtil.MESSAGE_PREFIX + "Exception in abort",
tableName,
taskName,
taskIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Set<DataFile> rewrite(List<FileScanTask> group) {
public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> out)
throws Exception {
LOG.debug(
"Creating rewrite plan for table {} with {}[{}] at {}",
LogUtil.MESSAGE_PREFIX + "Creating rewrite plan",
tableName,
taskName,
taskIndex,
Expand All @@ -121,7 +121,7 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
(SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
if (table.currentSnapshot() == null) {
LOG.info(
"Nothing to plan for in an empty table {} with {}[{}] at {}",
LogUtil.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
tableName,
taskName,
taskIndex,
Expand All @@ -142,12 +142,12 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
if (rewriteBytes + group.sizeInBytes() > maxRewriteBytes) {
// Keep going, maybe some other group might fit in
LOG.info(
"Skipping group {} as max rewrite size reached for table {} with {}[{}] at {}",
group,
LogUtil.MESSAGE_PREFIX + "Skipping group {} as max rewrite size reached",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
group);
iter.remove();
} else {
rewriteBytes += group.sizeInBytes();
Expand All @@ -159,28 +159,28 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
IntMath.divide(totalGroupCount, partialProgressMaxCommits, RoundingMode.CEILING);

LOG.info(
"Rewrite plan created {} for table {} with {}[{}] at {}",
groups,
LogUtil.MESSAGE_PREFIX + "Rewrite plan created {}",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
groups);

for (RewriteFileGroup group : groups) {
LOG.debug(
"Emitting {} with for table {} with {}[{}] at {}",
group,
LogUtil.MESSAGE_PREFIX + "Emitting {}",
tableName,
taskName,
taskIndex,
ctx.timestamp());
ctx.timestamp(),
group);
out.collect(
new PlannedGroup(
table, groupsPerCommit, rewriter.splitSize(group.sizeInBytes()), group));
}
} catch (Exception e) {
LOG.info(
"Exception planning data file rewrite groups for table {} with {}[{}] at {}",
LogUtil.MESSAGE_PREFIX + "Exception planning data file rewrite groups",
tableName,
taskName,
taskIndex,
Expand Down
Loading

0 comments on commit 997304c

Please sign in to comment.