Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 44f6f1e
Author: ovi <[email protected]>
Date:   Wed Sep 20 16:41:42 2023 +0200

    removing comment

commit 58db26d
Author: Maxim Urschumzew <[email protected]>
Date:   Wed Sep 20 14:37:16 2023 +0200

    Add comments.

commit cd51caa
Author: ovi <[email protected]>
Date:   Mon Sep 18 12:40:54 2023 +0200

    fixing git dep

commit 128bbba
Author: ovi <[email protected]>
Date:   Mon Sep 18 04:26:46 2023 +0200

    format

commit e9a9ca8
Author: ovi <[email protected]>
Date:   Mon Sep 18 04:21:30 2023 +0200

    Squashed commit of the following:

    commit 29f4fd4
    Author: ovi <[email protected]>
    Date:   Mon Sep 18 01:46:04 2023 +0200

        nits

    commit 6d9503e
    Author: ovi <[email protected]>
    Date:   Wed Sep 13 00:51:40 2023 +0200

        fix import

    commit f318903
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 23:45:00 2023 +0200

        import fix

    commit 82f8b63
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 22:36:57 2023 +0200

        import fix

    commit 8a9e33c
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 19:13:57 2023 +0200

        fmt

    commit 1a6fa3d
    Author: Maxim Urschumzew <[email protected]>
    Date:   Tue Sep 12 17:04:28 2023 +0200

        Fixing imports and feature gates for clippy.

        Co-authored-by: Olivia <[email protected]>

    commit 35c52d1
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 18:43:52 2023 +0200

        clippy happy

    commit 340e7f9
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 18:29:30 2023 +0200

        enabling Poplar1 again, feature flag cleanup

    commit c22282e
    Author: ovi <[email protected]>
    Date:   Tue Sep 12 16:45:08 2023 +0200

        cleanup

    commit d562fdf
    Author: Maxim Urschumzew <[email protected]>
    Date:   Tue Sep 12 11:59:42 2023 +0200

        Squashed commit of the following:

        commit 8934f01
        Author: ovi <[email protected]>
        Date:   Mon Sep 11 21:35:29 2023 +0200

            fixing e2e tests

        commit d5e3122
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sun Sep 10 14:19:53 2023 +0200

            Fix serde error.

        commit 44b9575
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sun Sep 10 13:06:48 2023 +0200

            Fix interop client.

        commit 3cafe7b
        Author: ovi <[email protected]>
        Date:   Sun Sep 10 14:58:24 2023 +0200

            updating fpvec vdaf instance struct

        commit 61b3d02
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sun Sep 10 12:56:22 2023 +0200

            Fix lib for interop_binaries.

        commit f7876e9
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sun Sep 10 12:21:37 2023 +0200

            Rewrite vdaf_dispatch macros to for single-case fixedpoint vecs.

        commit 9801351
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 9 16:40:50 2023 +0200

            [WIP] Work towards usable macro system for dispatch.

             - Consolidate VdafInstance cases for fixedpoint types into a single case.
             - Remove the ZCdp cases for VdafOps for fixedpoint types. Couldn't merge the
               others because VdafOps contains actual vdaf instances and not VdafInstance objects.

        commit 2ae5635
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 9 12:41:47 2023 +0200

            Add strategy dispatch for collection_job_driver.

        commit eec935a
        Author: ovi <[email protected]>
        Date:   Sat Sep 9 02:18:37 2023 +0200

            adding dp strategy arg in aggregator

        commit c235908
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 19:24:19 2023 +0200

            Fix some typos.

        commit 11ee488
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 19:13:21 2023 +0200

            cleanup

        commit 469e1a5
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 16:58:10 2023 +0200

            Remove dp_strategy from more places.

        commit ffe707d
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 16:55:59 2023 +0200

            Remove dp strategy.

        commit 7908abb
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 18:54:48 2023 +0200

            some last vdaf instance branches

        commit 69e6795
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 18:48:00 2023 +0200

            more interop vdafinstance branches

        commit 357dc97
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 16:39:28 2023 +0200

            Add NoDifferentialPrivacy instance for fixedvec types.

        commit 99750c9
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 18:38:41 2023 +0200

            interop vdafinstance branches

        commit 6fa4ebb
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 16:30:40 2023 +0200

            Add ZCdp cases in aggregator.rs.

        commit 16ef61f
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 18:09:01 2023 +0200

            adding enum cases

        commit 404ae32
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 16:02:09 2023 +0200

            Remove dp_strategy from task definitions.

        commit e88107e
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 15:55:42 2023 +0200

            Remove strategy field from Task struct.

        commit 46ca5a2
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 13:24:21 2023 +0200

            Add missing strategy field.

        commit bdd28f4
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 8 12:40:09 2023 +0200

            Fix json specification of dp strategy.

        commit fdcf851
        Author: ovi <[email protected]>
        Date:   Fri Sep 8 04:08:22 2023 +0200

            more serialize mistakes

        commit 6a069fe
        Author: Maxim Urschumzew <[email protected]>
        Date:   Tue Sep 5 15:56:30 2023 +0200

            Fix more serialization mismatches.

        commit 891b6c5
        Merge: 5dcd3e4 1fec56f
        Author: ovi <[email protected]>
        Date:   Tue Sep 5 17:23:01 2023 +0200

            Merge branch 'janus-m5-v2' of github.com:dpsa-project/janus into janus-m5-v2

        commit 5dcd3e4
        Author: ovi <[email protected]>
        Date:   Tue Sep 5 17:22:44 2023 +0200

            reverting Cargo.lock

        commit 8572387
        Merge: e03aeec f8ee1f4
        Author: ovi <[email protected]>
        Date:   Tue Sep 5 16:48:09 2023 +0200

            Merge branch 'janus-m5-v2' of github.com:dpsa-project/janus into janus-m5-v2

        commit e03aeec
        Author: ovi <[email protected]>
        Date:   Tue Sep 5 16:47:42 2023 +0200

            fixing macro import stuff

        commit 1fec56f
        Author: Maxim Urschumzew <[email protected]>
        Date:   Tue Sep 5 14:52:06 2023 +0200

            Fix yaml value for test.

        commit f8ee1f4
        Author: Maxim Urschumzew <[email protected]>
        Date:   Tue Sep 5 13:50:41 2023 +0200

            Update prio dep.

        commit a5beebf
        Author: ovi <[email protected]>
        Date:   Mon Sep 4 16:42:29 2023 +0200

            interop stuff

        commit 7db08bc
        Author: ovi <[email protected]>
        Date:   Sun Sep 3 14:19:43 2023 +0200

            some clippies

        commit a6ee8ec
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 2 21:42:01 2023 +0200

            Call `add_noise_to_aggregate_share` also in collection_job_driver.

        commit b3a2826
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 2 21:35:12 2023 +0200

            Fix error type mismatch.

        commit c126c1d
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 2 17:02:33 2023 +0200

            Fix typo.

        commit 834654f
        Author: ovi <[email protected]>
        Date:   Sat Sep 2 18:59:28 2023 +0200

            actually adding noise!

        commit 6dc307c
        Author: ovi <[email protected]>
        Date:   Sat Sep 2 15:31:35 2023 +0200

            stretegy_alias for vdaf_dispatch

        commit 709893d
        Author: ovi <[email protected]>
        Date:   Sat Sep 2 14:05:06 2023 +0200

            not creating type alias on every invocation

        commit b0e14ee
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 2 10:49:24 2023 +0200

            Integrate $DpStrategy choice into dispatch macros.

        commit 8a1aa21
        Author: Maxim Urschumzew <[email protected]>
        Date:   Sat Sep 2 09:24:30 2023 +0200

            This commit shows the problem which we have if we match dynamically on the DpStrategyInstance.

        commit ce607f7
        Author: ovi <[email protected]>
        Date:   Sat Sep 2 04:13:16 2023 +0200

            attempting strategies

        commit aec6f53
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 1 17:58:14 2023 +0200

            Added `DpStrategyInstance`.

        commit 6c83a19
        Author: Maxim Urschumzew <[email protected]>
        Date:   Fri Sep 1 16:37:26 2023 +0200

            Add `AggregatorWithNoise` references.

        commit 2984618
        Author: Maxim Urschumzew <[email protected]>
        Date:   Thu Aug 31 21:27:00 2023 +0200

            Compile with new prio dep.
  • Loading branch information
