Skip to content

Commit

Permalink
Collector/connector reactive pipeline minor refactoring (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Sep 23, 2023
1 parent 6d5b731 commit 0f95ce5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,19 @@ public UnitDirector(

Optional.ofNullable(metricsCollectorSet)
.ifPresent(collectors -> Flux.fromIterable(collectors)
.publishOn(Schedulers.boundedElastic())
.doOnNext(c -> c.connect(feed))
.subscribeOn(Schedulers.boundedElastic())
.doOnComplete(() -> logger.info("{}: connected metric collectors", getAddress()))
.subscribe());

Optional.ofNullable(connectorSet)
.ifPresent(connectors -> Flux.fromIterable(connectors)
.publishOn(Schedulers.boundedElastic())
.doOnNext(c -> {
c.connect(feed);
// VT: FIXME: Connect the control input when the API signature is established
})
.subscribeOn(Schedulers.boundedElastic())
.doOnComplete(() -> logger.info("{}: connected connectors", getAddress()))
.subscribe());

logger.info("Configured: {} ({} zones: {})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public void connect(UnitDirector.Feed feed) {
);

zoneStatusSubscription = zoneStatusFeed
.publishOn(Schedulers.boundedElastic())
.buffer(pollInterval)
.doOnNext(this::exchange)
.subscribe();
Expand Down

0 comments on commit 0f95ce5

Please sign in to comment.