Mutiny and the Reactiverse
I had the question multiple times: how do I use Eclipse Vert.x in Quarkus? Indeed, you can use Vert.x in Quarkus. You can deploy verticles, communicate with the event bus, or use anything from the Vert.x ecosystem. But, you can also use the Mutiny variant of Vert.x in Quarkus, and get a seamless experience with the other reactive APIs offered by Quarkus. Several posts have already mentioned this, but it deserves a specific blog post. So, here we are.
Eclipse Vert.x
Vert.x is a toolkit to build reactive applications. The Vert.x ecosystem is enormous. From HTTP and data access abilities to messaging clients via microservice and security facilities, the Vert.x ecosystem is remarkably diverse and versatile. To understand that variety, just check the Vert.x documentation. That makes Vert.x popular in many areas such as web applications, IoT gateways, banking applications and so on.
As you may know, Quarkus is based on Vert.x. Under the hood, there is a managed Vert.x instance that powers the rest of Quarkus.
When Quarkus serves a HTTP endpoint, under the hood, there is a Vert.x HTTP server handling the request and response. That’s also true for messaging, gRPC and almost any I/O.
The Vert.x "bare" API and friends
Vert.x provides multiple APIs. Let’s focus on the "bare" one for now.
Following the reactive nature of Vert.x, the API contains mostly asynchronous methods. These methods are following a syntax convention:
public void doSomething(param1, param2, Handler<AsyncResult<T>> handler) {
// ...
}
The interesting part is the last parameter.
It’s a function, a callback to be more precise, that gets called when the operation completes or fails.
Indeed, the asynchronous nature of Vert.x does not allow using try/catch
blocks.
So you need to pass a continuation callback, invoked with the outcome.
AsyncResult
is a structure capturing this outcome.
It contains the result (of type <T>
) produced by the operation, or the failure if it failed.
Let’s take an example:
vertx.fileSystem()
.readFile("my-file.txt", ar -> {
if (ar.failed()) {
System.out.println("D'oh! Cannot read the file: " + ar.cause());
} else {
System.out.println("File content is: " + ar.result());
}
});
This code reads a file, and as it’s an asynchronous operation, invokes the callback when the file is read.
The readFile
method reads the complete content of the file and accumulates it in a buffer.
The callback receives the asynchronous result containing either the file’s content (ar.result()
) or a failure.
Vert.x invokes this callback when the operation has either completed or failed.
Vert.x also supports streams thanks to the ReadStream
and WriteStream
classes.
A ReadStream
represents a stream of data you can read.
So you can attach a callback invoked on every item traversing the stream.
A WriteStream
is a data source.
You can push items to a WriteStream.
These items will be consumed by a ReadStream
:
vertx.fileSystem()
.open("my-file.txt", new OpenOptions().setRead(true), ar -> {
if (ar.failed()) {
System.out.println(
"D'oh! Cannot read the file: " + ar.cause()
);
} else {
AsyncFile file = ar.result();
// AsyncFile is a read stream, we can read from it:
file
.exceptionHandler(t ->
System.out.println("Failure while reading the file: " + t)
)
// Reads the file chunk by chunk
.handler(buffer ->
System.out.println("Received buffer: " + buffer)
);
}
});
Vert.x streams do not implement Reactive Streams. Vert.x provides a different back-pressure protocol. |
Why are these API shaping rules important? Vert.x does not provide a single API. The "bare" API presented above is just one of the proposed API. It also provides API in Kotlin, API for RX Java, and so on.
These APIs are generated. Vert.x provides a code generator that _ translates_ the Vert.x "bare" API into the other APIs. Because all methods are well-formed, the generator understands how they should be adapted.
The generated code exposes a different API; each method delegating to the "bare" API. Asynchronous methods and streams can follow different transformations, so the resulting API uses the right idioms.
The Vert.x Mutiny API
Mutiny is an event-driven reactive programming library. It’s not related to Vert.x. However, we have written a code generator that generates the Mutiny variant for the Vert.x API:
The transformations are straightforward:
-
io.vertx
package ⇒io.vertx.mutiny
package -
Asynchronous methods ⇒ method returning a
Uni<T>
-
ReadStreams<T>
⇒ can be consumed asMulti<T>
-
WriteStreams<T>
⇒ can be consumed as Reactive StreamsSubscriber<T>
It also adapts the Vert.x back pressure protocol to Reactive Streams, as Mutiny implements Reactive Streams.
For example, the first example from above becomes:
Uni<Buffer> uni = vertx.fileSystem().readFile("my-file.txt");
uni.subscribe()
.with(it -> System.out.println("File content is: " + it));
One difference between the two APIs is related to laziness. The Vert.x "bare" API triggers the operation as soon as the method is called. The Mutiny variant expects a subscription to trigger the operation. |
The stream example from above becomes:
Uni<AsyncFile> uni = vertx.fileSystem()
.open("my-file.txt", new OpenOptions().setRead(true));
uni
// Gets a Multi to read the file:
.onItem().transformToMulti(asyncFile -> asyncFile.toMulti())
// Gets the buffers one by one:
.subscribe().with(
buffer -> System.out.println("Received buffer: " + buffer)
);
A bit more than this
The Mutiny variant does not only apply the rules exposed in the previous section. For asynchronous methods, it also provides:
-
xAndAwait()
methods - blocks the caller thread until the outcome is received. In the case of a failure, throws aRuntimeException
-
xAndForget()
methods - triggers the operation, discard the outcome
// Read the content of the file in a blocking manner:
Buffer content = vertx.fileSystem().readFileAndAwait("my-file.txt");
// Open and close the file
// Closing the file is an asynchronous operation (returning a Uni).
// We trigger the operation and discard the outcome
vertx.fileSystem().open("my-file.txt", new OpenOptions().setRead(true))
.subscribe().with(file -> file.closeAndForget());
Where can we find this API?
At the moment of writing, we only provide Vert.x core and Vert.x clients (MongoDB, Redis, Web client, Mqtt, and so on). We are extending the support to cover the full Vert.x stack.
To use the Mutiny clients, you need to add the right dependency to your project. Browse the list of dependency to pick the one you need.
For example, to you use the Mutiny variant of the Vert.x Web client, add the following dependency:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
<version>...</version>
</dependency>
Once you have the dependency, just create the web client instance:
@Inject Vertx vertx; // Inject the managed io.vertx.mutiny.core.Vertx instance
private WebClient client;
@PostConstruct
public void init() {
client = WebClient.create(vertx, new WebClientOptions()
.setDefaultHost("localhost")
.setDefaultPort(8082)
);
}
private Uni<String> call(String path) {
return client
.get(path).send()
.onItem().transform(HttpResponse::bodyAsString);
}
Something missing? Open an issue on SmallRye Reactive Utils. |
The Javadoc is available here. |