[ETL-676] Create data loading procedure for each parquet datatype (#132)
* create procedure to copy data into table from stage

* Add external stage for parquet dev

* return resultset rather than table

* Revert "return resultset rather than table"

This reverts commit c265b5f.

* create a procedure for each parquet datatype
philerooski authored Jul 23, 2024
1 parent 355af2b commit 6a610b1
Showing 7 changed files with 142 additions and 5 deletions.
30 changes: 28 additions & 2 deletions snowflake/objects/database/recover/schema/parquet/deploy.sql
@@ -1,11 +1,15 @@
/*
Create a parquet schema (if it doesn't yet exist) and deploy all child objects.
Jinja templating variables:
git_branch - The name of the git branch from which we are deploying.
*/
CREATE SCHEMA IF NOT EXISTS parquet;
USE SCHEMA parquet;

SET parquet_file_format_name = 'parquet_format';
-SET parquet_stage_name = 'parquet_s3';
+SET parquet_prod_stage_name = 'parquet_prod_s3';
+SET parquet_dev_stage_name = 'parquet_dev_s3';

EXECUTE IMMEDIATE
FROM './file_format/deploy.sql'
@@ -16,7 +20,29 @@ EXECUTE IMMEDIATE
FROM './stage/deploy.sql'
USING (
git_branch => '{{ git_branch }}',
-parquet_stage_name => $parquet_stage_name
+parquet_prod_stage_name => $parquet_prod_stage_name,
+parquet_dev_stage_name => $parquet_dev_stage_name
);
EXECUTE IMMEDIATE
FROM './table/deploy.sql';
EXECUTE IMMEDIATE
$$
BEGIN
IF ('{{ git_branch }}' = 'main') THEN
-- Our procedures will reference the prod stage
EXECUTE IMMEDIATE
FROM './procedure/deploy.sql'
USING (
stage_name => $parquet_prod_stage_name,
file_format => $parquet_file_format_name
);
ELSE
EXECUTE IMMEDIATE
FROM './procedure/deploy.sql'
USING (
stage_name => $parquet_dev_stage_name,
file_format => $parquet_file_format_name
);
END IF;
END;
$$;
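
Note: on any branch other than main, the ELSE branch above resolves the
session variables to the dev stage, so the call is equivalent to this
sketch (values substituted from the SET statements at the top of the file):

EXECUTE IMMEDIATE
  FROM './procedure/deploy.sql'
  USING (
    stage_name => 'parquet_dev_s3',
    file_format => 'parquet_format'
  );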
35 changes: 35 additions & 0 deletions snowflake/objects/database/recover/schema/parquet/procedure/copy_into_table_from_stage.sql
@@ -0,0 +1,35 @@
/*
A stored procedure which copies Parquet data from a named stage into a table.
Because we cannot use Snowflake scripting variables within a stage
name, we template the stage location with Jinja variables instead. This
has the side effect of fixing the procedure to a specific stage location.
Jinja templating variables:
datatype - The datatype which our stage location refers to.
stage_name - The name of the stage where our data exists.
stage_path - The location within the stage where our data exists.
file_format - The name of the file format object used during copy.
*/
CREATE OR REPLACE PROCEDURE copy_into_table_from_{{ datatype }}_parquet_stage(
target_table VARCHAR
)
RETURNS TABLE ()
LANGUAGE SQL
AS
$$
DECLARE
res RESULTSET DEFAULT (
COPY INTO IDENTIFIER(:target_table)
FROM @{{ stage_name }}/{{ stage_path }}
FILE_FORMAT = (
FORMAT_NAME = '{{ file_format }}'
)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
);
BEGIN
RETURN TABLE(res);
END;
$$;
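
For illustration, a sketch of what this template could render to for the
'fitbitdevices' datatype on the main branch. The stage, dataset path, and
file format names are taken from the deploy scripts in this commit; the
target table name in the CALL is hypothetical:

CREATE OR REPLACE PROCEDURE copy_into_table_from_fitbitdevices_parquet_stage(
    target_table VARCHAR
)
RETURNS TABLE ()
LANGUAGE SQL
AS
$$
DECLARE
    res RESULTSET DEFAULT (
        COPY INTO IDENTIFIER(:target_table)
        FROM @parquet_prod_s3/dataset_fitbitdevices
        FILE_FORMAT = (
            FORMAT_NAME = 'parquet_format'
        )
        MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
    );
BEGIN
    RETURN TABLE(res);
END;
$$;

-- Load the staged Parquet data into an existing table (name hypothetical)
CALL copy_into_table_from_fitbitdevices_parquet_stage('fitbitdevices');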
61 changes: 61 additions & 0 deletions snowflake/objects/database/recover/schema/parquet/procedure/deploy.sql
@@ -0,0 +1,61 @@
/*
Deploy all PROCEDURE objects
Jinja templating variables:
stage_name - The name of the stage where our data exists.
file_format - The name of the file format object used by the
`copy_into_table_from_stage.sql` procedure.
*/

WITH create_procedure_for_each_parquet_table AS PROCEDURE ()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
parquet_datatypes ARRAY := [
'enrolledparticipants_customfields_symptoms',
'enrolledparticipants_customfields_treatments',
'enrolledparticipants',
'fitbitactivitylogs',
'fitbitdailydata',
'fitbitdevices',
'fitbitecg',
'fitbitecg_waveformsamples',
'fitbitintradaycombined',
'fitbitrestingheartrates',
'fitbitsleeplogs',
'fitbitsleeplogs_sleeplogdetails',
'googlefitsamples',
'healthkitv2activitysummaries',
'healthkitv2electrocardiogram',
'healthkitv2electrocardiogram_subsamples',
'healthkitv2heartbeat',
'healthkitv2heartbeat_subsamples',
'healthkitv2samples',
'healthkitv2statistics',
'healthkitv2workouts_events',
'healthkitv2workouts',
'symptomlog',
'symptomlog_value_symptoms',
'symptomlog_value_treatments'
];
datatype VARCHAR;
dataset_name VARCHAR;
BEGIN
FOR i IN 0 TO ARRAY_SIZE(:parquet_datatypes) - 1 DO
datatype := GET(:parquet_datatypes, :i);
dataset_name := CONCAT('dataset_', :datatype);
-- Create a stored procedure which uses this data type's stage location
EXECUTE IMMEDIATE
FROM './copy_into_table_from_stage.sql'
USING (
datatype => :datatype,
stage_name => '{{ stage_name }}',
stage_path => :dataset_name,
file_format => '{{ file_format }}'
);
END FOR;
END;
$$
CALL create_procedure_for_each_parquet_table();
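
Once this deploy script has run, there should be one loader procedure per
datatype in the parquet schema. A quick way to verify, assuming the name
pattern from the template above:

SHOW PROCEDURES LIKE 'copy_into_table_from_%_parquet_stage' IN SCHEMA parquet;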
10 changes: 8 additions & 2 deletions snowflake/objects/database/recover/schema/parquet/stage/deploy.sql
@@ -2,8 +2,14 @@
Deploy all stages under the `parquet` schema.
*/
EXECUTE IMMEDIATE
-FROM './parquet_s3.sql'
+FROM './parquet_prod_s3.sql'
USING (
git_branch => '{{ git_branch }}',
-parquet_stage_name => '{{ parquet_stage_name }}'
+parquet_stage_name => '{{ parquet_prod_stage_name }}'
);
EXECUTE IMMEDIATE
FROM './parquet_dev_s3.sql'
USING (
git_branch => '{{ git_branch }}',
parquet_stage_name => '{{ parquet_dev_stage_name }}'
);
6 changes: 6 additions & 0 deletions snowflake/objects/database/recover/schema/parquet/stage/parquet_dev_s3.sql
@@ -0,0 +1,6 @@
/*
Create an external stage over the dev Parquet data in S3
*/
CREATE OR REPLACE STAGE {{ parquet_stage_name }}
URL = 's3://recover-dev-processed-data/{{ git_branch }}/parquet/'
STORAGE_INTEGRATION = recover_dev_s3;
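
For example, deploying from a hypothetical feature branch named etl-676
would render this stage DDL (the stage name is supplied by the stage
deploy script above):

CREATE OR REPLACE STAGE parquet_dev_s3
    URL = 's3://recover-dev-processed-data/etl-676/parquet/'
    STORAGE_INTEGRATION = recover_dev_s3;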
2 changes: 1 addition & 1 deletion snowflake/objects/database/recover/schema/parquet/stage/parquet_prod_s3.sql
@@ -1,5 +1,5 @@
/*
-Create an external stage over the Parquet data in S3
+Create an external stage over the production Parquet data in S3
*/
CREATE OR REPLACE STAGE {{ parquet_stage_name }}
URL = 's3://recover-processed-data/{{ git_branch }}/parquet/'
3 changes: 3 additions & 0 deletions snowflake/objects/deploy.sql
@@ -16,6 +16,9 @@
- STORAGE INTEGRATION `RECOVER_PROD_S3`
* An S3 storage integration which allows access to the
S3 buckets in the RECOVER production account.
- STORAGE INTEGRATION `RECOVER_DEV_S3`
* An S3 storage integration which allows access to the
S3 buckets in the RECOVER dev account.
Additionally, we assume that the following databases have already been created
when deploying to the "staging" or "main" environment, respectively: