Skip to content

Commit

Permalink
telemetry worker: flush data after stops
Browse files Browse the repository at this point in the history
Telemetry workers are functionally dead after a Stop lifecycle action,
provided there's no intervening Start. While AddPoint actions are still
processed, their data is never flushed, since the Stop action handler
unschedules FlushMetrics and FlushData actions.

PHP sends a Stop action at the end of every request via
ddog_sidecar_telemetry_end(), but a Start action is only generated just
after a telemetry worker is spawned.

It is not clear to me whether the intention is to a Start/Stop pair on
every PHP requests (where Stop flushes the metrics) or if the intention
is to to have only such a pair in the first request, with the Stop event
generated by ddog_sidecar_telemetry_end() effectively a noop. It would
appear, judging by [this
comment](#391):

> Also allow the telemetry worker to have a mode where it's continuing
execution after a start-stop cycle, otherwise it won't send any more
metrics afterwards.

that the intention is to keep sending metrics after a Start/Stop pair.
In that case:

* The Stop action handler should not unschedule FlushData and
  FlushMetrics events and
* FlushData, if called outside a Start-Stop pair, should not be a noop.

Finally: swap the order in which FlushData and FlushMetrics are
scheduled so that FlushMetrics runs first and therefore its generated
data can be sent by the next FlushData.
  • Loading branch information
cataphract committed Jul 1, 2024
1 parent f66cd95 commit 4f63a34
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions ddtelemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ impl TelemetryWorker {
Lifecycle(Start) => {
if !self.data.started {
self.deadlines
.schedule_event(LifecycleAction::FlushData)
.schedule_event(LifecycleAction::FlushMetricAggr)
.unwrap();
self.deadlines
.schedule_event(LifecycleAction::FlushMetricAggr)
.schedule_event(LifecycleAction::FlushData)
.unwrap();
self.data.started = true;
}
Expand All @@ -265,9 +265,6 @@ impl TelemetryWorker {
.unwrap();
}
Lifecycle(FlushData) => {
if !self.data.started {
return CONTINUE;
}
let batch = self.build_observability_batch();
if !batch.is_empty() {
let payload = data::Payload::MessageBatch(batch);
Expand Down Expand Up @@ -296,7 +293,9 @@ impl TelemetryWorker {
self.log_err(&e);
}
self.data.started = false;
self.deadlines.clear_pending();
if !self.config.restartable {
self.deadlines.clear_pending();
}
return BREAK;
}
CollectStats(stats_sender) => {
Expand Down Expand Up @@ -341,10 +340,10 @@ impl TelemetryWorker {
Err(err) => self.log_err(&err),
}
self.deadlines
.schedule_event(LifecycleAction::FlushData)
.schedule_event(LifecycleAction::FlushMetricAggr)
.unwrap();
self.deadlines
.schedule_event(LifecycleAction::FlushMetricAggr)
.schedule_event(LifecycleAction::FlushData)
.unwrap();
self.data.started = true;
}
Expand All @@ -368,9 +367,6 @@ impl TelemetryWorker {
.unwrap();
}
Lifecycle(FlushData) => {
if !self.data.started {
return CONTINUE;
}
let mut batch = self.build_app_events_batch();
let payload = if batch.is_empty() {
data::Payload::AppHeartbeat(())
Expand Down Expand Up @@ -458,7 +454,9 @@ impl TelemetryWorker {
.await;

self.data.started = false;
self.deadlines.clear_pending();
if !self.config.restartable {
self.deadlines.clear_pending();
}
return BREAK;
}
CollectStats(stats_sender) => {
Expand Down

0 comments on commit 4f63a34

Please sign in to comment.