-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
153 lines (122 loc) · 4.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import requests
import os
import json
import asyncio
from datetime import date
import utility.utils as ut
import utility.db as db
import utility.opportunity as opps
from dotenv import load_dotenv
from utility.scrape import (
request_github_internship24_data,
request_linkedin_data,
request_linkedin_internship24_data,
)
from utility.palm import gpt_job_analyze
from utility.error import ErrorMsg
# Module-level setup, executed once when this file is run or imported:
# load_dotenv() reads variables from a .env file into the process environment,
# then ut.verify_set_env_variables() checks that the variables this script
# needs are set (how a missing variable is reported is decided in utility.utils).
# The order matters: verification must see the values that load_dotenv() added.
load_dotenv()
ut.verify_set_env_variables()
async def execute_opportunities_webhook(
    webhook_url: str, job_message: str, internship_message: str
):
    """
    Post the formatted opportunity messages to a Discord webhook.

    The payload always carries a headline and a date banner embed. One extra
    embed is appended for the job postings and one for the internship
    postings, each only when its message is non-empty.

    Args:
        webhook_url: URL of the Discord webhook for the target channel.
        job_message: Pre-formatted description of the job postings, as
            produced by format_opportunities(); falsy to omit the embed.
        internship_message: Pre-formatted description of the internship
            postings; falsy to omit the embed.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection error, timeout, ...). It is deliberately
            not caught here so that a send which never reached Discord is
            not mistaken for a delivered message by the caller.

    Note:
        requests.post() is a blocking call, so the event loop is blocked
        while the request is in flight.
    """
    # Upper bound, in seconds, on waiting for Discord. Without a timeout
    # requests waits forever on a stalled connection.
    request_timeout_seconds = 10

    # Create a dictionary payload for the message content
    payload = {
        "content": "# ✨ NEW OPPORTUNITY POSTINGS BELOW! ✨",
        "tts": False,
        "embeds": [
            {
                "title": f"✧・゚: *✧・゚:* 🎀 {date.today()} 🎀 ✧・゚: *✧・゚:*。",
                "color": 0xFFFFFF,
            },
        ],
    }

    if job_message:
        payload["embeds"].append(
            {
                "title": "¸„.-•~¹°”ˆ˜¨ JOB OPPORTUNITIES ¨˜ˆ”°¹~•-.„¸",
                "description": job_message,
                "color": 0x05A3FF,
            },
        )

    if internship_message:
        payload["embeds"].append(
            {
                "title": " ¸„.-•~¹°”ˆ˜¨ INTERNSHIP OPPORTUNITIES ¨˜ˆ”°¹~•-.„¸",
                "description": internship_message,
                "color": 0x05A3FF,
            },
        )

    # Convert the payload to JSON format
    json_payload = json.dumps(payload)

    # This will send a POST request to the webhook_url!
    response = requests.post(
        webhook_url,
        data=json_payload,
        headers={"Content-Type": "application/json"},
        timeout=request_timeout_seconds,
    )

    # Discord answers 204 (no body) by default and 200 (with the created
    # message) when the webhook URL carries ?wait=true; both mean "sent".
    if response.status_code in (200, 204):
        print("Webhook message was sent sucessfully!")
    else:
        print(
            f"Failed to send webhook message. {ErrorMsg().status_code_failure(response.status_code)}"
        )
async def main():
    """
    Run one collection-and-notification cycle, or create the database table.

    When the command-line value exposed by ut.extract_command_value().create
    is truthy, the table named by the DB_TABLE environment variable is
    created and the function returns without doing anything else.

    Otherwise the function:
      1. loads the user's customized prompts and webhook message,
      2. gathers job postings and internship postings, passes each set
         through gpt_job_analyze() with its prompt, and ingests the result,
      3. lists the stored opportunities of each kind and formats them,
      4. sends both formatted messages to the DISCORD_WEBHOOK URL,
      5. updates the status of the opportunities that were listed.
    """
    # Creates table in database
    with_create_table_command = ut.extract_command_value().create
    if with_create_table_command:
        table_name = os.getenv("DB_TABLE")
        db.create(table_name)
        print(f"Sucessfully created {table_name}!")
        # Leave main() so none of the steps below run. A plain return is used
        # rather than exit(): that helper is injected by the site module for
        # interactive use and is not guaranteed to exist in every runtime.
        return

    file_paths = [os.getenv("MESSAGE_PATH"), os.getenv("PROMPTS_PATH")]
    customized_object = ut.user_customization(file_paths)

    # Determines the customized prompts for PaLM
    prompt_object = ut.determine_prompts(customized_object["customized_prompts"])

    # Determines the customized message for the webhook
    finalized_message = ut.determine_customized_message(
        customized_object["customized_message"]
    )

    # Consolidates all job-related opportunities into a comprehensive
    # List[Opportunity], eliminating repetitive calls to the LLM SERVER.
    job_opps = ut.merge_all_opportunity_data(request_linkedin_data())
    filtered_job_opps = gpt_job_analyze(
        job_opps,
        prompt_object["full_time"],
    )
    opps.ingest_opportunities(filtered_job_opps)

    # Same consolidation for the internship-related opportunities, which come
    # from two sources (LinkedIn and GitHub) merged into one list.
    internship_opps = ut.merge_all_opportunity_data(
        request_linkedin_internship24_data(), request_github_internship24_data()
    )
    filtered_internship_opps = gpt_job_analyze(
        internship_opps,
        prompt_object["internship"],
    )
    opps.ingest_opportunities(filtered_internship_opps)

    # To test the code without consuming API requests, call
    # reset_processed_status(). According to the original note it resets the
    # processed status of 5 job postings by setting them to _processed = 0,
    # so developers can re-run the notification step without spending API
    # resources. To do so, comment out the scrape/analyze/ingest calls above
    # and uncomment the following line:
    # db.reset_processed_status()

    # NOTE(review): the first positional flag differs between the two calls
    # (False for internships, True for full-time jobs); its meaning is defined
    # in utility.opportunity — confirm this asymmetry is intended.
    internship_data_results = opps.list_opportunities(
        False, "internship", filtered=True
    )
    job_data_results = opps.list_opportunities(True, "full_time", filtered=True)

    internship_formatted_message = opps.format_opportunities(
        internship_data_results, finalized_message
    )
    job_formatted_message = opps.format_opportunities(
        job_data_results, finalized_message
    )

    discord_webhook = os.getenv("DISCORD_WEBHOOK")
    await execute_opportunities_webhook(
        discord_webhook, job_formatted_message, internship_formatted_message
    )

    # NOTE(review): the webhook coroutine only prints when Discord returns a
    # non-success status, so the statuses below are updated in that case too.
    opps.update_opportunities_status(job_data_results)
    opps.update_opportunities_status(internship_data_results)
# Script entry point: when the file is executed directly (not imported),
# asyncio.run() starts an event loop and runs the main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())