-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_querier.py
175 lines (136 loc) · 5.26 KB
/
log_querier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from flytekit import task, workflow, LaunchPlan, ImageSpec, Secret
import flytekit
import os
import json
import boto3
from flytekitplugins.flyteinteractive import vscode
from flytekitplugins.chatgpt import ChatGPTTask
from openai import OpenAI
# Container image shared by every task in this file, built from requirements.txt.
# The registry comes from the DOCKER_REGISTRY environment variable (None when unset).
aws_image = ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY"),
    requirements="requirements.txt",
    name="aws_image",
)
# Flyte secret coordinates. Both secrets share the same AWS Secrets Manager ARN
# prefix as their group; group + key concatenated spell out a full secret ARN.
LOGS_SECRET_GROUP = OPENAI_SECRET_GROUP = (
    "arn:aws:secretsmanager:us-east-2:356633062068:secret:"
)
LOGS_SECRET_KEY = "cloudwatch-readonly-JZZBP6"
OPENAI_SECRET_KEY = "openai-PqfFLj"
# OpenAI organization ID; only referenced by the commented-out ChatGPTTask in this file.
OPENAI_ORG_ID = "org-0pKlDFzPsouGXeUrSMTGsFII"
# Marks the start of a structured (JSON) Python log record inside a raw
# container log line; any text before it is ignored.
_ASCTIME_MARKER = '{"asctime":'


def _format_structured_log(log_line: str) -> str:
    """Render the JSON log record embedded in *log_line* as a single line.

    *log_line* must contain ``_ASCTIME_MARKER``. The JSON object starting at
    that marker is decoded (any text after the object is ignored) and rendered
    as ``"<asctime> <name> <levelname> <message>"``; optional fields that are
    absent or empty are left out.

    Raises:
        json.JSONDecodeError: if the text at the marker is not valid JSON.
    """
    start = log_line.find(_ASCTIME_MARKER)
    # raw_decode stops at the end of the first JSON value, so a trailing
    # newline or suffix after the record does not break parsing.
    record, _ = json.JSONDecoder().raw_decode(log_line[start:])
    parts = [str(record["asctime"])]
    for key in ("name", "levelname", "message"):
        if key not in record:
            print(f"Key '{key}' does not exist in the parsed JSON.")
        elif record[key] != "":
            parts.append(str(record[key]))
    return " ".join(parts)


@task(
    container_image=aws_image,
    secret_requests=[
        Secret(
            group=LOGS_SECRET_GROUP,
            key=LOGS_SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        )
    ],
)
# NOTE(review): @vscode() (flytekitplugins-flyteinteractive) turns this into an
# interactive debugging task and may keep it waiting for an IDE session --
# confirm it is meant to stay enabled outside of debugging.
@vscode()
def query_logs() -> str:
    """Fetch one hard-coded CloudWatch log stream and return its structured log lines.

    AWS credentials are read from the ``LOGS_SECRET_GROUP``/``LOGS_SECRET_KEY``
    secret, a JSON object with ``AWSAccessKeyId`` and ``AWSSecretAccessKey``.
    A single ``get_log_events`` call is made for the stream below. Only events
    whose ``log`` text embeds a ``{"asctime": ...}`` JSON record are kept; each
    kept record is rendered by ``_format_structured_log`` and printed.

    Returns:
        The rendered lines joined by newlines, or ``""`` when nothing matched.
    """
    secret_data = json.loads(
        flytekit.current_context().secrets.get(LOGS_SECRET_GROUP, LOGS_SECRET_KEY)
    )
    session = boto3.Session(
        aws_access_key_id=secret_data["AWSAccessKeyId"],
        aws_secret_access_key=secret_data["AWSSecretAccessKey"],
        # The log group lives in us-east-2; pin the region so the client does not
        # depend on a region being configured inside the task container.
        region_name="us-east-2",
    )

    log_group_name = "/aws/containerinsights/opta-oc-production/application"
    # Must be the bare stream name, not the CloudWatch console URL for it
    # (stream names may not contain ':').
    log_stream_name = (
        "fluentbit-kube.var.log.containers."
        "f3979948cb5254dac837-n0-0_flytesnacks-development_"
        "f3979948cb5254dac837-n0-0-"
        "904f61f5c5080852c9f1e30cfc22e8a6f90401f332e8da5ee21564b98f19e639.log"
    )
    client = session.client("logs")
    response = client.get_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
    )

    lines = []
    for event in response["events"]:
        # Each event message is expected to be a JSON envelope whose "log"
        # field holds the raw container line.
        try:
            raw_line = json.loads(event["message"])["log"]
        except (json.JSONDecodeError, KeyError, TypeError):
            print("Skipping event whose message is not a JSON object with a 'log' field.")
            continue

        if not isinstance(raw_line, str) or _ASCTIME_MARKER not in raw_line:
            print(
                "Key 'asctime' does not exist in the parsed JSON and therefore we do not treat this as a log message.")
            continue

        try:
            formatted = _format_structured_log(raw_line)
        except json.JSONDecodeError:
            print("Skipping log line whose embedded JSON record could not be parsed.")
            continue

        lines.append(formatted)
        print(formatted)

    return "\n".join(lines)
@task(container_image=aws_image)
def preprocess_task(input_error: str) -> str:
    """Prefix raw log text with an instruction asking the model to explain it.

    Args:
        input_error: Log text, as produced by ``query_logs``.

    Returns:
        The instruction text immediately followed by ``input_error``.
    """
    instruction = (
        "the following is an error message from the logs of a user's Flyte task. "
        "Please write a short (100 words max) explanation of what is happening in this error:"
        "\n\n"
    )
    return f"{instruction}{input_error}"
# gpt_task = ChatGPTTask(
# name="chatgpt",
# # openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
# openai_organization=OPENAI_ORG_ID,
# chatgpt_config={
# "model": "gpt-3.5-turbo",
# "temperature": 0.7,
# },
# )
@task(
    container_image=aws_image,
    secret_requests=[
        Secret(
            group=OPENAI_SECRET_GROUP,
            key=OPENAI_SECRET_KEY,
            mount_requirement=Secret.MountType.FILE,
        )
    ],
)
def call_gpt(prompt: str) -> str:
    """Send *prompt* as a single user message to gpt-3.5-turbo and return the reply.

    The OpenAI API key is read from the ``OPENAI_SECRET_GROUP``/``OPENAI_SECRET_KEY``
    secret, a JSON object with an ``openai_secret_key`` field.

    Returns:
        The text of the first completion choice, or ``""`` when the reply
        carries no text content.
    """
    secret_data = json.loads(
        flytekit.current_context().secrets.get(OPENAI_SECRET_GROUP, OPENAI_SECRET_KEY)
    )
    # The key comes from the Flyte secret, so it is passed explicitly rather
    # than relying on the client's default key lookup.
    client = OpenAI(api_key=secret_data["openai_secret_key"])
    chat_completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
    )
    # message.content is nullable in the OpenAI API; this task is declared to
    # return str, so fall back to an empty string instead of returning None.
    return chat_completion.choices[0].message.content or ""
@workflow
def query_logs_wf() -> str:
    """Fetch task logs, ask GPT to explain them, and return the explanation."""
    log_error_raw = query_logs()
    prompt = preprocess_task(input_error=log_error_raw)
    response = call_gpt(prompt=prompt)
    # response = gpt_task(message=prompt)
    # Return the model's explanation (not the prompt); otherwise call_gpt's
    # output is computed and then discarded.
    return response