From b5aa8184a73b035eedcabcbd55e3cc0d4c44eca0 Mon Sep 17 00:00:00 2001
From: Tim Geoghegan
Date: Sat, 27 Jan 2024 09:57:33 -0800
Subject: [PATCH 1/3] Downgrade various logs to `DEBUG` level

The Trillium handler for errors was unconditionally logging them at
`WARN`, generating large volumes of logging for problems that aren't
Janus' fault, like reports whose timestamps are in the future. Now, we
check whether the response indicates a server error before bothering
to log it (keeping in mind that client errors will still appear in
metrics).

Additionally, we've made liberal use of the [`tracing::instrument`][1]
attribute macro to decorate various functions and methods with tracing
spans, and in particular its `err` argument for logging any time a
function that returns `Result` returns an `Err`. Mostly, logging those
errors is handled at a higher level -- say, in a Trillium handler or in
the top-level loop of `aggregation_job_driver` -- so logging them at
level `ERROR` at the function call itself isn't that helpful. This
commit further qualifies the `err` argument so that those errors are
now logged at `DEBUG`, allowing us to opt back into them should they
prove useful.

[1]: https://docs.rs/tracing/latest/tracing/attr.instrument.html
---
 aggregator/src/aggregator.rs | 22 ++--
 aggregator/src/aggregator/http_handlers.rs | 9 +-
 aggregator_core/src/datastore.rs | 122 ++++++++++-----------
 3 files changed, 79 insertions(+), 74 deletions(-)

diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index a17711eec..cc67c4dd2 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -699,7 +699,7 @@ impl Aggregator { } /// Opts in or out of a taskprov task. - #[tracing::instrument(skip(self, aggregator_auth_token), err)] + #[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))] async fn taskprov_opt_in( &self, peer_role: &Role, @@ -772,7 +772,7 @@ impl Aggregator { /// Validate and authorize a taskprov request. Returns values necessary for determining whether /// we can opt into the task. This function might return an opt-out error for conditions that /// are relevant for all DAP workflows (e.g. task expiration). - #[tracing::instrument(skip(self, aggregator_auth_token), err)] + #[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))] async fn taskprov_authorize_request( &self, peer_role: &Role, @@ -1285,7 +1285,7 @@ macro_rules! 
vdaf_ops_dispatch { } impl VdafOps { - #[tracing::instrument(skip_all, fields(task_id = ?task.id()), err)] + #[tracing::instrument(skip_all, fields(task_id = ?task.id()), err(level = Level::DEBUG))] async fn handle_upload( &self, clock: &C, @@ -1336,7 +1336,7 @@ impl VdafOps { #[tracing::instrument( skip(self, datastore, global_hpke_keypairs, aggregate_step_failure_counter, task, req_bytes), fields(task_id = ?task.id()), - err + err(level = Level::DEBUG) )] async fn handle_aggregate_init( &self, @@ -1387,7 +1387,7 @@ impl VdafOps { #[tracing::instrument( skip(self, datastore, aggregate_step_failure_counter, task, req, request_hash), fields(task_id = ?task.id()), - err + err(level = Level::DEBUG) )] async fn handle_aggregate_continue( &self, @@ -1433,7 +1433,7 @@ impl VdafOps { } } - #[tracing::instrument(skip(self, datastore), fields(task_id = ?task.id()), err)] + #[tracing::instrument(skip(self, datastore), fields(task_id = ?task.id()), err(level = Level::DEBUG))] async fn handle_aggregate_delete( &self, datastore: &Datastore, @@ -2439,7 +2439,7 @@ impl VdafOps { #[tracing::instrument( skip(self, datastore, task, collection_req_bytes), fields(task_id = ?task.id()), - err + err(level = Level::DEBUG) )] async fn handle_create_collection_job( &self, @@ -2741,7 +2741,7 @@ impl VdafOps { /// Handle GET requests to the leader's `tasks/{task-id}/collection_jobs/{collection-job-id}` /// endpoint. The return value is an encoded `CollectResp`. /// - #[tracing::instrument(skip(self, datastore, task), fields(task_id = ?task.id()), err)] + #[tracing::instrument(skip(self, datastore, task), fields(task_id = ?task.id()), err(level = Level::DEBUG))] async fn handle_get_collection_job( &self, datastore: &Datastore, @@ -2918,7 +2918,7 @@ impl VdafOps { } } - #[tracing::instrument(skip(self, datastore, task), fields(task_id = ?task.id()), err)] + #[tracing::instrument(skip(self, datastore, task), fields(task_id = ?task.id()), err(level = Level::DEBUG))] async fn handle_delete_collection_job( &self, datastore: &Datastore, @@ -3001,7 +3001,7 @@ impl VdafOps { #[tracing::instrument( skip(self, datastore, clock, task, req_bytes), fields(task_id = ?task.id()), - err + err(level = Level::DEBUG) )] async fn handle_aggregate_share( &self, @@ -3311,7 +3311,7 @@ struct RequestBody { http_request_duration_histogram, ), fields(url = %url), - err, + err(level = Level::DEBUG), )] async fn send_request_to_helper( http_client: &Client, diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 09256d862..2260df7df 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -42,8 +42,7 @@ impl Handler for Error { async fn run(&self, mut conn: Conn) -> Conn { let error_code = self.error_code(); conn.set_state(ErrorCode(error_code)); - warn!(error_code, error=?self, "Error handling endpoint"); - match self { + let conn = match self { Error::InvalidConfiguration(_) => conn.with_status(Status::InternalServerError), Error::MessageDecode(_) => conn .with_problem_document(&ProblemDocument::new_dap(DapProblemType::InvalidMessage)), @@ -154,7 +153,13 @@ impl Handler for Error { &ProblemDocument::new_dap(DapProblemType::InvalidTask).with_task_id(task_id), ), Error::DifferentialPrivacy(_) => conn.with_status(Status::InternalServerError), + }; + + if matches!(conn.status(), Some(status) if status.is_server_error()) { + warn!(error_code, error=?self, "Error handling endpoint"); } + + conn } } diff --git a/aggregator_core/src/datastore.rs 
b/aggregator_core/src/datastore.rs index e00afeb55..dae439067 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -711,7 +711,7 @@ impl Transaction<'_, C> { } /// Fetch the task parameters corresponing to the provided `task_id`. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregator_task( &self, task_id: &TaskId, @@ -744,7 +744,7 @@ impl Transaction<'_, C> { } /// Fetch all the tasks in the database. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregator_tasks(&self) -> Result, Error> { let stmt = self .prepare_cached( @@ -943,7 +943,7 @@ impl Transaction<'_, C> { /// Retrieves report & report aggregation metrics for a given task: either a tuple /// `Some((report_count, report_aggregation_count))`, or None if the task does not exist. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_task_metrics(&self, task_id: &TaskId) -> Result, Error> { let stmt = self .prepare_cached( @@ -985,7 +985,7 @@ impl Transaction<'_, C> { /// IDs in lexicographic order, but may not retrieve the IDs of all tasks in a single call. To /// retrieve additional task IDs, make additional calls to this method while specifying the /// `lower_bound` parameter to be the last task ID retrieved from the previous call. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_task_ids(&self, lower_bound: Option) -> Result, Error> { let lower_bound = lower_bound.map(|task_id| task_id.as_ref().to_vec()); let stmt = self @@ -1004,7 +1004,7 @@ impl Transaction<'_, C> { } /// get_client_report retrieves a client report by ID. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_client_report( &self, vdaf: &A, @@ -1180,7 +1180,7 @@ impl Transaction<'_, C> { /// relies on this assumption to find relevant reports without consulting collection jobs. For /// VDAFs that do have a different aggregation parameter, /// `get_unaggregated_client_report_ids_by_collect_for_task` should be used instead. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_unaggregated_client_reports_for_task< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -1242,7 +1242,7 @@ impl Transaction<'_, C> { /// should generally only be called on report IDs returned from /// `get_unaggregated_client_report_ids_for_task`, as part of the same transaction, for any /// client reports that are not added to an aggregation job. - #[tracing::instrument(skip(self, report_ids), err)] + #[tracing::instrument(skip(self, report_ids), err(level = Level::DEBUG))] pub async fn mark_reports_unaggregated( &self, task_id: &TaskId, @@ -1314,7 +1314,7 @@ impl Transaction<'_, C> { /// Determines whether the given task includes any client reports which have not yet started the /// aggregation process in the given interval. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn interval_has_unaggregated_reports( &self, task_id: &TaskId, @@ -1348,7 +1348,7 @@ impl Transaction<'_, C> { /// Return the number of reports in the provided task whose timestamp falls within the provided /// interval, regardless of whether the reports have been aggregated or collected. 
Applies only /// to time-interval queries. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn count_client_reports_for_interval( &self, task_id: &TaskId, @@ -1383,7 +1383,7 @@ impl Transaction<'_, C> { /// Return the number of reports in the provided task & batch, regardless of whether the reports /// have been aggregated or collected. Applies only to fixed-size queries. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn count_client_reports_for_batch_id( &self, task_id: &TaskId, @@ -1420,7 +1420,7 @@ impl Transaction<'_, C> { /// the associated encrypted helper share. Returns `Ok(())` if the write succeeds, or if there /// was already a row in the table matching `new_report`. Returns an error if something goes /// wrong or if the report ID is already in use with different values. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_client_report( &self, vdaf: &A, @@ -1528,7 +1528,7 @@ impl Transaction<'_, C> { /// This method is intended for use by aggregators acting in the Leader role. Scrubbed reports /// can no longer be read, so this method should only be called once all aggregations over the /// report have stepped past their START state. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn scrub_client_report( &self, task_id: &TaskId, @@ -1601,7 +1601,7 @@ impl Transaction<'_, C> { /// /// Returns `Err(Error::MutationTargetAlreadyExists)` if an attempt to mutate an existing row /// (e.g., changing the timestamp for a known report ID) is detected. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_report_share( &self, task_id: &TaskId, @@ -1639,7 +1639,7 @@ impl Transaction<'_, C> { } /// get_aggregation_job retrieves an aggregation job by ID. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregation_job< const SEED_SIZE: usize, Q: QueryType, @@ -1677,7 +1677,7 @@ impl Transaction<'_, C> { /// get_aggregation_jobs_for_task returns all aggregation jobs for a given task ID. #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregation_jobs_for_task< const SEED_SIZE: usize, Q: QueryType, @@ -1751,7 +1751,7 @@ impl Transaction<'_, C> { /// aggregation jobs. At most `maximum_acquire_count` jobs are acquired. The job is acquired /// with a "lease" that will time out; the desired duration of the lease is a parameter, and the /// returned lease provides the absolute timestamp at which the lease is no longer live. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn acquire_incomplete_aggregation_jobs( &self, lease_duration: &StdDuration, @@ -1824,7 +1824,7 @@ impl Transaction<'_, C> { /// release_aggregation_job releases an acquired (via e.g. acquire_incomplete_aggregation_jobs) /// aggregation job. It returns an error if the aggregation job has no current lease. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn release_aggregation_job( &self, lease: &Lease, @@ -1865,7 +1865,7 @@ impl Transaction<'_, C> { } /// put_aggregation_job stores an aggregation job. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_aggregation_job< const SEED_SIZE: usize, Q: QueryType, @@ -1942,7 +1942,7 @@ impl Transaction<'_, C> { } /// update_aggregation_job updates a stored aggregation job. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn update_aggregation_job< const SEED_SIZE: usize, Q: QueryType, @@ -1986,7 +1986,7 @@ impl Transaction<'_, C> { /// Check whether the report has ever been aggregated with the given parameter, for an /// aggregation job besides the given one. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn check_other_report_aggregation_exists( &self, task_id: &TaskId, @@ -2026,7 +2026,7 @@ impl Transaction<'_, C> { } /// get_report_aggregation gets a report aggregation by ID. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_report_aggregation< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2087,7 +2087,7 @@ impl Transaction<'_, C> { /// get_report_aggregations_for_aggregation_job retrieves all report aggregations associated /// with a given aggregation job, ordered by their natural ordering. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_report_aggregations_for_aggregation_job< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2350,7 +2350,7 @@ impl Transaction<'_, C> { } /// put_report_aggregation stores aggregation data for a single report. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_report_aggregation< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2411,7 +2411,7 @@ impl Transaction<'_, C> { Ok(()) } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn update_report_aggregation< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2479,7 +2479,7 @@ impl Transaction<'_, C> { } /// Returns the collection job for the provided ID, or `None` if no such collection job exists. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_collection_job< const SEED_SIZE: usize, Q: QueryType, @@ -2531,7 +2531,7 @@ impl Transaction<'_, C> { /// Returns all collection jobs for the given task which include the given timestamp. Applies /// only to time-interval tasks. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_collection_jobs_including_time< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2580,7 +2580,7 @@ impl Transaction<'_, C> { /// Returns all collection jobs for the given task whose collect intervals intersect with the /// given interval. Applies only to time-interval tasks. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_collection_jobs_intersecting_interval< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2635,7 +2635,7 @@ impl Transaction<'_, C> { /// Retrieves all collection jobs for the given batch ID. Multiple collection jobs may be /// returned with distinct aggregation parameters. Applies only to fixed-size tasks. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_collection_jobs_by_batch_id< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -2798,7 +2798,7 @@ impl Transaction<'_, C> { } /// Stores a new collection job. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_collection_job< const SEED_SIZE: usize, Q: CollectableQueryType, @@ -2880,7 +2880,7 @@ impl Transaction<'_, C> { /// collection jobs. At most `maximum_acquire_count` jobs are acquired. The job is acquired with /// a "lease" that will time out; the desired duration of the lease is a parameter, and the /// lease expiration time is returned. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn acquire_incomplete_collection_jobs( &self, lease_duration: &StdDuration, @@ -2948,7 +2948,7 @@ impl Transaction<'_, C> { /// release_collection_job releases an acquired (via e.g. acquire_incomplete_collection_jobs) /// collect job. It returns an error if the collection job has no current lease. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn release_collection_job( &self, lease: &Lease, @@ -2988,7 +2988,7 @@ impl Transaction<'_, C> { } /// Updates an existing collection job. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn update_collection_job< const SEED_SIZE: usize, Q: QueryType, @@ -3059,7 +3059,7 @@ impl Transaction<'_, C> { } /// Retrieves an existing batch aggregation. - #[tracing::instrument(skip(self, aggregation_parameter), err)] + #[tracing::instrument(skip(self, aggregation_parameter), err(level = Level::DEBUG))] pub async fn get_batch_aggregation< const SEED_SIZE: usize, Q: QueryType, @@ -3116,7 +3116,7 @@ impl Transaction<'_, C> { /// Retrieves all batch aggregations stored for a given batch, identified by task ID, batch /// identifier, and aggregation parameter. - #[tracing::instrument(skip(self, aggregation_parameter), err)] + #[tracing::instrument(skip(self, aggregation_parameter), err(level = Level::DEBUG))] pub async fn get_batch_aggregations_for_batch< const SEED_SIZE: usize, Q: QueryType, @@ -3260,7 +3260,7 @@ impl Transaction<'_, C> { } /// Store a new `batch_aggregations` row in the datastore. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_batch_aggregation< const SEED_SIZE: usize, Q: AccumulableQueryType, @@ -3358,7 +3358,7 @@ impl Transaction<'_, C> { /// Update an existing `batch_aggregations` row with the values from the provided batch /// aggregation. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn update_batch_aggregation< const SEED_SIZE: usize, Q: QueryType, @@ -3427,7 +3427,7 @@ impl Transaction<'_, C> { /// Fetch an [`AggregateShareJob`] from the datastore corresponding to given parameters, or /// `None` if no such job exists. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregate_share_job< const SEED_SIZE: usize, Q: QueryType, @@ -3474,7 +3474,7 @@ impl Transaction<'_, C> { /// Returns all aggregate share jobs for the given task which include the given timestamp. /// Applies only to time-interval tasks. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregate_share_jobs_including_time< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -3525,7 +3525,7 @@ impl Transaction<'_, C> { /// Returns all aggregate share jobs for the given task whose collect intervals intersect with /// the given interval. Applies only to time-interval tasks. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregate_share_jobs_intersecting_interval< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -3577,7 +3577,7 @@ impl Transaction<'_, C> { /// Returns all aggregate share jobs for the given task with the given batch identifier. /// Multiple aggregate share jobs may be returned with distinct aggregation parameters. /// Applies only to fixed-size tasks. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_aggregate_share_jobs_by_batch_id< const SEED_SIZE: usize, A: vdaf::Aggregator, @@ -3688,7 +3688,7 @@ impl Transaction<'_, C> { } /// Put an `aggregate_share_job` row into the datastore. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_aggregate_share_job< const SEED_SIZE: usize, Q: CollectableQueryType, @@ -3767,7 +3767,7 @@ impl Transaction<'_, C> { /// Writes an outstanding batch. (This method does not take an [`OutstandingBatch`] as several /// of the included values are read implicitly.) - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_outstanding_batch( &self, task_id: &TaskId, @@ -3832,7 +3832,7 @@ impl Transaction<'_, C> { } /// Retrieves all [`OutstandingBatch`]es for a given task and time bucket, if applicable. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_outstanding_batches( &self, task_id: &TaskId, @@ -3940,7 +3940,7 @@ impl Transaction<'_, C> { /// Retrieves an outstanding batch for the given task with at least the given number of /// successfully-aggregated reports, removing it from the datastore. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn acquire_filled_outstanding_batch( &self, task_id: &TaskId, @@ -3982,7 +3982,7 @@ impl Transaction<'_, C> { /// Puts a `batch` into the datastore. Returns `MutationTargetAlreadyExists` if the batch is /// already stored. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_batch< const SEED_SIZE: usize, Q: AccumulableQueryType, @@ -4060,7 +4060,7 @@ impl Transaction<'_, C> { /// Updates a given `batch` in the datastore. Returns `MutationTargetNotFound` if no such batch /// is currently stored. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn update_batch< const SEED_SIZE: usize, Q: QueryType, @@ -4106,7 +4106,7 @@ impl Transaction<'_, C> { /// Gets a given `batch` from the datastore, based on the primary key. Returns `None` if no such /// batch is stored in the datastore. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_batch< const SEED_SIZE: usize, Q: QueryType, @@ -4212,7 +4212,7 @@ impl Transaction<'_, C> { /// Deletes old client reports for a given task, that is, client reports whose timestamp is /// older than the task's report expiry age. Up to `limit` client reports will be deleted. /// Returns the number of client reports deleted. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn delete_expired_client_reports( &self, task_id: &TaskId, @@ -4249,7 +4249,7 @@ impl Transaction<'_, C> { /// older than the task's report expiry age. Up to `limit` aggregation jobs will be deleted, /// along with all related aggregation artifacts. Returns the number of aggregation jobs /// deleted. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn delete_expired_aggregation_artifacts( &self, task_id: &TaskId, @@ -4306,7 +4306,7 @@ impl Transaction<'_, C> { /// Up to `limit` batches will be deleted, along with all related collection artifacts. /// /// Returns the number of batches deleted. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn delete_expired_collection_artifacts( &self, task_id: &TaskId, @@ -4371,7 +4371,7 @@ impl Transaction<'_, C> { } /// Retrieve all global HPKE keypairs. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_global_hpke_keypairs(&self) -> Result, Error> { let stmt = self .prepare_cached( @@ -4387,7 +4387,7 @@ impl Transaction<'_, C> { } /// Retrieve a global HPKE keypair by config ID. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_global_hpke_keypair( &self, config_id: &HpkeConfigId, @@ -4424,7 +4424,7 @@ impl Transaction<'_, C> { /// Unconditionally and fully drop a keypair. This is a dangerous operation, /// since report shares encrypted with this key will no longer be decryptable. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn delete_global_hpke_keypair(&self, config_id: &HpkeConfigId) -> Result<(), Error> { let stmt = self .prepare_cached("DELETE FROM global_hpke_keys WHERE config_id = $1;") @@ -4435,7 +4435,7 @@ impl Transaction<'_, C> { ) } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn set_global_hpke_keypair_state( &self, config_id: &HpkeConfigId, @@ -4463,7 +4463,7 @@ impl Transaction<'_, C> { } // Inserts a new global HPKE keypair and places it in the [`HpkeKeyState::Pending`] state. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_global_hpke_keypair(&self, hpke_keypair: &HpkeKeypair) -> Result<(), Error> { let hpke_config_id = u8::from(*hpke_keypair.config().id()) as i16; let hpke_config = hpke_keypair.config().get_encoded()?; @@ -4498,7 +4498,7 @@ impl Transaction<'_, C> { ) } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_taskprov_peer_aggregators(&self) -> Result, Error> { let stmt = self .prepare_cached( @@ -4568,7 +4568,7 @@ impl Transaction<'_, C> { .collect() } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_taskprov_peer_aggregator( &self, aggregator_url: &Url, @@ -4694,7 +4694,7 @@ impl Transaction<'_, C> { )) } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn put_taskprov_peer_aggregator( &self, peer_aggregator: &PeerAggregator, @@ -4821,7 +4821,7 @@ impl Transaction<'_, C> { Ok(()) } - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn delete_taskprov_peer_aggregator( &self, aggregator_url: &Url, @@ -4841,7 +4841,7 @@ impl Transaction<'_, C> { /// Get the [`TaskUploadCounter`] for a task. This is aggregated across all shards. Returns /// `None` if the task doesn't exist. - #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn get_task_upload_counter( &self, task_id: &TaskId, @@ -4884,7 +4884,7 @@ impl Transaction<'_, C> { /// Add one to the counter associated with the given [`TaskId`]. The column to increment is given /// by [`TaskUploadIncrementor`]. This is sharded, requiring an `ord` parameter to determine which /// shard to add to. `ord` should be randomly generated by the caller. 
- #[tracing::instrument(skip(self), err)] + #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn increment_task_upload_counter( &self, task_id: &TaskId,

From d844674f1b516c1065562f8c07c7f0f9614e1ed7 Mon Sep 17 00:00:00 2001
From: Tim Geoghegan
Date: Sat, 27 Jan 2024 13:52:18 -0800
Subject: [PATCH 2/3] use tracing::Level

---
 aggregator/src/aggregator.rs | 2 +-
 aggregator_core/src/datastore.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index cc67c4dd2..42d45c6cb 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -91,7 +91,7 @@ use std::{ time::{Duration as StdDuration, Instant}, }; use tokio::{sync::Mutex, try_join}; -use tracing::{debug, info, trace_span, warn}; +use tracing::{debug, info, trace_span, warn, Level}; use url::Url; pub mod accumulator; diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index dae439067..4a92701d0 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -57,7 +57,7 @@ use std::{ }; use tokio::{sync::Barrier, try_join}; use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement}; -use tracing::error; +use tracing::{Level, error}; use url::Url; pub mod models;

From 703803f4c095a8d61b2a4d5e0be936ae180af90e Mon Sep 17 00:00:00 2001
From: Tim Geoghegan
Date: Sat, 27 Jan 2024 13:54:12 -0800
Subject: [PATCH 3/3] fmt

---
 aggregator_core/src/datastore.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 4a92701d0..ef93300d7 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -57,7 +57,7 @@ use std::{ }; use tokio::{sync::Barrier, try_join}; use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement}; -use tracing::{Level, error}; +use tracing::{error, Level}; use url::Url; pub mod models;
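
For reference, a minimal, self-contained sketch of the `err(level = ...)` pattern these patches apply. This is not Janus code -- `parse_port` is a hypothetical function -- and it assumes the `tracing` and `tracing-subscriber` crates as dependencies.

use tracing::Level;

// With a bare `err`, `instrument` emits an ERROR-level event whenever the
// function returns `Err`. Qualifying it as `err(level = Level::DEBUG)` still
// records the error in the span, but at DEBUG, so it only appears when a
// subscriber opts into that verbosity.
#[tracing::instrument(err(level = Level::DEBUG))]
fn parse_port(input: &str) -> Result<u16, std::num::ParseIntError> {
    input.parse()
}

fn main() {
    // Set the subscriber's maximum level to DEBUG so the downgraded error
    // events are visible; at the default they would be filtered out.
    tracing_subscriber::fmt()
        .with_max_level(Level::DEBUG)
        .init();

    // Emits a DEBUG-level event containing the `ParseIntError` -- a
    // malformed input is the caller's problem, much like a report whose
    // timestamp is in the future.
    let _ = parse_port("not-a-port");
}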