/*
 * $RCSfile: ThreadPool.java,v $
 * $Revision: 1.2 $
 * $Date: 2005/09/26 22:08:13 $
 * $State: Exp $
 *
 * Class:                   ThreadPool
 *
 * Description:             A pool of threads
 *
 *
 *
 * COPYRIGHT:
 *
 * This software module was originally developed by Raphaël Grosbois and
 * Diego Santa Cruz (Swiss Federal Institute of Technology-EPFL); Joel
 * Askelöf (Ericsson Radio Systems AB); and Bertrand Berthelot, David
 * Bouchard, Félix Henry, Gerard Mozelle and Patrice Onno (Canon Research
 * Centre France S.A) in the course of development of the JPEG2000
 * standard as specified by ISO/IEC 15444 (JPEG 2000 Standard). This
 * software module is an implementation of a part of the JPEG 2000
 * Standard. Swiss Federal Institute of Technology-EPFL, Ericsson Radio
 * Systems AB and Canon Research Centre France S.A (collectively JJ2000
 * Partners) agree not to assert against ISO/IEC and users of the JPEG
 * 2000 Standard (Users) any of their rights under the copyright, not
 * including other intellectual property rights, for this software module
 * with respect to the usage by ISO/IEC and Users of this software module
 * or modifications thereof for use in hardware or software products
 * claiming conformance to the JPEG 2000 Standard. Those intending to use
 * this software module in hardware or software products are advised that
 * their use may infringe existing patents. The original developers of
 * this software module, JJ2000 Partners and ISO/IEC assume no liability
 * for use of this software module or modifications thereof. No license
 * or right to this software module is granted for non JPEG 2000 Standard
 * conforming products. JJ2000 Partners have full right to use this
 * software module for his/her own purpose, assign or donate this
 * software module to any third party and to inhibit third parties from
 * using this software module for non JPEG 2000 Standard conforming
 * products. This copyright notice must be included in all copies or
 * derivative works of this software module.
 *
 * Copyright (c) 1999/2000 JJ2000 Partners.
 *
 *
 *
 */


package jj2000.j2k.util;

/**
 * This class implements a thread pool. The thread pool contains a set of
 * threads which can be given work to do.
 *
 * <P>If the Java Virtual Machine (JVM) uses native threads, then the
 * different threads will be able to execute on different processors in
 * parallel on multiprocessor machines. However, under some JVMs and
 * operating systems using native threads is not sufficient to allow the JVM
 * access to multiple processors. This is the case when native threads are
 * implemented using POSIX threads on lightweight processes
 * (i.e. PTHREAD_SCOPE_PROCESS scope scheduling), which is the case on most
 * UNIX operating systems. In order to provide access to multiple
 * processors it is necessary to set the concurrency level to the number of
 * processors or slightly higher. This can be achieved by setting the Java
 * system property with the name defined by CONCURRENCY_PROP_NAME to some
 * non-negative number. This will make use of the 'NativeServices' class and
 * supporting native libraries. See 'NativeServices' for details. See
 * 'CONCURRENCY_PROP_NAME' for the name of the property.
 *
 * <P>Initially the thread pool contains a user specified number of idle
 * threads. Idle threads can be given a target which is run. While running the
 * target the thread temporarily leaves the idle list. When the target
 * finishes, the thread joins the idle list again, waiting for a new target.
 * When a target is finished a thread can be notified on a particular object
 * that is given as a lock.
 *
 * <P>Jobs are submitted as 'Runnable' objects through the 'runTarget()'
 * methods. When the job is submitted, an idle thread will be obtained, the
 * 'run()' method of the 'Runnable' interface will be executed and when it
 * completes the thread will be returned to the idle list. In general the
 * 'run()' method should complete in a rather short time, so that the pool
 * is not starved of idle threads.
 *
 * <P>If using the non-asynchronous calls to 'runTarget()', it is important
 * that any target's 'run()' method, or any method called from it, does not
 * use non-asynchronous calls to 'runTarget()' on the same thread pool where
 * it was started. Otherwise this could create a deadlock when there are not
 * enough idle threads.
 *
 * <P>The pool also has a global error and runtime exception condition (one
 * for 'Error' and one for 'RuntimeException'). If a target's 'run()' method
 * throws an 'Error' or 'RuntimeException' the corresponding exception
 * condition is set and the exception object saved. In any subsequent call to
 * 'checkTargetErrors()' the saved exception object is thrown. Likewise, if a
 * target's 'run()' method throws any other subclass of 'Throwable' a new
 * 'RuntimeException' is created and saved. It will be thrown on a subsequent
 * call to 'checkTargetErrors()'. If more than one exception occurs between
 * calls to 'checkTargetErrors()' only the last one of each kind is saved.
 * Any 'Error' condition takes precedence over all 'RuntimeException'
 * conditions. The threads in the pool are unaffected by any exceptions
 * thrown by targets.
 *
 * <P>The only exception to the above is the 'ThreadDeath' exception. If a
 * target's 'run()' method throws the 'ThreadDeath' exception a warning
 * message is printed and the exception is propagated, which will terminate
 * the thread in which it occurs. This could lead to instabilities of the
 * pool. The 'ThreadDeath' exception should never be thrown by the program. It
 * is thrown by the Java(TM) Virtual Machine when Thread.stop() is
 * called. This method is deprecated and should never be called.
 *
 * <P>All the threads in the pool are "daemon" threads and will automatically
 * terminate when no non-daemon threads are running.
 *
 * @see NativeServices
 *
 * @see #CONCURRENCY_PROP_NAME
 *
 * @see Runnable
 *
 * @see Thread
 *
 * @see Error
 *
 * @see RuntimeException
 *
 * */
public class ThreadPool {

    /** The name of the property that sets the concurrency level:
        jj2000.j2k.util.ThreadPool.concurrency */
    public final static String CONCURRENCY_PROP_NAME =
        "jj2000.j2k.util.ThreadPool.concurrency";

    /** The array of idle threads and the lock for the manipulation of the
     * idle thread list. */
    private ThreadPoolThread idle[];

    /** The number of idle threads */
    private int nidle;

    /** The name of the pool */
    private String poolName;

    /** The priority for the pool */
    private int poolPriority;

    /** The last error thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that only one copy exists in memory
    private volatile Error targetE;

    /** The last runtime exception thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that only one copy exists in memory
    private volatile RuntimeException targetRE;

    /**
     * The threads that are managed by the pool.
     * */
    class ThreadPoolThread extends Thread {
        private Runnable target;
        private Object lock;
        private boolean doNotifyAll;

        /**
         * Creates a ThreadPoolThread object with the given name, marking it
         * as a daemon thread and setting its priority to the one of the
         * pool.
         *
         * @param idx The index of this thread in the pool (currently unused)
         *
         * @param name The name of the thread
         * */
        public ThreadPoolThread(int idx, String name) {
            super(name);
            setDaemon(true);
            setPriority(poolPriority);
        }

        /**
         * The method that is run by the thread. This method first joins the
         * idle state in the pool and then enters an infinite loop. In this
         * loop it waits until a target to run exists and runs it. Once the
         * target's run() method is done it re-joins the idle state and
         * notifies the waiting lock object, if one exists.
         *
         * <P>An interrupt on this thread has no effect other than forcing a
         * check on the target. Normally the target is checked every time the
         * thread is woken up by notify; the thread should not be interrupted.
         *
         * <P>Any exception thrown by the target's 'run()' method is caught
         * and this thread is not affected, except for 'ThreadDeath'. If a
         * 'ThreadDeath' exception is caught a warning message is printed by
         * the 'FacilityManager' and the exception is propagated up. For
         * exceptions which are subclasses of 'Error' or 'RuntimeException'
         * the corresponding error condition is set and this thread is not
         * affected. For any other exceptions a new 'RuntimeException' is
         * created and the error condition is set; this thread is not affected.
         * */
        public void run() {
            // Join the idle threads list
            putInIdleList(this);
            // Permanently lock the object while running so that target can
            // not be changed until we are waiting again. While waiting for a
            // target the lock is released.
            synchronized (this) {
                while (true) {
                    // Wait until we get a target
                    while (target == null) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    // Run the target and catch all possible errors
                    try {
                        target.run();
                    } catch (ThreadDeath td) {
                        // We have been instructed to abruptly terminate
                        // the thread, which should never be done. This can
                        // cause another thread, or the system, to lock.
                        FacilityManager.getMsgLogger().
                            printmsg(MsgLogger.WARNING,
                                     "Thread.stop() called on a ThreadPool "+
                                     "thread or ThreadDeath thrown. This is "+
                                     "deprecated. Lock-up might occur.");
                        throw td;
                    } catch (Error e) {
                        targetE = e;
                    } catch (RuntimeException re) {
                        targetRE = re;
                    } catch (Throwable ue) {
                        // A totally unexpected error has occurred
                        // (Thread.stop(Throwable) has been used, which should
                        // never be).
                        targetRE = new RuntimeException("Unchecked exception "+
                                                        "thrown by target's "+
                                                        "run() method in pool "+
                                                        poolName+".");
                    }
                    // Join idle threads
                    putInIdleList(this);
                    // Release the target and notify lock (i.e. wakeup)
                    target = null;
                    if (lock != null) {
                        synchronized (lock) {
                            if (doNotifyAll) {
                                lock.notifyAll();
                            }
                            else {
                                lock.notify();
                            }
                        }
                    }
                }
            }
        }

        /**
         * Assigns a target to this thread, with an optional notify lock and a
         * notify mode. If another target is currently running the method
         * will block until it terminates. After setting the new target the
         * runner thread will be woken up and execution will start.
         *
         * @param target The runnable object containing the 'run()' method to
         * run.
         *
         * @param lock An object on which notify will be called once the
         * target's run method has finished. A thread to be notified should be
         * waiting on that object. If null no thread is notified.
         *
         * @param notifyAll If true 'notifyAll()', instead of 'notify()', will
         * be called on the lock.
         * */
        synchronized void setTarget(Runnable target, Object lock,
                                    boolean notifyAll) {
            // Set the target
            this.target = target;
            this.lock = lock;
            doNotifyAll = notifyAll;
            // Wakeup the thread
            this.notify();
        }
    }

    /**
     * Creates a new thread pool of the given size, thread priority and pool
     * name.
     *
     * <P>If the Java system property of the name defined by
     * 'CONCURRENCY_PROP_NAME' is set, then an attempt will be made to load
     * the library that supports concurrency setting (see
     * 'NativeServices'). If that succeeds the concurrency level will be set to
     * the specified value. Otherwise a warning is printed.
     *
     * @param size The size of the pool (number of threads to create in the
     * pool).
     *
     * @param priority The priority to give to the threads in the pool. If
     * less than 'Thread.MIN_PRIORITY' it will be the same as the priority of
     * the calling thread.
     *
     * @param name The name of the pool. If null a default generic name is
     * chosen.
     *
     * @see NativeServices
     *
     * @see #CONCURRENCY_PROP_NAME
     * */
    public ThreadPool(int size, int priority, String name) {
        int i;
        ThreadPoolThread t;
        String prop;
        int clevel;

        // Initialize variables checking for special cases
        if (size <= 0) {
            throw new IllegalArgumentException("Pool must be of positive size");
        }
        if (priority < Thread.MIN_PRIORITY) {
            poolPriority = Thread.currentThread().getPriority();
        }
        else {
            poolPriority = (priority < Thread.MAX_PRIORITY) ? priority :
                Thread.MAX_PRIORITY;
        }
        if (name == null) {
            poolName = "Anonymous ThreadPool";
        }
        else {
            poolName = name;
        }

        // If requested to set concurrency try to do it
        prop = null;
        try {
            prop = System.getProperty(CONCURRENCY_PROP_NAME);
        } catch(SecurityException se) {
            // Ignore it.
        }
        if (prop == null) {
            // No concurrency to set, do nothing
        }
        else {
            // Get concurrency level
            try {
                clevel = Integer.parseInt(prop);
                if (clevel < 0) throw new NumberFormatException();
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid concurrency level "+
                                                   "in property "+
                                                   CONCURRENCY_PROP_NAME);
            }
            // Attempt to load library
            if (NativeServices.loadLibrary()) {
                // Library load successful
                FacilityManager.getMsgLogger().
                    printmsg(MsgLogger.INFO,"Changing thread concurrency "+
                             "level from "+
                             NativeServices.getThreadConcurrency()+
                             " to "+clevel+".");
                NativeServices.setThreadConcurrency(clevel);
            }
            else {
                // Could not load the library => warn
                FacilityManager.getMsgLogger().
                    printmsg(MsgLogger.WARNING,"Native library to set "+
                             "thread concurrency level as specified by the "+
                             CONCURRENCY_PROP_NAME+" property not found. "+
                             "Thread concurrency unchanged.");
            }
        }

        // Allocate internal variables
        idle = new ThreadPoolThread[size];
        nidle = 0;

        // Create and start the threads
        for (i=0; i<size; i++) {
            t = new ThreadPoolThread(i,poolName+"-"+i);
            t.start();
        }
    }

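    /*
     * Illustrative sketch, not part of the original JJ2000 sources: it
     * restates the argument handling of the constructor above using only
     * java.lang, so that it can be tried without 'NativeServices' or
     * 'FacilityManager'. The names 'PoolArgsSketch', 'effectivePriority'
     * and 'parseConcurrency' are hypothetical.
     *

```java
// Hypothetical helper class; mirrors the checks made by the constructor.
class PoolArgsSketch {
    // A priority below MIN_PRIORITY means "inherit from the caller";
    // anything above MAX_PRIORITY is clamped to MAX_PRIORITY.
    static int effectivePriority(int requested, int callerPriority) {
        if (requested < Thread.MIN_PRIORITY) {
            return callerPriority;
        }
        return Math.min(requested, Thread.MAX_PRIORITY);
    }

    // Parses a concurrency level; non-numeric or negative values are
    // rejected with an IllegalArgumentException.
    static int parseConcurrency(String prop) {
        try {
            int clevel = Integer.parseInt(prop);
            if (clevel < 0) throw new NumberFormatException();
            return clevel;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid concurrency level: " + prop);
        }
    }
}
```

     * For instance, 'effectivePriority(0, 5)' yields the caller's priority
     * and 'effectivePriority(20, 5)' yields 'Thread.MAX_PRIORITY'.
     */
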
    /**
     * Returns the size of the pool. That is the number of threads in this
     * pool (idle + busy).
     *
     * @return The pool's size.
     *
     * */
    public int getSize() {
        return idle.length;
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * the method will block until a thread of the pool becomes idle or the
     * calling thread is interrupted.
     *
     * <P>This method is the same as <tt>runTarget(t,l,false,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l) {
        return runTarget(t,l,false,false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * <P>This method is the same as <tt>runTarget(t,l,async,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l, boolean async) {
        return runTarget(t,l,async,false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @param notifyAll If true, threads waiting on the lock of the 'l' object
     * will be notified through the 'notifyAll()' instead of the normal
     * 'notify()' call. This is not normally needed.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l,
                             boolean async, boolean notifyAll) {
        ThreadPoolThread runner;   // The thread to run the target

        // Get a thread to run
        runner = getIdle(async);
        // If no runner return failure
        if (runner == null) return false;
        // Set the runner
        runner.setTarget(t,l,notifyAll);
        return true;
    }

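    /*
     * Illustrative sketch, not part of the original JJ2000 sources: one way
     * a caller might wait on the lock object passed to 'runTarget()'. To
     * stay self-contained, a plain 'Thread' stands in for a pool thread and
     * does what the pool is documented to do (run the target, then call
     * 'notify()' on the lock). The class 'NotifyLockSketch' and its 'done'
     * and 'result' fields are hypothetical. The caller takes the lock
     * before submitting, so the notification cannot be issued before the
     * caller is waiting.
     *

```java
// Hypothetical sketch class; a plain Thread stands in for a pool thread.
class NotifyLockSketch {
    private boolean done = false;  // guarded by 'lock'
    private int result = 0;        // guarded by 'lock'

    int runAndWait() {
        final Object lock = new Object();
        // The target computes 1 + 2 + ... + 10 and marks itself as done.
        final Runnable target = () -> {
            int sum = 0;
            for (int i = 1; i <= 10; i++) sum += i;
            synchronized (lock) {
                result = sum;
                done = true;
            }
        };
        // Hold the lock before submitting, so that the notification
        // cannot be issued before this thread is waiting.
        synchronized (lock) {
            // Stand-in for pool.runTarget(target, lock): run the target,
            // then call notify() on the lock.
            Thread worker = new Thread(() -> {
                target.run();
                synchronized (lock) {
                    lock.notify();
                }
            });
            worker.setDaemon(true);
            worker.start();
            try {
                // Loop on the flag to tolerate spurious wakeups.
                while (!done) lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting");
            }
            return result;
        }
    }
}
```

     * With a real pool, the body of the 'synchronized (lock)' block would
     * call 'runTarget(target, lock)' instead of starting a thread.
     */
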
    /**
     * Checks that no error or runtime exception has occurred in any target
     * so far. If an error or runtime exception has occurred in a target's run
     * method it is thrown by this method.
     *
     * @exception Error If an error condition has been thrown by a target
     * 'run()' method.
     *
     * @exception RuntimeException If a runtime exception has been thrown by a
     * target 'run()' method.
     * */
    public void checkTargetErrors() {
        // Check for Error
        if (targetE != null) throw targetE;
        // Check for RuntimeException
        if (targetRE != null) throw targetRE;
    }

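    /*
     * Illustrative sketch, not part of the original JJ2000 sources: the
     * error conditions described above, reduced to a self-contained class.
     * 'TargetErrorSketch' and 'runCapturing' are hypothetical names, and
     * the target is run on the calling thread rather than on a pool thread.
     * It shows that the last 'RuntimeException' replaces earlier ones and
     * that a saved 'Error' is thrown in preference to a saved
     * 'RuntimeException'.
     *

```java
// Hypothetical sketch class; runs targets on the calling thread.
class TargetErrorSketch {
    private volatile Error targetE;
    private volatile RuntimeException targetRE;

    // Runs a target and records what it throws instead of propagating it.
    void runCapturing(Runnable target) {
        try {
            target.run();
        } catch (Error e) {
            targetE = e;
        } catch (RuntimeException re) {
            targetRE = re;
        }
    }

    // Throws the saved Error first, then the saved RuntimeException.
    void checkTargetErrors() {
        if (targetE != null) throw targetE;
        if (targetRE != null) throw targetRE;
    }

    static String demo() {
        TargetErrorSketch s = new TargetErrorSketch();
        s.runCapturing(() -> { throw new IllegalStateException("first"); });
        s.runCapturing(() -> { throw new IllegalArgumentException("second"); });
        String seen;
        try {
            s.checkTargetErrors();
            seen = "none";
        } catch (RuntimeException re) {
            seen = re.getMessage();
        }
        s.runCapturing(() -> { throw new Error("boom"); });
        try {
            s.checkTargetErrors();
            seen += ",none";
        } catch (Error e) {
            seen += "," + e.getMessage();
        }
        return seen;
    }
}
```

     * Here 'demo()' observes the second runtime exception, and then the
     * error once one has been recorded.
     */
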
    /**
     * Clears the current target error conditions, if any. Note that a thread
     * in the pool might have set the error conditions since the last check
     * and that those error conditions will be lost. Likewise, before
     * returning from this method another thread might set the error
     * conditions. There is no guarantee that no error conditions exist when
     * returning from this method.
     *
     * <P>In order to ensure that no error conditions exist when returning
     * from this method cooperation from the targets and the thread using this
     * pool is necessary (i.e. currently no targets running or waiting to
     * run).
     * */
    public void clearTargetErrors() {
        // Clear the error and runtime exception conditions
        targetE = null;
        targetRE = null;
    }

    /**
     * Puts the thread 't' in the idle list. The thread 't' should be in fact
     * idle and ready to accept a new target when it joins the idle list.
     *
     * <P> An idle thread that is already in the list should never add itself
     * to the list before it is removed. For efficiency reasons there is no
     * check to see if the thread is already in the list of idle threads.
     *
     * <P> Each time a thread joins the idle list 'notify()' is called on the
     * 'idle' array, to wake up a thread that might be waiting (within the
     * 'getIdle()' method) for an idle thread to become available.
     *
     * @param t The thread to put in the idle list.
     * */
    private void putInIdleList(ThreadPoolThread t) {
        // NOTE: if already in idle => catastrophe! (should be OK since
        // this is private method)
        // Lock the idle array to avoid races with 'getIdle()'
        synchronized (idle) {
            idle[nidle] = t;
            nidle++;
            // Wake up one waiting thread, if any. Notify on every insertion
            // and not only when the list was empty: with several threads
            // waiting in 'getIdle()', a second insertion made before the
            // first notified waiter reacquires the lock would otherwise
            // leave a waiter blocked while an idle thread is available.
            idle.notify();
        }
    }

    /**
     * Returns an idle thread and removes it from the list of idle
     * threads. In asynchronous mode it will immediately return an idle
     * thread, or null if none is available. In non-asynchronous mode it will
     * block until a thread of the pool becomes idle or the calling thread is
     * interrupted.
     *
     * <P>If in non-asynchronous mode and there are currently no idle threads
     * available the calling thread will wait on the 'idle' array lock, until
     * notified by 'putInIdleList()' that an idle thread might have become
     * available.
     *
     * @param async If true asynchronous mode is used.
     *
     * @return An idle thread of the pool, that has been removed from the idle
     * list, or null if none is available.
     * */
    private ThreadPoolThread getIdle(boolean async) {
        // Lock the idle array to avoid races with 'putInIdleList()'
        synchronized (idle) {
            if (async) {
                // In asynchronous mode just return null if no idle thread
                if (nidle == 0) return null;
            }
            else {
                // In synchronous mode wait until a thread becomes idle
                while (nidle == 0) {
                    try {
                        idle.wait();
                    } catch (InterruptedException e) {
                        // If we were interrupted just return null
                        return null;
                    }
                }
            }
            // Decrease the idle count and return one of the idle threads
            nidle--;
            return idle[nidle];
        }
    }
}
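/*
 * Illustrative sketch, not part of the original JJ2000 sources: the idle
 * list handshake between 'putInIdleList()' and 'getIdle()', reduced to a
 * self-contained class in which strings stand in for pool threads. The
 * names 'IdleListSketch', 'put' and 'take' are hypothetical. It shows that
 * asynchronous mode returns null on an empty list and that idle entries
 * are handed out in last-in, first-out order.
 *

```java
// Hypothetical sketch class; strings stand in for pool threads.
class IdleListSketch {
    private final String[] idle;
    private int nidle = 0;

    IdleListSketch(int size) {
        idle = new String[size];
    }

    // Adds an entry and wakes up one thread waiting in take(), if any.
    void put(String worker) {
        synchronized (idle) {
            idle[nidle++] = worker;
            idle.notify();
        }
    }

    // Removes and returns an entry. In asynchronous mode returns null when
    // the list is empty; otherwise blocks until an entry is available or
    // the calling thread is interrupted.
    String take(boolean async) {
        synchronized (idle) {
            if (async) {
                if (nidle == 0) return null;
            } else {
                while (nidle == 0) {
                    try {
                        idle.wait();
                    } catch (InterruptedException e) {
                        return null;
                    }
                }
            }
            return idle[--nidle];
        }
    }

    static String demo() {
        IdleListSketch list = new IdleListSketch(2);
        String empty = String.valueOf(list.take(true));
        list.put("w0");
        list.put("w1");
        return empty + "," + list.take(false) + "," + list.take(true);
    }
}
```

 * In 'demo()' the first asynchronous 'take' finds the list empty, and the
 * two later calls return the entries most recently added first.
 */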