Skip to content

Commit

Permalink
NET-414: add SolarFlux publishing to datum stream poll task.
Browse files Browse the repository at this point in the history
  • Loading branch information
msqr committed Oct 28, 2024
1 parent 93df3f4 commit 6a7c3d8
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@
import net.solarnetwork.central.c2c.domain.CloudDatumStreamSettings;
import net.solarnetwork.central.c2c.domain.CloudIntegrationsUserEvents;
import net.solarnetwork.central.dao.SolarNodeOwnershipDao;
import net.solarnetwork.central.datum.biz.DatumProcessor;
import net.solarnetwork.central.datum.domain.GeneralNodeDatum;
import net.solarnetwork.central.datum.domain.GeneralObjectDatum;
import net.solarnetwork.central.datum.support.DatumUtils;
import net.solarnetwork.central.datum.v2.dao.DatumEntity;
import net.solarnetwork.central.datum.v2.dao.DatumWriteOnlyDao;
import net.solarnetwork.central.domain.BasicClaimableJobState;
Expand Down Expand Up @@ -98,6 +101,7 @@ public class DaoCloudDatumStreamPollService
private final Function<String, CloudDatumStreamService> datumStreamServiceProvider;
private Duration shutdownMaxWait = DEFAULT_SHUTDOWN_MAX_WAIT;
private CloudDatumStreamSettings defaultDatumStreamSettings = DEFAULT_DATUM_STREAM_SETTINGS;
private DatumProcessor fluxPublisher;

/**
* Constructor.
Expand Down Expand Up @@ -351,6 +355,7 @@ private CloudDatumStreamPollTaskEntity executeTask() throws Exception {
if ( polledDatum != null && !polledDatum.isEmpty() ) {
log.debug("Polling for {} found {} datum to import", datumStreamIdent,
polledDatum.size());
final DatumProcessor fluxPublisher = getFluxPublisher();
for ( var datum : polledDatum ) {
if ( datum instanceof DatumEntity d ) {
if ( datumStreamSettings.isPublishToSolarIn() ) {
Expand All @@ -360,10 +365,21 @@ private CloudDatumStreamPollTaskEntity executeTask() throws Exception {
if ( datumStreamSettings.isPublishToSolarIn() ) {
datumDao.persist(d);
}
if ( fluxPublisher != null && datumStreamSettings.isPublishToSolarFlux()
&& datum instanceof GeneralNodeDatum nodeDatum ) {
fluxPublisher.processDatum(nodeDatum);
}
} else {
if ( datumStreamSettings.isPublishToSolarIn() ) {
datumDao.store(datum);
}
if ( fluxPublisher != null && datumStreamSettings.isPublishToSolarFlux()
&& datum.getKind() == ObjectDatumKind.Node ) {
GeneralObjectDatum<?> gd = DatumUtils.convertGeneralDatum(datum);
if ( gd instanceof GeneralNodeDatum nodeDatum ) {
fluxPublisher.processDatum(nodeDatum);
}
}
}
if ( lastDatumDate == null || lastDatumDate.isBefore(datum.getTimestamp()) ) {
lastDatumDate = datum.getTimestamp();
Expand Down Expand Up @@ -482,4 +498,23 @@ public final void setDefaultDatumStreamSettings(
: DEFAULT_DATUM_STREAM_SETTINGS);
}

/**
* Get the SolarFlux publisher.
*
* @return the publisher, or {@literal null}
*/
public DatumProcessor getFluxPublisher() {
return fluxPublisher;
}

