private static boolean validateSMTP(final ArrayList mxList, String address) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(mxList.size());
    List<CompletableFuture<Boolean>> allVerifiers = new ArrayList<>();
    for (int mx = 0; mx < mxList.size(); mx++) {
        CompletableFuture<Boolean> verifier = createAsyncVerifier((String) mxList.get(mx), address, pool);
        verifier.thenApply(isvalid -> isvalid);
        verifier.get();
    }
    return false;
}

In the above code I want to create mxList.size() CompletableFutures and run them all. As soon as any of them returns true, I want to break out of the loop. Calling get() inside the loop blocks on each future in turn, so I lose the benefit of concurrency. Any idea how to do this?

  • Use a CompletionService. Make the tasks interruptible (otherwise you gain very little) - when you receive an invalid result then cancel all the remaining tasks. There's a good example in the documentation for ExecutorCompletionService. Commented Nov 8, 2018 at 19:09
  • Thanks @BoristheSpider, but do you have an idea about how to do that using CompletableFuture? Commented Nov 8, 2018 at 19:17
  • Given that you have a tool specifically designed for the job, why? You can do it trivially by using CompletableFuture.thenAccept to dump the results into a BlockingQueue and having your main thread take() and process the results as they become available - hey, this is exactly what CompletionService does. Commented Nov 8, 2018 at 19:19
  • Incidentally - what is the purpose of verifier.thenApply(isvalid -> isvalid)? Commented Nov 8, 2018 at 19:20
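
The thenAccept-into-a-BlockingQueue idea from the comments could look roughly like the sketch below. The class name QueueDrain and the method anyTrue are made up for illustration, and counting a failed verifier as "not valid" is an assumption of this sketch. The futures are expected to come from something like createAsyncVerifier, whose body is not shown in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: drains results in completion order, not list order.
final class QueueDrain {

    /** Returns true as soon as any future yields true, false once all have reported. */
    static boolean anyTrue(List<CompletableFuture<Boolean>> futures) {
        BlockingQueue<Boolean> results = new LinkedBlockingQueue<>();
        for (CompletableFuture<Boolean> f : futures) {
            // Assumption: an exception (or a null result) counts as "not valid".
            CompletableFuture<Boolean> outcome =
                    f.handle((ok, err) -> err == null && Boolean.TRUE.equals(ok));
            // Each future pushes exactly one entry onto the queue when it finishes.
            outcome.thenAccept(results::add);
        }
        try {
            for (int i = 0; i < futures.size(); i++) {
                if (results.take()) {
                    return true; // stop waiting; the remaining futures keep running
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for verifiers", e);
        }
        return false;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Boolean>> verifiers = Arrays.asList(
                CompletableFuture.supplyAsync(() -> false),
                CompletableFuture.supplyAsync(() -> true));
        System.out.println(anyTrue(verifiers));
    }
}
```

The main thread only ever blocks on the queue, so a slow future at the front of the list does not delay a true result from a later one. Nothing here cancels the remaining work.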

1 Answer

Here's an implementation that submits all tasks and then gets results, returning on the first true result:

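private static boolean validateSMTP(final ArrayList<String> mxList, String address)
        throws ExecutionException, InterruptedException {

    ExecutorService pool = Executors.newFixedThreadPool(mxList.size());
    try {
        return mxList.stream()
                // Submit every verifier before waiting on any of them.
                .map(mx -> createAsyncVerifier(mx, address, pool))
                .collect(Collectors.toList())
                .stream()
                // join blocks on each future in list order.
                .map(CompletableFuture<Boolean>::join)
                .filter(b -> b)
                .findFirst()
                .orElse(Boolean.FALSE);
    } finally {
        // Release the worker threads; otherwise the pool outlives this call.
        pool.shutdown();
    }
}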

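The .collect(Collectors.toList()) ensures that all tasks are submitted before any result is awaited. Streams are lazy, so without it each task would be submitted and joined one at a time. In the second stream, join is called on each future in list order; because every task is already running by then, the waits overlap rather than add up.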

findFirst will return as soon as the first element passes the filter.
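
If the result should not depend on list order at all, one way to stay with CompletableFuture is to complete a separate result future from whichever verifier reports true first. This is only a sketch: FirstTrue and anyTrue are hypothetical names, and a verifier that fails with an exception is treated as "not valid" by assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of the code above.
final class FirstTrue {

    /** Completes with true as soon as one input yields true, or false after all inputs finish. */
    static CompletableFuture<Boolean> anyTrue(List<CompletableFuture<Boolean>> futures) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();

        // One dependent stage per input; each tries to complete the result with true.
        CompletableFuture<?>[] checks = futures.stream()
                .map(f -> f.thenAccept(ok -> {
                    if (Boolean.TRUE.equals(ok)) {
                        result.complete(true);
                    }
                }))
                .toArray(CompletableFuture[]::new);

        // allOf is built on the dependent stages, so every check above has already
        // run by the time this fires; complete(false) is a no-op if true was set.
        CompletableFuture.allOf(checks)
                .whenComplete((ignored, err) -> result.complete(false));
        return result;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Boolean>> verifiers = Arrays.asList(
                CompletableFuture.supplyAsync(() -> false),
                CompletableFuture.supplyAsync(() -> true));
        System.out.println(anyTrue(verifiers).join());
    }
}
```

In validateSMTP this would replace the second stream with something like anyTrue(verifiers).join(). Note that CompletableFuture.cancel does not interrupt a task that is already running, so this returns early but does not stop the remaining verifiers.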

  • This waits for all items to complete, which is fine. It would be better to fail fast, I imagine - but this is perfectly acceptable as a first cut. Commented Nov 8, 2018 at 19:25
  • .map(CompletableFuture<Boolean>::join) will wait for tasks to complete - of course they run in parallel, which is why you have my vote. My point is that using a CompletionService or similar allows you to cancel the rest of things in flight/queued as soon as the first thing fails/succeeds or whatever. Commented Nov 8, 2018 at 19:32
  • @BoristheSpider I hear that, thanks for clarifying. Stream.findFirst() is a short-circuiting stream operation, meaning it will stop the stream as soon as the first true value is encountered, without waiting for all remaining `.map(CompletableFuture<Boolean>::join) – ernest_k Commented Nov 8, 2018 at 19:38
  • The other issue is obviously that the first CompletableFuture in the List might be the longest running; it's rather unlikely it will be the shortest running... Commented Nov 8, 2018 at 19:43
  • @BoristheSpider 100%. In addition to that, the size of the pool also matters (I assumed it's configured accordingly) – ernest_k Commented Nov 8, 2018 at 19:47
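
For comparison, the CompletionService route suggested in the comments might be sketched as follows. It assumes each verification can be exposed as a Callable<Boolean>, which is not how createAsyncVerifier is shown in the question; CompletionOrder and anyTrue are hypothetical names, and a check that throws is counted as "not valid".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: takes results in completion order and cancels the rest.
final class CompletionOrder {

    static boolean anyTrue(List<Callable<Boolean>> checks) {
        if (checks.isEmpty()) {
            return false; // newFixedThreadPool(0) would throw
        }
        ExecutorService pool = Executors.newFixedThreadPool(checks.size());
        CompletionService<Boolean> service = new ExecutorCompletionService<>(pool);
        List<Future<Boolean>> submitted = new ArrayList<>();
        try {
            for (Callable<Boolean> check : checks) {
                submitted.add(service.submit(check));
            }
            for (int i = 0; i < submitted.size(); i++) {
                try {
                    // take() hands back whichever task finished next.
                    if (Boolean.TRUE.equals(service.take().get())) {
                        return true;
                    }
                } catch (ExecutionException e) {
                    // Assumption: a check that threw counts as "not valid".
                }
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for checks", e);
        } finally {
            // Interrupt anything still running; only helps if the tasks are interruptible.
            for (Future<Boolean> f : submitted) {
                f.cancel(true);
            }
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<Boolean>> checks = new ArrayList<>();
        checks.add(() -> { Thread.sleep(2000); return false; }); // slow stand-in
        checks.add(() -> true);
        System.out.println(anyTrue(checks));
    }
}
```

Unlike CompletableFuture.cancel, Future.cancel(true) on an executor task does interrupt the worker thread, which is why the comments recommend making the tasks interruptible.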