ooovi committed Sep 22, 2023
1 parent 8a3c9d8 commit 71d2de3
Show file tree
Hide file tree
Showing 20 changed files with 951 additions and 449 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ janus_messages = { version = "0.6.0-prerelease-1", path = "messages" }
k8s-openapi = { version = "0.18.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.82.2", default-features = false, features = ["client", "rustls-tls"] }
opentelemetry = { version = "0.20", features = ["metrics"] }
prio = { version = "0.15.1", features = ["multithreaded", "experimental"] }
#prio = { version = "0.15.1", features = ["multithreaded", "experimental"] }
prio = {git = "https://github.com/dpsa4fl/libprio-rs.git", branch="dp-update-for-janus-rebased", features = ["multithreaded", "experimental"]}
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.106"
serde_test = "1.0.175"
Expand Down
255 changes: 209 additions & 46 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions aggregator/src/aggregator/aggregate_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use super::Error;
use janus_aggregator_core::{datastore::models::BatchAggregation, task::Task};
use janus_core::report_id::ReportIdChecksumExt;
use janus_messages::{query_type::QueryType, ReportIdChecksum};
use prio::vdaf::{self, Aggregatable};
use prio::{
dp::DifferentialPrivacyStrategy,
vdaf::{self, Aggregatable},
};

