大多数用到定时执行的功能都是用任务调度来做的,但是当碰到类似订餐、购物等这种业务时就不好处理了。比如购物的订单功能:在你的订单管理中有N个订单,当订单超过十分钟未支付的时候自动释放购物车中的商品,订单失效。这种高频率的延迟任务再用任务调度(定时)实现就得不偿失了。推荐用Java延迟队列来实现。DelayQueue是java.util.concurrent中提供的一个类,它是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象是到期后经过时间最长(也就是最早到期)的对象。注意:不能将null元素放置到这种队列中。
1、java延迟队列实现方式
// ===== DelayItem.java =====
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A queue element that pairs a task with the point in time at which it expires.
 *
 * <p>Instances are ordered by expiry time; items with the same expiry time are
 * ordered by creation sequence, so insertion order is kept.
 *
 * @param <T> type of the wrapped task
 */
public class DelayItem<T extends Runnable> implements Delayed {

    /** Generates the creation sequence numbers used as a tie-breaker. */
    private static final AtomicLong atomic = new AtomicLong(0);

    /** Expiry time, on the {@link System#nanoTime()} time line. */
    private final long time;

    /** The wrapped task. */
    private final T task;

    /** Creation sequence number of this item. */
    private final long n;

    /**
     * @param timeout delay from now until expiry, in nanoseconds
     * @param t       the task to hand out once the delay has elapsed
     */
    public DelayItem(long timeout, T t) {
        this.time = System.nanoTime() + timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
    }

    /** Returns the remaining delay of this item, expressed in the given unit. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Orders items by expiry time, then by creation sequence. Any other
     * {@link Delayed} implementation is compared by remaining delay.
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            // nanoTime values must be compared through their difference.
            long diff = time - x.time;
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (n < x.n) {
                return -1;
            } else {
                return 1;
            }
        }
        long d = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    /** Returns the wrapped task. */
    public T getTask() {
        return this.task;
    }

    /** Hash of the wrapped task (0 when there is no task). */
    @Override
    public int hashCode() {
        return task == null ? 0 : task.hashCode();
    }

    /**
     * Two items are equal when they wrap equal tasks. The tasks themselves are
     * compared, so a mere hash collision no longer counts as equality.
     */
    @Override
    public boolean equals(Object object) {
        if (object == this) {
            return true;
        }
        if (!(object instanceof DelayItem)) {
            return false;
        }
        Object otherTask = ((DelayItem<?>) object).task;
        return task == null ? otherTask == null : task.equals(otherTask);
    }
}

// ===== ItemQueueThread.java =====
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * Singleton that manages the delayed tasks: it owns the delay queue and a
 * worker thread that takes expired items and submits their tasks to a thread pool.
 */
public class ItemQueueThread {

    private static final Logger logger = Logger.getLogger(ItemQueueThread.class);

    private ItemQueueThread() {
    }

    /** Holder class: the instance is created when the holder is first used. */
    private static class LazyHolder {
        private static final ItemQueueThread itemQueueThread = new ItemQueueThread();
    }

    public static ItemQueueThread getInstance() {
        return LazyHolder.itemQueueThread;
    }

    /** Cached thread pool that runs the expired tasks. */
    ExecutorService executor = Executors.newCachedThreadPool();

    /** Worker thread that waits on the delay queue. */
    private Thread daemonThread;

    /** The delay queue, initially empty. */
    private DelayQueue<DelayItem<?>> item = new DelayQueue<>();

    /** Creates and starts the worker thread. */
    public void init() {
        daemonThread = new Thread(() -> {
            try {
                execute();
            } catch (InterruptedException e) {
                // Keep the interrupt status visible now that the loop has ended.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                logger.info(e.getMessage());
            }
        });
        System.out.println("init......start");
        daemonThread.start();
    }

    /**
     * Worker loop: prints some diagnostics, blocks until an item expires and
     * hands its task to the executor.
     *
     * @throws InterruptedException if the thread is interrupted while waiting;
     *                              this ends the loop instead of being swallowed
     */
    private void execute() throws InterruptedException {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("线程数...." + map.size());
            System.out.println(System.currentTimeMillis());
            System.out.println(item.size());
            System.out.println("线程状态----" + Thread.currentThread().getState());

            // Blocks until an item has expired. An InterruptedException is
            // deliberately not caught here, so interrupting the worker stops it.
            DelayItem<?> t1 = item.take();
            Runnable task = t1.getTask();
            if (task == null) {
                continue;
            }
            try {
                executor.execute(task);
            } catch (RuntimeException e) {
                // A failed submission must not stop the worker loop.
                e.printStackTrace();
                logger.info(e.getMessage());
            }
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param time     delay before the task becomes available
     * @param task     the task to run after the delay
     * @param timeUnit unit of {@code time}
     */
    public void put(long time, Runnable task, TimeUnit timeUnit) {
        // Convert to nanoseconds, the unit DelayItem works with.
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, timeUnit);
        DelayItem<?> k = new DelayItem<>(nanoTime, task);
        item.put(k);
    }

    /**
     * Removes a pending task from the queue.
     *
     * @return {@code true} if an equal item was found and removed
     */
    public boolean endTask(DelayItem<Runnable> task) {
        return item.remove(task);
    }
}

// ===== DataDemo.java =====
/**
 * The logic that has to run after the delay, extracted into its own class that
 * implements Runnable.
 */
public class DataDemo implements Runnable {
    int a = -1;

    public DataDemo(int i) {
        this.a = i;
    }

    @Override
    public void run() {
        System.out.println("超时,要撤销订单...." + a);
    }
}

// ===== DelayTest.java (test class) =====
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) {
        ItemQueueThread ith = ItemQueueThread.getInstance();
        ith.init();
        Random r = new Random();
        for (int i = 0; i < 5; i++) {
            int a = r.nextInt(20);
            System.out.println("预先知道等待时间:" + a);
            DataDemo dd = new DataDemo(a); // create a task object
            ith.put(a, dd, TimeUnit.SECONDS); // add the task to the queue
        }
    }
}

// Note: ItemQueueThread.init() should be executed when the container is
// initialized, or at least before the first delayed task is put. Once the
// configured delay has expired, the run() method of the task object is executed.
2、定时任务实现方式
引入quartz包
import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; public class DeplayQuartzImpl implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException { //doing somethings
System.out.println("going scan database..."); } public static void main(String[] args) throws SchedulerException { //create task JobDetail jobDetail = JobBuilder.newJob(DeplayQuartzImpl.class).withIdentity("job_1", "group_1").build(); //create trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger_1", "group_trigger") .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); //put task into trigger scheduler.scheduleJob(jobDetail, trigger); scheduler.start(); } }
实现Java的Delayed接口(简洁版)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Minimal {@link Delayed} implementation: an order id together with the point
 * in time at which the order expires.
 */
public class JdkDelayImpl implements Delayed {

    private final String orderId;

    /** Expiry time, on the {@link System#nanoTime()} time line. */
    private final long timeout;

    /**
     * @param orderId id of the order
     * @param timeout delay from now until expiry, in nanoseconds
     */
    public JdkDelayImpl(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    /**
     * Orders elements by expiry time. Two {@code JdkDelayImpl} instances are
     * compared through their stored deadlines, so the result does not depend on
     * when the clock is read; any other {@link Delayed} is compared by its
     * remaining delay instead of being cast.
     */
    @Override
    public int compareTo(Delayed delayed) {
        if (delayed == this) {
            return 0;
        }
        long d;
        if (delayed instanceof JdkDelayImpl) {
            // nanoTime values must be compared through their difference.
            d = timeout - ((JdkDelayImpl) delayed).timeout;
        } else {
            d = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
        }
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    /** Returns the remaining delay, expressed in the given unit. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println(orderId + " order will being delete...");
    }

    /**
     * Demo: for each order id, queues an element with a 3-second delay and
     * immediately blocks until it can be taken again.
     */
    public static void main(String[] args) {
        List<String> list = new ArrayList<String>();
        list.add("001");
        list.add("002");
        list.add("003");
        list.add("004");
        list.add("005");

        DelayQueue<JdkDelayImpl> queue = new DelayQueue<JdkDelayImpl>();
        long start = System.currentTimeMillis();
        for (String id : list) {
            queue.put(new JdkDelayImpl(id, TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));
            try {
                queue.take().print();
                System.out.println("after: " + (System.currentTimeMillis() - start) + " milliSeconds");
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
3、基于netty方式,添加netty包
import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

/**
 * Netty-based variant: schedules a single task on a {@link HashedWheelTimer}
 * and polls a flag until the task has run.
 */
public class NettyDeplayImpl {

    /** Task that clears its flag when the timer fires. */
    static class MyTimeTask implements TimerTask {

        // Written by the timer thread and polled by the main thread, hence volatile.
        volatile boolean flag;

        public MyTimeTask(boolean flag) {
            this.flag = flag;
        }

        @Override
        public void run(Timeout arg0) throws Exception {
            System.out.println("going to delete order...");
            this.flag = false;
        }
    }

    /**
     * Schedules the task with a 5-second delay and prints a line every second
     * until the task has cleared its flag.
     */
    public static void main(String[] args) {
        MyTimeTask timeTask = new MyTimeTask(true);
        Timer timer = new HashedWheelTimer();
        try {
            timer.newTimeout(timeTask, 5, TimeUnit.SECONDS);
            int i = 1;
            while (timeTask.flag) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
                System.err.println(i + " seconds gone");
                i++;
            }
        } finally {
            // Release the timer so it does not outlive the demo.
            timer.stop();
        }
    }
}