Skip to content

Commit

Permalink
Merge pull request #439 from kestra-io/fix/fix-npe-with-result-flowab…
Browse files Browse the repository at this point in the history
…le-block

fix(): changed block() to blockOptional()
  • Loading branch information
mgabelle authored Sep 5, 2024
2 parents 80e7fcd + c02565d commit def5c47
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, publisher, runContext);

count = resultFlowable.reduce(Integer::sum).block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
Expand All @@ -101,7 +101,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

resultFlowable = this.buildFlowable(flowable, publisher, runContext);

count = resultFlowable.reduce(Integer::sum).block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
publisher.publish(msg.to(runContext, this.serdeType));
Expand Down

0 comments on commit def5c47

Please sign in to comment.