Friday, March 25, 2016

Leveraging power of asynchronous in Java 8


Introduction



Emergent of multi-core hardware processor provides us the opportunity execute asynchronous and parallel tasks in modern machines. Java8 designers has put considerable effort to design the language in addition to Functional style programming, so the developers can write some powerful asynchronous tasks. Java7 was initially to respond with fork/join frame work so that you can slice the tasks with 'Future' with help of thread executor pools. But it was still felt not enough to leverage multi-core architectures for increasing complex problems. Java8 has introduced CompletableFuture<T> for a solution. Together with functional style programming offered by java these constructs provides real power to the language. Not using it would be a sin!

Lets checkout,
public CompletableFuture assembleGeneralTires() {
		CompletableFuture tireFuture = new CompletableFuture<>();
		new Thread(() -> {
			Tire tire = getTire("general");
			tireFuture.complete(tire);
		}).start();
		return tireFuture;
}

//Access value as:

CompletableFuture assembleGeneralTires = assembler.assembleGeneralTires();
Tire tire = assembleGeneralTires.get();

Seems everything would be fine, until something trips up, error. so, what to do if an error comes. We have ''completeExceptionally(Exception e)" for that,

public CompletableFuture assembleGeneralTiresCautiously() {
		CompletableFuture tireFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				Tire tire = getTire("general");
				tireFuture.complete(tire);
			}
			catch (Exception e) {
				tireFuture.completeExceptionally(e);
			}
		}).start();
		return tireFuture;
}

Can we improve bit,
Yes. Java8 'CompletableFuture' has some factory methods, we don't always need to manually wire the thread as we did before.

public CompletableFuture assembleGeneralTiresFactory() {
		return CompletableFuture.supplyAsync(()-> getTire("general"));
	}

supplyAsync() accepts a supplier as argument. if you would like to know what is supplier, it is a functional gets you <T> .

Leverage non-blocking async operations

Lets do a lousy sequential operation.

public int[] getAverageVehicleSpeeds(){
		Vehicle carLamborghini = new Vehicle("lamborghini", "car", 485);
		Vehicle truck = new Vehicle("volvo", "truck", 190);
		Vehicle van = new Vehicle("toyoto", "van", 255);
		Vehicle bike = new Vehicle("harley", "bike", 360);		
		
		List vehicles = Arrays.asList(carLamborghini,truck,van,bike);
		int[] speeds = vehicles.stream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray();
		
		return speeds;
}

public class Vehicle {
....
......

public int getAverageSpeed(){
		Util.delay(); //a delay is simulated
		return new Random(50).nextInt(topSpeed);
	}
}

As this operation is inherently sequential, this can perform terribly sluggish, if we think terms of improving, first things comes to mind is using parallel streams.

....
List vehicles = Arrays.asList(carLamborghini,truck,van,bike);
int[] speeds = vehicles.parallelStream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray();
....

This may perform better, but we are not sure how this would scale, saying when we add many more vehicles in the future. Let try with our 'completeable future' way too

List> speedsFuture = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed()))
				.collect(Collectors.toList());

//This gives me list of completableFuture, that means each of this operation on the vehicles start asynchronously

//now lets take there values, to do that
int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray(); 


This will scale bit better(trust this, people have done the numbers against parallel stream, 'completable future' does better when things crowd up), but to get the real advantage we have another weapon, executors. CompletableFuture.supplyAsync(...supplier.., ,*executor ) accepts executors as another parameter. We should be able get fine tuned executors for our machine based on the number of core processors in our machine

For optimal cpu utilization, using following formula (refer Java concurrency in practice) we can count number of threads
Nthreads = NCPU * UCPU * (1 + W/C)
For 4 core cpu (NCPU), 100% (UCPC)  and (wait time )/(compute time ) ration => 50. We can have 200 threads in the pool for optimal use of the system resources


Executor executor = Executors.newFixedThreadPool(Math.min(vehicles.size(), 200), new ThreadFactory() {
	        @Override
	        public Thread newThread(Runnable r) {
	            Thread t = new Thread(r);
	            t.setDaemon(true);
	            return t;
	        }
});
//we set 200 as the limit so application inadvertently doesn't crash by overrunning number of threads

//then just simply pass the executor to supplyAsync

List> speedsFuture = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed(),executor))
				.collect(Collectors.toList());
		
int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray();

//advantage here is we can fine tune with executor which actually more control

What if we want to pipeline asynchronous  tasks, suppose lets say we want find the engine type of vehicles based on there average speed. we have to use 'thenCompose'

List> enginesFutures = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed(),executor))
				.map(f->f.thenCompose(p-> CompletableFuture.supplyAsync(()-> Vehicle.getEngine(p),executor)))
				.collect(Collectors.toList());
		
List engines = enginesFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());

Here, even though it may look convoluted piece of code, it is a simple concept. To combine one input to a asynchronous functionality that is dependent on another asynchronous functionality we simply 'thenCompose'

What if we want execute asynchronous  and synchrous, we have to use 'thenApply'. Assume our 'getEngine' is not long running or I/O operation and it synchronous


List> collect = vehicles.stream()
		.map(t -> CompletableFuture
				.supplyAsync(() -> t.getAverageSpeed(),executor))
		.map(f->f.thenApply(Vehicle::getEngine)).collect(Collectors.toList());
List engines = enginesFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());


That is a little peek of what asynchronous Java8 has to offer. Explore API at your own will, and sure that will be a exiting ride. 

2 comments: