Sum It Up: Thread Pools

Now with all this Java thread information, let's create something useful: a thread pool. A thread pool is a group of threads designed to execute arbitrary tasks. Perhaps you want to limit the number of threads used for simultaneous network or I/O connections, or you just want to control the maximum number of threads on the system for processor-intensive tasks. The ThreadPool class in Listing 1.1 enables you to choose the number of threads in the pool and to run tasks defined as Runnables. Here's an example of how to use ThreadPool—creating a pool of eight threads, running a simple task, and then waiting for the task to finish:

ThreadPool myThreadPool = new ThreadPool(8);
myThreadPool.runTask(new Runnable() {
    public void run() {
        System.out.println("Do something cool here.");
    }
});
myThreadPool.join();


The runTask() method returns immediately. If all the threads in the pool are busy executing tasks, runTask() stores each new task in a queue until a thread is available to run it.
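
For comparison, the same submit-then-wait pattern can be sketched with the standard library's java.util.concurrent package. This is an assumption beyond the chapter's own code: ExecutorService is not used by the ThreadPool class in Listing 1.1, and the class name QueueingDemo and the method runAll() are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueingDemo {

    /**
        Submits numTasks short tasks to a fixed pool of numThreads
        threads and returns how many tasks completed.
    */
    static int runAll(int numThreads, int numTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < numTasks; i++) {
            // execute() returns immediately; when every thread is
            // busy, the task waits in the pool's internal queue
            pool.execute(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            });
        }

        // stop accepting new tasks; already-queued tasks still run
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runAll(2, 6));
    }
}
```

Here shutdown() followed by awaitTermination() plays roughly the role that join() plays in the example above: no new tasks are accepted, queued tasks still run, and the caller waits for them to finish.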

Listing 1.1 ThreadPool.java
import java.util.LinkedList;

/**
    A thread pool is a group of a limited number of threads that
    are used to execute tasks.
*/
public class ThreadPool extends ThreadGroup {

    private boolean isAlive;
    private LinkedList taskQueue;
    private int threadID;
    private static int threadPoolID;

    /**
        Creates a new ThreadPool.
        @param numThreads The number of threads in the pool.
    */
    public ThreadPool(int numThreads) {
        super("ThreadPool-" + (threadPoolID++));
        setDaemon(true);

        isAlive = true;

        taskQueue = new LinkedList();
        for (int i=0; i<numThreads; i++) {
            new PooledThread().start();
        }
    }

    /**
        Requests a new task to run. This method returns
        immediately, and the task executes on the next available
        idle thread in this ThreadPool.
        <p>Tasks start execution in the order they are received.
        @param task The task to run. If null, no action is taken.
        @throws IllegalStateException if this ThreadPool is
        already closed.
    */
    public synchronized void runTask(Runnable task) {
        if (!isAlive) {
            throw new IllegalStateException();
        }
        if (task != null) {
            taskQueue.add(task);
            notify();
        }
    }

    protected synchronized Runnable getTask()
        throws InterruptedException
    {
        while (taskQueue.size() == 0) {
            if (!isAlive) {
                return null;
            }
            wait();
        }
        return (Runnable)taskQueue.removeFirst();
    }

    /**
        Closes this ThreadPool and returns immediately. All
        threads are stopped, and any waiting tasks are not
        executed. Once a ThreadPool is closed, no more tasks can
        be run on this ThreadPool.
    */
    public synchronized void close() {
        if (isAlive) {
            isAlive = false;
            taskQueue.clear();
            interrupt();
        }
    }

    /**
        Closes this ThreadPool and waits for all running threads
        to finish. Any waiting tasks are executed.
    */
    public void join() {
        // notify all waiting threads that this ThreadPool is no
        // longer alive
        synchronized (this) {
            isAlive = false;
            notifyAll();
        }

        // wait for all threads to finish
        Thread[] threads = new Thread[activeCount()];
        int count = enumerate(threads);
        for (int i=0; i<count; i++) {
            try {
                threads[i].join();
            }
            catch (InterruptedException ex) { }
        }
    }

    /**
        A PooledThread is a Thread in a ThreadPool group,
        designed to run tasks (Runnables).
    */
    private class PooledThread extends Thread {

        public PooledThread() {
            super(ThreadPool.this,
                "PooledThread-" + (threadID++));
        }

        public void run() {
            while (!isInterrupted()) {

                // get a task to run
                Runnable task = null;
                try {
                    task = getTask();
                }
                catch (InterruptedException ex) { }

                // if getTask() returned null or was interrupted,
                // close this thread by returning.
                if (task == null) {
                    return;
                }

                // run the task, and eat any exceptions it throws
                try {
                    task.run();
                }
                catch (Throwable t) {
                    uncaughtException(this, t);
                }
            }
        }
    }
}
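
The heart of Listing 1.1 is the handoff between runTask() and getTask(): one side adds a task and calls notify(), while the other waits in a loop until a task appears or the pool is no longer alive. The following is a minimal sketch of that handoff in isolation, not a replacement for the listing. The names BlockingTaskQueue, put(), take(), and demo() are hypothetical, and the generic type on LinkedList is an addition that the listing does not use.

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingTaskQueue {

    private final LinkedList<Runnable> tasks = new LinkedList<>();
    private boolean open = true;

    /** Adds a task and wakes one thread waiting in take(). */
    synchronized void put(Runnable task) {
        tasks.add(task);
        notify();
    }

    /**
        Blocks until a task is available. Returns null once the
        queue is closed and every remaining task has been taken.
    */
    synchronized Runnable take() throws InterruptedException {
        // a loop rather than an if: the condition is re-checked
        // after every wakeup
        while (tasks.isEmpty()) {
            if (!open) {
                return null;
            }
            wait();
        }
        return tasks.removeFirst();
    }

    /** Marks the queue closed and wakes every waiting thread. */
    synchronized void close() {
        open = false;
        notifyAll();
    }

    /** Runs n tasks through one consumer thread; returns how many ran. */
    static int demo(int n) {
        BlockingTaskQueue queue = new BlockingTaskQueue();
        AtomicInteger ran = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                Runnable task;
                while ((task = queue.take()) != null) {
                    task.run();
                }
            } catch (InterruptedException ex) {
                // interrupted while waiting: exit the thread
            }
        });
        consumer.start();

        for (int i = 0; i < n; i++) {
            queue.put(ran::incrementAndGet);
        }
        queue.close();

        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks run: " + demo(5));
    }
}
```

As in ThreadPool's join(), closing the queue does not discard queued tasks: take() only returns null when the queue is both closed and empty, so the consumer drains everything before it exits.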


The only concept the ThreadPool class uses that we haven't covered yet is the ThreadGroup class. A ThreadGroup is what you'd expect it to be: a group of threads, plus some methods that act on all the threads in the group. In this code, for example, ThreadGroup's interrupt() method is used to stop all waiting threads when the ThreadPool is closed.

Now let's test the ThreadPool with the ThreadPoolTest class, shown in Listing 1.2. ThreadPoolTest creates a ThreadPool with a few threads and gives it several tasks to run. In this case, each task simply prints a message to the console. The program takes the number of tasks and the number of threads as arguments. To run the test, enter something like this at the command line:

java ThreadPoolTest 8 4


This runs the test with eight tasks and four threads in the thread pool. Of course, you can try any other combination of tasks and threads you want!
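
Because each task in Listing 1.2 sleeps for 500 milliseconds, you can roughly predict how long a given combination will take. The sketch below is only an estimate: it assumes every task takes the same time and ignores thread startup and scheduling overhead. The class name PoolTimingEstimate and the method estimateMillis() are hypothetical.

```java
class PoolTimingEstimate {

    /**
        Estimates the wall-clock time in milliseconds, assuming each
        task takes taskMillis and numThreads is greater than zero.
    */
    static long estimateMillis(int numTasks, int numThreads,
        long taskMillis)
    {
        // tasks run in "waves" of at most numThreads at a time,
        // so round the number of waves up (ceiling division)
        long waves = (numTasks + numThreads - 1) / numThreads;
        return waves * taskMillis;
    }

    public static void main(String[] args) {
        // eight tasks on four threads: two waves of 500 ms each
        System.out.println(estimateMillis(8, 4, 500)); // prints 1000
    }
}
```

With eight tasks and three threads, the same estimate gives three waves, or about 1500 milliseconds, because the last two tasks have to wait for a third round.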

Listing 1.2 ThreadPoolTest.java
public class ThreadPoolTest {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Tests the ThreadPool task.");
            System.out.println(
                "Usage: java ThreadPoolTest numTasks numThreads");
            System.out.println(
                " numTasks - integer: number of tasks to run.");
            System.out.println(
                " numThreads - integer: number of threads " +
                "in the thread pool.");
            return;
        }
        int numTasks = Integer.parseInt(args[0]);
        int numThreads = Integer.parseInt(args[1]);

        // create the thread pool
        ThreadPool threadPool = new ThreadPool(numThreads);

        // run example tasks
        for (int i=0; i<numTasks; i++) {
            threadPool.runTask(createTask(i));
        }

        // close the pool and wait for all tasks to finish.
        threadPool.join();
    }

    /**
        Creates a simple Runnable that prints an ID, waits 500
        milliseconds, then prints the ID again.
    */
    private static Runnable createTask(final int taskID) {
        return new Runnable() {
            public void run() {
                System.out.println("Task " + taskID + ": start");

                // simulate a long-running task
                try {
                    Thread.sleep(500);
                }
                catch (InterruptedException ex) { }

                System.out.println("Task " + taskID + ": end");
            }
        };
    }
}


Because printing a message to the console takes virtually no time, ThreadPoolTest simulates a longer-running task by sleeping for 500 milliseconds.
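
One more detail from Listing 1.1 that is worth seeing in isolation is ThreadGroup's interrupt() method, which close() relies on to stop waiting threads. The following sketch assumes nothing from the ThreadPool class itself: the names GroupInterruptDemo and interruptAll() are hypothetical, and a long Thread.sleep() stands in for a thread waiting for work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class GroupInterruptDemo {

    /**
        Starts numThreads sleeping threads in one ThreadGroup,
        interrupts the group, and returns how many threads saw
        an InterruptedException.
    */
    static int interruptAll(int numThreads) {
        ThreadGroup group = new ThreadGroup("demo-group");
        AtomicInteger interrupted = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(numThreads);
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(group, () -> {
                started.countDown();
                try {
                    // stand-in for a thread waiting for work
                    Thread.sleep(60_000);
                } catch (InterruptedException ex) {
                    interrupted.incrementAndGet();
                }
            }, "worker-" + i);
            threads[i].start();
        }

        try {
            // wait until every thread is running, then interrupt
            // all of them with a single call on the group
            started.await();
            group.interrupt();
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("Interrupted: " + interruptAll(4));
    }
}
```

A single interrupt() call on the group reaches every thread in it, which is why close() in Listing 1.1 does not need to keep its own list of PooledThreads.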
