-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
99 lines (67 loc) · 3.23 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import pwd
import grp
import datetime
from googleapiclient.discovery import build
from oauth2client import file, client, tools
from httplib2 import Http
def get_calendar():
dir_path = os.path.dirname(os.path.realpath(__file__))
store = file.Storage(os.path.join(dir_path, "token.json"))
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets(
os.path.join(dir_path, "credentials.json"),
'https://www.googleapis.com/auth/calendar.readonly')
creds = tools.run_flow(flow, store)
return build('calendar', 'v3', http=creds.authorize(Http()))
def find_upcoming_zoomerang_events(calendar, days_ahead=7):
now = datetime.datetime.utcnow()
time_min = now.isoformat() + "Z"
time_max = (now + datetime.timedelta(days=days_ahead)).isoformat() + "Z"
results = calendar.events().list(calendarId="primary",
timeMin=time_min,
timeMax=time_max,
maxResults=100,
singleEvents=True,
orderBy="startTime").execute()
events = results.get("items", [])
zoomerang_events = []
for event in events:
if event.get("location", "").lower().startswith("zoomerang"):
zoomerang_events.append(event)
return (zoomerang_events, events)
def format_cron_job(zoomerang_event, user=None):
if user is None:
user = pwd.getpwuid(os.getuid())[0]
dir_path = os.path.dirname(os.path.realpath(__file__))
start_datetime = zoomerang_events[0]["start"]["dateTime"]
start_datetime = ":".join(start_datetime.split(":")[:-1]) \
+ start_datetime[-2:]
args = " ".join(zoomerang_event["location"].split()[1:])
summary = zoomerang_event.get("summary", "").replace('"', '')
st = datetime.datetime.strptime(start_datetime, "%Y-%m-%dT%H:%M:%S%z")
return f"{st.minute} {st.hour} {st.day} {st.month} * {user} "\
f"python {dir_path}/zoomerang.py {args} \"{summary}\" "\
f">> {dir_path}/zoomerang.log 2>&1"
def format_cron_jobs(zoomerang_events, user=None, environment_variables=("SHELL", "PATH")):
cron_prefix = "\n".join([f"{ev.upper()}={os.environ.get(ev)}" \
for ev in environment_variables])
cron_jobs = [format_cron_job(ev, user) for ev in zoomerang_events]
return cron_prefix + "\n\n" + "\n".join(cron_jobs) + "\n\n"
if __name__ == '__main__':
zoomerang_events, events = find_upcoming_zoomerang_events(get_calendar())
print(f"Found {len(zoomerang_events)} upcoming Zoomerang events (o {len(events)})")
content = format_cron_jobs(zoomerang_events, user="ubuntu")
#/etc/cron.d/zoomerang
cron_path = "/etc/cron.d/zoomerang"
print(f"Writing cron jobs to {cron_path}")
with open(cron_path, "w") as fp:
fp.write(content)
# Ensure correct permissions, etc.
os.chmod(cron_path, 0o600)
os.chown(cron_path, pwd.getpwnam("root").pw_uid, grp.getgrnam("root").gr_gid)
# Ensure cron jobs will run.
os.system(f"touch {os.path.dirname(cron_path)}")
os.system("sudo service cron restart")
print("Scheduler done")