Skip to content

Commit

Permalink
Merge pull request #327 from wes-johnson/develop
Browse files Browse the repository at this point in the history
Ensured STATE status is only updated for the appropriate Sparkplug Host ID
  • Loading branch information
wes-johnson authored Oct 16, 2023
2 parents fdfaebc + 99d8c65 commit 1c23f1c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public HostApplication(HostApplicationEventHandler eventHandler, String hostId,

SequenceReorderManager sequenceReorderManager = SequenceReorderManager.getInstance();
sequenceReorderManager.init(eventHandler, this, payloadDecoder, 5000L);
this.tahuHostCallback = new TahuHostCallback(eventHandler, this, sequenceReorderManager, payloadDecoder);
this.tahuHostCallback =
new TahuHostCallback(eventHandler, this, sequenceReorderManager, payloadDecoder, hostId);
}

public HostApplication(HostApplicationEventHandler eventHandler, String hostId, List<String> sparkplugSubscriptons,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ public class TahuHostCallback implements ClientCallback {

private final PayloadDecoder<SparkplugBPayload> payloadDecoder;

private final String hostId;

public TahuHostCallback(HostApplicationEventHandler eventHandler, CommandPublisher commandPublisher,
SequenceReorderManager sequenceReorderManager, PayloadDecoder<SparkplugBPayload> payloadDecoder) {
SequenceReorderManager sequenceReorderManager, PayloadDecoder<SparkplugBPayload> payloadDecoder,
String hostId) {
this.eventHandler = eventHandler;
this.commandPublisher = commandPublisher;
if (sequenceReorderManager != null) {
Expand All @@ -72,6 +75,7 @@ public TahuHostCallback(HostApplicationEventHandler eventHandler, CommandPublish
this.sequenceReorderManager = null;
}
this.payloadDecoder = payloadDecoder;
this.hostId = hostId;

this.sparkplugBExecutors = new ThreadPoolExecutor[DEFAULT_NUM_OF_THREADS];
for (int i = 0; i < DEFAULT_NUM_OF_THREADS; i++) {
Expand Down Expand Up @@ -137,7 +141,8 @@ public void messageArrived(MqttServerName server, MqttServerUrl url, MqttClientI
// This is a STATE message - handle as needed
ObjectMapper mapper = new ObjectMapper();
StatePayload statePayload = mapper.readValue(new String(message.getPayload()), StatePayload.class);
if (!statePayload.isOnline()) {
if (hostId != null && !hostId.trim().isEmpty() && splitTopic[2].equals(hostId)
&& !statePayload.isOnline()) {
// Make sure this isn't an OFFLINE message
logger.info(
"This is a offline STATE message from {} - correcting with new online STATE message",
Expand Down

0 comments on commit 1c23f1c

Please sign in to comment.