Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release-33.x' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
opennms-bamboo committed Mar 6, 2025
2 parents 668afc7 + ad8d858 commit dcafd6d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@ See `monitored-services.proto` in the corresponding source distribution for the
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.grpc.exporter
admin@opennms()> config:property-set host bsm.onmshs.local:1443 <1>
admin@opennms()> config:property-set bsm.host bsm.onmshs.local:1443 <1>
admin@opennms()> config:property-set nms.inventory.host bsm.onmshs.local:1443 <1>
admin@opennms()> config:property-set tenant.id opennms-prime <2>
admin@opennms()> config:property-set tls.cert.path /opt/opennms/etc/tls.cert <3>
admin@opennms()> config:property-set tls.enabled false <4>
admin@opennms()> config:property-set snapshot.interval 3600 <5>
admin@opennms()> config:update
----

<1> Set the hostname of the external gRPC application.
<2> Set tenant id for the data being sent, defaults to `opennms-prime`
<3> Configure the path to the TLS certificate.
<4> TLS is enabled by default. For testing purposes, it can be disabled by setting this value to false.
<5> Set the interval (in seconds) at which the complete snapshot of services will be sent to the gRPC server,
defaults to 3600 secs.

=== Optional Configuration
[source, karaf]
----
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:property-set tls.enabled false <1>
admin@opennms()> config:property-set snapshot.interval 3600 <2>
----
<1> TLS is enabled by default. For testing purposes, it can be disabled by setting this value to false.
<2> Update the interval (in seconds) at which the complete snapshot of services will be sent to the gRPC server,
defaults to 3600 secs.

== Enable gRPC Exporter

Expand Down
4 changes: 4 additions & 0 deletions features/grpc/exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@
<groupId>org.opennms.integration.api</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>org.opennms.core</groupId>
<artifactId>org.opennms.core.lib</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opennms.plugin.grpc.proto.services.InventoryUpdateList;
import org.opennms.plugin.grpc.proto.services.ServiceSyncGrpc;
import org.opennms.plugin.grpc.proto.services.StateUpdateList;
import org.opennms.plugin.grpc.proto.services.HeartBeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
Expand All @@ -49,6 +50,7 @@ public class BsmGrpcClient extends GrpcExporter {
private ServiceSyncGrpc.ServiceSyncStub monitoredServiceSyncStub;
private StreamObserver<InventoryUpdateList> inventoryUpdateStream;
private StreamObserver<StateUpdateList> stateUpdateStream;
private StreamObserver<HeartBeat> heartBeatStream;
private ScheduledExecutorService scheduler;
private final AtomicBoolean reconnecting = new AtomicBoolean(false);
private Callback inventoryCallback;
Expand Down Expand Up @@ -91,6 +93,8 @@ private synchronized void initializeStreams() {
this.monitoredServiceSyncStub.inventoryUpdate(new LoggingAckReceiver("monitored_service_inventory_update", this));
this.stateUpdateStream =
this.monitoredServiceSyncStub.stateUpdate(new LoggingAckReceiver("monitored_service_state_update", this));
this.heartBeatStream =
this.monitoredServiceSyncStub.heartBeatUpdate(new LoggingAckReceiver("heartbeat_update", this));
this.scheduler.shutdown();
this.scheduler = null;
LOG.info("Streams initialized successfully.");
Expand Down Expand Up @@ -141,6 +145,14 @@ public void sendMonitoredServicesStatusUpdate(final StateUpdateList stateUpdates
}
}

public void sendHeartBeatUpdate(HeartBeat heartBeat) {
if (heartBeatStream != null) {
this.heartBeatStream.onNext(heartBeat);
} else {
LOG.warn("Unable to send heartbeat status update since channel is not ready yet");
}
}

private static class LoggingAckReceiver implements StreamObserver<Empty> {

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package org.opennms.features.grpc.exporter.bsm;

import org.opennms.core.utils.SystemInfoUtils;
import org.opennms.features.grpc.exporter.NamedThreadFactory;
import org.opennms.features.grpc.exporter.common.MonitoredServiceWithMetadata;
import org.opennms.features.grpc.exporter.mapper.MonitoredServiceMapper;
Expand All @@ -30,6 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
Expand All @@ -45,6 +47,9 @@ public class InventoryService {
private final Duration snapshotInterval;
private final ScheduledExecutorService scheduler;

private final ScheduledExecutorService heartBeatScheduler =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("heartbeat-update"));

public InventoryService(final NodeDao nodeDao,
final RuntimeInfo runtimeInfo,
final BsmGrpcClient client,
Expand All @@ -70,6 +75,7 @@ public void start() {
this.snapshotInterval.getSeconds(),
this.snapshotInterval.getSeconds(),
TimeUnit.SECONDS);
this.heartBeatScheduler.scheduleAtFixedRate(this::sendHeartBeatUpdate, 60, 60, TimeUnit.SECONDS);
// Set this callback to send snapshot for initial server connect and reconnects
this.client.setInventoryCallback(this::sendSnapshot);
}
Expand All @@ -93,4 +99,15 @@ public void sendSnapshot() {
final var inventory = MonitoredServiceMapper.INSTANCE.toInventoryUpdates(services, this.runtimeInfo, true);
this.client.sendMonitoredServicesInventoryUpdate(inventory);
}

public void sendHeartBeatUpdate() {
this.client.sendHeartBeatUpdate(org.opennms.plugin.grpc.proto.services.HeartBeat.newBuilder()
.setMonitoringInstance(org.opennms.plugin.grpc.proto.services.MonitoringInstance.newBuilder()
.setInstanceId(runtimeInfo.getSystemId())
.setInstanceName(SystemInfoUtils.getInstanceId())
.setInstanceType("OpenNMS").build())
.setTimestamp(Instant.now().toEpochMilli())
.setMessage("HeartBeat Update from OpenNMS")
.build());
}
}
15 changes: 14 additions & 1 deletion features/grpc/exporter/src/main/proto/monitored-services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,20 @@ message StateUpdateList {
repeated StateUpdate updates = 3;
}

message MonitoringInstance {
string instance_type = 1;
string instance_id = 2;
string instance_name = 3;
}

message HeartBeat {
MonitoringInstance monitoring_instance = 1;
string message = 2;
uint64 timestamp = 3;
}

service ServiceSync {
rpc InventoryUpdate(stream InventoryUpdateList) returns (stream google.protobuf.Empty) {}
rpc StateUpdate(stream StateUpdateList) returns (stream google.protobuf.Empty) {}
}
rpc HeartBeatUpdate(stream HeartBeat) returns (stream google.protobuf.Empty) {}
}

0 comments on commit dcafd6d

Please sign in to comment.