From 4e71b9f6c2b9341e2e905958ca1379126f7d709f Mon Sep 17 00:00:00 2001 From: Joshua Rich Date: Sun, 10 Dec 2023 14:29:51 +1000 Subject: [PATCH] style(linux): additonal debug message for sensor shutdown --- internal/agent/agent.go | 34 ++----------------- internal/agent/device_linux.go | 2 +- internal/linux/appSensor.go | 1 + internal/linux/batterySensor.go | 3 +- internal/linux/diskSensor.go | 1 + internal/linux/loadavgSensor.go | 1 + internal/linux/memorySensor.go | 1 + internal/linux/networkConnectionSensor.go | 3 +- internal/linux/networkStatsSensor.go | 1 + .../{powerSensor.go => powerProfileSensor.go} | 3 +- internal/linux/powerStateSensor.go | 1 + internal/linux/problemsSensor.go | 1 + internal/linux/screenlockSensor.go | 1 + internal/linux/tempSensor.go | 1 + internal/linux/timeSensor.go | 1 + internal/linux/usersSensor.go | 1 + internal/tracker/sensor.go | 32 +++++++++++++++++ 17 files changed, 52 insertions(+), 36 deletions(-) rename internal/linux/{powerSensor.go => powerProfileSensor.go} (95%) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index df1d1ca63..84012eed9 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -302,7 +302,7 @@ func (agent *Agent) startWorkers(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - for s := range mergeSensorCh(ctx, outCh...) { + for s := range tracker.MergeSensorCh(ctx, outCh...) { if err := agent.sensors.UpdateSensors(ctx, s); err != nil { log.Error().Err(err).Msg("Could not update sensor.") } @@ -359,7 +359,7 @@ func (agent *Agent) runScripts(ctx context.Context) { log.Debug().Msg("Starting cron scheduler for script sensors.") c.Start() go func() { - for s := range mergeSensorCh(ctx, outCh...) { + for s := range tracker.MergeSensorCh(ctx, outCh...) { if err := agent.sensors.UpdateSensors(ctx, s); err != nil { log.Error().Err(err).Msg("Could not update script sensor.") } @@ -370,33 +370,3 @@ func (agent *Agent) runScripts(ctx context.Context) { cronCtx := c.Stop() <-cronCtx.Done() } - -func mergeSensorCh(ctx context.Context, sensorCh ...<-chan tracker.Sensor) <-chan tracker.Sensor { - var wg sync.WaitGroup - out := make(chan tracker.Sensor) - - // Start an output goroutine for each input channel in sensorCh. output - // copies values from c to out until c is closed, then calls wg.Done. - output := func(c <-chan tracker.Sensor) { - defer wg.Done() - for n := range c { - select { - case out <- n: - case <-ctx.Done(): - return - } - } - } - wg.Add(len(sensorCh)) - for _, c := range sensorCh { - go output(c) - } - - // Start a goroutine to close out once all the output goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - return out -} diff --git a/internal/agent/device_linux.go b/internal/agent/device_linux.go index 66372ece8..d20fcc0bd 100644 --- a/internal/agent/device_linux.go +++ b/internal/agent/device_linux.go @@ -25,7 +25,7 @@ func sensorWorkers() []func(context.Context) chan tracker.Sensor { linux.AppUpdater, linux.NetworkConnectionsUpdater, linux.NetworkStatsUpdater, - linux.PowerUpater, + linux.PowerProfileUpater, linux.ProblemsUpdater, linux.MemoryUpdater, linux.LoadAvgUpdater, diff --git a/internal/linux/appSensor.go b/internal/linux/appSensor.go index ace530b26..b6145fecf 100644 --- a/internal/linux/appSensor.go +++ b/internal/linux/appSensor.go @@ -172,6 +172,7 @@ func AppUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped app sensor.") }() return sensorCh } diff --git a/internal/linux/batterySensor.go b/internal/linux/batterySensor.go index a43b6a504..5559bc1e2 100644 --- a/internal/linux/batterySensor.go +++ b/internal/linux/batterySensor.go @@ -248,7 +248,7 @@ func (state *upowerBatteryState) Attributes() interface{} { } func BatteryUpdater(ctx context.Context) chan tracker.Sensor { - sensorCh := make(chan tracker.Sensor) + sensorCh := make(chan tracker.Sensor, 1) batteryList := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus). Path(upowerDBusPath). Destination(upowerDBusDest). @@ -323,6 +323,7 @@ func BatteryUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped battery sensors.") }() return sensorCh } diff --git a/internal/linux/diskSensor.go b/internal/linux/diskSensor.go index 366920159..d4de39ea1 100644 --- a/internal/linux/diskSensor.go +++ b/internal/linux/diskSensor.go @@ -81,6 +81,7 @@ func DiskUsageUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped disk usage sensors.") }() return sensorCh } diff --git a/internal/linux/loadavgSensor.go b/internal/linux/loadavgSensor.go index 022d29f48..672f7f809 100644 --- a/internal/linux/loadavgSensor.go +++ b/internal/linux/loadavgSensor.go @@ -55,6 +55,7 @@ func LoadAvgUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped load average sensors.") }() return sensorCh } diff --git a/internal/linux/memorySensor.go b/internal/linux/memorySensor.go index 9e4890e58..c22b0af92 100644 --- a/internal/linux/memorySensor.go +++ b/internal/linux/memorySensor.go @@ -76,6 +76,7 @@ func MemoryUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped memory usage sensors.") }() return sensorCh } diff --git a/internal/linux/networkConnectionSensor.go b/internal/linux/networkConnectionSensor.go index 0f6cf1070..4e7dff0e1 100644 --- a/internal/linux/networkConnectionSensor.go +++ b/internal/linux/networkConnectionSensor.go @@ -280,11 +280,12 @@ func monitorActiveConnections(ctx context.Context, sensorCh chan tracker.Sensor, } func NetworkConnectionsUpdater(ctx context.Context) chan tracker.Sensor { - sensorCh := make(chan tracker.Sensor) + sensorCh := make(chan tracker.Sensor, 1) go getActiveConnections(ctx, sensorCh) go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped network connection state sensors.") }() return sensorCh } diff --git a/internal/linux/networkStatsSensor.go b/internal/linux/networkStatsSensor.go index 2016cf437..bc513269a 100644 --- a/internal/linux/networkStatsSensor.go +++ b/internal/linux/networkStatsSensor.go @@ -121,6 +121,7 @@ func NetworkStatsUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped network stats sensors.") }() return sensorCh } diff --git a/internal/linux/powerSensor.go b/internal/linux/powerProfileSensor.go similarity index 95% rename from internal/linux/powerSensor.go rename to internal/linux/powerProfileSensor.go index ad6efdbdd..837b97e79 100644 --- a/internal/linux/powerSensor.go +++ b/internal/linux/powerProfileSensor.go @@ -33,7 +33,7 @@ func newPowerSensor(t sensorType, v dbus.Variant) *powerSensor { return s } -func PowerUpater(ctx context.Context) chan tracker.Sensor { +func PowerProfileUpater(ctx context.Context) chan tracker.Sensor { sensorCh := make(chan tracker.Sensor, 1) activePowerProfile, err := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus). Path(powerProfilesDBusPath). @@ -86,6 +86,7 @@ func PowerUpater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg(("Stopped power profile sensor.")) }() return sensorCh } diff --git a/internal/linux/powerStateSensor.go b/internal/linux/powerStateSensor.go index 04a7ed991..aee8c65c8 100644 --- a/internal/linux/powerStateSensor.go +++ b/internal/linux/powerStateSensor.go @@ -65,6 +65,7 @@ func PowerStateUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped power state sensor.") }() return sensorCh } diff --git a/internal/linux/problemsSensor.go b/internal/linux/problemsSensor.go index a7615105a..b34e82189 100644 --- a/internal/linux/problemsSensor.go +++ b/internal/linux/problemsSensor.go @@ -101,6 +101,7 @@ func ProblemsUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped problems sensor.") }() return sensorCh } diff --git a/internal/linux/screenlockSensor.go b/internal/linux/screenlockSensor.go index 517b41533..5d4ac3f7e 100644 --- a/internal/linux/screenlockSensor.go +++ b/internal/linux/screenlockSensor.go @@ -82,6 +82,7 @@ func ScreenLockUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped screen lock sensor.") }() return sensorCh } diff --git a/internal/linux/tempSensor.go b/internal/linux/tempSensor.go index f69536de7..f157ad06c 100644 --- a/internal/linux/tempSensor.go +++ b/internal/linux/tempSensor.go @@ -86,6 +86,7 @@ func TempUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped temp sensors.") }() return sensorCh } diff --git a/internal/linux/timeSensor.go b/internal/linux/timeSensor.go index a2e546717..81ea68a9a 100644 --- a/internal/linux/timeSensor.go +++ b/internal/linux/timeSensor.go @@ -68,6 +68,7 @@ func TimeUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped time sensors.") }() return sensorCh } diff --git a/internal/linux/usersSensor.go b/internal/linux/usersSensor.go index bfe36befc..e9b81e525 100644 --- a/internal/linux/usersSensor.go +++ b/internal/linux/usersSensor.go @@ -76,6 +76,7 @@ func UsersUpdater(ctx context.Context) chan tracker.Sensor { go func() { defer close(sensorCh) <-ctx.Done() + log.Debug().Msg("Stopped users sensors.") }() return sensorCh } diff --git a/internal/tracker/sensor.go b/internal/tracker/sensor.go index c712ef853..e6478d080 100644 --- a/internal/tracker/sensor.go +++ b/internal/tracker/sensor.go @@ -6,8 +6,10 @@ package tracker import ( + "context" "fmt" "strings" + "sync" "github.com/joshuar/go-hass-agent/internal/hass/sensor" ) @@ -75,3 +77,33 @@ func marshalClass[C ComparableStringer](class C) string { return class.String() } } + +func MergeSensorCh(ctx context.Context, sensorCh ...<-chan Sensor) <-chan Sensor { + var wg sync.WaitGroup + out := make(chan Sensor) + + // Start an output goroutine for each input channel in sensorCh. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan Sensor) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-ctx.Done(): + return + } + } + } + wg.Add(len(sensorCh)) + for _, c := range sensorCh { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +}