Scheduler
package com.schedular;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A scheduler that internally uses a thread pool to permit concurrent execution
* of scheduled tasks. This scheduler is preferable to a {@link java.util.Timer}
* when the execution of one task may block long enough to delay execution of
* other tasks.
*/
public
class Scheduler
implements ShutdownCallback {
/**
* Specify no initial delay when scheduling a task.
*/
public
static
final
long NO_INITIAL_DELAY = 0;
private
final ScheduledThreadPoolExecutor _executor;
/**
* Creates a scheduler.
*
* @param poolSize The thread pool size.
* @throws IllegalArgumentException if the pool size is less than or equal to zero.
*/
public Scheduler(
int poolSize) {
if (poolSize <= 0) {
throw
new IllegalArgumentException(
“illegal pool size: “+poolSize);
}
LoggingThreadGroup threadGroup =
new LoggingThreadGroup(
“SchedulerGroup”);
// set pool threads as daemons in case an orderly shutdown does not occur
ThreadGroupFactory tFactory=
new ThreadGroupFactory(threadGroup,
“Scheduler-“);
tFactory.createDaemonThreads(
true);
_executor =
new ScheduledThreadPoolExecutor(poolSize,
tFactory,
new ThreadPoolExecutor.DiscardPolicy());
// delayed tasks should not execute after shutdown
_executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(
false);
// prestart one thread to make sure the first execution is timely
_executor.prestartCoreThread();
}
/**
* Creates and executes a periodic action that becomes enabled first after
* the given initial delay, and subsequently with the given period; that
* is executions will commence after initialDelay then initialDelay+period,
* then initialDelay + 2 * period, and so on. If any execution of the task
* encounters an exception, subsequent executions are suppressed. Otherwise,
* the task will only terminate via cancellation or termination of the
* executor. If any execution of this task takes longer than its period,
* then subsequent executions may start late, but will not concurrently
* execute.
*
* @param task The task to execute.
* @param initialDelay The initial delay (in msec) before the first execution.
* @param period The period (in msec) between successive executions.
* @return A ScheduledFuture that may be used to cancel task execution.
*/
public ScheduledFuture scheduleAtFixedRate(Runnable task,
long initialDelay,
long period) {
return _executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
}
/**
* Creates and executes a periodic action that becomes enabled first after
* the given initial delay, and subsequently with the given delay between
* the termination of one execution and the commencement of the next.
* If any execution of the task encounters an exception, subsequent
* executions are suppressed. Otherwise, the task will only terminate via
* cancellation or termination of the executor.
*
* @param task The task to execute.
* @param initialDelay The initial delay (in msec) before the first execution.
* @param delay The delay (in msec) between termination of one execution
* and commencement of the next.
* @return A ScheduledFuture that may be used to cancel task execution.
*/
public ScheduledFuture scheduleWithFixedDelay(Runnable task,
long initialDelay,
long delay) {
return _executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public
void purgeTasks() {
_executor.purge();
}
/**
* Shut down the scheduler in an orderly manner, allowing any currently
* executing tasks to complete.
*/
public
void shutdown() {
_executor.shutdown();
// block at most for 5 seconds for any currently executing task to terminate
try {
_executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
// do nothing
}
}
}
ThreadGroupFactory
import java.util.concurrent.ThreadFactory;
public class ThreadGroupFactory implements ThreadFactory {

    /** Guards the thread counter and the daemon flag. */
    private final Object _syncLock = new Object();

    /** Group that every thread produced by this factory is placed in. */
    private final ThreadGroup _group;

    /** Text prepended to the running thread number to form each thread name. */
    private final String _namePrefix;

    /** Count of threads handed out so far; the first thread gets number 1. */
    private int _numThreads;

    /** Whether newly created threads are marked as daemon threads. */
    private boolean _createDaemonThreads;

    /**
     * Builds a factory whose threads all belong to the given ThreadGroup.
     *
     * @param group The ThreadGroup that new threads are assigned to.
     * @param namePrefix The prefix used when naming each new thread.
     */
    public ThreadGroupFactory(ThreadGroup group, String namePrefix) {
        this._group = group;
        this._namePrefix = namePrefix;
        this._numThreads = 0;
    }

    /**
     * Builds a factory whose threads all belong to the ThreadGroup of the
     * thread that invokes this constructor.
     *
     * @param namePrefix The prefix used when naming each new thread.
     */
    public ThreadGroupFactory(String namePrefix) {
        this(Thread.currentThread().getThreadGroup(), namePrefix);
    }

    /**
     * Chooses whether threads created from now on are daemon threads.
     *
     * @param daemonThreads <code>true</code> to have subsequently created
     *                      threads marked as daemon threads.
     */
    public void createDaemonThreads(boolean daemonThreads) {
        synchronized (_syncLock) {
            this._createDaemonThreads = daemonThreads;
        }
    }

    /**
     * Creates a new, unstarted thread in this factory's group. The thread is
     * named with the prefix followed by the next sequence number, and its
     * daemon status follows the current daemon setting.
     *
     * @param r The runnable the new thread will execute.
     * @return The newly constructed thread.
     */
    @Override
    public Thread newThread(Runnable r) {
        final int sequence;
        final boolean asDaemon;

        // Take the next number and a snapshot of the daemon flag together.
        synchronized (_syncLock) {
            _numThreads += 1;
            sequence = _numThreads;
            asDaemon = _createDaemonThreads;
        }

        Thread created = new Thread(_group, r, _namePrefix + sequence);
        created.setDaemon(asDaemon);
        return created;
    }
}
LoggingThreadGroup
/**
* Create your threads using this ThreadGroup to have uncaught exceptions
* logged via Log4j
*/
public
class LoggingThreadGroup
extends ThreadGroup
{
private
final Log _log = LogFactory.getLog(LoggingThreadGroup.
class);
public LoggingThreadGroup(String groupName) {
super(groupName);
}
public
void uncaughtException(Thread t, Throwable exc) {
_log.warn(
“ThreadGroup[“ +
this.getName() +
“]: Unhandled exception”, exc);
super.uncaughtException(t, exc);
}
} Junit
/*
* NOTE: This copyright does *not* cover user programs that use HQ
* program services by normal system calls through the application
* program interfaces provided as part of the Hyperic Plug-in Development
* Kit or the Hyperic Client Development Kit – this is merely considered
* normal use of the program, and does *not* fall under the heading of
* “derived work”.
*
* Copyright (C) [2004-2008], Hyperic, Inc.
* This file is part of HQ.
*
* HQ is free software; you can redistribute it and/or modify
* it under the terms version 2 of the GNU General Public License as
* published by the Free Software Foundation. This program is distributed
* in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*/
package org.hyperic.hq.application;
import junit.framework.TestCase;
import java.util.concurrent.ScheduledFuture;
/**
* Tests the Scheduler class.
*/
public
class Scheduler_test
extends TestCase {
public Scheduler_test(String name) {
super(name);
}
public
void testIllegalPoolSize() {
try {
new Scheduler(-1);
fail(
“Expected IllegalArgumentException.”);
}
catch (IllegalArgumentException e) {
// expected outcome
}
catch (Exception e) {
fail(
“Expected IllegalArgumentException instead of:”+e);
}
try {
new Scheduler(0);
fail(
“Expected IllegalArgumentException.”);
}
catch (IllegalArgumentException e) {
// expected outcome
}
catch (Exception e) {
fail(
“Expected IllegalArgumentException instead of:”+e);
}
}
public
void testExecuteAtFixedRate()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(50,
false);
ScheduledFuture future =
scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100);
Thread.sleep(210);
assertFalse(future.isDone());
assertFalse(future.isCancelled());
assertEquals(3, counter.numRuns());
scheduler.shutdown();
}
public
void testExecuteWithFixedDelay()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(100,
false);
// the execution period is about 200 msec (delay+runtime)
ScheduledFuture future =
scheduler.scheduleWithFixedDelay(counter, Scheduler.NO_INITIAL_DELAY, 100);
Thread.sleep(210);
assertFalse(future.isDone());
assertFalse(future.isCancelled());
assertEquals(2, counter.numRuns());
scheduler.shutdown();
}
public
void testCancellingScheduledTask()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(50,
false);
ScheduledFuture future =
scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100);
Thread.sleep(210);
assertFalse(future.isDone());
assertFalse(future.isCancelled());
future.cancel(
true);
assertTrue(future.isDone());
assertTrue(future.isCancelled());
Thread.sleep(100);
assertEquals(3, counter.numRuns());
scheduler.shutdown();
}
public
void testExecuteAtFixedRateWithInitialDelay()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(50,
false);
ScheduledFuture future =
scheduler.scheduleAtFixedRate(counter, 100, 100);
Thread.sleep(250);
assertFalse(future.isDone());
assertFalse(future.isCancelled());
assertEquals(2, counter.numRuns());
scheduler.shutdown();
}
public
void testExecuteWithFixedDelayWithInitialDelay()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(50,
false);
ScheduledFuture future =
scheduler.scheduleWithFixedDelay(counter, 100, 50);
Thread.sleep(300);
assertFalse(future.isDone());
assertFalse(future.isCancelled());
assertEquals(2, counter.numRuns());
scheduler.shutdown();
}
/**
* Test executing one task at a fixed rate and one task with a fixed delay.
*
* @throws Exception
*/
public
void testExecuteConcurrentTasks()
throws Exception {
Scheduler scheduler =
new Scheduler(2);
RunnableCounter counter1 =
new RunnableCounter(10,
false);
RunnableCounter counter2 =
new RunnableCounter(10,
false);
ScheduledFuture future1 =
scheduler.scheduleAtFixedRate(counter1, Scheduler.NO_INITIAL_DELAY, 50);
// the execution period is about 60 msec (delay+runtime)
ScheduledFuture future2 =
scheduler.scheduleWithFixedDelay(counter2, Scheduler.NO_INITIAL_DELAY, 50);
Thread.sleep(210);
assertFalse(future1.isDone());
assertFalse(future1.isCancelled());
assertFalse(future2.isDone());
assertFalse(future2.isCancelled());
assertEquals(5, counter1.numRuns());
assertEquals(4, counter2.numRuns());
scheduler.shutdown();
}
/**
* Test that a scheduled task will stop executing if it throws an
* unchecked exception.
*/
public
void testUncheckedExceptionInTask()
throws Exception {
Scheduler scheduler =
new Scheduler(1);
RunnableCounter counter =
new RunnableCounter(50,
true);
ScheduledFuture future =
scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100);
Thread.sleep(210);
assertTrue(future.isDone());
assertFalse(future.isCancelled());
// only should have run once since a runtime exception was thrown
assertEquals(1, counter.numRuns());
scheduler.shutdown();
}
private
class RunnableCounter
implements Runnable {
private
final
long _sleepTime;
private
int _numRuns;
private
final
boolean _throwException;
private
final Object _lock =
new Object();
public RunnableCounter(
long sleepTime,
boolean throwUncheckedException) {
_sleepTime = sleepTime;
_throwException = throwUncheckedException;
}
public
void run() {
synchronized (
this) {
_numRuns++;
}
try {
Thread.sleep(_sleepTime);
}
catch (InterruptedException e) {
}
if (_throwException) {
throw
new RuntimeException(
“unchecked exception”);
}
}
public
synchronized
int numRuns() {
return _numRuns;
}
}
}
本文转自danni505 51CTO博客,原文链接:http://blog.51cto.com/danni505/204896,如需转载请自行联系原作者