Skip to content

Commit

Permalink
Update observation API
Browse files Browse the repository at this point in the history
Extend observation API to allow setting of CoAP options for the
Cancel observation request.
  • Loading branch information
Danielius1922 authored Aug 3, 2023
1 parent 351cd00 commit 5366bb2
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 12 deletions.
2 changes: 1 addition & 1 deletion mux/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type Observation = interface {
Cancel(ctx context.Context) error
Cancel(ctx context.Context, opts ...message.Option) error
Canceled() bool
}

Expand Down
2 changes: 1 addition & 1 deletion net/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Client[C]) Get(ctx context.Context, path string, opts ...message.Option
}

type Observation = interface {
Cancel(ctx context.Context) error
Cancel(ctx context.Context, opts ...message.Option) error
Canceled() bool
}

Expand Down
2 changes: 1 addition & 1 deletion net/client/limitParallelRequests/limitParallelRequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type (
)

type Observation = interface {
Cancel(ctx context.Context) error
Cancel(ctx context.Context, opts ...message.Option) error
Canceled() bool
}

Expand Down
5 changes: 3 additions & 2 deletions net/observation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,15 @@ func (o *Observation[C]) Request() message.Message {
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation[C]) Cancel(ctx context.Context) error {
func (o *Observation[C]) Cancel(ctx context.Context, opts ...message.Option) error {
if !o.cleanUp() {
// observation was already cleanup
return nil
}

req := o.client().AcquireMessage(ctx)
defer o.client().ReleaseMessage(req)
req.ResetOptionsTo(opts)
req.SetCode(codes.GET)
req.SetObserve(1)
if path, err := o.req.Options.Path(); err == nil {
Expand All @@ -221,7 +222,7 @@ func (o *Observation[C]) Cancel(ctx context.Context) error {
return err
}
defer o.client().ReleaseMessage(resp)
if resp.Code() != codes.Content {
if resp.Code() != codes.Content && resp.Code() != codes.Valid {
return fmt.Errorf("unexpected return code(%v)", resp.Code())
}
return nil
Expand Down
127 changes: 123 additions & 4 deletions tcp/clientobserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestConnObserve(t *testing.T) {
payload []byte
numEvents uint32
goFunc func(f func())
etag []byte
}
tests := []struct {
name string
Expand Down Expand Up @@ -55,6 +56,15 @@ func TestConnObserve(t *testing.T) {
goFunc: func(f func()) { go f() },
},
},
{
name: "5000bytes with ETag",
args: args{
path: "/tmp",
numEvents: 20,
payload: make([]byte, 5000),
etag: []byte("1337"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -100,7 +110,14 @@ func TestConnObserve(t *testing.T) {
}
}
p := bytes.NewReader(tt.args.payload)
etag, errE := message.GetETag(p)
var etag []byte
var errE error
if tt.args.etag != nil {
etag = tt.args.etag
} else {
etag, errE = message.GetETag(p)
require.NoError(t, errE)
}
require.NoError(t, errE)
req := cc.AcquireMessage(cc.Context())
defer cc.ReleaseMessage(req)
Expand All @@ -124,6 +141,13 @@ func TestConnObserve(t *testing.T) {
}
f()
case 1:
if tt.args.etag != nil {
if etag, errE := r.ETag(); errE == nil && bytes.Equal(tt.args.etag, etag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
}
errS := w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("close")))
require.NoError(t, errS)
default:
Expand All @@ -147,7 +171,7 @@ func TestConnObserve(t *testing.T) {
require.NoError(t, errC)
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
obs := &observer{
t: t,
Expand All @@ -161,7 +185,14 @@ func TestConnObserve(t *testing.T) {
}
require.NoError(t, err)
<-obs.done
err = got.Cancel(ctx)

opts := make(message.Options, 0, 1)
if tt.args.etag != nil {
buf := make([]byte, len(tt.args.etag))
opts, _, err = opts.SetBytes(buf, message.ETag, tt.args.etag)
require.NoError(t, err)
}
err = got.Cancel(ctx, opts...)
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -263,7 +294,7 @@ func TestConnObserveNotSupported(t *testing.T) {
require.NoError(t, errC)
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
obs := &observer{
t: t,
Expand All @@ -284,6 +315,94 @@ func TestConnObserveNotSupported(t *testing.T) {
}
}

func TestConnObserveCancel(t *testing.T) {
type cancelType int
const (
cancelContext cancelType = iota
closeListeningConnection
closeClientConnection
)
type args struct {
cancel cancelType
}
tests := []struct {
name string
args args
}{
{
name: "cancel context",
args: args{
cancel: cancelContext,
},
},
{
name: "close listening connection",
args: args{
cancel: closeListeningConnection,
},
},
{
name: "close client connection",
args: args{
cancel: closeClientConnection,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l, err := coapNet.NewTCPListener("tcp", "")
require.NoError(t, err)
closeListener := func() {
errC := l.Close()
require.NoError(t, errC)
}
defer closeListener()

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

closeClient := func() {}
s := NewServer(options.WithHandlerFunc(func(w *responsewriter.ResponseWriter[*client.Conn], r *pool.Message) {
errS := w.SetResponse(codes.BadRequest, message.TextPlain, nil)
require.NoError(t, errS)
w.Message().SetContext(w.Conn().Context())

switch tt.args.cancel {
case cancelContext:
cancel()
case closeListeningConnection:
closeListener()
case closeClientConnection:
closeClient()
}
}))
defer s.Stop()

wg.Add(1)
go func() {
defer wg.Done()
errS := s.Serve(l)
require.NoError(t, errS)
}()

cc, err := Dial(l.Addr().String())
require.NoError(t, err)
closeClient = func() {
errC := cc.Close()
require.NoError(t, errC)
}
defer closeClient()
_, err = cc.Observe(ctx, "/tmp", func(req *pool.Message) {
// no-op
})
require.Error(t, err)
})
}
}

/*
func TestConnObserveIotivityLite(t *testing.T) {
cc, err := Dial("10.112.112.10:60956")
Expand Down
116 changes: 113 additions & 3 deletions udp/client/observe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestConnObserve(t *testing.T) {
payload []byte
numEvents uint32
goFunc func(f func())
etag []byte
}
tests := []struct {
name string
Expand Down Expand Up @@ -55,6 +56,15 @@ func TestConnObserve(t *testing.T) {
goFunc: func(f func()) { go f() },
},
},
{
name: "5000bytes with ETag",
args: args{
path: "/tmp",
numEvents: 20,
payload: make([]byte, 5000),
etag: []byte("1337"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -100,8 +110,14 @@ func TestConnObserve(t *testing.T) {
}
}
p := bytes.NewReader(tt.args.payload)
etag, errE := message.GetETag(p)
require.NoError(t, errE)
var etag []byte
var errE error
if tt.args.etag != nil {
etag = tt.args.etag
} else {
etag, errE = message.GetETag(p)
require.NoError(t, errE)
}
req := cc.AcquireMessage(cc.Context())
defer cc.ReleaseMessage(req)
req.SetCode(codes.Content)
Expand All @@ -121,6 +137,13 @@ func TestConnObserve(t *testing.T) {
}
f()
case 1:
if tt.args.etag != nil {
if etag, errE := r.ETag(); errE == nil && bytes.Equal(tt.args.etag, etag) {
errS := w.SetResponse(codes.Valid, message.TextPlain, nil)
require.NoError(t, errS)
return
}
}
errS := w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader([]byte("close")))
require.NoError(t, errS)
default:
Expand Down Expand Up @@ -160,7 +183,14 @@ func TestConnObserve(t *testing.T) {
_, err = cc.Get(ctx, tt.args.path)
require.NoError(t, err)
<-obs.done
err = got.Cancel(ctx)

opts := make(message.Options, 0, 1)
if tt.args.etag != nil {
buf := make([]byte, len(tt.args.etag))
opts, _, err = opts.SetBytes(buf, message.ETag, tt.args.etag)
require.NoError(t, err)
}
err = got.Cancel(ctx, opts...)
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -284,6 +314,86 @@ func TestConnObserveNotSupported(t *testing.T) {
}
}

func TestConnObserveCancel(t *testing.T) {
type cancelType int
const (
cancelContext cancelType = iota
closeListeningConnection
closeClientConnection
)
type args struct {
cancel cancelType
}
tests := []struct {
name string
args args
}{
{
name: "cancel context",
args: args{
cancel: cancelContext,
},
},
{
name: "close client connection",
args: args{
cancel: closeClientConnection,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l, err := coapNet.NewListenUDP("udp", "")
require.NoError(t, err)
closeListener := func() {
errC := l.Close()
require.NoError(t, errC)
}
defer closeListener()

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

closeClient := func() {}
s := udp.NewServer(options.WithHandlerFunc(func(w *responsewriter.ResponseWriter[*client.Conn], r *pool.Message) {
errS := w.SetResponse(codes.BadRequest, message.TextPlain, nil)
require.NoError(t, errS)
w.Message().SetContext(w.Conn().Context())

switch tt.args.cancel {
case cancelContext:
cancel()
case closeClientConnection:
closeClient()
}
}))
defer s.Stop()

wg.Add(1)
go func() {
defer wg.Done()
errS := s.Serve(l)
require.NoError(t, errS)
}()

cc, err := udp.Dial(l.LocalAddr().String())
require.NoError(t, err)
closeClient = func() {
errC := cc.Close()
require.NoError(t, errC)
}
defer closeClient()
_, err = cc.Observe(ctx, "/tmp", func(req *pool.Message) {
// no-op
})
require.Error(t, err)
})
}
}

/*
func TestConnObserveIotivityLite(t *testing.T) {
cc, err := Dial("10.112.112.10:60956")
Expand Down

0 comments on commit 5366bb2

Please sign in to comment.