Bump version to 0.8.0-prerelease-1, merge DB migrations for 0.8.0. (#3478)

Also, fix taskprov_peer_aggregators.role help text.

(Previously, the code and the schema disagreed on whether this column records
our role or our peer's role; go with the code's conception that it is our
peer's role.)
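
For illustration of the corrected semantics only (the endpoint is invented, and 'HELPER' is an assumed AGGREGATOR_ROLE value): under this reading, a row whose role is 'HELPER' describes a peer that acts as the DAP helper, meaning our own deployment leads the taskprov tasks shared with it.

    -- Hedged sketch: peers for which *they* are the helper, i.e. tasks we lead.
    SELECT endpoint FROM taskprov_peer_aggregators WHERE role = 'HELPER';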
branlwyd authored Nov 12, 2024
1 parent a941244 commit bf396e0
Showing 17 changed files with 68 additions and 91 deletions.
40 changes: 20 additions & 20 deletions Cargo.lock

Generated file; diff not rendered.

18 changes: 9 additions & 9 deletions Cargo.toml
@@ -20,7 +20,7 @@ homepage = "https://divviup.org"
license = "MPL-2.0"
repository = "https://github.com/divviup/janus"
rust-version = "1.77.0"
version = "0.7.41"
version = "0.8.0-prerelease-1"

[workspace.dependencies]
anyhow = "1"
@@ -50,14 +50,14 @@ hpke-dispatch = "0.7.0"
http = "1.1"
http-api-problem = "0.58.0"
itertools = "0.13"
janus_aggregator = { version = "0.7.41", path = "aggregator" }
janus_aggregator_api = { version = "0.7.41", path = "aggregator_api" }
janus_aggregator_core = { version = "0.7.41", path = "aggregator_core" }
janus_client = { version = "0.7.41", path = "client" }
janus_collector = { version = "0.7.41", path = "collector" }
janus_core = { version = "0.7.41", path = "core" }
janus_interop_binaries = { version = "0.7.41", path = "interop_binaries" }
janus_messages = { version = "0.7.41", path = "messages" }
janus_aggregator = { version = "0.8.0-prerelease-1", path = "aggregator" }
janus_aggregator_api = { version = "0.8.0-prerelease-1", path = "aggregator_api" }
janus_aggregator_core = { version = "0.8.0-prerelease-1", path = "aggregator_core" }
janus_client = { version = "0.8.0-prerelease-1", path = "client" }
janus_collector = { version = "0.8.0-prerelease-1", path = "collector" }
janus_core = { version = "0.8.0-prerelease-1", path = "core" }
janus_interop_binaries = { version = "0.8.0-prerelease-1", path = "interop_binaries" }
janus_messages = { version = "0.8.0-prerelease-1", path = "messages" }
k8s-openapi = { version = "0.22.0", features = ["v1_26"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.94.2", default-features = false, features = ["client", "rustls-tls"] }
mockito = "1.6.0"
2 changes: 1 addition & 1 deletion aggregator_core/src/datastore.rs
@@ -103,7 +103,7 @@ macro_rules! supported_schema_versions {
// version is seen, [`Datastore::new`] fails.
//
// Note that the latest supported version must be first in the list.
supported_schema_versions!(7);
supported_schema_versions!(1);

/// Datastore represents a datastore for Janus, with support for transactional reads and writes.
/// In practice, Datastore instances are currently backed by a PostgreSQL database.
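
With every earlier migration folded into the initial schema, a freshly migrated 0.8.0 database should report only schema version 1. A hedged sanity check, assuming the sqlx-style _sqlx_migrations bookkeeping table that the db/NNN_name.up.sql / .down.sql naming suggests (the actual version-tracking mechanism may differ):

    -- Assumption: migrations are tracked in sqlx's _sqlx_migrations table.
    SELECT version FROM _sqlx_migrations ORDER BY version DESC LIMIT 1;  -- expect 1 after this merge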
4 changes: 3 additions & 1 deletion db/00000000000001_initial_schema.down.sql
@@ -1,12 +1,14 @@
DROP INDEX outstanding_batches_task_and_time_bucket_index;
DROP INDEX outstanding_batches_task_id_and_time_bucket_start;
DROP TABLE outstanding_batches CASCADE;
DROP TYPE OUTSTANDING_BATCH_STATE CASCADE;
DROP INDEX aggregate_share_jobs_interval_containment_index CASCADE;
DROP TABLE aggregate_share_jobs CASCADE;
DROP INDEX collection_jobs_interval_containment_index CASCADE;
DROP INDEX collection_jobs_state_and_lease_expiry CASCADE;
DROP INDEX collection_jobs_task_id_batch_id CASCADE;
DROP TABLE collection_jobs CASCADE;
DROP TYPE COLLECTION_JOB_STATE CASCADE;
DROP INDEX batch_aggregations_gc_time CASCADE;
DROP TABLE batch_aggregations CASCADE;
DROP TYPE BATCH_AGGREGATION_STATE;
DROP INDEX report_aggregations_client_report_id_index CASCADE;
43 changes: 35 additions & 8 deletions db/00000000000001_initial_schema.up.sql
@@ -31,24 +31,25 @@ CREATE TABLE global_hpke_keys(

-- These columns are mutable.
state HPKE_KEY_STATE NOT NULL DEFAULT 'PENDING', -- state of the key
updated_at TIMESTAMP NOT NULL, -- when the key state was last changed
last_state_change_at TIMESTAMP NOT NULL, -- when the key state was last changed. Used for key rotation logic.

-- creation/update records
created_at TIMESTAMP NOT NULL, -- when the row was created
updated_at TIMESTAMP NOT NULL, -- when the row was last changed
updated_by TEXT NOT NULL -- the name of the transaction that last updated the row
);
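
Renaming the state-tracking column to last_state_change_at frees updated_at to mean "row last modified", matching the other tables. A hedged sketch of the kind of rotation sweep the comment alludes to (the one-hour threshold is invented, and an 'ACTIVE' HPKE_KEY_STATE value is assumed):

    -- Sketch: promote keys that have been PENDING long enough for clients to fetch the new config.
    UPDATE global_hpke_keys
       SET state = 'ACTIVE',
           last_state_change_at = now(),
           updated_at = now(),
           updated_by = 'key_rotator'       -- hypothetical transaction name
     WHERE state = 'PENDING'
       AND last_state_change_at < now() - INTERVAL '1 hour';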

-- Another DAP aggregator who we've partnered with to use the taskprov extension.
CREATE TABLE taskprov_peer_aggregators(
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only.
endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint
role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator relative to the peer
verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation.
endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint
role AGGREGATOR_ROLE NOT NULL, -- the role of this peer aggregator
verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation.

-- Parameters applied to every task created with this peer aggregator.
tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds
report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled.
collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message)
tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds
report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled.
collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message)

-- creation/update records
created_at TIMESTAMP NOT NULL, -- when the row was created
@@ -165,6 +166,21 @@ CREATE TABLE task_upload_counters(
CONSTRAINT task_upload_counters_unique UNIQUE(task_id, ord)
) WITH (fillfactor = 50);

-- Per-task report aggregation counters, used for metrics.
--
-- Fillfactor is lowered to improve the likelihood of heap-only tuple optimizations. See the
-- discussion around this setting for the task_upload_counters table.
CREATE TABLE task_aggregation_counters(
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only
task_id BIGINT NOT NULL, -- task ID the counter is associated with
ord BIGINT NOT NULL, -- the ordinal index of the task aggregation counter

success BIGINT NOT NULL DEFAULT 0, -- reports successfully aggregated

CONSTRAINT task_aggregation_counters_unique_id UNIQUE(task_id, ord),
CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE
) WITH (fillfactor = 50);
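
Like task_upload_counters, this table holds several rows per task (one per ord) so concurrent aggregation transactions increment different physical rows, and the lowered fillfactor leaves room for heap-only-tuple updates. A hedged sketch of the increment and read pattern such a design implies (the shard count of 32 is an invented example):

    -- Sketch: bump the success counter on a randomly chosen shard for a task.
    INSERT INTO task_aggregation_counters (task_id, ord, success)
    VALUES ($1, floor(random() * 32)::BIGINT, 1)
    ON CONFLICT (task_id, ord) DO UPDATE
        SET success = task_aggregation_counters.success + excluded.success;

    -- Reading the metric sums the shards.
    SELECT COALESCE(SUM(success), 0) AS success
      FROM task_aggregation_counters WHERE task_id = $1;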

-- The HPKE public keys (aka configs) and private keys used by a given task.
CREATE TABLE task_hpke_keys(
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
@@ -321,6 +337,7 @@ CREATE TABLE batch_aggregations(
CONSTRAINT batch_aggregations_unique_task_id_batch_id_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param, ord),
CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE
);
CREATE INDEX batch_aggregations_gc_time ON batch_aggregations(task_id, UPPER(COALESCE(batch_interval, client_timestamp_interval)));
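
The new index orders rows by the end of their collection or client-timestamp interval, which is what a garbage-collection pass scans by. A hedged sketch of the kind of delete it supports (expiry-cutoff computation and batching omitted):

    -- Sketch: delete batch aggregations whose interval ended before the GC cutoff for a task.
    DELETE FROM batch_aggregations
     WHERE task_id = $1
       AND UPPER(COALESCE(batch_interval, client_timestamp_interval)) < $2;  -- $2 = expiry cutoff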

-- Specifies the possible state of a collection job.
CREATE TYPE COLLECTION_JOB_STATE AS ENUM(
@@ -345,6 +362,8 @@ CREATE TABLE collection_jobs(
helper_aggregate_share BYTEA, -- the helper's encrypted aggregate share (HpkeCiphertext, only if in state FINISHED)
leader_aggregate_share BYTEA, -- the leader's unencrypted aggregate share (opaque VDAF message, only if in state FINISHED)

step_attempts BIGINT NOT NULL DEFAULT 0, -- the number of attempts to step the collection job without making progress, regardless of whether the lease was successfully released or not

lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this collection job expires; -infinity implies no current lease
lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease
lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquiries since the last successful lease release
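
step_attempts counts consecutive stepping attempts that made no progress, letting a driver eventually give up on a stuck collection job. A hedged sketch of the bookkeeping this column implies:

    -- Sketch: record another unproductive stepping attempt on a collection job.
    -- Assumes the table's artificial "id" primary key, per the pattern used throughout this schema.
    UPDATE collection_jobs SET step_attempts = step_attempts + 1 WHERE id = $1;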
@@ -382,6 +401,12 @@ CREATE TABLE aggregate_share_jobs(
);
CREATE INDEX aggregate_share_jobs_interval_containment_index ON aggregate_share_jobs USING gist (task_id, batch_interval);
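
For context, the gist index above exists to answer interval-containment lookups efficiently. A hedged example of such a lookup (assumes batch_interval is a TSRANGE, as the containment operator implies):

    -- Sketch: count aggregate share jobs whose batch interval contains a given report timestamp.
    SELECT COUNT(*) FROM aggregate_share_jobs
     WHERE task_id = $1
       AND batch_interval @> $2::TIMESTAMP;  -- $2 = a client report timestamp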

-- Specifies the possible state of an outstanding batch.
CREATE TYPE OUTSTANDING_BATCH_STATE AS ENUM(
'FILLING', -- this outstanding batch is still being considered for additional reports
'FILLED' -- this outstanding batch has received enough reports, no more are necessary
);
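
The new state lets the leader stop considering a batch for additional reports once it has enough. A hedged sketch of the transition this enum enables (the check that the batch is actually full is omitted):

    -- Sketch: mark an outstanding batch as no longer accepting reports.
    UPDATE outstanding_batches
       SET state = 'FILLED'
     WHERE task_id = $1 AND batch_id = $2;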

-- The leader's view of outstanding batches, which are batches which have not yet started
-- collection. Used for fixed-size tasks only.
CREATE TABLE outstanding_batches(
@@ -390,11 +415,13 @@ CREATE TABLE outstanding_batches(
batch_id BYTEA NOT NULL, -- 32-byte BatchID as defined by the DAP specification.
time_bucket_start TIMESTAMP,

state OUTSTANDING_BATCH_STATE NOT NULL DEFAULT 'FILLING', -- the current state of this outstanding batch

-- creation/update records
created_at TIMESTAMP NOT NULL, -- when the row was created
updated_by TEXT NOT NULL, -- the name of the transaction that last updated the row

CONSTRAINT outstanding_batches_unique_task_id_batch_id UNIQUE(task_id, batch_id),
CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE
);
CREATE INDEX outstanding_batches_task_and_time_bucket_index ON outstanding_batches (task_id, time_bucket_start);
CREATE INDEX outstanding_batches_task_id_and_time_bucket_start ON outstanding_batches(task_id, time_bucket_start) WHERE state = 'FILLING';
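
The replacement index is partial on state = 'FILLING', so the common lookup of batches that can still accept reports in a given time bucket scans only rows that are still filling. A hedged example of a query matching the index predicate:

    -- Sketch: outstanding batches still accepting reports for a task and time bucket.
    SELECT batch_id FROM outstanding_batches
     WHERE task_id = $1
       AND time_bucket_start = $2   -- or IS NULL for tasks without time bucketing
       AND state = 'FILLING';       -- matches the partial index predicate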
1 change: 0 additions & 1 deletion db/00000000000002_collection_job_step_attempts.down.sql

This file was deleted.

3 changes: 0 additions & 3 deletions db/00000000000002_collection_job_step_attempts.up.sql

This file was deleted.

4 changes: 0 additions & 4 deletions db/00000000000003_outstanding_batch_state.down.sql

This file was deleted.

8 changes: 0 additions & 8 deletions db/00000000000003_outstanding_batch_state.up.sql

This file was deleted.

1 change: 0 additions & 1 deletion db/00000000000004_batch_aggregation_gc.down.sql

This file was deleted.

2 changes: 0 additions & 2 deletions db/00000000000004_batch_aggregation_gc.up.sql

This file was deleted.

This file was deleted.

11 changes: 0 additions & 11 deletions db/00000000000005_global_hpke_keys_last_state_change_at.up.sql

This file was deleted.

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion db/00000000000007_task_aggregation_counters.down.sql

This file was deleted.

