diff --git a/CHANGELOG.md b/CHANGELOG.md index 6325fe556..d6142cdf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,28 @@ All notable changes to this project will be documented in this file. +## [1.1.1] - 22.12.2021 + +### Changed + - Added ulimit and stop_grace_period to docker-compose.yml and documentation (#1242) + +### Fixed + - Fixed WebSocket disconnecting on Safari browsers (#1243) + - Fixed MQTT memory leak (#1246) + +### Config file changes + +`config.json` +```diff + "prometheus": { + ... + "coordinatorMetrics": true, ++ "mqttBrokerMetrics": true, + "debugMetrics": false, + ... + }, +``` + ## [1.1.0] - 10.12.2021 ### Added diff --git a/README.md b/README.md index 98c256b2b..124d361ea 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ _Table of contents_ ## Documentation -Hornet documentation can be found here: https://hornet.docs.iota.org/ +Hornet documentation can be found here: https://wiki.iota.org/hornet/welcome ## Contributing diff --git a/config.json b/config.json index 39a624024..9688629fa 100644 --- a/config.json +++ b/config.json @@ -252,6 +252,7 @@ "restAPIMetrics": true, "migrationMetrics": true, "coordinatorMetrics": true, + "mqttBrokerMetrics": true, "debugMetrics": false, "goMetrics": false, "processMetrics": false, diff --git a/config_comnet.json b/config_comnet.json index 1aa30d791..efedca23b 100644 --- a/config_comnet.json +++ b/config_comnet.json @@ -258,6 +258,7 @@ "restAPIMetrics": true, "migrationMetrics": true, "coordinatorMetrics": true, + "mqttBrokerMetrics": true, "debugMetrics": false, "goMetrics": false, "processMetrics": false, diff --git a/config_devnet.json b/config_devnet.json index 8e3fcce87..3f86eb941 100644 --- a/config_devnet.json +++ b/config_devnet.json @@ -258,6 +258,7 @@ "restAPIMetrics": true, "migrationMetrics": true, "coordinatorMetrics": true, + "mqttBrokerMetrics": true, "debugMetrics": false, "goMetrics": false, "processMetrics": false, diff --git a/core/app/app.go b/core/app/app.go index 4c02d500f..53a4ed898 100644 --- a/core/app/app.go +++ b/core/app/app.go @@ -20,7 +20,7 @@ var ( Name = "HORNET" // Version of the app. - Version = "1.1.0" + Version = "1.1.1" ) var ( diff --git a/docker-compose.yml b/docker-compose.yml index 95d04d169..28c016777 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,7 @@ services: nofile: soft: 8192 hard: 8192 + stop_grace_period: 5m # Best performance via host network: network_mode: host # Else: diff --git a/documentation/docs/api_reference.md b/documentation/docs/api_reference.md index e36a0b138..05c55e4fa 100644 --- a/documentation/docs/api_reference.md +++ b/documentation/docs/api_reference.md @@ -12,7 +12,7 @@ image: /img/logo/HornetLogo.png # API Reference -You can find specifications for the REST API for the IOTA node software in the [IOTA REST API reference](https://editor.swagger.io/?url=https://raw.githubusercontent.com/rufsam/protocol-rfcs/master/text/0026-rest-api/rest-api.yaml) +You can find specifications for the REST API for the IOTA node software in the [IOTA REST API reference](https://editor.swagger.io/?url=https://raw.githubusercontent.com/rufsam/protocol-rfcs/master/text/0026-rest-api/0026-rest-api.yaml) -The node event API is in charge of publishing information about events within the node software. You can find more information in the [Node event API reference](https://playground.asyncapi.io/?load=https://raw.githubusercontent.com/luca-moser/protocol-rfcs/rfc/node-event-api/text/0033-node-event-api/0033-node-event-api.yml) \ No newline at end of file +The node event API is in charge of publishing information about events within the node software. You can find more information in the [Node event API reference](https://studio.asyncapi.com/?url=https://raw.githubusercontent.com/luca-moser/protocol-rfcs/rfc/node-event-api/text/0033-node-event-api/0033-node-event-api.yml) diff --git a/documentation/docs/getting_started/security_101.md b/documentation/docs/getting_started/security_101.md index 1a9106a9d..41193eb38 100644 --- a/documentation/docs/getting_started/security_101.md +++ b/documentation/docs/getting_started/security_101.md @@ -25,7 +25,7 @@ You should consider doing the following before running a node on your device: * [Blocking unnecessary ports](#blocking-unnecessary-ports). ### Securing SSH logins -If you log into your device through SSH, you should take measures to protect it from unauthorized access. Many guides have been written about this subject. For more information, see [10 Steps to Secure Open SSH](https://blog.devolutions.net/2017/4/10-steps-to-secure-open-ssh). In addition to that, you can also leverage tools such as [Fail2ban](https://www.fail2ban.org/wiki/index.php/Main_Page) to further tighten your nodes security. +If you log into your device through SSH, you should take measures to protect it from unauthorized access. Many guides have been written about this subject. For more information, see [10 Steps to Secure Open SSH](https://blog.devolutions.net/2017/04/10-steps-to-secure-open-ssh). In addition to that, you can also leverage tools such as [Fail2ban](https://www.fail2ban.org/wiki/index.php/Main_Page) to further tighten your nodes security. ### Blocking Unnecessary Ports Attackers can abuse any open ports on your device. To secure your device against attacks on unused open ports, you should close all ports except those that are in use. diff --git a/documentation/docs/getting_started/using_docker.md b/documentation/docs/getting_started/using_docker.md index 07a1334bd..9983b83d1 100644 --- a/documentation/docs/getting_started/using_docker.md +++ b/documentation/docs/getting_started/using_docker.md @@ -81,6 +81,7 @@ docker run \ --restart always \ --name hornet\ --net=host \ + --ulimit nofile=8192:8192 \ -d \ hornet:latest ``` @@ -95,10 +96,11 @@ docker run \ * `--restart always` Instructs Docker to restart the container after Docker reboots. * `--name hornet` Name of the running container instance. You can refer to the given container by this name. * `--net=host` Instructs Docker to use the host's network, so the network is not isolated. We recommend that you run on host network for better performance. This way, the container will also open any ports it needs on the host network, so you will not need to specify any ports. +* `--ulimit nofile=8192:8192` increases the ulimits inside the container. This is important when running with large databases. * `-d` Instructs Docker to run the container instance in a detached mode (daemon). -You can run `docker stop -t 200 hornet` to gracefully end the process. +You can run `docker stop -t 300 hornet` to gracefully end the process. ## Create Username and Password for the Hornet Dashboard @@ -166,20 +168,20 @@ docker start hornet You can restart an existing Hornet container by running: ```bash -docker restart -t 200 hornet +docker restart -t 300 hornet ``` -* `-t 200` Instructs Docker to wait for a grace period before shutting down. +* `-t 300` Instructs Docker to wait for a grace period before shutting down. ### Stopping Hornet You can stop an existing Hornet container by running: ```bash -docker stop -t 200 hornet +docker stop -t 300 hornet ``` -* `-t 200` Instructs Docker to wait for a grace period before shutting down. +* `-t 300` Instructs Docker to wait for a grace period before shutting down. ### Displaying Log Output diff --git a/documentation/docs/getting_started/using_docker_compose.md b/documentation/docs/getting_started/using_docker_compose.md index ccc629495..8f410bc2f 100644 --- a/documentation/docs/getting_started/using_docker_compose.md +++ b/documentation/docs/getting_started/using_docker_compose.md @@ -42,6 +42,11 @@ services: image: gohornet/hornet:latest network_mode: host restart: always + ulimits: + nofile: + soft: 8192 + hard: 8192 + stop_grace_period: 5m cap_drop: - ALL volumes: @@ -80,5 +85,5 @@ You can add `-d` to run detached. To gracefully stop the container, you can run the following command: ```sh -docker-compose down -t 200 +docker-compose down ``` diff --git a/documentation/docs/post_installation/configuration.md b/documentation/docs/post_installation/configuration.md index 9d80711a7..b0aa4f35b 100644 --- a/documentation/docs/post_installation/configuration.md +++ b/documentation/docs/post_installation/configuration.md @@ -29,21 +29,21 @@ hornet -h --full ## 1. REST API -| Name | Description | Type | -| :------------------------- | :----------------------------------------------------------------------------------------------- | :--------------- | -| bindAddress | The bind address on which the REST API listens on | string | -| [jwtAuth](#jwt-auth) | Config for JWT auth | object | -| publicRoutes | the HTTP REST routes which can be called without authorization. Wildcards using * are allowed. | array of strings | -| protectedRoutes | the HTTP REST routes which need to be called with authorization. Wildcards using * are allowed. | array of strings | -| powEnabled | Whether the node does PoW if messages are received via API | bool | -| powWorkerCount | The amount of workers used for calculating PoW when issuing messages via API | integer | -| [limits](#limits) | Configuration for api limits | object | +| Name | Description | Type | +| :------------------- | :---------------------------------------------------------------------------------------------- | :--------------- | +| bindAddress | The bind address on which the REST API listens on | string | +| [jwtAuth](#jwt-auth) | Config for JWT auth | object | +| publicRoutes | the HTTP REST routes which can be called without authorization. Wildcards using * are allowed. | array of strings | +| protectedRoutes | the HTTP REST routes which need to be called with authorization. Wildcards using * are allowed. | array of strings | +| powEnabled | Whether the node does PoW if messages are received via API | bool | +| powWorkerCount | The amount of workers used for calculating PoW when issuing messages via API | integer | +| [limits](#limits) | Configuration for api limits | object | ### JWT Auth -| Name | Description | Type | -| :------ | :-------------------------------------------------------------------------------------------------------------------------------------- | :----- | -| salt | Salt used inside the JWT tokens for the REST API. Change this to a different value to invalidate JWT tokens not matching this new value | string | +| Name | Description | Type | +| :--- | :-------------------------------------------------------------------------------------------------------------------------------------- | :----- | +| salt | Salt used inside the JWT tokens for the REST API. Change this to a different value to invalidate JWT tokens not matching this new value | string | ### Limits @@ -795,6 +795,7 @@ Example: | restAPIMetrics | Include restAPI metrics | bool | | migrationMetrics | Include migration metrics | bool | | coordinatorMetrics | Include coordinator metrics | bool | +| mqttBrokerMetrics | Include MQTT broker metrics | bool | | debugMetrics | Include debug metrics | bool | | goMetrics | Include go metrics | bool | | processMetrics | Include process metrics | bool | @@ -825,6 +826,7 @@ Example: "restAPIMetrics": true, "migrationMetrics": true, "coordinatorMetrics": true, + "mqttBrokerMetrics": true, "debugMetrics": false, "goMetrics": false, "processMetrics": false, diff --git a/pkg/model/mselection/heaviest_test.go b/pkg/model/mselection/heaviest_test.go index c10298739..e0846035e 100644 --- a/pkg/model/mselection/heaviest_test.go +++ b/pkg/model/mselection/heaviest_test.go @@ -146,7 +146,7 @@ func TestHeaviestSelector_SelectTipsChains(t *testing.T) { assert.Len(t, hps.trackedMessages, 0) } -func TestHeaviestSelector_SelectTipsCheckTresholds(t *testing.T) { +func TestHeaviestSelector_SelectTipsCheckThresholds(t *testing.T) { te, hps := initTest(t) defer te.CleanupTestEnvironment(true) diff --git a/pkg/mqtt/broker.go b/pkg/mqtt/broker.go index 744a956c5..8e42fa1dd 100644 --- a/pkg/mqtt/broker.go +++ b/pkg/mqtt/broker.go @@ -16,7 +16,7 @@ type Broker struct { } // NewBroker creates a new broker. -func NewBroker(bindAddress string, wsPort int, wsPath string, workerCount int, onSubscribe OnSubscribeHandler, onUnsubscribe OnUnsubscribeHandler) (*Broker, error) { +func NewBroker(bindAddress string, wsPort int, wsPath string, workerCount int, onSubscribe OnSubscribeHandler, onUnsubscribe OnUnsubscribeHandler, cleanupThreshold int) (*Broker, error) { host, port, err := net.SplitHostPort(bindAddress) if err != nil { @@ -36,7 +36,7 @@ func NewBroker(bindAddress string, wsPort int, wsPath string, workerCount int, o return nil, fmt.Errorf("configure broker config error: %w", err) } - t := newTopicManager(onSubscribe, onUnsubscribe) + t := newTopicManager(onSubscribe, onUnsubscribe, cleanupThreshold) b, err := broker.NewBroker(c) if err != nil { @@ -74,3 +74,8 @@ func (b *Broker) Send(topic string, payload []byte) { b.broker.PublishMessage(packet) } + +// TopicsManagerSize returns the size of the underlying map of the topics manager. +func (b *Broker) TopicsManagerSize() int { + return b.topicManager.Size() +} diff --git a/pkg/mqtt/topic_manager.go b/pkg/mqtt/topic_manager.go index abcb92465..230526014 100644 --- a/pkg/mqtt/topic_manager.go +++ b/pkg/mqtt/topic_manager.go @@ -15,8 +15,11 @@ type OnUnsubscribeHandler func(topic []byte) type topicManager struct { mem topics.TopicsProvider - subscribedTopics map[string]int - subscribedTopicsLock sync.RWMutex + subscribedTopics map[string]int + subscribedTopicsLock sync.RWMutex + subscribedTopicsDeleted int + + cleanupThreshold int onSubscribe OnSubscribeHandler onUnsubscribe OnUnsubscribeHandler @@ -49,13 +52,13 @@ func (t *topicManager) Unsubscribe(topic []byte, subscriber interface{}) error { err := t.mem.Unsubscribe(topic, subscriber) - //Ignore error here, always unsubscribe to be safe + // ignore error here, always unsubscribe to be safe topicName := string(topic) count, has := t.subscribedTopics[topicName] if has { - if count <= 0 { - delete(t.subscribedTopics, topicName) + if count <= 1 { + t.deleteTopic(topicName) } else { t.subscribedTopics[topicName] = count - 1 } @@ -82,6 +85,14 @@ func (t *topicManager) Close() error { return t.mem.Close() } +// Size returns the size of the underlying map of the topics manager. +func (t *topicManager) Size() int { + t.subscribedTopicsLock.RLock() + defer t.subscribedTopicsLock.RUnlock() + + return len(t.subscribedTopics) +} + func (t *topicManager) hasSubscribers(topicName string) bool { t.subscribedTopicsLock.RLock() defer t.subscribedTopicsLock.RUnlock() @@ -90,13 +101,35 @@ func (t *topicManager) hasSubscribers(topicName string) bool { return has && count > 0 } -func newTopicManager(onSubscribe OnSubscribeHandler, onUnsubscribe OnUnsubscribeHandler) *topicManager { +// cleanupWithoutLocking recreates the subscribedTopics map to release memory for the garbage collector. +func (t *topicManager) cleanupWithoutLocking() { + subscribedTopics := make(map[string]int) + for topicName, count := range t.subscribedTopics { + subscribedTopics[topicName] = count + } + t.subscribedTopics = subscribedTopics + t.subscribedTopicsDeleted = 0 +} + +// deleteTopic deletes a topic from the manager. +func (t *topicManager) deleteTopic(topicName string) { + delete(t.subscribedTopics, topicName) + + // increase the deletion counter to trigger garbage collection + t.subscribedTopicsDeleted++ + if t.cleanupThreshold != 0 && t.subscribedTopicsDeleted >= t.cleanupThreshold { + t.cleanupWithoutLocking() + } +} + +func newTopicManager(onSubscribe OnSubscribeHandler, onUnsubscribe OnUnsubscribeHandler, cleanupThreshold int) *topicManager { mgr := &topicManager{ mem: topics.NewMemProvider(), subscribedTopics: make(map[string]int), onSubscribe: onSubscribe, onUnsubscribe: onUnsubscribe, + cleanupThreshold: cleanupThreshold, } // The normal MQTT broker uses the `mem` topic manager internally, so first unregister the default one. diff --git a/pkg/protocol/gossip/service.go b/pkg/protocol/gossip/service.go index 3494b017e..7754a51a6 100644 --- a/pkg/protocol/gossip/service.go +++ b/pkg/protocol/gossip/service.go @@ -593,7 +593,7 @@ func (s *Service) registerLoggerOnEvents() { s.LogInfof("canceled inbound protocol stream from %s: %s", remotePeer, reason) })) s.Events.Error.Attach(events.NewClosure(func(err error) { - s.LogError(err) + s.LogWarn(err) })) } diff --git a/plugins/dashboard/plugin.go b/plugins/dashboard/plugin.go index db68d77e3..da7f8d7ae 100644 --- a/plugins/dashboard/plugin.go +++ b/plugins/dashboard/plugin.go @@ -129,9 +129,12 @@ func configure() { } upgrader = &websocket.Upgrader{ - HandshakeTimeout: webSocketWriteTimeout, - CheckOrigin: func(r *http.Request) bool { return true }, // allow any origin for websocket connections - EnableCompression: true, + HandshakeTimeout: webSocketWriteTimeout, + CheckOrigin: func(r *http.Request) bool { return true }, // allow any origin for websocket connections + // Disable compression due to incompatibilities with latest Safari browsers: + // https://github.com/tilt-dev/tilt/issues/4746 + // https://github.com/gorilla/websocket/issues/731 + EnableCompression: false, } hub = websockethub.NewHub(Plugin.Logger(), upgrader, broadcastQueueSize, clientSendChannelSize, maxWebsocketMessageSize) diff --git a/plugins/mqtt/params.go b/plugins/mqtt/params.go index d800b9e7e..9f884116b 100644 --- a/plugins/mqtt/params.go +++ b/plugins/mqtt/params.go @@ -7,12 +7,14 @@ import ( ) const ( - // the bind address on which the MQTT broker listens on + // the bind address on which the MQTT broker listens on. CfgMQTTBindAddress = "mqtt.bindAddress" - // the port of the WebSocket MQTT broker + // the port of the WebSocket MQTT broker. CfgMQTTWSPort = "mqtt.wsPort" - // the number of parallel workers the MQTT broker uses to publish messages + // the number of parallel workers the MQTT broker uses to publish messages. CfgMQTTWorkerCount = "mqtt.workerCount" + // the number of deleted topics that trigger a garbage collection of the topic manager. + CfgMQTTTopicCleanupThreshold = "mqtt.topicCleanupThreshold" ) var params = &node.PluginParams{ @@ -22,6 +24,7 @@ var params = &node.PluginParams{ fs.String(CfgMQTTBindAddress, "localhost:1883", "bind address on which the MQTT broker listens on") fs.Int(CfgMQTTWSPort, 1888, "port of the WebSocket MQTT broker") fs.Int(CfgMQTTWorkerCount, 100, "number of parallel workers the MQTT broker uses to publish messages") + fs.Int(CfgMQTTTopicCleanupThreshold, 10000, "number of deleted topics that trigger a garbage collection of the topic manager") return fs }(), }, diff --git a/plugins/mqtt/plugin.go b/plugins/mqtt/plugin.go index 08950dd06..b09d49cda 100644 --- a/plugins/mqtt/plugin.go +++ b/plugins/mqtt/plugin.go @@ -31,6 +31,7 @@ func init() { Name: "MQTT", DepsFunc: func(cDeps dependencies) { deps = cDeps }, Params: params, + Provide: provide, Configure: configure, Run: run, }, @@ -60,8 +61,6 @@ var ( topicSubscriptionWorkerPool *workerpool.WorkerPool wasSyncBefore = false - - mqttBroker *mqttpkg.Broker ) type dependencies struct { @@ -75,6 +74,30 @@ type dependencies struct { BelowMaxDepth int `name:"belowMaxDepth"` Bech32HRP iotago.NetworkPrefix `name:"bech32HRP"` Echo *echo.Echo `optional:"true"` + MQTTBroker *mqttpkg.Broker +} + +func provide(c *dig.Container) { + + type brokerDeps struct { + dig.In + NodeConfig *configuration.Configuration `name:"nodeConfig"` + } + + if err := c.Provide(func(deps brokerDeps) *mqttpkg.Broker { + mqttBroker, err := mqttpkg.NewBroker(deps.NodeConfig.String(CfgMQTTBindAddress), deps.NodeConfig.Int(CfgMQTTWSPort), "/ws", deps.NodeConfig.Int(CfgMQTTWorkerCount), func(topic []byte) { + Plugin.LogDebugf("Subscribe to topic: %s", string(topic)) + topicSubscriptionWorkerPool.TrySubmit(topic) + }, func(topic []byte) { + Plugin.LogDebugf("Unsubscribe from topic: %s", string(topic)) + }, deps.NodeConfig.Int(CfgMQTTTopicCleanupThreshold)) + if err != nil { + Plugin.LogFatalf("MQTT broker init failed! %s", err) + } + return mqttBroker + }); err != nil { + Plugin.LogPanic(err) + } } func configure() { @@ -185,25 +208,13 @@ func configure() { }, workerpool.WorkerCount(workerCount), workerpool.QueueSize(workerQueueSize), workerpool.FlushTasksAtShutdown(true)) - var err error - mqttBroker, err = mqttpkg.NewBroker(deps.NodeConfig.String(CfgMQTTBindAddress), deps.NodeConfig.Int(CfgMQTTWSPort), "/ws", deps.NodeConfig.Int(CfgMQTTWorkerCount), func(topic []byte) { - Plugin.LogDebugf("Subscribe to topic: %s", string(topic)) - topicSubscriptionWorkerPool.TrySubmit(topic) - }, func(topic []byte) { - Plugin.LogDebugf("Unsubscribe from topic: %s", string(topic)) - }) - - if err != nil { - Plugin.LogFatalf("MQTT broker init failed! %s", err) - } - setupWebSocketRoute() } func setupWebSocketRoute() { // Configure MQTT WebSocket route - mqttWSUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", mqttBroker.Config().Host, mqttBroker.Config().WsPort)) + mqttWSUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", deps.MQTTBroker.Config().Host, deps.MQTTBroker.Config().WsPort)) if err != nil { Plugin.LogFatalf("MQTT WebSocket init failed! %s", err) } @@ -218,7 +229,7 @@ func setupWebSocketRoute() { }), // We need to forward any calls to the MQTT route to the ws endpoint of our broker Rewrite: map[string]string{ - RouteMQTT: mqttBroker.Config().WsPath, + RouteMQTT: deps.MQTTBroker.Config().WsPath, }, } @@ -227,7 +238,7 @@ func setupWebSocketRoute() { func run() { - Plugin.LogInfof("Starting MQTT Broker (port %s) ...", mqttBroker.Config().Port) + Plugin.LogInfof("Starting MQTT Broker (port %s) ...", deps.MQTTBroker.Config().Port) onLatestMilestoneChanged := events.NewClosure(func(cachedMs *storage.CachedMilestone) { if !wasSyncBefore { @@ -298,16 +309,16 @@ func run() { if err := Plugin.Daemon().BackgroundWorker("MQTT Broker", func(ctx context.Context) { go func() { - mqttBroker.Start() - Plugin.LogInfof("Starting MQTT Broker (port %s) ... done", mqttBroker.Config().Port) + deps.MQTTBroker.Start() + Plugin.LogInfof("Starting MQTT Broker (port %s) ... done", deps.MQTTBroker.Config().Port) }() - if mqttBroker.Config().Port != "" { - Plugin.LogInfof("You can now listen to MQTT via: http://%s:%s", mqttBroker.Config().Host, mqttBroker.Config().Port) + if deps.MQTTBroker.Config().Port != "" { + Plugin.LogInfof("You can now listen to MQTT via: http://%s:%s", deps.MQTTBroker.Config().Host, deps.MQTTBroker.Config().Port) } - if mqttBroker.Config().TlsPort != "" { - Plugin.LogInfof("You can now listen to MQTT via: https://%s:%s", mqttBroker.Config().TlsHost, mqttBroker.Config().TlsPort) + if deps.MQTTBroker.Config().TlsPort != "" { + Plugin.LogInfof("You can now listen to MQTT via: https://%s:%s", deps.MQTTBroker.Config().TlsHost, deps.MQTTBroker.Config().TlsPort) } <-ctx.Done() diff --git a/plugins/mqtt/utils.go b/plugins/mqtt/utils.go index 3c779cc68..08c521d97 100644 --- a/plugins/mqtt/utils.go +++ b/plugins/mqtt/utils.go @@ -25,7 +25,7 @@ func publishOnTopic(topic string, payload interface{}) { return } - mqttBroker.Send(topic, jsonPayload) + deps.MQTTBroker.Send(topic, jsonPayload) } func publishConfirmedMilestone(cachedMs *storage.CachedMilestone) { @@ -39,7 +39,7 @@ func publishLatestMilestone(cachedMs *storage.CachedMilestone) { } func publishMilestoneOnTopic(topic string, milestone *storage.Milestone) { - if mqttBroker.HasSubscribers(topic) { + if deps.MQTTBroker.HasSubscribers(topic) { publishOnTopic(topic, &milestonePayload{ Index: uint32(milestone.Index), Time: milestone.Timestamp.Unix(), @@ -48,7 +48,7 @@ func publishMilestoneOnTopic(topic string, milestone *storage.Milestone) { } func publishReceipt(r *iotago.Receipt) { - if mqttBroker.HasSubscribers(topicReceipts) { + if deps.MQTTBroker.HasSubscribers(topicReceipts) { publishOnTopic(topicReceipts, r) } } @@ -56,25 +56,25 @@ func publishReceipt(r *iotago.Receipt) { func publishMessage(cachedMessage *storage.CachedMessage) { defer cachedMessage.Release(true) - if mqttBroker.HasSubscribers(topicMessages) { - mqttBroker.Send(topicMessages, cachedMessage.Message().Data()) + if deps.MQTTBroker.HasSubscribers(topicMessages) { + deps.MQTTBroker.Send(topicMessages, cachedMessage.Message().Data()) } indexation := cachedMessage.Message().Indexation() if indexation != nil { indexationTopic := strings.ReplaceAll(topicMessagesIndexation, "{index}", hex.EncodeToString(indexation.Index)) - if mqttBroker.HasSubscribers(indexationTopic) { - mqttBroker.Send(indexationTopic, cachedMessage.Message().Data()) + if deps.MQTTBroker.HasSubscribers(indexationTopic) { + deps.MQTTBroker.Send(indexationTopic, cachedMessage.Message().Data()) } } } func publishTransactionIncludedMessage(transactionID *iotago.TransactionID, messageID hornet.MessageID) { transactionTopic := strings.ReplaceAll(topicTransactionsIncludedMessage, "{transactionId}", hex.EncodeToString(transactionID[:])) - if mqttBroker.HasSubscribers(transactionTopic) { + if deps.MQTTBroker.HasSubscribers(transactionTopic) { cachedMessage := deps.Storage.CachedMessageOrNil(messageID) if cachedMessage != nil { - mqttBroker.Send(transactionTopic, cachedMessage.Message().Data()) + deps.MQTTBroker.Send(transactionTopic, cachedMessage.Message().Data()) cachedMessage.Release(true) } } @@ -87,9 +87,9 @@ func publishMessageMetadata(cachedMetadata *storage.CachedMetadata) { messageID := metadata.MessageID().ToHex() singleMessageTopic := strings.ReplaceAll(topicMessagesMetadata, "{messageId}", messageID) - hasSingleMessageTopicSubscriber := mqttBroker.HasSubscribers(singleMessageTopic) + hasSingleMessageTopicSubscriber := deps.MQTTBroker.HasSubscribers(singleMessageTopic) - hasAllMessagesTopicSubscriber := mqttBroker.HasSubscribers(topicMessagesReferenced) + hasAllMessagesTopicSubscriber := deps.MQTTBroker.HasSubscribers(topicMessagesReferenced) if hasSingleMessageTopicSubscriber || hasAllMessagesTopicSubscriber { @@ -171,10 +171,10 @@ func publishMessageMetadata(cachedMetadata *storage.CachedMetadata) { } if hasSingleMessageTopicSubscriber { - mqttBroker.Send(singleMessageTopic, jsonPayload) + deps.MQTTBroker.Send(singleMessageTopic, jsonPayload) } if hasAllMessagesTopicSubscriber { - mqttBroker.Send(topicMessagesReferenced, jsonPayload) + deps.MQTTBroker.Send(topicMessagesReferenced, jsonPayload) } } } @@ -217,13 +217,13 @@ func payloadForOutput(ledgerIndex milestone.Index, output *utxo.Output, spent bo func publishOutput(ledgerIndex milestone.Index, output *utxo.Output, spent bool) { outputsTopic := strings.ReplaceAll(topicOutputs, "{outputId}", output.OutputID().ToHex()) - outputsTopicHasSubscribers := mqttBroker.HasSubscribers(outputsTopic) + outputsTopicHasSubscribers := deps.MQTTBroker.HasSubscribers(outputsTopic) addressBech32Topic := strings.ReplaceAll(topicAddressesOutput, "{address}", output.Address().Bech32(deps.Bech32HRP)) - addressBech32TopicHasSubscribers := mqttBroker.HasSubscribers(addressBech32Topic) + addressBech32TopicHasSubscribers := deps.MQTTBroker.HasSubscribers(addressBech32Topic) addressEd25519Topic := strings.ReplaceAll(topicAddressesEd25519Output, "{address}", output.Address().String()) - addressEd25519TopicHasSubscribers := mqttBroker.HasSubscribers(addressEd25519Topic) + addressEd25519TopicHasSubscribers := deps.MQTTBroker.HasSubscribers(addressEd25519Topic) if outputsTopicHasSubscribers || addressEd25519TopicHasSubscribers || addressBech32TopicHasSubscribers { if payload := payloadForOutput(ledgerIndex, output, spent); payload != nil { @@ -236,15 +236,15 @@ func publishOutput(ledgerIndex milestone.Index, output *utxo.Output, spent bool) } if outputsTopicHasSubscribers { - mqttBroker.Send(outputsTopic, jsonPayload) + deps.MQTTBroker.Send(outputsTopic, jsonPayload) } if addressBech32TopicHasSubscribers { - mqttBroker.Send(addressBech32Topic, jsonPayload) + deps.MQTTBroker.Send(addressBech32Topic, jsonPayload) } if addressEd25519TopicHasSubscribers { - mqttBroker.Send(addressEd25519Topic, jsonPayload) + deps.MQTTBroker.Send(addressEd25519Topic, jsonPayload) } } } diff --git a/plugins/prometheus/mqtt_broker.go b/plugins/prometheus/mqtt_broker.go new file mode 100644 index 000000000..a76045ac6 --- /dev/null +++ b/plugins/prometheus/mqtt_broker.go @@ -0,0 +1,28 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + mqttBrokerTopicsManagerSize prometheus.Gauge +) + +func configureMQTTBroker() { + + mqttBrokerTopicsManagerSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "iota", + Subsystem: "mqtt_broker", + Name: "topics_manager_size", + Help: "Number of active topics in the topics manager.", + }) + + registry.MustRegister(mqttBrokerTopicsManagerSize) + + addCollect(collectMQTTBroker) +} + +func collectMQTTBroker() { + mqttBrokerTopicsManagerSize.Set(float64(deps.MQTTBroker.TopicsManagerSize())) +} diff --git a/plugins/prometheus/params.go b/plugins/prometheus/params.go index d6ae30661..081493c93 100644 --- a/plugins/prometheus/params.go +++ b/plugins/prometheus/params.go @@ -29,6 +29,8 @@ const ( CfgPrometheusMigration = "prometheus.migrationMetrics" // include coordinator metrics. CfgPrometheusCoordinator = "prometheus.coordinatorMetrics" + // include MQTT broker metrics. + CfgPrometheusMQTTBroker = "prometheus.mqttBrokerMetrics" // include debug metrics. CfgPrometheusDebug = "prometheus.debugMetrics" // include go metrics. @@ -54,6 +56,7 @@ var params = &node.PluginParams{ fs.Bool(CfgPrometheusRestAPI, true, "include restAPI metrics") fs.Bool(CfgPrometheusMigration, true, "include migration metrics") fs.Bool(CfgPrometheusCoordinator, true, "include coordinator metrics") + fs.Bool(CfgPrometheusMQTTBroker, true, "include MQTT broker metrics") fs.Bool(CfgPrometheusDebug, false, "include debug metrics") fs.Bool(CfgPrometheusGoMetrics, false, "include go metrics") fs.Bool(CfgPrometheusProcessMetrics, false, "include process metrics") diff --git a/plugins/prometheus/plugin.go b/plugins/prometheus/plugin.go index 6aa449bc6..a51ee2f2f 100644 --- a/plugins/prometheus/plugin.go +++ b/plugins/prometheus/plugin.go @@ -25,6 +25,7 @@ import ( "github.com/gohornet/hornet/pkg/model/migrator" "github.com/gohornet/hornet/pkg/model/storage" "github.com/gohornet/hornet/pkg/model/syncmanager" + "github.com/gohornet/hornet/pkg/mqtt" "github.com/gohornet/hornet/pkg/node" "github.com/gohornet/hornet/pkg/p2p" "github.com/gohornet/hornet/pkg/protocol/gossip" @@ -86,6 +87,7 @@ type dependencies struct { TipSelector *tipselect.TipSelector `optional:"true"` SnapshotManager *snapshot.SnapshotManager Coordinator *coordinator.Coordinator `optional:"true"` + MQTTBroker *mqtt.Broker `optional:"true"` } func configure() { @@ -118,6 +120,9 @@ func configure() { if deps.NodeConfig.Bool(CfgPrometheusCoordinator) && deps.Coordinator != nil { configureCoordinator() } + if deps.NodeConfig.Bool(CfgPrometheusMQTTBroker) && deps.MQTTBroker != nil { + configureMQTTBroker() + } if deps.NodeConfig.Bool(CfgPrometheusDebug) { configureDebug() } diff --git a/private_tangle/config_private_tangle.json b/private_tangle/config_private_tangle.json index 8fdae216c..4c0166883 100644 --- a/private_tangle/config_private_tangle.json +++ b/private_tangle/config_private_tangle.json @@ -246,6 +246,7 @@ "restAPIMetrics": true, "migrationMetrics": true, "coordinatorMetrics": true, + "mqttBrokerMetrics": true, "debugMetrics": false, "goMetrics": false, "processMetrics": false,