address comments
huaxingao committed Jan 3, 2025
1 parent 87eda58 commit b8ff9f5
Showing 2 changed files with 52 additions and 63 deletions.
78 changes: 24 additions & 54 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,7 +55,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -96,7 +95,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PropertyUtil;
@@ -117,12 +115,8 @@
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Parquet {
private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);

private Parquet() {}

private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -272,47 +266,6 @@ private WriteBuilder createContextFunc(
return this;
}

private <T> void setBloomFilterConfig(
Context context,
MessageType parquetSchema,
BiConsumer<String, Boolean> withBloomFilterEnabled,
BiConsumer<String, Double> withBloomFilterFPP) {

Map<Integer, String> fieldIdToParquetPath =
parquetSchema.getColumns().stream()
.collect(
Collectors.toMap(
col -> col.getPrimitiveType().getId().intValue(),
col -> String.join(".", col.getPath())));

context
.columnBloomFilterEnabled()
.forEach(
(colPath, isEnabled) -> {
Types.NestedField field = schema.findField(colPath);
if (field == null) {
LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
return;
}

int fieldId = field.fieldId();
String parquetColumnPath = fieldIdToParquetPath.get(fieldId);
if (parquetColumnPath == null) {
LOG.warn(
"Skipping bloom filter config for field: {} due to missing parquetColumnPath for fieldId: {}",
colPath,
fieldId);
return;
}

withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
String fpp = context.columnBloomFilterFpp().get(colPath);
if (fpp != null) {
withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
}
});
}

public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -332,6 +285,8 @@ public <D> FileAppender<D> build() throws IOException {
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
boolean dictionaryEnabled = context.dictionaryEnabled();

if (compressionLevel != null) {
@@ -388,8 +343,17 @@
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
.withMaxBloomFilterBytes(bloomFilterMaxBytes);

setBloomFilterConfig(
context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
String bloomEnabled = entry.getValue();
propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
String fpp = entry.getValue();
propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
}

ParquetProperties parquetProperties = propsBuilder.build();

@@ -422,11 +386,17 @@ public <D> FileAppender<D> build() throws IOException {
.withDictionaryPageSize(dictionaryPageSize)
.withEncryption(fileEncryptionProperties);

setBloomFilterConfig(
context,
type,
parquetWriteBuilder::withBloomFilterEnabled,
parquetWriteBuilder::withBloomFilterFPP);
for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
String bloomEnabled = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
String fpp = entry.getValue();
parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
}

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
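Note: the loops above map each configured Iceberg column name onto the name that actually appears in the Parquet file via AvroSchemaUtil.makeCompatibleName. A minimal sketch of that translation, assuming Iceberg's usual "_xHH" hex-escaping of Avro-incompatible characters (the exact escaped form is an assumption, not taken from this diff):

import org.apache.iceberg.avro.AvroSchemaUtil;

public class CompatibleNameDemo {
  public static void main(String[] args) {
    // Avro-legal names (letters, digits, underscores) pass through unchanged.
    System.out.println(AvroSchemaUtil.makeCompatibleName("int_decimal"));
    // expected: int_decimal

    // '-' is not legal in an Avro name; assuming the _xHH escape scheme
    // ('-' is 0x2D), this should come back as "incompatible_x2Dname".
    System.out.println(AvroSchemaUtil.makeCompatibleName("incompatible-name"));
  }
}

This is why the test schema below adds an "incompatible-name" column: the writer has to register the bloom filter under the sanitized Parquet name, not the raw Iceberg name.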
37 changes: 28 additions & 9 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -109,9 +109,10 @@ public class TestBloomRowGroupFilter {
optional(22, "timestamp", Types.TimestampType.withoutZone()),
optional(23, "timestamptz", Types.TimestampType.withZone()),
optional(24, "binary", Types.BinaryType.get()),
optional(25, "int-decimal", Types.DecimalType.of(8, 2)),
optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)),
optional(28, "incompatible-name", StringType.get()));

private static final Types.StructType UNDERSCORE_STRUCT_FIELD_TYPE =
Types.StructType.of(Types.NestedField.required(16, "_int_field", IntegerType.get()));
@@ -140,9 +141,10 @@ public class TestBloomRowGroupFilter {
optional(22, "_timestamp", Types.TimestampType.withoutZone()),
optional(23, "_timestamptz", Types.TimestampType.withZone()),
optional(24, "_binary", Types.BinaryType.get()),
optional(25, "_int-decimal", Types.DecimalType.of(8, 2)),
optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)),
optional(28, "_incompatible-name", StringType.get()));

private static final String TOO_LONG_FOR_STATS;

@@ -193,7 +195,7 @@ public void createInputFile() throws IOException {

// build struct field schema
org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_int-decimal");
String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_incompatible-name");

OutputFile outFile = Files.localOutput(temp);
try (FileAppender<Record> appender =
@@ -222,9 +224,10 @@ public void createInputFile() throws IOException {
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamp", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamptz", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_binary", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int-decimal", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_fixed_decimal", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_incompatible-name", "true")
.build()) {
GenericRecordBuilder builder = new GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
// create 50 records
@@ -257,9 +260,10 @@ public void createInputFile() throws IOException {
builder.set("_timestamp", INSTANT.plusSeconds(i * 86400).toEpochMilli());
builder.set("_timestamptz", INSTANT.plusSeconds(i * 86400).toEpochMilli());
builder.set("_binary", RANDOM_BYTES.get(i));
builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + i)));
builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 + i)));
builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 + i)));
builder.set(compatibleFieldName, "test" + i);

appender.add(builder.build());
}
@@ -688,13 +692,13 @@ public void testIntDecimalEq() {
for (int i = 0; i < INT_VALUE_COUNT; i++) {
boolean shouldRead =
new ParquetBloomRowGroupFilter(
SCHEMA, equal("int-decimal", new BigDecimal(String.valueOf(77.77 + i))))
SCHEMA, equal("int_decimal", new BigDecimal(String.valueOf(77.77 + i))))
.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
assertThat(shouldRead).as("Should read: decimal within range").isTrue();
}

boolean shouldRead =
new ParquetBloomRowGroupFilter(SCHEMA, equal("int-decimal", new BigDecimal("1234.56")))
new ParquetBloomRowGroupFilter(SCHEMA, equal("int_decimal", new BigDecimal("1234.56")))
.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
assertThat(shouldRead).as("Should not read: decimal outside range").isFalse();
}
@@ -1190,4 +1194,19 @@ public void testTransformFilter() {
.as("Should read: filter contains non-reference evaluate as True")
.isTrue();
}

@Test
public void testIncompatibleColumnName() {
for (int i = 0; i < INT_VALUE_COUNT; i++) {
boolean shouldRead =
new ParquetBloomRowGroupFilter(SCHEMA, equal("incompatible-name", "test" + i))
.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
assertThat(shouldRead).as("Should read: String within range").isTrue();
}

boolean shouldRead =
new ParquetBloomRowGroupFilter(SCHEMA, equal("incompatible-name", "test100"))
.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
assertThat(shouldRead).as("Should not read: String outside range").isFalse();
}
}
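For reference, the PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX settings exercised in the test correspond to per-column table write properties. A hedged sketch of what those settings look like outside the test harness, assuming the standard Iceberg property keys "write.parquet.bloom-filter-enabled.column." and "write.parquet.bloom-filter-fpp.column." (both key prefixes are assumptions inferred from the constants used above):

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class BloomFilterPropsDemo {
  public static void main(String[] args) {
    // Per-column bloom filter settings as they would appear among a table's
    // write properties; the keys mirror the prefixes used in the test above.
    Map<String, String> writeProps =
        ImmutableMap.of(
            "write.parquet.bloom-filter-enabled.column.incompatible-name", "true",
            "write.parquet.bloom-filter-fpp.column.incompatible-name", "0.01");
    writeProps.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}

With the change in Parquet.java, the enabled/FPP values configured under the raw column name ("incompatible-name") are applied to the sanitized Parquet column, which is what testIncompatibleColumnName verifies.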
