Skip to content

Commit

Permalink
Add Request Monitoring Feature to CoAP Connection
Browse files Browse the repository at this point in the history
The WithRequestMonitor function has been implemented to enable request monitoring for the connection. This function is called for each CoAP message received from the peer before it is processed.

Details of the Feature:

- Functionality: WithRequestMonitor allows developers to implement custom request monitoring logic for incoming CoAP messages.
- Error Handling: If the function returns an error, the connection is closed, providing a mechanism to handle and respond to issues in the monitoring process.
- Message Dropping: If the function returns true, the incoming message is dropped, allowing for selective handling or filtering of messages based on monitoring criteria.

---------

Co-authored-by: Jeff Welder <[email protected]>
  • Loading branch information
jkralik and jeffwelder-ellenbytech authored Nov 28, 2023
1 parent ce37a93 commit 42d43eb
Show file tree
Hide file tree
Showing 20 changed files with 689 additions and 44 deletions.
7 changes: 4 additions & 3 deletions dtls/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func Client(conn *dtls.Conn, opts ...udp.Option) *udpClient.Conn {
cfg.MTU,
cfg.CloseSocket,
)
cc := udpClient.NewConn(session,
createBlockWise,
monitor,
cc := udpClient.NewConnWithOpts(session,
&cfg,
udpClient.WithBlockWise(createBlockWise),
udpClient.WithInactivityMonitor(monitor),
udpClient.WithRequestMonitor(cfg.RequestMonitor),
)

cfg.PeriodicRunner(func(now time.Time) bool {
Expand Down
4 changes: 4 additions & 0 deletions dtls/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var DefaultConfig = func() Config {
}
return inactivity.New(timeout, onInactive)
},
RequestMonitor: func(cc *udpClient.Conn, req *pool.Message) (bool, error) {
return false, nil
},
OnNewConn: func(cc *udpClient.Conn) {
// do nothing by default
},
Expand All @@ -57,6 +60,7 @@ type Config struct {
GetMID GetMIDFunc
Handler HandlerFunc
OnNewConn OnNewConnFunc
RequestMonitor udpClient.RequestMonitorFunc
TransmissionNStart uint32
TransmissionAcknowledgeTimeout time.Duration
TransmissionMaxRetransmit uint32
Expand Down
17 changes: 11 additions & 6 deletions dtls/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func New(opt ...Option) *Server {
return inactivity.NewNilMonitor[*udpClient.Conn]()
}
}

if cfg.MessagePool == nil {
cfg.MessagePool = pool.New(0, 0)
}
Expand Down Expand Up @@ -158,8 +159,10 @@ func (s *Server) Serve(l Listener) error {
}
wg.Add(1)
var cc *udpClient.Conn
monitor := s.cfg.CreateInactivityMonitor()
cc = s.createConn(coapNet.NewConn(rw), monitor)
inactivityMonitor := s.cfg.CreateInactivityMonitor()
requestMonitor := s.cfg.RequestMonitor

cc = s.createConn(coapNet.NewConn(rw), inactivityMonitor, requestMonitor)
if s.cfg.OnNewConn != nil {
s.cfg.OnNewConn(cc)
}
Expand All @@ -184,7 +187,7 @@ func (s *Server) Stop() {
}
}

func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.InactivityMonitor) *udpClient.Conn {
func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor udpClient.InactivityMonitor, requestMonitor udpClient.RequestMonitorFunc) *udpClient.Conn {
createBlockWise := func(cc *udpClient.Conn) *blockwise.BlockWise[*udpClient.Conn] {
return nil
}
Expand Down Expand Up @@ -220,11 +223,13 @@ func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.Inactivi
cfg.MessagePool = s.cfg.MessagePool
cfg.ReceivedMessageQueueSize = s.cfg.ReceivedMessageQueueSize
cfg.ProcessReceivedMessage = s.cfg.ProcessReceivedMessage
cc := udpClient.NewConn(

cc := udpClient.NewConnWithOpts(
session,
createBlockWise,
monitor,
&cfg,
udpClient.WithBlockWise(createBlockWise),
udpClient.WithInactivityMonitor(inactivityMonitor),
udpClient.WithRequestMonitor(requestMonitor),
)

return cc
Expand Down
8 changes: 8 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ func (r *Message) String() string {
}
return buf
}

// IsPing returns true if the message is a ping.
func (r *Message) IsPing(isTCP bool) bool {
if isTCP {
return r.Code == codes.Ping
}
return r.Code == codes.Empty && r.Type == Confirmable && len(r.Token) == 0 && len(r.Options) == 0 && len(r.Payload) == 0
}
73 changes: 73 additions & 0 deletions message/message_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package message

import (
"testing"

"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/stretchr/testify/require"
)

func TestMessageIsPing(t *testing.T) {
tests := []struct {
name string
message *Message
isTCP bool
want bool
}{
{
name: "Ping message (TCP)",
message: &Message{
Code: codes.Ping,
Type: Confirmable,
Token: nil,
Options: nil,
Payload: nil,
},
isTCP: true,
want: true,
},
{
name: "Ping message (UDP)",
message: &Message{
Code: codes.Empty,
Type: Confirmable,
Token: nil,
Options: nil,
Payload: nil,
},
isTCP: false,
want: true,
},
{
name: "Non-ping message (TCP)",
message: &Message{
Code: codes.GET,
Type: Confirmable,
Token: []byte{1, 2, 3},
Options: []Option{{ID: 1, Value: []byte{4, 5, 6}}},
Payload: []byte{7, 8, 9},
},
isTCP: true,
want: false,
},
{
name: "Non-ping message (UDP)",
message: &Message{
Code: codes.GET,
Type: Confirmable,
Token: []byte{1, 2, 3},
Options: []Option{{ID: 1, Value: []byte{4, 5, 6}}},
Payload: []byte{7, 8, 9},
},
isTCP: false,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.message.IsPing(tt.isTCP)
require.Equal(t, tt.want, got)
})
}
}
4 changes: 4 additions & 0 deletions message/pool/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,3 +618,7 @@ func (r *Message) Clone(msg *Message) error {
}
return nil
}

