How to handle failures with Mutiny

In the past week, I got several questions about failure handling with Mutiny. So, maybe it deserves a bit more explanation.

Failures are events

First, Mutiny is an event-driven reactive programming library. With Mutiny, you handle events. An upstream Uni or Multi propagates these events and lets you process them. An event can be an item, a completion, a cancellation, and... a failure:

Multi.createFrom().range(0, 10)
    .onItem().invoke(i -> System.out.println("Received item " + i))
    .onCompletion().invoke(() -> System.out.println("We are done!"))
    .onCancellation().invoke(() -> System.out.println(
        "The downstream does not want our items anymore!"))
    .onFailure().invoke(t -> System.out.println(
        "Oh no! We received a failure: " + t.getMessage()));
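The snippet above never actually fails, so the failure callback stays silent. Here is a minimal sketch that forces a failure to show which callbacks fire. It assumes Mutiny is on the classpath; the class name FailureEventDemo and the rule "fail on 3" are made up for the illustration.

```java
import io.smallrye.mutiny.Multi;

import java.util.ArrayList;
import java.util.List;

public class FailureEventDemo {

    // Records the events seen by the pipeline, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Multi.createFrom().items(1, 2, 3)
            // Hypothetical failing stage: throwing here becomes a failure event
            .onItem().invoke(i -> {
                if (i == 3) {
                    throw new IllegalStateException("boom");
                }
            })
            .onItem().invoke(i -> events.add("item " + i))
            .onCompletion().invoke(() -> events.add("completion"))
            .onFailure().invoke(t -> events.add("failure: " + t.getMessage()))
            // Subscribe with an item callback and a failure callback
            .subscribe().with(i -> { }, t -> { });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The completion callback is not invoked: a stream ends with either a completion or a failure, never both.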

What can you do when you receive a failure?

In addition to calling an action, as shown in the previous snippet, there are multiple things you can do when you receive a failure.

The most common thing to do is to recover. You can recover with a fallback item or by switching to another Uni:

upstream
    .onFailure().recoverWithItem(failure -> "hello (fallback)")
    .subscribe().with(i -> System.out.println("Received: " + i));

upstream
    .onFailure().recoverWithUni(failure -> getAnotherUni(failure))
    .subscribe().with(i -> System.out.println("Received: " + i));
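To try these two recoveries without a real upstream, here is a small self-contained sketch. The failing upstream() and the getAnotherUni() helper are stand-ins invented for the example, and Mutiny is assumed to be on the classpath.

```java
import io.smallrye.mutiny.Uni;

import java.io.IOException;

public class UniRecoveryDemo {

    // Hypothetical upstream that always fails
    static Uni<String> upstream() {
        return Uni.createFrom().failure(new IOException("boom"));
    }

    // Hypothetical fallback Uni, built from the received failure
    static Uni<String> getAnotherUni(Throwable failure) {
        return Uni.createFrom().item("hello (after " + failure.getMessage() + ")");
    }

    // Recover with a fallback item
    static String withItem() {
        return upstream()
            .onFailure().recoverWithItem(failure -> "hello (fallback)")
            .await().indefinitely();
    }

    // Recover by switching to another Uni
    static String withUni() {
        return upstream()
            .onFailure().recoverWithUni(failure -> getAnotherUni(failure))
            .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(withItem());
        System.out.println(withUni());
    }
}
```

Blocking with await().indefinitely() keeps the demo short; in a reactive application you would rather subscribe or return the Uni.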

On Multi, you can also recover by providing another Multi or completing the stream:

upstream
  .onFailure().recoverWithCompletion();
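Recovering with another Multi is not shown above, so here is a sketch of both Multi recoveries. The upstream that fails on its third item and the fallback values 100 and 200 are assumptions made for the example; it also assumes a recent Mutiny release, where items are collected with collect().asList().

```java
import io.smallrye.mutiny.Multi;

import java.util.List;

public class MultiRecoveryDemo {

    // Hypothetical upstream: lets 1 and 2 through, then fails on 3
    static Multi<Integer> upstream() {
        return Multi.createFrom().items(1, 2, 3)
            .onItem().invoke(i -> {
                if (i == 3) {
                    throw new IllegalStateException("boom");
                }
            });
    }

    // Swallow the failure and complete the stream
    static List<Integer> withCompletion() {
        return upstream()
            .onFailure().recoverWithCompletion()
            .collect().asList()
            .await().indefinitely();
    }

    // Switch to a fallback stream when the failure arrives
    static List<Integer> withMulti() {
        return upstream()
            .onFailure().recoverWithMulti(failure -> Multi.createFrom().items(100, 200))
            .collect().asList()
            .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(withCompletion());
        System.out.println(withMulti());
    }
}
```

In both cases the items emitted before the failure are kept; only what comes after the failure differs.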

If you have faith in your system, you can also retry. A retry re-subscribes to the upstream, so the operation runs again: make sure it is safe to repeat first!

upstream
    .onFailure().retry()
        .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10)).atMost(10)
    .subscribe().with(i -> System.out.println("Received: " + i));
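To see the re-subscription at work, here is a sketch with a deliberately flaky operation. flakyCall() and its attempt counter are hypothetical, and the back-off is left out so the demo finishes immediately; Mutiny is assumed to be on the classpath.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.atomic.AtomicInteger;

public class RetryDemo {

    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical flaky operation: fails on the first two attempts
    static String flakyCall() {
        int attempt = attempts.incrementAndGet();
        if (attempt < 3) {
            throw new IllegalStateException("attempt " + attempt + " failed");
        }
        return "ok after " + attempt + " attempts";
    }

    static String run() {
        attempts.set(0);
        // The supplier is called again on every (re-)subscription
        return Uni.createFrom().item(() -> flakyCall())
            .onFailure().retry().atMost(5)
            .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each retry calls flakyCall() again, which is exactly why the operation has to be safe to repeat.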

You can also transform the failure. For example, you can map a low-level failure into something more business-friendly. Mutiny then propagates the new failure downstream, hiding the low-level one:

Uni.createFrom().failure(new IOException("boom"))
      .onFailure().transform(t -> new BusinessException(t))
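BusinessException is not defined in the snippet above. One possible shape, purely as an assumption, is an unchecked exception that keeps the low-level failure as its cause, so that nothing is lost for debugging:

```java
import io.smallrye.mutiny.Uni;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class TransformDemo {

    // Hypothetical business-level exception wrapping the low-level cause
    public static class BusinessException extends RuntimeException {
        public BusinessException(Throwable cause) {
            super("The operation failed", cause);
        }
    }

    // Returns the failure that the subscriber finally receives
    static Throwable run() {
        AtomicReference<Throwable> received = new AtomicReference<>();
        Uni.createFrom().failure(new IOException("boom"))
            .onFailure().transform(t -> new BusinessException(t))
            .subscribe().with(item -> { }, failure -> received.set(failure));
        return received.get();
    }

    public static void main(String[] args) {
        Throwable failure = run();
        System.out.println(failure + ", caused by " + failure.getCause());
    }
}
```

Downstream stages only see the BusinessException; the IOException is still reachable through getCause().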

Failures are terminal

Failures are terminal events. If your upstream propagates a failure, it means it can’t operate normally. For Uni, that’s not a problem, as you can only have an item or a failure. But for Multi, it’s a bit more complicated.

Even if you recover by handling the failure, you won’t get the rest of the stream. Your upstream is... kaputt.

Let’s take the following code:

List<String> list = Multi.createFrom().range(0, 10)
    .onItem().invoke(v -> {
        // Fail as soon as we see 7
        if (v == 7) {
            throw new IllegalArgumentException("We don't like seven!");
        }
    })
    .onFailure().recoverWithItem(7)
    .map(integer -> integer.toString())
    .onItem().invoke(s -> System.out.println(s))
    .collect().asList()
    .await().indefinitely();

It produces [0, 1, 2, 3, 4, 5, 6, 7] and not the rest of the stream. When the onItem().invoke() stage is called with 7, it throws, which produces a failure. That failure stops the stream: recoverWithItem(7) emits the fallback 7 and then completes, so items 8 and 9 are never processed.

So what can we do? Isolate!

When a stage fails, it propagates the failure downstream, which terminates the stream, and it cancels its subscription to the upstream (signaling that it does not need more items, as it’s not operating correctly). So, if we need to continue processing the other items from upstream, we just need to isolate that failure and make sure we do not cancel our subscription to the upstream.

The most common approach to achieve this is the following:

List<String> list = Multi.createFrom().range(0, 10)
    .onItem().transformToUniAndConcatenate(i ->
            // Isolate the failure in this block
            Uni.createFrom().item(i)
                    .onItem().invoke(v -> {
                        if (v == 7) {
                            throw new IllegalArgumentException("We don't like seven!");
                        }
                    })
                    .onFailure().recoverWithItem(7)
    )
    .map(integer -> integer.toString())
    .onItem().invoke(s -> System.out.println(s))
    .collect().asList()
    .await().indefinitely();

Basically, we isolate the potentially failing operation in its own Uni. If it fails, we recover. The failure and the resulting cancellation only affect the Uni created for that item, not the full stream, so we still receive the next item, and so on. This code produces the expected list: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
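If you want to check both behaviors side by side, here is a sketch that runs the two pipelines and returns their results. The helper rejectSeven() and the class name are made up for the example, the string conversion is dropped for brevity, and a recent Mutiny release (with collect().asList()) is assumed.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;

public class IsolationDemo {

    // Hypothetical check that fails for 7
    static void rejectSeven(int v) {
        if (v == 7) {
            throw new IllegalArgumentException("We don't like seven!");
        }
    }

    // The failure happens in the main stream: recovery ends it
    static List<Integer> notIsolated() {
        return Multi.createFrom().range(0, 10)
            .onItem().invoke(v -> rejectSeven(v))
            .onFailure().recoverWithItem(7)
            .collect().asList()
            .await().indefinitely();
    }

    // The failure happens inside a per-item Uni: the main stream goes on
    static List<Integer> isolated() {
        return Multi.createFrom().range(0, 10)
            .onItem().transformToUniAndConcatenate(i ->
                Uni.createFrom().item(i)
                    .onItem().invoke(v -> rejectSeven(v))
                    .onFailure().recoverWithItem(7))
            .collect().asList()
            .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println("Not isolated: " + notIsolated());
        System.out.println("Isolated:     " + isolated());
    }
}
```

The only difference between the two methods is where the failing stage lives, and that is what decides whether 8 and 9 make it to the list.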

Summary

Here you go: you can now handle failures and keep your streams going gracefully.

If you want to know more about Mutiny, check the following video: