Skip to content

Commit

Permalink
Check NDEATH for matching bdseq - ignore otherwise eclipse-sparkplug#502
Browse files Browse the repository at this point in the history
  • Loading branch information
icraggs-sparkplug committed Apr 22, 2024
1 parent e56de52 commit fc1f887
Showing 1 changed file with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Ian Craggs
* Copyright (c) 2021, 2024 Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -47,6 +47,7 @@
import static org.eclipse.sparkplug.tck.test.common.Requirements.OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_RECONNECT;
import static org.eclipse.sparkplug.tck.test.common.Requirements.OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_TIMESTAMP;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -62,6 +63,7 @@
import org.eclipse.sparkplug.tck.test.TCKTest;
import org.eclipse.sparkplug.tck.test.common.Constants;
import org.eclipse.sparkplug.tck.test.common.Constants.TestStatus;
import org.eclipse.sparkplug.tck.test.common.SparkplugBProto;
import org.eclipse.sparkplug.tck.test.common.Utils;
import org.jboss.test.audit.annotations.SpecAssertion;
import org.jboss.test.audit.annotations.SpecVersion;
Expand All @@ -83,6 +85,7 @@ public class PrimaryHostTest extends TCKTest {

private static final @NotNull Logger logger = LoggerFactory.getLogger("Sparkplug");
public static final String PROPERTY_KEY_QUALITY = "Quality";
private static final String BD_SEQ = "bdSeq";

public static final @NotNull List<String> testIds = List.of(ID_MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_WAIT,
ID_MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_WAIT_ID,
Expand All @@ -101,6 +104,7 @@ public class PrimaryHostTest extends TCKTest {
private @NotNull String edgeNodeId;
private @NotNull String hostApplicationId;
private @NotNull long seqUnassigned = -1;
private @NotNull long birthSeq = -1; // record the nbirth seq to check for matching ndeath
private Utilities utilities = null;

private TestStatus state = TestStatus.NONE;
Expand Down Expand Up @@ -350,6 +354,18 @@ public void subscribe(final @NotNull String clientId, final @NotNull SubscribePa
// TODO Auto-generated method stub
}

private long getBdSeq(final ByteBuffer payload) {
final SparkplugBProto.PayloadOrBuilder inboundPayload = Utils.decode(payload);
if (inboundPayload != null) {
for (SparkplugBProto.Payload.Metric m : inboundPayload.getMetricsList()) {
if (m.getName().equals(BD_SEQ)) {
return m.getLongValue();
}
}
}
return -1L;
}

@SpecAssertion(
section = Sections.OPERATIONAL_BEHAVIOR_EDGE_NODE_SESSION_ESTABLISHMENT,
id = ID_MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_WAIT)
Expand Down Expand Up @@ -387,6 +403,10 @@ public void publish(final @NotNull String clientId, final @NotNull PublishPacket

if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_NBIRTH + "/"
+ edgeNodeId)) {

ByteBuffer payload = packet.getPayload().orElseGet(null);
birthSeq = getBdSeq(payload);

// found the edge NBIRTH
if (state == TestStatus.WRONG_HOST_ONLINE) {
// received NBIRTH for wrong host
Expand Down Expand Up @@ -466,20 +486,25 @@ public void publish(final @NotNull String clientId, final @NotNull PublishPacket
}
} else if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_NDEATH + "/"
+ edgeNodeId)) {
ByteBuffer payload = packet.getPayload().orElseGet(null);
long deathSeq = getBdSeq(payload);

logger.info("Received NDEATH in state " + state.name());
Utils.setResultIfNotFail(testResults, state == TestStatus.EXPECT_DEATHS || state == TestStatus.HOST_OFFLINE,
ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE,
OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE);
logger.info("Received NDEATH in state " + state.name() + " with bdseq " + deathSeq + " (birthSeq " + birthSeq + ")");

Utils.setResultIfNotFail(testResults, state == TestStatus.EXPECT_DEATHS || state == TestStatus.HOST_OFFLINE,
ID_MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_OFFLINE,
MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_OFFLINE);
if (deathSeq == birthSeq) { // ignore ndeaths from different births
Utils.setResultIfNotFail(testResults, state == TestStatus.EXPECT_DEATHS || state == TestStatus.HOST_OFFLINE,
ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE,
OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE);

if (state == TestStatus.DONT_EXPECT_DEATHS) {
Utils.setResult(testResults, false,
ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_TIMESTAMP,
OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_TIMESTAMP);
Utils.setResultIfNotFail(testResults, state == TestStatus.EXPECT_DEATHS || state == TestStatus.HOST_OFFLINE,
ID_MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_OFFLINE,
MESSAGE_FLOW_EDGE_NODE_BIRTH_PUBLISH_PHID_OFFLINE);

if (state == TestStatus.DONT_EXPECT_DEATHS) {
Utils.setResult(testResults, false,
ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_TIMESTAMP,
OPERATIONAL_BEHAVIOR_EDGE_NODE_TERMINATION_HOST_OFFLINE_TIMESTAMP);
}
}

} else if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DDEATH + "/"
Expand Down

0 comments on commit fc1f887

Please sign in to comment.