Skip to content

Commit

Permalink
style(linux): additonal debug message for sensor shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuar committed Dec 10, 2023
1 parent b5566a0 commit 4e71b9f
Show file tree
Hide file tree
Showing 17 changed files with 52 additions and 36 deletions.
34 changes: 2 additions & 32 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (agent *Agent) startWorkers(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
for s := range mergeSensorCh(ctx, outCh...) {
for s := range tracker.MergeSensorCh(ctx, outCh...) {
if err := agent.sensors.UpdateSensors(ctx, s); err != nil {
log.Error().Err(err).Msg("Could not update sensor.")
}
Expand Down Expand Up @@ -359,7 +359,7 @@ func (agent *Agent) runScripts(ctx context.Context) {
log.Debug().Msg("Starting cron scheduler for script sensors.")
c.Start()
go func() {
for s := range mergeSensorCh(ctx, outCh...) {
for s := range tracker.MergeSensorCh(ctx, outCh...) {
if err := agent.sensors.UpdateSensors(ctx, s); err != nil {
log.Error().Err(err).Msg("Could not update script sensor.")
}
Expand All @@ -370,33 +370,3 @@ func (agent *Agent) runScripts(ctx context.Context) {
cronCtx := c.Stop()
<-cronCtx.Done()
}

func mergeSensorCh(ctx context.Context, sensorCh ...<-chan tracker.Sensor) <-chan tracker.Sensor {
var wg sync.WaitGroup
out := make(chan tracker.Sensor)

// Start an output goroutine for each input channel in sensorCh. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan tracker.Sensor) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
wg.Add(len(sensorCh))
for _, c := range sensorCh {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
2 changes: 1 addition & 1 deletion internal/agent/device_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func sensorWorkers() []func(context.Context) chan tracker.Sensor {
linux.AppUpdater,
linux.NetworkConnectionsUpdater,
linux.NetworkStatsUpdater,
linux.PowerUpater,
linux.PowerProfileUpater,
linux.ProblemsUpdater,
linux.MemoryUpdater,
linux.LoadAvgUpdater,
Expand Down
1 change: 1 addition & 0 deletions internal/linux/appSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func AppUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped app sensor.")
}()
return sensorCh
}
3 changes: 2 additions & 1 deletion internal/linux/batterySensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (state *upowerBatteryState) Attributes() interface{} {
}

func BatteryUpdater(ctx context.Context) chan tracker.Sensor {
sensorCh := make(chan tracker.Sensor)
sensorCh := make(chan tracker.Sensor, 1)
batteryList := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus).
Path(upowerDBusPath).
Destination(upowerDBusDest).
Expand Down Expand Up @@ -323,6 +323,7 @@ func BatteryUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped battery sensors.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/diskSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func DiskUsageUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped disk usage sensors.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/loadavgSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func LoadAvgUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped load average sensors.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/memorySensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func MemoryUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped memory usage sensors.")
}()
return sensorCh
}
3 changes: 2 additions & 1 deletion internal/linux/networkConnectionSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,12 @@ func monitorActiveConnections(ctx context.Context, sensorCh chan tracker.Sensor,
}

func NetworkConnectionsUpdater(ctx context.Context) chan tracker.Sensor {
sensorCh := make(chan tracker.Sensor)
sensorCh := make(chan tracker.Sensor, 1)
go getActiveConnections(ctx, sensorCh)
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped network connection state sensors.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/networkStatsSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func NetworkStatsUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped network stats sensors.")
}()
return sensorCh
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newPowerSensor(t sensorType, v dbus.Variant) *powerSensor {
return s
}

func PowerUpater(ctx context.Context) chan tracker.Sensor {
func PowerProfileUpater(ctx context.Context) chan tracker.Sensor {
sensorCh := make(chan tracker.Sensor, 1)
activePowerProfile, err := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus).
Path(powerProfilesDBusPath).
Expand Down Expand Up @@ -86,6 +86,7 @@ func PowerUpater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg(("Stopped power profile sensor."))
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/powerStateSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func PowerStateUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped power state sensor.")
}()
return sensorCh
}
Expand Down
1 change: 1 addition & 0 deletions internal/linux/problemsSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func ProblemsUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped problems sensor.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/screenlockSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func ScreenLockUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped screen lock sensor.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/tempSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TempUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped temp sensors.")
}()
return sensorCh
}
1 change: 1 addition & 0 deletions internal/linux/timeSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TimeUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped time sensors.")
}()
return sensorCh
}
Expand Down
1 change: 1 addition & 0 deletions internal/linux/usersSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func UsersUpdater(ctx context.Context) chan tracker.Sensor {
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped users sensors.")
}()
return sensorCh
}
Expand Down
32 changes: 32 additions & 0 deletions internal/tracker/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package tracker

import (
"context"
"fmt"
"strings"
"sync"

"github.com/joshuar/go-hass-agent/internal/hass/sensor"
)
Expand Down Expand Up @@ -75,3 +77,33 @@ func marshalClass[C ComparableStringer](class C) string {
return class.String()
}
}

func MergeSensorCh(ctx context.Context, sensorCh ...<-chan Sensor) <-chan Sensor {
var wg sync.WaitGroup
out := make(chan Sensor)

// Start an output goroutine for each input channel in sensorCh. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan Sensor) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
wg.Add(len(sensorCh))
for _, c := range sensorCh {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

0 comments on commit 4e71b9f

Please sign in to comment.