问题由来:
多线程接收kafka的消息,有时消息几乎同时达到,先简单处理后提交给线程池再次处理,结果出现当先到达的消息msgA和后到达的消息msgB到达时间相差很小时,例如10毫秒,几乎同时提交到线程池,因为提交的线程池的时间相差更小有时几乎是完全相同的时间,导致偶然消息处理乱序,消息msgB被先处理了。
解决思路:
所有提交给线程池的Runnable带有优先级,虽然几乎同时提交到线程池,但是线程执行任务时从自己的任务队列中找当前优先级最高的先处理。基于上面的情况,可以根据消息的到达时间进行优先级比较,先到达的优先级高。
具体实现
分为两个类,第一个可比较优先级的Runnable,第二个线程池的创建时使用PriorityBlockingQueue而不是LinkedBlockingQueue。
Runnable代码
package com.yq;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
@Slf4j
public class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
private long rts;
private String name;
PrioritizedRunnable(long rts, String name) {
this.rts = rts;
this.name = name;
}
public long getRts() {
return rts;
}
public String getName() {
return name;
}
@Override
public int compareTo(PrioritizedRunnable secondOne) {
// 时间越小越优先
log.info("compareTo. this.name={}, secondOne.name={}", this.getName(), secondOne.getName());
if (this.getRts() < secondOne.getRts()) {
return -1;
}else if(this.getRts()> secondOne.getRts()){
return 1;
} else {
return 0;
}
}
@Override
public void run() {
Random random = new Random();
log.info("rts={}, name={}", rts, name);
try {
int sleepRandom = random.nextInt(200);
Thread.sleep(sleepRandom);
} catch (Exception ex) {
log.info("sleep exception", ex);
}
}
}
具体调用的示例代码
package com.yq;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class PriorityDemo {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
2,
Long.MAX_VALUE, /* timeout */
TimeUnit.NANOSECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadPoolExecutor.DiscardOldestPolicy());
PrioritizedRunnable p1 = new PrioritizedRunnable(1234, "name1-1");
PrioritizedRunnable p2 = new PrioritizedRunnable(1500, "name4-2");
PrioritizedRunnable p3 = new PrioritizedRunnable(1590, "name5-3");
PrioritizedRunnable p4 = new PrioritizedRunnable(1490, "name3-4");
PrioritizedRunnable p5 = new PrioritizedRunnable(1290, "name2-5");
executor.execute(p4);
executor.execute(p1);
executor.execute(p2);
executor.execute(p3);
executor.execute(p5);
log.info("submit 5 Runnable");
Thread.sleep(30*1000);
ruleExecutor.shutdown();
log.info("done!");
}
}
备注:提交到线程池使用execute方法,不要使用submit要不然回报
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357) ~[na:1.8.0_161]
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) ~[na:1.8.0_161]
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_161]
运行结果
可以看到当线程池执行任务,虽然p4先提交,但是因为p1优先级更高,所以先执行p1 (23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable – rts=1234, name=name1-1), 然后是p4.
23:22:33.283 [main] INFO com.yq.PrioritizedRunnable – compareTo. this.name=name5-3, o.name=name4-2
23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable – rts=1234, name=name1-1
23:22:33.283 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable – rts=1490, name=name3-4
23:22:33.288 [main] INFO com.yq.PrioritizedRunnable – compareTo. this.name=name2-5, o.name=name4-2
23:22:33.288 [main] INFO com.yq.PriorityDemo – submit 5 Runnable
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable – compareTo. this.name=name4-2, o.name=name5-3
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable – rts=1290, name=name2-5
23:22:33.451 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable – rts=1500, name=name4-2
23:22:33.497 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable – rts=1590, name=name5-3
23:23:03.289 [main] INFO com.yq.PriorityDemo – done!
Process finished with exit code 0