From af2291671bec8b972a3872e95d7faaa02568d5af Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 17 Nov 2023 14:47:58 -0800 Subject: [PATCH] [WIP] CockroachDB. --- Cargo.lock | 39 +- aggregator/src/aggregator.rs | 28 +- aggregator/src/aggregator/accumulator.rs | 10 +- .../aggregator/aggregation_job_continue.rs | 6 +- .../src/aggregator/aggregation_job_creator.rs | 14 +- .../src/aggregator/aggregation_job_driver.rs | 53 +-- .../src/aggregator/aggregation_job_writer.rs | 2 +- aggregator/src/aggregator/batch_creator.rs | 13 +- .../src/aggregator/collection_job_driver.rs | 8 +- .../src/aggregator/collection_job_tests.rs | 5 +- .../src/aggregator/garbage_collector.rs | 12 +- aggregator/src/aggregator/http_handlers.rs | 51 +- aggregator/src/aggregator/taskprov_tests.rs | 4 +- aggregator_api/src/tests.rs | 5 +- aggregator_core/src/datastore.rs | 449 +++++++++++------- aggregator_core/src/datastore/models.rs | 176 +------ aggregator_core/src/datastore/test_util.rs | 55 ++- aggregator_core/src/datastore/tests.rs | 400 +++++----------- db/00000000000001_initial_schema.up.sql | 31 +- 19 files changed, 551 insertions(+), 810 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cff797c0b..205451301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1718,9 +1718,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24590385be94998c5def4cf53d34edc5381144c805126f00efb954d986f9a7b2" +checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" dependencies = [ "bytes", "futures-channel", @@ -1899,7 +1899,7 @@ dependencies = [ "hex", "http 0.2.11", "http-api-problem", - "hyper 1.0.0", + "hyper 1.0.1", "itertools 0.11.0", "janus_aggregator", "janus_aggregator_api", @@ -1942,7 +1942,7 @@ dependencies = [ "tonic 0.9.2", "tracing", "tracing-chrome", - "tracing-log", + "tracing-log 0.2.0", "tracing-opentelemetry", "tracing-stackdriver", "tracing-subscriber", @@ -2011,7 +2011,7 @@ dependencies = [ "hex", "http 0.2.11", "http-api-problem", - "hyper 1.0.0", + "hyper 1.0.1", "janus_aggregator_core", "janus_core", "janus_messages 0.6.4", @@ -2039,7 +2039,7 @@ dependencies = [ "tokio", "tokio-postgres", "tracing", - "tracing-log", + "tracing-log 0.2.0", "trillium", "trillium-macros", "trillium-router", @@ -2066,7 +2066,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", "url", ] @@ -2133,7 +2133,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", "trillium", "url", @@ -2204,7 +2204,7 @@ dependencies = [ "testcontainers", "tokio", "tracing", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", "trillium", "trillium-api", @@ -2272,7 +2272,7 @@ dependencies = [ "tempfile", "tokio", "tracing", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", "trycmd", "url", @@ -4844,6 +4844,17 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -4868,7 +4879,7 @@ dependencies = [ "smallvec", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", "web-time", ] @@ -4900,9 +4911,9 @@ dependencies = [ [[package]] name = 
"tracing-subscriber" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ "matchers", "nu-ansi-term", @@ -4915,7 +4926,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.4", "tracing-serde", ] diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 8449e25f6..aeca5c1ad 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -44,7 +44,7 @@ use janus_core::{ auth_tokens::AuthenticationToken, hpke::{self, HpkeApplicationInfo, HpkeKeypair, Label}, http::HttpErrorResponse, - time::{Clock, DurationExt, IntervalExt, TimeExt}, + time::{Clock, IntervalExt, TimeExt}, vdaf::{VdafInstance, VERIFY_KEY_LENGTH}, }; use janus_messages::{ @@ -52,10 +52,9 @@ use janus_messages::{ taskprov::TaskConfig, AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep, - BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig, - HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, - PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role, - TaskId, + BatchSelector, Collection, CollectionJobId, CollectionReq, HpkeConfig, HpkeConfigList, + InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareError, PrepareResp, + PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role, TaskId, }; use opentelemetry::{ metrics::{Counter, Histogram, Meter, Unit}, @@ -1893,31 +1892,19 @@ impl VdafOps { // Store data to datastore. let req = Arc::new(req); - let min_client_timestamp = req - .prepare_inits() - .iter() - .map(|prepare_init| *prepare_init.report_share().metadata().time()) - .min() - .ok_or_else(|| Error::EmptyAggregation(*task.id()))?; let max_client_timestamp = req .prepare_inits() .iter() .map(|prepare_init| *prepare_init.report_share().metadata().time()) .max() .ok_or_else(|| Error::EmptyAggregation(*task.id()))?; - let client_timestamp_interval = Interval::new( - min_client_timestamp, - max_client_timestamp - .difference(&min_client_timestamp)? - .add(&Duration::from_seconds(1))?, - )?; let aggregation_job = Arc::new( AggregationJob::::new( *task.id(), *aggregation_job_id, agg_param, req.batch_selector().batch_identifier().clone(), - client_timestamp_interval, + max_client_timestamp, if saw_continue { AggregationJobState::InProgress } else { @@ -2599,8 +2586,8 @@ impl VdafOps { Q::acknowledge_collection(tx, task.id(), collection_job.batch_identifier()), )?; - // Merge the intervals spanned by the constituent batch aggregations into the - // interval spanned by the collection. + // Merge the intervals spanned by the constituent batches into the interval + // spanned by the collection. 
let mut spanned_interval: Option = None; for interval in batches .iter() @@ -3063,7 +3050,6 @@ fn empty_batch_aggregations< BatchAggregationState::Collected, None, 0, - Interval::EMPTY, ReportIdChecksum::default(), )) } else { diff --git a/aggregator/src/aggregator/accumulator.rs b/aggregator/src/aggregator/accumulator.rs index 2cc32998c..309dbe97c 100644 --- a/aggregator/src/aggregator/accumulator.rs +++ b/aggregator/src/aggregator/accumulator.rs @@ -11,11 +11,8 @@ use janus_aggregator_core::{ query_type::AccumulableQueryType, task::AggregatorTask, }; -use janus_core::{ - report_id::ReportIdChecksumExt, - time::{Clock, IntervalExt}, -}; -use janus_messages::{Interval, ReportId, ReportIdChecksum, Time}; +use janus_core::{report_id::ReportIdChecksumExt, time::Clock}; +use janus_messages::{ReportId, ReportIdChecksum, Time}; use prio::vdaf; use rand::{thread_rng, Rng}; use std::{ @@ -82,8 +79,6 @@ impl Result<(), datastore::Error> { let batch_identifier = Q::to_batch_identifier(&self.task, partial_batch_identifier, client_timestamp)?; - let client_timestamp_interval = - Interval::from_time(client_timestamp).map_err(|e| datastore::Error::User(e.into()))?; let batch_aggregation_fn = || { BatchAggregation::new( *self.task.id(), @@ -93,7 +88,6 @@ impl AggregationJobCreator { "Creating aggregation job" ); - let min_client_timestamp = - agg_job_reports.iter().map(|(_, time)| time).min().unwrap(); // unwrap safety: agg_job_reports is non-empty let max_client_timestamp = agg_job_reports.iter().map(|(_, time)| time).max().unwrap(); // unwrap safety: agg_job_reports is non-empty - let client_timestamp_interval = Interval::new( - *min_client_timestamp, - max_client_timestamp - .difference(min_client_timestamp)? - .add(&DurationMsg::from_seconds(1))?, - )?; let aggregation_job = AggregationJob::::new( *task.id(), aggregation_job_id, (), (), - client_timestamp_interval, + *max_client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ); diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index d6a4a58d9..f5a103b4c 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -1059,8 +1059,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -1207,8 +1206,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(2), ); @@ -1363,8 +1361,7 @@ mod tests { aggregation_job_id, (), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -1513,8 +1510,7 @@ mod tests { aggregation_job_id, (), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(1), ); @@ -1710,8 +1706,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, 
AggregationJobStep::from(0), )) @@ -1818,8 +1813,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(1), ); @@ -1958,8 +1952,7 @@ mod tests { aggregation_job_id, (), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -2084,7 +2077,7 @@ mod tests { aggregation_job_id, (), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(1), ); @@ -2219,8 +2212,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -2327,8 +2319,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(1), ); @@ -2488,8 +2479,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(1), )) @@ -2644,8 +2634,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(2), ); @@ -2677,7 +2666,6 @@ mod tests { BatchAggregationState::Aggregating, Some(leader_aggregate_share), 1, - Interval::from_time(report.metadata().time()).unwrap(), ReportIdChecksum::for_report_id(report.metadata().id()), )]); let want_active_batch = @@ -2793,7 +2781,6 @@ mod tests { *agg.state(), agg.aggregate_share().cloned(), agg.report_count(), - *agg.client_timestamp_interval(), *agg.checksum(), ) }) @@ -2889,8 +2876,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(1), )) @@ -3032,8 +3018,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(2), ); @@ -3059,7 +3044,6 @@ mod tests { BatchAggregationState::Aggregating, Some(leader_aggregate_share), 1, - Interval::from_time(report.metadata().time()).unwrap(), ReportIdChecksum::for_report_id(report.metadata().id()), )]); let want_batch = Batch::>::new( @@ -3145,7 +3129,6 @@ mod tests { *agg.state(), agg.aggregate_share().cloned(), agg.report_count(), - *agg.client_timestamp_interval(), *agg.checksum(), ) }) @@ -3204,7 +3187,7 @@ mod tests { aggregation_job_id, (), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, 
AggregationJobStep::from(0), ); @@ -3415,8 +3398,7 @@ mod tests { aggregation_job_id, (), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -3560,8 +3542,7 @@ mod tests { aggregation_job_id, (), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Abandoned, AggregationJobStep::from(0), ), diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index 5f9def79b..b6d72d41d 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -13,7 +13,7 @@ use janus_aggregator_core::{ }, task::AggregatorTask, }; -use janus_core::time::{Clock, IntervalExt}; +use janus_core::time::{Clock, IntervalExt as _}; use janus_messages::{AggregationJobId, Interval, PrepareError, ReportId}; use prio::{codec::Encode, vdaf}; use std::{ diff --git a/aggregator/src/aggregator/batch_creator.rs b/aggregator/src/aggregator/batch_creator.rs index 092a5bbe0..613f99bff 100644 --- a/aggregator/src/aggregator/batch_creator.rs +++ b/aggregator/src/aggregator/batch_creator.rs @@ -8,9 +8,9 @@ use janus_aggregator_core::datastore::{ }, Error, Transaction, }; -use janus_core::time::{Clock, DurationExt, TimeExt}; +use janus_core::time::{Clock, TimeExt}; use janus_messages::{ - query_type::FixedSize, AggregationJobStep, BatchId, Duration, Interval, ReportId, TaskId, Time, + query_type::FixedSize, AggregationJobStep, BatchId, Duration, ReportId, TaskId, Time, }; use prio::{codec::Encode, vdaf::Aggregator}; use rand::random; @@ -303,20 +303,13 @@ where }) .collect(); - let min_client_timestamp = min_client_timestamp.unwrap(); // unwrap safety: aggregation_job_size > 0 let max_client_timestamp = max_client_timestamp.unwrap(); // unwrap safety: aggregation_job_size > 0 - let client_timestamp_interval = Interval::new( - min_client_timestamp, - max_client_timestamp - .difference(&min_client_timestamp)? 
- .add(&Duration::from_seconds(1))?, - )?; let aggregation_job = AggregationJob::::new( task_id, aggregation_job_id, (), batch_id, - client_timestamp_interval, + max_client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ); diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 719affb6b..045b5bd13 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -633,7 +633,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::from_time(&report_timestamp).unwrap(), + report_timestamp, AggregationJobState::Finished, AggregationJobStep::from(1), ), @@ -682,7 +682,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 5, - Interval::from_time(&report_timestamp).unwrap(), ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), ), ) @@ -700,7 +699,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 5, - Interval::from_time(&report_timestamp).unwrap(), ReportIdChecksum::get_decoded(&[2; 32]).unwrap(), ), ) @@ -793,7 +791,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::from_time(&report_timestamp).unwrap(), + report_timestamp, AggregationJobState::Finished, AggregationJobStep::from(1), ), @@ -858,7 +856,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 5, - Interval::from_time(&report_timestamp).unwrap(), ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), ), ) @@ -878,7 +875,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 5, - Interval::from_time(&report_timestamp).unwrap(), ReportIdChecksum::get_decoded(&[2; 32]).unwrap(), ), ) diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index befc165a0..375d1201e 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -29,7 +29,7 @@ use janus_core::{ dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, }, - time::{Clock, IntervalExt, MockClock}, + time::{Clock, IntervalExt, MockClock, TimeExt as _}, vdaf::VdafInstance, }; use janus_messages::{ @@ -195,7 +195,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( aggregation_job_id, AggregationParam::default(), batch_id, - interval, + interval.end().sub(&Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, AggregationJobStep::from(1), )) @@ -230,7 +230,6 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), task.min_batch_size() + 1, - interval, ReportIdChecksum::default(), ), ) diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index e07da97f8..be0f6b82c 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -187,7 +187,7 @@ mod tests { aggregation_job_id, AggregationParam(0), (), - Interval::from_time(&client_timestamp).unwrap(), + client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ), @@ -228,7 +228,6 @@ mod tests { BatchAggregationState::Collected, Some(AggregateShare(11)), 1, - Interval::from_time(&client_timestamp).unwrap(), random(), ), ) @@ -376,7 +375,7 @@ mod tests { aggregation_job_id, AggregationParam(0), (), - 
Interval::from_time(&client_timestamp).unwrap(), + client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ), @@ -417,7 +416,6 @@ mod tests { BatchAggregationState::Collected, Some(AggregateShare(11)), 1, - Interval::from_time(&client_timestamp).unwrap(), random(), ), ) @@ -559,7 +557,7 @@ mod tests { random(), AggregationParam(0), batch_id, - Interval::from_time(&client_timestamp).unwrap(), + client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ); @@ -603,7 +601,6 @@ mod tests { BatchAggregationState::Collected, Some(AggregateShare(11)), 1, - Interval::from_time(&client_timestamp).unwrap(), random(), ), ) @@ -754,7 +751,7 @@ mod tests { random(), AggregationParam(0), batch_id, - Interval::from_time(&client_timestamp).unwrap(), + client_timestamp, AggregationJobState::InProgress, AggregationJobStep::from(0), ); @@ -798,7 +795,6 @@ mod tests { BatchAggregationState::Collected, Some(AggregateShare(11)), 1, - Interval::from_time(&client_timestamp).unwrap(), random(), ), ) diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index b42044bdd..e61c6f31f 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -690,7 +690,7 @@ mod tests { }, report_id::ReportIdChecksumExt, test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf}, - time::{Clock, DurationExt, IntervalExt, MockClock, TimeExt}, + time::{Clock, DurationExt, MockClock, TimeExt}, vdaf::{VdafInstance, VERIFY_KEY_LENGTH}, }; use janus_messages::{ @@ -1667,8 +1667,7 @@ mod tests { random(), dummy_vdaf::AggregationParam(0), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), ); @@ -1698,8 +1697,7 @@ mod tests { random(), dummy_vdaf::AggregationParam(1), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), ); @@ -2483,8 +2481,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -2606,8 +2603,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(1), ) @@ -2820,8 +2816,7 @@ mod tests { aggregation_job_id_0, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -2954,7 +2949,6 @@ mod tests { BatchAggregationState::Aggregating, agg.aggregate_share().cloned(), agg.report_count(), - *agg.client_timestamp_interval(), *agg.checksum(), ) }) @@ -2989,7 +2983,6 @@ mod tests { BatchAggregationState::Aggregating, Some(aggregate_share), 2, - Interval::from_time(report_metadata_0.time()).unwrap(), checksum, ),]) ); @@ -3147,8 +3140,7 @@ mod tests { aggregation_job_id_1, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, 
AggregationJobStep::from(0), )) @@ -3257,7 +3249,6 @@ mod tests { BatchAggregationState::Aggregating, agg.aggregate_share().cloned(), agg.report_count(), - *agg.client_timestamp_interval(), *agg.checksum(), ) }) @@ -3297,7 +3288,6 @@ mod tests { BatchAggregationState::Aggregating, Some(first_aggregate_share), 3, - Interval::from_time(report_metadata_0.time()).unwrap(), first_checksum, ), ); @@ -3400,8 +3390,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -3506,8 +3495,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -3594,8 +3582,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::Finished, AggregationJobStep::from(1), ) @@ -3675,8 +3662,7 @@ mod tests { aggregation_job_id, aggregation_param, (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -3820,8 +3806,7 @@ mod tests { aggregation_job_id, aggregation_param.clone(), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) @@ -3935,11 +3920,7 @@ mod tests { aggregation_job_id, dummy_vdaf::AggregationParam(0), (), - Interval::new( - Time::from_seconds_since_epoch(0), - Duration::from_seconds(1), - ) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), ), @@ -4510,7 +4491,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 10, - interval, ReportIdChecksum::get_decoded(&[2; 32]).unwrap(), ), ) @@ -4574,7 +4554,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(0)), 10, - interval, ReportIdChecksum::get_decoded(&[2; 32]).unwrap(), ), ) @@ -4881,7 +4860,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(64)), 5, - interval_1, ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), )) .await @@ -4914,7 +4892,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(128)), 5, - interval_2, ReportIdChecksum::get_decoded(&[2; 32]).unwrap(), )) .await @@ -4947,7 +4924,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(256)), 5, - interval_3, ReportIdChecksum::get_decoded(&[4; 32]).unwrap(), )) .await @@ -4980,7 +4956,6 @@ mod tests { BatchAggregationState::Aggregating, Some(dummy_vdaf::AggregateShare(512)), 5, - interval_4, ReportIdChecksum::get_decoded(&[8; 32]).unwrap(), )) .await diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index 60d4c585d..aa70eb137 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -779,8 +779,7 @@ async fn taskprov_aggregate_continue() { aggregation_job_id, aggregation_param.clone(), batch_id, - Interval::new(Time::from_seconds_since_epoch(0), 
Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), ), @@ -933,7 +932,6 @@ async fn taskprov_aggregate_share() { BatchAggregationState::Aggregating, Some(transcript.helper_aggregate_share), 1, - interval, ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), )) .await diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 3fc70f1f3..0b9246e11 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -42,7 +42,7 @@ use janus_core::{ }; use janus_messages::{ query_type::TimeInterval, AggregationJobStep, Duration, HpkeAeadId, HpkeConfig, HpkeConfigId, - HpkeKdfId, HpkeKemId, HpkePublicKey, Interval, Role, TaskId, Time, + HpkeKdfId, HpkeKemId, HpkePublicKey, Role, TaskId, Time, }; use rand::{distributions::Standard, random, thread_rng, Rng}; use serde_test::{assert_ser_tokens, assert_tokens, Token}; @@ -791,8 +791,7 @@ async fn get_task_metrics() { aggregation_job_id, AggregationParam(0), (), - Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) - .unwrap(), + Time::from_seconds_since_epoch(0), AggregationJobState::InProgress, AggregationJobStep::from(0), )) diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 443812b77..da5807785 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -5,7 +5,7 @@ use self::models::{ AggregatorRole, AuthenticationTokenType, Batch, BatchAggregation, CollectionJob, CollectionJobState, CollectionJobStateCode, GlobalHpkeKeypair, HpkeKeyState, LeaderStoredReport, Lease, LeaseToken, OutstandingBatch, ReportAggregation, - ReportAggregationState, ReportAggregationStateCode, SqlInterval, + ReportAggregationState, ReportAggregationStateCode, }; use crate::{ query_type::{AccumulableQueryType, CollectableQueryType}, @@ -18,7 +18,7 @@ use futures::future::try_join_all; use janus_core::{ auth_tokens::AuthenticationToken, hpke::{HpkeKeypair, HpkePrivateKey}, - time::{Clock, TimeExt}, + time::{Clock, DurationExt as _, IntervalExt, TimeExt}, vdaf::VdafInstance, }; use janus_messages::{ @@ -55,7 +55,7 @@ use std::{ time::{Duration as StdDuration, Instant}, }; use tokio::{sync::Barrier, try_join}; -use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement}; +use tokio_postgres::{error::SqlState, row::RowIndex, Row, Statement, ToStatement}; use tracing::error; use url::Url; @@ -245,7 +245,7 @@ impl Datastore { }; let raw_tx = match client .build_transaction() - .isolation_level(IsolationLevel::RepeatableRead) + // .isolation_level(IsolationLevel::RepeatableRead) // XXX: consider going to ReadCommitted based on performance testing, once we're on v23.2 where it is supported .start() .await { @@ -517,18 +517,22 @@ impl Transaction<'_, C> { /// Returns the current schema version of the datastore and the description of the migration /// script that applied it. async fn get_current_schema_migration_version(&self) -> Result<(i64, String), Error> { - let stmt = self - .prepare_cached( - "SELECT version, description FROM _sqlx_migrations - WHERE success = TRUE ORDER BY version DESC LIMIT(1)", - ) - .await?; - let row = self.query_one(&stmt, &[]).await?; - - let version = row.try_get("version")?; - let description = row.try_get("description")?; - - Ok((version, description)) + // HACK: SQLx is not used, _sqlx_migrations does not exist. 
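+        // (The constants returned below match the single migration shipped in
+        // this tree, db/00000000000001_initial_schema.up.sql; they will need
+        // updating if further migrations are added.)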
+ // let stmt = self + // .prepare_cached( + // "SELECT version, description FROM _sqlx_migrations + // WHERE success = TRUE ORDER BY version DESC LIMIT(1)", + // ) + // .await?; + // let row = self.query_one(&stmt, &[]).await?; + + // let version = row.try_get("version")?; + // let description = row.try_get("description")?; + + // Ok((version, description)) + + Ok((1, "initial schema".to_owned())) + // END HACK } /// Writes a task into the datastore. @@ -933,7 +937,7 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = aggregation_jobs.task_id RIGHT JOIN report_aggregations ON report_aggregations.aggregation_job_id = aggregation_jobs.id AND report_aggregations.task_id = tasks.id WHERE tasks.task_id = $1 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)) AS report_aggregation_count", + AND aggregation_jobs.max_client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)) AS report_aggregation_count", ) .await?; let row = self @@ -1162,6 +1166,7 @@ impl Transaction<'_, C> { &self, task_id: &TaskId, ) -> Result, Error> { + // XXX: go back to using FOR UPDATE SKIP LOCKED once it works properly(?) // TODO(#269): allow the number of returned results to be controlled? let stmt = self .prepare_cached( @@ -1172,7 +1177,7 @@ impl Transaction<'_, C> { AND client_reports.aggregation_started = FALSE AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) ORDER BY client_timestamp DESC - FOR UPDATE OF client_reports SKIP LOCKED + -- FOR UPDATE OF client_reports SKIP LOCKED LIMIT 5000 ) UPDATE client_reports SET @@ -1282,15 +1287,18 @@ impl Transaction<'_, C> { task_id: &TaskId, batch_interval: &Interval, ) -> Result { + let (min_timestamp, max_timestamp) = (batch_interval.start(), batch_interval.end()); + let stmt = self .prepare_cached( "SELECT EXISTS( SELECT 1 FROM client_reports JOIN tasks ON tasks.id = client_reports.task_id WHERE tasks.task_id = $1 - AND client_reports.client_timestamp <@ $2::TSRANGE - AND client_reports.client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) - AND client_reports.aggregation_started = FALSE + AND client_reports.client_timestamp >= $2 + AND client_reports.client_timestamp < $3 + AND client_reports.client_timestamp >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + AND client_reports.aggregation_started = FALSE ) AS unaggregated_report_exists", ) .await?; @@ -1299,7 +1307,8 @@ impl Transaction<'_, C> { &stmt, &[ /* task_id */ task_id.as_ref(), - /* batch_interval */ &SqlInterval::from(batch_interval), + /* min_timestamp */ &min_timestamp.as_naive_date_time()?, + /* max_timestamp */ &max_timestamp.as_naive_date_time()?, /* now */ &self.clock.now().as_naive_date_time()?, ], ) @@ -1316,15 +1325,17 @@ impl Transaction<'_, C> { task_id: &TaskId, batch_interval: &Interval, ) -> Result { + let (min_timestamp, max_timestamp) = (batch_interval.start(), batch_interval.end()); + let stmt = self .prepare_cached( "SELECT COUNT(1) AS count FROM client_reports JOIN tasks ON tasks.id = client_reports.task_id WHERE tasks.task_id = $1 - AND client_reports.client_timestamp >= lower($2::TSRANGE) - AND client_reports.client_timestamp < upper($2::TSRANGE) - AND client_reports.client_timestamp >= COALESCE($3::TIMESTAMP - 
tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND client_reports.client_timestamp >= $2 + AND client_reports.client_timestamp < $3 + AND client_reports.client_timestamp >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; let row = self @@ -1332,7 +1343,8 @@ impl Transaction<'_, C> { &stmt, &[ /* task_id */ task_id.as_ref(), - /* batch_interval */ &SqlInterval::from(batch_interval), + /* min_timestamp */ &min_timestamp.as_naive_date_time()?, + /* max_timestamp */ &max_timestamp.as_naive_date_time()?, /* now */ &self.clock.now().as_naive_date_time()?, ], ) @@ -1359,7 +1371,7 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = aggregation_jobs.task_id AND tasks.id = report_aggregations.task_id WHERE tasks.task_id = $1 AND aggregation_jobs.batch_id = $2 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; let row = self @@ -1585,13 +1597,13 @@ impl Transaction<'_, C> { let stmt = self .prepare_cached( "SELECT - aggregation_param, batch_id, client_timestamp_interval, state, step, + aggregation_param, batch_id, max_client_timestamp, state, step, last_request_hash FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id WHERE tasks.task_id = $1 AND aggregation_jobs.aggregation_job_id = $2 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query_opt( @@ -1622,12 +1634,12 @@ impl Transaction<'_, C> { let stmt = self .prepare_cached( "SELECT - aggregation_job_id, aggregation_param, batch_id, client_timestamp_interval, + aggregation_job_id, aggregation_param, batch_id, max_client_timestamp, state, step, last_request_hash FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id WHERE tasks.task_id = $1 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -1663,10 +1675,9 @@ impl Transaction<'_, C> { *aggregation_job_id, A::AggregationParam::get_decoded(row.get("aggregation_param"))?, Q::PartialBatchIdentifier::get_decoded(row.get::<_, &[u8]>("batch_id"))?, - row.get::<_, SqlInterval>("client_timestamp_interval") - .as_interval(), + Time::from_naive_date_time(&row.get("max_client_timestamp")), row.get("state"), - row.get_postgres_integer_and_convert::("step")?, + u16::try_from(row.get::<_, i64>("step"))?.into(), ); if let Some(hash) = row.try_get::<_, Option>>("last_request_hash")? { @@ -1700,6 +1711,11 @@ impl Transaction<'_, C> { // We generate the token on the DB to allow each acquired job to receive its own distinct // token. This is not strictly necessary as we only care about token collisions on a // per-row basis. + + // XXX: go back to using FOR UPDATE SKIP LOCKED once it works properly(?) + // XXX: once we are on CockroachDB v23.2, go back to using gen_random_bytes. 
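+        // (Binding note on the app-side token below, assuming tokio-postgres's
+        // stock ToSql impls: BYTEA parameters bind from &[u8]/Vec<u8>, and
+        // fixed-size arrays such as [u8; 16] have no ToSql impl, so the token
+        // likely needs a conversion along the lines of
+        // `random::<[u8; 16]>().to_vec()` before being passed to query(). Note
+        // also that, unlike gen_random_bytes(16), a single app-side token is
+        // shared by every job acquired in one call; per the comment above that
+        // is acceptable, since tokens are only checked per row, keyed by job id.)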
+ let lease_token = random::<[u8; 16]>(); + let stmt = self .prepare_cached( "WITH incomplete_jobs AS ( @@ -1707,16 +1723,17 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = aggregation_jobs.task_id WHERE tasks.aggregator_role = 'LEADER' AND aggregation_jobs.state = 'IN_PROGRESS' - AND aggregation_jobs.lease_expiry <= $2 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) - FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $3 + AND aggregation_jobs.lease_expiry <= $3 + AND aggregation_jobs.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + -- FOR UPDATE OF aggregation_jobs SKIP LOCKED + LIMIT $4 ) UPDATE aggregation_jobs SET lease_expiry = $1, - lease_token = gen_random_bytes(16), + lease_token = $2, lease_attempts = lease_attempts + 1, - updated_at = $4, - updated_by = $5 + updated_at = $5, + updated_by = $6 FROM tasks WHERE tasks.id = aggregation_jobs.task_id AND aggregation_jobs.id IN (SELECT id FROM incomplete_jobs) @@ -1729,6 +1746,7 @@ impl Transaction<'_, C> { &stmt, &[ /* lease_expiry */ &lease_expiry_time, + /* lease_token */ &lease_token, /* now */ &now, /* limit */ &maximum_acquire_count, /* updated_at */ &self.clock.now().as_naive_date_time()?, @@ -1776,7 +1794,7 @@ impl Transaction<'_, C> { AND aggregation_jobs.aggregation_job_id = $4 AND aggregation_jobs.lease_expiry = $5 AND aggregation_jobs.lease_token = $6 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($7::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($7::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; check_single_row_mutation( @@ -1811,14 +1829,14 @@ impl Transaction<'_, C> { .prepare_cached( "INSERT INTO aggregation_jobs (task_id, aggregation_job_id, aggregation_param, batch_id, - client_timestamp_interval, state, step, last_request_hash, + max_client_timestamp, state, step, last_request_hash, created_at, updated_at, updated_by) VALUES ( (SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 ) ON CONFLICT DO NOTHING - RETURNING COALESCE(UPPER(client_timestamp_interval) < COALESCE($12::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", + RETURNING COALESCE(max_client_timestamp < COALESCE($12::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", ) .await?; let rows = self @@ -1831,10 +1849,12 @@ impl Transaction<'_, C> { &aggregation_job.aggregation_parameter().get_encoded(), /* batch_id */ &aggregation_job.partial_batch_identifier().get_encoded(), - /* client_timestamp_interval */ - &SqlInterval::from(aggregation_job.client_timestamp_interval()), + /* max_client_timestamp */ + &aggregation_job + .max_client_timestamp() + .as_naive_date_time()?, /* state */ &aggregation_job.state(), - /* step */ &(u16::from(aggregation_job.step()) as i32), + /* step */ &(u16::from(aggregation_job.step()) as i64), /* last_request_hash */ &aggregation_job.last_request_hash(), /* created_at */ &self.clock.now().as_naive_date_time()?, @@ -1895,7 +1915,7 @@ impl Transaction<'_, C> { FROM tasks WHERE tasks.task_id = $6 AND aggregation_jobs.aggregation_job_id = $7 - AND 
UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($8::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($8::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; check_single_row_mutation( @@ -1903,7 +1923,7 @@ impl Transaction<'_, C> { &stmt, &[ /* state */ &aggregation_job.state(), - /* step */ &(u16::from(aggregation_job.step()) as i32), + /* step */ &(u16::from(aggregation_job.step()) as i64), /* last_request_hash */ &aggregation_job.last_request_hash(), /* updated_at */ &self.clock.now().as_naive_date_time()?, @@ -1940,7 +1960,7 @@ impl Transaction<'_, C> { AND report_aggregations.client_report_id = $2 AND aggregation_jobs.aggregation_param = $3 AND aggregation_jobs.aggregation_job_id != $4 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($5::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($5::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; Ok(self @@ -1988,7 +2008,7 @@ impl Transaction<'_, C> { WHERE tasks.task_id = $1 AND aggregation_jobs.aggregation_job_id = $2 AND report_aggregations.client_report_id = $3 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query_opt( @@ -2042,7 +2062,7 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = aggregation_jobs.task_id AND tasks.id = report_aggregations.task_id WHERE tasks.task_id = $1 AND aggregation_jobs.aggregation_job_id = $2 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + AND aggregation_jobs.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) ORDER BY report_aggregations.ord ASC", ) .await?; @@ -2096,7 +2116,7 @@ impl Transaction<'_, C> { JOIN aggregation_jobs ON aggregation_jobs.id = report_aggregations.aggregation_job_id JOIN tasks ON tasks.id = aggregation_jobs.task_id AND tasks.id = report_aggregations.task_id WHERE tasks.task_id = $1 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND aggregation_jobs.max_client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -2245,7 +2265,7 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = aggregation_jobs.task_id WHERE tasks.task_id = $1 AND aggregation_job_id = $2 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($14::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + AND aggregation_jobs.max_client_timestamp >= COALESCE($14::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) ON CONFLICT DO NOTHING", ) .await?; @@ -2303,7 +2323,7 @@ impl Transaction<'_, C> { AND report_aggregations.client_report_id = $10 AND report_aggregations.client_timestamp = $11 AND report_aggregations.ord = $12 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= 
COALESCE($13::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                  AND aggregation_jobs.max_client_timestamp >= COALESCE($13::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         check_single_row_mutation(
@@ -2358,7 +2378,7 @@ impl<C: Clock> Transaction<'_, C> {
                 JOIN tasks ON tasks.id = collection_jobs.task_id
                 WHERE tasks.task_id = $1
                   AND collection_jobs.collection_job_id = $2
-                  AND COALESCE(LOWER(collection_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                  AND COALESCE(collection_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query_opt(
@@ -2409,8 +2429,9 @@ impl<C: Clock> Transaction<'_, C> {
                     collection_jobs.leader_aggregate_share FROM collection_jobs
                 JOIN tasks ON tasks.id = collection_jobs.task_id
                 WHERE tasks.task_id = $1
-                  AND collection_jobs.batch_interval @> $2::TIMESTAMP
-                  AND LOWER(collection_jobs.batch_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                  AND collection_jobs.min_batch_timestamp <= $2::TIMESTAMP
+                  AND collection_jobs.max_batch_timestamp >= $2::TIMESTAMP
+                  AND collection_jobs.min_batch_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
@@ -2445,6 +2466,8 @@ impl<C: Clock> Transaction<'_, C> {
         batch_interval: &Interval,
     ) -> Result<Vec<CollectionJob<SEED_SIZE, TimeInterval, A>>, Error> {
         // TODO(#1553): write unit test
+        let (min_timestamp, max_timestamp) = (batch_interval.start(), batch_interval.end());
+
         let stmt = self
             .prepare_cached(
                 "SELECT
@@ -2458,15 +2481,17 @@ impl<C: Clock> Transaction<'_, C> {
                     collection_jobs.leader_aggregate_share FROM collection_jobs
                 JOIN tasks ON tasks.id = collection_jobs.task_id
                 WHERE tasks.task_id = $1
-                  AND collection_jobs.batch_interval && $2
-                  AND LOWER(collection_jobs.batch_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                  AND collection_jobs.min_batch_timestamp < $3
+                  AND collection_jobs.max_batch_timestamp >= $2
+                  AND collection_jobs.min_batch_timestamp >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
             &stmt,
             &[
                 /* task_id */ task_id.as_ref(),
-                /* batch_interval */ &SqlInterval::from(batch_interval),
+                /* min_batch_timestamp */ &min_timestamp.as_naive_date_time()?,
+                /* max_batch_timestamp */ &max_timestamp.as_naive_date_time()?,
                 /* now */ &self.clock.now().as_naive_date_time()?,
             ],
         )
@@ -2516,7 +2541,7 @@ impl<C: Clock> Transaction<'_, C> {
                 AND batches.batch_identifier = collection_jobs.batch_identifier
                 WHERE tasks.task_id = $1
                   AND collection_jobs.batch_identifier = $2
-                  AND UPPER(batches.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                  AND batches.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, 
'-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -2561,7 +2586,7 @@ impl Transaction<'_, C> { FROM collection_jobs JOIN tasks ON tasks.id = collection_jobs.task_id WHERE tasks.task_id = $1 - AND COALESCE(LOWER(collection_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND COALESCE(collection_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -2664,19 +2689,26 @@ impl Transaction<'_, C> { where A::AggregationParam: std::fmt::Debug, { - let batch_interval = - Q::to_batch_interval(collection_job.batch_identifier()).map(SqlInterval::from); + let batch_interval = Q::to_batch_interval(collection_job.batch_identifier()); + let (min_batch_timestamp, max_batch_timestamp) = ( + batch_interval.map(Interval::start), + batch_interval + .map(|i| i.end().sub(&Duration::from_seconds(1))) + .transpose()?, + ); let stmt = self .prepare_cached( "INSERT INTO collection_jobs (collection_job_id, task_id, query, aggregation_param, batch_identifier, - batch_interval, state, created_at, updated_at, updated_by) + min_batch_timestamp, max_batch_timestamp, state, created_at, updated_at, + updated_by) VALUES ( - $1, (SELECT id FROM tasks WHERE task_id = $2), $3, $4, $5, $6, $7, $8, $9, $10 + $1, (SELECT id FROM tasks WHERE task_id = $2), $3, $4, $5, $6, $7, $8, $9, $10, + $11 ) ON CONFLICT DO NOTHING - RETURNING COALESCE(COALESCE(LOWER(batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) < COALESCE($11::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $2) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", + RETURNING COALESCE(COALESCE(min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) < COALESCE($12::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $2) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", ) .await?; @@ -2690,7 +2722,15 @@ impl Transaction<'_, C> { /* aggregation_param */ &collection_job.aggregation_parameter().get_encoded(), /* batch_identifier */ &collection_job.batch_identifier().get_encoded(), - /* batch_interval */ &batch_interval, + /* min_batch_timestamp */ + &min_batch_timestamp + .map(Time::as_naive_date_time) + .transpose()?, + /* max_batch_timestamp */ + &max_batch_timestamp + .as_ref() + .map(Time::as_naive_date_time) + .transpose()?, /* state */ &collection_job.state().collection_job_state_code(), /* created_at */ &self.clock.now().as_naive_date_time()?, @@ -2744,6 +2784,10 @@ impl Transaction<'_, C> { let lease_expiry_time = add_naive_date_time_duration(&now, lease_duration)?; let maximum_acquire_count: i64 = 
maximum_acquire_count.try_into()?; + // XXX: go back to using FOR UPDATE SKIP LOCKED once it works properly(?) + // XXX: once we are on CockroachDB v23.2, go back to using gen_random_bytes. + let lease_token = random::<[u8; 16]>(); + let stmt = self .prepare_cached( "WITH incomplete_jobs AS ( @@ -2752,16 +2796,17 @@ impl Transaction<'_, C> { JOIN tasks ON tasks.id = collection_jobs.task_id WHERE tasks.aggregator_role = 'LEADER' AND collection_jobs.state = 'COLLECTABLE' - AND collection_jobs.lease_expiry <= $4 - AND COALESCE(LOWER(collection_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) - FOR UPDATE OF collection_jobs SKIP LOCKED LIMIT $5 + AND collection_jobs.lease_expiry <= $5 + AND COALESCE(collection_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) >= COALESCE($5::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + -- FOR UPDATE OF collection_jobs SKIP LOCKED + LIMIT $6 ) UPDATE collection_jobs SET lease_expiry = $1, - lease_token = gen_random_bytes(16), + lease_token = $2, lease_attempts = lease_attempts + 1, - updated_at = $2, - updated_by = $3 + updated_at = $3, + updated_by = $4 FROM incomplete_jobs WHERE collection_jobs.id = incomplete_jobs.id RETURNING incomplete_jobs.task_id, incomplete_jobs.query_type, incomplete_jobs.vdaf, @@ -2774,6 +2819,7 @@ impl Transaction<'_, C> { &stmt, &[ /* lease_expiry */ &lease_expiry_time, + /* lease_token */ &lease_token, /* updated_at */ &self.clock.now().as_naive_date_time()?, /* updated_by */ &self.name, /* now */ &now, @@ -2821,7 +2867,7 @@ impl Transaction<'_, C> { AND collection_jobs.collection_job_id = $4 AND collection_jobs.lease_expiry = $5 AND collection_jobs.lease_token = $6 - AND COALESCE(LOWER(collection_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) >= COALESCE($7::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND COALESCE(collection_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) >= COALESCE($7::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; check_single_row_mutation( @@ -2889,7 +2935,7 @@ impl Transaction<'_, C> { WHERE tasks.id = collection_jobs.task_id AND tasks.task_id = $7 AND collection_job_id = $8 - AND COALESCE(LOWER(collection_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param))) >= COALESCE($9::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND 
COALESCE(collection_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = collection_jobs.task_id AND batches.batch_identifier = collection_jobs.batch_identifier AND batches.aggregation_param = collection_jobs.aggregation_param)) >= COALESCE($9::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; @@ -2928,9 +2974,7 @@ impl Transaction<'_, C> { ) -> Result>, Error> { let stmt = self .prepare_cached( - "SELECT - batch_aggregations.state, aggregate_share, report_count, - batch_aggregations.client_timestamp_interval, checksum + "SELECT batch_aggregations.state, aggregate_share, report_count, checksum FROM batch_aggregations JOIN tasks ON tasks.id = batch_aggregations.task_id JOIN batches ON batches.task_id = batch_aggregations.task_id @@ -2940,7 +2984,7 @@ impl Transaction<'_, C> { AND batch_aggregations.batch_identifier = $2 AND batch_aggregations.aggregation_param = $3 AND ord = $4 - AND UPPER(COALESCE(batches.batch_interval, batches.client_timestamp_interval)) >= COALESCE($5::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND COALESCE(batches.max_batch_timestamp, batches.max_client_timestamp) >= COALESCE($5::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; @@ -2984,9 +3028,7 @@ impl Transaction<'_, C> { ) -> Result>, Error> { let stmt = self .prepare_cached( - "SELECT - ord, batch_aggregations.state, aggregate_share, report_count, - batch_aggregations.client_timestamp_interval, checksum + "SELECT ord, batch_aggregations.state, aggregate_share, report_count, checksum FROM batch_aggregations JOIN tasks ON tasks.id = batch_aggregations.task_id JOIN batches ON batches.task_id = batch_aggregations.task_id @@ -2995,7 +3037,7 @@ impl Transaction<'_, C> { WHERE tasks.task_id = $1 AND batch_aggregations.batch_identifier = $2 AND batch_aggregations.aggregation_param = $3 - AND UPPER(COALESCE(batches.batch_interval, batches.client_timestamp_interval)) >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND COALESCE(batches.max_batch_timestamp, batches.max_client_timestamp) >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; @@ -3037,15 +3079,14 @@ impl Transaction<'_, C> { .prepare_cached( "SELECT batch_aggregations.batch_identifier, batch_aggregations.aggregation_param, ord, - batch_aggregations.state, aggregate_share, report_count, - batch_aggregations.client_timestamp_interval, checksum + batch_aggregations.state, aggregate_share, report_count, checksum FROM batch_aggregations JOIN tasks ON tasks.id = batch_aggregations.task_id JOIN batches ON batches.task_id = batch_aggregations.task_id AND batches.batch_identifier = batch_aggregations.batch_identifier AND batches.aggregation_param = batch_aggregations.aggregation_param WHERE tasks.task_id = $1 - AND UPPER(COALESCE(batches.batch_interval, batches.client_timestamp_interval)) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND COALESCE(batches.max_batch_timestamp, batches.max_client_timestamp) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; @@ -3096,9 +3137,6 @@ impl Transaction<'_, C> { .transpose() .map_err(|_| Error::DbState("aggregate_share couldn't be parsed".to_string()))?; let report_count = row.get_bigint_and_convert("report_count")?; - let 
client_timestamp_interval = row - .get::<_, SqlInterval>("client_timestamp_interval") - .as_interval(); let checksum = ReportIdChecksum::get_decoded(row.get("checksum"))?; Ok(BatchAggregation::new( task_id, @@ -3108,7 +3146,6 @@ impl Transaction<'_, C> { state, aggregate_share, report_count, - client_timestamp_interval, checksum, )) } @@ -3127,22 +3164,18 @@ impl Transaction<'_, C> { A::AggregationParam: std::fmt::Debug, A::AggregateShare: std::fmt::Debug, { - let batch_interval = - Q::to_batch_interval(batch_aggregation.batch_identifier()).map(SqlInterval::from); - let stmt = self .prepare_cached( "INSERT INTO batch_aggregations ( - task_id, batch_identifier, batch_interval, aggregation_param, ord, state, - aggregate_share, report_count, client_timestamp_interval, checksum, created_at, - updated_at, updated_by + task_id, batch_identifier, aggregation_param, ord, state, aggregate_share, + report_count, checksum, created_at, updated_at, updated_by ) VALUES ( (SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8, $9, $10, - $11, $12, $13 + $11 ) ON CONFLICT DO NOTHING - RETURNING COALESCE(UPPER((SELECT COALESCE(batch_interval, client_timestamp_interval) FROM batches WHERE task_id = batch_aggregations.task_id AND batch_identifier = batch_aggregations.batch_identifier AND aggregation_param = batch_aggregations.aggregation_param)) < COALESCE($14::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", + RETURNING COALESCE((SELECT COALESCE(max_batch_timestamp, max_client_timestamp) FROM batches WHERE task_id = batch_aggregations.task_id AND batch_identifier = batch_aggregations.batch_identifier AND aggregation_param = batch_aggregations.aggregation_param) < COALESCE($12::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", ) .await?; let rows = self @@ -3152,7 +3185,6 @@ impl Transaction<'_, C> { /* task_id */ &batch_aggregation.task_id().as_ref(), /* batch_identifier */ &batch_aggregation.batch_identifier().get_encoded(), - /* batch_interval */ &batch_interval, /* aggregation_param */ &batch_aggregation.aggregation_parameter().get_encoded(), /* ord */ &i64::try_from(batch_aggregation.ord())?, @@ -3161,8 +3193,6 @@ impl Transaction<'_, C> { &batch_aggregation.aggregate_share().map(Encode::get_encoded), /* report_count */ &i64::try_from(batch_aggregation.report_count())?, - /* client_timestamp_interval */ - &SqlInterval::from(batch_aggregation.client_timestamp_interval()), /* checksum */ &batch_aggregation.checksum().get_encoded(), /* created_at */ &self.clock.now().as_naive_date_time()?, /* updated_at */ &self.clock.now().as_naive_date_time()?, @@ -3229,20 +3259,19 @@ impl Transaction<'_, C> { state = $1, aggregate_share = $2, report_count = $3, - client_timestamp_interval = $4, - checksum = $5, - updated_at = $6, - updated_by = $7 + checksum = $4, + updated_at = $5, + updated_by = $6 FROM tasks, batches WHERE tasks.id = batch_aggregations.task_id AND batches.task_id = batch_aggregations.task_id AND batches.batch_identifier = batch_aggregations.batch_identifier AND batches.aggregation_param = batch_aggregations.aggregation_param - AND tasks.task_id = $8 - AND batch_aggregations.batch_identifier = $9 - AND batch_aggregations.aggregation_param = $10 - AND ord = $11 - AND UPPER(COALESCE(batches.batch_interval, batches.client_timestamp_interval)) >= COALESCE($12::TIMESTAMP - tasks.report_expiry_age * '1 
second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND tasks.task_id = $7
+                    AND batch_aggregations.batch_identifier = $8
+                    AND batch_aggregations.aggregation_param = $9
+                    AND ord = $10
+                    AND COALESCE(batches.max_batch_timestamp, batches.max_client_timestamp) >= COALESCE($11::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         check_single_row_mutation(
@@ -3253,8 +3282,6 @@ impl Transaction<'_, C> {
                     /* aggregate_share */
                     &batch_aggregation.aggregate_share().map(Encode::get_encoded),
                     /* report_count */ &i64::try_from(batch_aggregation.report_count())?,
-                    /* client_timestamp_interval */
-                    &SqlInterval::from(batch_aggregation.client_timestamp_interval()),
                     /* checksum */ &batch_aggregation.checksum().get_encoded(),
                     /* updated_at */ &self.clock.now().as_naive_date_time()?,
                     /* updated_by */ &self.name,
@@ -3295,7 +3322,7 @@ impl Transaction<'_, C> {
                 WHERE tasks.task_id = $1
                     AND batch_identifier = $2
                     AND aggregation_param = $3
-                    AND COALESCE(LOWER(aggregate_share_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param))) >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND COALESCE(aggregate_share_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param)) >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query_opt(
@@ -3343,8 +3370,9 @@ impl Transaction<'_, C> {
                 FROM aggregate_share_jobs
                 JOIN tasks ON tasks.id = aggregate_share_jobs.task_id
                 WHERE tasks.task_id = $1
-                    AND aggregate_share_jobs.batch_interval @> $2::TIMESTAMP
-                    AND LOWER(aggregate_share_jobs.batch_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND aggregate_share_jobs.min_batch_timestamp <= $2::TIMESTAMP
+                    AND aggregate_share_jobs.max_batch_timestamp >= $2::TIMESTAMP
+                    AND aggregate_share_jobs.min_batch_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
@@ -3383,6 +3411,8 @@ impl Transaction<'_, C> {
         task_id: &TaskId,
         interval: &Interval,
     ) -> Result<Vec<AggregateShareJob<SEED_SIZE, TimeInterval, A>>, Error> {
+        let (min_timestamp, max_timestamp) = (interval.start(), interval.end());
+
         let stmt = self
             .prepare_cached(
                 "SELECT
@@ -3394,15 +3424,17 @@ impl Transaction<'_, C> {
                 FROM aggregate_share_jobs
                 JOIN tasks ON tasks.id = aggregate_share_jobs.task_id
                 WHERE tasks.task_id = $1
-                    AND aggregate_share_jobs.batch_interval && $2
-                    AND LOWER(aggregate_share_jobs.batch_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND aggregate_share_jobs.min_batch_timestamp < $3
+                    AND aggregate_share_jobs.max_batch_timestamp >= $2
+                    AND aggregate_share_jobs.min_batch_timestamp >= COALESCE($4::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
             &stmt,
             &[
                 /* task_id */ &task_id.as_ref(),
-                /* interval */ &SqlInterval::from(interval),
+                /* min_timestamp */ &min_timestamp.as_naive_date_time()?,
+                /* max_timestamp */ &max_timestamp.as_naive_date_time()?,
                 /* now */ 
&self.clock.now().as_naive_date_time()?,
             ],
         )
@@ -3445,7 +3477,7 @@ impl Transaction<'_, C> {
                 FROM aggregate_share_jobs
                 JOIN tasks ON tasks.id = aggregate_share_jobs.task_id
                 WHERE tasks.task_id = $1 AND aggregate_share_jobs.batch_identifier = $2
-                    AND UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param)) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND (SELECT max_client_timestamp FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
@@ -3486,7 +3518,7 @@ impl Transaction<'_, C> {
                 FROM aggregate_share_jobs
                 JOIN tasks ON tasks.id = aggregate_share_jobs.task_id
                 WHERE tasks.task_id = $1
-                    AND COALESCE(LOWER(aggregate_share_jobs.batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param))) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
+                    AND COALESCE(aggregate_share_jobs.min_batch_timestamp, (SELECT max_client_timestamp FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param)) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)",
             )
             .await?;
         self.query(
@@ -3545,18 +3577,24 @@ impl Transaction<'_, C> {
         &self,
         aggregate_share_job: &AggregateShareJob<SEED_SIZE, Q, A>,
     ) -> Result<(), Error> {
-        let batch_interval =
-            Q::to_batch_interval(aggregate_share_job.batch_identifier()).map(SqlInterval::from);
+        let batch_interval = Q::to_batch_interval(aggregate_share_job.batch_identifier());
+        let (min_batch_timestamp, max_batch_timestamp) = (
+            batch_interval.map(Interval::start),
+            batch_interval
+                .map(|i| i.end().sub(&Duration::from_seconds(1)))
+                .transpose()?,
+        );
 
         let stmt = self
             .prepare_cached(
                 "INSERT INTO aggregate_share_jobs (
-                    task_id, batch_identifier, batch_interval, aggregation_param,
-                    helper_aggregate_share, report_count, checksum, created_at, updated_by
+                    task_id, batch_identifier, min_batch_timestamp, max_batch_timestamp,
+                    aggregation_param, helper_aggregate_share, report_count, checksum, created_at,
+                    updated_by
                 )
-                VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8, $9)
+                VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT DO NOTHING
-                RETURNING COALESCE(COALESCE(LOWER(batch_interval), UPPER((SELECT client_timestamp_interval FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param))) < COALESCE($10::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired",
+                RETURNING COALESCE(COALESCE(min_batch_timestamp, (SELECT 
max_client_timestamp FROM batches WHERE batches.task_id = aggregate_share_jobs.task_id AND batches.batch_identifier = aggregate_share_jobs.batch_identifier AND batches.aggregation_param = aggregate_share_jobs.aggregation_param)) < COALESCE($11::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", ) .await?; let rows = self @@ -3566,7 +3604,15 @@ impl Transaction<'_, C> { /* task_id */ &aggregate_share_job.task_id().as_ref(), /* batch_identifier */ &aggregate_share_job.batch_identifier().get_encoded(), - /* batch_interval */ &batch_interval, + /* min_batch_timestamp */ + &min_batch_timestamp + .map(Time::as_naive_date_time) + .transpose()?, + /* max_batch_timestamp */ + &max_batch_timestamp + .as_ref() + .map(Time::as_naive_date_time) + .transpose()?, /* aggregation_param */ &aggregate_share_job.aggregation_parameter().get_encoded(), /* helper_aggregate_share */ @@ -3629,7 +3675,7 @@ impl Transaction<'_, C> { ) VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5) ON CONFLICT DO NOTHING - RETURNING COALESCE(UPPER((SELECT client_timestamp_interval FROM batches WHERE task_id = outstanding_batches.task_id AND batch_identifier = outstanding_batches.batch_id)) < COALESCE($6::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", + RETURNING COALESCE((SELECT max_client_timestamp FROM batches WHERE task_id = outstanding_batches.task_id AND batch_identifier = outstanding_batches.batch_id) < COALESCE($6::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", ) .await?; let rows = self @@ -3695,7 +3741,7 @@ impl Transaction<'_, C> { AND batches.batch_identifier = outstanding_batches.batch_id WHERE tasks.task_id = $1 AND time_bucket_start = $2 - AND UPPER(batches.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND batches.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -3716,7 +3762,7 @@ impl Transaction<'_, C> { AND batches.batch_identifier = outstanding_batches.batch_id WHERE tasks.task_id = $1 AND time_bucket_start IS NULL - AND UPPER(batches.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", + AND batches.max_client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)", ) .await?; self.query( @@ -3836,7 +3882,7 @@ impl Transaction<'_, C> { ON batches.task_id = outstanding_batches.task_id AND batches.batch_identifier = outstanding_batches.batch_id WHERE tasks.task_id = $1 - AND UPPER(batches.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) + AND batches.max_client_timestamp >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) GROUP BY outstanding_batches.batch_id ) SELECT batch_id FROM batches WHERE count >= $2::BIGINT LIMIT 1", @@ -3855,6 +3901,39 @@ impl Transaction<'_, C> { .transpose() } + // returns (min_timestamp, max_timestamp) -- both inclusive + // XXX: find a better place for this function + fn timestamps_from_interval(interval: &Interval) -> (Option