STORM-2974: Add transactional spout to storm-kafka-client

srdo committed Jul 10, 2018
1 parent 2d7c7d3 commit ba52607
Showing 38 changed files with 695 additions and 304 deletions.
4 changes: 2 additions & 2 deletions bin/storm.py
@@ -361,7 +361,7 @@ def jar(jarfile, klass, *args):
And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
You can also exclude some dependencies like what you're doing in maven pom.
Please add exclusion artifacts with '^' separated string after the artifact.
-For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka.
+For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load the jedis and kafka-clients artifacts and all of their transitive dependencies, but exclude slf4j-api from kafka-clients.
When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string.
Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
@@ -373,7 +373,7 @@ def jar(jarfile, klass, *args):
--proxyUsername: username of proxy if it requires basic auth
--proxyPassword: password of proxy if it requires basic auth
-Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
2 changes: 1 addition & 1 deletion docs/Command-line-client.md
@@ -50,7 +50,7 @@ And when you want to ship maven artifacts and its transitive dependencies, you c

When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string. Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters. For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add JBoss and HDP repositories for dependency resolver.

-Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`

When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.

6 changes: 3 additions & 3 deletions docs/Transactional-topologies.md
@@ -77,7 +77,7 @@ When using transactional topologies, Storm does the following for you:
3. *Fault detection:* Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
4. *First class batch processing API*: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).

-Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka) contains a transactional spout implementation for Kafka.
+Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka-client](https://github.com/apache/storm/tree/master/external/storm-kafka-client) contains a transactional spout implementation for Kafka.

## The basics through example

@@ -255,7 +255,7 @@ The details of implementing a `TransactionalSpout` are in [the Javadoc](javadocs

#### Partitioned Transactional Spout

-A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [TransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/TransactionalKafkaSpout.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.
+A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [KafkaTridentSpoutTransactional]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.

### Configuration

@@ -325,7 +325,7 @@ In this scenario, tuples 41-50 are skipped. By failing all subsequent transactio

By failing all subsequent transactions on failure, no tuples are skipped. This also shows that a requirement of transactional spouts is that they always emit where the last transaction left off.

-A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [OpaqueTransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/OpaqueTransactionalKafkaSpout.java) is an example. `OpaqueTransactionalKafkaSpout` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.
+A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java) is an example. `KafkaTridentSpoutOpaque` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.

## Implementation

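A minimal sketch (not part of this commit's diff) of wiring the new `KafkaTridentSpoutTransactional` into a Trident topology; the topic name and group id are placeholder assumptions, and the spout is passed to `newStream` the same way the example topology later in this diff does:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
import org.apache.storm.trident.TridentTopology;

public class TransactionalKafkaTridentSketch {
    public static TridentTopology build(String bootstrapServers) {
        // Placeholder topic and group id; adjust for a real cluster.
        KafkaSpoutConfig<String, String> spoutConfig =
            KafkaSpoutConfig.builder(bootstrapServers, "test-topic")
                .setProp("group.id", "transactional-sketch")
                .build();

        TridentTopology topology = new TridentTopology();
        // A transactional spout must re-emit exactly the same batch for a
        // given transaction id; KafkaTridentSpoutTransactional provides that
        // guarantee on top of Kafka offsets, which is what the doc changes
        // above rely on for exactly-once state updates.
        topology.newStream("kafka-transactional",
            new KafkaTridentSpoutTransactional<>(spoutConfig));
        return topology;
    }
}
```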
4 changes: 2 additions & 2 deletions docs/Trident-state.md
@@ -28,7 +28,7 @@ Remember, Trident processes tuples as small batches with each batch being given
2. There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).
3. Every tuple is in a batch (no tuples are skipped)

-This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. storm-contrib has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java) for Kafka.
+This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. Storm has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java) for Kafka.

You might be wondering – why wouldn't you just always use a transactional spout? They're simple and easy to understand. One reason you might not use one is because they're not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You're now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are now unavailable), and processing will halt.

@@ -72,7 +72,7 @@ As described before, an opaque transactional spout cannot guarantee that the bat

1. Every tuple is *successfully* processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch.

-[OpaqueTridentKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.
+[KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for KafkaTridentSpoutOpaque to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.

With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.

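For context on the opaque-state bookkeeping the changed docs describe: a minimal sketch of the update rule, with illustrative names that are not Storm API. The state stores the current value, the previous value, and the txid of the last applied batch:

```java
// Illustrative sketch mirroring the bookkeeping Trident's opaque map states
// perform for a running count; not an actual Storm class.
class OpaqueCounterState {
    long txid = -1;   // txid of the last applied batch
    long prevValue;   // value as of the batch before `txid`
    long currValue;   // value including the batch tagged `txid`

    long applyCount(long batchTxid, long countInBatch) {
        if (batchTxid == this.txid) {
            // Replay of the same batch, possibly with different contents:
            // discard the earlier attempt and rebuild from the rollback point.
            this.currValue = this.prevValue + countInBatch;
        } else {
            // New batch: the current value becomes the new rollback point.
            this.prevValue = this.currValue;
            this.currValue += countInBatch;
            this.txid = batchTxid;
        }
        return this.currValue;
    }
}
```

Because a replayed txid is rebuilt from `prevValue`, an opaque spout can tolerate batches whose contents change between attempts, which is exactly why the txid-equality "skip" trick from fully transactional spouts no longer applies.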
2 changes: 1 addition & 1 deletion docs/flux.md
@@ -31,7 +31,7 @@ the layout and configuration of your topologies.
in your topology code
* Support for existing topology code (see below)
* Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
-* YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+* YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)
* Convenient support for multi-lang components
* External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
`${variable.name}` substitution)
TridentKafkaClientTopologyNamedTopics.java
@@ -37,6 +37,8 @@
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
+import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

@@ -53,6 +55,11 @@ public class TridentKafkaClientTopologyNamedTopics {
private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
return new KafkaTridentSpoutOpaque<>(spoutConfig);
}

+private KafkaTridentSpoutTransactional<String, String> newKafkaTridentSpoutTransactional(
+    KafkaSpoutConfig<String, String> spoutConfig) {
+    return new KafkaTridentSpoutTransactional<>(spoutConfig);
+}

private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();

@@ -66,7 +73,7 @@ public List<Object> apply(ConsumerRecord<String, String> record) {
return new Values(record.value());
}
}

protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
@@ -91,7 +98,8 @@ public static void main(String[] args) throws Exception {
protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException,
AuthorizationException, InterruptedException {
final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
-System.out.println("Running with broker url " + brokerUrl);
+final boolean isOpaque = args.length > 1 ? Boolean.parseBoolean(args[1]) : true;
+System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque);

Config tpConf = new Config();
tpConf.setDebug(true);
@@ -101,7 +109,9 @@ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyE
StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
// Consumer
+KafkaSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
+ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
StormSubmitter.submitTopology("topics-consumer", tpConf,
-    TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque(newKafkaSpoutConfig(brokerUrl))));
+    TridentKafkaConsumerTopology.newTopology(spout));
}
}
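Downstream of either spout, a Trident consumer can do exactly-once aggregation. A hedged sketch (not in this commit) of counting record values with `persistentAggregate`, assuming the record translator above declares a single output field named "str":

```java
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.tuple.Fields;

public class CountingConsumerSketch {
    public static TridentTopology build(ITridentDataSource kafkaSpout) {
        TridentTopology topology = new TridentTopology();
        topology.newStream("kafka", kafkaSpout)
            // "str" is an assumption about the record translator's output field.
            .groupBy(new Fields("str"))
            // With a transactional or opaque spout feeding a transactional
            // state, replayed batches are detected via the stored txid, so
            // each record contributes to the count exactly once.
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
        return topology;
    }
}
```

Because both `KafkaTridentSpoutOpaque` and `KafkaTridentSpoutTransactional` implement `ITridentDataSource`, the same consumer topology works regardless of which guarantee the `isOpaque` flag selects.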
@@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.storm.task.TopologyContext;
