[WIP] CockroachDB.
branlwyd committed Nov 17, 2023
1 parent 98eb00a commit b0e1635
Showing 20 changed files with 552 additions and 811 deletions.
39 changes: 25 additions & 14 deletions Cargo.lock


28 changes: 7 additions & 21 deletions aggregator/src/aggregator.rs
@@ -44,18 +44,17 @@ use janus_core::{
auth_tokens::AuthenticationToken,
hpke::{self, HpkeApplicationInfo, HpkeKeypair, Label},
http::HttpErrorResponse,
- time::{Clock, DurationExt, IntervalExt, TimeExt},
+ time::{Clock, IntervalExt, TimeExt},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
- BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig,
- HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
- PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role,
- TaskId,
+ BatchSelector, Collection, CollectionJobId, CollectionReq, HpkeConfig, HpkeConfigList,
+ InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareError, PrepareResp,
+ PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role, TaskId,
};
use opentelemetry::{
metrics::{Counter, Histogram, Meter, Unit},
@@ -1893,31 +1892,19 @@ impl VdafOps {

// Store data to datastore.
let req = Arc::new(req);
- let min_client_timestamp = req
- .prepare_inits()
- .iter()
- .map(|prepare_init| *prepare_init.report_share().metadata().time())
- .min()
- .ok_or_else(|| Error::EmptyAggregation(*task.id()))?;
let max_client_timestamp = req
.prepare_inits()
.iter()
.map(|prepare_init| *prepare_init.report_share().metadata().time())
.max()
.ok_or_else(|| Error::EmptyAggregation(*task.id()))?;
- let client_timestamp_interval = Interval::new(
- min_client_timestamp,
- max_client_timestamp
- .difference(&min_client_timestamp)?
- .add(&Duration::from_seconds(1))?,
- )?;
let aggregation_job = Arc::new(
AggregationJob::<SEED_SIZE, Q, A>::new(
*task.id(),
*aggregation_job_id,
agg_param,
req.batch_selector().batch_identifier().clone(),
- client_timestamp_interval,
+ max_client_timestamp,
if saw_continue {
AggregationJobState::InProgress
} else {
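
Note: the removed lines above built the half-open interval spanned by the reports' client timestamps; the constructor now receives only the largest timestamp. A minimal sketch of the removed arithmetic, using plain u64 seconds in place of the Janus Time/Duration/Interval types (everything below is illustrative, not the real API):

// Illustration only: u64 seconds stand in for janus_messages::Time, and the
// interval is represented as (start, duration), half-open: [start, start + duration).
fn client_timestamp_interval(timestamps: &[u64]) -> Option<(u64, u64)> {
    let min = *timestamps.iter().min()?;
    let max = *timestamps.iter().max()?;
    // Mirrors `max.difference(&min)?.add(&Duration::from_seconds(1))?`: the
    // extra second makes the half-open interval include the maximum timestamp.
    Some((min, max - min + 1))
}

fn main() {
    // Reports at t = 5, 9, 12 span [5, 13), i.e. start 5 with duration 8 seconds.
    assert_eq!(client_timestamp_interval(&[5, 9, 12]), Some((5, 8)));
}
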
@@ -2599,8 +2586,8 @@ impl VdafOps {
Q::acknowledge_collection(tx, task.id(), collection_job.batch_identifier()),
)?;

- // Merge the intervals spanned by the constituent batch aggregations into the
- // interval spanned by the collection.
+ // Merge the intervals spanned by the constituent batches into the interval
+ // spanned by the collection.
let mut spanned_interval: Option<Interval> = None;
for interval in batches
.iter()
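
The loop that follows (cut off above) folds each batch's interval into spanned_interval. A self-contained sketch of that merge step, with a stand-in Span struct rather than janus_messages::Interval (the real type's merge behavior may differ in detail; this only shows the idea):

// Illustration only: a half-open interval over u64 seconds.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Span {
    start: u64,
    end: u64, // exclusive
}

// Smallest span containing both inputs; any gap between them ends up covered.
fn merge(a: Span, b: Span) -> Span {
    Span {
        start: a.start.min(b.start),
        end: a.end.max(b.end),
    }
}

fn main() {
    let batches = [
        Span { start: 0, end: 100 },
        Span { start: 300, end: 400 },
        Span { start: 100, end: 200 },
    ];
    // Mirrors the `Option<Interval>` accumulator in the surrounding code,
    // written here as an explicit fold.
    let spanned = batches.iter().fold(None, |acc: Option<Span>, s| {
        Some(match acc {
            Some(prev) => merge(prev, *s),
            None => *s,
        })
    });
    assert_eq!(spanned, Some(Span { start: 0, end: 400 }));
}
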
@@ -3063,7 +3050,6 @@ fn empty_batch_aggregations<
BatchAggregationState::Collected,
None,
0,
- Interval::EMPTY,
ReportIdChecksum::default(),
))
} else {
10 changes: 2 additions & 8 deletions aggregator/src/aggregator/accumulator.rs
@@ -11,11 +11,8 @@ use janus_aggregator_core::{
query_type::AccumulableQueryType,
task::AggregatorTask,
};
- use janus_core::{
- report_id::ReportIdChecksumExt,
- time::{Clock, IntervalExt},
- };
- use janus_messages::{Interval, ReportId, ReportIdChecksum, Time};
+ use janus_core::{report_id::ReportIdChecksumExt, time::Clock};
+ use janus_messages::{ReportId, ReportIdChecksum, Time};
use prio::vdaf;
use rand::{thread_rng, Rng};
use std::{
@@ -82,8 +79,6 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
) -> Result<(), datastore::Error> {
let batch_identifier =
Q::to_batch_identifier(&self.task, partial_batch_identifier, client_timestamp)?;
- let client_timestamp_interval =
- Interval::from_time(client_timestamp).map_err(|e| datastore::Error::User(e.into()))?;
let batch_aggregation_fn = || {
BatchAggregation::new(
*self.task.id(),
@@ -93,7 +88,6 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
BatchAggregationState::Aggregating,
Some(A::AggregateShare::from(output_share.clone())),
1,
- client_timestamp_interval,
ReportIdChecksum::for_report_id(report_id),
)
};
6 changes: 3 additions & 3 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -400,12 +400,12 @@ mod tests {
};
use janus_core::{
test_util::install_test_trace_subscriber,
- time::{IntervalExt, MockClock},
+ time::MockClock,
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId, AggregationJobResp,
- AggregationJobStep, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
+ AggregationJobStep, PrepareContinue, PrepareResp, PrepareStepResult, Role,
};
use prio::{
idpf::IdpfInput,
@@ -487,7 +487,7 @@
aggregation_job_id,
aggregation_param,
(),
- Interval::from_time(prepare_init.report_share().metadata().time()).unwrap(),
+ *prepare_init.report_share().metadata().time(),
AggregationJobState::InProgress,
AggregationJobStep::from(0),
))
14 changes: 3 additions & 11 deletions aggregator/src/aggregator/aggregation_job_creator.rs
@@ -14,12 +14,12 @@ use janus_aggregator_core::{
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
- time::{Clock, DurationExt as _, TimeExt as _},
+ time::Clock,
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
- AggregationJobStep, Duration as DurationMsg, Interval, Role, TaskId,
+ AggregationJobStep, Role, TaskId,
};
use opentelemetry::{
metrics::{Histogram, Meter, Unit},
@@ -551,23 +551,15 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
"Creating aggregation job"
);

- let min_client_timestamp =
- agg_job_reports.iter().map(|(_, time)| time).min().unwrap(); // unwrap safety: agg_job_reports is non-empty
let max_client_timestamp =
agg_job_reports.iter().map(|(_, time)| time).max().unwrap(); // unwrap safety: agg_job_reports is non-empty
- let client_timestamp_interval = Interval::new(
- *min_client_timestamp,
- max_client_timestamp
- .difference(min_client_timestamp)?
- .add(&DurationMsg::from_seconds(1))?,
- )?;

let aggregation_job = AggregationJob::<SEED_SIZE, TimeInterval, A>::new(
*task.id(),
aggregation_job_id,
(),
(),
- client_timestamp_interval,
+ *max_client_timestamp,
AggregationJobState::InProgress,
AggregationJobStep::from(0),
);
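
With the interval gone, the only per-report bookkeeping left at this call site is the maximum client timestamp computed by the surviving max() line above. A stand-alone sketch of that reduction over (report_id, time) pairs, with plain integers in place of the Janus ReportId and Time types:

// Illustration only: (report_id, time) pairs as plain integers.
fn max_client_timestamp(agg_job_reports: &[(u64, u64)]) -> Option<u64> {
    agg_job_reports.iter().map(|(_, time)| *time).max()
}

fn main() {
    let agg_job_reports = [(1, 50), (2, 75), (3, 60)];
    // The real code unwraps because the report set is known to be non-empty.
    assert_eq!(max_client_timestamp(&agg_job_reports), Some(75));
}
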