diff --git a/net/observation/handler.go b/net/observation/handler.go index ab88a055..0bc9e74f 100644 --- a/net/observation/handler.go +++ b/net/observation/handler.go @@ -146,10 +146,11 @@ type Observation[C Client] struct { waitForResponse atomic.Bool observationHandler *Handler[C] - private struct { + private struct { // members guarded by mutex mutex sync.Mutex - obsSequence uint32 // guarded by mutex - lastEvent time.Time // guarded by mutex + obsSequence uint32 + lastEvent time.Time + etag []byte } } @@ -199,6 +200,12 @@ func (o *Observation[C]) Request() message.Message { return o.req } +func (o *Observation[C]) etag() []byte { + o.private.mutex.Lock() + defer o.private.mutex.Unlock() + return o.private.etag +} + // Cancel remove observation from server. For recreate observation use Observe. func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) error { if !o.cleanUp() { @@ -217,6 +224,10 @@ func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) err } } req.SetToken(o.req.Token) + etag := o.etag() + if len(etag) > 0 { + _ = req.SetETag(etag) // ignore invalid etag + } resp, err := o.observationHandler.do(req) if err != nil { return err @@ -237,11 +248,20 @@ func (o *Observation[C]) wantBeNotified(r *pool.Message) bool { o.private.mutex.Lock() defer o.private.mutex.Unlock() - if ValidSequenceNumber(o.private.obsSequence, obsSequence, o.private.lastEvent, now) { - o.private.obsSequence = obsSequence - o.private.lastEvent = now - return true + if !ValidSequenceNumber(o.private.obsSequence, obsSequence, o.private.lastEvent, now) { + return false } - return false + o.private.obsSequence = obsSequence + o.private.lastEvent = now + if etag, err := r.ETag(); err == nil { + if cap(o.private.etag) < len(etag) { + o.private.etag = make([]byte, len(etag)) + } + if len(o.private.etag) != len(etag) { + o.private.etag = o.private.etag[:len(etag)] + } + copy(o.private.etag, etag) + } + return true } diff --git a/tcp/clientobserve_test.go b/tcp/clientobserve_test.go index 016b42e5..c2bde263 100644 --- a/tcp/clientobserve_test.go +++ b/tcp/clientobserve_test.go @@ -113,6 +113,8 @@ func TestConnObserve(t *testing.T) { var etag []byte var errE error if tt.args.etag != nil { + // force unexpected etag, Cancel should still succeed but with codes.Content + // instead of codes.Valid etag = tt.args.etag } else { etag, errE = message.GetETag(p) @@ -141,12 +143,13 @@ func TestConnObserve(t *testing.T) { } f() case 1: - if tt.args.etag != nil { - if etag, errE := r.ETag(); errE == nil && bytes.Equal(tt.args.etag, etag) { - errS := w.SetResponse(codes.Valid, message.TextPlain, nil) - require.NoError(t, errS) - return - } + p := bytes.NewReader(tt.args.payload) + etag, errE := message.GetETag(p) + require.NoError(t, errE) + if retag, errE := r.ETag(); errE == nil && bytes.Equal(etag, retag) { + errS := w.SetResponse(codes.Valid, message.TextPlain, nil) + require.NoError(t, errS) + return } errS := w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("close"))) require.NoError(t, errS) @@ -186,13 +189,7 @@ func TestConnObserve(t *testing.T) { require.NoError(t, err) <-obs.done - opts := make(message.Options, 0, 1) - if tt.args.etag != nil { - buf := make([]byte, len(tt.args.etag)) - opts, _, err = opts.SetBytes(buf, message.ETag, tt.args.etag) - require.NoError(t, err) - } - err = got.Cancel(ctx, opts...) + err = got.Cancel(ctx) require.NoError(t, err) }) }