diff --git a/Cargo.lock b/Cargo.lock index c5aaa57ac..67d069732 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2533,7 +2533,7 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "janus_aggregator" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2557,7 +2557,7 @@ dependencies = [ "janus_aggregator_api", "janus_aggregator_core", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "k8s-openapi", "kube", "mockito", @@ -2619,7 +2619,7 @@ dependencies = [ [[package]] name = "janus_aggregator_api" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2630,7 +2630,7 @@ dependencies = [ "git-version", "janus_aggregator_core", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "opentelemetry", "querystring", "rand", @@ -2652,7 +2652,7 @@ dependencies = [ [[package]] name = "janus_aggregator_core" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2672,7 +2672,7 @@ dependencies = [ "itertools 0.13.0", "janus_aggregator_core", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "k8s-openapi", "kube", "opentelemetry", @@ -2706,7 +2706,7 @@ dependencies = [ [[package]] name = "janus_client" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "assert_matches", "backoff", @@ -2716,7 +2716,7 @@ dependencies = [ "http", "itertools 0.13.0", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "mockito", "ohttp", "prio", @@ -2732,7 +2732,7 @@ dependencies = [ [[package]] name = "janus_collector" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "assert_matches", "backoff", @@ -2743,7 +2743,7 @@ dependencies = [ "hpke-dispatch", "janus_collector", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "mockito", "prio", "rand", @@ -2759,7 +2759,7 @@ dependencies = [ [[package]] name = "janus_core" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2776,7 +2776,7 @@ dependencies = [ "http", "http-api-problem", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "k8s-openapi", "kube", "mockito", @@ -2808,7 +2808,7 @@ dependencies = [ [[package]] name = "janus_integration_tests" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2828,7 +2828,7 @@ dependencies = [ "janus_collector", "janus_core", "janus_interop_binaries", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "k8s-openapi", "kube", "opentelemetry", @@ -2854,7 +2854,7 @@ dependencies = [ [[package]] name = "janus_interop_binaries" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "backoff", @@ -2870,7 +2870,7 @@ dependencies = [ "janus_collector", "janus_core", "janus_interop_binaries", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "prio", "rand", "regex", @@ -2911,7 +2911,7 @@ dependencies = [ [[package]] name = "janus_messages" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2930,7 +2930,7 @@ dependencies = [ [[package]] name = "janus_tools" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "assert_matches", @@ -2941,7 +2941,7 @@ dependencies = [ "fixed", "janus_collector", "janus_core", - "janus_messages 0.7.41", + "janus_messages 0.8.0-prerelease-1", "prio", "rand", "reqwest", @@ -6823,7 +6823,7 @@ dependencies = [ [[package]] name = "xtask" -version = "0.7.41" +version = "0.8.0-prerelease-1" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index 3ced55d31..54c8a40db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ homepage = "https://divviup.org" license = "MPL-2.0" repository = "https://github.com/divviup/janus" rust-version = "1.77.0" -version = "0.7.41" +version = "0.8.0-prerelease-1" [workspace.dependencies] anyhow = "1" @@ -50,14 +50,14 @@ hpke-dispatch = "0.7.0" http = "1.1" http-api-problem = "0.58.0" itertools = "0.13" -janus_aggregator = { version = "0.7.41", path = "aggregator" } -janus_aggregator_api = { version = "0.7.41", path = "aggregator_api" } -janus_aggregator_core = { version = "0.7.41", path = "aggregator_core" } -janus_client = { version = "0.7.41", path = "client" } -janus_collector = { version = "0.7.41", path = "collector" } -janus_core = { version = "0.7.41", path = "core" } -janus_interop_binaries = { version = "0.7.41", path = "interop_binaries" } -janus_messages = { version = "0.7.41", path = "messages" } +janus_aggregator = { version = "0.8.0-prerelease-1", path = "aggregator" } +janus_aggregator_api = { version = "0.8.0-prerelease-1", path = "aggregator_api" } +janus_aggregator_core = { version = "0.8.0-prerelease-1", path = "aggregator_core" } +janus_client = { version = "0.8.0-prerelease-1", path = "client" } +janus_collector = { version = "0.8.0-prerelease-1", path = "collector" } +janus_core = { version = "0.8.0-prerelease-1", path = "core" } +janus_interop_binaries = { version = "0.8.0-prerelease-1", path = "interop_binaries" } +janus_messages = { version = "0.8.0-prerelease-1", path = "messages" } k8s-openapi = { version = "0.22.0", features = ["v1_26"] } # keep this version in sync with what is referenced by the indirect dependency via `kube` kube = { version = "0.94.2", default-features = false, features = ["client", "rustls-tls"] } mockito = "1.6.0" diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 16bc49f2a..838234caf 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -103,7 +103,7 @@ macro_rules! supported_schema_versions { // version is seen, [`Datastore::new`] fails. // // Note that the latest supported version must be first in the list. -supported_schema_versions!(7); +supported_schema_versions!(1); /// Datastore represents a datastore for Janus, with support for transactional reads and writes. /// In practice, Datastore instances are currently backed by a PostgreSQL database. diff --git a/db/00000000000001_initial_schema.down.sql b/db/00000000000001_initial_schema.down.sql index 9b6bd2e5f..03b77c9ae 100644 --- a/db/00000000000001_initial_schema.down.sql +++ b/db/00000000000001_initial_schema.down.sql @@ -1,5 +1,6 @@ -DROP INDEX outstanding_batches_task_and_time_bucket_index; +DROP INDEX outstanding_batches_task_id_and_time_bucket_start; DROP TABLE outstanding_batches CASCADE; +DROP TYPE OUTSTANDING_BATCH_STATE CASCADE; DROP INDEX aggregate_share_jobs_interval_containment_index CASCADE; DROP TABLE aggregate_share_jobs CASCADE; DROP INDEX collection_jobs_interval_containment_index CASCADE; @@ -7,6 +8,7 @@ DROP INDEX collection_jobs_state_and_lease_expiry CASCADE; DROP INDEX collection_jobs_task_id_batch_id CASCADE; DROP TABLE collection_jobs CASCADE; DROP TYPE COLLECTION_JOB_STATE CASCADE; +DROP INDEX batch_aggregations_gc_time CASCADE; DROP TABLE batch_aggregations CASCADE; DROP TYPE BATCH_AGGREGATION_STATE; DROP INDEX report_aggregations_client_report_id_index CASCADE; diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index fe8ba34b8..1433847eb 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -31,24 +31,25 @@ CREATE TABLE global_hpke_keys( -- These columns are mutable. state HPKE_KEY_STATE NOT NULL DEFAULT 'PENDING', -- state of the key - updated_at TIMESTAMP NOT NULL, -- when the key state was last changed + last_state_change_at TIMESTAMP NOT NULL, -- when the key state was last changed. Used for key rotation logic. -- creation/update records created_at TIMESTAMP NOT NULL, -- when the row was created + updated_at TIMESTAMP NOT NULL, -- when the row was last changed updated_by TEXT NOT NULL -- the name of the transaction that last updated the row ); -- Another DAP aggregator who we've partnered with to use the taskprov extension. CREATE TABLE taskprov_peer_aggregators( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only. - endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint - role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator relative to the peer - verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation. + endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint + role AGGREGATOR_ROLE NOT NULL, -- the role of this peer aggregator + verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation. -- Parameters applied to every task created with this peer aggregator. - tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds - report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled. - collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) + tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds + report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled. + collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) -- creation/update records created_at TIMESTAMP NOT NULL, -- when the row was created @@ -165,6 +166,21 @@ CREATE TABLE task_upload_counters( CONSTRAINT task_upload_counters_unique UNIQUE(task_id, ord) ) WITH (fillfactor = 50); +-- Per-task report aggregation counters, used for metrics. +-- +-- Fillfactor is lowered to improve the likelihood of heap-only tuple optimizations. See the +-- discussion around this setting for the task_upload_counters table. +CREATE TABLE task_aggregation_counters( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only + task_id BIGINT NOT NULL, -- task ID the counter is associated with + ord BIGINT NOT NULL, -- the ordinal index of the task aggregation counter + + success BIGINT NOT NULL DEFAULT 0, -- reports successfully aggregated + + CONSTRAINT task_aggregation_counters_unique_id UNIQUE(task_id, ord), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE +) WITH (fillfactor = 50); + -- The HPKE public keys (aka configs) and private keys used by a given task. CREATE TABLE task_hpke_keys( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only @@ -321,6 +337,7 @@ CREATE TABLE batch_aggregations( CONSTRAINT batch_aggregations_unique_task_id_batch_id_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param, ord), CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE ); +CREATE INDEX batch_aggregations_gc_time ON batch_aggregations(task_id, UPPER(COALESCE(batch_interval, client_timestamp_interval))); -- Specifies the possible state of a collection job. CREATE TYPE COLLECTION_JOB_STATE AS ENUM( @@ -345,6 +362,8 @@ CREATE TABLE collection_jobs( helper_aggregate_share BYTEA, -- the helper's encrypted aggregate share (HpkeCiphertext, only if in state FINISHED) leader_aggregate_share BYTEA, -- the leader's unencrypted aggregate share (opaque VDAF message, only if in state FINISHED) + step_attempts BIGINT NOT NULL DEFAULT 0, -- the number of attempts to step the collection job without making progress, regardless of whether the lease was successfully released or not + lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this collection job expires; -infinity implies no current lease lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquiries since the last successful lease release @@ -382,6 +401,12 @@ CREATE TABLE aggregate_share_jobs( ); CREATE INDEX aggregate_share_jobs_interval_containment_index ON aggregate_share_jobs USING gist (task_id, batch_interval); +-- Specifies the possible state of an outstanding batch. +CREATE TYPE OUTSTANDING_BATCH_STATE AS ENUM( + 'FILLING', -- this outstanding batch is still being considered for additional reports + 'FILLED' -- this outstanding batch has received enough reports, no more are necessary +); + -- The leader's view of outstanding batches, which are batches which have not yet started -- collection. Used for fixed-size tasks only. CREATE TABLE outstanding_batches( @@ -390,6 +415,8 @@ CREATE TABLE outstanding_batches( batch_id BYTEA NOT NULL, -- 32-byte BatchID as defined by the DAP specification. time_bucket_start TIMESTAMP, + state OUTSTANDING_BATCH_STATE NOT NULL DEFAULT 'FILLING', -- the current state of this outstanding batch + -- creation/update records created_at TIMESTAMP NOT NULL, -- when the row was created updated_by TEXT NOT NULL, -- the name of the transaction that last updated the row @@ -397,4 +424,4 @@ CREATE TABLE outstanding_batches( CONSTRAINT outstanding_batches_unique_task_id_batch_id UNIQUE(task_id, batch_id), CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE ); -CREATE INDEX outstanding_batches_task_and_time_bucket_index ON outstanding_batches (task_id, time_bucket_start); +CREATE INDEX outstanding_batches_task_id_and_time_bucket_start ON outstanding_batches(task_id, time_bucket_start) WHERE state = 'FILLING'; diff --git a/db/00000000000002_collection_job_step_attempts.down.sql b/db/00000000000002_collection_job_step_attempts.down.sql deleted file mode 100644 index ce87fd761..000000000 --- a/db/00000000000002_collection_job_step_attempts.down.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE collection_jobs DROP COLUMN step_attempts; diff --git a/db/00000000000002_collection_job_step_attempts.up.sql b/db/00000000000002_collection_job_step_attempts.up.sql deleted file mode 100644 index d09a05816..000000000 --- a/db/00000000000002_collection_job_step_attempts.up.sql +++ /dev/null @@ -1,3 +0,0 @@ --- step_attempts is the number of attempts to step the collection job without making progress, --- regardless of whether the lease was successfully released or not. -ALTER TABLE collection_jobs ADD COLUMN step_attempts BIGINT NOT NULL DEFAULT 0; diff --git a/db/00000000000003_outstanding_batch_state.down.sql b/db/00000000000003_outstanding_batch_state.down.sql deleted file mode 100644 index 668d932f0..000000000 --- a/db/00000000000003_outstanding_batch_state.down.sql +++ /dev/null @@ -1,4 +0,0 @@ -CREATE INDEX outstanding_batches_task_and_time_bucket_index ON outstanding_batches(task_id, time_bucket_start); -DROP INDEX outstanding_batches_task_id_and_time_bucket_start; -ALTER TABLE outstanding_batches DROP COLUMN state; -DROP TYPE OUTSTANDING_BATCH_STATE; diff --git a/db/00000000000003_outstanding_batch_state.up.sql b/db/00000000000003_outstanding_batch_state.up.sql deleted file mode 100644 index 5c4152bd8..000000000 --- a/db/00000000000003_outstanding_batch_state.up.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Specifies the possible state of an outstanding batch. -CREATE TYPE OUTSTANDING_BATCH_STATE AS ENUM( - 'FILLING', -- this outstanding batch is still being considered for additional reports - 'FILLED' -- this outstanding batch has received enough reports, no more are necessary -); -ALTER TABLE outstanding_batches ADD COLUMN state OUTSTANDING_BATCH_STATE NOT NULL DEFAULT 'FILLING'; -CREATE INDEX outstanding_batches_task_id_and_time_bucket_start ON outstanding_batches(task_id, time_bucket_start) WHERE state = 'FILLING'; -DROP INDEX outstanding_batches_task_and_time_bucket_index; diff --git a/db/00000000000004_batch_aggregation_gc.down.sql b/db/00000000000004_batch_aggregation_gc.down.sql deleted file mode 100644 index e629d38c7..000000000 --- a/db/00000000000004_batch_aggregation_gc.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP INDEX batch_aggregations_gc_time; diff --git a/db/00000000000004_batch_aggregation_gc.up.sql b/db/00000000000004_batch_aggregation_gc.up.sql deleted file mode 100644 index 6b20413e2..000000000 --- a/db/00000000000004_batch_aggregation_gc.up.sql +++ /dev/null @@ -1,2 +0,0 @@ --- Used to identify batch aggregations which can be garbage collected. -CREATE INDEX batch_aggregations_gc_time ON batch_aggregations(task_id, UPPER(COALESCE(batch_interval, client_timestamp_interval))); diff --git a/db/00000000000005_global_hpke_keys_last_state_change_at.down.sql b/db/00000000000005_global_hpke_keys_last_state_change_at.down.sql deleted file mode 100644 index 5e7ea3647..000000000 --- a/db/00000000000005_global_hpke_keys_last_state_change_at.down.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE global_hpke_keys DROP COLUMN last_state_change_at; diff --git a/db/00000000000005_global_hpke_keys_last_state_change_at.up.sql b/db/00000000000005_global_hpke_keys_last_state_change_at.up.sql deleted file mode 100644 index 2c6342f33..000000000 --- a/db/00000000000005_global_hpke_keys_last_state_change_at.up.sql +++ /dev/null @@ -1,11 +0,0 @@ --- When the key state was last changed. Used for key rotation logic. -ALTER TABLE global_hpke_keys - ADD COLUMN last_state_change_at TIMESTAMP NOT NULL DEFAULT '-infinity'::TIMESTAMP; - --- Backfill new column using updated_at. Older Janus versions aren't aware of --- this column, so state change operations on this table won't update the new --- column. However, this is an infrequently used table that is only manually --- modified (at the time of writing), so the risk of corruption due to this is --- low. In the worst case, the key rotator service will induce a rotation of --- any keys with `-infinity`. -UPDATE global_hpke_keys SET last_state_change_at = updated_at; diff --git a/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.down.sql b/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.down.sql deleted file mode 100644 index bf4ff273f..000000000 --- a/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.down.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE global_hpke_keys - ALTER COLUMN last_state_change_at SET DEFAULT '-infinity'::TIMESTAMP, - ALTER COLUMN last_state_change_at DROP NOT NULL; diff --git a/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.up.sql b/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.up.sql deleted file mode 100644 index 192da5a16..000000000 --- a/db/00000000000006_global_hpke_keys_last_state_change_at_drop_default.up.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE global_hpke_keys - ALTER COLUMN last_state_change_at DROP DEFAULT, - ALTER COLUMN last_state_change_at SET NOT NULL; diff --git a/db/00000000000007_task_aggregation_counters.down.sql b/db/00000000000007_task_aggregation_counters.down.sql deleted file mode 100644 index fe38e502c..000000000 --- a/db/00000000000007_task_aggregation_counters.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE task_aggregation_counters; diff --git a/db/00000000000007_task_aggregation_counters.up.sql b/db/00000000000007_task_aggregation_counters.up.sql deleted file mode 100644 index cb38bda2f..000000000 --- a/db/00000000000007_task_aggregation_counters.up.sql +++ /dev/null @@ -1,14 +0,0 @@ --- Per-task report aggregation counters, used for metrics. --- --- Fillfactor is lowered to improve the likelihood of heap-only tuple optimizations. See the --- discussion around this setting for the task_upload_counters table. -CREATE TABLE task_aggregation_counters( - id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only - task_id BIGINT NOT NULL, -- task ID the counter is associated with - ord BIGINT NOT NULL, -- the ordinal index of the task aggregation counter - - success BIGINT NOT NULL DEFAULT 0, -- reports successfully aggregated - - CONSTRAINT task_aggregation_counters_unique_id UNIQUE(task_id, ord), - CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE -) WITH (fillfactor = 50);