There are 2 ways to run task async in a list:
- use
parallel()
- use traditional
CompletableFuture
parallel
’s threads limited to number of cores of CPU, if CPU is 8 cores, then every time starts 8 threads, no adjust no customized thread pool, this is same as 2nd way use without customized thread pool (executorService)
Example to show how to use parallel
method:
public void parallelCalculator(List<Integer> list) {
List<Integer> result = list.stream().parallel().map(e -> {
System.out.println(e + " thread name: " + Thread.currentThread().getName());
return e * 10;
}).collect(Collectors.toList());
result.forEach(System.out::println);
}
to See hwo many core in your system:
System.out.println("how many core: " + Runtime.getRuntime().availableProcessors());
usually the parallel
is good for compute intense
task, but not for IO intense
works, if you want to connect to other server do something then this is not good for your approach.
Use traditional CompletableFuture
has 2 ways: with or without customized threadpool, if you don’t use customized threadpool, your thread is from ForkJoinPoo.commonPool()
, that is no different as using parallel
method, but the benefit for 2 ways is to use executorService
First way to run (little bit complicated, but easy to understand)
public void asyncCalculator(List<Integer> list) {
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(list.size(), 1000));
CompletableFuture<Integer>[] results = list.stream().map(e -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println(e + " current thread name: " + Thread.currentThread().getName());
return e * 10;
}, executorService);
}).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(results).join();
Arrays.asList(results).forEach(e -> {
try {
System.out.println(e.get());
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
} catch (ExecutionException executionException) {
executionException.printStackTrace();
}
});
executorService.shutdown();
}
Second way is much simpler, but not easy to understand, but general idea is first Collections
is the CompletableFuture<Integer>
list, and second one (after join) is the real result list
public void asyncCalculatorV2(List<Integer> list) {
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(list.size(), 1000));
List<Integer> result = list.stream().map(e -> CompletableFuture.supplyAsync(() -> {
return e * 10;
}, executorService)).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
executorService.shutdown();
result.forEach(System.out::println);
}
here I used
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(list.size(), 1000));
to calculate the max threads that process need, it is dangerous to use executor service without calculation, you should use following formula to calculate how many threads you need:
T = N * u * (1 + E / C)
T: Total Threads
N: number of cores
u: expected usage rate for CPU
E: waiting time
C: expected caculation time