Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation #17892

Merged
merged 12 commits into from
Nov 24, 2024
11 changes: 11 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
Expand Down Expand Up @@ -675,6 +676,11 @@ public class StreamsConfig extends AbstractConfig {
"recommended setting for production; for development you can change this, by adjusting broker setting " +
"<code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";

/** {@code processor.wrapper.class} */
public static final String PROCESSOR_WRAPPER_CLASS_CONFIG = "processor.wrapper.class";
public static final String PROCESSOR_WRAPPER_CLASS_DOC = "A processor wrapper class or class name that implements the <code>org.apache.kafka.streams.state.ProcessorWrapper</code> interface. "
+ "Must be passed in to the StreamsBuilder or Topology constructor in order to take effect";

/** {@code repartition.purge.interval.ms} */
@SuppressWarnings("WeakerAccess")
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
Expand Down Expand Up @@ -1124,6 +1130,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(60 * 1000L),
Importance.LOW,
PROBING_REBALANCE_INTERVAL_MS_DOC)
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class,
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
32 * 1024,
Expand Down
6 changes: 4 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,10 @@ public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object,
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the entire implementation for PAPI apps

internalTopologyBuilder.addProcessor(name, wrapped, parentNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();

if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
internalTopologyBuilder.addStateStore(storeBuilder, name);
Expand Down
23 changes: 20 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;

Expand All @@ -38,6 +39,7 @@
import java.util.function.Supplier;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
Expand All @@ -57,6 +59,8 @@
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
Expand All @@ -68,13 +72,26 @@
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
* {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
* will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
* topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications,
* or the {@link Topology#Topology(TopologyConfig)} for PAPI applications.
* <p>
* Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
* topology is being built, which means they have to be passed in as a TopologyConfig to the
* {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the docs here, would be helpful to users.

* {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor (DSL).
* If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them
* to be applied and the config will be ignored.
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class.getName(),
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
null,
Expand Down Expand Up @@ -147,8 +164,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;

public TopologyConfig(final StreamsConfig globalAppConfigs) {
this(null, globalAppConfigs, new Properties());
public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}

public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
Expand All @@ -35,9 +34,11 @@ public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSupp
@Override
public void configure(final StreamsConfig config) {
if (dslStoreSuppliers == null) {
dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
dslStoreSuppliers = config.getConfiguredInstance(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side fix: we weren't configuring this with the app configs

StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
DslStoreSuppliers.class,
config.originals()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;

/**
* Class used to represent a {@link ProcessorSupplier} or {@link FixedKeyProcessorSupplier} and the name
* used to register it with the {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder}
Expand Down Expand Up @@ -78,18 +81,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {

public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
if (processorSupplier != null) {
topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
if (processorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);

final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how the DSL implementation will work -- processors are wrapped when building the DSL into a Topology, specifically in this method where they are added to the InternalTopologyBuilder.

Note this PR is only a partial implementation since not all DSL operators go through this class, some just write their processors to the InternalTopologyBuilder directly. Followup PRs will tackle converting these operators and enforcing that any new ones added in the future will have to go through this method and be wrapped

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we do this here instead of inside topologyBuilder.addProcessor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good question! that would definitely be simpler. however, to maximize the utility of the new wrapper, we want to allow users to wrap not only the Processor returned by the ProcessorSupplier#get method, but also the StoreBuilders returned from ProcessorSupplier#stores. Unfortunately, the stores are currently extracted and connected outside of the #addProcessor method, and since they aren't attached (via #addStateStore) until after #addProcessor is called, we can't even look up and wrap the stores for a processor from within #addProcessor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately it might make sense to move the extraction of the stores from a ProcessorSupplier into a single InternalTopologyBuilder method (eg #addStatefulProcessor) that calls both #addProcessor and #addStateStore, and do the wrapping there. This will also help future-proof new DSL operators from being implemented incorrectly and missing the wrapper step.

But I think it makes sense to wait until we've completed all the followup PRs for the remaining DSL operators, in case there are any weird edge cases we haven't thought of/seen yet. Then we can do one final PR to clean up and future-proof the wrapping process

topologyBuilder.wrapProcessorSupplier(processorName, processorSupplier);

topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}

if (fixedKeyProcessorSupplier != null) {
topologyBuilder.addProcessor(processorName, fixedKeyProcessorSupplier, parentNodeNames);
if (fixedKeyProcessorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : fixedKeyProcessorSupplier.stores()) {
ApiUtils.checkSupplier(fixedKeyProcessorSupplier);

final FixedKeyProcessorSupplier<KIn, VIn, VOut> wrapped =
topologyBuilder.wrapFixedKeyProcessorSupplier(processorName, fixedKeyProcessorSupplier);

topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedFixedKeyProcessorSupplierImpl;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedProcessorSupplierImpl;

import java.util.Map;

/**
* Wrapper class that can be used to inject custom wrappers around the processors of their application topology.
* The returned instance should wrap the supplied {@code ProcessorSupplier} and the {@code Processor} it supplies
* to avoid disrupting the regular processing of the application, although this is not required and any processor
* implementation can be substituted in to replace the original processor entirely (which may be useful for example
* while testing or debugging an application topology).
* <p>
* NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it
* in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the
* appropriate constructor (ie {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)})
* <p>
* Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when
* the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in
* to the {@link StreamsBuilder} or {@link Topology} constructor.
*/
public interface ProcessorWrapper extends Configurable {

@Override
default void configure(final Map<String, ?> configs) {
// do nothing
}

/**
* Wrap or replace the provided {@link ProcessorSupplier} and return a {@link WrappedProcessorSupplier}
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
<KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier);

/**
* Wrap or replace the provided {@link FixedKeyProcessorSupplier} and return a {@link WrappedFixedKeyProcessorSupplier}
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
<KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier);

/**
* Use to convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}
*/
static <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> asWrapped(
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return new WrappedProcessorSupplierImpl<>(processorSupplier);
}

/**
* Use to convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}
*/
static <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> asWrappedFixedKey(
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
) {
return new WrappedFixedKeyProcessorSupplierImpl<>(processorSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

/**
* Marker interface for classes implementing {@link FixedKeyProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.api;

/**
* Marker interface for classes implementing {@link ProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> {

}
Loading