Skip to content

Commit

Permalink
net: Enhance observation management with ETag tracking
Browse files Browse the repository at this point in the history
This commit introduces an enhancement to the observation mechanism
in the network module by implementing ETag tracking. The latest ETag
value associated with each observation is now stored in the internal
representation.

With this update, each incoming message containing the ETag CoAP option
automatically updates the ETag value for the relevant observation.
When an observation is canceled, the stored ETag value is utilized.
When the server detects a valid ETag match, it now responds with a
VALID code and an empty payload instead of resending the content
of the resource. This optimization minimizes unnecessary data transfer
and reduces network load.
  • Loading branch information
Danielius1922 committed Aug 8, 2023
1 parent 5366bb2 commit 443f67c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
20 changes: 17 additions & 3 deletions net/observation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ type Observation[C Client] struct {
waitForResponse atomic.Bool
observationHandler *Handler[C]

private struct {
private struct { // members guarded by mutex
mutex sync.Mutex
obsSequence uint32 // guarded by mutex
lastEvent time.Time // guarded by mutex
obsSequence uint32
lastEvent time.Time
etag []byte
}
}

Expand Down Expand Up @@ -199,6 +200,12 @@ func (o *Observation[C]) Request() message.Message {
return o.req
}

func (o *Observation[C]) etag() []byte {
o.private.mutex.Lock()
defer o.private.mutex.Unlock()
return o.private.etag
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) error {
if !o.cleanUp() {
Expand All @@ -217,6 +224,10 @@ func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) err
}
}
req.SetToken(o.req.Token)
etag := o.etag()
if etag != nil {
_ = req.SetETag(etag) // ignore invalid etag
}
resp, err := o.observationHandler.do(req)
if err != nil {
return err
Expand All @@ -240,6 +251,9 @@ func (o *Observation[C]) wantBeNotified(r *pool.Message) bool {
if ValidSequenceNumber(o.private.obsSequence, obsSequence, o.private.lastEvent, now) {
o.private.obsSequence = obsSequence
o.private.lastEvent = now
if etag, err := r.ETag(); err == nil {
o.private.etag = etag
}
return true
}

Expand Down
23 changes: 10 additions & 13 deletions tcp/clientobserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func TestConnObserve(t *testing.T) {
var etag []byte
var errE error
if tt.args.etag != nil {
// force unexpected etag, Cancel should still succeed but with codes.Content
// instead of codes.Valid
etag = tt.args.etag
} else {
etag, errE = message.GetETag(p)
Expand Down Expand Up @@ -141,12 +143,13 @@ func TestConnObserve(t *testing.T) {
}
f()
case 1:
if tt.args.etag != nil {
if etag, errE := r.ETag(); errE == nil && bytes.Equal(tt.args.etag, etag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
p := bytes.NewReader(tt.args.payload)
etag, errE := message.GetETag(p)
require.NoError(t, errE)
if retag, errE := r.ETag(); errE == nil && bytes.Equal(etag, retag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
errS := w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("close")))
require.NoError(t, errS)
Expand Down Expand Up @@ -186,13 +189,7 @@ func TestConnObserve(t *testing.T) {
require.NoError(t, err)
<-obs.done

opts := make(message.Options, 0, 1)
if tt.args.etag != nil {
buf := make([]byte, len(tt.args.etag))
opts, _, err = opts.SetBytes(buf, message.ETag, tt.args.etag)
require.NoError(t, err)
}
err = got.Cancel(ctx, opts...)
err = got.Cancel(ctx)
require.NoError(t, err)
})
}
Expand Down

0 comments on commit 443f67c

Please sign in to comment.