Skip to content

Commit

Permalink
Merge pull request #528 from icraggs-sparkplug/develop
Browse files Browse the repository at this point in the history
Allow nodes without a separate device #517
  • Loading branch information
wes-johnson authored Jun 18, 2024
2 parents cb82610 + 117e9d3 commit 370c6d9
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 Ian Craggs
* Copyright (c) 2021, 2024 Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -84,11 +84,11 @@ public class MultipleBrokerTest extends TCKTest {
ID_OPERATIONAL_BEHAVIOR_PRIMARY_APPLICATION_STATE_WITH_MULTIPLE_SERVERS_WALK,
ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT);

private @NotNull String deviceId;
private @NotNull String groupId;
private @NotNull String edgeNodeId;
private @NotNull String hostApplicationId;
private @NotNull String brokerURL;
private @NotNull String deviceId = null;
private @NotNull String groupId = null;
private @NotNull String edgeNodeId = null;
private @NotNull String hostApplicationId = null;
private @NotNull String brokerURL = null;

private HostApplication broker2 = new HostApplication("tcp://localhost:1884");

Expand All @@ -111,16 +111,20 @@ public MultipleBrokerTest(TCK aTCK, Utilities utilities, String[] parms, Results
// a portion of the bdSeq numbers
utilities.getMonitor().setIgnoreBdSeqNumCheck(true);

if (parms.length < 5) {
if (parms.length < 4) {
log("Not enough parameters: " + Arrays.toString(parms));
log("Parameters to edge multiple broker test must be: hostApplicationId, groupId edgeNodeId deviceId brokerURL");
log("Parameters to edge multiple broker test must be: hostApplicationId, groupId edgeNodeId {deviceId} brokerURL");
throw new IllegalArgumentException();
}
hostApplicationId = parms[0];
groupId = parms[1];
edgeNodeId = parms[2];
deviceId = parms[3];
brokerURL = parms[4];
if (parms.length == 4) {
brokerURL = parms[3];
} else {
deviceId = parms[3];
brokerURL = parms[4];
}
logger.info("Parameters are HostApplicationId: {}, GroupId: {}, EdgeNodeId: {}, DeviceId: {} BrokerURL: {}",
hostApplicationId, groupId, edgeNodeId, deviceId, brokerURL);

Expand Down Expand Up @@ -358,10 +362,31 @@ public void publish(String clientId, PublishPacket packet) {
Utils.setResultIfNotFail(testResults, true, ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT,
OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT);

if (deviceId == null) {
// we've received NBIRTH, and not expecting DBIRTH, so set the second host online
executorService.schedule(new Runnable() {
@Override
public void run() {
setHost2Online();
}
}, 1, TimeUnit.SECONDS);
}

} else if (state == TestStatus.EXPECT_DEATHS_AND_BIRTHS && births_on == 1) {
Utils.setResultIfNotFail(testResults, true, ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT,
OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT);

if (deviceId == null) { // there's not going to be a DBIRTH
// the edge node has reconnected to server 1 so that's the end of the test
state = TestStatus.ENDING;
executorService.schedule(new Runnable() {
@Override
public void run() {
theTCK.endTest();
}
}, 1, TimeUnit.SECONDS);
}

} else {
// any other state is wrong
logger.error("{} error received NBIRTH in state {}", getName(), state.toString());
Expand All @@ -373,7 +398,7 @@ public void publish(String clientId, PublishPacket packet) {
Utils.setResult(testResults, false, ID_OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT,
OPERATIONAL_BEHAVIOR_EDGE_NODE_BIRTH_SEQUENCE_WAIT);
}
} else if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DBIRTH + "/"
} else if (deviceId != null && topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DBIRTH + "/"
+ edgeNodeId + "/" + deviceId)) {
// found the device DBIRTH
dbirthReceived = true;
Expand Down Expand Up @@ -428,7 +453,7 @@ public void run() {
ID_OPERATIONAL_BEHAVIOR_PRIMARY_APPLICATION_STATE_WITH_MULTIPLE_SERVERS_WALK,
OPERATIONAL_BEHAVIOR_PRIMARY_APPLICATION_STATE_WITH_MULTIPLE_SERVERS_WALK);
}
} else if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DDEATH + "/"
} else if (deviceId != null && topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DDEATH + "/"
+ edgeNodeId + "/" + deviceId)) {

if (state == TestStatus.ENDING) {
Expand Down Expand Up @@ -460,13 +485,13 @@ private void checkBirths() {
if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_NBIRTH + "/"
+ edgeNodeId)) {
nbirth_found = true;
} else if (topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DBIRTH
} else if (deviceId != null && topic.equals(Constants.TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + Constants.TOPIC_PATH_DBIRTH
+ "/" + edgeNodeId + "/" + deviceId)) {
dbirth_found = true;
}
msg = broker2.getNextMessage();
}
Utils.setResultIfNotFail(testResults, nbirth_found && dbirth_found,
Utils.setResultIfNotFail(testResults, nbirth_found && (deviceId == null || dbirth_found),
ID_OPERATIONAL_BEHAVIOR_PRIMARY_APPLICATION_STATE_WITH_MULTIPLE_SERVERS_WALK,
OPERATIONAL_BEHAVIOR_PRIMARY_APPLICATION_STATE_WITH_MULTIPLE_SERVERS_WALK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,30 +99,32 @@ public class PrimaryHostTest extends TCKTest {
private final @NotNull TCK theTCK;
private final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();

private @NotNull String deviceId;
private @NotNull String groupId;
private @NotNull String edgeNodeId;
private @NotNull String hostApplicationId;
private @NotNull String deviceId = null;
private @NotNull String groupId = null;
private @NotNull String edgeNodeId = null;
private @NotNull String hostApplicationId = null;
private @NotNull long seqUnassigned = -1;
private @NotNull long birthSeq = -1; // record the nbirth seq to check for matching ndeath
private Utilities utilities = null;

private TestStatus state = TestStatus.NONE;

public PrimaryHostTest(TCK aTCK, Utilities utilities, String[] parms, Results.Config config) {
logger.info("Edge Node payload validation test. Parameters: {} ", Arrays.asList(parms));
logger.info("Edge Node Primary Host test. Parameters: {} ", Arrays.asList(parms));
theTCK = aTCK;
this.utilities = utilities;

if (parms.length < 4) {
if (parms.length < 3) {
log("Not enough parameters: " + Arrays.toString(parms));
log("Parameters to edge primary host test must be: hostId groupId edgeNodeId deviceId");
log("Parameters to edge primary host test must be: hostId groupId edgeNodeId {deviceId}");
throw new IllegalArgumentException();
}
hostApplicationId = parms[0];
groupId = parms[1];
edgeNodeId = parms[2];
deviceId = parms[3];
if (parms.length == 4) {
deviceId = parms[3];
}
logger.info("Parameters are HostId: {}, GroupId: {}, EdgeNodeId: {}, DeviceId: {}", hostApplicationId, groupId,
edgeNodeId, deviceId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 Ian Craggs
* Copyright (c) 2021, 2024 Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -105,13 +105,13 @@ public class ReceiveCommandTest extends TCKTest {
ID_OPERATIONAL_BEHAVIOR_DATA_COMMANDS_REBIRTH_ACTION_2,
ID_OPERATIONAL_BEHAVIOR_DATA_COMMANDS_REBIRTH_ACTION_3, ID_PAYLOADS_NDEATH_WILL_MESSAGE);

private @NotNull TestStatus state;
private @NotNull String deviceId;
private @NotNull String groupId;
private @NotNull String edgeNodeId;
private @NotNull String hostApplicationId;
private @NotNull TestStatus state = null;
private @NotNull String deviceId = null;
private @NotNull String groupId = null;
private @NotNull String edgeNodeId = null;
private @NotNull String hostApplicationId = null;
private @NotNull long deathBdSeq = -1;
private String edgeNodeClientId;
private String edgeNodeClientId = null;
private boolean bNBirth = false, bDBirth = false;
private PublishService publishService = Services.publishService();
private final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
Expand All @@ -124,17 +124,19 @@ public ReceiveCommandTest(TCK aTCK, Utilities utilities, String[] params, Result
theTCK = aTCK;
this.utilities = utilities;

if (params.length < 4) {
if (params.length < 3) {
log("Not enough parameters: " + Arrays.toString(params));
log("Parameters to edge receive command test must be: hostApplicationId groupId edgeNodeId deviceId");
log("Parameters to edge receive command test must be: hostApplicationId groupId edgeNodeId {deviceId}");
throw new IllegalArgumentException();
}
state = TestStatus.NONE;
deathBdSeq = -1;
hostApplicationId = params[0];
groupId = params[1];
edgeNodeId = params[2];
deviceId = params[3];
if (params.length == 4) {
deviceId = params[3];
}
logger.info("Host application id: {}, Group id: {}, Edge node id: {}, Device id: {}", hostApplicationId,
groupId, edgeNodeId, deviceId);

Expand Down Expand Up @@ -314,8 +316,13 @@ public void publish(final String clientId, final PublishPacket packet) {

if (state == TestStatus.EXPECT_NODE_BIRTH
&& topic.equals(TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + TOPIC_PATH_NBIRTH + "/" + edgeNodeId)) {
log("Edge " + edgeNodeId + " is now online");
state = TestStatus.EXPECT_DEVICE_BIRTH;
if (deviceId == null) {
log("Edge " + edgeNodeId + " is now online, sending rebirth");
sendRebirth(true);
} else {
log("Edge " + edgeNodeId + " is now online");
state = TestStatus.EXPECT_DEVICE_BIRTH;
}
} else if (state == TestStatus.EXPECT_DEVICE_BIRTH && topic.equals(
TOPIC_ROOT_SP_BV_1_0 + "/" + groupId + "/" + TOPIC_PATH_DBIRTH + "/" + edgeNodeId + "/" + deviceId)) {
log("Device " + deviceId + " is now online, sending rebirth");
Expand Down Expand Up @@ -343,7 +350,7 @@ public void publish(final String clientId, final PublishPacket packet) {
deathBdSeq, birthSeq);
}
}
} else if (levels.length == 5 && levels[2].equals(TOPIC_PATH_DBIRTH)) {
} else if (levels.length == 5 && levels[2].equals(TOPIC_PATH_DBIRTH) && levels[4].equals(deviceId)) {
log("Device birth received for device: " + levels[4]);
bDBirth = true;

Expand All @@ -363,7 +370,7 @@ public void publish(final String clientId, final PublishPacket packet) {
setResult(false, OPERATIONAL_BEHAVIOR_DATA_COMMANDS_REBIRTH_ACTION_1));
}

if (bNBirth && bDBirth && state == TestStatus.SENDING_NODE_REBIRTH) {
if (bNBirth && (deviceId == null || bDBirth) && state == TestStatus.SENDING_NODE_REBIRTH) {
logger.debug(
"Check Req: {} After an Edge Node stops sending DATA messages, it MUST send a complete BIRTH sequence including the NBIRTH and DBIRTH(s) if applicable.",
ID_OPERATIONAL_BEHAVIOR_DATA_COMMANDS_REBIRTH_ACTION_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ public class SendComplexDataTest extends TCKTest {
private final @NotNull TCK theTCK;
private @NotNull Utilities utilities = null;

private @NotNull String deviceId;
private @NotNull String groupId;
private @NotNull String edgeNodeId;
private @NotNull String hostApplicationId;
private @NotNull String deviceId = null;
private @NotNull String groupId = null;
private @NotNull String edgeNodeId = null;
private @NotNull String hostApplicationId = null;
private @NotNull long seqUnassigned = -1;

// Host Application variables
Expand All @@ -179,15 +179,17 @@ public SendComplexDataTest(TCK aTCK, Utilities utilities, String[] parms, Result
theTCK = aTCK;
this.utilities = utilities;

if (parms.length < 4) {
if (parms.length < 3) {
log("Not enough parameters: " + Arrays.toString(parms));
log(getName() + " Parameters must be: hostId groupId edgeNodeId deviceId");
log(getName() + " Parameters must be: hostId groupId edgeNodeId {deviceId}");
throw new IllegalArgumentException();
}
hostApplicationId = parms[0];
groupId = parms[1];
edgeNodeId = parms[2];
deviceId = parms[3];
if (parms.length == 4) {
deviceId = parms[3];
}
logger.info("Parameters are HostId: {}, GroupId: {}, EdgeNodeId: {}, DeviceId: {}", hostApplicationId, groupId,
edgeNodeId, deviceId);

Expand Down Expand Up @@ -271,10 +273,26 @@ public void publish(final @NotNull String clientId, final @NotNull PublishPacket
logger.error("Skip Edge payload validation - no sparkplug payload.");
return;
}
boolean isDataTopic =
isSparkplugTopic && (topic.contains(TOPIC_PATH_DDATA) || topic.contains(TOPIC_PATH_NDATA));
boolean isCommandTopic =
isSparkplugTopic && (topic.contains(TOPIC_PATH_NCMD) || topic.contains(TOPIC_PATH_DCMD));

String cmd = "";
String[] levels = topic.split("/");
if (levels.length >= 3) {
cmd = levels[2];
logger.info("Looking for {}", cmd);
} else {
return;
}
if (!levels[1].equals(groupId) || !levels[3].equals(edgeNodeId)) {
logger.error("GroupId {} or edgeNodeId {} not matched.", groupId, edgeNodeId);
return;
}
// ignore device messages if we don't have a deviceId
boolean isDataTopic = (deviceId != null && cmd.equals(TOPIC_PATH_DDATA)) || cmd.equals(TOPIC_PATH_NDATA);
boolean isCommandTopic = (deviceId != null && cmd.equals(TOPIC_PATH_DCMD)) || cmd.equals(TOPIC_PATH_NCMD);
if (deviceId != null && levels.length >= 5 && !levels[4].equals(deviceId)) {
logger.error("deviceId not matched.");
return;
}

checkPropertiesValidType(packet, topic);
checkSequenceNumberIncluded(packet, topic);
Expand All @@ -300,31 +318,27 @@ public void publish(final @NotNull String clientId, final @NotNull PublishPacket
}

private void checkDataTopicPayload(final @NotNull String clientId, final @NotNull PublishPacket packet,
final @NotNull String topic) {
if (clientId.contentEquals(deviceId) || topic.contains(groupId) && topic.contains(edgeNodeId)) {
final PayloadOrBuilder sparkplugPayload = Utils.getSparkplugPayload(packet);
if (sparkplugPayload != null) {
checkPayloadsNameRequirement(sparkplugPayload);
checkAliasInData(sparkplugPayload, topic);
checkMetricsDataTypeNotRec(sparkplugPayload, topic);
checkPayloadsNameInDataRequirement(sparkplugPayload);
} else {
logger.error("Skip Edge payload validation - no sparkplug payload.");
}
final @NotNull String topic) {
final PayloadOrBuilder sparkplugPayload = Utils.getSparkplugPayload(packet);
if (sparkplugPayload != null) {
checkPayloadsNameRequirement(sparkplugPayload);
checkAliasInData(sparkplugPayload, topic);
checkMetricsDataTypeNotRec(sparkplugPayload, topic);
checkPayloadsNameInDataRequirement(sparkplugPayload);
} else {
logger.error("Skip Edge payload validation - no sparkplug payload.");
}
}

private void checkCommandTopicPayload(final @NotNull String clientId, final @NotNull PublishPacket packet,
final @NotNull String topic) {
if (clientId.contentEquals(deviceId) || topic.contains(groupId) && topic.contains(edgeNodeId)) {
final PayloadOrBuilder sparkplugPayload = Utils.getSparkplugPayload(packet);
if (sparkplugPayload != null) {
checkAliasInData(sparkplugPayload, topic);
checkMetricsDataTypeNotRec(sparkplugPayload, topic);
checkPayloadsNameRequirement(sparkplugPayload);
} else {
logger.error("Skip Edge payload validation - no sparkplug payload.");
}
final @NotNull String topic) {
final PayloadOrBuilder sparkplugPayload = Utils.getSparkplugPayload(packet);
if (sparkplugPayload != null) {
checkAliasInData(sparkplugPayload, topic);
checkMetricsDataTypeNotRec(sparkplugPayload, topic);
checkPayloadsNameRequirement(sparkplugPayload);
} else {
logger.error("Skip Edge payload validation - no sparkplug payload.");
}
}

Expand Down Expand Up @@ -381,6 +395,7 @@ public void checkDatatypeValidType(final @NotNull PublishPacket packet) {
if(!Utils.hasValidDatatype(m)){
isValid_DataType = false;
isValid_DataTypeValue = false;
logger.error("Metric error {}", m);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,17 @@ public SendDataTest(TCK aTCK, Utilities utilities, String[] params, Results.Conf
this.utilities = utilities;
this.config = config;

if (params.length < 3) {
if (params.length < 2) {
log("Not enough parameters: " + Arrays.toString(params));
log("Parameters to send data test must be: groupId edgeNodeId deviceId");
log("Parameters to send data test must be: hostApplicationId groupId edgeNodeId {deviceId}");
throw new IllegalArgumentException();
}
hostApplicationId = params[0];
groupId = params[1];
edgeNodeId = params[2];
deviceId = params[3];
if (params.length == 4) {
deviceId = params[3];
}
logger.info("Parameters are Host application ID: {}, GroupId: {}, EdgeNodeId: {}, DeviceId: {}",
hostApplicationId, groupId, edgeNodeId, deviceId);

Expand Down Expand Up @@ -285,7 +287,7 @@ public void publish(String clientId, PublishPacket packet) {
checkDDATA(clientId, packet);
}

if (isEdgeNodeChecked && isDeviceChecked) {
if (isEdgeNodeChecked && (deviceId == null || isDeviceChecked)) {
theTCK.endTest();
}
}
Expand Down
Loading

0 comments on commit 370c6d9

Please sign in to comment.