Introduction
Emergent of multi-core hardware processor provides us the opportunity execute asynchronous and parallel tasks in modern machines. Java8 designers has put considerable effort to design the language in addition to Functional style programming, so the developers can write some powerful asynchronous tasks. Java7 was initially to respond with fork/join frame work so that you can slice the tasks with 'Future' with help of thread executor pools. But it was still felt not enough to leverage multi-core architectures for increasing complex problems. Java8 has introduced CompletableFuture<T> for a solution. Together with functional style programming offered by java these constructs provides real power to the language. Not using it would be a sin!
Lets checkout,
public CompletableFutureassembleGeneralTires() { CompletableFuture tireFuture = new CompletableFuture<>(); new Thread(() -> { Tire tire = getTire("general"); tireFuture.complete(tire); }).start(); return tireFuture; } //Access value as: CompletableFuture assembleGeneralTires = assembler.assembleGeneralTires(); Tire tire = assembleGeneralTires.get();
Seems everything would be fine, until something trips up, error. so, what to do if an error comes. We have ''completeExceptionally(Exception e)" for that,
public CompletableFutureassembleGeneralTiresCautiously() { CompletableFuture tireFuture = new CompletableFuture<>(); new Thread(() -> { try { Tire tire = getTire("general"); tireFuture.complete(tire); } catch (Exception e) { tireFuture.completeExceptionally(e); } }).start(); return tireFuture; }
Can we improve bit,
Yes. Java8 'CompletableFuture' has some factory methods, we don't always need to manually wire the thread as we did before.public CompletableFutureassembleGeneralTiresFactory() { return CompletableFuture.supplyAsync(()-> getTire("general")); }
supplyAsync() accepts a supplier as argument. if you would like to know what is supplier, it is a functional gets you <T> .
Leverage non-blocking async operations
Lets do a lousy sequential operation.public int[] getAverageVehicleSpeeds(){ Vehicle carLamborghini = new Vehicle("lamborghini", "car", 485); Vehicle truck = new Vehicle("volvo", "truck", 190); Vehicle van = new Vehicle("toyoto", "van", 255); Vehicle bike = new Vehicle("harley", "bike", 360); ListAs this operation is inherently sequential, this can perform terribly sluggish, if we think terms of improving, first things comes to mind is using parallel streams.vehicles = Arrays.asList(carLamborghini,truck,van,bike); int[] speeds = vehicles.stream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray(); return speeds; } public class Vehicle { .... ...... public int getAverageSpeed(){ Util.delay(); //a delay is simulated return new Random(50).nextInt(topSpeed); } }
.... Listvehicles = Arrays.asList(carLamborghini,truck,van,bike); int[] speeds = vehicles.parallelStream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray(); ....
This may perform better, but we are not sure how this would scale, saying when we add many more vehicles in the future. Let try with our 'completeable future' way too
List> speedsFuture = vehicles.stream() .map(t -> CompletableFuture .supplyAsync(() -> t.getAverageSpeed())) .collect(Collectors.toList()); //This gives me list of completableFuture , that means each of this operation on the vehicles start asynchronously //now lets take there values, to do that int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray();
This will scale bit better(trust this, people have done the numbers against parallel stream, 'completable future' does better when things crowd up), but to get the real advantage we have another weapon, executors. CompletableFuture.supplyAsync(...supplier.., ,*executor ) accepts executors as another parameter. We should be able get fine tuned executors for our machine based on the number of core processors in our machine
For optimal cpu utilization, using following formula (refer Java concurrency in practice) we can count number of threads
Nthreads = NCPU * UCPU * (1 + W/C)
For 4 core cpu (NCPU), 100% (UCPC) and (wait time )/(compute time ) ration => 50. We can have 200 threads in the pool for optimal use of the system resources
Executor executor = Executors.newFixedThreadPool(Math.min(vehicles.size(), 200), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); //we set 200 as the limit so application inadvertently doesn't crash by overrunning number of threads //then just simply pass the executor to supplyAsync List> speedsFuture = vehicles.stream() .map(t -> CompletableFuture .supplyAsync(() -> t.getAverageSpeed(),executor)) .collect(Collectors.toList()); int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray(); //advantage here is we can fine tune with executor which actually more control
What if we want to pipeline asynchronous tasks, suppose lets say we want find the engine type of vehicles based on there average speed. we have to use 'thenCompose'
Here, even though it may look convoluted piece of code, it is a simple concept. To combine one input to a asynchronous functionality that is dependent on another asynchronous functionality we simply 'thenCompose'
What if we want execute asynchronous and synchrous, we have to use 'thenApply'. Assume our 'getEngine' is not long running or I/O operation and it synchronous
That is a little peek of what asynchronous Java8 has to offer. Explore API at your own will, and sure that will be a exiting ride. List> enginesFutures = vehicles.stream() .map(t -> CompletableFuture .supplyAsync(() -> t.getAverageSpeed(),executor)) .map(f->f.thenCompose(p-> CompletableFuture.supplyAsync(()-> Vehicle.getEngine(p),executor))) .collect(Collectors.toList()); List engines = enginesFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
Here, even though it may look convoluted piece of code, it is a simple concept. To combine one input to a asynchronous functionality that is dependent on another asynchronous functionality we simply 'thenCompose'
What if we want execute asynchronous and synchrous, we have to use 'thenApply'. Assume our 'getEngine' is not long running or I/O operation and it synchronous
List> collect = vehicles.stream() .map(t -> CompletableFuture .supplyAsync(() -> t.getAverageSpeed(),executor)) .map(f->f.thenApply(Vehicle::getEngine)).collect(Collectors.toList()); List engines = enginesFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());