Fix infinite retry bug (#2125)
* Fix infinite retry bug.

An infinite retry could be triggered if a report with the same ID as an expired report was submitted (a sketch of the failure mode follows the commit message).

* Add more detail to error

It does somewhat matter what error we return here, because errors returned from this function are
logged.

* PR feedback
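
To make the first bullet concrete, here is a minimal, standalone sketch of the failure mode. It is not Janus code: the names (`Store`, `Report`, `get_unexpired`, `put`) and the in-memory map are invented for illustration. The old conflict path looked the existing row up with a query that filters out expired reports, so conflicting with an expired-but-not-yet-garbage-collected row always looked like a concurrent write and produced another retry; looking the row up regardless of expiry lets the idempotency check actually run.

// Hypothetical, standalone sketch of the bug (all names invented; not Janus code).
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
struct Report {
    id: u64,
    timestamp: u64,
}

struct Store {
    reports: HashMap<u64, Report>,
    now: u64,
    expiry_age: u64,
}

impl Store {
    // Mirrors the old lookup: expired reports are invisible, even though the row
    // still exists because it hasn't been garbage-collected yet.
    fn get_unexpired(&self, id: u64) -> Option<&Report> {
        self.reports
            .get(&id)
            .filter(|r| r.timestamp + self.expiry_age > self.now)
    }

    fn put(&mut self, new: Report, include_expired: bool) -> Result<(), &'static str> {
        if !self.reports.contains_key(&new.id) {
            self.reports.insert(new.id, new);
            return Ok(()); // fast path: no conflict
        }
        // Slow path: the insert conflicted, so check the existing row for idempotency.
        let existing = if include_expired {
            self.reports.get(&new.id) // fixed lookup: expiry is ignored
        } else {
            self.get_unexpired(new.id) // old lookup: expired rows are filtered out
        };
        match existing {
            Some(existing) if *existing == new => Ok(()), // idempotent rewrite
            Some(_) => Err("mutation target already exists"), // genuine conflict
            None => Err("retry"), // treated as a transient race, so the caller retries
        }
    }
}

fn main() {
    let mut store = Store {
        reports: HashMap::new(),
        now: 0,
        expiry_age: 60,
    };
    store.put(Report { id: 1, timestamp: 0 }, false).unwrap();
    store.now = 1_000; // well past the expiry age; the row is expired but not GC'd

    // Old behavior: the conflicting row is invisible, so every attempt says "retry".
    assert_eq!(
        store.put(Report { id: 1, timestamp: 1_000 }, false),
        Err("retry")
    );
    // Fixed behavior: the expired row is visible, so the write is reported as a
    // genuine conflict instead of an endless retry.
    assert_eq!(
        store.put(Report { id: 1, timestamp: 1_000 }, true),
        Err("mutation target already exists")
    );
}

In the real datastore this corresponds to the slow path shown in the datastore.rs diff below (reached from `put_client_report`, as exercised by the new test), where the fixed lookup deliberately bypasses `get_client_report`.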
inahga authored Oct 18, 2023
1 parent adef889 commit e4435c6
Showing 2 changed files with 112 additions and 9 deletions.
aggregator_core/src/datastore.rs: 44 additions & 9 deletions
@@ -1473,12 +1473,43 @@ impl<C: Clock> Transaction<'_, C> {
         // Slow path: no rows were affected, meaning a row with the new report ID already existed
         // and we hit the query's ON CONFLICT DO NOTHING clause. We need to check whether the new
         // report matches the existing one.
-        let existing_report = match self
-            .get_client_report(vdaf, new_report.task_id(), new_report.metadata().id())
+        let existing_report = {
+            // We intentionally don't use `get_client_report` because it omits expired reports. It
+            // is possible that we have conflicted with an expired report that hasn't been fully
+            // GC'd yet.
+            let stmt = self
+                .prepare_cached(
+                    "SELECT
+                        client_reports.client_timestamp,
+                        client_reports.extensions,
+                        client_reports.public_share,
+                        client_reports.leader_input_share,
+                        client_reports.helper_encrypted_input_share
+                    FROM client_reports
+                    JOIN tasks ON tasks.id = client_reports.task_id
+                    WHERE tasks.task_id = $1
+                        AND client_reports.report_id = $2",
+                )
+                .await?;
+
+            self.query_opt(
+                &stmt,
+                &[
+                    /* task_id */ &new_report.task_id().as_ref(),
+                    /* report_id */ &new_report.metadata().id().as_ref(),
+                ],
+            )
             .await?
-        {
-            Some(e) => e,
-            None => {
+            .map(|row| {
+                Self::client_report_from_row(
+                    vdaf,
+                    *new_report.task_id(),
+                    *new_report.metadata().id(),
+                    row,
+                )
+            })
+            .transpose()?
+            .ok_or_else(|| {
                 // This codepath can be taken due to a quirk of how the Repeatable Read isolation
                 // level works. It cannot occur at the Serializable isolation level.
                 //
@@ -1497,11 +1528,12 @@ impl<C: Clock> Transaction<'_, C> {
                 // `None` (since all reads in the same transaction are from the same snapshot), so
                 // it can't evaluate idempotency. All it can do is give up on this transaction
                 // and try again, by calling `retry` and returning an error; once it retries, it
-                // will be able to read the report written by the successful writer. (It doesn't
-                // matter what error we return here, as the transaction will be retried.)
+                // will be able to read the report written by the successful writer.
                 self.retry();
-                return Err(Error::MutationTargetAlreadyExists);
-            }
+                Error::Concurrency(
+                    "retrying transaction because another writer has concurrently inserted this report",
+                )
+            })?
         };

         // If the existing report does not match the new report, then someone is trying to mutate an
@@ -4970,6 +5002,9 @@ pub enum Error {
     TimeOverflow(&'static str),
     #[error("batch already collected")]
     AlreadyCollected,
+    /// An error that occurred due to concurrency problems with another Janus replica.
+    #[error("{0}")]
+    Concurrency(&'static str),
 }

 impl From<ring::error::Unspecified> for Error {
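
The comment carried through the two hunks above explains when this slow-path lookup can still come back empty: under Repeatable Read, the conflict is detected against the table's current contents, but every read in the transaction comes from its snapshot, so a writer that loses the race cannot see the row it just conflicted with and can only retry. The following standalone sketch simulates that visibility gap; there is no database here, and everything in it is invented for illustration.

// Hypothetical simulation of the Repeatable Read quirk (not Janus code).
use std::collections::HashMap;

fn main() {
    // The "live" table, which uniqueness/conflict checks run against.
    let mut live: HashMap<u64, &str> = HashMap::new();

    // Both transactions take their read snapshots before either one writes.
    let snapshot_a = live.clone();
    let snapshot_b = live.clone();

    // Transaction A wins the race and inserts the report.
    live.entry(1).or_insert("report written by A");

    // Transaction B's INSERT ... ON CONFLICT DO NOTHING fires against the live table...
    let b_insert_conflicted = live.contains_key(&1);
    // ...but B's follow-up SELECT reads from its snapshot, which predates A's write.
    let b_sees_conflicting_row = snapshot_b.contains_key(&1);

    assert!(b_insert_conflicted);
    assert!(!b_sees_conflicting_row); // B can't check idempotency; it can only retry.

    // A sees nothing unusual in its own snapshot either.
    assert!(!snapshot_a.contains_key(&1));

    println!(
        "B conflicted: {b_insert_conflicted}, B can see the row: {b_sees_conflicting_row}"
    );
}

The fix does not change this retry behavior; it only makes the returned error (`Error::Concurrency`) say why the transaction is being retried, which matters because, per the commit message, errors returned from this function are logged.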
aggregator_core/src/datastore/tests.rs: 68 additions & 0 deletions
@@ -7317,3 +7317,71 @@ async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatast
.await
.unwrap();
}

#[rstest_reuse::apply(schema_versions_template)]
#[tokio::test]
async fn reject_expired_reports_with_same_id(ephemeral_datastore: EphemeralDatastore) {
install_test_trace_subscriber();
let clock = MockClock::default();
let datastore = ephemeral_datastore.datastore(clock.clone()).await;

let report_expiry_age = Duration::from_seconds(60);
let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake)
.with_report_expiry_age(Some(report_expiry_age))
.build()
.leader_view()
.unwrap();

datastore.put_aggregator_task(&task).await.unwrap();

// Use same ID for each report.
let report_id = random();

datastore
.run_unnamed_tx(|tx| {
let report = LeaderStoredReport::<0, dummy_vdaf::Vdaf>::new(
*task.id(),
ReportMetadata::new(report_id, clock.now()),
(),
Vec::new(),
dummy_vdaf::InputShare::default(),
HpkeCiphertext::new(
HpkeConfigId::from(13),
Vec::from("encapsulated_context_0"),
Vec::from("payload_0"),
),
);
Box::pin(async move {
tx.put_client_report(&dummy_vdaf::Vdaf::new(), &report)
.await
})
})
.await
.unwrap();

// Advance the clock well past the report expiry age.
clock.advance(&report_expiry_age.add(&report_expiry_age).unwrap());

// Insert a client report with the same ID, but a more current timestamp.
let result = datastore
.run_unnamed_tx(|tx| {
let report = LeaderStoredReport::<0, dummy_vdaf::Vdaf>::new(
*task.id(),
ReportMetadata::new(report_id, clock.now()),
(),
Vec::new(),
dummy_vdaf::InputShare::default(),
HpkeCiphertext::new(
HpkeConfigId::from(13),
Vec::from("encapsulated_context_0"),
Vec::from("payload_0"),
),
);
Box::pin(async move {
tx.put_client_report(&dummy_vdaf::Vdaf::new(), &report)
.await
})
})
.await;
assert_matches!(result, Err(Error::MutationTargetAlreadyExists));
}
