Skip to content

Commit

Permalink
Move code to own package
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Nov 21, 2023
1 parent 55e62b1 commit 55e4f9b
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 38 deletions.
43 changes: 20 additions & 23 deletions components/mqtt/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/broker"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
)

const (
APIRoute = "mqtt/v2"
)

func init() {
Component = &app.Component{
Name: "MQTT",
Expand All @@ -29,7 +26,7 @@ func init() {
type dependencies struct {
dig.In
NodeBridge *nodebridge.NodeBridge
Server *Server
Server *mqtt.Server
}

var (
Expand All @@ -45,27 +42,27 @@ func provide(c *dig.Container) error {
*shutdown.ShutdownHandler
}

return c.Provide(func(deps inDeps) (*Server, error) {
return NewServer(
return c.Provide(func(deps inDeps) (*mqtt.Server, error) {
return mqtt.NewServer(
Component.Logger(),
deps.NodeBridge,
deps.ShutdownHandler,
mqtt.WithBufferSize(ParamsMQTT.BufferSize),
mqtt.WithBufferBlockSize(ParamsMQTT.BufferBlockSize),
mqtt.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
mqtt.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
mqtt.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
mqtt.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
mqtt.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
mqtt.WithWebsocketAdvertiseAddress(ParamsMQTT.Websocket.AdvertiseAddress),
mqtt.WithTCPEnabled(ParamsMQTT.TCP.Enabled),
mqtt.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress),
mqtt.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled),
mqtt.WithTCPAuthPasswordSalt(ParamsMQTT.TCP.Auth.PasswordSalt),
mqtt.WithTCPAuthUsers(ParamsMQTT.TCP.Auth.Users),
mqtt.WithTCPTLSEnabled(ParamsMQTT.TCP.TLS.Enabled),
mqtt.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath),
mqtt.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath),
broker.WithBufferSize(ParamsMQTT.BufferSize),
broker.WithBufferBlockSize(ParamsMQTT.BufferBlockSize),
broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
broker.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
broker.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
broker.WithWebsocketAdvertiseAddress(ParamsMQTT.Websocket.AdvertiseAddress),
broker.WithTCPEnabled(ParamsMQTT.TCP.Enabled),
broker.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress),
broker.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled),
broker.WithTCPAuthPasswordSalt(ParamsMQTT.TCP.Auth.PasswordSalt),
broker.WithTCPAuthUsers(ParamsMQTT.TCP.Auth.Users),
broker.WithTCPTLSEnabled(ParamsMQTT.TCP.TLS.Enabled),
broker.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath),
broker.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath),
)
})
}
Expand Down
2 changes: 1 addition & 1 deletion components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.uber.org/dig"

"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/inx-mqtt/components/mqtt"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
)

func init() {
Expand Down
2 changes: 1 addition & 1 deletion components/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package prometheus
import (
"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/inx-mqtt/components/mqtt"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/auth.go → pkg/broker/auth.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt
package broker

import (
"encoding/hex"
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/broker.go → pkg/broker/broker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt
package broker

import (
"crypto/tls"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt
package broker

// BrokerOptions are options around the broker.
type BrokerOptions struct {

Check failure on line 4 in pkg/broker/broker_options.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/broker/broker_options.go#L4

exported: type name will be used as broker.BrokerOptions by other packages, and that stutters; consider calling this Options (revive)
Raw output
pkg/broker/broker_options.go:4:6: exported: type name will be used as broker.BrokerOptions by other packages, and that stutters; consider calling this Options (revive)
type BrokerOptions struct {
     ^
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/tls.go → pkg/broker/tls.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt
package broker

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion components/mqtt/publish.go → pkg/mqtt/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *Server) publishOutputWithMetadataIfSubscribed(ctx context.Context, outp
)
}

bech32HRP := deps.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP()
bech32HRP := s.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP()
topics = append(topics, getChainTopicsForOutput(outputID, iotaOutput, bech32HRP)...)
topics = append(topics, getUnlockConditionTopicsForOutput(topicOutputsByUnlockConditionAndAddress, iotaOutput, bech32HRP)...)
}
Expand Down
20 changes: 12 additions & 8 deletions components/mqtt/server.go → pkg/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/web/subscriptionmanager"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
"github.com/iotaledger/inx-mqtt/pkg/broker"
inx "github.com/iotaledger/inx/go"
iotago "github.com/iotaledger/iota.go/v4"
)

const (
APIRoute = "mqtt/v2"
)

const (
grpcListenToBlocks = "INX.ListenToBlocks"
grpcListenToAcceptedBlocks = "INX.ListenToAcceptedBlocks"
Expand All @@ -43,10 +47,10 @@ type topicSubcription struct {
type Server struct {
*logger.WrappedLogger

MQTTBroker *mqtt.Broker
MQTTBroker *broker.Broker
NodeBridge *nodebridge.NodeBridge
shutdownHandler *shutdown.ShutdownHandler
brokerOptions *mqtt.BrokerOptions
brokerOptions *broker.BrokerOptions

grpcSubscriptionsLock sync.Mutex
grpcSubscriptions map[string]*topicSubcription
Expand All @@ -55,8 +59,8 @@ type Server struct {
func NewServer(log *logger.Logger,
bridge *nodebridge.NodeBridge,
shutdownHandler *shutdown.ShutdownHandler,
brokerOpts ...mqtt.BrokerOption) (*Server, error) {
opts := &mqtt.BrokerOptions{}
brokerOpts ...broker.BrokerOption) (*Server, error) {
opts := &broker.BrokerOptions{}
opts.ApplyOnDefault(brokerOpts...)

s := &Server{
Expand All @@ -71,7 +75,7 @@ func NewServer(log *logger.Logger,
}

func (s *Server) Run(ctx context.Context) {
broker, err := mqtt.NewBroker(s.brokerOptions)
broker, err := broker.NewBroker(s.brokerOptions)
if err != nil {
s.LogErrorfAndExit("failed to create MQTT broker: %s", err.Error())
}
Expand Down Expand Up @@ -107,7 +111,7 @@ func (s *Server) Run(ctx context.Context) {
advertisedAddress = s.brokerOptions.WebsocketAdvertiseAddress
}

if err := deps.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress, ""); err != nil {
if err := s.NodeBridge.RegisterAPIRoute(ctxRegister, APIRoute, advertisedAddress, ""); err != nil {
s.LogErrorfAndExit("failed to register API route via INX: %s", err.Error())
}
s.LogInfo("Registering API route ... done")
Expand Down Expand Up @@ -142,7 +146,7 @@ func (s *Server) Run(ctx context.Context) {

s.LogInfo("Removing API route ...")
//nolint:contextcheck // false positive
if err := deps.NodeBridge.UnregisterAPIRoute(ctxUnregister, APIRoute); err != nil {
if err := s.NodeBridge.UnregisterAPIRoute(ctxUnregister, APIRoute); err != nil {
s.LogErrorf("failed to remove API route via INX: %s", err.Error())
}
cancelUnregister()
Expand Down
File renamed without changes.

0 comments on commit 55e4f9b

Please sign in to comment.