Skip to content

Commit

Permalink
Adapt to latest hive.go changes (#95)
Browse files Browse the repository at this point in the history
* Adapt to latest hive.go changes

* Move `core` and `plugins` to `components` folder

* Fix gendoc

* Disable depguard linter

* Update workflows

* Fix linter warnings

* Update to go version 1.21
  • Loading branch information
muXxer authored Aug 30, 2023
1 parent 1bd420b commit 97ac91b
Show file tree
Hide file tree
Showing 26 changed files with 703 additions and 695 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.21"
id: go

- name: Print Go version
run: go version

- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Build
run: go build -v .
Expand All @@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Build Docker image
run: docker build . --file Dockerfile --tag inx-mqtt:latest
8 changes: 4 additions & 4 deletions .github/workflows/gendoc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v1
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.21"
id: go

- name: Print Go version
run: go version

- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Run gendoc
working-directory: tools/gendoc
run: go mod tidy && go run main.go

- name: Create Pull Request
uses: peter-evans/create-pull-request@v4
uses: peter-evans/create-pull-request@v5
with:
token: ${{ secrets.GITHUB_TOKEN }}
title: "chore(gendoc): update docs"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
password: ${{ secrets.IOTALEDGER_DOCKER_PASSWORD }}

- name: Build and push to Dockerhub
uses: docker/build-push-action@v3
uses: docker/build-push-action@v4
with:
file: ./Dockerfile
platforms: linux/amd64,linux/arm64
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ linters:
- containedctx
- contextcheck
- decorder
- depguard
#- depguard
- dogsled
- dupl
- durationcheck
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# https://hub.docker.com/_/golang
FROM golang:1.19-bullseye AS build
FROM golang:1.21-bullseye AS build

# Ensure ca-certificates are up to date
RUN update-ca-certificates
Expand Down
48 changes: 48 additions & 0 deletions components/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package app

import (
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/components/profiling"
"github.com/iotaledger/hive.go/app/components/shutdown"
"github.com/iotaledger/inx-app/components/inx"
"github.com/iotaledger/inx-mqtt/components/mqtt"
"github.com/iotaledger/inx-mqtt/components/prometheus"
)

var (
// Name of the app.
Name = "inx-mqtt"

// Version of the app.
Version = "1.0.0-rc.2"
)

func App() *app.App {
return app.New(Name, Version,
app.WithInitComponent(InitComponent),
app.WithComponents(
inx.Component,
mqtt.Component,
shutdown.Component,
profiling.Component,
prometheus.Component,
),
)
}

var (
InitComponent *app.InitComponent
)

func init() {
InitComponent = &app.InitComponent{
Component: &app.Component{
Name: "App",
},
NonHiddenFlags: []string{
"config",
"help",
"version",
},
}
}
30 changes: 14 additions & 16 deletions core/mqtt/component.go → components/mqtt/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"go.uber.org/dig"

"github.com/iotaledger/hive.go/core/app"
"github.com/iotaledger/hive.go/core/app/pkg/shutdown"
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
Expand All @@ -17,14 +17,12 @@ const (
)

func init() {
CoreComponent = &app.CoreComponent{
Component: &app.Component{
Name: "MQTT",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
},
Component = &app.Component{
Name: "MQTT",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
}
}

Expand All @@ -35,8 +33,8 @@ type dependencies struct {
}

var (
CoreComponent *app.CoreComponent
deps dependencies
Component *app.Component
deps dependencies
)

func provide(c *dig.Container) error {
Expand All @@ -49,7 +47,7 @@ func provide(c *dig.Container) error {

return c.Provide(func(deps inDeps) (*Server, error) {
return NewServer(
CoreComponent.Logger(),
Component.Logger(),
deps.NodeBridge,
deps.ShutdownHandler,
mqtt.WithBufferSize(ParamsMQTT.BufferSize),
Expand All @@ -73,9 +71,9 @@ func provide(c *dig.Container) error {
}

func run() error {
return CoreComponent.Daemon().BackgroundWorker("MQTT", func(ctx context.Context) {
CoreComponent.LogInfo("Starting MQTT Broker ...")
return Component.Daemon().BackgroundWorker("MQTT", func(ctx context.Context) {
Component.LogInfo("Starting MQTT Broker ...")
deps.Server.Run(ctx)
CoreComponent.LogInfo("Stopped MQTT Broker")
Component.LogInfo("Stopped MQTT Broker")
}, daemon.PriorityStopMQTT)
}
2 changes: 1 addition & 1 deletion core/mqtt/params.go → components/mqtt/params.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mqtt

import "github.com/iotaledger/hive.go/core/app"
import "github.com/iotaledger/hive.go/app"

type ParametersMQTT struct {
BufferSize int `default:"0" usage:"the size of the client buffers in bytes"`
Expand Down
File renamed without changes.
63 changes: 31 additions & 32 deletions core/mqtt/server.go → components/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/iotaledger/hive.go/core/app/pkg/shutdown"
"github.com/iotaledger/hive.go/core/events"
"github.com/iotaledger/hive.go/core/generics/event"
"github.com/iotaledger/hive.go/core/logger"
"github.com/iotaledger/hive.go/core/subscriptionmanager"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/web/subscriptionmanager"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
inx "github.com/iotaledger/inx/go"
Expand Down Expand Up @@ -83,20 +82,21 @@ func (s *Server) Run(ctx context.Context) {
s.LogErrorfAndExit("failed to create MQTT broker: %s", err.Error())
}

// events
broker.Events().ClientConnected.Hook(event.NewClosure(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientConnect(event.ClientID)
}))
broker.Events().ClientDisconnected.Hook(event.NewClosure(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientDisconnect(event.ClientID)
}))
broker.Events().TopicSubscribed.Hook(event.NewClosure(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onSubscribeTopic(ctx, event.ClientID, event.Topic)
}))
broker.Events().TopicUnsubscribed.Hook(event.NewClosure(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onUnsubscribeTopic(event.ClientID, event.Topic)
}))

// register broker events
unhookBrokerEvents := lo.Batch(
broker.Events().ClientConnected.Hook(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientConnect(event.ClientID)
}).Unhook,
broker.Events().ClientDisconnected.Hook(func(event *subscriptionmanager.ClientEvent[string]) {
s.onClientDisconnect(event.ClientID)
}).Unhook,
broker.Events().TopicSubscribed.Hook(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onSubscribeTopic(ctx, event.ClientID, event.Topic)
}).Unhook,
broker.Events().TopicUnsubscribed.Hook(func(event *subscriptionmanager.ClientTopicEvent[string, string]) {
s.onUnsubscribeTopic(event.ClientID, event.Topic)
}).Unhook,
)
s.MQTTBroker = broker

if err := broker.Start(); err != nil {
Expand All @@ -113,29 +113,28 @@ func (s *Server) Run(ctx context.Context) {
advertisedAddress = s.brokerOptions.WebsocketAdvertiseAddress
}

if err := deps.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress); err != nil {
if err := deps.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress, ""); err != nil {
s.LogErrorfAndExit("failed to register API route via INX: %s", err.Error())
}
s.LogInfo("Registering API route ... done")
cancelRegister()
}

onLatestMilestone := events.NewClosure(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoLatest, ms)
})

onConfirmedMilestone := events.NewClosure(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoConfirmed, ms)
})

