Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tibber: resubscribe on clean disconnect #18643

Merged
merged 9 commits into from
Feb 14, 2025
53 changes: 33 additions & 20 deletions meter/tibber-pulse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func init() {
}

type Tibber struct {
data *util.Monitor[tibber.LiveMeasurement]
data *util.Monitor[tibber.LiveMeasurement]
homeID string
timeout time.Duration
}

func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api.Meter, error) {
Expand Down Expand Up @@ -67,7 +69,9 @@ func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api
}

t := &Tibber{
data: util.NewMonitor[tibber.LiveMeasurement](cc.Timeout),
data: util.NewMonitor[tibber.LiveMeasurement](cc.Timeout),
homeID: cc.HomeID,
timeout: cc.Timeout,
}

// subscription client
Expand Down Expand Up @@ -98,18 +102,8 @@ func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api
return nil
})

done := make(chan error, 1)
go func(done chan error) {
done <- t.subscribe(client, cc.HomeID)
}(done)

select {
case err := <-done:
if err != nil {
return nil, err
}
case <-time.After(cc.Timeout):
return nil, api.ErrTimeout
if err := t.ensureSubscribed(client); err != nil {
return nil, err
}

go func() {
Expand All @@ -120,11 +114,16 @@ func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api
}()

go func() {
// The pulse sometimes declines valid(!) subscription requests, and asks the client to disconnect.
// Therefore we need to restart the client when exiting gracefully upon server request
// https://github.com/evcc-io/evcc/issues/17925#issuecomment-2621458890
for tick := time.Tick(10 * time.Second); ; {
if err := client.Run(); err != nil {
err := client.Run()
if err == nil {
// The pulse sometimes declines valid(!) subscription requests, and asks the client to disconnect.
// This invalidates the subscription, and therefore we resubscribe when exiting Run() gracefully
// upon server request.
// https://github.com/evcc-io/evcc/issues/17925#issuecomment-2621458890
err = t.ensureSubscribed(client)
}
if err != nil {
log.ERROR.Println(err)
}

Expand All @@ -139,13 +138,27 @@ func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api
return t, nil
}

func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string) error {
func (t *Tibber) ensureSubscribed(client *graphql.SubscriptionClient) error {
done := make(chan error, 1)
go func(done chan error) {
done <- t.subscribe(client)
}(done)

select {
case err := <-done:
return err
case <-time.After(t.timeout):
return api.ErrTimeout
}
}

func (t *Tibber) subscribe(client *graphql.SubscriptionClient) error {
var query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}

_, err := client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(homeID),
"homeId": graphql.ID(t.homeID),
}, func(data []byte, err error) error {
if err != nil {
return err
Expand Down
Loading