/**
* Set the SolarFlux publisher.
*
* @param fluxPublisher
* the publisher to set
*/
public void setFluxPublisher(DatumProcessor fluxPublisher) {
this.fluxPublisher = fluxPublisher;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import static net.solarnetwork.domain.datum.DatumId.nodeId;
import static org.assertj.core.api.BDDAssertions.and;
import static org.assertj.core.api.BDDAssertions.from;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -68,14 +70,19 @@
import net.solarnetwork.central.c2c.dao.CloudDatumStreamPollTaskDao;
import net.solarnetwork.central.c2c.dao.CloudDatumStreamSettingsEntityDao;
import net.solarnetwork.central.c2c.domain.BasicCloudDatumStreamQueryResult;
import net.solarnetwork.central.c2c.domain.BasicCloudDatumStreamSettings;
import net.solarnetwork.central.c2c.domain.CloudDatumStreamConfiguration;
import net.solarnetwork.central.c2c.domain.CloudDatumStreamPollTaskEntity;
import net.solarnetwork.central.c2c.domain.CloudDatumStreamQueryFilter;
import net.solarnetwork.central.c2c.domain.CloudIntegrationsUserEvents;
import net.solarnetwork.central.dao.SolarNodeOwnershipDao;
import net.solarnetwork.central.datum.biz.DatumProcessor;
import net.solarnetwork.central.datum.domain.GeneralNodeDatum;
import net.solarnetwork.central.datum.domain.GeneralNodeDatumPK;
import net.solarnetwork.central.datum.v2.dao.DatumWriteOnlyDao;
import net.solarnetwork.central.datum.v2.domain.DatumPK;
import net.solarnetwork.central.domain.BasicSolarNodeOwnership;
import net.solarnetwork.domain.Identity;
import net.solarnetwork.domain.datum.Datum;
import net.solarnetwork.domain.datum.DatumSamples;
import net.solarnetwork.domain.datum.GeneralDatum;
Expand Down Expand Up @@ -120,6 +127,9 @@ public class DaoCloudDatumStreamPollServiceTests {
@Mock
private ExecutorService executor;

@Mock
private DatumProcessor fluxProcessor;

@Captor
private ArgumentCaptor<CloudDatumStreamQueryFilter> queryFilterCaptor;

Expand All @@ -129,6 +139,9 @@ public class DaoCloudDatumStreamPollServiceTests {
@Captor
private ArgumentCaptor<Datum> datumCaptor;

@Captor
private ArgumentCaptor<Identity<GeneralNodeDatumPK>> generalNodeDatumCaptor;

private DaoCloudDatumStreamPollService service;

@BeforeEach
Expand All @@ -141,6 +154,8 @@ public void setup() {
service = new DaoCloudDatumStreamPollService(clock, userEventAppenderBiz, nodeOwnershipDao,
taskDao, datumStreamDao, datumStreamSettingsDao, datumDao, executor,
datumStreamServices::get);
service.setFluxPublisher(fluxProcessor);

}

@Test
Expand Down Expand Up @@ -408,4 +423,132 @@ public void executeTask_shutdown() throws Exception {
// @formatter:on
}

@Test
public void executeTask_fluxPublish() throws Exception {
// GIVEN
// submit task
var future = new CompletableFuture<CloudDatumStreamPollTaskEntity>();
given(executor.submit(argThat((Callable<CloudDatumStreamPollTaskEntity> call) -> {
try {
future.complete(call.call());
} catch ( Exception e ) {
future.completeExceptionally(e);
}
return true;
}))).willReturn(future);

final Instant hour = clock.instant().truncatedTo(ChronoUnit.HOURS);

final CloudDatumStreamConfiguration datumStream = new CloudDatumStreamConfiguration(TEST_USER_ID,
randomLong(), now());
datumStream.setDatumStreamMappingId(randomLong());
datumStream.setServiceIdentifier(TEST_DATUM_STREAM_SERVICE_IDENTIFIER);
datumStream.setSchedule("0 0/5 * * * *");
datumStream.setKind(ObjectDatumKind.Node);
datumStream.setObjectId(randomLong());
datumStream.setSourceId(randomString());

// look up datum stream associated with task
given(datumStreamDao.get(datumStream.getId())).willReturn(datumStream);

// resolve datum stream settings (SolarIn OFF, SolarFlux ON)
BasicCloudDatumStreamSettings datumStreamSettings = new BasicCloudDatumStreamSettings(false,
true);
given(datumStreamSettingsDao.resolveSettings(TEST_USER_ID, datumStream.getConfigId(),
DEFAULT_DATUM_STREAM_SETTINGS)).willReturn(datumStreamSettings);

// verify node ownership
final var nodeOwner = new BasicSolarNodeOwnership(datumStream.getObjectId(), TEST_USER_ID, "NZ",
UTC, true, false);
given(nodeOwnershipDao.ownershipForNodeId(datumStream.getObjectId())).willReturn(nodeOwner);

// update task state to "processing"
given(taskDao.updateTaskState(datumStream.getId(), Executing, Claimed)).willReturn(true);

// query for data associated with service configured on datum stream
// here we return a datum for "5 min ago"
final Datum datum1 = new GeneralDatum(
nodeId(datumStream.getObjectId(), datumStream.getSourceId(), hour.minusSeconds(300)),
new DatumSamples(Map.of("watts", 123), Map.of("wattHours", 23456L), null));
final Datum datum2 = new GeneralDatum(
nodeId(datumStream.getObjectId(), datumStream.getSourceId(), hour.minusSeconds(120)),
new DatumSamples(Map.of("watts", 234), Map.of("wattHours", 34567L), null));
given(datumStreamService.datum(same(datumStream), any()))
.willReturn(new BasicCloudDatumStreamQueryResult(List.of(datum1, datum2)));

// post datum to SolarFlux
given(fluxProcessor.processDatum(any())).willReturn(true);

// update task details
given(taskDao.updateTask(any(), eq(Executing))).willReturn(true);

// WHEN
var task = new CloudDatumStreamPollTaskEntity(datumStream.getId());
task.setState(Claimed);
task.setExecuteAt(hour);
task.setStartAt(hour.minusSeconds(300));

Future<CloudDatumStreamPollTaskEntity> result = service.executeTask(task);
CloudDatumStreamPollTaskEntity resultTask = result.get(1, TimeUnit.MINUTES);

// THEN
// @formatter:off
then(datumStreamService).should().datum(same(datumStream), queryFilterCaptor.capture());
and.then(queryFilterCaptor.getValue())
.as("The query start date is the startAt of the task")
.returns(task.getStartAt(), from(CloudDatumStreamQueryFilter::getStartDate))
.as("The query end date is the current date")
.returns(clock.instant(), from(CloudDatumStreamQueryFilter::getEndDate))
;

then(taskDao).should().updateTask(taskCaptor.capture(), eq(Executing));
and.then(taskCaptor.getValue())
.as("Task to update is copy of given task")
.isNotSameAs(task)
.as("Task to update has same ID as given task")
.isEqualTo(task)
.as("Update task state to Queued to run again")
.returns(Queued, from(CloudDatumStreamPollTaskEntity::getState))
.as("Update task execute date to next time based on configuration schedule (every 5min)")
.returns(task.getExecuteAt().plusSeconds(300), from(CloudDatumStreamPollTaskEntity::getExecuteAt))
.as("Update task start date to highest date of datum captured")
.returns(datum2.getTimestamp(), from(CloudDatumStreamPollTaskEntity::getStartAt))
.as("No message generated for successful execution")
.returns(null, from(CloudDatumStreamPollTaskEntity::getMessage))
.as("No service properties generated for successful execution")
.returns(null, from(CloudDatumStreamPollTaskEntity::getServiceProperties))
;

and.then(resultTask)
.as("Result task is same as passed to DAO for update")
.isSameAs(taskCaptor.getValue())
;

then(fluxProcessor).should(times(2)).processDatum(generalNodeDatumCaptor.capture());
and.then(generalNodeDatumCaptor.getAllValues())
.as("Both datum posted to SolarFlux")
.hasSize(2)
.satisfies(list -> {
and.then(list)
.element(0, type(GeneralNodeDatum.class))
.as("GeneralNodeDatum ID derived from datum")
.returns(new GeneralNodeDatumPK(datum1.getObjectId(), datum1.getTimestamp(), datum1.getSourceId()),
from(GeneralNodeDatum::getId))
.as("GeneralNodeDatum properties derviced from datum")
.returns(datum1.getSampleData(), from(GeneralNodeDatum::getSampleData))
;
and.then(list)
.element(1, type(GeneralNodeDatum.class))
.as("GeneralNodeDatum ID derived from datum")
.returns(new GeneralNodeDatumPK(datum2.getObjectId(), datum2.getTimestamp(), datum2.getSourceId()),
from(GeneralNodeDatum::getId))
.as("GeneralNodeDatum properties derviced from datum")
.returns(datum2.getSampleData(), from(GeneralNodeDatum::getSampleData))
;
})
;

// @formatter:on
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import net.solarnetwork.central.c2c.dao.CloudDatumStreamPollTaskDao;
import net.solarnetwork.central.c2c.dao.CloudDatumStreamSettingsEntityDao;
import net.solarnetwork.central.dao.SolarNodeOwnershipDao;
import net.solarnetwork.central.datum.biz.DatumProcessor;
import net.solarnetwork.central.datum.v2.dao.DatumWriteOnlyDao;

/**
Expand Down Expand Up @@ -73,6 +74,10 @@ public class CloudIntegrationsDatumStreamPollConfig implements SolarNetCloudInte
@Autowired
private DatumWriteOnlyDao datumWriteOnlyDao;

@Autowired(required = false)
@Qualifier("solarflux")
private DatumProcessor fluxPublisher;

@ConfigurationProperties(prefix = "app.c2c.ds-poll.executor")
@Qualifier(CLOUD_INTEGRATIONS_POLL)
@Bean
Expand All @@ -92,6 +97,7 @@ public CloudDatumStreamPollService cloudDatumStreamPollService(
var service = new DaoCloudDatumStreamPollService(Clock.systemUTC(), userEventAppenderBiz,
nodeOwnershipDao, taskDao, datumStreamDao, datumStreamSettingsDao, datumWriteOnlyDao,
taskExecutor.getThreadPoolExecutor(), dsMap::get);
service.setFluxPublisher(fluxPublisher);
return service;
}

Expand Down

0 comments on commit 6a7c3d8

Please sign in to comment.