/// Computes the aggregate share over the provided batch aggregations.
/// The assumption is that all aggregation jobs contributing to those batch aggregations have
Expand All @@ -14,7 +17,8 @@ use prio::vdaf::{self, Aggregatable};
pub(crate) async fn compute_aggregate_share<
const SEED_SIZE: usize,
Q: QueryType,
A: vdaf::Aggregator<SEED_SIZE, 16>,
S: DifferentialPrivacyStrategy,
A: vdaf::AggregatorWithNoise<SEED_SIZE, 16, S>,
>(
task: &Task,
batch_aggregations: &[BatchAggregation<SEED_SIZE, Q, A>],
Expand Down
151 changes: 69 additions & 82 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use janus_aggregator_core::{
datastore::{self, Datastore},
task::{self, Task},
};
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::task::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
task::{VdafInstance, VERIFY_KEY_LENGTH},
time::{Clock, DurationExt as _, TimeExt as _},
Expand Down Expand Up @@ -311,41 +313,37 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
#[cfg(feature = "fpvec_bounded_l2")]
(
task::QueryType::TimeInterval,
VdafInstance::Prio3FixedPoint16BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>>(task, vdaf)
.await
}

#[cfg(feature = "fpvec_bounded_l2")]
(
task::QueryType::TimeInterval,
VdafInstance::Prio3FixedPoint32BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>>(task, vdaf)
.await
}

#[cfg(feature = "fpvec_bounded_l2")]
(
task::QueryType::TimeInterval,
VdafInstance::Prio3FixedPoint64BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>>(task, vdaf)
.await
}
VdafInstance::Prio3FixedPointBoundedL2VecSum {
bitsize,
dp_strategy: _,
length,
},
) => match bitsize {
Prio3FixedPointBoundedL2VecSumBitSize::BitSize16 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>>(task, vdaf)
.await
}
Prio3FixedPointBoundedL2VecSumBitSize::BitSize32 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>>(task, vdaf)
.await
}
Prio3FixedPointBoundedL2VecSumBitSize::BitSize64 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<VERIFY_KEY_LENGTH, Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>>(task, vdaf)
.await
}
},

(
task::QueryType::FixedSize {
Expand Down Expand Up @@ -447,58 +445,47 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
max_batch_size,
batch_time_window_size,
},
VdafInstance::Prio3FixedPoint16BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
let max_batch_size = *max_batch_size;
let batch_time_window_size = *batch_time_window_size;
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
}

#[cfg(feature = "fpvec_bounded_l2")]
(
task::QueryType::FixedSize {
max_batch_size,
batch_time_window_size,
VdafInstance::Prio3FixedPointBoundedL2VecSum {
bitsize,
dp_strategy: _,
length,
},
VdafInstance::Prio3FixedPoint32BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
let max_batch_size = *max_batch_size;
let batch_time_window_size = *batch_time_window_size;
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
}

#[cfg(feature = "fpvec_bounded_l2")]
(
task::QueryType::FixedSize {
max_batch_size,
batch_time_window_size,
},
VdafInstance::Prio3FixedPoint64BitBoundedL2VecSum { length },
) => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
let max_batch_size = *max_batch_size;
let batch_time_window_size = *batch_time_window_size;
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
match bitsize {
janus_core::task::Prio3FixedPointBoundedL2VecSumBitSize::BitSize16 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI16<U15>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
}
janus_core::task::Prio3FixedPointBoundedL2VecSumBitSize::BitSize32 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI32<U31>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
}
janus_core::task::Prio3FixedPointBoundedL2VecSumBitSize::BitSize64 => {
let vdaf: Arc<Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>> =
Arc::new(Prio3::new_fixedpoint_boundedl2_vec_sum_multithreaded(
2, *length,
)?);
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
VERIFY_KEY_LENGTH,
Prio3FixedPointBoundedL2VecSumMultithreaded<FixedI64<U63>>,
>(task, vdaf, max_batch_size, batch_time_window_size).await
}
}
}

