Improving allOf and anyOf in CompletableFuture
IN CompletableFuture
There are two methods whose design surprises me:
- CompletableFuture # allOf
- CompletableFuture # anyOf
In this article we will see what is wrong with them and how they can be made more convenient.
CompletableFuture # allOf
Let's look at the method signature:
public static CompletableFuture allOf (CompletableFuture>... cfs) {
// ...
}
There are at least two contentious issues here:
- The method accepts several objects.
CompletableFuture
returning objects of different types. - Method returns
CompletableFuture
which returnsVoid
Also, some may not like the variable number of parameters, so let's look at this part.
often used as a signal to complete an operation, however, by making a small change to the API, this method can be used both as a signal device and as a carrier of the results of all completed operations. Let's try to do it.CompletableFuture
Asynchronous CompletableFuture # allOf
First, let's come up with the right signature.
It is fair to assume that in most cases processing a list of homogeneous CompletableFuture
and return CompletableFuture
containing a list of results:
public static CompletableFuture <List> allOf (
Collection <CompletableFuture> futures) {
// ...
}
The innards of the original method are most likely more complex than you expect:
static CompletableFuture andTree (
CompletableFuture>[] cfs, int lo, int hi) {
CompletableFuture d = new CompletableFuture();
if (lo> hi) // empty
d.result = NIL;
else {
CompletableFuture> a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid? cfs[lo] :
andTree (cfs, lo, mid))) == null ||
(b = (lo == hi? a: (hi == mid + 1)? cfs[hi] :
andTree (cfs, mid + 1, hi))) == null)
throw new NullPointerException ();
if (! d.biRelay (a, b)) {
Birelay,?> c = new BiRelay <> (d, a, b);
a.bipush (b, c);
c.tryFire (SYNC);
}
}
return d;
}
Therefore, instead of creating it from scratch, we’ll try to reuse what is already in the original method as if it was intended to be used as a completion signaling device … and then just change the void result to the future list:
CompletableFuture <List <CompletableFuture>> i = futures.stream ()
.collect (collectingAndThen (
toList (),
l -> CompletableFuture.allOf (l.toArray (new CompletableFuture[0])))
.thenApply (__ -> l)));
So far so good. We managed to get
instead CompletableFuture <List <CompletableFuture
>
>
that is already good. But we do not need a list of future with results, we need a list of results.CompletableFuture
Now we can simply process the list and remove unwanted future from it. It is perfectly normal to call CompletableFuture # join methods, because we know that they will never be blocked (at this point, all future are already completed):
CompletableFuture <List> result = intermediate
.thenApply (list -> list.stream ()
.map (CompletableFuture :: join)
.collect (toList ()));
Now let's combine all this into a final solution:
public static CompletableFuture <List> allOf (
Collection <CompletableFuture> futures) {
return futures.stream ()
.collect (collectingAndThen (
toList (),
l -> CompletableFuture.allOf (l.toArray (new CompletableFuture[0])))
.thenApply (__ -> l.stream ()
.map (CompletableFuture :: join)
.collect (Collectors.toList ()))));
}
Asynchronous and Falling CompletableFuture # allOf
If there are exceptions, the original CompletableFuture # allOf waits for all remaining operations to complete.
And if we want to report the completion of an operation when an exception occurs in it, then we will have to change the implementation.
To do this, create a new instance. CompletableFuture
and finish it manually after one of the operations raises an exception:
CompletableFuture <List> result = new CompletableFuture <> ();
futures.forEach (f -> f
.handle ((__, ex) -> ex == null || result.completeExceptionally (ex)));
… but then we need to deal with the scenario when all future will be completed successfully. This can be easily done using the improved allOf () method, and then simply terminate the future manually:
allOf (futures) .thenAccept (result :: complete);
Now we can combine everything together to form the final solution:
public static CompletableFuture <List>
allOfShortcircuiting (Collection <CompletableFuture> futures) {
CompletableFuture <List> result = new CompletableFuture <> ();
for (CompletableFuture> f: futures) {
f.handle ((__, ex) -> ex == null || result.completeExceptionally (ex));
}
allOf (futures) .thenAccept (result :: complete);
return result;
}
CompletableFuture # anyOf
Let's start with the method signature:
public static CompletableFuture
We can immediately detect the same problems as with the method discussed above:
- The method accepts several objects.
CompletableFuture
containing objects of different types. - Method returns
CompletableFuture
containing an object of typeObject
.
As far as I understand, the method CompletableFuture # allOf
was designed to be used as a signaling device. But CompletableFuture # anyOf
not consistent with this philosophy, returning
which is even more confusing.CompletableFuture
Have a look at the following example where I am trying to process CompletableFuture containing data of different types:
CompletableFuture f1 = CompletableFuture.completedFuture (1);
CompletableFuture f2 = CompletableFuture.completedFuture ("2");
Integer result = CompletableFuture.anyOf (f1, f2)
.thenApply (r -> {
if (r instanceof Integer) {
return (Integer) r;
} else if (r instanceof String) {
return Integer.valueOf ((String) r);
}
throw new IllegalStateException ("unexpected object type!");
}). join ();
Pretty uncomfortable, isn't it?
Fortunately, this is pretty easy to adapt for a more common scenario (waiting for one of many future containing values of the same type) by changing the signature and introducing direct type casting.
Thus, with our improvements, we can reuse existing methods and safely bring the result:
public static CompletableFuture anyOf (List <CompletableFuture> cfs) {
return CompletableFuture.anyOf (cfs.toArray (new CompletableFuture[0])))
.thenApply (o -> (T) o);
}
public static CompletableFuture anyOf (CompletableFuture... cfs) {
return CompletableFuture.anyOf (cfs) .thenApply (o -> (T) o);
}
Source
Source code can be found on Github.
That's all. See you on the course.