Skip to content

Commit

Permalink
Core: Add base implementations for changelog tasks (#5300)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Jul 27, 2022
1 parent d393f17 commit 2ea5b9e
Show file tree
Hide file tree
Showing 14 changed files with 710 additions and 138 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ public interface AddedRowsScanTask extends ChangelogScanTask, ContentScanTask<Da
default ChangelogOperation operation() {
return ChangelogOperation.INSERT;
}

@Override
default long sizeBytes() {
return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
}

@Override
default int filesCount() {
return 1 + deletes().size();
}
}
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,14 @@ public interface DeletedDataFileScanTask extends ChangelogScanTask, ContentScanT
default ChangelogOperation operation() {
return ChangelogOperation.DELETE;
}

@Override
default long sizeBytes() {
return length() + existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
}

@Override
default int filesCount() {
return 1 + existingDeletes().size();
}
}
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,16 @@ public interface DeletedRowsScanTask extends ChangelogScanTask, ContentScanTask<
default ChangelogOperation operation() {
return ChangelogOperation.DELETE;
}

@Override
default long sizeBytes() {
return length() +
addedDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum() +
existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
}

@Override
default int filesCount() {
return 1 + addedDeletes().size() + existingDeletes().size();
}
}
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

class BaseAddedRowsScanTask
extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
implements AddedRowsScanTask {

private final DeleteFile[] deletes;

BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
String schemaString, String specString, ResidualEvaluator residuals) {
super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
this.deletes = deletes != null ? deletes : new DeleteFile[0];
}

@Override
protected AddedRowsScanTask self() {
return this;
}

@Override
protected AddedRowsScanTask newSplitTask(AddedRowsScanTask parentTask, long offset, long length) {
return new SplitAddedRowsScanTask(parentTask, offset, length);
}

@Override
public List<DeleteFile> deletes() {
return ImmutableList.copyOf(deletes);
}

private static class SplitAddedRowsScanTask
extends SplitScanTask<SplitAddedRowsScanTask, AddedRowsScanTask, DataFile>
implements AddedRowsScanTask {

SplitAddedRowsScanTask(AddedRowsScanTask parentTask, long offset, long length) {
super(parentTask, offset, length);
}

@Override
protected SplitAddedRowsScanTask copyWithNewLength(long newLength) {
return new SplitAddedRowsScanTask(parentTask(), start(), newLength);
}

@Override
public List<DeleteFile> deletes() {
return parentTask().deletes();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

abstract class BaseChangelogContentScanTask
<ThisT extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>
extends BaseContentScanTask<ThisT, F>
implements ChangelogScanTask {

private final int changeOrdinal;
private final long commitSnapshotId;

BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
String schemaString, String specString, ResidualEvaluator residuals) {
super(file, schemaString, specString, residuals);
this.changeOrdinal = changeOrdinal;
this.commitSnapshotId = commitSnapshotId;
}

@Override
public int changeOrdinal() {
return changeOrdinal;
}

@Override
public long commitSnapshotId() {
return commitSnapshotId;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("change_ordinal", changeOrdinal)
.add("commit_snapshot_id", commitSnapshotId)
.add("file", file().path())
.add("partition_data", file().partition())
.add("residual", residual())
.toString();
}

abstract static class SplitScanTask
<ThisT, ParentT extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>
implements ContentScanTask<F>, ChangelogScanTask, MergeableScanTask<ThisT> {

private final ParentT parentTask;
private final long offset;
private final long length;

protected SplitScanTask(ParentT parentTask, long offset, long length) {
this.parentTask = parentTask;
this.offset = offset;
this.length = length;
}

protected abstract ThisT copyWithNewLength(long newLength);

protected ParentT parentTask() {
return parentTask;
}

@Override
public int changeOrdinal() {
return parentTask.changeOrdinal();
}

@Override
public long commitSnapshotId() {
return parentTask.commitSnapshotId();
}

@Override
public F file() {
return parentTask.file();
}

@Override
public PartitionSpec spec() {
return parentTask.spec();
}

@Override
public long start() {
return offset;
}

@Override
public long length() {
return length;
}

@Override
public Expression residual() {
return parentTask.residual();
}

@Override
public boolean canMerge(ScanTask other) {
if (getClass().equals(other.getClass())) {
SplitScanTask<?, ?, ?> that = (SplitScanTask<?, ?, ?>) other;
return changeOrdinal() == that.changeOrdinal() &&
commitSnapshotId() == that.commitSnapshotId() &&
file().equals(that.file()) &&
start() + length() == that.start();

} else {
return false;
}
}

@Override
public ThisT merge(ScanTask other) {
SplitScanTask<?, ?, ?> that = (SplitScanTask<?, ?, ?>) other;
return copyWithNewLength(length() + that.length());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("change_ordinal", changeOrdinal())
.add("commit_snapshot_id", commitSnapshotId())
.add("file", file().path())
.add("partition_data", file().partition())
.add("offset", offset)
.add("length", length)
.add("residual", residual())
.toString();
}
}
}
104 changes: 104 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Ordering;

abstract class BaseContentScanTask<ThisT extends ContentScanTask<F>, F extends ContentFile<F>>
implements ContentScanTask<F>, SplittableScanTask<ThisT> {

private static final Ordering<Comparable<Long>> OFFSET_ORDERING = Ordering.natural();

private final F file;
private final String schemaString;
private final String specString;
private final ResidualEvaluator residuals;

private transient volatile PartitionSpec spec = null;

BaseContentScanTask(F file, String schemaString, String specString, ResidualEvaluator residuals) {
this.file = file;
this.schemaString = schemaString;
this.specString = specString;
this.residuals = residuals;
}

protected abstract ThisT self();

protected abstract ThisT newSplitTask(ThisT parentTask, long offset, long length);

@Override
public F file() {
return file;
}

@Override
public PartitionSpec spec() {
if (spec == null) {
synchronized (this) {
if (spec == null) {
this.spec = PartitionSpecParser.fromJson(SchemaParser.fromJson(schemaString), specString);
}
}
}
return spec;
}

@Override
public long start() {
return 0;
}

@Override
public long length() {
return file.fileSizeInBytes();
}

@Override
public Expression residual() {
return residuals.residualFor(file.partition());
}

@Override
public Iterable<ThisT> split(long targetSplitSize) {
if (file.format().isSplittable()) {
if (file.splitOffsets() != null && OFFSET_ORDERING.isOrdered(file.splitOffsets())) {
return () -> new OffsetsAwareSplitScanTaskIterator<>(self(), length(), file.splitOffsets(), this::newSplitTask);
} else {
return () -> new FixedSizeSplitScanTaskIterator<>(self(), length(), targetSplitSize, this::newSplitTask);
}
}

return ImmutableList.of(self());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("file", file().path())
.add("partition_data", file().partition())
.add("residual", residual())
.toString();
}
}
Loading

0 comments on commit 2ea5b9e

Please sign in to comment.