-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql_queries.py
222 lines (190 loc) · 13.2 KB
/
sql_queries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import configparser
# Deployment-specific settings (the S3 source paths and the IAM role ARN used
# by the COPY statements in this module) are kept in an external config file
# instead of being hard-coded here.
# NOTE: ConfigParser.read() silently skips a file it cannot find, so a missing
# dwh.cfg only surfaces later as a KeyError when a section is looked up.
_CONFIG_PATH = 'dwh.cfg'
config = configparser.ConfigParser()
config.read(_CONFIG_PATH)
# DROP TABLE STATEMENTS
# All drops share one shape; IF EXISTS makes each statement safe to run even
# when the table has not been created yet.
_DROP_IF_EXISTS = 'DROP TABLE IF EXISTS {};'

# Demographics tables
drop_demo_total = _DROP_IF_EXISTS.format('state_demographics')
drop_demo_race = _DROP_IF_EXISTS.format('state_demographics_by_race')

# Visitor analysis: final table and its staging table
drop_visitors_analysis = _DROP_IF_EXISTS.format('visitor_analysis')
drop_stage_visitors_analysis = _DROP_IF_EXISTS.format('stage_visitor_analysis')

# Visitors: final table and its staging table
drop_visitors = _DROP_IF_EXISTS.format('visitors')
drop_stage_visitors = _DROP_IF_EXISTS.format('stage_visitors')

# Arrival date: final table and its staging table
drop_arrival_date = _DROP_IF_EXISTS.format('arrival_date')
drop_stage_arrival_date = _DROP_IF_EXISTS.format('stage_arrival_date')
# TRUNCATE TABLE STATEMENTS
# NOTE: in Redshift, TRUNCATE commits the transaction in which it runs, so any
# earlier uncommitted statements in the same transaction are committed with it.
_TRUNCATE = 'TRUNCATE TABLE {};'

# Demographics tables are emptied and reloaded directly.
truncate_demo_total = _TRUNCATE.format('state_demographics')
truncate_demo_race = _TRUNCATE.format('state_demographics_by_race')

# Only the staging side of the immigration tables is truncated.
truncate_stage_visitors_analysis = _TRUNCATE.format('stage_visitor_analysis')
truncate_stage_visitors = _TRUNCATE.format('stage_visitors')
truncate_stage_arrival_date = _TRUNCATE.format('stage_arrival_date')
# CREATE TABLE STATEMENTS

# One row per (city, state_code) with population totals.
# Declared DISTSTYLE ALL and sorted by (state_code, city).
create_demo_total = """\
CREATE TABLE state_demographics(city VARCHAR(256) NOT NULL
, state_code CHAR(2) NOT NULL
, state VARCHAR(256)
, median_age NUMERIC(3,1)
, male_population BIGINT
, female_population BIGINT
, total_population BIGINT NOT NULL
, number_of_veterans BIGINT
, foreign_born BIGINT
, average_household_size NUMERIC(4,2)
, CONSTRAINT tot_pop_pkey PRIMARY KEY (city, state_code))
DISTSTYLE ALL
SORTKEY (state_code, city);
"""

# One row per (city, state_code, race) with the head count for that race.
# Same distribution and sort settings as state_demographics.
create_demo_race = """\
CREATE TABLE state_demographics_by_race(city VARCHAR(256) NOT NULL
, state_code CHAR(2) NOT NULL
, state VARCHAR(256)
, race VARCHAR(256)
, count BIGINT NOT NULL
, CONSTRAINT race_pkey PRIMARY KEY (city, state_code, race))
DISTSTYLE ALL
SORTKEY (state_code, city);
"""
# Staging table for visitor_analysis: same columns as the final table, but
# with no NOT NULL / key constraints, and declared BACKUP NO.
create_stage_visitor_analysis = """\
CREATE TABLE stage_visitor_analysis(
admission_id BIGINT
, year_month INT
, arrival_date DATE
, dest_state_code VARCHAR(10)
, port_city VARCHAR(256)
, port_state_code VARCHAR(10)
, entry_mode VARCHAR(10)
, visa_category VARCHAR(20)
, origin_country VARCHAR(256)
, port_dest_equal_flag CHAR(1))
BACKUP NO;
"""

