Sunday, July 24, 2011

Handling completion of concurrent tasks

In the last blog on Java Executors I introduced Executors which have been the preferred way of writing multi-threaded programs in java since JDK5. In this blog we explain java.util.concurrent.Future, which helps in processing the results from concurrent task.

Prior to java.util.concurrent, if you wanted to wait for a task to complete and get some result from the task after it completed, you had to implement the wait/notify mechanism.

Up to JDK5, the interface used to model concurrent tasks was the Runnable, which has a run method. A major limitation of the run method is that it cannot return a result or throw a checked exception.

JDK5 introduced interface Callable to model concurrent tasks. The call method returns a result and can throw a checked exception.
public interface Callable {
    V call() throws Exception ;
}
How to get the result returned by call() ?
The interface Future provides methods to check if the task is completed and to retrieve the result.
public interface Future {
    boolean cancel(boolean interupt) ;
    boolean isCancelled() ;
    boolean isDone() ;
    V get() throws .......
    V get(long timeout, TimeUnit unit) throws ....... 
}
You call one of the get methods on Future to get the result. But how do you create or get a handle to a Future and how is it related to the concurrent task which might be a Callable ?

In the previous blog we discussed the ExecutorService to execute tasks and we did this by calling the execute method passing it a runnable. The ExecutorService also has a submit method that take a Callable as a parameter and returns a Future.

Future submit(Callable task) ;

The sample below brings it all together:

1 public class FileHandler {
2  private static final ExecutorService tp=Executors.newFixedThreadPool(50);
3
4  public void downloadandProcess(FileMetaData fdata) throws IOException {
5       final String filepath = fdata.getfilePath() ;   
6       Callable<File> task = new Callable() {
7       public File call() {
9          File result = download(filepath) ;   
10          return result ;
11      }
12      Future future = tp.submit(task) ;
13      //wait for the file to be downloaded 
14
15      try {
16        File f = future.get() ;
17        process(f) ;
19      } catch (InterruptedException e) {
20        Thread.currentThread().interrupt() ;
21        future.cancel(true) ;
22      }
23  }


In the above example there are 2 threads. The thread that calls the method downloadandProcess. This thread kicks of another task (thread) in line 12 to download a file. The task is defined as a Callable in lines 6-11.In line 16 the calling thread will block until a result is available from the download thread.

Using Executor, Callable & Future is thus much simpler than using java.lang.Thread and the wait/notify mechanism.