diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 84d6d378242..f754d6f5b1d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -28,10 +28,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; @Slf4j -public abstract class AbstractActivityManager implements ActivityManager { +public abstract class AbstractActivityManager implements ActivityManager { private final ConcurrentMap states = new ConcurrentHashMap<>(); @@ -54,8 +53,6 @@ protected void init() { protected abstract long getReportingPeriodMillis(); - protected abstract ActivityState createNewState(Key key); - protected abstract ActivityStrategy getStrategy(); protected abstract ActivityState updateState(Key key, ActivityState state); @@ -67,7 +64,7 @@ protected void init() { protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback callback); @Override - public void onActivity(Key key, long newLastRecordedTime) { + public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) { if (key == null) { log.error("Failed to process activity event: provided activity key is null."); return; @@ -77,36 +74,28 @@ public void onActivity(Key key, long newLastRecordedTime) { var shouldReport = new AtomicBoolean(false); var lastRecordedTime = new AtomicLong(); var lastReportedTime = new AtomicLong(); - var metadata = new AtomicReference(); - var activityStateWrapper = states.compute(key, (__, stateWrapper) -> { + states.compute(key, (__, stateWrapper) -> { if (stateWrapper == null) { - var newState = createNewState(key); - if (newState == null) { - return null; - } + ActivityState newState = new ActivityState<>(); stateWrapper = new ActivityStateWrapper(); stateWrapper.setState(newState); stateWrapper.setStrategy(getStrategy()); } var state = stateWrapper.getState(); + state.setMetadata(metadata); if (state.getLastRecordedTime() < newLastRecordedTime) { state.setLastRecordedTime(newLastRecordedTime); } shouldReport.set(stateWrapper.getStrategy().onActivity()); lastRecordedTime.set(state.getLastRecordedTime()); lastReportedTime.set(stateWrapper.getLastReportedTime()); - metadata.set(state.getMetadata()); return stateWrapper; }); - if (activityStateWrapper == null) { - return; - } - if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { log.debug("Going to report first activity event for key: [{}].", key); - reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() { + reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { updateLastReportedTime(key, reportedTime); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java index 0f2145ab6f6..5d27738429e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java @@ -15,9 +15,9 @@ */ package org.thingsboard.server.common.transport.activity; -public interface ActivityManager { +public interface ActivityManager { - void onActivity(Key key, long activityTimeMillis); + void onActivity(Key key, Metadata metadata, long activityTimeMillis); void onReportingPeriodEnd(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index f397a6c02a7..0d7fbfc332f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -96,8 +96,6 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; @@ -774,7 +772,7 @@ public void recordActivity(TransportProtos.SessionInfoProto sessionInfo) { } private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { - onActivity(toSessionId(sessionInfo), getCurrentTimeMillis()); + onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis()); } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java index 1368785466c..0c9ebd0e4b2 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java @@ -57,17 +57,6 @@ protected long getReportingPeriodMillis() { return sessionReportTimeout; } - @Override - protected ActivityState createNewState(UUID sessionId) { - SessionMetaData session = sessions.get(sessionId); - if (session == null) { - return null; - } - ActivityState state = new ActivityState<>(); - state.setMetadata(session.getSessionInfo()); - return state; - } - @Override protected ActivityStrategy getStrategy() { return reportingStrategyType.toStrategy(); diff --git a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java index e0f5ef128e8..a87532541db 100644 --- a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java +++ b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java @@ -40,6 +40,8 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -63,6 +65,34 @@ public void setup() { ReflectionTestUtils.setField(transportServiceMock, "sessions", sessions); } + @Test + void givenFirstActivityForAlreadyRemovedSessionAndFirstEventReportingStrategy_whenOnActivity_thenShouldRecordActivityAndReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(transportServiceMock, "states", states); + + var strategyMock = mock(ActivityStrategy.class); + when(transportServiceMock.getStrategy()).thenReturn(strategyMock); + when(strategyMock.onActivity()).thenReturn(true); + + long activityTime = 123L; + var sessionInfo = TransportProtos.SessionInfoProto.newBuilder() + .setSessionIdMSB(SESSION_ID.getMostSignificantBits()) + .setSessionIdLSB(SESSION_ID.getLeastSignificantBits()) + .build(); + + doCallRealMethod().when(transportServiceMock).getLastRecordedTime(SESSION_ID); + doCallRealMethod().when(transportServiceMock).onActivity(SESSION_ID, sessionInfo, activityTime); + + // WHEN + transportServiceMock.onActivity(SESSION_ID, sessionInfo, activityTime); + + // THEN + assertThat(states).containsKey(SESSION_ID); + assertThat(transportServiceMock.getLastRecordedTime(SESSION_ID)).isEqualTo(activityTime); + verify(transportServiceMock).reportActivity(eq(SESSION_ID), eq(sessionInfo), eq(activityTime), any(ActivityReportCallback.class)); + } + @Test void givenKeyAndTimeToReportAndSessionExists_whenReportingActivity_thenShouldReportActivityWithSubscriptionsAndSessionInfoFromSession() { // GIVEN @@ -175,28 +205,7 @@ void givenActivityHappened_whenRecordActivity_thenShouldDelegateToOnActivity() { transportServiceMock.recordActivity(sessionInfo); // THEN - verify(transportServiceMock).onActivity(SESSION_ID, expectedTime); - } - - @Test - void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() { - // GIVEN - TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() - .setSessionIdMSB(SESSION_ID.getMostSignificantBits()) - .setSessionIdLSB(SESSION_ID.getLeastSignificantBits()) - .build(); - sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null)); - - when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod(); - - ActivityState expectedState = new ActivityState<>(); - expectedState.setMetadata(sessionInfo); - - // WHEN - ActivityState actualState = transportServiceMock.createNewState(SESSION_ID); - - // THEN - assertThat(actualState).isEqualTo(expectedState); + verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime); } @ParameterizedTest