/*
 * $RCSfile: ThreadPool.java,v $
 * $Revision: 1.2 $
 * $Date: 2005/09/26 22:08:13 $
 * $State: Exp $
 *
 * Class: ThreadPool
 *
 * Description: A pool of threads
 *
 *
 *
 * COPYRIGHT:
 *
 * This software module was originally developed by Raphaël Grosbois and
 * Diego Santa Cruz (Swiss Federal Institute of Technology-EPFL); Joel
 * Askelöf (Ericsson Radio Systems AB); and Bertrand Berthelot, David
 * Bouchard, Félix Henry, Gerard Mozelle and Patrice Onno (Canon Research
 * Centre France S.A) in the course of development of the JPEG2000
 * standard as specified by ISO/IEC 15444 (JPEG 2000 Standard). This
 * software module is an implementation of a part of the JPEG 2000
 * Standard. Swiss Federal Institute of Technology-EPFL, Ericsson Radio
 * Systems AB and Canon Research Centre France S.A (collectively JJ2000
 * Partners) agree not to assert against ISO/IEC and users of the JPEG
 * 2000 Standard (Users) any of their rights under the copyright, not
 * including other intellectual property rights, for this software module
 * with respect to the usage by ISO/IEC and Users of this software module
 * or modifications thereof for use in hardware or software products
 * claiming conformance to the JPEG 2000 Standard. Those intending to use
 * this software module in hardware or software products are advised that
 * their use may infringe existing patents. The original developers of
 * this software module, JJ2000 Partners and ISO/IEC assume no liability
 * for use of this software module or modifications thereof. No license
 * or right to this software module is granted for non JPEG 2000 Standard
 * conforming products.
 * JJ2000 Partners have full right to use this
 * software module for his/her own purpose, assign or donate this
 * software module to any third party and to inhibit third parties from
 * using this software module for non JPEG 2000 Standard conforming
 * products. This copyright notice must be included in all copies or
 * derivative works of this software module.
 *
 * Copyright (c) 1999/2000 JJ2000 Partners.
 *
 *
 *
 */


package jj2000.j2k.util;

/**
 * This class implements a thread pool. The thread pool contains a set of
 * threads which can be given work to do.
 *
 * <P>If the Java Virtual Machine (JVM) uses native threads, then the
 * different threads will be able to execute on different processors in
 * parallel on multiprocessor machines. However, under some JVMs and
 * operating systems using native threads is not sufficient to allow the JVM
 * access to multiple processors. This is the case when native threads are
 * implemented using POSIX threads on lightweight processes
 * (i.e. PTHREAD_SCOPE_PROCESS scope scheduling), which is the case on most
 * UNIX operating systems. In order to provide access to multiple
 * processors it is necessary to set the concurrency level to the number of
 * processors or slightly higher. This can be achieved by setting the Java
 * system property with the name defined by CONCURRENCY_PROP_NAME to some
 * non-negative number. This will make use of the 'NativeServices' class and
 * supporting native libraries. See 'NativeServices' for details. See
 * 'CONCURRENCY_PROP_NAME' for the name of the property.
 *
 * <P>Initially the thread pool contains a user specified number of idle
 * threads. Idle threads can be given a target which is run. While running the
 * target the thread temporarily leaves the idle list. When the target
 * finishes, it joins the idle list again, waiting for a new target.
 * When a target is finished a thread can be notified on a particular object
 * that is given as a lock.
 *
 * <P>Jobs can be submitted using Runnable interfaces, using the 'runTarget()'
 * methods. When the job is submitted, an idle thread will be obtained, the
 * 'run()' method of the 'Runnable' interface will be executed and when it
 * completes the thread will be returned to the idle list. In general the
 * 'run()' method should complete in a rather short time, so that the threads
 * of the pool are not starved.
 *
 * <P>If using the non-asynchronous calls to 'runTarget()', it is important
 * that any target's 'run()' method, or any method called from it, does not
 * use non-asynchronous calls to 'runTarget()' on the same thread pool where
 * it was started. Otherwise this could create a deadlock when there are not
 * enough idle threads.
 *
 * <P>The pool also has a global error and runtime exception condition (one
 * for 'Error' and one for 'RuntimeException'). If a target's 'run()' method
 * throws an 'Error' or 'RuntimeException' the corresponding exception
 * condition is set and the exception object saved. In any subsequent call to
 * 'checkTargetErrors()' the saved exception object is thrown. Likewise, if a
 * target's 'run()' method throws any other subclass of 'Throwable' a new
 * 'RuntimeException' is created and saved. It will be thrown on a subsequent
 * call to 'checkTargetErrors()'. If more than one exception occurs between
 * calls to 'checkTargetErrors()' only the last one is saved. Any 'Error'
 * condition takes precedence over all 'RuntimeException' conditions. The
 * threads in the pool are unaffected by any exceptions thrown by targets.
 *
 * <P>The only exception to the above is the 'ThreadDeath' exception.
 * If a target's 'run()' method throws the 'ThreadDeath' exception a warning
 * message is printed and the exception is propagated, which will terminate
 * the thread in which it occurs. This could lead to instabilities of the
 * pool. The 'ThreadDeath' exception should never be thrown by the program. It
 * is thrown by the Java(TM) Virtual Machine when Thread.stop() is
 * called. This method is deprecated and should never be called.
 *
 * <P>All the threads in the pool are "daemon" threads and will automatically
 * terminate when no non-daemon threads are running.
 *
 * @see NativeServices
 *
 * @see #CONCURRENCY_PROP_NAME
 *
 * @see Runnable
 *
 * @see Thread
 *
 * @see Error
 *
 * @see RuntimeException
 *
 * */
public class ThreadPool {

    /** The name of the property that sets the concurrency level:
        jj2000.j2k.util.ThreadPool.concurrency */
    public final static String CONCURRENCY_PROP_NAME =
        "jj2000.j2k.util.ThreadPool.concurrency";

    /** The array of idle threads and the lock for the manipulation of the
     * idle thread list. */
    private ThreadPoolThread idle[];

    /** The number of idle threads */
    private int nidle;

    /** The name of the pool */
    private String poolName;

    /** The priority for the pool */
    private int poolPriority;

    /** The last error thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that only one copy exists in memory
    private volatile Error targetE;

    /** The last runtime exception thrown by a target. Null if none */
    // NOTE: needs to be volatile, so that only one copy exists in memory
    private volatile RuntimeException targetRE;

    /**
     * The threads that are managed by the pool.
     * */
    class ThreadPoolThread extends Thread {
        private Runnable target;
        private Object lock;
        private boolean doNotifyAll;

        /**
         * Creates a ThreadPoolThread object with the given name, marks it as
         * a daemon thread and sets its priority to that of the pool.
         *
         * @param idx The index of this thread in the pool
         *
         * @param name The name of the thread
         * */
        public ThreadPoolThread(int idx, String name) {
            super(name);
            setDaemon(true);
            setPriority(poolPriority);
        }

        /**
         * The method that is run by the thread. This method first joins the
         * idle state in the pool and then enters an infinite loop. In this
         * loop it waits until a target to run exists and runs it. Once the
         * target's run() method is done it re-joins the idle state and
         * notifies the waiting lock object, if one exists.
         *
         * <P>An interrupt on this thread has no effect other than forcing a
         * check on the target. Normally the target is checked every time the
         * thread is woken up by notify, so no interrupts should be needed.
         *
         * <P>Any exception thrown by the target's 'run()' method is caught
         * and this thread is not affected, except for 'ThreadDeath'. If a
         * 'ThreadDeath' exception is caught a warning message is printed by
         * the 'FacilityManager' and the exception is propagated up. For
         * exceptions which are subclasses of 'Error' or 'RuntimeException'
         * the corresponding error condition is set and this thread is not
         * affected. For any other exception a new 'RuntimeException' is
         * created and the error condition is set; this thread is not affected.
         * */
        public void run() {
            // Join the idle threads list
            putInIdleList(this);
            // Permanently lock the object while running so that target can
            // not be changed until we are waiting again. While waiting for a
            // target the lock is released.
            synchronized (this) {
                while (true) {
                    // Wait until we get a target
                    while (target == null) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            // Ignored: the loop re-checks the target
                        }
                    }
                    // Run the target and catch all possible errors
                    try {
                        target.run();
                    } catch (ThreadDeath td) {
                        // We have been instructed to abruptly terminate
                        // the thread, which should never be done. This can
                        // cause another thread, or the system, to lock.
                        FacilityManager.getMsgLogger().
                            printmsg(MsgLogger.WARNING,
                                     "Thread.stop() called on a ThreadPool "+
                                     "thread or ThreadDeath thrown. This is "+
                                     "deprecated. Lock-up might occur.");
                        throw td;
                    } catch (Error e) {
                        targetE = e;
                    } catch (RuntimeException re) {
                        targetRE = re;
                    } catch (Throwable ue) {
                        // A totally unexpected error has occurred
                        // (Thread.stop(Throwable) has been used, which should
                        // never be).
                        targetRE = new RuntimeException("Unchecked exception "+
                                                        "thrown by target's "+
                                                        "run() method in pool "+
                                                        poolName+".");
                    }
                    // Join idle threads
                    putInIdleList(this);
                    // Release the target and notify lock (i.e. wakeup)
                    target = null;
                    if (lock != null) {
                        synchronized (lock) {
                            if (doNotifyAll) {
                                lock.notifyAll();
                            }
                            else {
                                lock.notify();
                            }
                        }
                    }
                }
            }
        }

        /**
         * Assigns a target to this thread, with an optional notify lock and a
         * notify mode. If another target is currently running the method
         * will block until it terminates. After setting the new target the
         * runner thread will be woken up and execution will start.
         *
         * @param target The runnable object containing the 'run()' method to
         * run.
         *
         * @param lock An object on which notify will be called once the
         * target's run method has finished. A thread to be notified should be
         * waiting on that object. If null no thread is notified.
         *
         * @param notifyAll If true 'notifyAll()', instead of 'notify()', will
         * be called on the lock.
         * */
        synchronized void setTarget(Runnable target, Object lock,
                                    boolean notifyAll) {
            // Set the target
            this.target = target;
            this.lock = lock;
            doNotifyAll = notifyAll;
            // Wakeup the thread
            this.notify();
        }
    }

    /**
     * Creates a new thread pool of the given size, thread priority and pool
     * name.
     *
     * <P>If the Java system property of the name defined by
     * 'CONCURRENCY_PROP_NAME' is set, then an attempt will be made to load
     * the library that supports concurrency setting (see
     * 'NativeServices'). If that succeeds the concurrency level will be set to
     * the specified value. Otherwise a warning is printed.
     *
     * @param size The size of the pool (number of threads to create in the
     * pool).
     *
     * @param priority The priority to give to the threads in the pool. If
     * less than 'Thread.MIN_PRIORITY' it will be the same as the priority of
     * the calling thread.
     *
     * @param name The name of the pool. If null a default generic name is
     * chosen.
     *
     * @see NativeServices
     *
     * @see #CONCURRENCY_PROP_NAME
     * */
    public ThreadPool(int size, int priority, String name) {
        int i;
        ThreadPoolThread t;
        String prop;
        int clevel;

        // Initialize variables checking for special cases
        if (size <= 0) {
            throw new IllegalArgumentException("Pool must be of positive size");
        }
        if (priority < Thread.MIN_PRIORITY) {
            poolPriority = Thread.currentThread().getPriority();
        }
        else {
            poolPriority = (priority < Thread.MAX_PRIORITY) ?
                priority : Thread.MAX_PRIORITY;
        }
        if (name == null) {
            poolName = "Anonymous ThreadPool";
        }
        else {
            poolName = name;
        }

        // If requested to set concurrency try to do it
        prop = null;
        try {
            prop = System.getProperty(CONCURRENCY_PROP_NAME);
        } catch(SecurityException se) {
            // Ignore it.
        }
        if (prop == null) {
            // No concurrency to set, do nothing
        }
        else {
            // Get concurrency level
            try {
                clevel = Integer.parseInt(prop);
                if (clevel < 0) throw new NumberFormatException();
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid concurrency level "+
                                                   "in property "+
                                                   CONCURRENCY_PROP_NAME);
            }
            // Attempt to load library
            if (NativeServices.loadLibrary()) {
                // Library load successful
                FacilityManager.getMsgLogger().
                    printmsg(MsgLogger.INFO,"Changing thread concurrency "+
                             "level from "+
                             NativeServices.getThreadConcurrency()+
                             " to "+clevel+".");
                NativeServices.setThreadConcurrency(clevel);
            }
            else {
                // Could not load the library => warn
                FacilityManager.getMsgLogger().
                    printmsg(MsgLogger.WARNING,"Native library to set "+
                             "thread concurrency level as specified by the "+
                             CONCURRENCY_PROP_NAME+" property not found. "+
                             "Thread concurrency unchanged.");
            }
        }

        // Allocate internal variables
        idle = new ThreadPoolThread[size];
        nidle = 0;

        // Create and start the threads
        for (i=0; i<size; i++) {
            t = new ThreadPoolThread(i,poolName+"-"+i);
            t.start();
        }
    }

    /**
     * Returns the size of the pool. That is the number of threads in this
     * pool (idle + busy).
     *
     * @return The pool's size.
     *
     * */
    public int getSize() {
        return idle.length;
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool.
     * When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * the method will block until a thread of the pool becomes idle or the
     * calling thread is interrupted.
     *
     * <P>This method is the same as <tt>runTarget(t,l,false,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l) {
        return runTarget(t,l,false,false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * <P>This method is the same as <tt>runTarget(t,l,async,false)</tt>.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @return True if the target was submitted to some thread.
     * False if no idle thread could be found and the target was not
     * submitted for execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l, boolean async) {
        return runTarget(t,l,async,false);
    }

    /**
     * Runs the run method of the specified target in an idle thread of this
     * pool. When the target's run method completes, the thread waiting on the
     * lock object is notified, if any. If there is currently no idle thread
     * and the asynchronous mode is not used the method will block until a
     * thread of the pool becomes idle or the calling thread is
     * interrupted. If the asynchronous mode is used then the method will not
     * block and will return false.
     *
     * @param t The target. The 'run()' method of this object will be run in
     * an idle thread of the pool.
     *
     * @param l The lock object. A thread waiting on the lock of the 'l'
     * object will be notified, through the 'notify()' call, when the target's
     * run method completes. If null no thread is notified.
     *
     * @param async If true the asynchronous mode will be used.
     *
     * @param notifyAll If true, threads waiting on the lock of the 'l' object
     * will be notified through the 'notifyAll()' instead of the normal
     * 'notify()' call. This is not normally needed.
     *
     * @return True if the target was submitted to some thread. False if no
     * idle thread could be found and the target was not submitted for
     * execution.
     *
     * */
    public boolean runTarget(Runnable t, Object l,
                             boolean async, boolean notifyAll) {
        ThreadPoolThread runner; // The thread to run the target

        // Get a thread to run
        runner = getIdle(async);
        // If no runner return failure
        if (runner == null) return false;
        // Set the runner
        runner.setTarget(t,l,notifyAll);
        return true;
    }

    /**
     * Checks that no error or runtime exception in any target has occurred
     * so far.
     * If an error or runtime exception has occurred in a target's run
     * method they are thrown by this method.
     *
     * @exception Error If an error condition has been thrown by a target
     * 'run()' method.
     *
     * @exception RuntimeException If a runtime exception has been thrown by a
     * target 'run()' method.
     * */
    public void checkTargetErrors() {
        // Check for Error
        if (targetE != null) throw targetE;
        // Check for RuntimeException
        if (targetRE != null) throw targetRE;
    }

    /**
     * Clears the current target error conditions, if any. Note that a thread
     * in the pool might have set the error conditions since the last check
     * and that those error conditions will be lost. Likewise, before
     * returning from this method another thread might set the error
     * conditions. There is no guarantee that no error conditions exist when
     * returning from this method.
     *
     * <P>In order to ensure that no error conditions exist when returning
     * from this method cooperation from the targets and the thread using this
     * pool is necessary (i.e. currently no targets running or waiting to
     * run).
     * */
    public void clearTargetErrors() {
        // Clear the error and runtime exception conditions
        targetE = null;
        targetRE = null;
    }

    /**
     * Puts the thread 't' in the idle list. The thread 't' should be in fact
     * idle and ready to accept a new target when it joins the idle list.
     *
     * <P> An idle thread that is already in the list should never add itself
     * to the list before it is removed. For efficiency reasons there is no
     * check to see if the thread is already in the list of idle threads.
     *
     * <P> If the idle list was empty 'notify()' will be called on the 'idle'
     * array, to wake up a thread that might be waiting (within the
     * 'getIdle()' method) on an idle thread to become available.
     *
     * @param t The thread to put in the idle list.
     * */
    private void putInIdleList(ThreadPoolThread t) {
        // NOTE: if already in idle => catastrophe! (should be OK since
        // this is a private method)
        // Lock the idle array to avoid races with 'getIdle()'
        synchronized (idle) {
            idle[nidle] = t;
            nidle++;
            // If idle array was empty wakeup any waiting threads.
            if (nidle == 1) idle.notify();
        }
    }

    /**
     * Returns an idle thread and removes it from the list of idle
     * threads. In asynchronous mode it will immediately return an idle
     * thread, or null if none is available. In non-asynchronous mode it will
     * block until a thread of the pool becomes idle or the calling thread is
     * interrupted.
     *
     * <P>If in non-asynchronous mode and there are currently no idle threads
     * available the calling thread will wait on the 'idle' array lock, until
     * notified by 'putInIdleList()' that an idle thread might have become
     * available.
     *
     * @param async If true asynchronous mode is used.
     *
     * @return An idle thread of the pool, that has been removed from the idle
     * list, or null if none is available.
     * */
    private ThreadPoolThread getIdle(boolean async) {
        // Lock the idle array to avoid races with 'putInIdleList()'
        synchronized (idle) {
            if (async) {
                // In asynchronous mode just return null if no idle thread
                if (nidle == 0) return null;
            }
            else {
                // In synchronous mode wait until a thread becomes idle
                while (nidle == 0) {
                    try {
                        idle.wait();
                    } catch (InterruptedException e) {
                        // If we were interrupted just return null
                        return null;
                    }
                }
            }
            // Decrease the idle count and return one of the idle threads
            nidle--;
            return idle[nidle];
        }
    }
}
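
/*
 * Illustrative sketch only; not part of the original JJ2000 source.
 *
 * The 'runTarget()' documentation says that a thread to be notified should
 * be waiting on the lock object when the target finishes. The class below
 * sketches one way a caller might do that without missing the notification:
 * it takes the lock before the work starts and waits in a loop on a 'done'
 * flag. The names 'CompletionLockSketch', 'Job' and 'runAndWait' are
 * hypothetical, and a plain daemon Thread stands in for a pool thread so
 * that the sketch is self-contained. With a real pool the stand-in thread
 * would presumably be replaced by a call such as pool.runTarget(job, lock).
 */
class CompletionLockSketch {

    /** Hypothetical target that records its completion under the lock. */
    static final class Job implements Runnable {
        private final Object lock;
        private boolean done; // guarded by 'lock'
        private int result;   // guarded by 'lock'

        Job(Object lock) {
            this.lock = lock;
        }

        public void run() {
            int sum = 0;
            for (int i = 1; i <= 100; i++) sum += i;
            // Publish the result and the completion flag under the lock
            synchronized (lock) {
                result = sum;
                done = true;
            }
        }
    }

    /** Runs a job on a helper thread and waits on the lock until it is done. */
    static int runAndWait() {
        final Object lock = new Object();
        final Job job = new Job(lock);
        // Stand-in for a pool thread: run the target, then notify the lock,
        // in the same order as ThreadPoolThread.run() does.
        Thread worker = new Thread(new Runnable() {
            public void run() {
                job.run();
                synchronized (lock) {
                    lock.notify();
                }
            }
        });
        worker.setDaemon(true);
        // Hold the lock before starting so the notify cannot be missed
        synchronized (lock) {
            worker.start();
            try {
                // Loop on the flag to tolerate spurious wakeups
                while (!job.done) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting");
            }
            return job.result;
        }
    }
}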