-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnewstage_2_21_removeFirstRows.py
62 lines (45 loc) · 2.47 KB
/
newstage_2_21_removeFirstRows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#%% [markdown]
# # Stage : Remove First Rows
# This involves stepping through each file in the confiug file to extract and do a basic clean up:
# - Based on flag in config, remove the first row;
# - Save the data flow that has been created for each file away so that it can be referenced and used later on
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import os as os
import re as re
import collections
from azureml.dataprep import col
from azureml.dataprep import Dataflow
from commonDataFlowProcessingLoop import dataFlowProcessingLoop
from commonPackageHandling import saveDataFlowPackage, openDataFlowPackage
from commonInventoryCreation import getColumnStats, getDataFlowStats
# Let's also set up global variables and common functions...
previousStageNumber = '20'
thisStageNumber = '21'
#%%
def removeRowsFromTop(dataName, previousStageNumber, thisStageNumber, qualityFlag, operatorToUse, operationFlag):
dataFlow, fullPackagePath = openDataFlowPackage(dataName, previousStageNumber, qualityFlag)
if dataFlow:
print('{0}: loaded package from path {1}'.format(dataName, fullPackagePath))
# Now perform the operation on the dataFlow : ie remove the number of rows specified from the top
numberOfRowsToRemove = int(operationFlag)
dataFlow = dataFlow.skip(numberOfRowsToRemove)
print('{0}: removed first {1} row(s)'.format(dataName, numberOfRowsToRemove))
dataProfile = dataFlow.get_profile()
# Now generate column and data flow inventories
columnInventory = getColumnStats(dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
dataFlowInventory = getDataFlowStats(dataFlow, dataProfile, dataName, thisStageNumber, operatorToUse, operationFlag)
# Finally save the data flow so it can be passed onto the next stage of the process...
targetPackagePath = saveDataFlowPackage(dataFlow, dataName, thisStageNumber, qualityFlag)
print('{0}: saved package to {1}'.format(dataName, targetPackagePath))
# Now return all of the components badk to the main loop...
return dataFlow, columnInventory, dataFlowInventory
else:
print('{0}: no package file found at location {1}'.format(dataName, fullPackagePath))
return None, None, None
#%%
dataFlowInventoryAll = dataFlowProcessingLoop(previousStageNumber, thisStageNumber, 'A', 'RemoveRowsFromTop', removeRowsFromTop)
#%%
dataFlowInventoryAll