Apply transformation chain(s) through parallelStream #1282

Open · wants to merge 2 commits into base: 5.3.0-cp1
WorkerConfig.java
@@ -219,6 +219,12 @@ public class WorkerConfig extends AbstractConfig {
+ "and `Principal`. ";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "None";

public static final String PARALLEL_TRANSFORMATION_ENABLED_CONFIG = "enable.parallel.transformation";
private static final String PARALLEL_TRANSFORMATION_ENABLED_DOC
= "Enable parallel processing of the transformation chain(s) to improve throughput. "
+ "Caution: enable this only if every transformation in the chain(s) is thread-safe.";
public static final boolean PARALLEL_TRANSFORMATION_ENABLED_DEFAULT = false;

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
@@ -299,7 +305,9 @@ protected static ConfigDef baseConfigDef() {
.define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
Importance.LOW, REST_EXTENSION_CLASSES_DOC)
.define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC);
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
.define(PARALLEL_TRANSFORMATION_ENABLED_CONFIG, Type.BOOLEAN, PARALLEL_TRANSFORMATION_ENABLED_DEFAULT,
Importance.LOW, PARALLEL_TRANSFORMATION_ENABLED_DOC);
}

private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
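
For context, a minimal sketch (not part of this diff) of how a worker would pick up the new flag. StandaloneConfig and WorkerConfig are the standard Connect classes, but treat the exact property set as illustrative, since the required worker properties vary by version:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.WorkerConfig;
    import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;

    public class ParallelFlagExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
            props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
            props.put("offset.storage.file.filename", "/tmp/connect.offsets");
            props.put(WorkerConfig.PARALLEL_TRANSFORMATION_ENABLED_CONFIG, "true"); // enable.parallel.transformation=true
            WorkerConfig config = new StandaloneConfig(props);
            // Defaults to false when absent, so existing workers are unaffected.
            System.out.println(config.getBoolean(WorkerConfig.PARALLEL_TRANSFORMATION_ENABLED_CONFIG));
        }
    }
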
WorkerSourceTask.java
@@ -53,11 +53,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
@@ -93,6 +95,7 @@ class WorkerSourceTask extends WorkerTask {
private boolean finishedStart = false;
private boolean startedShutdownBeforeStartCompleted = false;
private boolean stopped = false;
private final boolean isParallelTransformEnabled;

public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -133,6 +136,7 @@ public WorkerSourceTask(ConnectorTaskId id,
this.flushing = false;
this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.isParallelTransformEnabled = workerConfig.getBoolean(WorkerConfig.PARALLEL_TRANSFORMATION_ENABLED_CONFIG);
}

@Override
@@ -266,7 +270,6 @@ private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord rec

byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()),
Stage.KEY_CONVERTER, keyConverter.getClass());

byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()),
Stage.VALUE_CONVERTER, valueConverter.getClass());

@@ -286,11 +289,28 @@ private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord rec
private boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
// Results of the parallel pre-transformation pass, keyed by the original record.
// ConcurrentHashMap rejects null values, so records transformed to null are simply absent.
Map<SourceRecord, SourceRecord> transformedRecordMap = new ConcurrentHashMap<>(toSend.size());
final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
if (isParallelTransformEnabled) {
toSend.parallelStream().forEach(record -> {
SourceRecord transformedRecord = transformationChain.apply(record);
if (transformedRecord != null) {
transformedRecordMap.put(record, transformedRecord);
}
});
}
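// Caveat of the parallel pass above: the chain runs before
// retryWithToleranceOperator.sourceRecord(...) is set in the loop below, so
// per-record error context and reporting may differ from the sequential path.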

for (final SourceRecord preTransformRecord : toSend) {
retryWithToleranceOperator.sourceRecord(preTransformRecord);

final SourceRecord record;
if (isParallelTransformEnabled) {
record = transformedRecordMap.get(preTransformRecord);
} else {
record = transformationChain.apply(preTransformRecord);
}

final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
if (producerRecord == null || retryWithToleranceOperator.failed()) {
counter.skipRecord();
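
To evaluate the batching pattern above in isolation, here is a self-contained sketch of the same technique. The names (ParallelTransformSketch, process, transform) are illustrative, not the Connect runtime's, and it assumes the transform is thread-safe and side-effect-free:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.UnaryOperator;

    public class ParallelTransformSketch {

        // Transform a batch in parallel, then consume the results in the original
        // batch order, as the diff above does with toSend and transformedRecordMap.
        public static <R> void process(List<R> batch, UnaryOperator<R> transform) {
            Map<R, R> transformed = new ConcurrentHashMap<>(batch.size());
            batch.parallelStream().forEach(record -> {
                R result = transform.apply(record);
                if (result != null) {          // ConcurrentHashMap rejects null values
                    transformed.put(record, result);
                }
            });
            for (R record : batch) {           // sequential, order-preserving pass
                R result = transformed.get(record);
                if (result == null) {
                    continue;                  // dropped by the transform; mirrors counter.skipRecord()
                }
                // ... convert and dispatch `result` here ...
            }
        }
    }

Usage would be something like process(List.of("a", "b"), String::toUpperCase). One caveat worth flagging for the diff above: keying the map by the source record relies on every element of the batch being distinct under equals()/hashCode(); two equal records collapse into a single entry. Collecting instead with toSend.parallelStream().map(transformationChain::apply).collect(Collectors.toList()) preserves batch order without that assumption, since parallel collection keeps encounter order (null results simply remain in the list as skip markers).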