Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experimental] Replace Postgres with CockroachDB for storage. #2285

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 7 additions & 21 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,17 @@ use janus_core::{
auth_tokens::AuthenticationToken,
hpke::{self, HpkeApplicationInfo, HpkeKeypair, Label},
http::HttpErrorResponse,
time::{Clock, DurationExt, IntervalExt, TimeExt},
time::{Clock, IntervalExt, TimeExt},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig,
HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role,
TaskId,
BatchSelector, Collection, CollectionJobId, CollectionReq, HpkeConfig, HpkeConfigList,
InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareError, PrepareResp,
PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role, TaskId,
};
use opentelemetry::{
metrics::{Counter, Histogram, Meter, Unit},
Expand Down Expand Up @@ -1893,31 +1892,19 @@ impl VdafOps {

// Store data to datastore.
let req = Arc::new(req);
let min_client_timestamp = req
.prepare_inits()
.iter()
.map(|prepare_init| *prepare_init.report_share().metadata().time())
.min()
.ok_or_else(|| Error::EmptyAggregation(*task.id()))?;
let max_client_timestamp = req
.prepare_inits()
.iter()
.map(|prepare_init| *prepare_init.report_share().metadata().time())
.max()
.ok_or_else(|| Error::EmptyAggregation(*task.id()))?;
let client_timestamp_interval = Interval::new(
min_client_timestamp,
max_client_timestamp
.difference(&min_client_timestamp)?
.add(&Duration::from_seconds(1))?,
)?;
let aggregation_job = Arc::new(
AggregationJob::<SEED_SIZE, Q, A>::new(
*task.id(),
*aggregation_job_id,
agg_param,
req.batch_selector().batch_identifier().clone(),
client_timestamp_interval,
max_client_timestamp,
if saw_continue {
AggregationJobState::InProgress
} else {
Expand Down Expand Up @@ -2599,8 +2586,8 @@ impl VdafOps {
Q::acknowledge_collection(tx, task.id(), collection_job.batch_identifier()),
)?;

// Merge the intervals spanned by the constituent batch aggregations into the
// interval spanned by the collection.
// Merge the intervals spanned by the constituent batches into the interval
// spanned by the collection.
let mut spanned_interval: Option<Interval> = None;
for interval in batches
.iter()
Expand Down Expand Up @@ -3063,7 +3050,6 @@ fn empty_batch_aggregations<
BatchAggregationState::Collected,
None,
0,
Interval::EMPTY,
ReportIdChecksum::default(),
))
} else {
Expand Down
10 changes: 2 additions & 8 deletions aggregator/src/aggregator/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ use janus_aggregator_core::{
query_type::AccumulableQueryType,
task::AggregatorTask,
};
use janus_core::{
report_id::ReportIdChecksumExt,
time::{Clock, IntervalExt},
};
use janus_messages::{Interval, ReportId, ReportIdChecksum, Time};
use janus_core::{report_id::ReportIdChecksumExt, time::Clock};
use janus_messages::{ReportId, ReportIdChecksum, Time};
use prio::vdaf;
use rand::{thread_rng, Rng};
use std::{
Expand Down Expand Up @@ -82,8 +79,6 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
) -> Result<(), datastore::Error> {
let batch_identifier =
Q::to_batch_identifier(&self.task, partial_batch_identifier, client_timestamp)?;
let client_timestamp_interval =
Interval::from_time(client_timestamp).map_err(|e| datastore::Error::User(e.into()))?;
let batch_aggregation_fn = || {
BatchAggregation::new(
*self.task.id(),
Expand All @@ -93,7 +88,6 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
BatchAggregationState::Aggregating,
Some(A::AggregateShare::from(output_share.clone())),
1,
client_timestamp_interval,
ReportIdChecksum::for_report_id(report_id),
)
};
Expand Down
6 changes: 3 additions & 3 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,12 @@ mod tests {
};
use janus_core::{
test_util::install_test_trace_subscriber,
time::{IntervalExt, MockClock},
time::MockClock,
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId, AggregationJobResp,
AggregationJobStep, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
AggregationJobStep, PrepareContinue, PrepareResp, PrepareStepResult, Role,
};
use prio::{
idpf::IdpfInput,
Expand Down Expand Up @@ -487,7 +487,7 @@ mod tests {
aggregation_job_id,
aggregation_param,
(),
Interval::from_time(prepare_init.report_share().metadata().time()).unwrap(),
*prepare_init.report_share().metadata().time(),
AggregationJobState::InProgress,
AggregationJobStep::from(0),
))
Expand Down
14 changes: 3 additions & 11 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ use janus_aggregator_core::{
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
time::{Clock, DurationExt as _, TimeExt as _},
time::Clock,
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
AggregationJobStep, Duration as DurationMsg, Interval, Role, TaskId,
AggregationJobStep, Role, TaskId,
};
use opentelemetry::{
metrics::{Histogram, Meter, Unit},
Expand Down Expand Up @@ -551,23 +551,15 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
"Creating aggregation job"
);

let min_client_timestamp =
agg_job_reports.iter().map(|(_, time)| time).min().unwrap(); // unwrap safety: agg_job_reports is non-empty
let max_client_timestamp =
agg_job_reports.iter().map(|(_, time)| time).max().unwrap(); // unwrap safety: agg_job_reports is non-empty
let client_timestamp_interval = Interval::new(
*min_client_timestamp,
max_client_timestamp
.difference(min_client_timestamp)?
.add(&DurationMsg::from_seconds(1))?,
)?;

let aggregation_job = AggregationJob::<SEED_SIZE, TimeInterval, A>::new(
*task.id(),
aggregation_job_id,
(),
(),
client_timestamp_interval,
*max_client_timestamp,
AggregationJobState::InProgress,
AggregationJobStep::from(0),
);
Expand Down
Loading