Steven's comments
Peter Vary committed Dec 4, 2024
1 parent db13789 commit 05bcf7e
Showing 4 changed files with 81 additions and 45 deletions.
File: RewriteDataFiles.java
@@ -33,13 +33,6 @@
/**
* Creates the data file rewriter data stream, which runs a single iteration of the task for
* every {@link Trigger} event.
*
* <p>The input is a {@link DataStream} with {@link Trigger} events and every event should be
* immediately followed by a {@link org.apache.flink.streaming.api.watermark.Watermark} with the
* same timestamp as the event.
*
* <p>The output is a {@link DataStream} with the {@link TaskResult} of the run followed by the
* {@link org.apache.flink.streaming.api.watermark.Watermark}.
*/
public class RewriteDataFiles {
static final String PLANNER_TASK_NAME = "RDF Planner";
@@ -175,6 +168,14 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
return this;
}

/**
* The input is a {@link DataStream} with {@link Trigger} events and every event should be
* immediately followed by a {@link org.apache.flink.streaming.api.watermark.Watermark} with the
* same timestamp as the event.
*
* <p>The output is a {@link DataStream} with the {@link TaskResult} of the run followed by the
* {@link org.apache.flink.streaming.api.watermark.Watermark}.
*/
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
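The relocated javadoc pins down a contract worth illustrating: every Trigger event must be immediately followed by a Watermark carrying the same timestamp. A minimal sketch of a source honoring that contract follows, using Flink's legacy SourceFunction API; the Trigger factory is a placeholder, since Trigger construction is not part of this diff.

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/** Illustrative only: emits one Trigger and the Watermark required by the contract above. */
class SingleTriggerSource implements SourceFunction<Trigger> {
  private final long timestamp;

  SingleTriggerSource(long timestamp) {
    this.timestamp = timestamp;
  }

  @Override
  public void run(SourceContext<Trigger> ctx) {
    synchronized (ctx.getCheckpointLock()) {
      // The trigger event carries its own event time...
      ctx.collectWithTimestamp(newTrigger(timestamp), timestamp);
      // ...and is immediately followed by a watermark with the same timestamp.
      ctx.emitWatermark(new Watermark(timestamp));
    }
  }

  @Override
  public void cancel() {}

  private static Trigger newTrigger(long ts) {
    // Placeholder: the actual Trigger construction API is not shown in this diff.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}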
File: DataFileRewriteCommitter.java
@@ -48,7 +48,7 @@
import org.slf4j.LoggerFactory;

/**
* Commits the compaction changes using {@link RewriteDataFilesCommitManager}. The input is a {@link
* Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. The input is a {@link
* DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted which is chained to
* {@link TaskResultAggregator} input 1.
*/
@@ -68,6 +68,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
private transient ListState<Long> startingSnapshotIdState;
private transient ListState<RewriteFileGroup> inProgressState;
private transient CommitService commitService;
private transient int processed;
private transient Counter errorCounter;
private transient Counter addedDataFileNumCounter;
private transient Counter addedDataFileSizeCounter;
@@ -87,7 +88,12 @@ public DataFileRewriteCommitter(
}

@Override
public void open() throws Exception {
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);

tableLoader.open();
this.table = tableLoader.loadTable();

MetricGroup taskMetricGroup =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex);
this.errorCounter = taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
@@ -100,13 +106,6 @@ public void open() throws Exception {
this.removedDataFileSizeCounter =
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);

tableLoader.open();
this.table = tableLoader.loadTable();
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.startingSnapshotIdState =
context
.getOperatorStateStore()
@@ -136,6 +135,10 @@ public void initializeState(StateInitializationContext context) throws Exception
inProgress.add(group);
}
}

commitInProgress(System.currentTimeMillis());

this.processed = 0;
}
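With the table loading and metric setup moved into initializeState, any file groups recovered from a checkpoint are committed through commitInProgress before new elements arrive. The state handling itself is the standard Flink operator-state restore pattern; a minimal standalone sketch with hypothetical state names:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.StateInitializationContext;

// Illustrative only: the generic restore pattern behind the committer's state fields.
class StateRestoreSketch {
  private transient Long startingSnapshotId; // hypothetical mirror of the operator's field

  void restore(StateInitializationContext context) throws Exception {
    ListStateDescriptor<Long> descriptor =
        new ListStateDescriptor<>("starting-snapshot-id", Types.LONG); // hypothetical name
    ListState<Long> state = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored()) {
      // On recovery the checkpointed value wins; otherwise the field stays null.
      for (Long snapshotId : state.get()) {
        this.startingSnapshotId = snapshotId;
      }
    }
  }
}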

@Override
@@ -164,6 +167,7 @@ public void processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> s

commitService.offer(executedGroup.group());
inProgress.add(executedGroup.group());
++processed;
} catch (Exception e) {
LOG.info(
"Exception processing {} for table {} with {}[{}] at {}",
@@ -181,12 +185,19 @@ public void processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> s
@Override
public void processWatermark(Watermark mark) throws Exception {
try {
if (commitService == null && !inProgress.isEmpty()) {
this.commitService = createCommitService(null, mark.getTimestamp());
}

if (commitService != null) {
commitService.close();
if (processed != commitService.results().size()) {
throw new RuntimeException(
String.format(
"From %d commits only %d were unsuccessful for table %s with %s[%d] at %d",
processed,
commitService.results().size(),
tableName,
taskName,
taskIndex,
mark.getTimestamp()));
}
}

table.refresh();
@@ -210,8 +221,9 @@ public void processWatermark(Watermark mark) throws Exception {
}

// Cleanup
commitService = null;
startingSnapshotId = null;
this.commitService = null;
this.startingSnapshotId = null;
this.processed = 0;
inProgress.clear();

super.processWatermark(mark);
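The new processed counter lets the watermark handler detect commits that failed silently: every group offered to the commit service is counted, and at the watermark that count must match the number of results the service reports. A stripped-down sketch of the reconciliation, with the commit service reduced to a plain list and all names hypothetical:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the offered-vs-committed check in processWatermark above.
final class CommitBookkeeping {
  private int processed = 0; // incremented for every group offered to the commit service
  private final List<String> results = new ArrayList<>(); // successfully committed groups only

  void offer(String group, boolean commitSucceeded) {
    processed++;
    if (commitSucceeded) {
      results.add(group);
    }
  }

  // Called on the watermark: any gap means at least one commit failed.
  void onWatermark(long timestamp) {
    if (processed != results.size()) {
      throw new IllegalStateException(
          String.format(
              "From %d commits only %d were successful at %d",
              processed, results.size(), timestamp));
    }
    processed = 0; // reset for the next trigger cycle, as in the operator's cleanup block
    results.clear();
  }
}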
@@ -226,21 +238,24 @@ public void close() throws IOException {

private CommitService createCommitService(
DataFileRewriteExecutor.ExecutedGroup element, long timestamp) {
table.refresh();
CommitService service;
RewriteDataFilesCommitManager manager;
if (element == null) {
manager = new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, timestamp);
service = manager.service(Integer.MAX_VALUE);
} else {
manager = new FlinkRewriteDataFilesCommitManager(table, element.snapshotId(), timestamp);
service = manager.service(element.groupsPerCommit());
}

FlinkRewriteDataFilesCommitManager commitManager =
new FlinkRewriteDataFilesCommitManager(table, element.snapshotId(), timestamp);
CommitService service = commitManager.service(element.groupsPerCommit());
service.start();

return service;
}

private void commitInProgress(long timestamp) {
if (!inProgress.isEmpty()) {
try {
manager.commitFileGroups(inProgress);
FlinkRewriteDataFilesCommitManager manager =
new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, timestamp);
CommitService service = manager.service(Integer.MAX_VALUE);
service.start();
manager.commitOrClean(inProgress);
service.close();
inProgress.clear();
} catch (Exception e) {
LOG.info(
"Failed committing pending groups {} for table {} with {}[{}], so skipping.",
@@ -251,7 +266,6 @@ private CommitService createCommitService(
e);
}
}
return service;
}
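Both helpers follow the same lifecycle on Iceberg's RewriteDataFilesCommitManager: construct a manager pinned to a snapshot, open a CommitService with a batch size, start it, offer groups, and close. The sketch below shows that flow against the core manager; the operator's FlinkRewriteDataFilesCommitManager subclass additionally takes the watermark timestamp, as the constructors above show.

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
import org.apache.iceberg.actions.RewriteFileGroup;

class CommitFlowSketch {
  /** Illustrative only: the commit-service lifecycle used by the two helpers above. */
  static void commitAll(
      Table table,
      long startingSnapshotId,
      int groupsPerCommit,
      List<RewriteFileGroup> rewrittenGroups,
      Set<RewriteFileGroup> inProgress)
      throws IOException {
    RewriteDataFilesCommitManager manager =
        new RewriteDataFilesCommitManager(table, startingSnapshotId);
    CommitService service = manager.service(groupsPerCommit); // groups batched per commit
    service.start(); // starts the background committer
    rewrittenGroups.forEach(service::offer); // enqueue every completed group
    service.close(); // waits for pending commits to finish
    manager.commitOrClean(inProgress); // commit or clean up any leftover groups
  }
}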

private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager {
File: DataFileRewritePlanner.java
@@ -134,6 +134,7 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
table, Expressions.alwaysTrue(), table.currentSnapshot().snapshotId(), false);

long rewriteBytes = 0;
int totalGroupCount = 0;
List<RewriteFileGroup> groups = plan.fileGroups().collect(Collectors.toList());
ListIterator<RewriteFileGroup> iter = groups.listIterator();
while (iter.hasNext()) {
@@ -150,12 +151,12 @@ public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> o
iter.remove();
} else {
rewriteBytes += group.sizeInBytes();
++totalGroupCount;
}
}

int groupsPerCommit =
IntMath.divide(
plan.context().totalGroupCount(), partialProgressMaxCommits, RoundingMode.CEILING);
IntMath.divide(totalGroupCount, partialProgressMaxCommits, RoundingMode.CEILING);

LOG.info(
"Rewrite plan created {} for table {} with {}[{}] at {}",
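The planner change above derives groupsPerCommit from totalGroupCount, the number of groups that survive the size-based filtering, instead of the plan's original group count, so skipped groups no longer inflate the batch size. A quick illustration of the ceiling division (Iceberg relocates Guava, so the real import path may differ):

import com.google.common.math.IntMath;
import java.math.RoundingMode;

public class GroupsPerCommitExample {
  public static void main(String[] args) {
    int totalGroupCount = 10; // groups that survived filtering
    int partialProgressMaxCommits = 3; // cap on the number of commits per run
    // ceil(10 / 3) = 4 groups per commit, yielding three commits (4 + 4 + 2).
    int groupsPerCommit =
        IntMath.divide(totalGroupCount, partialProgressMaxCommits, RoundingMode.CEILING);
    System.out.println(groupsPerCommit); // 4
  }
}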
File: TestDataFileRewriteCommitter.java
@@ -220,33 +220,53 @@ void testStateRestore(boolean withException) throws Exception {

@Test
void testError() throws Exception {
Table table = createTable();
insert(table, 1, "a");
insert(table, 2, "b");
insert(table, 3, "c");
Table table = createPartitionedTable();
insertPartitioned(table, 1, "p1");
insertPartitioned(table, 2, "p1");
insertPartitioned(table, 3, "p2");
insertPartitioned(table, 4, "p2");
insertPartitioned(table, 5, "p3");
insertPartitioned(table, 6, "p3");
insertPartitioned(table, 7, "p4");
insertPartitioned(table, 8, "p4");

List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
assertThat(planned).hasSize(1);
assertThat(planned).hasSize(4);
List<DataFileRewriteExecutor.ExecutedGroup> rewritten = executeRewrite(planned);
assertThat(rewritten).hasSize(1);
assertThat(rewritten).hasSize(4);

OperatorSubtaskState state = null;
try (OneInputStreamOperatorTestHarness<DataFileRewriteExecutor.ExecutedGroup, Trigger>
testHarness = harness()) {
testHarness.open();

testHarness.processElement(updateBatchSize(rewritten.get(0)), EVENT_TIME);
assertNoChange(table);

state = testHarness.snapshot(1, System.currentTimeMillis());
} catch (Exception e) {
// do nothing
}

try (OneInputStreamOperatorTestHarness<DataFileRewriteExecutor.ExecutedGroup, Trigger>
testHarness = harness()) {
testHarness.initializeState(state);
testHarness.open();

// Cause an exception
dropTable();

assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
testHarness.processElement(rewritten.get(0), EVENT_TIME);
testHarness.processElement(rewritten.get(1), EVENT_TIME);
testHarness.processWatermark(EVENT_TIME);
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1);
assertThat(
testHarness
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
.poll()
.getValue()
.getMessage())
.contains("Metadata file for version");
.contains("From 1 commits only 0 were unsuccessful for table");
}
}

