Skip to content

Commit

Permalink
KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL…
Browse files Browse the repository at this point in the history
… implementation (apache#17892)

This PR includes the API for KIP-1112 and a partial implementation, which wraps any processors added through the PAPI and the DSL processors that are written to the topology through the ProcessorParameters#addProcessorTo method.

Further PRs will complete the implementation by converting the remaining DSL operators to using the #addProcessorTo method, and future-proof the processor writing mechanism to prevent new DSL operators from being implemented incorrectly/without the wrapper

Reviewers: Almog Gavra <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
ableegoldman authored and chiacyu committed Nov 30, 2024
1 parent 632820c commit 9d7a35c
Show file tree
Hide file tree
Showing 16 changed files with 629 additions and 18 deletions.
11 changes: 11 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
Expand Down Expand Up @@ -675,6 +676,11 @@ public class StreamsConfig extends AbstractConfig {
"recommended setting for production; for development you can change this, by adjusting broker setting " +
"<code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";

/** {@code processor.wrapper.class} */
public static final String PROCESSOR_WRAPPER_CLASS_CONFIG = "processor.wrapper.class";
public static final String PROCESSOR_WRAPPER_CLASS_DOC = "A processor wrapper class or class name that implements the <code>org.apache.kafka.streams.state.ProcessorWrapper</code> interface. "
+ "Must be passed in to the StreamsBuilder or Topology constructor in order to take effect";

/** {@code repartition.purge.interval.ms} */
@SuppressWarnings("WeakerAccess")
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
Expand Down Expand Up @@ -1124,6 +1130,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(60 * 1000L),
Importance.LOW,
PROBING_REBALANCE_INTERVAL_MS_DOC)
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class,
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
32 * 1024,
Expand Down
6 changes: 4 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,10 @@ public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object,
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier);
internalTopologyBuilder.addProcessor(name, wrapped, parentNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();

if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
internalTopologyBuilder.addStateStore(storeBuilder, name);
Expand Down
23 changes: 20 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;

Expand All @@ -38,6 +39,7 @@
import java.util.function.Supplier;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
Expand All @@ -57,6 +59,8 @@
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
Expand All @@ -68,13 +72,26 @@
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
* {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
* will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
* topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications,
* or the {@link Topology#Topology(TopologyConfig)} for PAPI applications.
* <p>
* Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
* topology is being built, which means they have to be passed in as a TopologyConfig to the
* {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the
* {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor (DSL).
* If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them
* to be applied and the config will be ignored.
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class.getName(),
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
null,
Expand Down Expand Up @@ -147,8 +164,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;

public TopologyConfig(final StreamsConfig globalAppConfigs) {
this(null, globalAppConfigs, new Properties());
public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}

public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
Expand All @@ -35,9 +34,11 @@ public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSupp
@Override
public void configure(final StreamsConfig config) {
if (dslStoreSuppliers == null) {
dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
dslStoreSuppliers = config.getConfiguredInstance(
StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
DslStoreSuppliers.class,
config.originals()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;

/**
* Class used to represent a {@link ProcessorSupplier} or {@link FixedKeyProcessorSupplier} and the name
* used to register it with the {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder}
Expand Down Expand Up @@ -78,18 +81,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {

public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
if (processorSupplier != null) {
topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
if (processorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);

final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
topologyBuilder.wrapProcessorSupplier(processorName, processorSupplier);

topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}

if (fixedKeyProcessorSupplier != null) {
topologyBuilder.addProcessor(processorName, fixedKeyProcessorSupplier, parentNodeNames);
if (fixedKeyProcessorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : fixedKeyProcessorSupplier.stores()) {
ApiUtils.checkSupplier(fixedKeyProcessorSupplier);

final FixedKeyProcessorSupplier<KIn, VIn, VOut> wrapped =
topologyBuilder.wrapFixedKeyProcessorSupplier(processorName, fixedKeyProcessorSupplier);

topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedFixedKeyProcessorSupplierImpl;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedProcessorSupplierImpl;

import java.util.Map;

/**
* Wrapper class that can be used to inject custom wrappers around the processors of their application topology.
* The returned instance should wrap the supplied {@code ProcessorSupplier} and the {@code Processor} it supplies
* to avoid disrupting the regular processing of the application, although this is not required and any processor
* implementation can be substituted in to replace the original processor entirely (which may be useful for example
* while testing or debugging an application topology).
* <p>
* NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it
* in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the
* appropriate constructor (ie {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)})
* <p>
* Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when
* the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in
* to the {@link StreamsBuilder} or {@link Topology} constructor.
*/
public interface ProcessorWrapper extends Configurable {

@Override
default void configure(final Map<String, ?> configs) {
// do nothing
}

/**
* Wrap or replace the provided {@link ProcessorSupplier} and return a {@link WrappedProcessorSupplier}
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
<KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier);

/**
* Wrap or replace the provided {@link FixedKeyProcessorSupplier} and return a {@link WrappedFixedKeyProcessorSupplier}
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
<KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier);

/**
* Use to convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}
*/
static <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> asWrapped(
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return new WrappedProcessorSupplierImpl<>(processorSupplier);
}

/**
* Use to convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}
*/
static <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> asWrappedFixedKey(
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
) {
return new WrappedFixedKeyProcessorSupplierImpl<>(processorSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

/**
* Marker interface for classes implementing {@link FixedKeyProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

/**
* Marker interface for classes implementing {@link ProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> {

}
Loading

0 comments on commit 9d7a35c

Please sign in to comment.