Improving allOf and anyOf in CompletableFuture

Hello again. Ahead of the start of the "Java Developer" course, we have prepared a translation of some useful material for you.


CompletableFuture has two methods whose design surprises me:

  • CompletableFuture#allOf
  • CompletableFuture#anyOf

In this article we will see what is wrong with them and how they can be made more convenient.

CompletableFuture#allOf

Let's look at the method signature:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    // ...
}

There are at least two contentious issues here:

  1. The method accepts several CompletableFuture objects that return values of different types.
  2. The method returns a CompletableFuture<Void>, that is, a future that carries no result.
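Both points show up as soon as the results are actually needed. The following is a minimal sketch, not code from the original article: the class name AllOfSignalDemo and the method describe are made up for illustration. It shows that the future returned by the JDK's allOf only signals completion, so the values have to be read back from the input futures.

```java
import java.util.concurrent.CompletableFuture;

final class AllOfSignalDemo {

    // Hypothetical helper: combines two futures of different types via the JDK's allOf.
    static String describe(CompletableFuture<Integer> number, CompletableFuture<String> text) {
        // allOf returns CompletableFuture<Void>: it signals completion and carries no value.
        CompletableFuture<Void> signal = CompletableFuture.allOf(number, text);

        // The results must be fetched from the original futures once the signal fires.
        return signal
            .thenApply(ignored -> number.join() + ":" + text.join())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture("two"))); // prints 1:two
    }
}
```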

Also, some may not like the variable number of parameters, so let's look at that part as well.

CompletableFuture<Void> is often used as a signal that an operation has completed. However, with a small change to the API, this method can serve both as a completion signal and as a carrier of the results of all the completed operations. Let's try to do it.

Asynchronous CompletableFuture#allOf

First, let's come up with the right signature.

It is fair to assume that in most cases we need to process a list of homogeneous CompletableFuture objects and return a CompletableFuture containing a list of results:

public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    // ...
}

The innards of the original method are most likely more complex than you expect:

static CompletableFuture<Void> andTree(
  CompletableFuture<?>[] cfs, int lo, int hi) {
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (lo > hi) // empty
        d.result = NIL;
    else {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] :
                  andTree(cfs, mid + 1, hi))) == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

Therefore, instead of writing it from scratch, we will reuse the original method the way it was intended to be used, as a completion signal, and then simply swap the Void result for the list of futures:

CompletableFuture<List<CompletableFuture<T>>> intermediate = futures.stream()
    .collect(collectingAndThen(
      toList(),
      l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
        .thenApply(__ -> l)));

So far so good. We managed to get a CompletableFuture<List<CompletableFuture<T>>> instead of a CompletableFuture<Void>, which is already an improvement. But we do not need a list of futures holding results, we need a list of results.

Now we can simply process the list and unwrap the futures in it. It is perfectly fine to call CompletableFuture#join here, because we know the calls will never block (at this point, all the futures are already completed):

CompletableFuture<List<T>> result = intermediate
    .thenApply(list -> list.stream()
        .map(CompletableFuture::join)
        .collect(toList()));

Now let's combine all this into a final solution:

// Assumes static imports of Collectors.collectingAndThen and Collectors.toList.
public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    return futures.stream()
        .collect(collectingAndThen(
          toList(),
          l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
            .thenApply(__ -> l.stream()
              .map(CompletableFuture::join)
              .collect(toList()))));
}
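To see what this buys the caller, here is a small self-contained sketch. It is an illustration under assumptions rather than the article's exact code: the class name TypedAllOf is hypothetical, and the method body is a compact variant of the same idea (wait via the JDK's allOf, then join every future in order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class TypedAllOf {

    // Compact variant of the improved allOf: snapshot the input, wait for all, collect results.
    static <T> CompletableFuture<List<T>> allOf(Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> snapshot = new ArrayList<>(futures);
        return CompletableFuture.allOf(snapshot.toArray(new CompletableFuture[0]))
            // join() cannot block here: every future in the snapshot is already completed.
            .thenApply(ignored -> snapshot.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> 1),
            CompletableFuture.supplyAsync(() -> 2),
            CompletableFuture.supplyAsync(() -> 3));

        // Results come back in the order of the input futures, not in completion order.
        List<Integer> results = allOf(futures).join();
        System.out.println(results); // prints [1, 2, 3]
    }
}
```

The caller gets a typed list directly and never has to touch the individual futures again.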

Asynchronous and Fail-Fast CompletableFuture#allOf

If an exception occurs, the original CompletableFuture#allOf still waits for all the remaining operations to complete.
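This behavior is easy to observe with two manually completed futures. The sketch below is only an illustration: the class name AllOfWaitsDemo and the method observe are hypothetical, and the futures are completed by hand so that the outcome does not depend on thread timing.

```java
import java.util.concurrent.CompletableFuture;

final class AllOfWaitsDemo {

    // Returns {done right after the first failure, completed exceptionally at the end}.
    static boolean[] observe() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<String> pending = new CompletableFuture<>();

        CompletableFuture<Void> all = CompletableFuture.allOf(failed, pending);

        // One operation fails, the other one is still running.
        failed.completeExceptionally(new IllegalStateException("boom"));
        boolean doneAfterFailure = all.isDone();

        // Only when the last operation finishes does the combined future complete.
        pending.complete("ok");
        boolean failedAtTheEnd = all.isCompletedExceptionally();

        return new boolean[] {doneAfterFailure, failedAtTheEnd};
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println(seen[0] + " " + seen[1]); // prints false true
    }
}
```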

If we want to signal completion as soon as one of the operations fails with an exception, we will have to change the implementation.

To do this, create a new CompletableFuture instance and complete it manually as soon as one of the operations throws an exception:

CompletableFuture<List<T>> result = new CompletableFuture<>();
futures.forEach(f -> f
  .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));

… but then we also need to handle the scenario in which all the futures complete successfully. This is easily done with the improved allOf() method, after which we simply complete the result future manually:

allOf(futures).thenAccept(result::complete);

Now we can combine everything together to form the final solution:

public static <T> CompletableFuture<List<T>>
  allOfShortcircuiting(Collection<CompletableFuture<T>> futures) {
    CompletableFuture<List<T>> result = new CompletableFuture<>();

    // Fail fast: the first exception completes the result immediately.
    for (CompletableFuture<T> f : futures) {
        f.handle((__, ex) -> ex == null || result.completeExceptionally(ex));
    }

    // Happy path: all futures succeeded, so pass the list of results on.
    allOf(futures).thenAccept(result::complete);

    return result;
}
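A quick way to check the short-circuiting is to fail one future while another is still pending. The following sketch is an illustration under assumptions: the class name ShortCircuitDemo and the method failsFast are hypothetical, and allOf is repeated in a compact form only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class ShortCircuitDemo {

    // Compact form of the improved allOf, repeated here for self-containment.
    static <T> CompletableFuture<List<T>> allOf(Collection<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static <T> CompletableFuture<List<T>> allOfShortcircuiting(
            Collection<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        for (CompletableFuture<T> f : futures) {
            f.handle((ignored, ex) -> ex == null || result.completeExceptionally(ex));
        }
        allOf(futures).thenAccept(result::complete);
        return result;
    }

    // True if the combined future fails while another input is still pending.
    static boolean failsFast() {
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<Integer> failed = new CompletableFuture<>();

        CompletableFuture<List<Integer>> result =
            allOfShortcircuiting(Arrays.asList(pending, failed));

        failed.completeExceptionally(new IllegalStateException("boom"));
        return result.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(failsFast()); // prints true
    }
}
```

Note that the remaining operations are not cancelled; the result simply stops waiting for them.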

CompletableFuture#anyOf

Let's start with the method signature:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    // ...
}

We can immediately detect the same problems as with the method discussed above:

  1. The method accepts several CompletableFuture objects containing values of different types.
  2. The method returns a CompletableFuture containing a value of type Object.

As far as I understand, CompletableFuture#allOf was designed to be used as a signaling device. But CompletableFuture#anyOf is not consistent with this philosophy: it returns a CompletableFuture<Object>, which is even more confusing.

Have a look at the following example, where I am trying to process CompletableFuture objects containing data of different types:

CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1);
CompletableFuture<String> f2 = CompletableFuture.completedFuture("2");

Integer result = CompletableFuture.anyOf(f1, f2)
  .thenApply(r -> {
      if (r instanceof Integer) {
          return (Integer) r;
      } else if (r instanceof String) {
          return Integer.valueOf((String) r);
      }
      throw new IllegalStateException("unexpected object type!");
  }).join();

Pretty uncomfortable, isn't it?

Fortunately, this is pretty easy to adapt to a more common scenario (waiting for one of many futures holding values of the same type) by changing the signature and introducing an explicit cast.

Thus, with our improvements, we can reuse the existing methods and safely cast the result:

public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {
    return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0]))
      .thenApply(o -> (T) o); // unchecked cast: every future yields a T
}

public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) {
    return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o);
}
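A short usage sketch shows the difference on the calling side. The class name TypedAnyOf and the method firstCompleted are hypothetical, and the @SuppressWarnings annotation is my addition to silence the unchecked-cast warning; one future is left pending on purpose so that the outcome is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class TypedAnyOf {

    // The cast is unchecked; it is safe because every input future produces a T.
    @SuppressWarnings("unchecked")
    static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {
        return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0]))
            .thenApply(o -> (T) o);
    }

    // Returns the value of whichever future completes first.
    static String firstCompleted() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fast");

        // No instanceof checks and no casts on the calling side.
        return anyOf(Arrays.asList(pending, ready)).join();
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted()); // prints fast
    }
}
```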

Source

The source code can be found on GitHub.

That's all. See you on the course.
