Skip to content

Commit

Permalink
Merge pull request thingsboard#10013 from dskarzh/fix/activity-manage…
Browse files Browse the repository at this point in the history
…r/new-state-creation

Fix new activity state not being created correctly
  • Loading branch information
ashvayka authored Jan 19, 2024
2 parents ebbd290 + cc5fba1 commit 6c6b606
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key> {
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key, Metadata> {

private final ConcurrentMap<Key, ActivityStateWrapper> states = new ConcurrentHashMap<>();

Expand All @@ -54,8 +53,6 @@ protected void init() {

protected abstract long getReportingPeriodMillis();

protected abstract ActivityState<Metadata> createNewState(Key key);

protected abstract ActivityStrategy getStrategy();

protected abstract ActivityState<Metadata> updateState(Key key, ActivityState<Metadata> state);
Expand All @@ -67,7 +64,7 @@ protected void init() {
protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback<Key> callback);

@Override
public void onActivity(Key key, long newLastRecordedTime) {
public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) {
if (key == null) {
log.error("Failed to process activity event: provided activity key is null.");
return;
Expand All @@ -77,36 +74,28 @@ public void onActivity(Key key, long newLastRecordedTime) {
var shouldReport = new AtomicBoolean(false);
var lastRecordedTime = new AtomicLong();
var lastReportedTime = new AtomicLong();
var metadata = new AtomicReference<Metadata>();

var activityStateWrapper = states.compute(key, (__, stateWrapper) -> {
states.compute(key, (__, stateWrapper) -> {
if (stateWrapper == null) {
var newState = createNewState(key);
if (newState == null) {
return null;
}
ActivityState<Metadata> newState = new ActivityState<>();
stateWrapper = new ActivityStateWrapper();
stateWrapper.setState(newState);
stateWrapper.setStrategy(getStrategy());
}
var state = stateWrapper.getState();
state.setMetadata(metadata);
if (state.getLastRecordedTime() < newLastRecordedTime) {
state.setLastRecordedTime(newLastRecordedTime);
}
shouldReport.set(stateWrapper.getStrategy().onActivity());
lastRecordedTime.set(state.getLastRecordedTime());
lastReportedTime.set(stateWrapper.getLastReportedTime());
metadata.set(state.getMetadata());
return stateWrapper;
});

if (activityStateWrapper == null) {
return;
}

if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
log.debug("Going to report first activity event for key: [{}].", key);
reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() {
reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() {
@Override
public void onSuccess(Key key, long reportedTime) {
updateLastReportedTime(key, reportedTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package org.thingsboard.server.common.transport.activity;

public interface ActivityManager<Key> {
public interface ActivityManager<Key, Metadata> {

void onActivity(Key key, long activityTimeMillis);
void onActivity(Key key, Metadata metadata, long activityTimeMillis);

void onReportingPeriodEnd();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
Expand Down Expand Up @@ -774,7 +772,7 @@ public void recordActivity(TransportProtos.SessionInfoProto sessionInfo) {
}

private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
onActivity(toSessionId(sessionInfo), getCurrentTimeMillis());
onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,6 @@ protected long getReportingPeriodMillis() {
return sessionReportTimeout;
}

@Override
protected ActivityState<TransportProtos.SessionInfoProto> createNewState(UUID sessionId) {
SessionMetaData session = sessions.get(sessionId);
if (session == null) {
return null;
}
ActivityState<TransportProtos.SessionInfoProto> state = new ActivityState<>();
state.setMetadata(session.getSessionInfo());
return state;
}

@Override
protected ActivityStrategy getStrategy() {
return reportingStrategyType.toStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -63,6 +65,34 @@ public void setup() {
ReflectionTestUtils.setField(transportServiceMock, "sessions", sessions);
}

@Test
void givenFirstActivityForAlreadyRemovedSessionAndFirstEventReportingStrategy_whenOnActivity_thenShouldRecordActivityAndReport() {
// GIVEN
ConcurrentMap<UUID, Object> states = new ConcurrentHashMap<>();
ReflectionTestUtils.setField(transportServiceMock, "states", states);

var strategyMock = mock(ActivityStrategy.class);
when(transportServiceMock.getStrategy()).thenReturn(strategyMock);
when(strategyMock.onActivity()).thenReturn(true);

long activityTime = 123L;
var sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.setSessionIdMSB(SESSION_ID.getMostSignificantBits())
.setSessionIdLSB(SESSION_ID.getLeastSignificantBits())
.build();

doCallRealMethod().when(transportServiceMock).getLastRecordedTime(SESSION_ID);
doCallRealMethod().when(transportServiceMock).onActivity(SESSION_ID, sessionInfo, activityTime);

// WHEN
transportServiceMock.onActivity(SESSION_ID, sessionInfo, activityTime);

// THEN
assertThat(states).containsKey(SESSION_ID);
assertThat(transportServiceMock.getLastRecordedTime(SESSION_ID)).isEqualTo(activityTime);
verify(transportServiceMock).reportActivity(eq(SESSION_ID), eq(sessionInfo), eq(activityTime), any(ActivityReportCallback.class));
}

@Test
void givenKeyAndTimeToReportAndSessionExists_whenReportingActivity_thenShouldReportActivityWithSubscriptionsAndSessionInfoFromSession() {
// GIVEN
Expand Down Expand Up @@ -175,28 +205,7 @@ void givenActivityHappened_whenRecordActivity_thenShouldDelegateToOnActivity() {
transportServiceMock.recordActivity(sessionInfo);

// THEN
verify(transportServiceMock).onActivity(SESSION_ID, expectedTime);
}

@Test
void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() {
// GIVEN
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.setSessionIdMSB(SESSION_ID.getMostSignificantBits())
.setSessionIdLSB(SESSION_ID.getLeastSignificantBits())
.build();
sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null));

when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod();

ActivityState<TransportProtos.SessionInfoProto> expectedState = new ActivityState<>();
expectedState.setMetadata(sessionInfo);

// WHEN
ActivityState<TransportProtos.SessionInfoProto> actualState = transportServiceMock.createNewState(SESSION_ID);

// THEN
assertThat(actualState).isEqualTo(expectedState);
verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime);
}

@ParameterizedTest
Expand Down

0 comments on commit 6c6b606

Please sign in to comment.