0

I have a list of clients for which I want to collect the data from an external api. I would like to start multiple threads to collect data and wait for all of them to complete and if each thread is not completed in certain time I would like to save this in a database. I am using CompletableFuture.allOf

My code looks like this

    public void fetchDataForAllClients() {
        String previousDate = DateUtils.getPreviousDate();
        List<Integer> clientIdList = PropertiesUtil.getClientIdList();

        CompletableFuture.allOf(clientIdList.stream()
                        .map(clientId -> fetchData(previousDate, clientId)
                                .exceptionally(e -> {
                                    LOGGER.error(e.getMessage(), e);
                                    return null;
                                })
                                .thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
                        .toArray(CompletableFuture<?>[]::new))
                .join();
    }

    @Async
    CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
        counter++;
        System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
        boolean failed = false;
        String errorMsg = null;
        try {
            myApiService.fetchDataForClient(clientId, date, date);
        } catch (MyApiException exception) {
            failed = true;
            errorMsg = exception.getMessage();
        }
        fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
        return CompletableFuture.completedFuture(counter);
    }

The problem with this is that it does not start fetchData(previousDate, clientId) in Async. It runs sequentially.

1
  • The problem is your stream is calling #fetchData, which performs a lot of synchronous code and then wraps the already-retrieved result within the CompletableFuture. You want something along the lines of CompletableFuture#supplyAsync for nearly the entire method body, which would be the generating Supplier<Integer> for CompletableFuture to run.
    – Rogue
    Commented Oct 28, 2021 at 12:14

1 Answer 1

1

@Aync will not work if its invoked from within same class cause it will call the original method not the Intercepted one, so change fetchData method to return Integer then call the method using compleatableFuture.supplyAsync() which actually spawns new thread to execute that method

   List<CompleatbleFutures> futures= clientIdList.stream()
    .map(id->CompleatbleFutures.supplyAsync(fetchdata(..)))
    .collect(Collectors.toList());
    CompleatbleFuture.allOf(futures.toArray(futures.size));
0

Not the answer you're looking for? Browse other questions tagged or ask your own question.