Skip to content

Commit

Permalink
Adapt to latest hive.go changes
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Apr 14, 2023
1 parent fb236af commit 03568c1
Show file tree
Hide file tree
Showing 19 changed files with 349 additions and 357 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: "1.20"
id: go

- name: Print Go version
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/gendoc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.19
go-version: "1.20"
id: go

- name: Print Go version
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# https://hub.docker.com/_/golang
FROM golang:1.19-bullseye AS build
FROM golang:1.20-bullseye AS build

# Ensure ca-certificates are up to date
RUN update-ca-certificates
Expand Down
3 changes: 3 additions & 0 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"disableStacktrace": false,
"stacktraceLevel": "panic",
"encoding": "console",
"encodingConfig": {
"timeEncoder": "rfc3339"
},
"outputPaths": [
"stdout"
],
Expand Down
22 changes: 10 additions & 12 deletions core/app/app.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package app

import (
"github.com/iotaledger/hive.go/core/app"
"github.com/iotaledger/hive.go/core/app/core/shutdown"
"github.com/iotaledger/hive.go/core/app/plugins/profiling"
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/components/profiling"
"github.com/iotaledger/hive.go/app/components/shutdown"
"github.com/iotaledger/inx-app/core/inx"
"github.com/iotaledger/inx-mqtt/core/mqtt"
"github.com/iotaledger/inx-mqtt/plugins/prometheus"
Expand All @@ -20,15 +20,13 @@ var (
func App() *app.App {
return app.New(Name, Version,
app.WithInitComponent(InitComponent),
app.WithCoreComponents([]*app.CoreComponent{
inx.CoreComponent,
mqtt.CoreComponent,
shutdown.CoreComponent,
}...),
app.WithPlugins([]*app.Plugin{
profiling.Plugin,
prometheus.Plugin,
}...),
app.WithComponents(
inx.Component,
mqtt.Component,
shutdown.Component,
profiling.Component,
prometheus.Component,
),
)
}

Expand Down
30 changes: 14 additions & 16 deletions core/mqtt/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"go.uber.org/dig"

"github.com/iotaledger/hive.go/core/app"
"github.com/iotaledger/hive.go/core/app/pkg/shutdown"
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
Expand All @@ -17,14 +17,12 @@ const (
)

func init() {
CoreComponent = &app.CoreComponent{
Component: &app.Component{
Name: "MQTT",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
},
Component = &app.Component{
Name: "MQTT",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
}
}

Expand All @@ -35,8 +33,8 @@ type dependencies struct {
}

var (
CoreComponent *app.CoreComponent
deps dependencies
Component *app.Component
deps dependencies
)

func provide(c *dig.Container) error {
Expand All @@ -49,7 +47,7 @@ func provide(c *dig.Container) error {

return c.Provide(func(deps inDeps) (*Server, error) {
return NewServer(
CoreComponent.Logger(),
Component.Logger(),
deps.NodeBridge,
deps.ShutdownHandler,
mqtt.WithBufferSize(ParamsMQTT.BufferSize),
Expand All @@ -73,9 +71,9 @@ func provide(c *dig.Container) error {
}

func run() error {
return CoreComponent.Daemon().BackgroundWorker("MQTT", func(ctx context.Context) {
CoreComponent.LogInfo("Starting MQTT Broker ...")
return Component.Daemon().BackgroundWorker("MQTT", func(ctx context.Context) {
Component.LogInfo("Starting MQTT Broker ...")
deps.Server.Run(ctx)
CoreComponent.LogInfo("Stopped MQTT Broker")
Component.LogInfo("Stopped MQTT Broker")
}, daemon.PriorityStopMQTT)
}
2 changes: 1 addition & 1 deletion core/mqtt/params.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mqtt

import "github.com/iotaledger/hive.go/core/app"
import "github.com/iotaledger/hive.go/app"

type ParametersMQTT struct {
BufferSize int `default:"0" usage:"the size of the client buffers in bytes"`
Expand Down
63 changes: 31 additions & 32 deletions core/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/iotaledger/hive.go/core/app/pkg/shutdown"
"github.com/iotaledger/hive.go/core/events"
"github.com/iotaledger/hive.go/core/generics/event"
"github.com/iotaledger/hive.go/core/logger"
"github.com/iotaledger/hive.go/core/subscriptionmanager"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/web/subscriptionmanager"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
inx "github.com/iotaledger/inx/go"
Expand Down Expand Up @@ -83,20 +82,21 @@ func (s *Server) Run(ctx context.Context) {
s.LogErrorfAndExit("failed to create MQTT broker: %s", err.Error())
}

// events
broker.Events().ClientConnected.Hook(event.NewClosure(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientConnect(event.ClientID)
}))
broker.Events().ClientDisconnected.Hook(event.NewClosure(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientDisconnect(event.ClientID)
}))
broker.Events().TopicSubscribed.Hook(event.NewClosure(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onSubscribeTopic(ctx, event.ClientID, event.Topic)
}))
broker.Events().TopicUnsubscribed.Hook(event.NewClosure(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onUnsubscribeTopic(event.ClientID, event.Topic)
}))

// register broker events
unhookBrokerEvents := lo.Batch(
broker.Events().ClientConnected.Hook(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientConnect(event.ClientID)
}).Unhook,
broker.Events().ClientDisconnected.Hook(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientDisconnect(event.ClientID)
}).Unhook,
broker.Events().TopicSubscribed.Hook(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onSubscribeTopic(ctx, event.ClientID, event.Topic)
}).Unhook,
broker.Events().TopicUnsubscribed.Hook(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onUnsubscribeTopic(event.ClientID, event.Topic)
}).Unhook,
)
s.MQTTBroker = broker

if err := broker.Start(); err != nil {
Expand All @@ -113,29 +113,28 @@ func (s *Server) Run(ctx context.Context) {
advertisedAddress = s.brokerOptions.WebsocketAdvertiseAddress
}

if err := deps.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress); err != nil {
if err := deps.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress, ""); err != nil {
s.LogErrorfAndExit("failed to register API route via INX: %s", err.Error())
}
s.LogInfo("Registering API route ... done")
cancelRegister()
}

onLatestMilestone := events.NewClosure(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoLatest, ms)
})

