Skip to content

Commit

Permalink
net: Enhance observation management with ETag tracking (#469)
Browse files Browse the repository at this point in the history
* net: Enhance observation management with ETag tracking

This commit introduces an enhancement to the observation mechanism
in the network module by implementing ETag tracking. The latest ETag
value associated with each observation is now stored in the internal
representation.

With this update, each incoming message containing the ETag CoAP option
automatically updates the ETag value for the relevant observation.
When an observation is canceled, the stored ETag value is utilized.
When the server detects a valid ETag match, it now responds with a
VALID code and an empty payload instead of resending the content
of the resource. This optimization minimizes unnecessary data transfer
and reduces network load.

---------

Co-authored-by: Jozef Kralik <[email protected]>
  • Loading branch information
Danielius1922 and jkralik authored Aug 8, 2023
1 parent f73387a commit 3625c08
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
36 changes: 28 additions & 8 deletions net/observation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ type Observation[C Client] struct {
waitForResponse atomic.Bool
observationHandler *Handler[C]

private struct {
private struct { // members guarded by mutex
mutex sync.Mutex
obsSequence uint32 // guarded by mutex
lastEvent time.Time // guarded by mutex
obsSequence uint32
lastEvent time.Time
etag []byte
}
}

Expand Down Expand Up @@ -199,6 +200,12 @@ func (o *Observation[C]) Request() message.Message {
return o.req
}

func (o *Observation[C]) etag() []byte {
o.private.mutex.Lock()
defer o.private.mutex.Unlock()
return o.private.etag
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) error {
if !o.cleanUp() {
Expand All @@ -217,6 +224,10 @@ func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) err
}
}
req.SetToken(o.req.Token)
etag := o.etag()
if len(etag) > 0 {
_ = req.SetETag(etag) // ignore invalid etag
}
resp, err := o.observationHandler.do(req)
if err != nil {
return err
Expand All @@ -237,11 +248,20 @@ func (o *Observation[C]) wantBeNotified(r *pool.Message) bool {

o.private.mutex.Lock()
defer o.private.mutex.Unlock()
if ValidSequenceNumber(o.private.obsSequence, obsSequence, o.private.lastEvent, now) {
o.private.obsSequence = obsSequence
o.private.lastEvent = now
return true
if !ValidSequenceNumber(o.private.obsSequence, obsSequence, o.private.lastEvent, now) {
return false
}

return false
o.private.obsSequence = obsSequence
o.private.lastEvent = now
if etag, err := r.ETag(); err == nil {
if cap(o.private.etag) < len(etag) {
o.private.etag = make([]byte, len(etag))
}
if len(o.private.etag) != len(etag) {
o.private.etag = o.private.etag[:len(etag)]
}
copy(o.private.etag, etag)
}
return true
}
23 changes: 10 additions & 13 deletions tcp/clientobserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func TestConnObserve(t *testing.T) {
var etag []byte
var errE error
if tt.args.etag != nil {
// force unexpected etag, Cancel should still succeed but with codes.Content
// instead of codes.Valid
etag = tt.args.etag
} else {
etag, errE = message.GetETag(p)
Expand Down Expand Up @@ -141,12 +143,13 @@ func TestConnObserve(t *testing.T) {
}
f()
case 1:
if tt.args.etag != nil {
if etag, errE := r.ETag(); errE == nil && bytes.Equal(tt.args.etag, etag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
p := bytes.NewReader(tt.args.payload)
etag, errE := message.GetETag(p)
require.NoError(t, errE)
if retag, errE := r.ETag(); errE == nil && bytes.Equal(etag, retag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
errS := w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("close")))
require.NoError(t, errS)
Expand Down Expand Up @@ -186,13 +189,7 @@ func TestConnObserve(t *testing.T) {
require.NoError(t, err)
<-obs.done

opts := make(message.Options, 0, 1)
if tt.args.etag != nil {
buf := make([]byte, len(tt.args.etag))
opts, _, err = opts.SetBytes(buf, message.ETag, tt.args.etag)
require.NoError(t, err)
}
err = got.Cancel(ctx, opts...)
err = got.Cancel(ctx)
require.NoError(t, err)
})
}
Expand Down

0 comments on commit 3625c08

Please sign in to comment.