A practical Storm Trident tutorial
This tutorial builds on Pere Ferrera's excellent material for the Trident hackathon at Big Data Beers #4 in Berlin. The Vagrant setup is based on Taylor Goetz's contribution, and the Hazelcast state code is based on wurstmeister's code.
Have a look at the accompanying slides as well.
- Go through Part*.java to learn about the basics of Trident
- Implement your own topology using Skeleton.java, or have a look at other examples
├── environment ------ Vagrant resources to simulate a Storm cluster locally
├── src
    └── main
        ├── java
        │   └── tutorial
        │       └── storm
        │           └── trident
        │               ├── example ------ Complete examples
        │               ├── operations ------ Functions and filters used in the tutorial/examples
        │               ├── testutil ------ Test utility classes (e.g. test data generators)
        │               │   └── TweetIngestor ------ Creates a local Kafka broker that streams the Twitter public stream
        │               ├── Part*.java ------ Illustrates usage of Trident step by step
        │               └── Skeleton.java ------ Stub for writing your own topology
        └── resources
            └── tutorial
                └── storm
                    └── trident
                        └── testutil ------ Contains test data and config files
- Install Java 1.6 and Maven 3 (these versions are recommended, but you can also use Java 1.7 and/or Maven 2)
- Clone this repo (if you don't have git, you can also download the source as zip file and extract it)
- Go to the project folder, execute mvn clean package, and check that the build succeeds
These classes are primarily meant to be read, but you can run them as well. Before you run the main method, you should comment out all streams except the one you are interested in (otherwise there will be lots of output)
These topologies expect a Kafka spout that streams tweets, and the Kafka spout needs a Kafka queue to read from. There is a utility class called TweetIngestor, which starts a local Kafka broker, connects to Twitter, and publishes tweets. To use this class, however, you must provide a valid Twitter access token in the twitter4j.properties file.
To do that,
- Go to https://dev.twitter.com and register
- Create an application and obtain a consumer key, consumer secret, access token and an access secret
- Copy twitter4j.properties.template as twitter4j.properties and replace the ******* placeholders with your real credentials
- After that, execute
java -cp target/trident-tutorial-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
tutorial.storm.trident.Skeleton
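For reference, a filled-in twitter4j.properties follows this shape. The key names are twitter4j's standard OAuth properties; the values below are placeholders, not working credentials:

```properties
# twitter4j OAuth credentials (values shown are placeholders)
oauth.consumerKey=YOUR_CONSUMER_KEY
oauth.consumerSecret=YOUR_CONSUMER_SECRET
oauth.accessToken=YOUR_ACCESS_TOKEN
oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```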
You can simulate a multi-machine Storm cluster on your local machine. To do this, first install Vagrant. Then install its hostmanager plugin by executing
$ vagrant plugin install vagrant-hostmanager
Finally, go to ./environment and execute vagrant up. It will take a while to download the necessary resources, and you will be asked for your root password as Vagrant edits the hosts file.
You can list the vagrant VMs as follows:
$ vagrant status
Current machine states:
util running (virtualbox)
zookeeper running (virtualbox)
nimbus running (virtualbox)
supervisor1 running (virtualbox)
supervisor2 running (virtualbox)
The Vagrant configuration file is set up to forward the standard Storm/Kafka/ZooKeeper ports from the host machine (i.e. your machine) to the appropriate guest VM, so you can, for example, view the Storm UI by simply navigating to http://localhost:8080. You can also ssh into these machines (e.g. vagrant ssh nimbus) and take a look. Storm components run as the storm user.
Currently you have to start the Kafka server manually. To do so, ssh into the util server:
vagrant ssh util
Then start the Kafka server in the background:
sudo /bin/su - kafka -c "/usr/share/kafka_2.8.0-0.8.1.1/bin/kafka-server-start.sh -daemon /usr/share/kafka_2.8.0-0.8.1.1/config/server.properties"
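This check is not part of the tutorial itself, but a quick way to confirm that the forwarded ports respond is a small pure-JDK probe like the sketch below. The host/port values assume the port forwarding described above; PortCheck is a hypothetical helper, not a class from this repo:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports forwarded by the Vagrant setup (Kafka 9092, ZooKeeper 2181).
        System.out.println("Kafka reachable on 9092: " + isOpen("localhost", 9092, 2000));
        System.out.println("ZooKeeper reachable on 2181: " + isOpen("localhost", 2181, 2000));
    }
}
```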
Get Twitter API tokens at https://dev.twitter.com. Copy ./src/main/resources/twitter4j.properties.template as twitter4j.properties in the same directory, and replace the ******** placeholders with your actual token, secret, etc. Once you've done that, execute
tutorial.storm.trident.testutil.TweetIngestor localhost 9092 1.0
The process will connect to Twitter and post the tweets to the Kafka broker through the port forwarding we set up on 9092. The last argument, 1.0, limits the rate to 1.0 tweets per second so as not to overload our small virtual cluster.
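TweetIngestor's actual rate limiting may be implemented differently; purely as an illustration of what the trailing rate argument implies, a minimal fixed-rate throttle could look like the hypothetical sketch below (Throttle is not a class from this repo):

```java
// Hypothetical sketch: a minimal fixed-rate throttle admitting one event
// per 1/rate seconds. TweetIngestor's real implementation may differ.
public class Throttle {
    private final long intervalNanos; // minimum spacing between permits
    private long nextPermit;          // earliest time the next permit is due

    public Throttle(double permitsPerSecond) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextPermit = System.nanoTime();
    }

    // Blocks until the next permit is due, then reserves the one after it.
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (nextPermit > now) {
            Thread.sleep((nextPermit - now) / 1_000_000L);
        }
        nextPermit = Math.max(nextPermit, System.nanoTime()) + intervalNanos;
    }

    public static void main(String[] args) throws InterruptedException {
        Throttle oneLinePerSecond = new Throttle(1.0); // like the 1.0 argument
        for (int i = 0; i < 3; i++) {
            oneLinePerSecond.acquire();
            System.out.println("event " + i + " at " + System.currentTimeMillis());
        }
    }
}
```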
If you see something like this in the logs, it's working:
660391 [metrics-logger-reporter-thread-1] INFO tutorial.storm.trident.testutil.TweetIngestor - type=TIMER, name=tweet-ingestion, count=33386, min=0.054, max=1.0779999999999998, mean=0.1831284046692607, stddev=0.09167078947103646, median=0.16699999999999998, p75=0.215, p95=0.3150999999999999, p98=0.46509999999999974, p99=0.5769400000000005, p999=1.0721420000000006, mean_rate=50.751326369025925, m1=50.833859244762294, m5=51.93631323103864, m15=52.897234973285414, rate_unit=events/second, duration_unit=milliseconds
Build a job jar like so:
mvn clean package -DskipTests -Pcluster