Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe timeout #573

Open
palsivertsen opened this issue Aug 9, 2024 · 0 comments
Open

Observe timeout #573

palsivertsen opened this issue Aug 9, 2024 · 0 comments

Comments

@palsivertsen
Copy link

palsivertsen commented Aug 9, 2024

I'm playing around with observe and I'm experiencing context cancellation when sending replies to the client. Since I'm running locally I did not expect there to be a deadline for observe notifications. I this the intended behavior and is there a way to configure this?

To reproduce this one can change the sleep time in the example code to something like 30 seconds. The server will then report an error on the second transmission.

Server log:

2024/08/09 11:47:17 Got message path=/some/path: Code: GET, Token: e81949f7e5f85be4, Path: /some/path, Type: Confirmable, MessageID: 62273 from [::1]:53810
2024/08/09 11:47:47 Error on transmitter, stopping: cannot write request: context canceled

Client log:

2024/08/09 11:47:17 Got Code: Content, Token: e81949f7e5f85be4, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 25594, PayloadLen: 22

EDIT 1

server.go:

package main

import (
	"bytes"
	"fmt"
	"log"
	"time"

	coap "github.com/plgd-dev/go-coap/v3"
	"github.com/plgd-dev/go-coap/v3/message"
	"github.com/plgd-dev/go-coap/v3/message/codes"
	"github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
	path, err := opts.Path()
	if err != nil {
		log.Printf("cannot get path: %v", err)
		return ""
	}
	return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
	m := cc.AcquireMessage(cc.Context())
	defer cc.ReleaseMessage(m)
	m.SetCode(codes.Content)
	m.SetToken(token)
	m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
	m.SetContentFormat(message.TextPlain)
	if obs >= 0 {
		m.SetObserve(uint32(obs))
	}
	return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
	subded := time.Now()

	for obs := int64(2); ; obs++ {
		err := sendResponse(cc, token, subded, obs)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return
		}
		time.Sleep(time.Second * 30)
	}
}

func main() {
	log.Fatal(coap.ListenAndServe("udp", ":5688",
		mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
			log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
			obs, err := r.Options().Observe()
			switch {
			case r.Code() == codes.GET && err == nil && obs == 0:
				go periodicTransmitter(w.Conn(), r.Token())
			case r.Code() == codes.GET:
				err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
				if err != nil {
					log.Printf("Error on transmitter: %v", err)
				}
			}
		})))
}

client.go:

package main

import (
	"bytes"
	"fmt"
	"log"
	"time"

	coap "github.com/plgd-dev/go-coap/v3"
	"github.com/plgd-dev/go-coap/v3/message"
	"github.com/plgd-dev/go-coap/v3/message/codes"
	"github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
	path, err := opts.Path()
	if err != nil {
		log.Printf("cannot get path: %v", err)
		return ""
	}
	return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
	m := cc.AcquireMessage(cc.Context())
	defer cc.ReleaseMessage(m)
	m.SetCode(codes.Content)
	m.SetToken(token)
	m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
	m.SetContentFormat(message.TextPlain)
	if obs >= 0 {
		m.SetObserve(uint32(obs))
	}
	return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
	subded := time.Now()

	for obs := int64(2); ; obs++ {
		err := sendResponse(cc, token, subded, obs)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return
		}
		time.Sleep(time.Second * 30)
	}
}

func main() {
	log.Fatal(coap.ListenAndServe("udp", ":5688",
		mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
			log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
			obs, err := r.Options().Observe()
			switch {
			case r.Code() == codes.GET && err == nil && obs == 0:
				go periodicTransmitter(w.Conn(), r.Token())
			case r.Code() == codes.GET:
				err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
				if err != nil {
					log.Printf("Error on transmitter: %v", err)
				}
			}
		})))
}

Setting context.Background() in AquireMessage yields similar results, except that the second message is sent before the error is returned:

server.go:

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"time"

	coap "github.com/plgd-dev/go-coap/v3"
	"github.com/plgd-dev/go-coap/v3/message"
	"github.com/plgd-dev/go-coap/v3/message/codes"
	"github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
	path, err := opts.Path()
	if err != nil {
		log.Printf("cannot get path: %v", err)
		return ""
	}
	return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
	m := cc.AcquireMessage(context.Background()) // <-- I changed this context
	defer cc.ReleaseMessage(m)
	m.SetCode(codes.Content)
	m.SetToken(token)
	m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
	m.SetContentFormat(message.TextPlain)
	if obs >= 0 {
		m.SetObserve(uint32(obs))
	}
	return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
	subded := time.Now()

	for obs := int64(2); ; obs++ {
		err := sendResponse(cc, token, subded, obs)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return
		}
		time.Sleep(time.Second * 30)
	}
}

func main() {
	log.Fatal(coap.ListenAndServe("udp", ":5688",
		mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
			log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
			obs, err := r.Options().Observe()
			switch {
			case r.Code() == codes.GET && err == nil && obs == 0:
				go periodicTransmitter(w.Conn(), r.Token())
			case r.Code() == codes.GET:
				err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
				if err != nil {
					log.Printf("Error on transmitter: %v", err)
				}
			}
		})))
}

server log:

2024/08/09 12:51:28 Got message path=/some/path: Code: GET, Token: f40f11161a825088, Path: /some/path, Type: Confirmable, MessageID: 23914 from [::1]:54400
2024/08/09 12:51:58 Error on transmitter, stopping: cannot write request: connection was closed: context canceled

client log:

2024/08/09 12:51:28 Got Code: Content, Token: f40f11161a825088, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 28440, PayloadLen: 26
2024/08/09 12:51:58 Got Code: Content, Token: f40f11161a825088, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 28441, PayloadLen: 30

Packet capture:
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant