
feat(filemanager): ensure events are ingested in the correct order #93

Merged 30 commits on Feb 16, 2024.
- d1b8e22 feat(filemanager): record maximum sequencer value (mmalenic, Jan 23, 2024)
- 97441a0 feat(filemanager): select reordered deleted events (mmalenic, Jan 24, 2024)
- d48e863 test(filemanager): expand tests with more event records (mmalenic, Jan 24, 2024)
- 63a9914 test(filemanager): update existing object when a reordered event is f… (mmalenic, Jan 25, 2024)
- 77cdfe4 test(filemanager): test reorder when no update should occur (mmalenic, Jan 28, 2024)
- 4f324df feat(filemanager): add reorder sql function for created events (mmalenic, Jan 29, 2024)
- f34b710 refactor(filemanager): update queries support multiple nested input p… (mmalenic, Jan 29, 2024)
- a8c9f87 refactor(filemanager): remove s3_object_id from join because it is no… (mmalenic, Jan 29, 2024)
- 716ae04 refactor(filemanager): update query with flat s3 event output (mmalenic, Jan 29, 2024)
- 4c4f11c feat(filemanager): implement insert/reorder logic in ingester (mmalenic, Jan 30, 2024)
- e30626a fix(filemanager): only return values to reprocess if the sequencer is… (mmalenic, Jan 30, 2024)
- 78e9121 refactor(filemanager): convert update queries to functions (mmalenic, Feb 1, 2024)
- 4aea5d9 refactor(filemanager): catch exception when conflict occurs (mmalenic, Feb 1, 2024)
- 009a7c3 refactor(filemanager): ditch functions and revert to plain query (mmalenic, Feb 2, 2024)
- 9c7a071 fix(filemanager): check for unique constraint before updating (mmalenic, Feb 2, 2024)
- da5dfee test(filemanager): add basic reorder test (mmalenic, Feb 2, 2024)
- 597d70b refactor(filemanager): add size and checksum to s3 object (mmalenic, Feb 2, 2024)
- 7449622 refactor(filemanager): coalesce some optional fields when updating (mmalenic, Feb 4, 2024)
- 04f3237 test(filemanager): insert query tests (mmalenic, Feb 4, 2024)
- 32d5802 test(filemanager): add re-order permutation test (mmalenic, Feb 5, 2024)
- f83c0f4 refactor(filemanager): code tidy and refactor repetition (mmalenic, Feb 5, 2024)
- 8f5b8a9 style(filemanager): add newlines and debug statement (mmalenic, Feb 5, 2024)
- 60da520 docs(filemanager): add description of event ordering and deduplicatio… (mmalenic, Feb 5, 2024)
- 5d08f7c refactor(filemanager): comments, move event ingestion to ARCHITECTURE… (mmalenic, Feb 6, 2024)
- 710d341 fix(filemanager): only update object fields on event created, and ign… (mmalenic, Feb 6, 2024)
- 4a2050f docs(filemanager): add note for ARCHITECTURE.md (mmalenic, Feb 6, 2024)
- cc19f10 Merge branch 'main' of github.com:umccr/orcabus into feat/reorder-events (mmalenic, Feb 8, 2024)
- 76e53ca docs(filemanager): update ARCHITECTURE.md (mmalenic, Feb 13, 2024)
- f9847b8 Merge branch 'main' of github.com:umccr/orcabus into feat/reorder-events (mmalenic, Feb 13, 2024)
- 528e90c Merge branch 'main' of github.com:umccr/orcabus into feat/reorder-events (mmalenic, Feb 15, 2024)
lib/workload/stateful/filemanager/README.md (6 additions, 0 deletions)
@@ -73,6 +73,12 @@
cargo clippy --all-targets --all-features
cargo fmt
```

Note: some tests can take a while to run, so they are ignored by default. It is still useful to run these tests occasionally,
especially when changing code related to them. To do so, run the ignored tests with the `--ignored` flag:

```sh
cargo test --all-targets --all-features -- --ignored
```

## Database

@@ -1,9 +1,5 @@
 -- A general object table common across all storage types.
 create table object (
     -- The unique id for this object.
-    object_id uuid not null primary key default gen_random_uuid(),
-    -- The size of the object.
-    size integer default null,
-    -- A unique identifier for the object, if it is present.
-    checksum text default null
+    object_id uuid not null primary key
 );
@@ -1,38 +1,54 @@
-create type storage_class as enum ('DeepArchive', 'Glacier', 'GlacierIr', 'IntelligentTiering', 'OnezoneIa', 'Outposts', 'ReducedRedundancy', 'Snow', 'Standard', 'StandardIa');
+-- The AWS S3 storage classes.
+create type storage_class as enum (
+    'DeepArchive',
+    'Glacier',
+    'GlacierIr',
+    'IntelligentTiering',
+    'OnezoneIa',
+    'Outposts',
+    'ReducedRedundancy',
+    'Snow',
+    'Standard',
+    'StandardIa'
+);
 
+-- An object contained in AWS S3, maps as a one-to-one relationship with the object table.
-create table s3_object(
+create table s3_object (
     -- The s3 object id.
-    s3_object_id uuid not null primary key default gen_random_uuid(),
+    s3_object_id uuid not null primary key,
+    -- This is initially deferred because we want to create an s3_object before an object to check for duplicates/order.
-    object_id uuid references object (object_id) deferrable initially deferred,
+    object_id uuid not null references object (object_id) deferrable initially deferred,
 
     -- General fields
     -- The bucket of the object.
     bucket text not null,
     -- The key of the object.
     key text not null,
-    -- When this object was created.
-    created_date timestamptz not null default now(),
+    -- When this object was created. A null value here means that a deleted event has occurred before a created event.
+    created_date timestamptz default null,
     -- When this object was deleted, a null value means that the object has not yet been deleted.
     deleted_date timestamptz default null,
-    -- provenance - history of all objects and how they move?
+    -- The size of the object.
+    size integer default null,
+    -- A unique identifier for the object, if it is present.
+    checksum text default null,
 
     -- AWS-specific fields
     -- The AWS last modified value.
     last_modified_date timestamptz default null,
     -- An S3-specific e_tag, if it is present.
     e_tag text default null,
     -- The S3 storage class of the object.
-    storage_class storage_class not null,
+    storage_class storage_class default null,
     -- The version id of the object, if present.
     version_id text default null,
+    -- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events.
+    created_sequencer text default null,
+    -- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events.
+    deleted_sequencer text default null,
+    -- Record whether the event that generated this object was ever out of order, useful for debugging.
+    event_out_of_order boolean not null default false,
+    -- Record the number of times this event has been considered out of order, useful for debugging.
+    number_reordered integer not null default 0,
+    -- Record the number of duplicate events received for this object, useful for debugging.
+    number_duplicate_events integer not null default 0,
 
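The `created_sequencer` and `deleted_sequencer` columns are stored as text rather than numbers because S3 sequencer values are variable-length hexadecimal strings. Per the AWS S3 event notification documentation, two sequencers for the same object key can be compared by right-padding the shorter value with zeros and comparing lexicographically; the greater value is the later event. A minimal sketch of that comparison (the helper name and example values are illustrative, not part of this PR):

```python
def later_sequencer(a: str, b: str) -> str:
    """Return whichever of two S3 sequencer values is the later event.

    Sequencers are hex strings of varying length; the shorter one is
    right-padded with '0' before a lexicographic comparison.
    """
    width = max(len(a), len(b))
    return a if a.ljust(width, "0") > b.ljust(width, "0") else b

# Illustrative sequencer-like values only.
assert later_sequencer("0055AED6DCD90281E5", "0055AED6DCD90281") == "0055AED6DCD90281E5"
```

This ordering is only defined for events on the same bucket, key, and version id, which is why the queries below always join on those three columns before comparing sequencers.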
@@ -5,6 +5,8 @@ insert into s3_object (
     bucket,
     key,
     created_date,
+    size,
+    checksum,
     last_modified_date,
     e_tag,
     storage_class,
@@ -17,11 +19,13 @@ values (
     unnest($3::text[]),
     unnest($4::text[]),
     unnest($5::timestamptz[]),
-    unnest($6::timestamptz[]),
+    unnest($6::integer[]),
     unnest($7::text[]),
-    unnest($8::storage_class[]),
+    unnest($8::timestamptz[]),
     unnest($9::text[]),
-    unnest($10::text[])
+    unnest($10::storage_class[]),
+    unnest($11::text[]),
+    unnest($12::text[])
 ) on conflict on constraint created_sequencer_unique do update
 set number_duplicate_events = s3_object.number_duplicate_events + 1
 returning object_id, number_duplicate_events;
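The trailing `on conflict ... do update` clause is what makes ingestion idempotent: a redelivered event carrying an already-seen sequencer bumps `number_duplicate_events` instead of raising a unique-constraint error. The same pattern can be sketched against SQLite (standing in for Postgres here; the table is pared down and the uniqueness is expressed directly on the column):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """create table s3_object (
           created_sequencer text unique,
           number_duplicate_events integer not null default 0
       )"""
)

def ingest(sequencer: str) -> None:
    # A duplicate sequencer bumps the counter rather than inserting a new row.
    conn.execute(
        """insert into s3_object (created_sequencer) values (?)
           on conflict (created_sequencer) do update
           set number_duplicate_events = number_duplicate_events + 1""",
        (sequencer,),
    )

ingest("0055AED6DCD90281E5")
ingest("0055AED6DCD90281E5")  # duplicate delivery of the same event

rows, dupes = conn.execute(
    "select count(*), max(number_duplicate_events) from s3_object"
).fetchone()
```

After the second `ingest` call there is still one row, with its duplicate counter incremented, which is exactly the observable behaviour the PR's query relies on.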
@@ -4,28 +4,30 @@ insert into s3_object (
     object_id,
     bucket,
     key,
+    -- We default the created date to a value even if this is a deleted event,
+    -- as we are expecting this to get updated.
     created_date,
     deleted_date,
+    size,
+    checksum,
     last_modified_date,
     e_tag,
     storage_class,
     version_id,
-    deleted_sequencer
+    deleted_sequencer,
+    number_reordered
 )
 values (
-    unnest($1::uuid[]),
-    unnest($2::uuid[]),
-    unnest($3::text[]),
-    unnest($4::text[]),
-    unnest($5::timestamptz[]),
-    unnest($6::timestamptz[]),
-    unnest($7::timestamptz[]),
-    unnest($8::text[]),
-    unnest($9::storage_class[]),
-    unnest($10::text[]),
-    unnest($11::text[])
+    unnest($1::uuid[]),
+    unnest($2::uuid[]),
+    unnest($3::text[]),
+    unnest($4::text[]),
+    unnest($5::timestamptz[]),
+    unnest($6::integer[]),
+    unnest($7::text[]),
+    unnest($8::timestamptz[]),
+    unnest($9::text[]),
+    unnest($10::storage_class[]),
+    unnest($11::text[]),
+    unnest($12::text[]),
+    unnest($13::integer[])
 ) on conflict on constraint deleted_sequencer_unique do update
 set number_duplicate_events = s3_object.number_duplicate_events + 1
 returning object_id, number_duplicate_events;

This file was deleted.

@@ -0,0 +1,118 @@
-- Update the matching s3_objects which should be re-ordered based on the created event. Returns the
-- data associated with the event before the update, if an update occurred.

-- First, unnest the input parameters into a query.
with input as (
select
*
from unnest(
$1::uuid[],
$2::text[],
$3::text[],
$4::timestamptz[],
$5::integer[],
$6::text[],
$7::timestamptz[],
$8::text[],
$9::storage_class[],
$10::text[],
$11::text[]
) as input (
s3_object_id,
bucket,
key,
created_date,
size,
checksum,
last_modified_date,
e_tag,
storage_class,
version_id,
created_sequencer
)
),
-- Then, select the objects that need to be updated.
current_objects as (
select
s3_object.*,
input.s3_object_id as input_id,
input.bucket as input_bucket,
input.key as input_key,
input.version_id as input_version_id,
input.created_sequencer as input_created_sequencer,
input.created_date as input_created_date,
input.size as input_size,
input.checksum as input_checksum,
input.last_modified_date as input_last_modified_date,
input.e_tag as input_e_tag,
input.storage_class as input_storage_class
from s3_object
-- Grab the relevant values to update with.
join input on
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id
-- Lock this pre-emptively for the update.
for update
),
-- And filter them to the objects that need to be updated.
objects_to_update as (
select
*
from current_objects
where
-- Check the sequencer condition. We only update if there is a created
-- sequencer that is closer to the deleted sequencer.
current_objects.deleted_sequencer > current_objects.input_created_sequencer and
(
-- Updating a null sequencer doesn't cause the event to be reprocessed.
current_objects.created_sequencer is null or
-- If a sequencer already exists this event should be reprocessed because this
-- sequencer could belong to another object.
current_objects.created_sequencer < current_objects.input_created_sequencer
)
-- And there should not be any objects with a created sequencer that is the same as the input created
-- sequencer because this is a duplicate event that would cause a constraint error in the update.
and current_objects.input_created_sequencer not in (
select created_sequencer from current_objects where created_sequencer is not null
)
),
-- Finally, update the required objects.
update as (
update s3_object
set created_sequencer = objects_to_update.input_created_sequencer,
created_date = objects_to_update.input_created_date,
size = coalesce(objects_to_update.input_size, objects_to_update.size),
checksum = coalesce(objects_to_update.input_checksum, objects_to_update.checksum),
last_modified_date = coalesce(objects_to_update.input_last_modified_date, objects_to_update.last_modified_date),
e_tag = coalesce(objects_to_update.input_e_tag, objects_to_update.e_tag),
storage_class = objects_to_update.input_storage_class,
number_reordered = s3_object.number_reordered +
-- Note the asymmetry between this and the reorder for deleted query.
case when objects_to_update.deleted_sequencer is not null or objects_to_update.created_sequencer is not null then
1
else
0
end
from objects_to_update
where s3_object.s3_object_id = objects_to_update.s3_object_id
)
-- Return the old values because these need to be reprocessed.
select
-- Note, this is the passed through value from the input in order to identify this event later.
input_id as "s3_object_id!",
bucket,
key,
created_date as event_time,
last_modified_date,
e_tag,
storage_class as "storage_class?: StorageClass",
version_id,
created_sequencer as sequencer,
number_reordered,
number_duplicate_events,
size,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order created event, so return a created event back.
'Created' as "event_type!: EventType"
from objects_to_update;
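The `where` clause above encodes a three-part decision: the stored deleted sequencer must come after the incoming created sequencer, the incoming sequencer must not duplicate one already stored, and the stored created sequencer must be missing or earlier than the incoming one. That per-row rule might be sketched as follows (function and argument names are illustrative, and plain string comparison stands in for the sequencer ordering):

```python
from typing import Optional

def should_reorder_created(
    stored_created: Optional[str],
    stored_deleted: Optional[str],
    incoming_created: str,
) -> bool:
    """Decide whether an out-of-order created event should replace the stored state."""
    # The deleted sequencer must exist and sit after the incoming created sequencer.
    if stored_deleted is None or stored_deleted <= incoming_created:
        return False
    # An exact duplicate is handled by the unique constraint, not by reordering.
    if stored_created == incoming_created:
        return False
    # Update when no created sequencer exists yet, or the stored one is earlier.
    return stored_created is None or stored_created < incoming_created

assert should_reorder_created(None, "30", "20")        # fills a missing created event
assert should_reorder_created("10", "30", "20")        # later created event wins
assert not should_reorder_created("25", "30", "20")    # stored event is already later
```

When the second branch fires in the real query, the displaced values are returned to the caller so the older event can be reprocessed, which is why the `select` at the end reads from `objects_to_update` rather than the updated rows.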
@@ -0,0 +1,93 @@
-- Update the matching s3_objects which should be re-ordered based on the deleted event. Returns the
-- data associated with the event before the update, if an update occurred.

-- First, unnest the input parameters into a query.
with input as (
select
*
from unnest(
$1::uuid[],
$2::text[],
$3::text[],
$4::timestamptz[],
$5::text[],
$6::text[]
) as input (
s3_object_id,
bucket,
key,
deleted_date,
version_id,
deleted_sequencer
)
),
-- Then, select the objects that match the bucket, key and version_id
current_objects as (
select
s3_object.*,
input.s3_object_id as input_id,
input.bucket as input_bucket,
input.key as input_key,
input.version_id as input_version_id,
input.deleted_sequencer as input_deleted_sequencer,
input.deleted_date as input_deleted_date
from s3_object
-- Grab the relevant values to update with.
join input on
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id
-- Lock this pre-emptively for the update.
for update
),
-- And filter them to the objects that need to be updated.
objects_to_update as (
select
*
from current_objects
where
-- Check the sequencer condition. We only update if there is a deleted
-- sequencer that is closer to the created sequencer.
current_objects.created_sequencer < current_objects.input_deleted_sequencer and
(
-- Updating a null sequencer doesn't cause the event to be reprocessed.
current_objects.deleted_sequencer is null or
-- If a sequencer already exists this event should be reprocessed because this
-- sequencer would belong to another object.
current_objects.deleted_sequencer > current_objects.input_deleted_sequencer
)
-- And there should not be any objects with a deleted sequencer that is the same as the input deleted
-- sequencer because this is a duplicate event that would cause a constraint error in the update.
and current_objects.input_deleted_sequencer not in (
select deleted_sequencer from current_objects where deleted_sequencer is not null
)
),
-- Finally, update the required objects.
update as (
update s3_object
set deleted_sequencer = objects_to_update.input_deleted_sequencer,
deleted_date = objects_to_update.input_deleted_date,
number_reordered = s3_object.number_reordered +
case when objects_to_update.deleted_sequencer is null then 0 else 1 end
from objects_to_update
where s3_object.s3_object_id = objects_to_update.s3_object_id
)
-- Return the old values because these need to be reprocessed.
select
-- Note, this is the passed through value from the input in order to identify this event later.
input_id as "s3_object_id!",
bucket,
key,
deleted_date as event_time,
last_modified_date,
e_tag,
storage_class as "storage_class?: StorageClass",
version_id,
deleted_sequencer as sequencer,
number_reordered,
number_duplicate_events,
size,
-- This is used to simplify re-constructing the FlatS3EventMessages in the Lambda. I.e. this update detected an
-- out of order deleted event, so return a deleted event back.
'Deleted' as "event_type!: EventType"
from objects_to_update;
@@ -1,7 +1,5 @@
 -- Bulk insert of objects
-insert into object (object_id, size, checksum)
+insert into object (object_id)
 values (
-    unnest($1::uuid[]),
-    unnest($2::int[]),
-    unnest($3::text[])
+    unnest($1::uuid[])
 );
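All of these queries bind one array per column and expand them with `unnest`, so a whole batch of events is inserted in a single statement. Before binding, the ingester has to transpose its row-oriented event records into parallel, equal-length arrays; that transposition might be sketched as (the record shape is illustrative, not the PR's actual Rust types):

```python
def to_columns(records: list[dict]) -> dict[str, list]:
    """Transpose row-oriented records into one list per column, preserving row order."""
    columns: dict[str, list] = {key: [] for key in records[0]} if records else {}
    for record in records:
        for key, value in record.items():
            columns[key].append(value)
    return columns

events = [
    {"object_id": "a", "bucket": "b1", "key": "k1"},
    {"object_id": "b", "bucket": "b1", "key": "k2"},
]
cols = to_columns(events)
# cols["object_id"], cols["bucket"], cols["key"] are the parallel arrays
# that would be bound as $1, $2, $3 in an unnest-style bulk insert.
```

Keeping every array the same length is essential: `unnest` zips the arrays positionally, so a missing value in one column would silently shift rows out of alignment.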