Fixed-size: return each batch to at most one current-batch query. (#2391)

Part of the DAP-09 implementation (#2389). With this change, Collectors are
expected to remember the collection job IDs they generate before sending
the collection request. If the Collector encounters a transient failure,
it can resend the request with the same collection job ID and rely on
request idempotency: every attempt resolves to the same collection job.

The upside of this change is that Collectors no longer need logic to
deduplicate multiple current-batch requests that map to the same
underlying batch, and they can generate current-batch requests at higher
rates without risking more duplicated collections.
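To illustrate, here is a minimal sketch of the Collector-side retry pattern the message describes. The names (`CollectionJobId` stand-in, `CollectError`, `LeaderClient`, `collect_with_retries`) are hypothetical and for illustration only, not Janus's actual collector API; the only external dependency assumed is the `rand` crate.

use std::{thread::sleep, time::Duration};

/// Stand-in for DAP's 16-byte, Collector-chosen collection job ID.
#[derive(Clone, Copy)]
pub struct CollectionJobId(pub [u8; 16]);

pub enum CollectError {
    /// E.g. a connection reset or a 5xx response; safe to retry.
    Transient,
    /// A definitive failure; retrying will not help.
    Permanent(String),
}

/// Stand-in for a Collector's HTTP client; models the idempotent upload of a
/// collection request to a job ID chosen by the Collector.
pub trait LeaderClient {
    fn put_collection_job(&self, id: &CollectionJobId, req: &[u8]) -> Result<(), CollectError>;
}

pub fn collect_with_retries(
    leader: &dyn LeaderClient,
    req: &[u8],
    max_attempts: u32,
) -> Result<(), CollectError> {
    // Choose the job ID once, *before* the first attempt, and reuse it on
    // every retry; the Leader's idempotent handling maps all attempts onto a
    // single collection job (and, for fixed-size current-batch queries, a
    // single batch).
    let job_id = CollectionJobId(rand::random());
    let mut attempt = 0;
    loop {
        match leader.put_collection_job(&job_id, req) {
            Ok(()) => return Ok(()),
            Err(CollectError::Transient) if attempt + 1 < max_attempts => {
                attempt += 1;
                // Simple exponential backoff before resending the same job ID.
                sleep(Duration::from_millis(100 << attempt.min(6)));
            }
            Err(err) => return Err(err),
        }
    }
}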
branlwyd authored Jan 13, 2024
1 parent bba8b91 commit 0d83a83
Showing 4 changed files with 49 additions and 112 deletions.
18 changes: 7 additions & 11 deletions aggregator/src/aggregator.rs
@@ -2655,15 +2655,13 @@ impl VdafOps {
                 )
             })?;
 
-            let (batches, _) = try_join!(
-                Q::get_batches_for_collection_identifier(
-                    tx,
-                    &task,
-                    collection_job.batch_identifier(),
-                    collection_job.aggregation_parameter()
-                ),
-                Q::acknowledge_collection(tx, task.id(), collection_job.batch_identifier()),
-            )?;
+            let batches = Q::get_batches_for_collection_identifier(
+                tx,
+                &task,
+                collection_job.batch_identifier(),
+                collection_job.aggregation_parameter(),
+            )
+            .await?;
 
             // Merge the intervals spanned by the constituent batch aggregations into the
             // interval spanned by the collection.
@@ -2823,8 +2821,6 @@ impl VdafOps {
                     Error::UnrecognizedCollectionJob(collection_job_id).into(),
                 )
             })?;
-            Q::acknowledge_collection(tx, task.id(), collection_job.batch_identifier())
-                .await?;
             if collection_job.state() != &CollectionJobState::Deleted {
                 tx.update_collection_job::<SEED_SIZE, Q, A>(
                     &collection_job.with_state(CollectionJobState::Deleted),
70 changes: 22 additions & 48 deletions aggregator_core/src/datastore.rs
@@ -3806,61 +3806,35 @@ impl<C: Clock> Transaction<'_, C> {
         ))
     }
 
-    /// Deletes an outstanding batch.
-    #[tracing::instrument(skip(self), err)]
-    pub async fn delete_outstanding_batch(
-        &self,
-        task_id: &TaskId,
-        batch_id: &BatchId,
-    ) -> Result<(), Error> {
-        let stmt = self
-            .prepare_cached(
-                "DELETE FROM outstanding_batches
-                WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)
-                AND batch_id = $2",
-            )
-            .await?;
-
-        self.execute(
-            &stmt,
-            &[
-                /* task_id */ task_id.as_ref(),
-                /* batch_id */ batch_id.as_ref(),
-            ],
-        )
-        .await?;
-        Ok(())
-    }
-
     /// Retrieves an outstanding batch for the given task with at least the given number of
-    /// successfully-aggregated reports.
+    /// successfully-aggregated reports, removing it from the datastore.
     #[tracing::instrument(skip(self), err)]
-    pub async fn get_filled_outstanding_batch(
+    pub async fn acquire_filled_outstanding_batch(
         &self,
         task_id: &TaskId,
         min_report_count: u64,
     ) -> Result<Option<BatchId>, Error> {
-        let stmt = self
-            .prepare_cached(
-                "WITH batches AS (
-                    SELECT
-                        outstanding_batches.batch_id AS batch_id,
-                        SUM(batch_aggregations.report_count) AS count
-                    FROM outstanding_batches
-                    JOIN tasks ON tasks.id = outstanding_batches.task_id
-                    JOIN batch_aggregations
-                        ON batch_aggregations.task_id = outstanding_batches.task_id
-                        AND batch_aggregations.batch_identifier = outstanding_batches.batch_id
-                    JOIN batches
-                        ON batches.task_id = outstanding_batches.task_id
-                        AND batches.batch_identifier = outstanding_batches.batch_id
-                    WHERE tasks.task_id = $1
-                    AND UPPER(batches.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
-                    GROUP BY outstanding_batches.batch_id
-                )
-                SELECT batch_id FROM batches WHERE count >= $2::BIGINT LIMIT 1",
-            )
-            .await?;
+        let stmt = self.prepare_cached(
+            "WITH selected_outstanding_batch AS (
+                SELECT outstanding_batches.id
+                FROM outstanding_batches
+                JOIN tasks ON tasks.id = outstanding_batches.task_id
+                JOIN batch_aggregations
+                    ON batch_aggregations.task_id = outstanding_batches.task_id
+                    AND batch_aggregations.batch_identifier = outstanding_batches.batch_id
+                JOIN batches
+                    ON batches.task_id = outstanding_batches.task_id
+                    AND batches.batch_identifier = outstanding_batches.batch_id
+                WHERE tasks.task_id = $1
+                AND UPPER(batches.client_timestamp_interval) >= COALESCE($3::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
+                GROUP BY outstanding_batches.id
+                HAVING SUM(batch_aggregations.report_count) >= $2::BIGINT
+                LIMIT 1
+            )
+            DELETE FROM outstanding_batches WHERE id IN (SELECT id FROM selected_outstanding_batch) RETURNING batch_id"
+        )
+        .await?;
 
         self.query_opt(
             &stmt,
             &[
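The rename from get_filled_outstanding_batch to acquire_filled_outstanding_batch reflects the new take-like semantics: the CTE folds selection and deletion into a single DELETE ... RETURNING statement, so an outstanding batch can be handed out at most once. In miniature, this is the behavior the updated test below exercises; the following fragment is a sketch, assuming a datastore transaction `tx` and a `task` with exactly one outstanding batch holding at least `min_batch_size` aggregated reports (all setup elided).

// Within a datastore transaction (setup elided; see the test changes below).
let first = tx
    .acquire_filled_outstanding_batch(task.id(), task.min_batch_size())
    .await?;
assert!(first.is_some()); // the filled batch is returned once...

let second = tx
    .acquire_filled_outstanding_batch(task.id(), task.min_batch_size())
    .await?;
assert!(second.is_none()); // ...and then never again: it can back at most one
                           // current-batch collection job.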
45 changes: 19 additions & 26 deletions aggregator_core/src/datastore/tests.rs
@@ -5305,19 +5305,30 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) {
         .run_unnamed_tx(|tx| {
             Box::pin(async move {
                 let outstanding_batches_task_1 =
-                    tx.get_outstanding_batches(&task_id_1, &None).await?;
-                let outstanding_batch_1 = tx.get_filled_outstanding_batch(&task_id_1, 1).await?;
-                let outstanding_batch_2 = tx.get_filled_outstanding_batch(&task_id_1, 2).await?;
-                let outstanding_batch_3 = tx.get_filled_outstanding_batch(&task_id_1, 3).await?;
+                    tx.get_outstanding_batches(&task_id_1, &None).await.unwrap();
+                let outstanding_batch_1 = tx
+                    .acquire_filled_outstanding_batch(&task_id_1, 3)
+                    .await
+                    .unwrap();
+                let outstanding_batch_2 = tx
+                    .acquire_filled_outstanding_batch(&task_id_1, 2)
+                    .await
+                    .unwrap();
+                let outstanding_batch_3 = tx
+                    .acquire_filled_outstanding_batch(&task_id_1, 1)
+                    .await
+                    .unwrap();
                 let outstanding_batches_task_2 = tx
                     .get_outstanding_batches(&task_id_2, &Some(time_bucket_start))
-                    .await?;
+                    .await
+                    .unwrap();
                 let outstanding_batches_empty_time_bucket = tx
                     .get_outstanding_batches(
                         &task_id_2,
                         &Some(time_bucket_start.add(&Duration::from_hours(24)?)?),
                     )
-                    .await?;
+                    .await
+                    .unwrap();
                 Ok((
                     outstanding_batches_task_1,
                     outstanding_batch_1,
@@ -5338,9 +5349,9 @@
             RangeInclusive::new(2, 4)
         )])
     );
-    assert_eq!(outstanding_batch_1, Some(batch_id_1));
+    assert_eq!(outstanding_batch_1, None); // min_report_count too large
     assert_eq!(outstanding_batch_2, Some(batch_id_1));
-    assert_eq!(outstanding_batch_3, None);
+    assert_eq!(outstanding_batch_3, None); // already retrieved
     assert_eq!(
         outstanding_batches_task_2,
         Vec::from([OutstandingBatch::new(
@@ -5362,24 +5373,6 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) {
         .await
         .unwrap();
     assert!(outstanding_batches.is_empty());
-
-    // Reset the clock to "un-expire" the written batches. (...don't try this in prod.)
-    clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP);
-
-    // Delete the outstanding batch, then check that it is no longer available.
-    ds.run_unnamed_tx(|tx| {
-        Box::pin(async move { tx.delete_outstanding_batch(&task_id_1, &batch_id_1).await })
-    })
-    .await
-    .unwrap();
-
-    let outstanding_batches = ds
-        .run_unnamed_tx(|tx| {
-            Box::pin(async move { tx.get_outstanding_batches(&task_id_1, &None).await })
-        })
-        .await
-        .unwrap();
-    assert!(outstanding_batches.is_empty());
 }
 
 #[rstest_reuse::apply(schema_versions_template)]
28 changes: 1 addition & 27 deletions aggregator_core/src/query_type.rs
@@ -304,16 +304,6 @@ pub trait CollectableQueryType: AccumulableQueryType {
             .flatten()
             .collect::<Vec<_>>())
     }
-
-    /// Acknowledges that a collection attempt has been made, allowing any query-type-specific
-    /// updates to be made. For example, a task using fixed-size queries might remove the given
-    /// batch from the list of batches ready to be returned by a `current-batch` query.
-    async fn acknowledge_collection<C: Clock>(
-        tx: &Transaction<'_, C>,
-        task_id: &TaskId,
-        batch_identifier: &Self::BatchIdentifier,
-    ) -> Result<(), datastore::Error>;
 }
 
 #[async_trait]
@@ -357,14 +347,6 @@ impl CollectableQueryType for TimeInterval {
         tx.count_client_reports_for_interval(task.id(), batch_interval)
             .await
     }
-
-    async fn acknowledge_collection<C: Clock>(
-        _: &Transaction<'_, C>,
-        _: &TaskId,
-        _: &Self::BatchIdentifier,
-    ) -> Result<(), datastore::Error> {
-        Ok(()) // Purposeful no-op.
-    }
 }
 
 // This type only exists because the CollectableQueryType trait requires specifying the type of the
@@ -434,7 +416,7 @@ impl CollectableQueryType for FixedSize {
         match query.fixed_size_query() {
             FixedSizeQuery::ByBatchId { batch_id } => Ok(Some(*batch_id)),
             FixedSizeQuery::CurrentBatch => {
-                tx.get_filled_outstanding_batch(task.id(), task.min_batch_size())
+                tx.acquire_filled_outstanding_batch(task.id(), task.min_batch_size())
                     .await
             }
         }
@@ -459,14 +441,6 @@ impl CollectableQueryType for FixedSize {
         tx.count_client_reports_for_batch_id(task.id(), batch_id)
             .await
     }
-
-    async fn acknowledge_collection<C: Clock>(
-        tx: &Transaction<'_, C>,
-        task_id: &TaskId,
-        batch_identifier: &Self::BatchIdentifier,
-    ) -> Result<(), datastore::Error> {
-        tx.delete_outstanding_batch(task_id, batch_identifier).await
-    }
 }
 
 #[cfg(test)]
