Skip to content

Commit

Permalink
feat: integrate libparodus
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed May 16, 2024
1 parent 55f7bfb commit 02156d6
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 19 deletions.
18 changes: 18 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
type Config struct {
Pubsub Pubsub
Websocket Websocket
LibParodus LibParodus
Identity Identity
OperationalState OperationalState
XmidtCredentials XmidtCredentials
Expand All @@ -43,6 +44,17 @@ type Config struct {
Metadata Metadata
}

type LibParodus struct {
// ParodusServiceURL is the service url used by libparodus
ParodusServiceURL string
// KeepAliveInterval is the keep alive interval for libparodus.
KeepAliveInterval time.Duration
// ReceiveTimeout is the Receive timeout for libparodus.
ReceiveTimeout time.Duration
// SendTimeout is the send timeout for libparodus.
SendTimeout time.Duration
}

type QOS struct {
// MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload.
MaxQueueBytes int64
Expand Down Expand Up @@ -361,6 +373,12 @@ var defaultConfig = Config{
MaxInterval: 341*time.Second + 333*time.Millisecond,
},
},
LibParodus: LibParodus{
ParodusServiceURL: "tcp://127.0.0.1:6666",
KeepAliveInterval: 30 * time.Second,
ReceiveTimeout: 1 * time.Second,
SendTimeout: 1 * time.Second,
},
Pubsub: Pubsub{
PublishTimeout: 200 * time.Millisecond,
},
Expand Down
32 changes: 32 additions & 0 deletions cmd/xmidt-agent/libparodus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"errors"

"github.com/xmidt-org/xmidt-agent/internal/adapters/libparodus"
"github.com/xmidt-org/xmidt-agent/internal/pubsub"
"go.uber.org/fx"
)

type libParodusIn struct {
fx.In

// Configuration
LibParodus LibParodus

PubSub *pubsub.PubSub
}

func provideLibParodus(in libParodusIn) (*libparodus.Adapter, error) {
libParodusDefaults := []libparodus.Option{
libparodus.KeepaliveInterval(in.LibParodus.KeepAliveInterval),
libparodus.ReceiveTimeout(in.LibParodus.ReceiveTimeout),
libparodus.SendTimeout(in.LibParodus.SendTimeout),
}
libParodus, err := libparodus.New(in.LibParodus.ParodusServiceURL, in.PubSub, libParodusDefaults...)
if err != nil {
return nil, errors.Join(ErrWRPHandlerConfig, err)
}

return libParodus, nil
}
16 changes: 11 additions & 5 deletions cmd/xmidt-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/alecthomas/kong"
"github.com/goschtalt/goschtalt"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/xmidt-agent/internal/adapters/libparodus"
"github.com/xmidt-org/xmidt-agent/internal/credentials"
"github.com/xmidt-org/xmidt-agent/internal/loglevel"
"github.com/xmidt-org/xmidt-agent/internal/websocket"
Expand Down Expand Up @@ -49,6 +50,7 @@ type LifeCycleIn struct {
LC fx.Lifecycle
Shutdowner fx.Shutdowner
WS *websocket.Websocket
LibParodus *libparodus.Adapter
QOS *qos.Handler
Cred *credentials.Credentials
WaitUntilFetched time.Duration `name:"wait_until_fetched"`
Expand Down Expand Up @@ -85,6 +87,7 @@ func xmidtAgent(args []string) (*fx.App, error) {
provideCredentials,
provideInstructions,
provideWS,
provideLibParodus,

goschtalt.UnmarshalFunc[sallust.Config]("logger", goschtalt.Optional()),
goschtalt.UnmarshalFunc[Identity]("identity"),
Expand All @@ -97,6 +100,7 @@ func xmidtAgent(args []string) (*fx.App, error) {
goschtalt.UnmarshalFunc[Pubsub]("pubsub"),
goschtalt.UnmarshalFunc[Metadata]("metadata"),
goschtalt.UnmarshalFunc[QOS]("qos"),
goschtalt.UnmarshalFunc[LibParodus]("lib_parodus"),

provideNetworkService,
provideMetadataProvider,
Expand Down Expand Up @@ -211,7 +215,7 @@ func provideLogger(in LoggerIn) (*zap.AtomicLevel, *zap.Logger, error) {
return &zcfg.Level, logger, err
}

func onStart(cred *credentials.Credentials, ws *websocket.Websocket, qos *qos.Handler, waitUntilFetched time.Duration, logger *zap.Logger) func(context.Context) error {
func onStart(cred *credentials.Credentials, ws *websocket.Websocket, libParodus *libparodus.Adapter, qos *qos.Handler, waitUntilFetched time.Duration, logger *zap.Logger) func(context.Context) error {
logger = logger.Named("on_start")

return func(ctx context.Context) error {
Expand All @@ -235,13 +239,14 @@ func onStart(cred *credentials.Credentials, ws *websocket.Websocket, qos *qos.Ha
}

ws.Start()
err := libParodus.Start()
qos.Start()

return nil
return err
}
}

func onStop(ws *websocket.Websocket, qos *qos.Handler, shutdowner fx.Shutdowner, cancels []func(), logger *zap.Logger) func(context.Context) error {
func onStop(ws *websocket.Websocket, libParodus *libparodus.Adapter, qos *qos.Handler, shutdowner fx.Shutdowner, cancels []func(), logger *zap.Logger) func(context.Context) error {
logger = logger.Named("on_stop")

return func(_ context.Context) error {
Expand All @@ -261,6 +266,7 @@ func onStop(ws *websocket.Websocket, qos *qos.Handler, shutdowner fx.Shutdowner,
}

ws.Stop()
libParodus.Stop()
qos.Stop()
for _, c := range cancels {
c()
Expand All @@ -274,8 +280,8 @@ func lifeCycle(in LifeCycleIn) {
logger := in.Logger.Named("fx_lifecycle")
in.LC.Append(
fx.Hook{
OnStart: onStart(in.Cred, in.WS, in.QOS, in.WaitUntilFetched, logger),
OnStop: onStop(in.WS, in.QOS, in.Shutdowner, in.Cancels, logger),
OnStart: onStart(in.Cred, in.WS, in.LibParodus, in.QOS, in.WaitUntilFetched, logger),
OnStop: onStop(in.WS, in.LibParodus, in.QOS, in.Shutdowner, in.Cancels, logger),
},
)
}
7 changes: 0 additions & 7 deletions internal/adapters/libparodus/libparodus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ var _ Option = optionFunc(nil)

// New creates a new Service with the given options.
func New(url string, pubsub *pubsub.PubSub, opts ...Option) (*Adapter, error) {
defaults := []Option{
KeepaliveInterval(30 * time.Second),
ReceiveTimeout(1 * time.Second),
SendTimeout(1 * time.Second),
}

required := []Option{
validatePubSub(),
validateParodusServiceURL(),
Expand All @@ -76,7 +70,6 @@ func New(url string, pubsub *pubsub.PubSub, opts ...Option) (*Adapter, error) {
subServices: make(map[string]*external),
}

opts = append(defaults, opts...)
opts = append(opts, required...)

for _, o := range opts {
Expand Down
7 changes: 0 additions & 7 deletions internal/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ func New(self wrp.DeviceID, opts ...Option) (*PubSub, error) {
),
}

defaults := []Option{
WithPublishTimeout(5 * time.Second),
}

// Prepend the defaults to the provided options.
opts = append(defaults, opts...)

for _, opt := range opts {
if opt != nil {
if err := opt.apply(&ps); err != nil {
Expand Down

0 comments on commit 02156d6

Please sign in to comment.