# Final visitor_analysis table, keyed on (admission_id, arrival_date) and
# both distributed and sorted on year_month.
create_visitor_analysis = """\
CREATE TABLE visitor_analysis(admission_id BIGINT NOT NULL
, year_month INT NOT NULL DISTKEY SORTKEY
, arrival_date DATE NOT NULL
, dest_state_code VARCHAR(10)
, port_city VARCHAR(256)
, port_state_code VARCHAR(10)
, entry_mode VARCHAR(10)
, visa_category VARCHAR(20)
, origin_country VARCHAR(256)
, port_dest_equal_flag CHAR(1)
, CONSTRAINT vis_ana_pkey PRIMARY KEY (admission_id, arrival_date))
DISTSTYLE KEY;
"""
# Staging table for visitors: same columns as the final table, but with no
# NOT NULL / key constraints, and declared BACKUP NO.
create_stage_visitors = """\
CREATE TABLE stage_visitors(
admission_id BIGINT
, year_month INT
, origin_country VARCHAR(256)
, dest_state_code VARCHAR(10)
, port_city VARCHAR(256)
, port_state_code VARCHAR(10)
, entry_mode VARCHAR(10)
, visitor_age INT
, visitor_birth_year INT
, arrival_date DATE
, departure_date DATE
, visa_category VARCHAR(20)
, visatype VARCHAR(10)
, visa_expire_date VARCHAR(20)
, gender VARCHAR(10))
BACKUP NO;
"""

# Final visitors table, keyed on (admission_id, arrival_date) and both
# distributed and sorted on year_month.
# Note that visa_expire_date is stored as text (VARCHAR), not as a DATE.
create_visitors = """\
CREATE TABLE visitors(admission_id BIGINT NOT NULL
, year_month INT NOT NULL DISTKEY SORTKEY
, origin_country VARCHAR(256)
, dest_state_code VARCHAR(10)
, port_city VARCHAR(256)
, port_state_code VARCHAR(10)
, entry_mode VARCHAR(10)
, visitor_age INT
, visitor_birth_year INT
, arrival_date DATE NOT NULL
, departure_date DATE
, visa_category VARCHAR(20)
, visatype VARCHAR(10)
, visa_expire_date VARCHAR(20)
, gender VARCHAR(10)
, CONSTRAINT visitors_pkey PRIMARY KEY (admission_id, arrival_date))
DISTSTYLE KEY;
"""
# Staging table for arrival_date: unconstrained columns, declared BACKUP NO.
# "year", "month" and "day" are written as quoted identifiers.
create_stage_arrival_date = """\
CREATE TABLE stage_arrival_date(
arrival_date DATE
, "year" INT
, "month" INT
, "day" INT
, weekday INT
, weeknum INT)
BACKUP NO;
"""

# Final arrival_date table: one row per date (primary key), declared
# DISTSTYLE ALL and sorted by (year, month).
create_arrival_date = """\
CREATE TABLE arrival_date(arrival_date DATE NOT NULL PRIMARY KEY
, "year" INT
, "month" INT
, "day" INT
, weekday INT
, weeknum INT)
DISTSTYLE ALL
SORTKEY (year, month);
"""
# COPY STATEMENTS
# Every load has the same shape: COPY <table> from an S3 path taken from the
# [S3] config section, authenticating with the role ARN from [IAM_ROLE],
# parsing the files as JSON with 'auto' field mapping, in region us-west-2.
_COPY_JSON_TEMPLATE = """\
COPY {} FROM '{}'
CREDENTIALS 'aws_iam_role={}'
FORMAT AS JSON 'auto'
region 'us-west-2';
"""


def _copy_json(table, s3_option):
    """Return the COPY statement that loads *table* from config['S3'][s3_option]."""
    return _COPY_JSON_TEMPLATE.format(
        table, config['S3'][s3_option], config['IAM_ROLE']['ARN'])


