-
Notifications
You must be signed in to change notification settings - Fork 3
/
commonJoinHandling.py
109 lines (80 loc) · 6.1 KB
/
commonJoinHandling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import pandas as pd
import azureml.dataprep as dprep
from commonPackageHandling import openDataFlowPackage, saveDataFlowPackage
from commonInventoryCreation import getColumnStats, getDataFlowStats
def _performJoin(leftDataFlow, leftName, rightDataFlow, rightName, leftJoinColumn, rightJoinColumn, joinType):
    """Join two dataflows on the given key columns and return the result.

    Always performs the inner "MATCH" style join first; when joinType is
    "LEFT" the unmatched left-hand rows are appended to the inner result to
    emulate a left outer join (azureml.dataprep has no direct left-join mode
    on the builder).
    """
    join_builder = leftDataFlow.builders.join(right_dataflow=rightDataFlow,
                                              left_column_prefix=leftName + '_',
                                              right_column_prefix=rightName + '_')
    join_builder.detect_column_info()
    join_builder.join_key_pairs = [(leftJoinColumn, rightJoinColumn)]
    # azureml.dataprep join_type flag values:
    #   NONE = 0, MATCH = 2, UNMATCHLEFT = 4, UNMATCHRIGHT = 8
    join_builder.join_type = 2
    innerDataFlow = join_builder.to_dataflow()
    print('{0} created inner dataflow : Columns : {1}, Rows : {2}'.format(leftName, len(innerDataFlow.get_profile().columns), innerDataFlow.row_count))

    if joinType == "LEFT":
        # Re-use the same builder with "UNMATCHLEFT" to grab the rows that
        # were not joined from the left dataflow...
        join_builder.join_type = 4
        leftUnmatchedDataFlow = join_builder.to_dataflow()
        print('{0} created left unmatched dataflow : Columns : {1}, Rows : {2}'.format(leftName, len(leftUnmatchedDataFlow.get_profile().columns), leftUnmatchedDataFlow.row_count))
        # ...and append them to the inner join result to form a left outer join.
        return innerDataFlow.append_rows([leftUnmatchedDataFlow])

    return innerDataFlow


