Skip to content

Commit

Permalink
fix(): changed block() to blockOptional()
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Sep 5, 2024
1 parent 80e7fcd commit c02565d
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, publisher, runContext);

count = resultFlowable.reduce(Integer::sum).block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
Expand All @@ -101,7 +101,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

resultFlowable = this.buildFlowable(flowable, publisher, runContext);

count = resultFlowable.reduce(Integer::sum).block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
publisher.publish(msg.to(runContext, this.serdeType));
Expand Down

0 comments on commit c02565d

Please sign in to comment.