Tracing span tweaks (#2509)
* Instrument scrub_client_report()

* Rename spans for clarity
divergentdave authored Jan 18, 2024
1 parent 7097d09 commit 10c6d0f
Showing 6 changed files with 18 additions and 9 deletions.
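
Context for the change: by default, #[tracing::instrument] names each span after the bare function, so methods like run or write on different types produce indistinguishable spans in trace output. The name = "Type::method" argument applied throughout this commit makes each span self-identifying. Below is a minimal standalone sketch of the difference; the GarbageCollector type and method bodies are simplified stand-ins, not the real Janus code, and the example assumes the tracing, tracing-subscriber, and tokio crates as dependencies.

use tracing::info;

struct GarbageCollector;

impl GarbageCollector {
    // Without an explicit name, the span is named after the bare function
    // ("collect"), which is ambiguous once several types have similar methods.
    #[tracing::instrument(skip(self))]
    async fn collect(&self) {
        info!("inside the default-named span");
    }

    // With name = "GarbageCollector::run", log output and trace viewers show
    // the fully qualified span, the convention this commit applies throughout.
    #[tracing::instrument(name = "GarbageCollector::run", skip(self))]
    async fn run(&self) {
        info!("inside the explicitly named span");
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let gc = GarbageCollector;
    gc.collect().await; // span appears as "collect"
    gc.run().await; // span appears as "GarbageCollector::run"
}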
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/accumulator.rs
@@ -129,7 +129,7 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
     /// so, a set of unmergeable report IDs is returned; the contribution of the reports
     /// corresponding to these IDs was not written back to the datastore because it is too late to
     /// do so.
-    #[tracing::instrument(skip(self, tx), err)]
+    #[tracing::instrument(name = "Accumulator::flush_to_datastore", skip(self, tx), err)]
     pub async fn flush_to_datastore<C: Clock>(
         &self,
         tx: &Transaction<'_, C>,
14 changes: 11 additions & 3 deletions aggregator/src/aggregator/aggregation_job_creator.rs
@@ -151,7 +151,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
         observer.await;
     }

-    #[tracing::instrument(skip_all, err)]
+    #[tracing::instrument(name = "AggregationJobCreator::update_tasks", skip_all, err)]
     async fn update_tasks(
         self: &Arc<Self>,
         job_creation_task_shutdown_handles: &mut HashMap<TaskId, Stopper>,
@@ -207,7 +207,10 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
         Ok(())
     }

-    #[tracing::instrument(skip(self, stopper, job_creation_time_histogram))]
+    #[tracing::instrument(
+        name = "AggregationJobCreator::run_for_task",
+        skip(self, stopper, job_creation_time_histogram)
+    )]
     async fn run_for_task(
         self: Arc<Self>,
         stopper: Stopper,
@@ -257,7 +260,12 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
     }

     // Returns true if at least one aggregation job was created.
-    #[tracing::instrument(skip(self, task), fields(task_id = ?task.id()), err)]
+    #[tracing::instrument(
+        name = "AggregationJobCreator::create_aggregation_jobs_for_task",
+        skip(self, task),
+        fields(task_id = ?task.id()),
+        err
+    )]
     async fn create_aggregation_jobs_for_task(
         self: Arc<Self>,
         task: Arc<AggregatorTask>,
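
The last hunk above also carries over a fields argument: skip(self, task) keeps the full structs out of the span's recorded fields, while fields(task_id = ?task.id()) records just the task ID, with the ? sigil requesting Debug formatting. A minimal sketch of that pattern follows; Creator, Task, and TaskId are illustrative stand-ins for the real Janus types.

use tracing::instrument;

#[derive(Debug)]
struct TaskId([u8; 4]);

struct Task {
    id: TaskId,
}

impl Task {
    fn id(&self) -> &TaskId {
        &self.id
    }
}

struct Creator;

impl Creator {
    // skip(self, task) omits both structs from the span, while
    // fields(task_id = ?task.id()) attaches just the task ID, formatted with
    // its Debug implementation (that is what the ? sigil requests).
    #[instrument(name = "Creator::create_jobs", skip(self, task), fields(task_id = ?task.id()))]
    fn create_jobs(&self, task: &Task) {
        // job-creation work would go here
    }
}

fn main() {
    tracing_subscriber::fmt().init();
    Creator.create_jobs(&Task { id: TaskId(*b"demo") });
}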
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregation_job_writer.rs
@@ -153,7 +153,7 @@ impl<const SEED_SIZE: usize, Q: CollectableQueryType, A: vdaf::Aggregator<SEED_S
     ///
     /// A call to write, successful or not, does not change the internal state of the aggregation
     /// job writer; calling write again will cause the same set of aggregation jobs to be written.
-    #[tracing::instrument(skip(self, tx), err)]
+    #[tracing::instrument(name = "AggregationJobWriter::write", skip(self, tx), err)]
     pub async fn write<C>(
         &self,
         tx: &Transaction<'_, C>,
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/garbage_collector.rs
@@ -73,7 +73,7 @@ impl<C: Clock> GarbageCollector<C> {
         }
     }

-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(name = "GarbageCollector::run", skip(self))]
     pub async fn run(&self) -> Result<()> {
         // TODO(#224): add support for handling only a subset of tasks in a single job (i.e. sharding).

@@ -113,7 +113,7 @@ impl<C: Clock> GarbageCollector<C> {
         Ok(())
     }

-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(name = "GarbageCollector::gc_tasks", skip(self))]
     async fn gc_tasks(&self, task_ids: Vec<TaskId>) -> Result<()> {
         let task_ids = Arc::new(task_ids);
         let (client_reports_deleted, aggregation_jobs_deleted, batches_deleted) = self
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/report_writer.rs
@@ -57,7 +57,7 @@ impl<C: Clock> ReportWriteBatcher<C> {
         rslt_rx.await.unwrap()
     }

-    #[tracing::instrument(skip(ds, report_rx))]
+    #[tracing::instrument(name = "ReportWriteBatcher::run_upload_batcher", skip(ds, report_rx))]
     async fn run_upload_batcher(
         ds: Arc<Datastore<C>>,
         mut report_rx: ReportWriteBatcherReceiver<C>,
@@ -111,7 +111,7 @@ impl<C: Clock> ReportWriteBatcher<C> {
         }
     }

-    #[tracing::instrument(skip_all)]
+    #[tracing::instrument(name = "ReportWriteBatcher::write_batch", skip_all)]
     async fn write_batch(
         ds: Arc<Datastore<C>>,
         report_writers: Vec<Box<dyn ReportWriter<C>>>,
1 change: 1 addition & 0 deletions aggregator_core/src/datastore.rs
@@ -1516,6 +1516,7 @@ impl<C: Clock> Transaction<'_, C> {
     /// This method is intended for use by aggregators acting in the Leader role. Scrubbed reports
     /// can no longer be read, so this method should only be called once all aggregations over the
     /// report have stepped past their START state.
+    #[tracing::instrument(skip(self), err)]
     pub async fn scrub_client_report(
         &self,
         task_id: &TaskId,
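
This is the newly instrumented method from the first bullet of the commit message. skip(self) keeps the transaction handle out of the span's recorded fields (the remaining arguments, such as task_id, are still captured), and err makes the macro emit an ERROR-level event whenever the method returns Err. A minimal sketch of the err behavior, again assuming the tracing and tracing-subscriber crates; the scrub function below is a hypothetical stand-in for the real datastore method.

use tracing::instrument;

// `err` makes the macro emit an ERROR-level event, carrying the error value,
// whenever this function returns Err; Ok returns produce no extra event.
#[instrument(err)]
fn scrub(report_id: u64) -> Result<(), String> {
    if report_id == 0 {
        return Err("no such report".to_string());
    }
    Ok(())
}

fn main() {
    tracing_subscriber::fmt().init();
    let _ = scrub(0); // emits an ERROR event inside the "scrub" span
    let _ = scrub(1); // returns Ok; nothing extra is logged
}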
