Want to become a Sparkmage?
This is a guide for using Spark (PySpark) with Mage in different cloud providers (see a specific section for the cloud provider you use).
Here is an overview of the steps required to use Mage locally with Spark in AWS:
- Create an EC2 key pair
- Create an S3 bucket for Spark
- Start Mage
- Configure project’s metadata settings
- Launch EMR cluster
- SSH into EMR master node
- Sample pipeline with PySpark code
- Debugging
- Clean up
If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack and someone will help you ASAP.
If you don’t have an existing EC2 key pair that you use to SSH into EC2 instances, follow AWS’s guide on how to create an EC2 key pair.
Once you've created an EC2 key pair, note its name and the full path where you saved it on your local machine (we'll need both later).
Using Spark on AWS EMR requires an AWS S3 bucket to store logs, scripts, etc.
Follow AWS’s guide to create an S3 bucket. You don’t need to add any special permissions or policies to this bucket.
Once you've created an S3 bucket, note the name of the bucket (we'll need it later).
Using Mage with Spark is much easier if you use Docker.
Run this command in your terminal to start Mage using Docker (demo_project is the name of your project; you can change it to anything you want):
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai \
mage start demo_project
Open your project's metadata.yaml file, located at the root of your project's directory: demo_project/metadata.yaml (presuming your project is named demo_project).
Change the values for the keys mentioned in the following steps.
Change the value for the key ec2_key_name to the name of the EC2 key pair you created in an earlier step.
For example, if your EC2 key pair file is named aws-ec2.pem or aws-ec2.pub, then the value for the key ec2_key_name should be aws-ec2.
Change the value for the key remote_variables_dir to the S3 bucket you created in an earlier step.
For example, if your S3 bucket is named my-awesome-bucket, then the value for the key remote_variables_dir should be s3://my-awesome-bucket.
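As a small illustration of the two rules above, the following sketch derives both values from the key pair path and the bucket name you noted earlier. The helper names `ec2_key_name_from_path` and `remote_variables_dir_for` are hypothetical and are not part of Mage.

```python
from pathlib import PurePosixPath


def ec2_key_name_from_path(key_path: str) -> str:
    """Return the key pair name: the file name without a .pem or .pub extension."""
    name = PurePosixPath(key_path).name
    for suffix in ('.pem', '.pub'):
        if name.endswith(suffix):
            return name[:-len(suffix)]
    return name


def remote_variables_dir_for(bucket_name: str) -> str:
    """Return the s3:// form of a bucket name."""
    # Tolerate a bucket that was already written as an s3:// URI.
    if bucket_name.startswith('s3://'):
        bucket_name = bucket_name[len('s3://'):]
    return 's3://' + bucket_name.strip('/')


print(ec2_key_name_from_path('~/.ssh/aws-ec2.pem'))   # aws-ec2
print(remote_variables_dir_for('my-awesome-bucket'))  # s3://my-awesome-bucket
```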
You can remove the following 2 keys:
master_security_group
slave_security_group
Your final metadata.yaml file could look like this:
variables_dir: ./
remote_variables_dir: s3://my-awesome-bucket
emr_config:
  master_instance_type: 'r5.4xlarge'
  slave_instance_type: 'r5.4xlarge'
  ec2_key_name: aws-ec2
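If you want to double-check the edited settings before launching a cluster, a check along these lines may help. It is only a sketch: it assumes the file has already been parsed into a dictionary (for example with PyYAML's `yaml.safe_load`, if PyYAML is installed) and that ec2_key_name is nested under emr_config. The function name `check_emr_settings` is hypothetical.

```python
def check_emr_settings(config: dict) -> list:
    """Return a list of readable problems; an empty list means nothing was flagged."""
    problems = []

    remote_dir = str(config.get('remote_variables_dir', ''))
    if not remote_dir.startswith('s3://'):
        problems.append('remote_variables_dir should look like s3://your-bucket')

    # Assumption: ec2_key_name is nested under emr_config.
    key_name = str((config.get('emr_config') or {}).get('ec2_key_name', ''))
    if not key_name:
        problems.append('emr_config.ec2_key_name is missing')
    elif key_name.endswith(('.pem', '.pub')):
        problems.append('ec2_key_name should not include the file extension')

    return problems


example = {
    'variables_dir': './',
    'remote_variables_dir': 's3://my-awesome-bucket',
    'emr_config': {
        'master_instance_type': 'r5.4xlarge',
        'slave_instance_type': 'r5.4xlarge',
        'ec2_key_name': 'aws-ec2',
    },
}
print(check_emr_settings(example))  # []
```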
Note
You may need to request an increase in quota limits for using those instance types.
For more information on how to view your quota limits and request an increase, check out this AWS document.
You'll need an AWS Access Key ID and an AWS Secret Access Key. Both are available from AWS's IAM Management console.
Once you've acquired those credentials, do the following:
Run the following command to create the 2 IAM roles required for EMR:
aws emr create-default-roles
Using your AWS Access Key ID and AWS Secret Access Key, run the following command in your terminal to launch an EMR cluster:
Using pip:
Note: You must have your AWS Access Key ID and AWS Secret Access Key in your environment variables as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY respectively.
mage create_spark_cluster demo_project
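Before running the command above, a quick check such as the following sketch can confirm that both variables are actually set in the current environment. The function name `missing_aws_credentials` is hypothetical; only the two variable names come from this guide.

```python
import os

REQUIRED_VARS = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')


def missing_aws_credentials(env=None) -> list:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_aws_credentials()
if missing:
    print('Missing environment variables: ' + ', '.join(missing))
else:
    print('AWS credentials found in the environment.')
```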
Using Docker:
docker run -it \
--env AWS_ACCESS_KEY_ID=your_key_id \
--env AWS_SECRET_ACCESS_KEY=your_access_key \
-v $(pwd):/home/src mageai/mageai \
mage create_spark_cluster demo_project
This script will take a few minutes to complete. Once finished, your terminal will output something like this:
Creating EMR cluster for project: /home/src/demo_project
Creating cluster...
{
"JobFlowId": "j-3500M6WJOND9Q",
"ClusterArn": "...",
"ResponseMetadata": {
"RequestId": "...",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "...",
"content-type": "application/x-amz-json-1.1",
"content-length": "118",
"date": "Wed, 17 Aug 2022 04:32:33 GMT"
},
"RetryAttempts": 0
}
}
Cluster ID: j-3500M6WJOND9Q
Waiting for cluster, this typically takes several minutes...
Current status: STARTING..BOOTSTRAPPING.....WAITING
Cluster j-3500M6WJOND9Q is created
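You will need the cluster ID again when you clean up, so it can be handy to capture it from the output. The sketch below pulls it out of the text with a regular expression. It assumes the output contains a line of the form shown above ("Cluster ID: j-...") and that the ID consists of "j-" followed by uppercase letters and digits, as in the sample; `extract_cluster_id` is a hypothetical helper.

```python
import re


def extract_cluster_id(output: str):
    """Return the ID from a 'Cluster ID: j-XXXX' line, or None if there is none."""
    match = re.search(r'^Cluster ID:\s*(j-[A-Z0-9]+)\s*$', output, re.MULTILINE)
    return match.group(1) if match else None


sample_output = """Creating cluster...
Cluster ID: j-3500M6WJOND9Q
Waiting for cluster, this typically takes several minutes...
"""
print(extract_cluster_id(sample_output))  # j-3500M6WJOND9Q
```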
Add an inbound rule to the EMR master node’s security group to allow SSH access by following these steps:
- Go to Amazon EMR.
- Click on the cluster you just created.
- Under the section Security and access, click the link next to Security groups for Master:. The link could look like this: sg-0bb79fd041def8c5d (depending on your security group ID).
- In the "Security Groups" page that just opened up, click on the row with the "Security group name" value of "ElasticMapReduce-master".
- Under the section "Inbound rules", click the button on the right labeled "Edit inbound rules".
- Scroll down and click "Add rule".
- Change the "Type" dropdown from Custom TCP to SSH.
- Change the "Source" dropdown from Custom to My IP.
- Click "Save rules" in the bottom right corner of the page.
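The console steps above add a single rule: TCP port 22, open only to your own IP address. For readers who prefer to script it, the sketch below builds the same rule as a dictionary in the shape that boto3's `authorize_security_group_ingress` accepts. Using boto3 here is an assumption (this guide only describes the console), and `ssh_ingress_permission` is a hypothetical helper; 203.0.113.10 is a documentation-only address standing in for your IP.

```python
import ipaddress


def ssh_ingress_permission(my_ip: str) -> dict:
    """Build an inbound rule allowing SSH (TCP 22) from a single IPv4 address."""
    # Raises ValueError if my_ip is not a valid IPv4 address.
    address = ipaddress.IPv4Address(my_ip)
    return {
        'IpProtocol': 'tcp',
        'FromPort': 22,
        'ToPort': 22,
        'IpRanges': [{'CidrIp': f'{address}/32', 'Description': 'SSH from my IP'}],
    }


# With boto3 installed and AWS credentials configured, the rule could be applied like this:
#   import boto3
#   boto3.client('ec2').authorize_security_group_ingress(
#       GroupId='sg-0bb79fd041def8c5d',  # replace with your master security group ID
#       IpPermissions=[ssh_ingress_permission('203.0.113.10')],
#   )
print(ssh_ingress_permission('203.0.113.10'))
```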
- Go to Amazon EMR.
- Click on the cluster you just created.
- Find the Master public DNS; it should look something like this: ec2-some-ip.us-west-2.compute.amazonaws.com.
- Make sure your EC2 key pair is read-only. Run the following command (change the location to wherever you saved your EC2 key pair locally):

```bash
chmod 400 ~/.ssh/aws-ec2.pem
```

- In a separate terminal session, run the following command:

```bash
ssh -i [location of EC2 key pair file] \
-L 0.0.0.0:9999:localhost:8998 \
hadoop@[Master public DNS]
```

The command could look like this:
ssh -i ~/.ssh/aws-ec2.pem \
-L 0.0.0.0:9999:localhost:8998 \
hadoop@ec2-some-ip.us-west-2.compute.amazonaws.com
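To avoid typos when filling in the placeholders, the command can also be assembled programmatically. The following is a sketch under stated assumptions: `build_tunnel_command` and `key_is_owner_read_only` are hypothetical helpers, the ports 9999 and 8998 and the hadoop user are taken from the command above, and the permission check assumes a POSIX file system.

```python
import os
import shlex
import stat


def key_is_owner_read_only(key_path: str) -> bool:
    """True when the file mode is exactly 0o400, which is what chmod 400 sets."""
    mode = stat.S_IMODE(os.stat(os.path.expanduser(key_path)).st_mode)
    return mode == 0o400


def build_tunnel_command(key_path, master_dns, local_port=9999, remote_port=8998):
    """Return the ssh argument list that forwards local_port to remote_port on the master node."""
    return [
        'ssh', '-i', os.path.expanduser(key_path),
        '-L', f'0.0.0.0:{local_port}:localhost:{remote_port}',
        f'hadoop@{master_dns}',
    ]


command = build_tunnel_command(
    '~/.ssh/aws-ec2.pem',
    'ec2-some-ip.us-west-2.compute.amazonaws.com',
)
# Print a copy-pasteable command line; nothing is executed here.
print(shlex.join(command))
```

Passing the list to `subprocess.run(command)` would open the tunnel from Python, but running the printed command in a separate terminal session, as described above, keeps it easier to stop.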
If you aren't using Docker and you installed Mage using pip, you must run the following commands in your terminal to use the pyspark kernel:
pip install sparkmagic
mkdir ~/.sparkmagic
wget https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json
mv example_config.json ~/.sparkmagic/config.json
sed -i 's/localhost:8998/host.docker.internal:9999/g' ~/.sparkmagic/config.json
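The last command above edits the config file in place, and `sed -i` behaves differently on macOS (BSD sed expects a backup suffix argument). As a portable alternative, the same substitution can be done with a few lines of Python. This is a sketch: the function names are hypothetical, and the sample string is an illustrative one-line stand-in, not the real contents of the sparkmagic config.

```python
from pathlib import Path

OLD_ENDPOINT = 'localhost:8998'
NEW_ENDPOINT = 'host.docker.internal:9999'


def point_config_at_tunnel(text: str) -> str:
    """Replace every occurrence of the default endpoint with the tunnel endpoint."""
    return text.replace(OLD_ENDPOINT, NEW_ENDPOINT)


def rewrite_config(path: str = '~/.sparkmagic/config.json') -> int:
    """Rewrite the config file in place and return how many endpoints were replaced."""
    config_path = Path(path).expanduser()
    original = config_path.read_text()
    config_path.write_text(point_config_at_tunnel(original))
    return original.count(OLD_ENDPOINT)


# Illustrative stand-in for one line of the config file.
sample = '{"url": "http://localhost:8998"}'
print(point_config_at_tunnel(sample))  # {"url": "http://host.docker.internal:9999"}
```

Calling `rewrite_config()` after the `mv` step would apply the change to the real file.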
- Create a new pipeline by going to File in the top left corner of the page and then clicking New pipeline.
- Change the pipeline's kernel from python to pyspark. Click the button with the green dot and the word python next to it. This is located at the top of the page on the right side of your header.
- Click + Data loader, then Generic (no template) to add a new data loader block.
- Paste the following sample code in the new data loader block:
from pandas import DataFrame
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def data_from_internet():
    # Download the sample CSV and parse it into a pandas DataFrame.
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'
    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Convert the pandas DataFrame into a Spark DataFrame using the Spark session in kwargs.
    df_spark = kwargs['spark'].createDataFrame(data_from_internet())
    return df_spark
- Click + Data exporter, then Generic (no template) to add a new data exporter block.
- Paste the following sample code in the new data exporter block (change the bucket in the s3://mage-spark-cluster/... path to the bucket you created in a previous step):
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df: DataFrame, **kwargs) -> None:
    # Write the Spark DataFrame from the previous block to S3 as CSV, replacing any existing output.
    (
        df.write
        .option('delimiter', ',')
        .option('header', 'True')
        .mode('overwrite')
        .csv('s3://mage-spark-cluster/demo_project/demo_pipeline/')
    )
Let’s load the data from S3 that we just created using Spark:
- Click + Data loader, then Generic (no template) to add a new data loader block.
- Paste the following sample code in the new data loader block (change the bucket in the s3://mage-spark-cluster/... path to the bucket you created in a previous step):
from pandas import DataFrame

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Read back every CSV file under the prefix that the data exporter wrote.
    df = (
        kwargs['spark'].read
        .format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .option('delimiter', ',')
        .load('s3://mage-spark-cluster/demo_project/demo_pipeline/*')
    )
    return df
If you run into any problems, the first thing to try is restarting the kernel: Run > Restart kernel.
If that doesn't work, restart the app by stopping the Docker container and starting it again.
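Because the pyspark kernel reaches the cluster through the SSH tunnel opened earlier, it can also be worth confirming that the forwarded port is still accepting connections. The sketch below does that with a plain TCP connection attempt. The function name `tunnel_is_listening` is hypothetical, the default port 9999 is the local port from the SSH command in this guide, and the check should be run on the machine where the tunnel was opened.

```python
import socket


def tunnel_is_listening(host: str = 'localhost', port: int = 9999, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if tunnel_is_listening():
    print('Port 9999 is accepting connections.')
else:
    print('Nothing is listening on port 9999; the SSH session may have closed.')
```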
Please make sure to terminate your EMR cluster when you’re done using it so you can save money.
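One way to terminate the cluster is the AWS CLI's `aws emr terminate-clusters` command, using the cluster ID printed when the cluster was created. The sketch below only builds and prints that command; the helper names are hypothetical, and it assumes the AWS CLI is installed and configured, as it was for `aws emr create-default-roles`.

```python
import subprocess


def terminate_cluster_command(cluster_id: str) -> list:
    """Return the AWS CLI argument list that terminates one EMR cluster."""
    if not cluster_id.startswith('j-'):
        raise ValueError(f'unexpected cluster ID: {cluster_id!r}')
    return ['aws', 'emr', 'terminate-clusters', '--cluster-ids', cluster_id]


def terminate_cluster(cluster_id: str) -> None:
    """Run the command; raises CalledProcessError if the AWS CLI reports a failure."""
    subprocess.run(terminate_cluster_command(cluster_id), check=True)


# Printing only; call terminate_cluster(...) to actually shut the cluster down.
print(' '.join(terminate_cluster_command('j-3500M6WJOND9Q')))
```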
Coming soon...
Coming soon...