生产者消费者问题是Java并发中的常见问题之一,在实现时,一般可以考虑使用juc包下的BlockingQueue接口,至于具体使用哪个类,则就需要根据具体的使用场景具体分析了。本文主要实现一个生产者消费者的原型,以及实现一个生产者消费者的典型使用场景。
第一个问题:实现一个生产者消费者的原型。
1 import java.util.concurrent.*; 2 3 class Consumer implements Runnable { 4 BlockingQueue q = null; 5 6 public Consumer(BlockingQueue q) { 7 this.q = q; 8 } 9 10 @Override 11 public void run() { 12 while(true) { 13 try { 14 q.take(); 15 System.out.println("Consumer has taken a product."); 16 }catch(InterruptedException e) { 17 18 } 19 } 20 } 21 } 22 23 class Producer implements Runnable { 24 BlockingQueue q = null; 25 26 public Producer(BlockingQueue q) { 27 this.q = q; 28 } 29 30 @Override 31 public void run() { 32 while(true) { 33 try { // note that if there is any chance that block, usually we need a InterruptedException 34 q.put(new Object()); 35 System.out.println("Producer has puted a product."); 36 }catch(InterruptedException e) { 37 38 } 39 } 40 } 41 42 43 } 44 45 public class JC_ProducerConsumerPrototype { 46 static int queueCapacity = 1024; 47 //static BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(queueCapacity); // Can also compile 48 static BlockingQueue q = new ArrayBlockingQueue(queueCapacity); // ABQ must has a capacity 49 public static void main(String[] args) { 50 Thread t1 = new Thread(new Producer(q)); 51 Thread t2 = new Thread(new Consumer(q)); 52 t1.start(); 53 t2.start(); 54 } 55 56 57 }
第二个问题,现在假设生产者是在读取磁盘上的多个log文件,对于每一个文件,依次读取文件中的每一行,也就是一条log记录;消费者需要读取并分析这些记录,假设消费者是计算密集型的。如何在生产者消费者原型的基础上实现这些功能?
这个场景在server端开发中是经常碰到的,因为在Server端,不可避免地会产生大量的日志文件。
1 import java.util.concurrent.*; 2 import java.io.*; 3 import java.nio.*; 4 import java.nio.file.*; 5 import java.util.*; 6 import java.nio.charset.*; 7 8 9 class Producer implements Runnable { 10 BlockingQueue q = null; 11 String fileName = null; 12 CountDownLatch latch = null; 13 14 public Producer(BlockingQueue q,String fileName,CountDownLatch latch) { 15 this.q = q; 16 this.fileName = fileName; 17 this.latch = latch; 18 } 19 20 @Override 21 public void run() { 22 Path path = Paths.get(".",fileName); 23 try{ 24 List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8); 25 for(int i=lines.size();i>0;i--){ 26 try{ 27 q.put(lines.get(i)); 28 }catch(InterruptedException e) { 29 30 } 31 } 32 }catch(IOException e){ 33 34 } 35 latch.countDown(); 36 } 37 } 38 39 class Consumer implements Runnable { 40 BlockingQueue<String> q = null; 41 Boolean done = false; 42 43 public Consumer(BlockingQueue q,Boolean done){ 44 this.q = q; 45 this.done = done; 46 } 47 48 @Override 49 public void run(){ 50 while(!done||q.size()!=0){ 51 try{ 52 q.take(); 53 }catch(InterruptedException e){ 54 55 } 56 } 57 } 58 } 59 60 public class JC_ProducerConsumerHandlingLog{ 61 public static int fileCount = 1024; 62 public static String[] fileNames = new String[fileCount]; 63 public static int cpuCount = 8; 64 public static CountDownLatch latch = new CountDownLatch(fileCount); 65 public static volatile boolean done = false; 66 public static BlockingQueue<String> q = new LinkedBlockingQueue<String>(fileCount);//one thread for one file 67 68 public static void main(String[] args){ 69 for(int i=0;i<fileCount;i++){ 70 Thread t = new Thread(new Producer(q,fileNames[i],latch)); 71 t.start(); 72 } 73 for(int i=0;i<cpuCount;i++){//for computing tasks, we don't need too many threads. 74 Thread t = new Thread(new Consumer(q,done)); 75 t.start(); 76 } 77 try{ 78 latch.await(); 79 done = true; 80 }catch(InterruptedException e){ 81 82 } 83 84 } 85 }
需要稍微注意一下线程数的选择,对于计算密集型的任务,我认为线程数达到cpu的核数比较合理(在不考虑超线程的情况下,也就是说一个核只有一个线程)。有不同意见欢迎跟我交流!