Skip to content

Commit

Permalink
- Fix extension not applied for hidden courses
Browse files Browse the repository at this point in the history
- Ignore messages with empty changes field
- Add delay processing feature
  • Loading branch information
aydevworks committed Jan 24, 2025
1 parent c67c6f2 commit bf808c9
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 23 deletions.
54 changes: 39 additions & 15 deletions classes/extension/aws_queue_processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

namespace local_sitsgradepush\extension;

use core\clock;
use core\di;
use local_sitsgradepush\aws\sqs;
use local_sitsgradepush\logger;

Expand Down Expand Up @@ -44,6 +46,9 @@ abstract class aws_queue_processor {
/** @var string Message status - failed */
const STATUS_FAILED = 'failed';

/** @var string Message status - ignored */
const STATUS_IGNORED = 'ignored';

/** @var int Maximum number of batches */
const MAX_BATCHES = 30;

Expand All @@ -67,9 +72,9 @@ abstract protected function get_queue_url(): string;
* Process the message.
*
* @param array $messagebody AWS SQS message body
* @return void
* @return string Message processing status
*/
abstract protected function process_message(array $messagebody): void;
abstract protected function process_message(array $messagebody): string;

/**
* Get the queue name.
Expand Down Expand Up @@ -106,29 +111,46 @@ protected function fetch_messages(
* Check should we process the message.
*
* @param string $messageid AWS SQS Message ID
* @param array $messagebody AWS SQS Message body
* @return bool True if message is processed already, false otherwise
* @throws \dml_exception
*/
protected function should_not_process_message(string $messageid): bool {
protected function should_not_process_message(string $messageid, array $messagebody): bool {
global $DB;

try {
// Skip if message received time + delay time is greater than current time.
$delaytime = get_config('local_sitsgradepush', 'aws_delay_process_time') ?? 0;
if (isset($messagebody['Timestamp']) && strtotime($messagebody['Timestamp']) + $delaytime > di::get(clock::class)->time()) {
mtrace("Skipping message due to delay time: {$messageid}");
return true;
}

// Skip if message is already processed, ignored or exceeded maximum attempts.
$sql = 'SELECT id
FROM {local_sitsgradepush_aws_log}
WHERE messageid = :messageid
AND (status = :status OR attempts >= :attempts)';
// Allow processing if message has not been processed successfully or exceeded maximum attempts.
return $DB->record_exists_sql(
AND (status = :processed OR status = :ignored OR attempts >= :attempts)';

$handledmessages = $DB->record_exists_sql(
$sql,
[
'messageid' => $messageid,
'status' => self::STATUS_PROCESSED,
'processed' => self::STATUS_PROCESSED,
'ignored' => self::STATUS_IGNORED,
'attempts' => self::MAX_ATTEMPTS,
]
);
if ($handledmessages) {
mtrace("Skipping message due to already processed, ignored or exceeded maximum attempts: {$messageid}");
return true;
}

return false;
} catch (\Exception $e) {
logger::log($e->getMessage(), null, 'Failed to check message status');
return false;
mtrace("Skipping message due to exception: {$messageid}");
return true;
}
}

Expand Down Expand Up @@ -176,16 +198,18 @@ public function execute(): void {

foreach ($messages as $message) {
try {
if ($this->should_not_process_message($message['MessageId'])) {
mtrace("Skipping processed message or exceeded maximum attempts: {$message['MessageId']}");
continue;
}
$data = json_decode($message['Body'], true);
// Decode message body.
$messagebody = json_decode($message['Body'], true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON data: ' . json_last_error_msg());
}
$this->process_message($data);
$this->save_message_record($message, $this->get_queue_name());

if ($this->should_not_process_message($message['MessageId'], $messagebody)) {
continue;
}

$status = $this->process_message($messagebody);
$this->save_message_record($message, $this->get_queue_name(), $status);
$this->delete_message($message['ReceiptHandle']);
$processedcount++;
} catch (\Exception $e) {
Expand Down
2 changes: 1 addition & 1 deletion classes/extension/extension.php
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public function get_mappings_by_userid(int $userid): array {
global $DB;

// Find all enrolled courses for the student.
$courses = enrol_get_users_courses($userid, true);
$courses = enrol_get_users_courses($userid);

// Get courses that are in the current academic year.
$courses = array_filter($courses, function($course) {
Expand Down
19 changes: 17 additions & 2 deletions classes/extension/sora.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class sora extends extension {
/** @var string|null SORA message type */
protected ?string $soramessagetype;

/** @var array|null SORA changes */
protected ?array $sorachanges;

/**
* Return the whole time extension in seconds, including extra and rest duration.
*
Expand Down Expand Up @@ -159,6 +162,9 @@ public function set_properties_from_aws_message(string $messagebody): void {
// Set SORA message type.
$this->soramessagetype = $messagedata->entity->person_sora->type->code ?? null;

// Set SORA changes.
$this->sorachanges = $messagedata->changes ?? null;

// Check the message is valid and set student code.
$studentcode = $messagedata->entity->person_sora->person->student_code ?? null;
if (!empty($studentcode)) {
Expand Down Expand Up @@ -248,12 +254,21 @@ public function get_data_source(): string {
/**
* Get the SORA message type.
*
* @return string
* @return string|null
*/
public function get_sora_message_type(): string {
public function get_sora_message_type(): ?string {
return $this->soramessagetype;
}

/**
* Get the SORA changes.
*
* @return array|null
*/
public function get_sora_changes(): ?array {
return $this->sorachanges;
}

/**
* Calculate the time extension in seconds.
*
Expand Down
30 changes: 26 additions & 4 deletions classes/extension/sora_queue_processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,40 @@ protected function get_queue_url(): string {
* @throws \moodle_exception
* @throws \dml_exception
*/
protected function process_message(array $messagebody): void {
protected function process_message(array $messagebody): string {
$sora = new sora();
$sora->set_properties_from_aws_message($messagebody['Message']);

// As the assessment api only returns exam type SORA, we only process exam type SORA update from AWS.
if ($sora->get_sora_message_type() !== sora::SORA_MESSAGE_TYPE_EXAM) {
return;
// Check if we should ignore the message.
if ($this->should_ignore_message($sora)) {
return self::STATUS_IGNORED;
}

// Get all mappings for the student.
$mappings = $sora->get_mappings_by_userid($sora->get_userid());
$sora->process_extension($mappings);

return self::STATUS_PROCESSED;
}

/**
* Check if we should ignore the message.
*
* @param sora $sora
* @return bool
*/
protected function should_ignore_message(sora $sora): bool {
// As the assessment api only returns exam type SORA, we only process exam type SORA update from AWS.
if ($sora->get_sora_message_type() !== sora::SORA_MESSAGE_TYPE_EXAM) {
return true;
}

// If there are no changes, we should ignore the message.
if (empty($sora->get_sora_changes())) {
return true;
}

return false;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions lang/en/local_sitsgradepush.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@
$string['settings:apiclient'] = 'API client';
$string['settings:apiclient:desc'] = 'Choose which API client to use';
$string['settings:apiclientselect'] = 'Select API client';
$string['settings:awsdelayprocesstime'] = 'AWS message delay process time';
$string['settings:awsdelayprocesstime:desc'] = 'Number of seconds to delay processing of AWS messages';
$string['settings:awskey'] = 'AWS Key';
$string['settings:awskey:desc'] = 'AWS access key id';
$string['settings:awsregion'] = 'AWS Region';
Expand Down
8 changes: 8 additions & 0 deletions settings.php
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@
get_string('settings:awssoraqueueurl:desc', 'local_sitsgradepush'),
'https://sqs.eu-west-2.amazonaws.com/540370667459/person-sora-dev'
));

// AWS delay process time.
$settings->add(new admin_setting_configtext_int_only('local_sitsgradepush/aws_delay_process_time',
get_string('settings:awsdelayprocesstime', 'local_sitsgradepush'),
get_string('settings:awsdelayprocesstime:desc', 'local_sitsgradepush'),
3900, // 1 hour and 5 minutes.
10
));
}

$subplugins = core_plugin_manager::instance()->get_plugins_of_type('sitsapiclient');
Expand Down
2 changes: 1 addition & 1 deletion version.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

$plugin->component = 'local_sitsgradepush';
$plugin->release = '0.1.0';
$plugin->version = 2024110102;
$plugin->version = 2024110103;
$plugin->requires = 2024042200;
$plugin->maturity = MATURITY_ALPHA;
$plugin->dependencies = [
Expand Down

0 comments on commit bf808c9

Please sign in to comment.