forked from tl-its-umich-edu/my-learning-analytics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cron.py
707 lines (579 loc) · 31 KB
/
cron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
from datetime import datetime
import logging
from collections import namedtuple
from typing import Any, Dict, List, Optional, Union
from zoneinfo import ZoneInfo
from functools import wraps
import hjson
import pandas as pd
import pangres
from django.conf import settings
from django.db import connections as conns, models
from django.db.models import QuerySet
from django_cron import CronJobBase, Schedule
from google.cloud import bigquery
from sqlalchemy import types, text
from sqlalchemy.engine import ResultProxy
from sqlalchemy.orm import sessionmaker
from dashboard.common import db_util
from dashboard.models import Course, Resource, AcademicTerms, User
logger = logging.getLogger(__name__)
# Decorator to clean up function call logging
def log_function_call(func):
@wraps(func)
def wrapper(*args, **kwargs):
logging.info(f"Calling function: {func.__name__}")
result = func(*args, **kwargs)
logging.info(f"Function {func.__name__} completed")
return result
return wrapper
# cron job to populate course and user tables
class DashboardCronJob(CronJobBase):
schedule = Schedule(run_at_times=settings.RUN_AT_TIMES)
code = 'dashboard.DashboardCronJob' # a unique code
def setup_queries(self):
# Set up queries array from configuration file
CRON_QUERY_FILE = settings.CRON_QUERY_FILE
logger.info(CRON_QUERY_FILE)
try:
with open(CRON_QUERY_FILE) as cron_query_file:
self.queries = hjson.load(cron_query_file)
except FileNotFoundError:
logger.error(f'Cannot find cron queries file "{CRON_QUERY_FILE}".')
def setup_bigquery(self):
# Instantiates a client
self.bigquery_client = bigquery.Client()
# BQ Total Bytes Billed to report to status
self.total_bytes_billed = 0
def __init__(self) -> None:
"""Constructor to be used to declare valid_locked_course_ids instance variable."""
super().__init__()
self.myla_engine = db_util.create_sqlalchemy_engine(settings.DATABASES['default'])
self.setup_bigquery()
self.setup_queries()
self.valid_locked_course_ids: List[str]
# Split a list into *size* shorter pieces
def split_list(self, a_list: list, size: int = 20):
return [a_list[i:i + size] for i in range(0, len(a_list), size)]
# This util_function is used to run a query against the context store and insert the result into a MySQL table
def util_function(self, sql_string, mysql_table, bq_job_config:Optional[bigquery.QueryJobConfig]=None, table_identifier=None):
logger.debug(f'sql={sql_string}')
logger.debug(f'table={mysql_table} params={bq_job_config} table_identifier={table_identifier}')
df = self.execute_bq_query(sql_string, bq_job_config).to_dataframe()
# drop duplicates
df = df.drop_duplicates(keep='first')
logger.debug(" table: " + mysql_table + " insert size: " + str(df.shape[0]))
# write to MySQL
try:
df.to_sql(con=self.myla_engine, name=mysql_table, if_exists='append', index=False)
except Exception as e:
logger.exception(f"Error running to_sql on table {mysql_table}")
raise
# returns the row size of dataframe
return f"{str(df.shape[0])} {mysql_table}\n"
# Execute a query against the bigquery database
def execute_bq_query(self, query: str, bq_job_config: Optional[bigquery.QueryJobConfig] = None):
# Remove the newlines from the query
query = query.replace("\n", " ")
if bq_job_config:
try:
# Convert to bq schema object
query_job = self.bigquery_client.query(query, job_config=bq_job_config)
query_job_result = query_job.result()
self.total_bytes_billed += query_job.total_bytes_billed
logger.debug(f"This job had {query_job.total_bytes_billed} bytes. Total: {self.total_bytes_billed}")
return query_job_result
except Exception as e:
logger.error(f"Error ({str(e)}) in setting up schema for query {query}.")
raise Exception(e)
else:
query_job = self.bigquery_client.query(query)
query_job_result = query_job.result()
self.total_bytes_billed += query_job.total_bytes_billed
logger.debug(f"This job had {query_job.total_bytes_billed} bytes. Total: {self.total_bytes_billed}")
return query_job_result
# Execute a query against the MyLA database
def execute_myla_query(self, query: str, params: Optional[Dict] = None) -> ResultProxy:
with self.myla_engine.begin() as connection:
connection.detach()
if params:
return connection.execute(text(query), params)
else:
return connection.execute(text(query))
# remove all records inside the specified table
def execute_myla_delete_query(self, query: str, params: Optional[Dict[str,str]] = None) -> str:
# delete all records in the table first, can have an optional where clause
result_proxy = self.execute_myla_query(query, params)
return(f"\n{result_proxy.rowcount} rows deleted from {query}\n")
def soft_update_datetime_field(
self,
model_inst: models.Model,
field_name: str,
warehouse_field_value: Union[datetime, None],
) -> List[str]:
"""
Uses Django ORM to update DateTime field of model instance if the field value is null and the warehouse data is non-null.
"""
model_name: str = model_inst.__class__.__name__
current_field_value: Union[datetime, None] = getattr(model_inst, field_name)
# Skipping update if the field already has a value, provided by a previous cron run or administrator
if current_field_value is not None:
logger.info(
f'Skipped update of {field_name} for {model_name} instance ({model_inst.id}); existing value was found')
else:
if warehouse_field_value:
setattr(model_inst, field_name, warehouse_field_value)
logger.info(f'Updated {field_name} for {model_name} instance ({model_inst.id})')
return [field_name]
return []
# verify whether course ids are valid
@log_function_call
def verify_course_ids(self):
# whether all course ids are valid ids
invalid_course_id_list = []
supported_courses = Course.objects.get_supported_courses()
course_ids = [str(x) for x in supported_courses.values_list('id', flat=True)]
courses_data = self.execute_bq_query(
self.queries['course'],
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', course_ids),
])
)
courses_data = courses_data.to_dataframe()
# error out when course id is invalid, otherwise add DataFrame to list
for course_id, data_last_updated in supported_courses:
if course_id not in list(courses_data['id']):
# Check if the course was ever updated by cron. If it was updated it is invalid, otherwise don't consider this an error and skip it.
if data_last_updated:
logger.error(f"Course {course_id} doesn't have an entry in data warehouse yet. It has local data, so marking invalid.")
invalid_course_id_list.append(course_id)
else:
logger.info(f"Course {course_id} doesn't have an entry in data warehouse yet. It hasn't been updated locally, so skipping.")
if len(invalid_course_id_list) > 0:
logger.error(f'Course {invalid_course_id_list} do not exist in data warehouse yet. ')
if len(courses_data) == 0:
logger.info("No course records were found in the database.")
courses_data = pd.DataFrame(
columns=["id", "canvas_id", "enrollment_term_id", "name", "start_at", "conclude_at"])
CourseVerification = namedtuple("CourseVerification", ["invalid_course_ids", "course_data"])
return CourseVerification(invalid_course_id_list, courses_data)
# Update the user table with the data from the data warehouse
@log_function_call
def update_user(self):
# cron status
status = ""
# delete all records in the table first
status += self.execute_myla_delete_query("DELETE FROM user")
# select all student registered for the course
status += self.util_function(
self.queries['user'],
'user',
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64', settings.CANVAS_DATA_ID_INCREMENT),
])
)
return status
# update unizin metadata from data in the data warehouse
@log_function_call
def update_unizin_metadata(self):
# cron status
status = ""
# delete all records in the table first
status += self.execute_myla_delete_query("DELETE FROM unizin_metadata")
# select all student registered for the course
metadata_sql = self.queries['metadata']
logger.debug(metadata_sql)
status += self.util_function(metadata_sql, 'unizin_metadata')
return status
# update file records from Canvas that don't have names provided
@log_function_call
def update_canvas_resource(self):
# cron status
status = ""
# Select all the files for these courses
# convert int array to str array
df_attach = self.execute_bq_query(
self.queries['resource'],
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
])
).to_dataframe()
logger.debug(df_attach)
# Update these back again based on the dataframe
# Remove any rows where file_state is not available!
for row in df_attach.itertuples(index=False):
if row.file_state == 'available':
Resource.objects.filter(resource_id=row.id).update(name=row.display_name)
logger.debug(f"Row {row.id} updated to {row.display_name}")
else:
Resource.objects.filter(resource_id=row.id).delete()
logger.debug(f"Row {row.id} removed as it is not available")
return status
# update RESOURCE_ACCESS records from BigQuery or LRS data sources
@log_function_call
def update_resource_access(self):
# cron status
status = ""
# return string with concatenated SQL insert result
return_string = ""
data_last_updated = Course.objects.filter(id__in=self.valid_locked_course_ids).get_data_earliest_date()
logger.info(f"Deleting all records in resource_access after {data_last_updated}")
status += self.execute_myla_delete_query("DELETE FROM resource_access WHERE access_time > :data_last_updated", {'data_last_updated': data_last_updated })
# loop through multiple course ids, 20 at a time
# (This is set by the CRON_BQ_IN_LIMIT from settings)
for data_warehouse_course_ids in self.split_list(self.valid_locked_course_ids, settings.CRON_BQ_IN_LIMIT):
# query to retrieve all file access events for one course
# There is no catch if this query fails, event_store.events needs to exist
final_query = []
for k, query_obj in settings.RESOURCE_ACCESS_CONFIG.items():
# concatenate the multi-line presentation of query into one single string
query = query_obj['query']
if (data_last_updated is not None):
# insert the start time parameter for query
if query_obj.get('query_data_last_updated_condition'):
query += f" {query_obj['query_data_last_updated_condition']} "
elif settings.LRS_IS_BIGQUERY:
query += " and event_time > CAST(@data_last_updated as DATETIME) "
final_query.append(query)
final_query = " UNION ALL ".join(final_query)
# convert int array to string array
data_warehouse_course_ids_short = [
db_util.incremented_id_to_canvas_id(id) for id in data_warehouse_course_ids]
course_ids_short = list(map(str, data_warehouse_course_ids_short))
logger.debug(final_query)
logger.debug(data_warehouse_course_ids)
if settings.LRS_IS_BIGQUERY:
query_params = [
bigquery.ArrayQueryParameter('course_ids', 'STRING', data_warehouse_course_ids),
bigquery.ArrayQueryParameter('course_ids_short', 'STRING', course_ids_short),
bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64',
settings.CANVAS_DATA_ID_INCREMENT)
]
if (data_last_updated is not None):
# insert the start time parameter for query
query_params.append(bigquery.ScalarQueryParameter(
'data_last_updated', 'TIMESTAMP', data_last_updated))
query_params.append(bigquery.ArrayQueryParameter(
'canvas_event_urls', 'STRING', settings.CANVAS_EVENT_URLS))
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = query_params
# Location must match that of the dataset(s) referenced in the query.
bq_job = self.bigquery_client.query(final_query, location='US', job_config=job_config)
# This is the call that could result in an exception
resource_access_df: pd.DataFrame = bq_job.to_dataframe()
self.total_bytes_billed += bq_job.total_bytes_billed
logger.debug(self.total_bytes_billed)
else:
query_params = {
'course_ids': data_warehouse_course_ids,
'course_ids_short': course_ids_short,
'canvas_data_id_increment': settings.CANVAS_DATA_ID_INCREMENT,
}
if (data_last_updated is not None):
query_params['data_last_updated'] = data_last_updated
resource_access_df = pd.read_sql(final_query, conns['LRS'], params=query_params)
resource_access_row_count = len(resource_access_df)
if resource_access_row_count == 0:
logger.info('No resource access data found. Continuing...')
continue
logger.debug('resource_access_df row count: '
f'({resource_access_row_count})')
logger.debug(f'resource_access_df:\n'
f'{resource_access_df}\n'
f'{resource_access_df.dtypes}')
if 'user_login_name' not in resource_access_df.columns:
logger.warning('Update queries in configuration file '
'to include column "user_login_name".')
else:
# process data which contains user login names, but not IDs
if -1 in resource_access_df['user_id'].values:
login_names = ','.join(
map(repr, resource_access_df['user_login_name']
.drop_duplicates().dropna().values))
logger.debug(f'login_names:\n{login_names}')
# get user ID as string because pd.merge will convert
# int64 to scientific notation; converting SN to int64
# causes Obi-Wan problems (off by one)
user_id_df = pd.read_sql(
'select sis_name as user_login_name,'
'cast(user_id as char) as user_id_str '
f'from user where sis_name in ({login_names})',
self.myla_engine)
logger.debug(f'user_id_df:\n'
f'{user_id_df}\n'
f'{user_id_df.dtypes}')
# combine user login and ID data
resource_access_df = pd.merge(
resource_access_df, user_id_df,
on='user_login_name', how='outer')
# replace real user_id values for missing ones (-1)
resource_access_df.loc[
resource_access_df['user_id'] == -1,
'user_id'] = resource_access_df['user_id_str']
# drops must be in this order; especially dropna() LAST
resource_access_df = resource_access_df \
.drop(columns=['user_id_str', 'user_login_name']) \
.dropna()
resource_access_df['user_id'] = pd.to_numeric(resource_access_df['user_id'])
logger.debug(f'resource_access_df:\n'
f'{resource_access_df}\n'
f'{resource_access_df.dtypes}')
else:
resource_access_df = resource_access_df.drop(
columns='user_login_name')
resource_access_df = resource_access_df.dropna()
# drop duplicates
resource_access_df = resource_access_df.drop_duplicates(
['resource_id', 'user_id', 'access_time'], keep='first')
logger.debug('resource_access_df row count (de-duped): '
f'({len(resource_access_df)})')
logger.debug(f'resource_access_df:\n'
f'{resource_access_df}\n'
f'{resource_access_df.dtypes}')
# Make resource data from resource_access data
resource_df = resource_access_df.filter(["resource_id", "resource_type", "name"])
resource_df = resource_df.drop_duplicates(["resource_id"])
# pangres.upsert() requires DataFrame to have index
resource_df = resource_df.set_index('resource_id')
logger.debug(f'resource_df:\n'
f'{resource_df}\n'
f'{resource_df.dtypes}')
resource_access_df = resource_access_df.drop(
columns=['resource_type', 'name'])
ra_len_before = len(resource_access_df)
# Drop rows with NA in any column
resource_access_df = resource_access_df.dropna()
logger.info(f'{ra_len_before - len(resource_access_df)} / '
f'{ra_len_before} resource_access_df rows with '
'NA values dropped')
logger.debug(f'resource_access_df:\n'
f'{resource_access_df}\n'
f'{resource_access_df.dtypes}')
# only keep access events generated by students
student_enrollment_type = User.EnrollmentType.STUDENT
student_enrollment_df = pd.read_sql(
'select user_id, course_id from user where enrollment_type= %s',
self.myla_engine, params=[(str(student_enrollment_type),)])
resource_access_df = pd.merge(
resource_access_df, student_enrollment_df,
on=['user_id', 'course_id'],
# use inner merge to keep only resource access event (left)
# innitiated by people with student enrollment type (right)
how='inner')
# First, update resource table
try:
dtype = {'resource_id': types.VARCHAR(255)}
pangres.upsert(con=self.myla_engine, df=resource_df,
table_name='resource', if_row_exists='update',
create_schema=False, add_new_columns=False,
dtype=dtype)
except Exception as e:
logger.exception('Error running upsert on table resource')
raise
# Next, update resource_access table
try:
resource_access_df.to_sql(con=self.myla_engine, name='resource_access',
if_exists='append', index=False)
except Exception as e:
logger.exception('Error running to_sql on table '
'resource_access')
raise
return_string += \
f'{len(resource_access_df)} rows for courses [' + ', '.join(
map(repr, data_warehouse_course_ids)) + ']\n'
logger.info(return_string)
return status
@log_function_call
def update_groups(self):
# cron status
status = ""
# delete all records in assignment_group table
status += self.execute_myla_delete_query("DELETE FROM assignment_groups")
# update groups
# Loading the assignment groups inforamtion along with weight/points associated ith arn assignment
logger.debug("update_assignment_groups(): ")
# loop through multiple course ids
status += self.util_function(
self.queries['assignment_groups'],
'assignment_groups',
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
]),
)
return status
@log_function_call
def update_assignment(self):
# Load the assignment info w.r.t to a course such as due_date, points etc
status = ""
# delete all records in assignment table
status += self.execute_myla_delete_query("DELETE FROM assignment")
# loop through multiple course ids
status += self.util_function(
self.queries['assignment'],
'assignment',
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
]),
)
return status
@log_function_call
def submission(self):
# student submission information for assignments
# cron status
status = ""
# delete all records in submission table
status += self.execute_myla_delete_query("DELETE FROM submission")
# loop through multiple course ids
# filter out not released grades (submission_dim.posted_at date is not null) and partial grades (submission_dim.workflow_state != 'graded')
bq_job_config = bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
bigquery.ScalarQueryParameter('canvas_data_id_increment', 'INT64', settings.CANVAS_DATA_ID_INCREMENT),
])
df = self.execute_bq_query(self.queries['submission'], bq_job_config).to_dataframe()
df = df.drop_duplicates(keep='first')
df.to_sql(con=self.myla_engine, name='submission', if_exists='append', index=False)
status+=f"{str(df.shape[0])} submission\n"
# returns the row size of dataframe
return status
@log_function_call
def weight_consideration(self):
# load the assignment weight consider information with in a course. Some assignments don't have weight consideration
# the result of it return boolean indicating weight is considered in table calculation or not
status = ""
# delete all records in assignment_weight_consideration table
status += self.execute_myla_delete_query("DELETE FROM assignment_weight_consideration")
# loop through multiple course ids
status += self.util_function(
self.queries['assignment_weight'],
'assignment_weight_consideration',
bigquery.QueryJobConfig(query_parameters=[
bigquery.ArrayQueryParameter('course_ids', 'STRING', self.valid_locked_course_ids,),
]),
'weight')
logger.debug(status + "\n\n")
return status
@log_function_call
def update_term(self) -> str:
"""
Searches warehouse data for new terms and adds them while leaving existing terms as they are.
"""
status: str = ''
term_sql: str = self.queries['term']
logger.debug(term_sql)
warehouse_term_df: pd.DataFrame = self.execute_bq_query(term_sql).to_dataframe()
existing_terms_ids: List[int] = [term.id for term in list(AcademicTerms.objects.all())]
new_term_ids: List[int] = [int(id) for id in warehouse_term_df['id'].to_list() if id not in existing_terms_ids]
if not new_term_ids:
logger.info('No new terms were found to add to the academic_terms table.')
else:
new_term_df: pd.DataFrame = warehouse_term_df.loc[warehouse_term_df['id'].isin(new_term_ids)]
try:
new_term_df.to_sql(con=self.myla_engine, name='academic_terms', if_exists='append', index=False)
term_message: str = f'Added {len(new_term_df)} new records to academic_terms table: {new_term_ids}'
logger.info(term_message)
status += term_message + '\n'
except Exception as e:
logger.error(f'Error running to_sql on term table: {e}')
raise
return status
@log_function_call
def update_course(self, warehouse_courses_data: pd.DataFrame) -> str:
"""
Updates course records with data returned from verify_course_ids, only making changes when necessary.
"""
status: str = ''
logger.debug(warehouse_courses_data.to_json(orient='records'))
courses: QuerySet = Course.objects.filter(id__in=self.valid_locked_course_ids)
courses_string: str = ', '.join([str(x) for x in self.valid_locked_course_ids])
status += f'{str(len(courses))} course(s): {courses_string}\n'
for course in courses:
updated_fields: List[str] = []
warehouse_course_dict: Dict[str, Any] = warehouse_courses_data.loc[warehouse_courses_data['id']
== course.id].iloc[0].to_dict()
warehouse_course_name: str = warehouse_course_dict['name']
if course.name != warehouse_course_name:
course.name = warehouse_course_name
logger.info(f'Name for {course.id} has been updated.')
updated_fields.append('name')
warehouse_term_id: int = int(warehouse_course_dict['enrollment_term_id'])
if (course.term is None) or (course.term.id != warehouse_term_id):
course.term = AcademicTerms.objects.get(id=warehouse_term_id)
logger.info(f'Term for {course.id} has been updated.')
updated_fields.append('term')
warehouse_date_start: Union[datetime, None] = (
warehouse_course_dict['start_at'] if pd.notna(
warehouse_course_dict['start_at']) else None
)
updated_fields += self.soft_update_datetime_field(course, 'date_start', warehouse_date_start)
warehouse_date_end: Union[datetime, None] = (
warehouse_course_dict['conclude_at'] if pd.notna(
warehouse_course_dict['conclude_at']) else None
)
updated_fields += self.soft_update_datetime_field(course, 'date_end', warehouse_date_end)
if updated_fields:
course.save()
status += f'Course {course.id}: updated {", ".join(updated_fields)}\n'
return status
def do(self) -> str:
logger.info("** MyLA cron tab")
status = ""
run_start = datetime.now(ZoneInfo('UTC'))
status += f"Start cron: {str(run_start)} UTC\n"
course_verification = self.verify_course_ids()
invalid_course_id_list = course_verification.invalid_course_ids
if len(invalid_course_id_list) > 0:
# error out and stop cron job
status += f"ERROR: Those course ids are invalid: {invalid_course_id_list}\n"
status += "End cron: " + str(datetime.now()) + "\n"
logger.info("************ total status=" + status + "/n/n")
return status
# Lock in valid course IDs that data will be pulled for.
self.valid_locked_course_ids = [str(x) for x in course_verification.course_data['id'].to_list()]
logger.info(f'Valid locked course IDs: {self.valid_locked_course_ids}')
# continue cron tasks
status += self.update_term()
if len(self.valid_locked_course_ids) == 0:
logger.info("Skipping course-related table updates...")
status += "Skipped course-related table updates.\n"
else:
# Update the date unless there is an exception
exception_in_run = False
status += self.update_course(course_verification.course_data)
status += self.update_user()
status += self.update_groups()
status += self.update_assignment()
status += self.submission()
status += self.weight_consideration()
if 'show_resources_accessed' not in settings.VIEWS_DISABLED:
try:
status += self.update_resource_access()
status += self.update_canvas_resource()
except Exception as e:
logger.error(f"Exception running BigQuery update: {str(e)}")
status += str(e)
exception_in_run = True
status += self.update_unizin_metadata()
all_str_course_ids = set(
str(x) for x in Course.objects.get_supported_courses().values_list('id', flat=True)
)
courses_added_during_cron: List[str] = list(all_str_course_ids - set(self.valid_locked_course_ids))
if courses_added_during_cron:
logger.debug(
f'During the run, users added {len(courses_added_during_cron)} course(s): {courses_added_during_cron}')
logger.debug(f'No data was pulled for these courses.')
# Set all of the courses to have been updated now (this is the same set update_course runs on)
if not exception_in_run:
logger.info(f"Updating all valid courses from when this run was started at {run_start}")
Course.objects.filter(id__in=self.valid_locked_course_ids).update(data_last_updated=run_start)
else:
logger.warn("data_last_updated not updated because of an Exception during this run")
if settings.LRS_IS_BIGQUERY:
total_tbytes_billed = self.total_bytes_billed / 1024 / 1024 / 1024 / 1024
# $6.25 per TB as of Feb 2024 https://cloud.google.com/bigquery/pricing
total_tbytes_price = round(6.25 * total_tbytes_billed, 2)
status += (f'TBytes billed for BQ: {total_tbytes_billed} = '
f'${total_tbytes_price}\n')
status += "End cron: " + str(datetime.now()) + "\n"
logger.info("************ total status=" + status + "\n")
return status