-
Notifications
You must be signed in to change notification settings - Fork 3
/
lambda_handler.py
112 lines (99 loc) · 3.43 KB
/
lambda_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import json
import boto3
import botocore
import os
import logging
import os.path
from datetime import datetime
import dateutil.tz
# Function for logger
def load_log_config():
    """
    Prepare logging for this module.

    Fetches the root logger, sets its level to INFO and hands it back.
    @return: the root logger
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    return root_logger
# Module-level logger: the root logger returned by load_log_config(),
# created once when this module is imported.
logger = load_log_config()
def _build_status_update(task_result, timestamp):
    """
    Build the DynamoDB update expression and values for one task result.

    When the task result carries a "JobRunState" key, that value is stored
    as the status. Otherwise the status is "FAILED" and an error message is
    stored as well.
    @param task_result: mapping taken from event['Input']['taskresult']
    @param timestamp: string stored as joblast_updated_timestamp
    @return: tuple (update_expression, expression_attribute_values)
    """
    update_expression = "set joblast_updated_timestamp=:lut,job_latest_status=:sts"
    attribute_values = {':lut': timestamp}

    if "JobRunState" in task_result:
        attribute_values[':sts'] = task_result['JobRunState']
    else:
        attribute_values[':sts'] = "FAILED"
        # Prefer 'Cause'; fall back to 'Error' (or a placeholder) so that a
        # missing key cannot prevent the failure from being recorded.
        attribute_values[':emsg'] = task_result.get(
            'Cause', task_result.get('Error', 'Unknown error')
        )
        update_expression += ",error_message=:emsg"

    return update_expression, attribute_values


def lambda_handler(event, context):
    """
    Lambda function's entry point. Reads the execution id and task result
    from the incoming event and updates the matching item in the DynamoDB
    table named by the DYNAMODB_TABLE_NAME environment variable.

    If the task result contains "JobRunState" its value is written as the
    job status; otherwise the status is "FAILED" and the error message is
    written too.
    @param event: dict with event['Input']['execution_id'] and
        event['Input']['taskresult']
    @param context: Lambda context object (unused)
    @return: dict with 'statusCode' 200 and a JSON-encoded 'body'
    @raise Exception: any error from the DynamoDB update is logged and
        re-raised unchanged
    """
    logger.info(event)
    task_input = event['Input']
    execution_id = task_input['execution_id']
    logger.info(execution_id)

    # All timestamps are taken in US/Central wall-clock time.
    central = dateutil.tz.gettz('US/Central')
    now = datetime.now(tz=central)
    logger.info(now.strftime('%m/%d/%Y %H:%M:%S'))

    task_result = task_input['taskresult']
    if "JobRunState" in task_result:
        logger.info("JobRunState Exists")
        logger.info(task_result['JobRunState'])
    else:
        logger.info("JobRunState Does not exist")

    # Compact timestamp: YYYYMMDDHHMMSS followed by microseconds.
    update_expression, attribute_values = _build_status_update(
        task_result, now.strftime("%Y%m%d%H%M%S%f")
    )

    # update table
    try:
        dynamo_client = boto3.resource('dynamodb')
        table = dynamo_client.Table(os.environ['DYNAMODB_TABLE_NAME'])
        table.update_item(
            Key={
                'execution_id': execution_id
            },
            UpdateExpression=update_expression,
            ExpressionAttributeValues=attribute_values,
            ReturnValues="UPDATED_NEW"
        )
    except Exception as error:
        # Covers botocore ClientError as well; log with traceback at ERROR
        # level and let the original exception propagate to the caller.
        logger.exception("[ERROR] Dynamodb process failed:%s", error)
        raise

    return {
        'statusCode': 200,
        'body': json.dumps('Dynamodb status updated!')
    }