java延迟队列

大多数定时执行的功能都是用任务调度来实现的,但是当碰到订餐、购物这类业务时就不好处理了。比如购物的订单功能:订单管理中有N个订单,当某个订单超过十分钟未支付时,需要自动释放购物车中的商品并使订单失效。这种高频率的延迟任务再用任务调度(定时扫描)实现就得不偿失了,推荐用Java延迟队列来实现。DelayQueue是java.util.concurrent中提供的一个类,它是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,队头对象是到期时间最早(即已过期最久)的那个。注意:不能将null元素放置到这种队列中。

1、java延迟队列实现方式

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

//任务线程 实现delayed接口
public class DelayItem<T extends Runnable> implements Delayed {
//到期时间
private final long time;
//任务对象
private final T task;
//原子类
private static final AtomicLong atomic = new AtomicLong(0);
private final long n;

public DelayItem(long timeout, T t) {
    this.time = System.nanoTime() + timeout;
    this.task = t;
    this.n = atomic.getAndIncrement();
}
//返回与此对象相关的剩余延迟时间,以给定的时间单位表示
public long getDelay(TimeUnit unit) {
    return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
}

public int compareTo(Delayed other) {
    if(other == this) {
    return 0;
    }
    if(other instanceof DelayItem) {
        DelayItem<?> x = (DelayItem<?>)other;
        long diff = tiem - x.time;
        if(diff < 0) {
            return -1;
        }else if(diff > 0) {
            return 1;
        }else if( n < x.n){
            return -1;
        }else {
            return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -           other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public T getTask(){
        return this.task;
    }
    @Override
    public int hashCode(){
        return task.hashCode();
    }
    @Override
    public boolean equals(Object object){
        if(object instanceof DelayItem){
            return object.hashCode() == hashCode() ? true : false;
    }
    return false;
}
}

//管理延迟任务的类
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
//延迟队列存放有效期对象
public class ItemQueueThread {
private static final Logger logger = Logger.getLogger(this.class);
private ItemQueueThread(){}
//延迟加载(线程安全)
private static class LazyHolder{
    private static ItemQueueThread itemQueueThread = new ItemQueueThread();
}
public static ItemQueueThread getInstance(){
    return LazyHolder.itemQueueThread;
}
//缓存线程池
ExecutorService executor = Executors.newCacheThreadPool();
//线程
private Thread daemonThead;
//初始化线程
public void init() {
    daemonThread = new Thread(() -> {
        try{
            execute();
        }cathc(InterruptedException e){
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    });
    System.out.println("init......start");
    daemonThread.start();
}

private void execute() throws InterrupedException {
    while(true) {
        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
        System.out.println("线程数...." + map.size());
        System.out.println(System.currentTimeMills());
        System.out.println(item.size());
        System.out.println("线程状态----" + Thread.currentThread().getState());
        try{
            //从延迟队列中取值,如果没有对象过期责队列一直等待
            DelayItem<?> t1 = item.take();
            if(t1 != null){
                Runnable task = t1.getTask();
                    if(task == null){
                        continue;
                }
                executor.execute(task);
            }
        }catch(Exception e) {
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    }
}
//创建空的延迟队列
private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
//往队列中添加任务
public void put(long time, Runnable task, TimeUnit timeUnit){
    //转换成ns
    long nanoTime = TimeUnit.NANOSECONDS.convert(time,timeUnit);
    DelayItem<?> k = new DelayItem(nanoTime,task);
    item.put(k);_:
}
//结束任务
public boolean endTask(DelayItem<Runnable> task){
    return item.remove(task);
}
}

//把需要延迟执行的功能代码单独抽取出来作为一个类,实现Runnable接口并重写run方法
/**
 * Sample delayed task: prints an order-cancellation message that includes
 * the number it was constructed with.
 */
public class DataDemo implements Runnable {
    /** Value included in the printed message; -1 until set by the constructor. */
    int a = -1;

    /**
     * @param i the value to include in the printed message
     */
    public DataDemo(int i) {
        a = i;
    }

    /** Prints the cancellation message to standard output. */
    @Override
    public void run() {
        String notice = "超时,要撤销订单...." + a;
        System.out.println(notice);
    }
}

//test class
import java.util.Random;
import java.util.concurrent.TimeUnit;
/** Demo driver: starts the dispatcher and queues five tasks with random delays. */
public class DelayTest {
    /**
     * Queues five {@code DataDemo} tasks, each delayed by a random number of
     * seconds in the range 0-19.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ItemQueueThread dispatcher = ItemQueueThread.getInstance();
        dispatcher.init();
        Random random = new Random();
        int remaining = 5;
        while (remaining-- > 0) {
            int delaySeconds = random.nextInt(20);
            System.out.println("预先知道等待时间:" + delaySeconds);
            // The task carries its own delay so the output can be checked by eye.
            DataDemo job = new DataDemo(delaySeconds);
            dispatcher.put(delaySeconds, job, TimeUnit.SECONDS);
        }
    }
}
//注意:ItemQueueThread的init方法要在容器初始化时执行,或者至少在第一次put延迟任务之前完成初始化;当设定的延迟时间到期时,会执行任务对象中的run方法
}            

  

2、定时任务实现方式

引入quartz包

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class DeplayQuartzImpl implements Job{

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //doing somethings 
System.out.println(
"going scan database..."); } public static void main(String[] args) throws SchedulerException { //create task JobDetail jobDetail = JobBuilder.newJob(DeplayQuartzImpl.class).withIdentity("job_1", "group_1").build(); //create trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger_1", "group_trigger") .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); //put task into trigger scheduler.scheduleJob(jobDetail, trigger); scheduler.start(); } }

 

实现Java的Delayed接口的简洁版

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class JdkDelayImpl implements Delayed{

    private String orderId;
    private long timeout;
    
    
    public JdkDelayImpl(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed delayed) {
        if(delayed == this)
            return 0;
        JdkDelayImpl t = (JdkDelayImpl) delayed;
        long d = (getDelay(TimeUnit.NANOSECONDS) - t.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS );
    }
    
    void print() {
        System.out.println(orderId + " order will being delete...");
    }
    
    
    public static void main(String[] args) {
        List<String> list = new ArrayList<String>();
        list.add("001");
        list.add("002");
        list.add("003");
        list.add("004");
        list.add("005");
        
        DelayQueue<JdkDelayImpl> queue = new DelayQueue<JdkDelayImpl>();
        long start = System.currentTimeMillis();
        for(int i = 0; i < 5; i++) {
            queue.put(new JdkDelayImpl(list.get(i), 
                    TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));
            try {
                queue.take().print();
                System.out.println("after: " + (System.currentTimeMillis() - start)
                        + " milliSeconds");
                
            } catch (Exception e) {
            }
        }
    }

}

 

3、基于netty方式,添加netty包

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

/**
 * Demo of a delayed task using Netty's {@link HashedWheelTimer}: schedules a
 * task five seconds ahead and counts the seconds until it has run.
 */
public class NettyDeplayImpl {

    /** Timer task that clears {@link #flag} when it runs. */
    static class MyTimeTask implements TimerTask {

        /**
         * Set to {@code false} once the task has run. Volatile because it is
         * written by the timer thread and polled by the main thread.
         */
        volatile boolean flag;

        /**
         * @param flag initial value of {@link #flag}
         */
        public MyTimeTask(boolean flag) {
            this.flag = flag;
        }

        /**
         * Prints the deletion notice and clears {@link #flag}.
         *
         * @param arg0 the timeout handle for this task (unused)
         */
        @Override
        public void run(Timeout arg0) throws Exception {
            System.out.println("going to delete order...");
            this.flag = false;
        }
    }

    /**
     * Schedules the task and prints a counter once per second until it has run.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTimeTask timeTask = new MyTimeTask(true);
        Timer timer = new HashedWheelTimer();
        try {
            timer.newTimeout(timeTask, 5, TimeUnit.SECONDS);
            int i = 1;
            while (timeTask.flag) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Stop waiting instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    break;
                }
                System.err.println(i + " seconds gone");
                i++;
            }
        } finally {
            // Release the timer's resources once the demo is done.
            timer.stop();
        }
    }
}

 

    原文作者:秋水秋色
    原文地址: https://www.cnblogs.com/codechange/p/8367208.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