Skip to content

Commit

Permalink
feat: connect to xmidt cluster
Browse files Browse the repository at this point in the history
feat: connect to xmidt cluster
  • Loading branch information
denopink committed Mar 20, 2024
1 parent dc7f415 commit 6dc3743
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 50 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*.out

# VS Code directories
.vscode
*.code-workspace
.vscode/*
.dev/*
__debug_bin*

# Dependency directories (remove the comment below to include it)
# vendor/
Expand Down
2 changes: 1 addition & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Maintainers of this repository:

* Weston Schmidt @schmidtw
* Joel Unzain @joe94
* Owen Cabalceta @denopink
* John Bass @johnabass
* Nick Harter @njharter
37 changes: 37 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

"github.com/goschtalt/goschtalt"
"github.com/xmidt-org/arrange/arrangehttp"
"github.com/xmidt-org/retry"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/wrp-go/v3"
"gopkg.in/dealancer/validate.v2"
)

// Config is the configuration for the xmidt-agent.
type Config struct {
Client Client
Identity Identity
OperationalState OperationalState
XmidtCredentials XmidtCredentials
Expand All @@ -26,6 +28,41 @@ type Config struct {
Storage Storage
}

type Client struct {
// FetchURL is the url used to fetch the WS url.
FetchURL string
// (optional) FetchURLTimeout is the timeout for the fetching the WS url. If this is not set, the default is 30 seconds.
FetchURLTimeout time.Duration
// (optional) PingInterval is the ping interval allowed for the WS connection.
PingInterval time.Duration
// (optional) PingTimeout is the ping timeout for the WS connection.
PingTimeout time.Duration
// (optional) ConnectTimeout is the connect timeout for the WS connection.
ConnectTimeout time.Duration
// (optional) KeepAliveInterval is the keep alive interval for the WS connection.
KeepAliveInterval time.Duration
// (optional) IdleConnTimeout is the idle connection timeout for the WS connection.
IdleConnTimeout time.Duration
// (optional) TLSHandshakeTimeout is the TLS handshake timeout for the WS connection.
TLSHandshakeTimeout time.Duration
// (optional) ExpectContinueTimeout is the expect continue timeout for the WS connection.
ExpectContinueTimeout time.Duration
// (optional) MaxMessageBytes is the largest allowable message to send or receive.
MaxMessageBytes int64
// (optional) DisableV4 determines whether or not to allow IPv4 for the WS connection.
// If this is not set, the default is false (IPv4 is enabled).
// Either V4 or V6 can be disabled, but not both.
DisableV4 bool
// (optional) DisableV6 determines whether or not to allow IPv6 for the WS connection.
// If this is not set, the default is false (IPv6 is enabled).
// Either V4 or V6 can be disabled, but not both.
DisableV6 bool
// (optional) RetryPolicy sets the retry policy factory used for delaying between retry attempts for reconnection.
RetryPolicy retry.Config
// (optional) Once sets whether or not to only attempt to connect once.
Once bool
}

// Identity contains the information that identifies the device.
type Identity struct {
// DeviceID is the unique identifier for the device. Generally this is a
Expand Down
110 changes: 88 additions & 22 deletions cmd/xmidt-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import (
"context"
"fmt"
"os"
"runtime/debug"

"github.com/alecthomas/kong"
"github.com/goschtalt/goschtalt"
_ "github.com/goschtalt/goschtalt/pkg/typical"
_ "github.com/goschtalt/yaml-decoder"
_ "github.com/goschtalt/yaml-encoder"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/credentials"
"github.com/xmidt-org/xmidt-agent/internal/jwtxt"
"github.com/xmidt-org/xmidt-agent/internal/websocket"
"github.com/xmidt-org/xmidt-agent/internal/websocket/event"

"go.uber.org/fx"
"go.uber.org/fx/fxevent"
Expand Down Expand Up @@ -43,6 +47,15 @@ type CLI struct {
Files []string `optional:"" short:"f" help:"Specific configuration files or directories."`
}

type LifeCycleIn struct {
fx.In
Logger *zap.Logger
LC fx.Lifecycle
Shutdowner fx.Shutdowner
WS *websocket.Websocket
CancelList []event.CancelFunc
}

// xmidtAgent is the main entry point for the program. It is responsible for
// setting up the dependency injection framework and returning the app object.
func xmidtAgent(args []string) (*fx.App, error) {
Expand Down Expand Up @@ -72,13 +85,18 @@ func xmidtAgent(args []string) (*fx.App, error) {
provideConfig,
provideCredentials,
provideInstructions,
provideWS,
func(id Identity) wrp.DeviceID {
return id.DeviceID
},

goschtalt.UnmarshalFunc[sallust.Config]("logger", goschtalt.Optional()),
goschtalt.UnmarshalFunc[Identity]("identity"),
goschtalt.UnmarshalFunc[OperationalState]("operational_state"),
goschtalt.UnmarshalFunc[XmidtCredentials]("xmidt_credentials"),
goschtalt.UnmarshalFunc[XmidtService]("xmidt_service"),
goschtalt.UnmarshalFunc[Storage]("storage"),
goschtalt.UnmarshalFunc[Client]("client"),
),

fsProvide(),
Expand All @@ -96,6 +114,7 @@ func xmidtAgent(args []string) (*fx.App, error) {
fmt.Println(s)
}
},
lifeCycle,
),
)

Expand Down Expand Up @@ -170,29 +189,76 @@ func provideCLIWithOpts(args cliArgs, testOpts bool) (*CLI, error) {
return &cli, nil
}

type LoggerIn struct {
fx.In
CLI *CLI
Cfg sallust.Config
}

// Create the logger and configure it based on if the program is in
// debug mode or normal mode.
func provideLogger(cli *CLI, cfg sallust.Config) (*zap.Logger, error) {
if cli.Dev {
cfg.Level = "DEBUG"
cfg.Development = true
cfg.Encoding = "console"
cfg.EncoderConfig = sallust.EncoderConfig{
TimeKey: "T",
LevelKey: "L",
NameKey: "N",
CallerKey: "C",
FunctionKey: zapcore.OmitKey,
MessageKey: "M",
StacktraceKey: "S",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: "capitalColor",
EncodeTime: "RFC3339",
EncodeDuration: "string",
EncodeCaller: "short",
}
cfg.OutputPaths = []string{"stderr"}
cfg.ErrorOutputPaths = []string{"stderr"}
func provideLogger(in LoggerIn) (*zap.Logger, error) {
in.Cfg.EncoderConfig = sallust.EncoderConfig{
TimeKey: "T",
LevelKey: "L",
NameKey: "N",
CallerKey: "C",
FunctionKey: zapcore.OmitKey,
MessageKey: "M",
StacktraceKey: "S",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: "capitalColor",
EncodeTime: "RFC3339",
EncodeDuration: "string",
EncodeCaller: "short",
}

if in.CLI.Dev {
in.Cfg.Level = "DEBUG"
in.Cfg.Development = true
in.Cfg.Encoding = "console"
in.Cfg.OutputPaths = append(in.Cfg.OutputPaths, "stderr")
in.Cfg.ErrorOutputPaths = append(in.Cfg.ErrorOutputPaths, "stderr")
}
return cfg.Build()

return in.Cfg.Build()
}

func lifeCycle(in LifeCycleIn) {
logger := in.Logger.With(zap.String("component", "fx_lifecycle"))
in.LC.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
defer func() {
if r := recover(); nil != r {
logger.Error("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r))
}
}()

logger.Info("starting ws")
in.WS.Start()

return nil
},
OnStop: func(ctx context.Context) error {
defer func() {
if r := recover(); nil != r {
logger.Error("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r))
}

if err := in.Shutdowner.Shutdown(); err != nil {
logger.Error("encountered error trying to shutdown app: ", zap.Error(err))
}
}()

logger.Info("stopping ws listeners")
in.WS.Stop()
for _, c := range in.CancelList {
c()
}

return nil
},
},
)
}
Loading

0 comments on commit 6dc3743

Please sign in to comment.