def joinTables(dataName, previousStageNumber, thisStageNumber, qualityFlag, operatorToUse, operationFlag):
    """Join a dataflow against one or more others, driven by a config CSV.

    Loads the dataflow package produced by the previous stage.  When
    operationFlag names a join-config CSV under ./Config, each config row
    triggers one join: the right-hand dataflow is loaded, joined (inner, or
    emulated left outer for joinType "LEFT"), inventoried, and saved under a
    combined name for the next stage.  When operationFlag is '' the dataflow
    passes through untouched.  In both cases the source dataflow itself is
    inventoried and re-saved so downstream stages can still open it by name.

    Parameters
    ----------
    dataName : str
        Name of the (left-hand) dataflow package to load.
    previousStageNumber : str
        Stage identifier used to locate the input package.
    thisStageNumber : str
        Stage identifier used when saving output packages.
    qualityFlag : str
        Quality suffix used when resolving package paths.
    operatorToUse : str
        Operator name recorded in the generated inventories.
    operationFlag : str
        Name of the join-config CSV under ./Config, or '' for pass-through.

    Returns
    -------
    tuple
        (dataFlow, columnInventory, dataFlowInventory) on success, or
        (None, None, None) when no input package was found.
    """
    dataFlow, fullPackagePath = openDataFlowPackage(dataName, previousStageNumber, qualityFlag)

    # Guard clause: nothing to do if the previous stage produced no package.
    if not dataFlow:
        print('{0}: no package file found at location {1}'.format(dataName, fullPackagePath))
        return None, None, None

    print('{0}: loaded package from path {1}'.format(dataName, fullPackagePath))

    # Empty intermediate dataframes used to accumulate inventories at both
    # dataflow and column level across every join performed here.
    dataFlowInventoryIntermediate = pd.DataFrame()
    columnInventoryIntermediate = pd.DataFrame()

    if operationFlag != '':
        # Load config file; each row describes one join to perform.
        joinConfig = dprep.read_csv('./Config/' + operationFlag).to_pandas_dataframe()

        for index, row in joinConfig.iterrows():
            leftDataName = row['LeftDataName']
            leftDataFlowJoinColumn = row['LeftDataFlowJoinColumn']
            rightDataName = row['RightDataName']
            rightDataFlowJoinColumn = row['RightDataFlowJoinColumn']
            joinType = row['JoinType']
            print('{0}: ready to join {1} {2} -> {3} {4} using jointype {5}'.format(dataName, leftDataName, leftDataFlowJoinColumn, rightDataName, rightDataFlowJoinColumn, joinType))

            # Load right-hand dataflow for this config row.
            rightDataFlow, fullPackagePath = openDataFlowPackage(rightDataName, previousStageNumber, qualityFlag)
            print('{0}: loaded package from path {1}'.format(rightDataName, fullPackagePath))

            # NOTE(review): the left side of the join is always the dataflow
            # loaded for dataName; leftDataName from the config is only used
            # for logging — confirm these are meant to match.
            newDataFlow = _performJoin(dataFlow, dataName, rightDataFlow, rightDataName, leftDataFlowJoinColumn, rightDataFlowJoinColumn, joinType)

            # New name for this dataflow: concatenation of left and right names.
            newDataName = dataName + '_' + rightDataName

            # Output key stats (fixed: the original format strings duplicated
            # the {0} placeholder, printing the table name twice).
            print('{0} left table : Columns : {1}, Rows : {2}'.format(leftDataName, len(dataFlow.get_profile().columns), dataFlow.row_count))
            print('{0} right table : Columns : {1}, Rows : {2}'.format(rightDataName, len(rightDataFlow.get_profile().columns), rightDataFlow.row_count))

            newDataProfile = newDataFlow.get_profile()
            print('{0} joined table : Columns : {1}, Rows : {2}'.format(newDataName, len(newDataProfile.columns), newDataFlow.row_count))

            # Generate column and dataflow inventories for the joined result.
            columnInventory = getColumnStats(newDataProfile, newDataName, thisStageNumber, operatorToUse, operationFlag)
            dataFlowInventory = getDataFlowStats(newDataFlow, newDataProfile, newDataName, thisStageNumber, operatorToUse, operationFlag)

            # DataFrame.append was removed in pandas 2.0 — accumulate via concat.
            columnInventoryIntermediate = pd.concat([columnInventoryIntermediate, columnInventory])
            dataFlowInventoryIntermediate = pd.concat([dataFlowInventoryIntermediate, dataFlowInventory])

            # Save the joined dataflow so it can be passed onto the next stage.
            targetPackagePath = saveDataFlowPackage(newDataFlow, newDataName, thisStageNumber, 'A')
            print('{0}: saved package to {1}'.format(newDataName, targetPackagePath))
    else:
        print('{0}: no joining of tables required'.format(dataName))

    # Always inventory and re-save the source dataflow as well, so it can be
    # passed onto the next stage of the process under its original name.
    dataProfile = dataFlow.get_profile()
    columnInventory = getColumnStats(dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
    columnInventoryIntermediate = pd.concat([columnInventoryIntermediate, columnInventory])
    dataFlowInventory = getDataFlowStats(dataFlow, dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
    dataFlowInventoryIntermediate = pd.concat([dataFlowInventoryIntermediate, dataFlowInventory])

    targetPackagePath = saveDataFlowPackage(dataFlow, dataName, thisStageNumber, qualityFlag)
    print('{0}: saved source package to {1}'.format(dataName, targetPackagePath))

    return dataFlow, columnInventoryIntermediate, dataFlowInventoryIntermediate