Improvement: allow specifying a scaling factor for the timestamp column used in partitioning via configuration #160

Open · wants to merge 1 commit into master
@@ -101,6 +101,20 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfig
public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp";
public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor";

public static final String TIMESTAMP_SCALING_FACTOR_CONFIG = "timestamp.scaling.factor";
public static final String TIMESTAMP_SCALING_FACTOR_DOC =
"The scaling factor to be applied to the timestamp by the timestamp extractor.";
public static final long TIMESTAMP_SCALING_FACTOR_DEFAULT = 1L;
public static final String TIMESTAMP_SCALING_FACTOR_DISPLAY =
"Timestamp scaling factor for Timestamp Extractor";

public static final String TIMESTAMP_SCALING_OPERATION_CONFIG = "timestamp.scaling.operation";
public static final String TIMESTAMP_SCALING_OPERATION_DOC =
"The scaling operation ('Division' or 'Multiplication') to be applied to the timestamp by the timestamp extractor.";
public static final String TIMESTAMP_SCALING_OPERATION_DEFAULT = "Division";
public static final String TIMESTAMP_SCALING_OPERATION_DISPLAY =
"Timestamp scaling operation for Timestamp Extractor";

/**
* Create a new configuration definition.
*
@@ -208,6 +222,26 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecommender
++orderInGroup,
Width.LONG,
TIMESTAMP_FIELD_NAME_DISPLAY);

configDef.define(TIMESTAMP_SCALING_FACTOR_CONFIG,
Type.LONG,
TIMESTAMP_SCALING_FACTOR_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_SCALING_FACTOR_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_SCALING_FACTOR_DISPLAY);

configDef.define(TIMESTAMP_SCALING_OPERATION_CONFIG,
Type.STRING,
TIMESTAMP_SCALING_OPERATION_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_SCALING_OPERATION_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_SCALING_OPERATION_DISPLAY);
}

return configDef;
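For reference, a minimal sketch of how the two new options might be set alongside the existing partitioner configuration, assuming a sink connector whose records carry epoch microseconds in the timestamp field (the key names come from the constants above; other required TimeBasedPartitioner settings such as path.format, partition.duration.ms, locale, and timezone are omitted here):

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
# Divide epoch-microsecond values by 1000 to obtain the epoch milliseconds the partitioner expects.
timestamp.scaling.factor=1000
timestamp.scaling.operation=Division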
@@ -266,11 +266,38 @@ public Long extract(ConnectRecord<?> record) {
public static class RecordFieldTimestampExtractor implements TimestampExtractor {
private String fieldName;
private DateTimeFormatter dateTime;
private long timestampScalingFactor;
private String timestampScalingOperation;

@Override
public void configure(Map<String, Object> config) {
fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
dateTime = ISODateTimeFormat.dateTimeParser();
timestampScalingFactor = (long) config.get(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG);
timestampScalingOperation =
(String) config.get(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG);
}

public Long scalingTimestamp(
long timestamp, String timestampScalingOperation, long timestampScalingFactor) {
try {
switch (timestampScalingOperation) {
case "Division":
return timestamp / timestampScalingFactor;
case "Multiplication":
return timestamp * timestampScalingFactor;
default:
log.error(
"Timestamp scaling operation '{}' is not recognized (timestamp remains unscaled).",
timestampScalingOperation);
return timestamp;
}
} catch (ArithmeticException e) {
log.error(
"Timestamp scaling operation failed (timestamp remains unscaled).", e);
return timestamp;
}
}

@Override
@@ -288,7 +315,8 @@ public Long extract(ConnectRecord<?> record) {
switch (fieldSchema.type()) {
case INT32:
case INT64:
return ((Number) timestampValue).longValue();
return scalingTimestamp(((Number) timestampValue).longValue(),
timestampScalingOperation, timestampScalingFactor);
case STRING:
return dateTime.parseMillis((String) timestampValue);
default:
@@ -304,7 +332,8 @@ public Long extract(ConnectRecord<?> record) {
Map<?, ?> map = (Map<?, ?>) value;
Object timestampValue = DataUtils.getNestedFieldValue(map, fieldName);
if (timestampValue instanceof Number) {
return ((Number) timestampValue).longValue();
return scalingTimestamp(((Number) timestampValue).longValue(),
timestampScalingOperation, timestampScalingFactor);
} else if (timestampValue instanceof String) {
return dateTime.parseMillis((String) timestampValue);
} else if (timestampValue instanceof Date) {
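As a quick illustration of the new code path, a hypothetical standalone sketch (values invented for the example; the configure() keys mirror the constants in PartitionerConfig, and scalingTimestamp is called directly since the change makes it public):

RecordFieldTimestampExtractor extractor = new RecordFieldTimestampExtractor();

Map<String, Object> config = new HashMap<>();
config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, "timestamp");
config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, 1000L);
config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, "Division");
extractor.configure(config);

// 2018-05-15T16:00:00Z in epoch microseconds divides down to epoch milliseconds.
long micros = 1_526_400_000_000_000L;
Long millis = extractor.scalingTimestamp(micros, "Division", 1000L); // 1_526_400_000_000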
@@ -47,6 +47,7 @@
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DateTimeUtils;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner.RecordFieldTimestampExtractor;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -353,6 +354,18 @@ public void testLongTimeExtract() {
validatePathFromDateTime(path, dt, TOPIC);
}

@Test
public void testTimestampScaling() {
RecordFieldTimestampExtractor extractor = new RecordFieldTimestampExtractor();

Long timestamp = 1000L;

assertEquals(extractor.scalingTimestamp(timestamp, "Division", 1000L),(Long) 1L);
assertEquals(extractor.scalingTimestamp(timestamp, "Multiplication", 10L),(Long) 10000L);
assertEquals(extractor.scalingTimestamp(timestamp, "UncorrectOperation", 10L),(Long) 1000L);
assertEquals(extractor.scalingTimestamp(timestamp, "Division", 0L),(Long) 1000L);
}

@Test
public void testFloatTimeExtract() {
String fieldName = "float";
@@ -492,6 +505,32 @@ public void testRecordFieldTimeDateExtractor() {
String encodedPartition = partitioner.encodePartition(sinkRecord);
assertEquals(expectedPartition, encodedPartition);

long rawTimestampMicros = rawTimestamp * 1000L; // millis -> micros
long rawTimestampSeconds = rawTimestamp / 1000L; // millis -> seconds (truncates sub-second precision)

SinkRecord sinkRecordMicros = createSinkRecord(rawTimestampMicros);
SinkRecord sinkRecordSeconds = createSinkRecord(rawTimestampSeconds);

Map<String, Object> timestampScalingConfig = new HashMap<>();

timestampScalingConfig.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, 1000L);
timestampScalingConfig.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, "Division");

TimeBasedPartitioner<String> scaledPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, timestampScalingConfig);

encodedPartition = scaledPartitioner.encodePartition(sinkRecordMicros);
assertEquals(expectedPartition, encodedPartition);

timestampScalingConfig.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, 1000L);
timestampScalingConfig.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, "Multiplication");

scaledPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, timestampScalingConfig);

encodedPartition = scaledPartitioner.encodePartition(sinkRecordSeconds);
assertEquals(expectedPartition, encodedPartition);

String timestamp = ISODateTimeFormat.dateTimeNoMillis().print(moment);
sinkRecord = createSinkRecord(Schema.STRING_SCHEMA, timestamp);
encodedPartition = partitioner.encodePartition(sinkRecord);
@@ -673,6 +712,8 @@ private Map<String, Object> createConfig(String timeFieldName) {
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
if (timeFieldName != null) {
config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName);
config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG,
PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG,
PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT);
}
return config;
}