I'd been meaning to get a little example that scratches the surface of the java.util.concurrent package online. Here's what I came up with:
import java.util.concurrent.*;
import java.util.*;

public class Example implements Runnable {
    private static Random random = new Random();
    private String payload;

    public Example (String someString) {
        this.payload = someString;
    }

    public String getPayload() {
        return this.payload;
    }

    public void run () {
        // sleep for a random whole number of seconds, 0 to 9
        int seconds = random.nextInt(10);
        long totalSleep = 1000L*seconds;
        try {
            Thread.sleep(totalSleep);
        } catch (InterruptedException tie) {
            throw new RuntimeException("I got interrupted");
        }
        System.out.println(this.payload);
    }

    public static void main (String[] args) {
        // -- a pool that runs at most 3 tasks at a time
        ExecutorService es = Executors.newFixedThreadPool(3);
        List<Future<Example>> tasks = new ArrayList<Future<Example>>();
        for (int x=0; x<10; x++) {
            String name = "I am thread number: " + x;
            Example e = new Example(name);
            // -- e is both the Runnable to run and the result the Future hands back
            Future<Example> future = es.submit(e, e);
            tasks.add(future);
        }
        // -- all threads should be launching, let's get the Example objects
        try {
            for (Future<Example> future : tasks) {
                // -- get() blocks until this particular task has finished
                Example e = future.get();
                System.out.println(" [future complete]: " + e.getPayload());
            }
            es.shutdown();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }
}
It's a very simple Runnable. It gets constructed with a String (payload). When run, it sleeps for a random whole number of seconds (0 to 9), and then prints out the payload it was constructed with.
There's really nothing new with the Runnable interface and the run method here. The fun part is what we get to play with from the java.util.concurrent package.
The first major point to note is that we aren't going to launch Threads the old-fashioned way [Thread t = new Thread(someRunnable); t.start();]. Instead, we're going to create an ExecutorService and submit our Runnable objects to it. There are a variety of ExecutorService implementations that can be created from the Executors factory, but the simplest of the lot might be the one created by the newFixedThreadPool(int size) method. The method name in this case is fairly self-documenting.
    public static void main (String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(3);
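To see what "fixed" means in practice, here is a minimal sketch that counts how many tasks are ever running at the same moment. The class name PoolSizeDemo, the helper peakConcurrency and the 50 ms sleep are made up for illustration, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class PoolSizeDemo {
    // Submits taskCount short tasks to a fixed pool and returns the highest
    // number of tasks that were observed running at the same moment.
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                es.execute(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                    try {
                        Thread.sleep(50); // pretend to do some work
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
            es.shutdown(); // accept no new tasks, let the queued ones finish
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        } finally {
            es.shutdownNow(); // no-op if the pool has already terminated
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(3, 10));
    }
}
```

With a pool of 3 and 10 tasks, the reported peak should never exceed 3: the remaining tasks wait in the pool's queue until a worker thread frees up.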
The next fancy object that we encounter is Future. The ExecutorService's submit method has an overload that accepts a Runnable along with an arbitrary result object, in this case, an Example. In return, it provides a Future<Example> whose get() hands back that result object once the Runnable has finished. (Note that in our case, even though we are passing the same local variable (e) twice, it is treated as a Runnable in the first position and as an Example result in the second.) We've also got a List of Future<Example> objects, to which we add each freshly returned Future<Example>.
        List<Future<Example>> tasks = new ArrayList<Future<Example>>();
        for (int x=0; x<10; x++) {
            String name = "I am thread number: " + x;
            Example e = new Example(name);
            Future<Example> future = es.submit(e, e);
            tasks.add(future);
        }
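The two-argument submit can be looked at in isolation. Below is a rough sketch, with a made-up class name (SubmitDemo) and made-up helpers, showing that the Future returns the very object we passed in as the result. For comparison, it also shows the Callable flavour of submit, which the example above does not use, where the task computes its own return value. Lambdas assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
class SubmitDemo {
    // submit(Runnable, T): get() hands back the same object we passed as the result,
    // including whatever the Runnable did to it in the meantime.
    static boolean returnsSameInstance() {
        ExecutorService es = Executors.newFixedThreadPool(1);
        try {
            StringBuilder result = new StringBuilder();
            Runnable work = () -> result.append("done");
            Future<StringBuilder> future = es.submit(work, result);
            StringBuilder returned = future.get(); // blocks until work has run
            return returned == result && "done".equals(returned.toString());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdown();
        }
    }

    // submit(Callable<T>): the task itself produces the value that get() returns.
    static String viaCallable(String payload) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        try {
            Callable<String> task = () -> payload.toUpperCase();
            return es.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(returnsSameInstance());
        System.out.println(viaCallable("payload"));
    }
}
```

Which one fits depends on the task: passing a result object works well when the Runnable mutates something you already hold, while a Callable reads more naturally when the task produces a fresh value.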
At this point, the ExecutorService has already begun running as many Runnables as it can (3 concurrently in this case). As soon as one completes, the freed-up worker thread picks up the next one from the queue. All of that action happens on the pool's own threads, though, so our main method keeps chugging along, which gets us here:
        try {
            for (Future<Example> future : tasks) {
                Example e = future.get();
                System.out.println(" [future complete]: " + e.getPayload());
            }
            es.shutdown();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
We loop over the list and recover the Future<Example> objects that we stored as a result of the Runnable submissions. Note that we're doing this in the same order as we created and submitted our tasks to the pool. We then get() the Example object that we asked the ExecutorService to return to us upon task completion; get() blocks until that particular task has finished. As you can see, the Future is the conduit for this activity. Finally, we shut down the pool.
Here's the output from the execution:
~/Code/concurrency keerat$ java Example
I am thread number: 1
I am thread number: 2
I am thread number: 4
I am thread number: 0
[future complete]: I am thread number: 0
[future complete]: I am thread number: 1
[future complete]: I am thread number: 2
I am thread number: 5
I am thread number: 6
I am thread number: 8
I am thread number: 9
I am thread number: 3
[future complete]: I am thread number: 3
[future complete]: I am thread number: 4
[future complete]: I am thread number: 5
[future complete]: I am thread number: 6
I am thread number: 7
[future complete]: I am thread number: 7
[future complete]: I am thread number: 8
[future complete]: I am thread number: 9
As is evident, our tasks finish execution in an arbitrary order. However, using the List of Future objects, we are able to join them in the same order we put them in the queue.
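The same effect can be reproduced without the randomness. The sketch below is illustrative only: the class name OrderDemo, the helper retrievalOrder and the sleep times are made up, and it uses a Callable rather than the Runnable from the example. The first task submitted sleeps the longest, so the tasks should normally complete in reverse, yet walking the list of Futures still yields the results in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
class OrderDemo {
    // Runs three tasks with decreasing sleep times and returns their ids in the
    // order the Futures were read. Each task also appends its id to completionLog
    // (which must be thread-safe) at the moment it finishes.
    static List<Integer> retrievalOrder(List<Integer> completionLog) {
        ExecutorService es = Executors.newFixedThreadPool(3);
        List<Future<Integer>> futures = new ArrayList<>();
        List<Integer> retrieved = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                final int id = i;
                final long sleepMillis = 300 - 100L * i; // task 0 sleeps longest
                Callable<Integer> task = () -> {
                    Thread.sleep(sleepMillis);
                    completionLog.add(id);
                    return id;
                };
                futures.add(es.submit(task));
            }
            for (Future<Integer> future : futures) {
                retrieved.add(future.get()); // blocks on each Future in list order
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdown();
        }
        return retrieved;
    }

    public static void main(String[] args) {
        List<Integer> completionLog = Collections.synchronizedList(new ArrayList<>());
        System.out.println("retrieved in order: " + retrievalOrder(completionLog));
        System.out.println("completed in order: " + completionLog);
    }
}
```

The retrieval order follows the list, whatever the scheduler does. The completion order depends on timing, which is why the first get() ends up waiting for the slowest task here.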
There's nothing here that we couldn't have done with a little bit of good, old-school threading. But we got a reliable, clean and easy pool in one line. We also got a simple mechanism to join on a task and get something in return: something that the task could have mutated or worked on.
A lot of thanx.
Future.get() waits until Future.isDone(), but how can I get the Future's status (isDone, isCancelled) if the task takes a long time?
Example: the task takes 30 minutes. I submit the asynchronous task, and 15 minutes later I want to see whether it has finished.
Any ideas? Thanks!
You could manage the state of future within the Runnable or Callable that it encapsulates. In the above case, the Example object's run method could set or manage a flag indicating completion.
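Besides a flag of your own, the Future interface itself offers isDone() and isCancelled(), which return immediately, and a get(timeout, unit) overload that gives up after a bounded wait. Here is a rough sketch of both; the class name StatusDemo, the helper checkStatus and the 500 ms task (standing in for the 30-minute one) are made up for illustration, and the lambda assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original example.
class StatusDemo {
    // Returns { done right after submit, timed get gave up, done after blocking get }.
    static boolean[] checkStatus() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slowTask = () -> {
                Thread.sleep(500); // stand-in for a long-running job
                return "finished";
            };
            Future<String> future = es.submit(slowTask);

            // isDone() does not block; it only reports the current state.
            boolean doneEarly = future.isDone();

            // A timed get() waits at most 100 ms, then throws TimeoutException.
            // The task itself keeps running after the timeout.
            boolean timedOut = false;
            try {
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                timedOut = true;
            }

            future.get(); // the plain get() blocks until the task completes
            boolean doneLate = future.isDone();
            return new boolean[] { doneEarly, timedOut, doneLate };
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] status = checkStatus();
        System.out.println("done right after submit: " + status[0]);
        System.out.println("timed get gave up:       " + status[1]);
        System.out.println("done after blocking get: " + status[2]);
    }
}
```

So for the 30-minute task, one option is to keep the Future around and call isDone() whenever you want to check in, without ever blocking on get().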
Very helpful, congratulations
Excellent!
great example
Great explanation and Example. Thanks!
Clean and simple to understand. Thanks.