# Demographics tables are loaded directly (no staging table).
copy_demo_total = _copy_json('state_demographics', 'DEMO_TOT')
copy_demo_race = _copy_json('state_demographics_by_race', 'DEMO_RACE')

# Immigration data lands in staging tables first.
copy_stage_visitor_analysis = _copy_json('stage_visitor_analysis', 'VISITOR_ANA')
copy_stage_visitors = _copy_json('stage_visitors', 'VISITORS')
copy_stage_arr_date = _copy_json('stage_arrival_date', 'ARR_DATE')
# INSERT STATEMENTS
# Each statement copies every row of a staging table into its final table,
# naming the columns explicitly on both sides. No filtering or de-duplication
# is done here.

# stage_visitor_analysis -> visitor_analysis
insert_visitor_analysis = """\
INSERT INTO visitor_analysis(admission_id, year_month, arrival_date
, dest_state_code, port_city, port_state_code, entry_mode
, visa_category, origin_country, port_dest_equal_flag)
SELECT admission_id, year_month, arrival_date, dest_state_code
, port_city, port_state_code, entry_mode, visa_category
, origin_country, port_dest_equal_flag
FROM stage_visitor_analysis;
"""

# stage_visitors -> visitors
insert_visitors = """\
INSERT INTO visitors(admission_id, year_month, origin_country, dest_state_code
, port_city, port_state_code, entry_mode, visitor_age, visitor_birth_year
, arrival_date, departure_date, visa_category, visatype
, visa_expire_date, gender)
SELECT admission_id, year_month, origin_country, dest_state_code, port_city
, port_state_code, entry_mode, visitor_age, visitor_birth_year
, arrival_date, departure_date, visa_category, visatype
, visa_expire_date, gender
FROM stage_visitors;
"""

# stage_arrival_date -> arrival_date
insert_arr_date = """\
INSERT INTO arrival_date(arrival_date, "year", "month", "day", weekday, weeknum)
SELECT arrival_date, "year", "month", "day", weekday, weeknum
FROM stage_arrival_date;
"""
# Data quality check queries

# Row-count probe. Format with the table name; a result of 0 means the table
# holds no rows.
check_zero_count = "SELECT COUNT(*) FROM {};"

# Duplicate-key probe. Format with (column, column, table); the result is the
# number of surplus non-NULL values in the column, so 0 means every non-NULL
# value is unique. COUNT ignores NULLs on both sides of the subtraction.
# The space after DISTINCT is required: without it a plain column name renders
# as COUNT(DISTINCTcolumn), which is not valid SQL.
check_unique_key = "SELECT COUNT({}) - COUNT(DISTINCT {}) FROM {};"
# QUERY LISTS
# Statements grouped into ordered batches.

# Table creation: final tables first, then the staging tables.
create_table_queries = [
    create_demo_race,
    create_demo_total,
    create_arrival_date,
    create_visitor_analysis,
    create_visitors,
    create_stage_arrival_date,
    create_stage_visitor_analysis,
    create_stage_visitors,
]

# Table removal: covers every table created above.
drop_table_queries = [
    drop_demo_total,
    drop_demo_race,
    drop_visitors_analysis,
    drop_visitors,
    drop_arrival_date,
    drop_stage_arrival_date,
    drop_stage_visitors,
    drop_stage_visitors_analysis,
]
# Demographics load: empty the two tables, then COPY straight into them.
trunc_demo_table_queries = [
    truncate_demo_total,
    truncate_demo_race,
]
copy_demo_table_queries = [
    copy_demo_total,
    copy_demo_race,
]

# Immigration load: empty the staging tables, COPY into them, then INSERT
# from staging into the final tables.
trunc_immi_table_queries = [
    truncate_stage_arrival_date,
    truncate_stage_visitors,
    truncate_stage_visitors_analysis,
]
copy_immi_table_queries = [
    copy_stage_visitor_analysis,
    copy_stage_visitors,
    copy_stage_arr_date,
]
insert_immi_table_queries = [
    insert_visitor_analysis,
    insert_visitors,
    insert_arr_date,
]