s.NodeBridge.Events.LatestMilestoneChanged.Hook(onLatestMilestone)
s.NodeBridge.Events.ConfirmedMilestoneChanged.Hook(onConfirmedMilestone)
// register node bridge events
unhookNodeBridgeEvents := lo.Batch(
s.NodeBridge.Events.LatestMilestoneChanged.Hook(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoLatest, ms)
}).Unhook,
s.NodeBridge.Events.ConfirmedMilestoneChanged.Hook(func(ms *nodebridge.Milestone) {
s.PublishMilestoneOnTopic(topicMilestoneInfoConfirmed, ms)
}).Unhook,
)

s.LogInfo("Starting MQTT Broker ... done")
<-ctx.Done()

s.NodeBridge.Events.LatestMilestoneChanged.Detach(onLatestMilestone)
s.NodeBridge.Events.ConfirmedMilestoneChanged.Detach(onConfirmedMilestone)
unhookBrokerEvents()
unhookNodeBridgeEvents()

if s.brokerOptions.WebsocketEnabled {
ctxUnregister, cancelUnregister := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,19 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/dig"

"github.com/iotaledger/hive.go/core/app"
"github.com/iotaledger/inx-mqtt/core/mqtt"
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/inx-mqtt/components/mqtt"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
)

func init() {
Plugin = &app.Plugin{
Component: &app.Component{
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Configure: configure,
Run: run,
},
IsEnabled: func() bool {
return ParamsPrometheus.Enabled
},
Component = &app.Component{
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Configure: configure,
Run: run,
}
}

Expand All @@ -41,8 +36,8 @@ type dependencies struct {
}

var (
Plugin *app.Plugin
deps dependencies
Component *app.Component
deps dependencies
)

func provide(c *dig.Container) error {
Expand Down Expand Up @@ -91,29 +86,29 @@ func configure() error {
}

func run() error {
return Plugin.Daemon().BackgroundWorker("Prometheus exporter", func(ctx context.Context) {
Plugin.LogInfo("Starting Prometheus exporter ... done")
return Component.Daemon().BackgroundWorker("Prometheus exporter", func(ctx context.Context) {
Component.LogInfo("Starting Prometheus exporter ... done")

go func() {
Plugin.LogInfof("You can now access the Prometheus exporter using: http://%s/metrics", ParamsPrometheus.BindAddress)
Component.LogInfof("You can now access the Prometheus exporter using: http://%s/metrics", ParamsPrometheus.BindAddress)
if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !errors.Is(err, http.ErrServerClosed) {
Plugin.LogWarnf("Stopped Prometheus exporter due to an error (%s)", err)
Component.LogWarnf("Stopped Prometheus exporter due to an error (%s)", err)
}
}()

<-ctx.Done()
Plugin.LogInfo("Stopping Prometheus exporter ...")
Component.LogInfo("Stopping Prometheus exporter ...")

shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

//nolint:contextcheck // false positive
err := deps.PrometheusEcho.Shutdown(shutdownCtx)
if err != nil {
Plugin.LogWarn(err)
Component.LogWarn(err)
}

Plugin.LogInfo("Stopping Prometheus exporter ... done")
Component.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityStopPrometheus)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package prometheus
import (
"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/inx-mqtt/core/mqtt"
"github.com/iotaledger/inx-mqtt/components/mqtt"
)

var (
Expand Down Expand Up @@ -80,8 +80,8 @@ func registerMQTTMetrics(registry *prometheus.Registry) {

func collectMQTTBroker(server *mqtt.Server) {
mqttBrokerAppInfo.With(prometheus.Labels{
"name": Plugin.App().Info().Name,
"version": Plugin.App().Info().Version,
"name": Component.App().Info().Name,
"version": Component.App().Info().Version,
"broker_version": server.MQTTBroker.SystemInfo().Version,
}).Set(1)
mqttBrokerStarted.Set(float64(server.MQTTBroker.SystemInfo().Started))
Expand Down
Loading

0 comments on commit 97ac91b

Please sign in to comment.