使用 Redis 的列表结构可以实现执行一种任务的FIFO队列,也可以实现通过调用不同回调函数的来执行多重不同的任务队列,乃至可以是实现简单的优先级队列,当然也可以实现延时队列。
延时队列的基本实现有3类:
- 在任务信息中包含任务的执行时间,工作进程发现任务时间未到,短暂的等待之后,将任务重新推入队列里面。
- 使用一个任务列表记录所需要的执行的任务,并在每次进行 while循环的时候,扫描检查列表并执行已经到期的任务。
- 把所所有需要执行的任务都添加到有序集合里面,并将任务执行的时间设置分值。再是有个一个额外的进程来查询有序集合里面是否有可以执行的任务,如果有,将任务从有序集合里面移除,并将任务推进适当的任务队列。
无论是短暂的等待,还是将任务从入队列,都是已经很好资源的事情,多以通常不会采用第一种方法。如果在本地维护一个任务列表,可以能会导致任务丢失,除非对任务进行持久化。其次,通过不断的扫描别表,查找合适的任务,每次都需要循环遍历,也是件浪费资源的事情,所以第二种方法也不可取。最后,采用有序结合保存任务、执行时间作为排序的依据是最简单最直接的做法。采用执行时间排序,不需要每次遍历整个队列,只需要判断队首的元素是否到了可执行时间即可。其次,只需要一个工作进程。再者,可以使用 “分布式锁”机制将任务从有序集合中个移动到任务队列。这样处理,语义简单,逻辑清晰。
Redis 的有序集合天生就适合做这件事。
127.0.0.1:6379> zadd task_set 1 task1
(integer) 1
127.0.0.1:6379> zadd task_set 2 task2
(integer) 1
127.0.0.1:6379> zadd task_set 3 task3
(integer) 1
127.0.0.1:6379> zadd task_set 4 task4
(integer) 1
127.0.0.1:6379> ZRANGE task_set 0 10 WITHSCORES
1) "task1"
2) "1"
3) "task2"
4) "2"
5) "task3"
6) "3"
7) "task4"
8) "4"
Java 模拟代码:
package me.touch.redis;
import java.util.Set;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Tuple;
/**
* 延时队列
* @author Knight-Ran
*
*/
public class delayQueue {
private Jedis jedis;
private JedisPool pool;
private static final String QUEUE_NAME = "deplay_queue";
@Before
public void setUp() {
pool = new JedisPool(new JedisPoolConfig(), "localhost");
jedis = pool.getResource();
}
@After
public void after() {
jedis.close();
pool.destroy();
}
// 模拟任务处理队列
public static void addToTaskQue(String taskInfo){
System.out.println(taskInfo+"已经从延时队列中转至队列"+ "当前时间:"+ System.currentTimeMillis() );
System.out.println();
}
public void addToDeplayQueue(Task task){
System.out.println(task.toString()+ "已经加入延时队列");
jedis.zadd(QUEUE_NAME, task.getTime(), task.toString());
}
public void transferFromDelayQueue() throws InterruptedException{
while(true){
Set<Tuple> item = jedis.zrangeWithScores(QUEUE_NAME, 0, 0);
if(item != null && !item.isEmpty()){
Tuple tuple = item.iterator().next();
if(System.currentTimeMillis() >= tuple.getScore()){
// TODO 获取锁
jedis.zrem(QUEUE_NAME, tuple.getElement()); // 从延时队列中移除
addToTaskQue(tuple.getElement()); //任务推入延时队列,因为这里只是延时
// TODO 释放锁
}
}
Thread.sleep(100);
}
}
@Test
public void test() throws InterruptedException{
long now = System.currentTimeMillis();
Task task = new Task(UUID.randomUUID().toString(), now+10*1000, 10*1000+"后执行");
addToDeplayQueue(task);
task = new Task(UUID.randomUUID().toString(), now+20*1000, 20*1000+"后执行");
addToDeplayQueue(task);
task = new Task(UUID.randomUUID().toString(), now+30*1000, 30*1000+"后执行");
addToDeplayQueue(task);
task = new Task(UUID.randomUUID().toString(), now+40*1000, 40*1000+"后执行");
transferFromDelayQueue();
}
static class Task{
// 任务id
private String id ;
// 任务执行时间
private long time;
// 描述
private String desc;
public Task(String id, long time, String desc){
this.id = id ;
this.time = time;
this.desc = desc;
}
public String getId() {
return id;
}
public long getTime() {
return time;
}
public String getDesc() {
return desc;
}
@Override
public String toString() {
return "Task [id=" + id + ", time=" + time + ", desc=" + desc + "]";
}
}
}
测试结果:
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经加入延时队列
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经加入延时队列
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经加入延时队列
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经从延时队列中转至队列当前时间:1502006961481
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经从延时队列中转至队列当前时间:1502006971518
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经从延时队列中转至队列当前时间:1502006981538