onConfirmedMilestone := events.NewClosure(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoConfirmed, ms)
})

s.NodeBridge.Events.LatestMilestoneChanged.Hook(onLatestMilestone)
s.NodeBridge.Events.ConfirmedMilestoneChanged.Hook(onConfirmedMilestone)
// register node bridge events
unhookNodeBridgeEvents := lo.Batch(
s.NodeBridge.Events.LatestMilestoneChanged.Hook(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoLatest, ms)
}).Unhook,
s.NodeBridge.Events.ConfirmedMilestoneChanged.Hook(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoConfirmed, ms)
}).Unhook,
)

s.LogInfo("Starting MQTT Broker ... done")
<-ctx.Done()

s.NodeBridge.Events.LatestMilestoneChanged.Detach(onLatestMilestone)
s.NodeBridge.Events.ConfirmedMilestoneChanged.Detach(onConfirmedMilestone)
unhookBrokerEvents()
unhookNodeBridgeEvents()

if s.brokerOptions.WebsocketEnabled {
ctxUnregister, cancelUnregister := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
30 changes: 20 additions & 10 deletions documentation/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,22 @@ Example:

## <a id="logger"></a> 2. Logger

| Name | Description | Type | Default value |
| ----------------- | --------------------------------------------------------------------------- | ------- | ------------- |
| level | The minimum enabled logging level | string | "info" |
| disableCaller | Stops annotating logs with the calling function's file name and line number | boolean | true |
| disableStacktrace | Disables automatic stacktrace capturing | boolean | false |
| stacktraceLevel | The level stacktraces are captured and above | string | "panic" |
| encoding | The logger's encoding (options: "json", "console") | string | "console" |
| outputPaths | A list of URLs, file paths or stdout/stderr to write logging output to | array | stdout |
| disableEvents | Prevents log messages from being raced as events | boolean | true |
| Name | Description | Type | Default value |
| ---------------------------------------- | --------------------------------------------------------------------------- | ------- | ------------- |
| level | The minimum enabled logging level | string | "info" |
| disableCaller | Stops annotating logs with the calling function's file name and line number | boolean | true |
| disableStacktrace | Disables automatic stacktrace capturing | boolean | false |
| stacktraceLevel | The level stacktraces are captured and above | string | "panic" |
| encoding | The logger's encoding (options: "json", "console") | string | "console" |
| [encodingConfig](#logger_encodingconfig) | Configuration for encodingConfig | object | |
| outputPaths | A list of URLs, file paths or stdout/stderr to write logging output to | array | stdout |
| disableEvents | Prevents log messages from being raced as events | boolean | true |

### <a id="logger_encodingconfig"></a> EncodingConfig

| Name | Description | Type | Default value |
| ----------- | ---------------------------------------------------------------------------------------------------------- | ------ | ------------- |
| timeEncoder | Sets the logger's timestamp encoding. (options: "nanos", "millis", "iso8601", "rfc3339" and "rfc3339nano") | string | "rfc3339" |

Example:

Expand All @@ -91,6 +98,9 @@ Example:
"disableStacktrace": false,
"stacktraceLevel": "panic",
"encoding": "console",
"encodingConfig": {
"timeEncoder": "rfc3339"
},
"outputPaths": [
"stdout"
],
Expand Down Expand Up @@ -209,7 +219,7 @@ Example:

| Name | Description | Type | Default value |
| ----------- | ------------------------------------------------- | ------- | ---------------- |
| enabled | Whether the profiling plugin is enabled | boolean | false |
| enabled | Whether the profiling component is enabled | boolean | false |
| bindAddress | The bind address on which the profiler listens on | string | "localhost:6060" |

Example:
Expand Down
Loading

0 comments on commit 03568c1

Please sign in to comment.