Reactive programming with RxJava

Tip
The corresponding source code is in the step-8 folder of the guide repository.

So far we have explored various areas of the Vert.x stack using the callback-based APIs. This programming model just works and is well known to developers in many languages. However, it can become tedious, especially when you combine several sources of events or deal with complex data flows.

This is exactly where RxJava shines, and Vert.x integrates with it seamlessly.

Note
In this guide, RxJava 2.x is used, but Vert.x also works with RxJava 1.x. RxJava 2.x has been rewritten from scratch on top of the Reactive Streams specification. Learn more in the What’s different in 2.0 wiki page.

Enabling the RxJava APIs

In addition to the callback-based API, the Vert.x modules offer an "Rxified" API. To enable it, start by adding the vertx-rx-java2 module to the Maven POM file:

link:pom.xml[role=include]

Verticles then have to be modified so that they extend io.vertx.reactivex.core.AbstractVerticle instead of io.vertx.core.AbstractVerticle. How is this different? The former class extends the latter and exposes an io.vertx.reactivex.core.Vertx field.

io.vertx.reactivex.core.Vertx defines extra rxSomething(…​) methods that are equivalent to their callback-based counterparts.

Let’s take a look at the MainVerticle to get a better idea of how it works in practice:

link:src/main/java/io/vertx/guides/wiki/MainVerticle.java[role=include]

The rxDeploy method does not take a Handler<AsyncResult<String>> as its final parameter. Instead, it returns a Single<String>.

Besides, the operation does not start when the method is called. It starts when you subscribe to the Single. When the operation completes, it emits the deployment id or signals the cause of the problem with a Throwable.
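The "nothing happens until you subscribe" behaviour can be illustrated without any dependency. The following is only a JDK-only sketch, not the guide's code: a `Supplier` of a `CompletableFuture` stands in for a cold `Single`, calling `get()` stands in for subscribing, and the `deploy` method, the `started` counter and the returned id format are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** JDK-only stand-in for a cold Single: describing the work does not run it. */
final class DeferredDeployment {
  // Counts how many times the (hypothetical) deployment has actually started.
  static final AtomicInteger started = new AtomicInteger();

  // Hypothetical stand-in for an rx-style deploy method: it only returns
  // a description of the operation, it does not start it.
  static Supplier<CompletableFuture<String>> deploy(String name) {
    return () -> {
      started.incrementAndGet();
      return CompletableFuture.completedFuture("id-of-" + name);
    };
  }

  public static void main(String[] args) {
    Supplier<CompletableFuture<String>> deployment = deploy("db");
    // Nothing has run yet at this point.
    System.out.println("started before get(): " + started.get());
    // Calling get() plays the role of subscribe(): the operation starts now.
    String id = deployment.get().join();
    System.out.println(id + ", started after get(): " + started.get());
  }
}
```

With a real `Single`, the same separation exists between calling the rx method and calling `subscribe`.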

Deploying verticles in order

To finalize the MainVerticle refactoring, we must make sure the deployment operations get triggered and happen in order:

link:src/main/java/io/vertx/guides/wiki/MainVerticle.java[role=include]
  1. The flatMap operator applies the function to the result of dbVerticleDeployment. Here it schedules the deployment of the HttpServerVerticle.

  2. We use the shorter lambda form here.

  3. Operations start when subscribing. On success or on error, the MainVerticle start future is either completed or failed.
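The ordering guarantee described in the steps above can be sketched with the JDK alone, assuming `CompletableFuture.thenCompose` as a rough analogue of `flatMap` on a `Single`. This is not the MainVerticle code: the `deploy` helper, the verticle names and the ids are hypothetical, and unlike a `Single` a `CompletableFuture` starts running as soon as it is created.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** JDK-only sketch: thenCompose chains two steps much like flatMap chains two Singles. */
final class OrderedDeployment {
  // Hypothetical deployment step: records its name, then completes with a fake id.
  static CompletableFuture<String> deploy(String name, List<String> log) {
    return CompletableFuture.supplyAsync(() -> {
      log.add(name);
      return name + "-id";
    });
  }

  // The second deployment is only created after the first one has completed.
  static CompletableFuture<String> deployInOrder(List<String> log) {
    return deploy("database", log)
      .thenCompose(dbId -> deploy("http-server", log));
  }

  public static void main(String[] args) {
    List<String> log = new CopyOnWriteArrayList<>();
    deployInOrder(log)
      .whenComplete((id, err) -> {
        // Plays the role of completing or failing the verticle start future.
        if (err == null) {
          System.out.println("started, last id = " + id);
        } else {
          System.out.println("failed: " + err);
        }
      })
      .join();
    System.out.println(log);
  }
}
```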

Partially "Rxifying" HttpServerVerticle

If you follow the guide in sequence, editing the code as you go, your HttpServerVerticle class is still using the callback-based API. Before you can use the RxJava API to execute asynchronous operations naturally, i.e. concurrently, you need to refactor HttpServerVerticle.

Import RxJava versions of Vert.x classes

link:src/main/java/io/vertx/guides/wiki/http/HttpServerVerticle.java[role=include]

Use delegate on an "Rxified" vertx instance

To call a method expecting an io.vertx.core.Vertx instance when you have an io.vertx.reactivex.core.Vertx one, call the getDelegate() method. The verticle’s start() method needs to be adjusted when creating an instance of WikiDatabaseService:

link:src/main/java/io/vertx/guides/wiki/http/HttpServerVerticle.java[role=include]

Executing authorization queries concurrently

In the previous example, we saw how to use RxJava operators and the Rxified Vert.x API to execute asynchronous operations in order. But sometimes this guarantee is not required, or you simply want them to run concurrently for performance reasons.

The JWT token generation process in the HttpServerVerticle is a good example of such a situation. To create a token, we need all authorization queries to complete, but the queries are independent of each other:

link:src/main/java/io/vertx/guides/wiki/http/HttpServerVerticle.java[role=include]
  1. Three Single objects are created, representing the different authorization queries.

  2. When the three operations complete successfully, the zip operator callback is invoked with the results.
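The "run independently, combine when all are done" shape of the steps above can be sketched with JDK futures, assuming `CompletableFuture.allOf` as a rough analogue of the `zip` operator. The `isAuthorized` rule, the user names and the permission names below are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch: combine three independent checks once all have completed. */
final class ConcurrentAuthorization {
  // Hypothetical authorization query; a real one would ask an auth provider.
  static CompletableFuture<Boolean> isAuthorized(String user, String permission) {
    return CompletableFuture.supplyAsync(
      () -> user.equals("root") || permission.equals("update"));
  }

  // Starts the three queries without waiting for each other, then collects
  // the granted permissions when every query has completed.
  static CompletableFuture<List<String>> grantedPermissions(String user) {
    CompletableFuture<Boolean> create = isAuthorized(user, "create");
    CompletableFuture<Boolean> delete = isAuthorized(user, "delete");
    CompletableFuture<Boolean> update = isAuthorized(user, "update");
    return CompletableFuture.allOf(create, delete, update).thenApply(ignored -> {
      // All three futures are complete here, so join() does not block.
      List<String> granted = new ArrayList<>();
      if (create.join()) granted.add("create");
      if (delete.join()) granted.add("delete");
      if (update.join()) granted.add("update");
      return granted;
    });
  }

  public static void main(String[] args) {
    System.out.println(grantedPermissions("root").join());
    System.out.println(grantedPermissions("guest").join());
  }
}
```

As with `zip`, a failure in any of the three queries fails the combined result.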

Querying the database

Direct queries

Very often, a single database query is needed to prepare a response to the user. For such simple cases, the JDBCClient provides rxQueryXXX and rxUpdateXXX methods:

link:src/main/java/io/vertx/guides/wiki/database/WikiDatabaseServiceImpl.java[role=include]

Working with a database connection

When direct queries do not fit (e.g. when creating then querying a temporary table), you can generate a reactive flow from a single SQLConnection:

link:src/main/java/io/vertx/guides/wiki/database/WikiDatabaseServiceImpl.java[role=include]
  1. Borrows a connection from the pool and provides it in a callback.

  2. Returns a Single generated from the execution of queries and the transformation of results.

You may execute any number of queries and transform results with RxJava operators.
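The borrow, use, and release pattern behind this helper can be sketched with the JDK alone. This is an illustration under assumptions, not the `SQLClientHelper` implementation: `FakeConnection`, `usingConnection` and `countRows` are hypothetical names, and the fake query simply returns the length of the table name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** JDK-only sketch of "borrow, use, always give back" around an asynchronous flow. */
final class UsingConnection {
  // Hypothetical connection: answers fake queries and remembers whether it was closed.
  static final class FakeConnection {
    volatile boolean closed;

    // Fake query: completes with the length of the table name.
    CompletableFuture<Integer> count(String table) {
      return CompletableFuture.supplyAsync(table::length);
    }

    void close() {
      closed = true;
    }
  }

  // Hands the connection to the callback and closes it once the returned flow
  // completes, whether it succeeded or failed.
  static <T> CompletableFuture<T> usingConnection(
      FakeConnection conn, Function<FakeConnection, CompletableFuture<T>> flow) {
    return flow.apply(conn).whenComplete((value, error) -> conn.close());
  }

  // Two queries on the same connection, the second one issued after the first.
  static CompletableFuture<Integer> countRows(FakeConnection c) {
    return c.count("pages").thenCompose(p -> c.count("users").thenApply(u -> p + u));
  }

  public static void main(String[] args) {
    FakeConnection conn = new FakeConnection();
    int total = usingConnection(conn, UsingConnection::countRows).join();
    System.out.println(total + " closed=" + conn.closed);
  }
}
```

The caller never closes the connection itself, which is the point of wrapping the flow in a helper.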

Note
io.vertx.reactivex.ext.sql.SQLClientHelper can also generate Flowable, Observable, Maybe and Completable flows.

Bridging the gap between callbacks and RxJava

At times, you may have to mix your RxJava code with a callback-based API. For example, service proxy interfaces can only be defined with callbacks, but the implementation uses the Vert.x Rxified API.

In this case, the io.vertx.reactivex.SingleHelper.toObserver method can adapt a Handler<AsyncResult<T>> to an RxJava SingleObserver<T>:

link:src/main/java/io/vertx/guides/wiki/database/WikiDatabaseServiceImpl.java[role=include]
  1. fetchAllPagesData is an asynchronous service proxy operation, defined with a Handler<AsyncResult<List<JsonObject>>> callback.

  2. The toObserver method adapts the resultHandler to a SingleObserver<List<JsonObject>>, so that the handler is invoked when the list of rows is emitted.
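The adaptation described above can be sketched with JDK types only. Here a `BiConsumer<List<String>, Throwable>` plays the role of the `Handler<AsyncResult<T>>` and `whenComplete` plays the role of subscribing with the adapted observer. The `loadPages` source and its data are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** JDK-only sketch: a callback-style method implemented on top of a future. */
final class CallbackBridge {
  // Hypothetical asynchronous source of page names.
  static CompletableFuture<List<String>> loadPages() {
    return CompletableFuture.supplyAsync(() -> List.of("About", "Home"));
  }

  // Callback-style entry point, as a callback-only interface would require.
  // The handler receives either a result or a failure cause.
  // The future is returned only so that callers in this sketch can wait for it.
  static CompletableFuture<List<String>> fetchAllPages(
      BiConsumer<List<String>, Throwable> resultHandler) {
    return loadPages().whenComplete(resultHandler);
  }

  public static void main(String[] args) {
    fetchAllPages((pages, error) -> {
      if (error == null) {
        System.out.println("pages: " + pages);
      } else {
        System.out.println("failed: " + error);
      }
    }).join();
  }
}
```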

Note
io.vertx.reactivex.CompletableHelper and io.vertx.reactivex.MaybeHelper also provide adapters for Completable and Maybe.

Data flows

RxJava is not only great at combining different sources of events, it is also very helpful with data flows. Unlike a Vert.x or JDK future, a Flowable emits a stream of events, not just a single one. And it comes with an extensive set of data manipulation operators.

We can use a few of them to refactor the fetchAllPages database verticle method:

link:src/main/java/io/vertx/guides/wiki/database/WikiDatabaseServiceImpl.java[role=include]
  1. With flatMapPublisher we create a Flowable from the item emitted by the Single<ResultSet>.

  2. fromIterable converts the database results Iterable into a Flowable emitting the database row items.

  3. Since we only need the page name we can map each JsonObject row to the first column.

  4. The client expects the data to be sorted in alphabetical order.

  5. The event bus service reply consists of a single JsonArray. collect creates a new one with JsonArray::new and later adds items as they are emitted with JsonArray::add.
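The map, sort, and collect steps above have the same shape as a `java.util.stream` pipeline, which gives a dependency-free way to try them out. This is only an analogy: a `Stream` is synchronous and pull-based, whereas a `Flowable` emits items asynchronously. The `NAME` column, the row maps standing in for `JsonObject` rows, and the `ArrayList` standing in for `JsonArray` are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** JDK-only sketch of the map / sorted / collect steps on a list of rows. */
final class PageNames {
  // Each row is a hypothetical map with a NAME column.
  static List<String> sortedNames(List<Map<String, String>> rows) {
    ArrayList<String> names = rows.stream()
      .map(row -> row.get("NAME"))   // keep only the page name
      .sorted()                      // natural (alphabetical) order
      // Create one container, then add items to it as they arrive.
      .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    return names;
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = List.of(
      Map.of("NAME", "Zebra"),
      Map.of("NAME", "Apple"),
      Map.of("NAME", "Mango"));
    System.out.println(sortedNames(rows));
  }
}
```

The three-argument `collect` needs a combiner because streams may run in parallel. The RxJava `collect` operator only takes the container factory and the accumulator.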