I have a list of clients for which I want to collect the data from an external api.
I would like to start multiple threads to collect data and wait for all of them to complete and if each thread is not completed in certain time I would like to save this in a database.
I am using CompletableFuture.allOf
My code looks like this
public void fetchDataForAllClients() {
String previousDate = DateUtils.getPreviousDate();
List<Integer> clientIdList = PropertiesUtil.getClientIdList();
CompletableFuture.allOf(clientIdList.stream()
.map(clientId -> fetchData(previousDate, clientId)
.exceptionally(e -> {
LOGGER.error(e.getMessage(), e);
return null;
})
.thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
.toArray(CompletableFuture<?>[]::new))
.join();
}
@Async
CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
counter++;
System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
boolean failed = false;
String errorMsg = null;
try {
myApiService.fetchDataForClient(clientId, date, date);
} catch (MyApiException exception) {
failed = true;
errorMsg = exception.getMessage();
}
fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
return CompletableFuture.completedFuture(counter);
}
The problem with this is that it does not start fetchData(previousDate, clientId)
in Async. It runs sequentially.
#fetchData
, which performs a lot of synchronous code and then wraps the already-retrieved result within theCompletableFuture
. You want something along the lines ofCompletableFuture#supplyAsync
for nearly the entire method body, which would be the generatingSupplier<Integer>
for CompletableFuture to run.