Spark AWS Exercise2
The previous exercise provides an end-to-end guide to interactively computing word counts on a file on HDFS using Spark, but it doesn't cover writing your own precompiled code or working with files that are actually large. This exercise takes us to the next step: running standalone jobs on an EC2 cluster for larger files.
Note: I will use the address of a cluster I obtained while testing this exercise. You should of course substitute your own cluster's address as appropriate.
See the previous exercise as a reminder for how to launch a cluster. This time, request four worker nodes.
Before, we only ran Spark using the Spark interactive shell. However, we generally need to run standalone jobs on the cluster from the command line. To get started, let's run the example spark.examples.SparkPi, which computes an estimate of the value of pi.
Log in to your cluster, go to the spark directory, and use the run command to run the main method of SparkPi.
[root@ip-10-169-18-49 spark]# ./run spark.examples.SparkPi
Usage: SparkPi <master> [<slices>]
The first (and only required) argument is the URL of the master node. Spark stores this in the file /root/spark-ec2/cluster-url. Cat that file to get the URL.
[root@ip-10-159-62-80 spark]# cat /root/spark-ec2/cluster-url
spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077
We can supply that as the argument to SparkPi.
[root@ip-10-159-62-80 spark]# ./run spark.examples.SparkPi spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077
You'll see lots of output, and if everything went well, you'll have an estimate of the value of pi as one of the last lines of output.
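For a sense of what SparkPi is doing, the estimate comes from a standard Monte Carlo computation: sample random points in the square from -1 to 1 on each axis and count the fraction that land inside the unit circle. Below is a minimal sketch of that idea in Scala (not the actual spark.examples.SparkPi source), assuming a Spark shell where sc is the predefined SparkContext:

// Sketch of the Monte Carlo estimate behind SparkPi (illustrative only).
// `slices` controls how many parallel tasks the sampling is split into.
val slices = 4
val n = 100000 * slices
val hits = sc.parallelize(1 to n, slices).map { _ =>
  val x = math.random * 2 - 1   // random point in the square [-1,1] x [-1,1]
  val y = math.random * 2 - 1
  if (x * x + y * y < 1) 1 else 0   // 1 if it lands inside the unit circle
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * hits / n)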
Now let's compile our own code and run it on the cluster. Let's use the code in the spark-tutorial repository. Clone the repository, then build the code.
[root@ip-10-169-18-49 ~]# git clone git://github.com/jasonbaldridge/spark-tutorial.git
[root@ip-10-169-18-49 ~]# cd spark-tutorial
[root@ip-10-169-18-49 spark-tutorial]# ./build package
We need to put a file on HDFS to work with.
[root@ip-10-169-18-49 spark-tutorial]# ~/ephemeral-hdfs/bin/hadoop fs -mkdir input
[root@ip-10-169-18-49 spark-tutorial]# ~/ephemeral-hdfs/bin/hadoop fs -copyFromLocal ~/spark/README.md input
Now boot up SBT and run the main method of the word count program.
[root@ip-10-169-18-49 spark-tutorial]# ./build
> run-main sparktutorial.WordCount spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077 /root/spark target/scala-2.9.2/spark-tutorial_2.9.2-0.1.jar hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/input/README.md hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/readme-counts
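The arguments here are, in order, the cluster URL, the Spark home directory, the jar we just built, and the HDFS input and output paths. For reference, a standalone word-count program of this shape looks roughly like the following sketch, written against the Spark API of that era (Spark 0.7.x on Scala 2.9); it is not necessarily the exact WordCount.scala from the repository.

package sparktutorial

import spark.SparkContext
import spark.SparkContext._  // implicit conversions that provide reduceByKey

// Sketch of a standalone word count; the argument order mirrors the
// run-main invocation above (master, Spark home, jar, input, output).
object WordCount {
  def main(args: Array[String]) {
    val Array(master, sparkHome, jar, input, output) = args
    val sc = new SparkContext(master, "WordCount", sparkHome, Seq(jar))
    sc.textFile(input)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(output)
  }
}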
Check the Spark web UI to see how the job and nodes are doing.
http://ec2-54-242-12-195.compute-1.amazonaws.com:8080/
You'll see the WordCount program listed in the "Completed Applications" part of the page, though the job was so trivial that you won't have seen the worker nodes get fired up. We need more data for that, which is what we'll get to next.
Once it is done, you can check that the output is where you asked it to be, and then get the counts as a single file using -getmerge.
[root@ip-10-169-18-49 spark-tutorial]# ~/ephemeral-hdfs/bin/hadoop fs -ls readme-counts
Found 2 items
-rw-r--r-- 3 root supergroup 1375 2013-04-08 17:54 /user/root/readme-counts/part-00000
-rw-r--r-- 3 root supergroup 1555 2013-04-08 17:54 /user/root/readme-counts/part-00001
[root@ip-10-169-18-49 spark-tutorial]# ~/ephemeral-hdfs/bin/hadoop fs -getmerge readme-counts local-readme-counts.txt
Often, after computing results like these, you'll want to grab them off the cluster and copy them to your local machine. Use scp for this---run the following on your home machine:
$ scp -i ~/.aws/spark-tutorial.pem root@ec2-54-242-12-195.compute-1.amazonaws.com:/root/spark-tutorial/local-readme-counts.txt .
You can also connect directly to the cluster with ssh. On my machine, ssh responds as follows:
ssh root@ec2-54-242-12-195.compute-1.amazonaws.com
no such identity: /home/tass/.ssh/id_ecdsa: No such file or directory
If you get this as well, you can link spark-tutorial.pem in as that identity file:
cd .ssh
ln -s ../.aws/spark-tutorial.pem id_ecdsa
No need for -i anymore. You can even git push into a repository on your cluster, but you can't push onto a branch that is currently checked out there, so I recommend pushing to a separate branch:
git push root@ec2-54-242-12-195.compute-1.amazonaws.com:spark-tutorial master:update
And then, on the cluster:
git merge update
Amazon has public datasets available. This matters because it costs money to upload data to Amazon storage, to keep it there, to transfer it, etc. Even with free files, you need to make sure you are running instances in the same region as where the datasets are held.
Here, we'll work with Freebase's Wikipedia Extraction (WEX) dataset, which has all the articles from Wikipedia from mid-2009.
To get access to the WEX data, we need to create an EBS volume with the snapshot and attach it to our cluster. This is actually quite straightforward to do. To create the volume, go to the volumes screen, and select the "Create Volume" button at the top of the screen. Fill out the information:
- Volume type: select "Standard"
- Size: The WEX data requires 66GB, but grab 80GB for this volume so that there is some space to create and work with subsets of the data.
- Availability zone: Use the same region as where your cluster is, e.g. if your cluster is on us-east-1b, then the EBS volume must be on us-east-1b (and not us-east-1a, etc) in order to be attached to your cluster.
- Snapshot: choose snap-1781757e WEX (Linux)
The EBS volume we created could be attached to any of several EC2 instances you might have running. Attaching a volume means making the data on the volume available on the file system of an EC2 instance. We can then do all the standard things one does with files, including copying them from the node to HDFS.
So, let's attach the volume we created above to our master node. Look at your instances page and find which node is the master node of your cluster---if you look at the "Security Groups" column, it will have the label SparkTutorial-master. Next, back in the volumes screen, right-click on the EBS volume you created with the WEX snapshot, choose "Attach Volume", and select the instance label of the master node from the drop-down menu. This makes the volume available on the master node, usually as device /dev/xvdf (but check the messages in the AWS interface in case it is assigned a different device). We can now mount that device on the file system as follows.
[root@ip-10-169-18-49 dev]# mkdir /mnt/wex
[root@ip-10-169-18-49 dev]# mount /dev/xvdf /mnt/wex
That's all you need for now, but if you want more information, there are many more details in this AWS article about EBS.
Now, go to the directory to inspect the data.
[root@ip-10-169-18-49 ~]# cd /mnt/wex/
[root@ip-10-169-18-49 wex]# ls
fixd freebase_wex.icss.yaml log lost+found pkgd rawd README README-credits README-dirs README-license ripd tmp
The file we are most interested in is /mnt/wex/rawd/freebase-wex-2009-01-12-articles.tsv, which has all the pages of Wikipedia from 2009, one per line. Each line has five fields, separated by tabs:
- page id
- page title
- last date the page was modified
- XML version of the page
- plain text version of the page
For more information, see the full documentation for WEX.
Here are the first three columns of the head of the file.
[root@ip-10-169-18-49 rawd]# head freebase-wex-2009-01-12-articles.tsv | awk -F "\\t" '{print $1"\t"$2"\t"$3}'
12 Anarchism 2008-12-30 06:23:05
25 Autism 2008-12-24 20:41:05
39 Albedo 2008-12-29 18:19:09
290 A 2008-12-27 04:33:16
303 Alabama 2008-12-29 08:15:47
305 Achilles 2008-12-30 06:18:01
307 Abraham Lincoln 2008-12-28 20:18:23
308 Aristotle 2008-12-29 23:54:48
309 An American in Paris 2008-09-27 19:29:28
324 Academy Award 2008-12-28 17:50:43
The XML and plain text of the articles are obviously quite large, so I won't show them here.
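When you start writing Spark code against this file, you'll want to split each line on tabs rather than treating it as undifferentiated text. Here is a minimal sketch of such a parse; the field names are my own, and real code would need to decide what to do with malformed lines.

// Split a WEX article line into its five tab-separated fields.
// The -1 limit keeps empty trailing fields; lines that don't have exactly
// five fields yield None and can be filtered out.
def parseWexLine(line: String): Option[(String, String, String, String, String)] =
  line.split("\t", -1) match {
    case Array(id, title, modified, xml, text) => Some((id, title, modified, xml, text))
    case _ => None
  }

For the subset we create below, which keeps only four of the five fields, the pattern would change accordingly.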
So, even though our cluster can now manage the full data set, let's cut it down a bit so the exercise moves more quickly. A side note: it isn't ideal to work with plain text since it is uncompressed, and the larger the file, the longer it takes to transfer from the local filesystem to HDFS. But for present purposes it keeps things simple, so we'll stick with plain text.
To grab the first 50,000 articles, keeping just the plain text and not the XML, do the following.
[root@ip-10-169-18-49 rawd]# head -50000 freebase-wex-2009-01-12-articles.tsv | awk -F "\\t" '{print $1"\t"$2"\t"$3"\t"$5}' > subset.tsv
This takes about 1-2 minutes and creates a file that is 363MB. Now put it on HDFS.
[root@ip-10-169-18-49 rawd]# ~/ephemeral-hdfs/bin/hadoop fs -copyFromLocal subset.tsv input
First, let's use the Spark shell on the cluster. Previously, we were just using Spark on the local machine: though it had access to HDFS, it did not spread the tasks out to the workers and instead computed everything on the master node. To kick the Spark shell off on the cluster, call it as follows:
[root@ip-10-169-18-49 spark]# MASTER=spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077 ./spark-shell
Once everything is booted up, go into paste mode and put in the following code:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val wex = sc.textFile("hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/input/subset.tsv")
val wexCounts = wex
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wexCounts.saveAsTextFile("hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/wex-counts")
// Exiting paste mode, now interpreting.
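Before looking at the saved output, you can inspect the counts interactively. For example, here is a sketch that flips each pair so the count becomes the key, sorts in descending order, and prints the twenty most frequent tokens (it relies on the pair-RDD conversions that the shell imports by default):

// Sketch: view the most frequent tokens without leaving the shell.
val top20 = wexCounts
  .map { case (word, count) => (count, word) }  // make the count the key
  .sortByKey(false)                             // false = descending order
  .take(20)
top20.foreach(println)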
The output is now in three separate files.
[root@ip-10-169-18-49 spark]# ~/ephemeral-hdfs/bin/hadoop fs -ls wex-counts
Found 3 items
-rw-r--r-- 3 root supergroup 13532440 2013-04-08 17:19 /user/root/wex-counts/part-00000
-rw-r--r-- 3 root supergroup 13529297 2013-04-08 17:19 /user/root/wex-counts/part-00001
-rw-r--r-- 3 root supergroup 13519273 2013-04-08 17:19 /user/root/wex-counts/part-00002
As an exercise, get these as a single file on the local machine with a single command.
Exercise: I was sloppy with how I computed word counts, given the format of the WEX file. Fix this and recompute the counts.
Exercise: Adapt the WordCount.scala code so that it works with the WEX data, run it as a stand-alone program to get the word counts.
Exercise: Work with the Google Books Ngrams data. This can be done either via EBS or directly from the Google Books Ngrams S3 bucket (see the AWS page for the Ngrams data). Write a program that computes the histogram of counts per year for a given word.
Exercise: Use the XML version of the pages and create a link graph between pages based on the links and the page titles. (E.g. such that you could run PageRank.)
Make sure to delete unneeded files and volumes, which will otherwise cost you money. Detach the EBS volume you created from the WEX snapshot, and then delete it.