Merge pull request #7 from sfc-gh-chmarshall/main
Updates for Summit '24 Hands on Lab
iamontheinet authored May 31, 2024
2 parents 83a36f7 + fc0fb61 commit 648ed20
Showing 18 changed files with 238 additions and 321 deletions.
12 changes: 5 additions & 7 deletions .devcontainer/Dockerfile
@@ -4,13 +4,11 @@ USER vscode
WORKDIR /home/vscode

# Configure SnowSQL
RUN mkdir .snowsql
COPY .devcontainer/config .snowsql

# Install SnowSQL
RUN curl -O https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.28-linux_x86_64.bash \
&& SNOWSQL_DEST=~/bin SNOWSQL_LOGIN_SHELL=~/.profile bash snowsql-1.2.28-linux_x86_64.bash \
&& rm snowsql-1.2.28-linux_x86_64.bash
RUN mkdir .snowflake
COPY --chown=vscode:vscode .devcontainer/config.toml .snowflake
COPY --chown=vscode:vscode .devcontainer/connections.toml .snowflake
RUN chmod 0600 .snowflake/config.toml \
&& chmod 0600 .snowflake/connections.toml

# Create the conda environment
COPY environment.yml .
9 changes: 0 additions & 9 deletions .devcontainer/config

This file was deleted.

7 changes: 7 additions & 0 deletions .devcontainer/config.toml
@@ -0,0 +1,7 @@
# Can override the default connection name with an environment variable as follows
#export SNOWFLAKE_DEFAULT_CONNECTION_NAME="default"

# Only for Snow CLI, can override connection details as follows
#export SNOWFLAKE_CONNECTIONS_DEFAULT_PASSWORD=""

default_connection_name = "default"
11 changes: 11 additions & 0 deletions .devcontainer/connections.toml
@@ -0,0 +1,11 @@
# Can override the default connection name with an environment variable as follows
#export SNOWFLAKE_DEFAULT_CONNECTION_NAME="default"

[default]
account = "myaccount"
user = "myuser"
password = "mypassword"
role = "HOL_ROLE"
warehouse = "HOL_WH"
database = "HOL_DB"
schema = "HOL_SCHEMA"
10 changes: 10 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,10 @@
{
"configurations": [
{
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${file}"
}
]
}
70 changes: 70 additions & 0 deletions app/05_raw_data.py
@@ -0,0 +1,70 @@
#------------------------------------------------------------------------------
# Hands-On Lab: Data Engineering with Snowpark
# Script: 05_raw_data.py
# Author: Jeremiah Hansen, Caleb Baechtold
# Last Updated: 1/9/2023
#------------------------------------------------------------------------------

import time
from snowflake.snowpark import Session


POS_TABLES = ['country', 'franchise', 'location', 'menu', 'truck', 'order_header', 'order_detail']
CUSTOMER_TABLES = ['customer_loyalty']
TABLE_DICT = {
"pos": {"schema": "RAW_POS", "tables": POS_TABLES},
"customer": {"schema": "RAW_CUSTOMER", "tables": CUSTOMER_TABLES}
}

# SNOWFLAKE ADVANTAGE: Schema detection
# SNOWFLAKE ADVANTAGE: Data ingestion with COPY
# SNOWFLAKE ADVANTAGE: Snowflake Tables (not file-based)

def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
session.use_schema(schema)
if year is None:
location = "@external.frostbyte_raw_stage/{}/{}".format(s3dir, tname)
else:
print('\tLoading year {}'.format(year))
location = "@external.frostbyte_raw_stage/{}/{}/year={}".format(s3dir, tname, year)

# we can infer schema using the parquet read option
df = session.read.option("compression", "snappy") \
.parquet(location)
df.copy_into_table("{}".format(tname))

# SNOWFLAKE ADVANTAGE: Warehouse elasticity (dynamic scaling)

def load_all_raw_tables(session):
_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

for s3dir, data in TABLE_DICT.items():
tnames = data['tables']
schema = data['schema']
for tname in tnames:
print("Loading {}".format(tname))
# Only load the first 3 years of data for the order tables at this point
# We will load the 2022 data later in the lab
if tname in ['order_header', 'order_detail']:
for year in ['2019', '2020', '2021']:
load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
else:
load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)

_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

def validate_raw_tables(session):
# check column names from the inferred schema
for tname in POS_TABLES:
print('{}: \n\t{}\n'.format(tname, session.table('RAW_POS.{}'.format(tname)).columns))

for tname in CUSTOMER_TABLES:
print('{}: \n\t{}\n'.format(tname, session.table('RAW_CUSTOMER.{}'.format(tname)).columns))


# For local debugging
if __name__ == "__main__":
# Create a local Snowpark session
with Session.builder.getOrCreate() as session:
load_all_raw_tables(session)
validate_raw_tables(session)
app/06_load_daily_city_metrics.py (converted from 06_load_daily_city_metrics.sql)
@@ -1,31 +1,3 @@
/*-----------------------------------------------------------------------------
Hands-On Lab: Intro to Data Engineering with Snowpark Python
Script: 06_load_daily_city_metrics.sql
Author: Jeremiah Hansen
Last Updated: 9/26/2023
-----------------------------------------------------------------------------*/

-- SNOWFLAKE ADVANTAGE: Snowpark Python programmability
-- SNOWFLAKE ADVANTAGE: Snowpark DataFrame API


USE ROLE HOL_ROLE;
USE WAREHOUSE HOL_WH;
USE SCHEMA HOL_DB.HOL_SCHEMA;


-- ----------------------------------------------------------------------------
-- Step 1: Create the stored procedure to load DAILY_CITY_METRICS
-- ----------------------------------------------------------------------------

CREATE OR REPLACE PROCEDURE LOAD_DAILY_CITY_METRICS_SP()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
AS
$$
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F

@@ -34,17 +6,20 @@ def table_exists(session, schema='', name=''):
return exists

def main(session: Session) -> str:
_ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()
schema_name = "HOL_SCHEMA"
table_name = "DAILY_CITY_METRICS"

# Define the tables
order_detail = session.table("ORDER_DETAIL")
order_detail = session.table("RAW_POS.ORDER_DETAIL")
order_header = session.table("RAW_POS.ORDER_HEADER")
history_day = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
location = session.table("LOCATION")
location = session.table("RAW_POS.LOCATION")

# Join the tables
order_detail = order_detail.join(location, order_detail['LOCATION_ID'] == location['LOCATION_ID'])
order_detail = order_detail.join(history_day, (F.builtin("DATE")(order_detail['ORDER_TS']) == history_day['DATE_VALID_STD']) & (location['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (location['CITY'] == history_day['CITY_NAME']))
orders = order_header.join(order_detail, order_header['ORDER_ID'] == order_detail['ORDER_ID'])
orders = orders.join(location, orders['LOCATION_ID'] == location['LOCATION_ID'])
order_detail = orders.join(history_day, (F.builtin("DATE")(order_header['ORDER_TS']) == history_day['DATE_VALID_STD']) & (orders['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (orders['CITY'] == history_day['CITY_NAME']))

# Aggregate the data
final_agg = order_detail.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE')) \
Expand All @@ -59,27 +34,24 @@ def main(session: Session) -> str:
F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
)

session.use_schema(schema_name)
# If the table doesn't exist then create it
if not table_exists(session, schema=schema_name, name=table_name):
final_agg.write.mode("overwrite").save_as_table(table_name)

_ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()
return f"Successfully created {table_name}"
# Otherwise update it
else:
cols_to_update = {c: final_agg[c] for c in final_agg.schema.names}

dcm = session.table(f"{schema_name}.{table_name}")
dcm = session.table(table_name)
dcm.merge(final_agg, (dcm['DATE'] == final_agg['DATE']) & (dcm['CITY_NAME'] == final_agg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC']), \
[F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)])

_ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()
return f"Successfully updated {table_name}"
$$;


-- ----------------------------------------------------------------------------
-- Step 2: Load the DAILY_CITY_METRICS table
-- ----------------------------------------------------------------------------

CALL LOAD_DAILY_CITY_METRICS_SP();

SELECT * FROM DAILY_CITY_METRICS;

# For local debugging
if __name__ == "__main__":
# Create a local Snowpark session
with Session.builder.getOrCreate() as session:
main(session)
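
The body of table_exists is folded out of the diff above; a plausible implementation (an assumption, not necessarily the repo's exact code) checks INFORMATION_SCHEMA from the session:

def table_exists(session, schema='', name=''):
    # Assumed helper: true if the table already exists in the given schema.
    exists = session.sql(
        "SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{name}') AS TABLE_EXISTS"
    ).collect()[0]['TABLE_EXISTS']
    return exists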
File renamed without changes.
43 changes: 25 additions & 18 deletions deploy_snowpark_apps.py
@@ -1,7 +1,9 @@
import sys;
import os;
import sys
import os
import yaml

ignore_folders = ['__pycache__', '.ipynb_checkpoints']
ignore_folders = ['.git', '__pycache__', '.ipynb_checkpoints']
snowflake_project_config_filename = 'snowflake.yml'

if len(sys.argv) != 2:
print("Root directory is required")
@@ -20,23 +22,28 @@
# print(f"Skipping ignored folder {directory_path}")
continue

# An app.toml file in the folder is our indication that this folder contains
# a snowcli Snowpark App
if not "app.toml" in file_names:
# A snowflake.yml file in the folder is our indication that this folder contains
# a Snow CLI project
if not snowflake_project_config_filename in file_names:
# print(f"Skipping non-app folder {directory_path}")
continue
print(f"Found Snowflake project in folder {directory_path}")

# Next determine what type of app it is
app_type = "unknown"
if "local_connection.py" in file_names:
app_type = "procedure"
else:
app_type = "function"
# Read the project config
project_settings = {}
with open(f"{directory_path}/{snowflake_project_config_filename}", "r") as yamlfile:
project_settings = yaml.load(yamlfile, Loader=yaml.FullLoader)

# Finally deploy the app with the snowcli tool
print(f"Found {app_type} app in folder {directory_path}")
print(f"Calling snowcli to deploy the {app_type} app")
# Confirm that this is a Snowpark project
if 'snowpark' not in project_settings:
print(f"Skipping non Snowpark project in folder {base_name}")
continue

# Finally deploy the Snowpark project with the snowcli tool
print(f"Found Snowflake Snowpark project '{project_settings['snowpark']['project_name']}' in folder {base_name}")
print(f"Calling snowcli to deploy the project")
os.chdir(f"{directory_path}")
# snow login will update the app.toml file with the correct path to the snowsql config file
os.system(f"snow login -c {root_directory}/config -C dev")
os.system(f"snow {app_type} create")
# Make sure all 6 SNOWFLAKE_ environment variables are set
# SnowCLI accesses the password directly from the SNOWFLAKE_PASSWORD environment variable
os.system(f"snow snowpark build")
os.system(f"snow snowpark deploy --replace --allow-shared-libraries --temporary-connection --account $SNOWFLAKE_ACCOUNT --user $SNOWFLAKE_USER --role $SNOWFLAKE_ROLE --warehouse $SNOWFLAKE_WAREHOUSE --database $SNOWFLAKE_DATABASE")
2 changes: 1 addition & 1 deletion environment.yml
@@ -9,5 +9,5 @@ dependencies:
- openssl=1.1.1
- pip:
# Snowflake
- snowflake-cli-labs
- snowflake
- snowflake-cli-labs==0.2.9
4 changes: 1 addition & 3 deletions requirements.txt
@@ -1,3 +1 @@
snowflake-snowpark-python[pandas]
snowflake
snowflake-cli-labs==0.2.9
snowflake-snowpark-python
21 changes: 21 additions & 0 deletions snowflake.yml
@@ -0,0 +1,21 @@
definition_version: 1
snowpark:
project_name: "hol"
stage_name: "hol_schema.deployment"
src: "app/"

procedures:
- name: "load_daily_city_metrics_sp"
database: "hol_db"
schema: "hol_schema"
handler: "06_load_daily_city_metrics.main"
runtime: "3.10"
signature: ""
returns: string
- name: "load_raw_data_sp"
database: "hol_db"
schema: "hol_schema"
handler: "05_raw_data.load_all_raw_tables"
runtime: "3.10"
signature: ""
returns: string
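
Once snow snowpark deploy has registered the two procedures declared above, either one can be invoked from a Snowpark session. A short sketch, assuming the HOL_DB and HOL_SCHEMA objects referenced in connections.toml already exist:

from snowflake.snowpark import Session

with Session.builder.getOrCreate() as session:
    # Both procedures take no arguments and return a status string.
    print(session.call("HOL_DB.HOL_SCHEMA.LOAD_RAW_DATA_SP"))
    print(session.call("HOL_DB.HOL_SCHEMA.LOAD_DAILY_CITY_METRICS_SP"))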
49 changes: 49 additions & 0 deletions steps/03_git_config.sql
@@ -0,0 +1,49 @@

USE ROLE SECURITYADMIN;
SET MY_USER = CURRENT_USER();
CREATE ROLE IF NOT EXISTS GIT_ADMIN;
GRANT ROLE GIT_ADMIN to ROLE SYSADMIN;
GRANT ROLE GIT_ADMIN TO USER IDENTIFIER($MY_USER);


USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE GIT_REPO;
USE SCHEMA PUBLIC;
GRANT OWNERSHIP ON DATABASE GIT_REPO TO ROLE GIT_ADMIN;
USE DATABASE GIT_REPO;
GRANT OWNERSHIP ON SCHEMA PUBLIC TO ROLE GIT_ADMIN;


USE ROLE GIT_ADMIN;
USE DATABASE GIT_REPO;
USE SCHEMA PUBLIC;
CREATE OR REPLACE SECRET GIT_SECRET
TYPE = PASSWORD
USERNAME = '<your_git_user>'
PASSWORD = '<your_personal_access_token>';

--Create an API integration for interacting with the repository API
USE ROLE ACCOUNTADMIN;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE GIT_ADMIN;
USE ROLE GIT_ADMIN;

CREATE OR REPLACE API INTEGRATION GIT_API_INTEGRATION
API_PROVIDER = GIT_HTTPS_API
API_ALLOWED_PREFIXES = ('https://github.com/<your_git_user>')
ALLOWED_AUTHENTICATION_SECRETS = (GIT_SECRET)
ENABLED = TRUE;

CREATE OR REPLACE GIT REPOSITORY DE_QUICKSTART
API_INTEGRATION = GIT_API_INTEGRATION
GIT_CREDENTIALS = GIT_SECRET
ORIGIN = '<your git repo URL ending in .git>';

SHOW GIT BRANCHES IN DE_QUICKSTART;
ls @DE_QUICKSTART/branches/main;

USE ROLE ACCOUNTADMIN;
SET MY_USER = CURRENT_USER();
EXECUTE IMMEDIATE
FROM @GIT_REPO.PUBLIC.DE_QUICKSTART/branches/main/steps/03_setup_snowflake.sql
USING (MY_USER=>$MY_USER);