func (r *Message) IsPing(isTCP bool) bool {
return r.msg.IsPing(isTCP)
}
54 changes: 54 additions & 0 deletions options/commonOptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,60 @@ func WithOnNewConn[F OnNewConnFunc](onNewConn F) OnNewConnOpt[F] {
}
}

// WithRequestMonitor
type WithRequestMonitorFunc interface {
tcpClient.RequestMonitorFunc | udpClient.RequestMonitorFunc
}

// WithRequestMonitorOpt network option.
type WithRequestMonitorOpt[F WithRequestMonitorFunc] struct {
f F
}

func panicForInvalidWithRequestMonitorFunc(t, exp any) {
panic(fmt.Errorf("invalid WithRequestMonitorFunc type %T, expected %T", t, exp))
}

func (o WithRequestMonitorOpt[F]) UDPServerApply(cfg *udpServer.Config) {
switch v := any(o.f).(type) {
case udpClient.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp udpClient.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

func (o WithRequestMonitorOpt[F]) DTLSServerApply(cfg *dtlsServer.Config) {
switch v := any(o.f).(type) {
case udpClient.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp udpClient.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

func (o WithRequestMonitorOpt[F]) TCPServerApply(cfg *tcpServer.Config) {
switch v := any(o.f).(type) {
case tcpClient.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp tcpClient.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

// WithRequestMonitor enables request monitoring for the connection.
// It is called for each CoAP message received from the peer before it is processed.
// If it returns an error, the connection is closed.
// If it returns true, the message is dropped.
func WithRequestMonitor[F WithRequestMonitorFunc](requestMonitor F) WithRequestMonitorOpt[F] {
return WithRequestMonitorOpt[F]{
f: requestMonitor,
}
}

// CloseSocketOpt close socket option.
type CloseSocketOpt struct{}

Expand Down
7 changes: 4 additions & 3 deletions tcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ func Client(conn net.Conn, opts ...Option) *client.Conn {

l := coapNet.NewConn(conn)
monitor := cfg.CreateInactivityMonitor()
cc := client.NewConn(l,
createBlockWise,
monitor,
cc := client.NewConnWithOpts(l,
&cfg,
client.WithBlockWise(createBlockWise),
client.WithInactivityMonitor(monitor),
client.WithRequestMonitor(cfg.RequestMonitor),
)

cfg.PeriodicRunner(func(now time.Time) bool {
Expand Down
4 changes: 4 additions & 0 deletions tcp/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var DefaultConfig = func() Config {
CreateInactivityMonitor: func() InactivityMonitor {
return inactivity.NewNilMonitor[*Conn]()
},
RequestMonitor: func(*Conn, *pool.Message) (bool, error) {
return false, nil
},
Dialer: &net.Dialer{Timeout: time.Second * 3},
Net: "tcp",
ConnectionCacheSize: 2048,
Expand All @@ -38,6 +41,7 @@ var DefaultConfig = func() Config {
type Config struct {
config.Common[*Conn]
CreateInactivityMonitor CreateInactivityMonitorFunc
RequestMonitor RequestMonitorFunc
Net string
Dialer *net.Dialer
TLSCfg *tls.Config
Expand Down
55 changes: 53 additions & 2 deletions tcp/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/plgd-dev/go-coap/v3/net/blockwise"
"github.com/plgd-dev/go-coap/v3/net/client"
limitparallelrequests "github.com/plgd-dev/go-coap/v3/net/client/limitParallelRequests"
"github.com/plgd-dev/go-coap/v3/net/monitor/inactivity"
"github.com/plgd-dev/go-coap/v3/net/observation"
"github.com/plgd-dev/go-coap/v3/net/responsewriter"
coapErrors "github.com/plgd-dev/go-coap/v3/pkg/errors"
Expand All @@ -32,6 +33,7 @@ type (
EventFunc = func()
GetMIDFunc = func() int32
CreateInactivityMonitorFunc = func() InactivityMonitor
RequestMonitorFunc = func(cc *Conn, req *pool.Message) (drop bool, err error)
)

type Notifier interface {
Expand All @@ -54,16 +56,64 @@ type Conn struct {
receivedMessageReader *client.ReceivedMessageReader[*Conn]
}

type ConnOptions struct {
CreateBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]
InactivityMonitor InactivityMonitor
RequestMonitor RequestMonitorFunc
}

type Option = func(opts *ConnOptions)

// WithBlockWise enables block-wise transfer for the connection.
func WithBlockWise(createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]) Option {
return func(opts *ConnOptions) {
opts.CreateBlockWise = createBlockWise
}
}

// WithInactivityMonitor enables inactivity monitor for the connection.
func WithInactivityMonitor(inactivityMonitor InactivityMonitor) Option {
return func(opts *ConnOptions) {
opts.InactivityMonitor = inactivityMonitor
}
}

// WithRequestMonitor enables request monitoring for the connection.
// It is called for each CoAP message received from the peer before it is processed.
// If it returns an error, the connection is closed.
// If it returns true, the message is dropped.
func WithRequestMonitor(requestMonitor RequestMonitorFunc) Option {
return func(opts *ConnOptions) {
opts.RequestMonitor = requestMonitor
}
}

// NewConn creates connection over session and observation.
func NewConn(
connection *coapNet.Conn,
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn],
inactivityMonitor InactivityMonitor,
cfg *Config,
) *Conn {
return NewConnWithOpts(connection, cfg, WithBlockWise(createBlockWise), WithInactivityMonitor(inactivityMonitor))
}

func NewConnWithOpts(connection *coapNet.Conn, cfg *Config, opts ...Option) *Conn {
if cfg.GetToken == nil {
cfg.GetToken = message.GetToken
}
cfgOpts := ConnOptions{
CreateBlockWise: func(cc *Conn) *blockwise.BlockWise[*Conn] {
return nil
},
InactivityMonitor: inactivity.NewNilMonitor[*Conn](),
RequestMonitor: func(*Conn, *pool.Message) (bool, error) {
return false, nil
},
}
for _, o := range opts {
o(&cfgOpts)
}
cc := Conn{
tokenHandlerContainer: coapSync.NewMap[uint64, HandlerFunc](),
blockwiseSZX: cfg.BlockwiseSZX,
Expand All @@ -72,14 +122,15 @@ func NewConn(
limitParallelRequests := limitparallelrequests.New(cfg.LimitClientParallelRequests, cfg.LimitClientEndpointParallelRequests, cc.do, cc.doObserve)
cc.observationHandler = observation.NewHandler(&cc, cfg.Handler, limitParallelRequests.Do)
cc.Client = client.New(&cc, cc.observationHandler, cfg.GetToken, limitParallelRequests)
cc.blockWise = createBlockWise(&cc)
cc.blockWise = cfgOpts.CreateBlockWise(&cc)
session := NewSession(cfg.Ctx,
connection,
cfg.MaxMessageSize,
cfg.Errors,
cfg.DisableTCPSignalMessageCSM,
cfg.CloseSocket,
inactivityMonitor,
cfgOpts.InactivityMonitor,
cfgOpts.RequestMonitor,
cfg.ConnectionCacheSize,
cfg.MessagePool,
)
Expand Down
Loading

0 comments on commit 42d43eb

Please sign in to comment.