-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation #17892
Changes from all commits
9d681db
e45693f
020283f
db821c7
278a2b3
05dac13
e7ae58f
06a2737
549b412
300c269
cb7dbf1
f553b74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.apache.kafka.streams.kstream.Materialized; | ||
import org.apache.kafka.streams.kstream.internals.MaterializedInternal; | ||
import org.apache.kafka.streams.processor.TimestampExtractor; | ||
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper; | ||
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; | ||
import org.apache.kafka.streams.state.DslStoreSuppliers; | ||
|
||
|
@@ -38,6 +39,7 @@ | |
import java.util.function.Supplier; | ||
|
||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in; | ||
import static org.apache.kafka.common.utils.Utils.mkObjectProperties; | ||
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; | ||
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; | ||
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; | ||
|
@@ -57,6 +59,8 @@ | |
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; | ||
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; | ||
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; | ||
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; | ||
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_DOC; | ||
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB; | ||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; | ||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC; | ||
|
@@ -68,13 +72,26 @@ | |
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the | ||
* {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated) | ||
* will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the | ||
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method. | ||
* topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications, | ||
* or the {@link Topology#Topology(TopologyConfig)} constructor for PAPI applications. | ||
* <p> | ||
* Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the | ||
* topology is being built, which means they have to be passed in as a TopologyConfig to the | ||
* {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for adding the docs here; they will be helpful to users. |
||
* {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor (DSL). | ||
* If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them | ||
* to be applied and the config will be ignored. | ||
*/ | ||
@SuppressWarnings("deprecation") | ||
public final class TopologyConfig extends AbstractConfig { | ||
private static final ConfigDef CONFIG; | ||
static { | ||
CONFIG = new ConfigDef() | ||
.define(PROCESSOR_WRAPPER_CLASS_CONFIG, | ||
Type.CLASS, | ||
NoOpProcessorWrapper.class.getName(), | ||
Importance.LOW, | ||
PROCESSOR_WRAPPER_CLASS_DOC) | ||
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, | ||
Type.INT, | ||
null, | ||
|
@@ -147,8 +164,8 @@ public final class TopologyConfig extends AbstractConfig { | |
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier; | ||
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier; | ||
|
||
public TopologyConfig(final StreamsConfig globalAppConfigs) { | ||
this(null, globalAppConfigs, new Properties()); | ||
public TopologyConfig(final StreamsConfig configs) { | ||
this(null, configs, mkObjectProperties(configs.originals())); | ||
} | ||
|
||
public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,6 @@ | |
*/ | ||
package org.apache.kafka.streams.kstream.internals; | ||
|
||
import org.apache.kafka.common.utils.Utils; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.processor.internals.StoreFactory; | ||
import org.apache.kafka.streams.state.DslStoreSuppliers; | ||
|
@@ -35,9 +34,11 @@ public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSupp | |
@Override | ||
public void configure(final StreamsConfig config) { | ||
if (dslStoreSuppliers == null) { | ||
dslStoreSuppliers = Utils.newInstance( | ||
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG), | ||
DslStoreSuppliers.class); | ||
dslStoreSuppliers = config.getConfiguredInstance( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Side fix: we weren't configuring this with the app configs. |
||
StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, | ||
DslStoreSuppliers.class, | ||
config.originals() | ||
); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,15 @@ | |
|
||
package org.apache.kafka.streams.kstream.internals.graph; | ||
|
||
import org.apache.kafka.streams.internals.ApiUtils; | ||
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; | ||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; | ||
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; | ||
import org.apache.kafka.streams.processor.internals.ProcessorAdapter; | ||
import org.apache.kafka.streams.state.StoreBuilder; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* Class used to represent a {@link ProcessorSupplier} or {@link FixedKeyProcessorSupplier} and the name | ||
* used to register it with the {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder} | ||
|
@@ -78,18 +81,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() { | |
|
||
public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) { | ||
if (processorSupplier != null) { | ||
topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames); | ||
if (processorSupplier.stores() != null) { | ||
for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) { | ||
ApiUtils.checkSupplier(processorSupplier); | ||
|
||
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is how the DSL implementation will work -- processors are wrapped when building the DSL into a Topology, specifically in this method where they are added to the InternalTopologyBuilder. Note this PR is only a partial implementation since not all DSL operators go through this class; some just write their processors to the InternalTopologyBuilder directly. Followup PRs will tackle converting these operators and enforcing that any new ones added in the future will have to go through this method and be wrapped. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Curious why we do this here instead of inside #addProcessor? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, good question! That would definitely be simpler. However, to maximize the utility of the new wrapper, we want to allow users to wrap not only the Processor returned by the ProcessorSupplier#get method, but also the StoreBuilders returned from ProcessorSupplier#stores. Unfortunately, the stores are currently extracted and connected outside of the #addProcessor method, and since they aren't attached (via #addStateStore) until after #addProcessor is called, we can't even look up and wrap the stores for a processor from within #addProcessor. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ultimately it might make sense to move the extraction of the stores from a ProcessorSupplier into a single InternalTopologyBuilder method (e.g. #addStatefulProcessor) that calls both #addProcessor and #addStateStore, and do the wrapping there. 
This will also help future-proof new DSL operators from being implemented incorrectly and missing the wrapper step. But I think it makes sense to wait until we've completed all the followup PRs for the remaining DSL operators, in case there are any weird edge cases we haven't thought of/seen yet. Then we can do one final PR to clean up and future-proof the wrapping process |
||
topologyBuilder.wrapProcessorSupplier(processorName, processorSupplier); | ||
|
||
topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames); | ||
final Set<StoreBuilder<?>> stores = wrapped.stores(); | ||
if (stores != null) { | ||
for (final StoreBuilder<?> storeBuilder : stores) { | ||
topologyBuilder.addStateStore(storeBuilder, processorName); | ||
} | ||
} | ||
} | ||
|
||
if (fixedKeyProcessorSupplier != null) { | ||
topologyBuilder.addProcessor(processorName, fixedKeyProcessorSupplier, parentNodeNames); | ||
if (fixedKeyProcessorSupplier.stores() != null) { | ||
for (final StoreBuilder<?> storeBuilder : fixedKeyProcessorSupplier.stores()) { | ||
ApiUtils.checkSupplier(fixedKeyProcessorSupplier); | ||
|
||
final FixedKeyProcessorSupplier<KIn, VIn, VOut> wrapped = | ||
topologyBuilder.wrapFixedKeyProcessorSupplier(processorName, fixedKeyProcessorSupplier); | ||
|
||
topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames); | ||
final Set<StoreBuilder<?>> stores = wrapped.stores(); | ||
if (stores != null) { | ||
for (final StoreBuilder<?> storeBuilder : stores) { | ||
topologyBuilder.addStateStore(storeBuilder, processorName); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.kafka.streams.processor.api; | ||
|
||
import org.apache.kafka.common.Configurable; | ||
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.Topology; | ||
import org.apache.kafka.streams.TopologyConfig; | ||
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedFixedKeyProcessorSupplierImpl; | ||
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedProcessorSupplierImpl; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Wrapper interface that can be implemented to inject custom wrappers around the processors of an application topology. | ||
* The suppliers returned from the wrap methods should wrap the supplied {@code ProcessorSupplier} and the {@code Processor} it supplies | ||
* to avoid disrupting the regular processing of the application, although this is not required and any processor | ||
* implementation can be substituted in to replace the original processor entirely (which may be useful, for example, | ||
* while testing or debugging an application topology). | ||
* <p> | ||
* NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it | ||
* in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the | ||
* appropriate constructor (i.e. {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)}). | ||
* <p> | ||
* Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when | ||
* the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in | ||
* to the {@link StreamsBuilder} or {@link Topology} constructor. | ||
*/ | ||
public interface ProcessorWrapper extends Configurable { | ||
|
||
@Override | ||
default void configure(final Map<String, ?> configs) { | ||
// intentionally a no-op by default: implementations only need to override this if they want to read the configs | ||
} | ||
|
||
/** | ||
* Wrap or replace the provided {@link ProcessorSupplier} and return a {@link WrappedProcessorSupplier}. | ||
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}, | ||
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method. | ||
* | ||
* @param processorName the name of the processor that the supplier is being added to the topology under | ||
* @param processorSupplier the supplier to wrap or replace | ||
* @return the wrapped (or replacement) supplier | ||
*/ | ||
<KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName, | ||
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier); | ||
|
||
/** | ||
* Wrap or replace the provided {@link FixedKeyProcessorSupplier} and return a {@link WrappedFixedKeyProcessorSupplier}. | ||
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}, | ||
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method. | ||
* | ||
* @param processorName the name of the processor that the supplier is being added to the topology under | ||
* @param processorSupplier the fixed-key supplier to wrap or replace | ||
* @return the wrapped (or replacement) fixed-key supplier | ||
*/ | ||
<KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName, | ||
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier); | ||
|
||
/** | ||
* Use to convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}. | ||
* The supplier is handed to a new {@code WrappedProcessorSupplierImpl}; that class is defined elsewhere | ||
* (presumably a plain delegating wrapper -- confirm against {@code NoOpProcessorWrapper}). | ||
* | ||
* @param processorSupplier the supplier to convert | ||
* @return a {@link WrappedProcessorSupplier} built around {@code processorSupplier} | ||
*/ | ||
static <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> asWrapped( | ||
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier | ||
) { | ||
return new WrappedProcessorSupplierImpl<>(processorSupplier); | ||
} | ||
|
||
/** | ||
* Use to convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}. | ||
* The supplier is handed to a new {@code WrappedFixedKeyProcessorSupplierImpl}; that class is defined elsewhere | ||
* (presumably a plain delegating wrapper -- confirm against {@code NoOpProcessorWrapper}). | ||
* | ||
* @param processorSupplier the fixed-key supplier to convert | ||
* @return a {@link WrappedFixedKeyProcessorSupplier} built around {@code processorSupplier} | ||
*/ | ||
static <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> asWrappedFixedKey( | ||
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier | ||
) { | ||
return new WrappedFixedKeyProcessorSupplierImpl<>(processorSupplier); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.kafka.streams.processor.api; | ||
|
||
/** | ||
* Marker interface for classes implementing {@link FixedKeyProcessorSupplier} | ||
* that have been wrapped via a {@link ProcessorWrapper}. | ||
* <p> | ||
* Declares no methods of its own; the type parameters are passed through unchanged to | ||
* {@link FixedKeyProcessorSupplier}. | ||
* <p> | ||
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}, | ||
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method. | ||
*/ | ||
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> { | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.kafka.streams.processor.api; | ||
|
||
/** | ||
* Marker interface for classes implementing {@link ProcessorSupplier} | ||
* that have been wrapped via a {@link ProcessorWrapper}. | ||
* <p> | ||
* Declares no methods of its own; the type parameters are passed through unchanged to | ||
* {@link ProcessorSupplier}. | ||
* <p> | ||
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}, | ||
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method. | ||
*/ | ||
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> { | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the entire implementation for PAPI apps