_ => {
Expand Down
33 changes: 25 additions & 8 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use janus_aggregator_core::{
},
task,
};
use janus_core::{time::Clock, vdaf_dispatch};
use janus_core::{dp::DpStrategyInstance, time::Clock, vdaf_dispatch};
use janus_messages::{
query_type::{FixedSize, QueryType, TimeInterval},
AggregateShare, AggregateShareReq, BatchSelector,
Expand All @@ -27,6 +27,7 @@ use opentelemetry::{
};
use prio::{
codec::{Decode, Encode},
dp::DifferentialPrivacyStrategy,
vdaf,
};
use reqwest::Method;
Expand Down Expand Up @@ -81,24 +82,26 @@ impl CollectionJobDriver {
) -> Result<(), Error> {
match lease.leased().query_type() {
task::QueryType::TimeInterval => {
vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH) => {
vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH, DpStrategy, dp_strategy) => {
self.step_collection_job_generic::<
VERIFY_KEY_LENGTH,
C,
TimeInterval,
DpStrategy,
VdafType
>(datastore, Arc::new(vdaf), lease)
>(datastore, Arc::new(vdaf), lease, dp_strategy)
.await
})
}
task::QueryType::FixedSize { .. } => {
vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH) => {
vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH, DpStrategy, dp_strategy) => {
self.step_collection_job_generic::<
VERIFY_KEY_LENGTH,
C,
FixedSize,
DpStrategy,
VdafType
>(datastore, Arc::new(vdaf), lease)
>(datastore, Arc::new(vdaf), lease, dp_strategy)
.await
})
}
Expand All @@ -109,12 +112,14 @@ impl CollectionJobDriver {
const SEED_SIZE: usize,
C: Clock,
Q: CollectableQueryType,
A: vdaf::Aggregator<SEED_SIZE, 16> + Send + Sync,
S: DifferentialPrivacyStrategy + TryFrom<DpStrategyInstance>,
A: vdaf::AggregatorWithNoise<SEED_SIZE, 16, S> + Send + Sync,
>(
&self,
datastore: Arc<Datastore<C>>,
vdaf: Arc<A>,
lease: Arc<Lease<AcquiredCollectionJob>>,
dp_strategy: S,
) -> Result<(), Error>
where
A: 'static,
Expand Down Expand Up @@ -208,11 +213,23 @@ impl CollectionJobDriver {
return Ok(());
}

let (leader_aggregate_share, report_count, checksum) =
compute_aggregate_share::<SEED_SIZE, Q, A>(&task, &batch_aggregations)
let (mut leader_aggregate_share, report_count, checksum) =
compute_aggregate_share::<SEED_SIZE, Q, S, A>(&task, &batch_aggregations)
.await
.map_err(|e| datastore::Error::User(e.into()))?;

vdaf.add_noise_to_agg_share(
&dp_strategy,
collection_job.aggregation_parameter(),
&mut leader_aggregate_share,
report_count.try_into()?,
)
.map_err(|e| {
datastore::Error::DifferentialPrivacy(format!(
"Error when adding noise to aggregate share: {e}"
))
})?;

// Send an aggregate share request to the helper.
let req = AggregateShareReq::<Q>::new(
BatchSelector::new(collection_job.batch_identifier().clone()),
Expand Down
2 changes: 2 additions & 0 deletions aggregator/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@ mod tests {
aggregator_auth_tokens: []
collector_auth_tokens: []
hpke_keys: []
dp_strategy: !NoDifferentialPrivacy
- leader_aggregator_endpoint: https://leader
helper_aggregator_endpoint: https://helper
query_type: TimeInterval
Expand All @@ -756,6 +757,7 @@ mod tests {
aggregator_auth_tokens: []
collector_auth_tokens: []
hpke_keys: []
dp_strategy: !NoDifferentialPrivacy
"#;

let ephemeral_datastore = ephemeral_datastore().await;
Expand Down
3 changes: 3 additions & 0 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4967,6 +4967,9 @@ pub enum Error {
/// An invalid parameter was provided.
#[error("invalid parameter: {0}")]
InvalidParameter(&'static str),
/// An error occured when trying to ensure differential privacy.
#[error("differential privacy error: {0}")]
DifferentialPrivacy(String),
/// An error occurred while manipulating timestamps or durations.
#[error("{0}")]
TimeOverflow(&'static str),
Expand Down
1 change: 1 addition & 0 deletions aggregator_core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use janus_messages::{
taskprov, AggregationJobId, CollectionJobId, Duration, HpkeAeadId, HpkeConfig, HpkeConfigId,
HpkeKdfId, HpkeKemId, Role, TaskId, Time,
};

use rand::{distributions::Standard, random, thread_rng, Rng};
use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer};
use std::{array::TryFromSliceError, collections::HashMap};
Expand Down
3 changes: 2 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[features]
fpvec_bounded_l2 = ["prio/experimental"]
fpvec_bounded_l2 = ["dep:fixed", "prio/experimental"]
test-util = [
"dep:assert_matches",
"dep:k8s-openapi",
Expand All @@ -38,6 +38,7 @@ base64.workspace = true
chrono = { workspace = true, features = ["clock"] }
derivative = "2.2.0"
futures = "0.3.28"
fixed = { version = "1.23", optional = true }
hex = "0.4"
hpke-dispatch = { version = "0.5.1", features = ["serde"] }
http = "0.2.9"
Expand Down
Loading

0 comments on commit 71d2de3

Please sign in to comment.