-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnewstage_7_70_pushToTargetSchema.py
103 lines (86 loc) · 3.75 KB
/
newstage_7_70_pushToTargetSchema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#%% [markdown]
# # Stage : Push to Target Schema
# Haven't figured out how best to do this yet!
# It would be great if you could define some canonical form, or create from an existing file / database table schema?
# Then you could apply some ML to infer / learn how to map?
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import os as os
import re as re
import collections
import pyodbc
import urllib
import sqlalchemy
from azureml.dataprep import value
from azureml.dataprep import col
from azureml.dataprep import Dataflow
from commonCode import savePackage, openPackage, createFullPackagePath
#%%
sourceFileName = 'UPMPERSON'
previousStageNumber = '60'
stageNumber = '70'
#%%
dataFlow = Dataflow.open('./packages/' + sourceFileName + '/' + previousStageNumber + '/' + sourceFileName +'_A_package.dprep')
#%%
# Set SQL connection string and SQL command string...
server = 'localhost'
database = 'HRDev_POC'
driver = 'DRIVER={SQL Server Native Client 11.0};SERVER='+ server +';DATABASE='+ database +';Trusted_Connection=yes;Connection Timeout=120'
conn = pyodbc.connect(driver)
cursor = conn.cursor()
#%%
# Get target schema
strSQL = 'SELECT OBJECT_SCHEMA_NAME(T.[object_id],DB_ID()) AS [Schema], T.[name] AS [table_name], AC.[name] AS [column_name], TY.[name] AS system_data_type, AC.[max_length], AC.[precision], AC.[scale], AC.[is_nullable], AC.[is_ansi_padded] FROM sys.[tables] AS T INNER JOIN sys.[all_columns] AC ON T.[object_id] = AC.[object_id] INNER JOIN sys.[types] TY ON AC.[system_type_id] = TY.[system_type_id] AND AC.[user_type_id] = TY.[user_type_id] WHERE T.[is_ms_shipped] = 0 ORDER BY T.[name], AC.[column_id];'
TargetSchema = pd.read_sql(strSQL, conn)
#%%
# Get target table
TableName = 'UPMPERSON'
strSQL = 'SELECT TOP 0 * FROM ' + TableName + ';'
TargetTable = pd.read_sql(strSQL, conn)
#%%
# NOTE Temp cell to force deletion of UPMPERSON primary key column
TargetTable = TargetTable.drop('PERSONID',axis=1)
#%%
# NOTE Move out to a config table
# Define data type mapping
def fix_datatype (row):
if row['system_data_type'] == 'int' or row['system_data_type'] == 'bigint' or row['system_data_type'] == 'bit':
return 'int64'
if row['system_data_type'] == 'datetime' or row['system_data_type'] == 'smalldatetime':
return 'datetime64[ns]'
if row['system_data_type'] == 'numeric' or row['system_data_type'] == 'decimal' or row['system_data_type'] == 'float':
return 'float64'
return 'object'
#%%
# Add new column to TargetSchema with pandas dtype equivalents
TargetSchema['pd_datatype'] = TargetSchema.apply (lambda row: fix_datatype(row), axis=1)
#%%
# Add new column to TargetScheme with a concatenated string of table name and column name to uniquely identify columns
TargetSchema['Unique_ID'] = TargetSchema[['table_name', 'column_name']].apply(lambda x: '_'.join(x), axis=1)
#%%
# Get dtypes of TargetSchema and change TargetTable dtypes
for column in TargetTable.columns[:]:
#print(column)
#print(TargetTable[column].dtype)
lookup_value = TableName + "_" + TargetTable[column].name
lookup_index = TargetSchema.index[TargetSchema['Unique_ID'] == lookup_value].tolist()
lookup_df = TargetSchema.loc[lookup_index, 'pd_datatype']
targ_dtype = lookup_df.iloc[0][:]
#print(targ_dtype)
TargetTable[column].astype(targ_dtype)
#%%
dataFlow.drop_columns('PERSONID')
dataFlow.head(5)
#%%
dataFrame = dataFlow.to_pandas_dataframe()
TargetTable = TargetTable.append(dataFrame)
#%%
cursor.execute('SET IDENTITY_INSERT UPMPERSON ON')
#%%
params = urllib.parse.quote_plus(driver)
engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
TargetTable.to_sql('UPMPERSON',engine,'dbo','append',False)
#%%
cursor.execute('SET IDENTITY_INSERT UPMPERSON OFF')