Skip to content

Commit

Permalink
feat: integrate wrphandlers
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Apr 4, 2024
1 parent 55007a0 commit b40c91c
Show file tree
Hide file tree
Showing 9 changed files with 519 additions and 51 deletions.
9 changes: 9 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

// Config is the configuration for the xmidt-agent.
type Config struct {
Pubsub Pubsub
Websocket Websocket
Identity Identity
OperationalState OperationalState
Expand All @@ -30,6 +31,11 @@ type Config struct {
MockTr181 MockTr181
}

type Pubsub struct {
// PublishTimeout sets the timeout for publishing a message
PublishTimeout time.Duration
}

type Websocket struct {
// Disable determines whether or not to disable xmidt-agent's websocket
Disable bool
Expand Down Expand Up @@ -296,6 +302,9 @@ var defaultConfig = Config{
MaxInterval: 341*time.Second + 333*time.Millisecond,
},
},
Pubsub: Pubsub{
PublishTimeout: 200 * time.Millisecond,
},
MockTr181: MockTr181{
FilePath: "./mock_tr181.json",
Enabled: true,
Expand Down
10 changes: 6 additions & 4 deletions cmd/xmidt-agent/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type instructionsIn struct {
}
type instructionsOut struct {
fx.Out
JWTXT *jwtxt.Instructions
DeviceID wrp.DeviceID
JWTXT *jwtxt.Instructions
DeviceID wrp.DeviceID
PartnerID string
}

func provideInstructions(in instructionsIn) (instructionsOut, error) {
Expand Down Expand Up @@ -93,6 +94,7 @@ func provideInstructions(in instructionsIn) (instructionsOut, error) {
jwtxt, err := jwtxt.New(opts...)

return instructionsOut{
JWTXT: jwtxt,
DeviceID: in.ID.DeviceID}, err
JWTXT: jwtxt,
DeviceID: in.ID.DeviceID,
PartnerID: in.ID.PartnerID}, err
}
28 changes: 18 additions & 10 deletions cmd/xmidt-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_ "github.com/goschtalt/yaml-encoder"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/xmidt-agent/internal/credentials"
"github.com/xmidt-org/xmidt-agent/internal/pubsub"
"github.com/xmidt-org/xmidt-agent/internal/websocket"
"github.com/xmidt-org/xmidt-agent/internal/websocket/event"

Expand Down Expand Up @@ -48,12 +49,13 @@ type CLI struct {

type LifeCycleIn struct {
fx.In
Logger *zap.Logger
LC fx.Lifecycle
Shutdowner fx.Shutdowner
WS *websocket.Websocket
Cred *credentials.Credentials
CancelList []event.CancelFunc
Logger *zap.Logger
LC fx.Lifecycle
Shutdowner fx.Shutdowner
WS *websocket.Websocket
Cred *credentials.Credentials
EventCancelList []event.CancelFunc
PubSubCancelList []pubsub.CancelFunc
}

// xmidtAgent is the main entry point for the program. It is responsible for
Expand Down Expand Up @@ -94,10 +96,12 @@ func xmidtAgent(args []string) (*fx.App, error) {
goschtalt.UnmarshalFunc[XmidtService]("xmidt_service"),
goschtalt.UnmarshalFunc[Storage]("storage"),
goschtalt.UnmarshalFunc[Websocket]("websocket"),
goschtalt.UnmarshalFunc[MockTr181]("mocktr181"),
goschtalt.UnmarshalFunc[MockTr181]("mock_tr_181"),
goschtalt.UnmarshalFunc[Pubsub]("pubsub"),
),

fsProvide(),
provideWRPHandlers(),

fx.Invoke(
lifeCycle,
Expand Down Expand Up @@ -235,7 +239,7 @@ func onStart(cred *credentials.Credentials, ws *websocket.Websocket, logger *zap
}
}

func onStop(ws *websocket.Websocket, shutdowner fx.Shutdowner, cancelList []event.CancelFunc, logger *zap.Logger) func(context.Context) error {
func onStop(ws *websocket.Websocket, shutdowner fx.Shutdowner, eventCancelList []event.CancelFunc, pubsubCancelList []pubsub.CancelFunc, logger *zap.Logger) func(context.Context) error {
logger = logger.Named("on_stop")

return func(_ context.Context) error {
Expand All @@ -255,7 +259,11 @@ func onStop(ws *websocket.Websocket, shutdowner fx.Shutdowner, cancelList []even
}

ws.Stop()
for _, c := range cancelList {
for _, c := range eventCancelList {
c()
}

for _, c := range pubsubCancelList {
c()
}

Expand All @@ -268,7 +276,7 @@ func lifeCycle(in LifeCycleIn) {
in.LC.Append(
fx.Hook{
OnStart: onStart(in.Cred, in.WS, logger),
OnStop: onStop(in.WS, in.Shutdowner, in.CancelList, logger),
OnStop: onStop(in.WS, in.Shutdowner, in.EventCancelList, in.PubSubCancelList, logger),
},
)
}
207 changes: 207 additions & 0 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main

import (
"errors"

"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/pubsub"
"github.com/xmidt-org/xmidt-agent/internal/websocket"
"github.com/xmidt-org/xmidt-agent/internal/websocket/event"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/auth"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/missing"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/mocktr181"
wshandler "github.com/xmidt-org/xmidt-agent/internal/wrphandlers/websocket"
"go.uber.org/fx"
)

var (
ErrWRPHandlerConfig = errors.New("wrphandler configuration error")
)

func provideWRPHandlers() fx.Option {
return fx.Options(
fx.Provide(
provideWSHandler,
providePubSubHandler,
provideMissingHandler,
provideAuthHandler,
),
fx.Invoke(provideWSEventorToHandlerAdaptor),
)
}

type wsAdaptorIn struct {
fx.In

// Configuration
WSConfg Websocket
WS *websocket.Websocket

// wrphandlers
AuthHandler *auth.Handler
WRPHandlerAdaptorCancel event.CancelFunc
}

func provideWSEventorToHandlerAdaptor(in wsAdaptorIn) {
if in.WSConfg.Disable {
return
}

in.WS.AddMessageListener(
event.MsgListenerFunc(func(m wrp.Message) {
_ = in.AuthHandler.HandleWrp(m)
}),
&in.WRPHandlerAdaptorCancel,
)
}

type wsHandlerIn struct {
fx.In

// Configuration
WSConfg Websocket

WS *websocket.Websocket
}

func provideWSHandler(in wsHandlerIn) (h *wshandler.Handler, err error) {
defer func() {
if err != nil {
err = errors.Join(ErrWRPHandlerConfig, err)
}
}()

if in.WSConfg.Disable {
return nil, nil
}

return wshandler.New(in.WS)
}

type missingIn struct {
fx.In

// Configuration
// Note, DeviceID and PartnerID is pulled from the Identity configuration
DeviceID wrp.DeviceID
WSConfg Websocket

// wrphandlers
WSHandler *wshandler.Handler
Pubsub *pubsub.PubSub
}

func provideMissingHandler(in missingIn) (h *missing.Handler, err error) {
defer func() {
if err != nil {
err = errors.Join(ErrWRPHandlerConfig, err)
}
}()

if in.WSConfg.Disable {
return nil, nil
}

return missing.New(in.Pubsub, in.WSHandler, string(in.DeviceID))
}

type authIn struct {
fx.In

// Configuration
// Note, DeviceID and PartnerID is pulled from the Identity configuration
DeviceID wrp.DeviceID
PartnerID string
WSConfg Websocket

// wrphandlers
WSHandler *wshandler.Handler
MissingHandler *missing.Handler
}

func provideAuthHandler(in authIn) (h *auth.Handler, err error) {
defer func() {
if err != nil {
err = errors.Join(ErrWRPHandlerConfig, err)
}
}()

if in.WSConfg.Disable {
return nil, nil
}

return auth.New(in.MissingHandler, in.WSHandler, string(in.DeviceID), in.PartnerID)
}

type pubsubIn struct {
fx.In

// Configuration
// Note, DeviceID and PartnerID is pulled from the Identity configuration
DeviceID wrp.DeviceID
Pubsub Pubsub
MockTr181 MockTr181
WSConfg Websocket

// wrphandlers
WSHandler *wshandler.Handler
}

type pubsubOut struct {
fx.Out

PubSub *pubsub.PubSub
PubSubCancelList []pubsub.CancelFunc
}

func providePubSubHandler(in pubsubIn) (out pubsubOut, err error) {
defer func() {
if err != nil {
err = errors.Join(ErrWRPHandlerConfig, err)
}
}()

if in.WSConfg.Disable {
return pubsubOut{}, nil
}

var (
egress pubsub.CancelFunc
cancelList = []pubsub.CancelFunc{egress}
)

opts := []pubsub.Option{
pubsub.WithPublishTimeout(in.Pubsub.PublishTimeout),
pubsub.WithEgressHandler(in.WSHandler, &egress),
}

var ps *pubsub.PubSub
if in.MockTr181.Enabled {
var mocktr pubsub.CancelFunc

mockDefaults := []mocktr181.Option{
mocktr181.FilePath("mock_tr181_test.json"),
mocktr181.Enabled(in.MockTr181.Enabled),
}
mocktr181Handler, err := mocktr181.New(ps, string(in.DeviceID), mockDefaults...)
if err != nil {
return pubsubOut{}, nil
}

opts = append(opts,
pubsub.WithServiceHandler("mocktr181", mocktr181Handler, &mocktr),
)
cancelList = append(cancelList, mocktr)
}
ps, err = pubsub.New(
in.DeviceID,
opts...,
)

return pubsubOut{
PubSub: ps,
PubSubCancelList: cancelList,
}, err
}
Loading

0 comments on commit b40c91c

Please sign in to comment.