Add per-task report upload metrics #2508
(force-pushed from eb46668 to fd0a125)
This PR is pretty much ready for review, but requires a load test to check for performance regressions.
aggregator_core/src/datastore.rs
Outdated
"INSERT INTO task_upload_counters (task_id, ord, interval_collected) | ||
VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, 1) | ||
ON CONFLICT (task_id, ord) DO UPDATE | ||
SET interval_collected = task_upload_counters.interval_collected + 1" |
```
postgres=# EXPLAIN ANALYZE INSERT INTO task_upload_counters (task_id, ord, interval_collected) VALUES ((SELECT id FROM tasks WHERE task_id = '\xfd7d76707c1c36d09c1ec2c69df372fea86ce89245c0405a31d2ffb14e3637c4'::bytea), 1, 1) ON CONFLICT (task_id, ord) DO UPDATE SET interval_collected = task_upload_counters.interval_collected + 1;
                                                          QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------
 Insert on task_upload_counters  (cost=8.16..8.17 rows=0 width=0) (actual time=0.040..0.041 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: task_upload_counters_unique
   Tuples Inserted: 0
   Conflicting Tuples: 1
   InitPlan 1 (returns $0)
     ->  Index Scan using task_id_index on tasks  (cost=0.14..8.16 rows=1 width=8) (actual time=0.007..0.007 rows=1 loops=1)
           Index Cond: (task_id = '\xfd7d76707c1c36d09c1ec2c69df372fea86ce89245c0405a31d2ffb14e3637c4'::bytea)
   ->  Result  (cost=0.00..0.01 rows=1 width=88) (actual time=0.010..0.010 rows=1 loops=1)
 Planning Time: 0.042 ms
 Execution Time: 0.050 ms
```
(the 99.9% case, where the row already exists and the ON CONFLICT branch resolves to an UPDATE).
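For context, this is roughly how the increment side fits together; a minimal sketch assuming a tokio_postgres `Client` and the rand crate (the helper name, `increment_interval_collected`, and the `counter_shard_count` parameter are illustrative, not the PR's exact code):

```rust
use rand::{thread_rng, Rng};
use tokio_postgres::Client;

/// Hypothetical helper: bump one sharded upload counter for a task.
/// Writing to a randomly chosen shard (`ord`) spreads row-level lock
/// contention across `counter_shard_count` rows instead of serializing
/// every upload on a single counter row.
async fn increment_interval_collected(
    client: &Client,
    task_id: &[u8],
    counter_shard_count: u64,
) -> Result<(), tokio_postgres::Error> {
    let ord = thread_rng().gen_range(0..counter_shard_count) as i64;
    client
        .execute(
            "INSERT INTO task_upload_counters (task_id, ord, interval_collected)
             VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, 1)
             ON CONFLICT (task_id, ord) DO UPDATE
             SET interval_collected = task_upload_counters.interval_collected + 1",
            &[&task_id, &ord],
        )
        .await?;
    Ok(())
}
```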
"SELECT | ||
COALESCE(SUM(interval_collected)::BIGINT, 0) AS interval_collected, | ||
COALESCE(SUM(report_decode_failure)::BIGINT, 0) AS report_decode_failure, | ||
COALESCE(SUM(report_decrypt_failure)::BIGINT, 0) AS report_decrypt_failure, | ||
COALESCE(SUM(report_expired)::BIGINT, 0) AS report_expired, | ||
COALESCE(SUM(report_outdated_key)::BIGINT, 0) AS report_outdated_key, | ||
COALESCE(SUM(report_success)::BIGINT, 0) AS report_success, | ||
COALESCE(SUM(report_too_early)::BIGINT, 0) AS report_too_early, | ||
COALESCE(SUM(task_expired)::BIGINT, 0) AS task_expired | ||
FROM task_upload_counters | ||
WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)", |
```
postgres=# EXPLAIN ANALYZE SELECT
    COALESCE(SUM(interval_collected)::BIGINT, 0) AS interval_collected,
    COALESCE(SUM(report_decode_failure)::BIGINT, 0) AS report_decode_failure,
    COALESCE(SUM(report_decrypt_failure)::BIGINT, 0) AS report_decrypt_failure,
    COALESCE(SUM(report_expired)::BIGINT, 0) AS report_expired,
    COALESCE(SUM(report_outdated_key)::BIGINT, 0) AS report_outdated_key,
    COALESCE(SUM(report_success)::BIGINT, 0) AS report_success,
    COALESCE(SUM(report_too_early)::BIGINT, 0) AS report_too_early,
    COALESCE(SUM(task_expired)::BIGINT, 0) AS task_expired
FROM task_upload_counters
WHERE task_id = (SELECT id FROM tasks WHERE task_id = '\xfd7d76707c1c36d09c1ec2c69df372fea86ce89245c0405a31d2ffb14e3637c4'::bytea);
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=125.09..125.12 rows=1 width=64) (actual time=0.121..0.121 rows=1 loops=1)
   InitPlan 1 (returns $0)
     ->  Index Scan using task_id_index on tasks  (cost=0.14..8.16 rows=1 width=8) (actual time=0.003..0.003 rows=1 loops=1)
           Index Cond: (task_id = '\xfd7d76707c1c36d09c1ec2c69df372fea86ce89245c0405a31d2ffb14e3637c4'::bytea)
   ->  Bitmap Heap Scan on task_upload_counters  (cost=8.48..116.02 rows=44 width=64) (actual time=0.030..0.113 rows=32 loops=1)
         Recheck Cond: (task_id = $0)
         Heap Blocks: exact=17
         ->  Bitmap Index Scan on task_upload_counters_unique  (cost=0.00..8.47 rows=44 width=0) (actual time=0.010..0.010 rows=62 loops=1)
               Index Cond: (task_id = $0)
 Planning Time: 0.154 ms
 Execution Time: 0.145 ms
(11 rows)
```
(executed with 32 shards)
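For illustration, a sketch of the read path under the same assumptions as above (tokio_postgres; the struct is a hypothetical stand-in, abbreviated to two of the counter columns, with the rest following the same pattern):

```rust
use tokio_postgres::Client;

/// Hypothetical container for the summed counters; abbreviated to two
/// columns here, the remaining counters follow the same pattern.
#[derive(Debug, Default)]
struct TaskUploadCounters {
    interval_collected: i64,
    report_success: i64,
}

async fn get_task_upload_counters(
    client: &Client,
    task_id: &[u8],
) -> Result<TaskUploadCounters, tokio_postgres::Error> {
    // SUM over all shard rows for the task; COALESCE keeps the result
    // well-defined (zero) when no shard row has been written yet.
    let row = client
        .query_one(
            "SELECT
                 COALESCE(SUM(interval_collected)::BIGINT, 0) AS interval_collected,
                 COALESCE(SUM(report_success)::BIGINT, 0) AS report_success
             FROM task_upload_counters
             WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)",
            &[&task_id],
        )
        .await?;
    Ok(TaskUploadCounters {
        interval_collected: row.get("interval_collected"),
        report_success: row.get("report_success"),
    })
}
```

The design tradeoff: because each increment lands on one of the shard rows chosen at random, concurrent uploads for the same task rarely contend on the same row lock, at the cost of this slightly more expensive aggregated read.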
```rust
// Assume this was a duplicate report, return OK but don't increment the counter
// so we avoid double counting successful reports.
Err(datastore::Error::MutationTargetAlreadyExists) => Ok(()),
```
It seems we want to avoid double counting successful reports, so I do so here. Even though validating whether the report is actually a duplicate costs us something, a duplicate won't progress through the costlier parts of the system, so it doesn't seem interesting.

Errored reports are not treated the same way: if someone is consistently sending a bad report, it'll get counted each time. I think this is fine, since it indicates something wrong with the client and is therefore interesting. Further upstream rate limiting can mitigate the disproportionate impact a rogue client could have on these metrics.
aggregator_core/src/datastore.rs
Outdated
```rust
// Brute force each possible query. We cannot parameterize column names in prepared
// statements and we want to avoid the hazards of string interpolation into SQL.
```
I don't like this but I really don't want to interpolate strings into SQL. While I could currently guarantee that it is protected from injection, it is high risk for marginal reward (especially since this is in the front-facing hot path).
The string interpolation would be only the column name to increment, correct?

If so, I think this is a place where string interpolation to build a query makes sense. (Other, more structured, methods of building a query may be preferable, but I don't think our current dependencies allow for this & I don't want to take another dep just for this one query.)

The reason I think it is safe to build a query here: the dynamic part of the query is selected from a small set of possible values (i.e. column names based on the `TaskUploadIncrementor`), all of which can be seen as "safe" by inspection. The particular incrementor is at least partially controlled by untrusted input (i.e. an attacker can choose which kind of bad report to send), but since all possible values are known & safe by inspection, this does not matter.

Probably the best way to implement this would be to augment `TaskUploadIncrementor` to be able to provide the column name to increment, and use that to build the query.
Fair enough, admittedly some of my hesitance to interpolate strings is paranoia. I will build the query and leave a `// safety:`-esque comment.
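Following up on that, a sketch of the shape this could take (a minimal illustration; the variant set is abbreviated and the `column_name` method is an assumption, not necessarily the PR's exact code):

```rust
/// Illustrative subset of the incrementor; one variant per counter column.
#[derive(Clone, Copy, Debug)]
enum TaskUploadIncrementor {
    IntervalCollected,
    ReportDecodeFailure,
    ReportSuccess,
    TaskExpired,
}

impl TaskUploadIncrementor {
    /// Returns the column to increment. Safety: every value returned here is
    /// a fixed string literal naming a known column, so interpolating it into
    /// the query below cannot introduce SQL injection, even though which
    /// variant is used is influenced by untrusted input.
    fn column_name(&self) -> &'static str {
        match self {
            Self::IntervalCollected => "interval_collected",
            Self::ReportDecodeFailure => "report_decode_failure",
            Self::ReportSuccess => "report_success",
            Self::TaskExpired => "task_expired",
        }
    }
}

fn build_increment_query(incrementor: TaskUploadIncrementor) -> String {
    let column = incrementor.column_name();
    format!(
        "INSERT INTO task_upload_counters (task_id, ord, {column})
         VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, 1)
         ON CONFLICT (task_id, ord) DO UPDATE
         SET {column} = task_upload_counters.{column} + 1"
    )
}
```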
CI should pass once #2529 is through and the PR is rebased.
(force-pushed from a8e5a2a to c0fd3f8)
aggregator/src/aggregator.rs
Outdated
```rust
let report_time = *report.metadata().time();
async move {
    let rejection = ReportRejection::new(*task.id(), report_id, report_time, reason);
    report_writer.write_report(Err(rejection)).await?;
```
We wait on `write_report` in the success case to allow us to faithfully report any errors in the actual write process back to the client (which is important, since this is where we would detect duplicate uploads, among other errors).

However, in the report-rejection case, a failure in `write_report` is at most a metrics-update failure, which shouldn't necessarily be bubbled up to the client at all. I don't think we need to wait in this case.

We could implement this by adding a counterpart to `write_report` that handles rejections, and enqueues the update without waiting on it. I think this would be similar to `write_report` except without the final wait on `rslt_rx`. I think this might make the `ReportWriter` API a little simpler to understand, too, though the internal types would not be simplified.
I think a couple of times now we've floated the idea of successful uploads returning OK immediately after they're enqueued. One example: https://isrg.slack.com/archives/C0167LT4C73/p1701364685606769?thread_ts=1701364378.124579&cid=C0167LT4C73.
Should we go ahead and take this change? That'll solve your comment as well, since we won't ever be waiting on batches flushing to the DB anyway.
For now, I don't want to make the change that successful uploads don't wait -- unfortunately, this would mask some real failures (e.g. DB write failures) which the client could react to by re-submitting the report at a later time. That is, not waiting for successful report uploads is a tradeoff.
We may need to make that change for performance reasons, but I want to see that it's a tradeoff we need to make before we do so.
I think this won't complicate the implementation too much; as mentioned above, not waiting on the result is just not waiting on `rslt_rx`. If we dropped waiting in all cases we could drop `rslt_rx` entirely, as well as the logic to route results back to it, which would reduce complexity a bit -- but I think we might need to augment metrics or otherwise have some idea if writes are failing, since the report upload metrics would no longer reflect failures.
9a12677 introduces `write_rejection()`, which doesn't wait for the tx result.

Notice that the result_tx is now tx/rx'ing a `(ReportResult<C>, Option<ResultSender>)`, i.e. the result sender is optional, and in `write_rejection()` we send `None`. I did this because otherwise we'd be creating a fake `Sender` which is immediately dropped, sending `write_batch()` down an unhappy path.

That commit also has some clumsy `sleep()` calls in tests to wait for the report writer to work. I've replaced that with use of `Runtime` in 37a3593.
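A sketch of the channel shape described here, with simplified stand-in types (the real `ReportResult<C>`, `ResultSender`, and batching logic carry more structure than these):

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified stand-ins for the PR's types.
type ReportResult = Result<(), String>;
type ResultSender = oneshot::Sender<Result<(), String>>;

struct ReportWriteBatcher {
    tx: mpsc::Sender<(ReportResult, Option<ResultSender>)>,
}

impl ReportWriteBatcher {
    /// Success path: enqueue the write, then wait for the batch flush result
    /// so DB errors (e.g. duplicate uploads) can be surfaced to the client.
    async fn write_report(&self, report: ReportResult) -> Result<(), String> {
        let (result_tx, result_rx) = oneshot::channel();
        self.tx
            .send((report, Some(result_tx)))
            .await
            .map_err(|e| e.to_string())?;
        result_rx.await.map_err(|e| e.to_string())?
    }

    /// Rejection path: enqueue the counter update without waiting. A failure
    /// here is at most a metrics-update failure, so nothing is routed back.
    /// Sending None avoids creating a throwaway Sender that is immediately
    /// dropped, which would push the batch writer down its error path.
    async fn write_rejection(&self, rejection: ReportResult) {
        let _ = self.tx.send((rejection, None)).await;
    }
}
```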
```rust
assert_eq!(report_writers.len(), result_txs.len());
let ord = thread_rng().gen_range(0..counter_shard_count);
```

```rust
// Sort by task ID to prevent deadlocks with concurrently running transactions. Since we are
```
Idle thought: we may eventually want to use a B-Tree (i.e. `BTreeSet`) ordering on task ID, rather than a `Vec`, to amortize the cost of sorting across the uploads. I think batches are likely to stay small enough that sorting is "cheap", though.
I did think about this (although with a less optimal `BinaryHeap`), but yeah, I figured the batches are small enough that the additional overhead of a complex data structure would prove regressive. We'd likely need to dive into a profiler to know one way or the other. I'm planning to do this anyway for other reasons; I'll check this out along the way and open a PR if it can be improved.
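For reference, the `Vec`-based ordering under discussion is roughly the following (hypothetical types; a minimal sketch of sorting the batch to get a consistent lock order):

```rust
/// Hypothetical batch entry key: a task ID plus some pending write payload T.
type TaskId = [u8; 32];

/// Sort the batch by task ID before opening the transaction, so that all
/// concurrently running transactions acquire row locks in the same order,
/// preventing lock-order deadlocks.
fn order_batch<T>(mut batch: Vec<(TaskId, T)>) -> Vec<(TaskId, T)> {
    batch.sort_unstable_by_key(|(task_id, _)| *task_id);
    batch
}
```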
(force-pushed from 4316a4f to 37a3593)
(sorry, one extremely nitpicky comment I missed on my first pass)
* Add per task report upload metrics.
* Change default to 32, add documentation on option
* Fix test
* Build query instead of brute forcing each possible one
* Don't wait on bad reports
* Use Runtime and RuntimeManager instead of sleeping in tests
* Clippy
* Cargo doc
* Don't use macro needlessly

Co-authored-by: Brandon Pitman <[email protected]>
* Add per-task report upload metrics (#2508)
  * Add per task report upload metrics.
  * Change default to 32, add documentation on option
  * Fix test
  * Build query instead of brute forcing each possible one
  * Don't wait on bad reports
  * Use Runtime and RuntimeManager instead of sleeping in tests
  * Clippy
  * Cargo doc
  * Don't use macro needlessly
* Expose upload metrics through aggregator API (#2537)
* 0.6 specific fixes (don't change existing schema)
* Update schema version

Co-authored-by: Brandon Pitman <[email protected]>
Supports #2293
Adds upload counters for tasks. They count successful report uploads, as well as errors. They are monotonic and are not subject to GC.
I rolled all report rejections into a single enum variant, `aggregator::Error::ReportRejected` (n.b. the proper DAP problem type determination is preserved). The report writer is changed to accept a `Result<Box<dyn ReportWriter<C>>, ReportRejection>`. Depending on this `Result`, the appropriate counter column is incremented.

This implementation is somewhat primitive -- we could aggregate counters before submitting SQL. However, the naive approach has proven fine (i.e. non-regressive) in load testing, and the aggregation approach is extremely complex with regard to propagating errors. I think our pipelined client is doing well here.
See #2513 (comment) for the load test, which I executed on Janus 0.6.