任务队列实现心得

近日,研究了一下任务队列,于是想到了阻塞队列BlockingQueue,不得不提起到它的两个方法,put,take这两个方法都是阻塞式的,当队列满时,put方法阻塞,当队列空时,take方法阻塞.

我们使用BlockingQueue,只是完成了基本功能FIFO,就是任务先进先出。我们想要实现任务LIFO,这种方式就不行了。需要使用到BlockingDque,双端队列,在队列的两端都可以插入和获取元素,还有阻塞的这一特性。所以我们在使用中,使用的是BlockingDque,来实现我们的需求。

非阻塞队列-FIFO

public class NoneBlockingQTesterFIFO implements ITester {
  private static int MAX_SIZE = 10;
  private static LinkedList<Integer> queue = new LinkedList<>();
  private Consumer consumer;
  private Producer producer;
  private static Random random = new Random();

  public NoneBlockingQTesterFIFO() {
    consumer = new Consumer();
    producer = new Producer();
  }

  @Override
  public void test() {
    producer.start();
    consumer.start();
  }

  public static boolean isFull() {
    return queue.size() == MAX_SIZE;
  }

  public static boolean isEmpty() {
    return queue.size() == 0;
  }

  static class Producer extends Thread {

    int c = 0;

    @Override
    public void run() {
      System.out.println("Producer run consume ...");
      produce();
    }

    private void produce() {
      System.out.println("Producer run produce ...");
      while (c <= 20) {
        synchronized (queue) {
          while (queue.size() == MAX_SIZE) {
            try {
              System.out.println("队列满了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              // 通知其它线程取数据
              queue.notify();
            }
          }

          System.out.println("");
          // 每次插入一个元素
          int v = random.nextInt(100);
          queue.offer(v);
          c++;
          System.out.println("添加了数据: " + v);
          queue.notify();
          // System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  static class Consumer extends Thread {

    @Override
    public void run() {
      System.out.println("Consumer run...");
      consume();
    }

    private void consume() {
      System.out.println("Consumer run consume ...");
      while (true) {
        synchronized (queue) {
          while (queue.size() == 0) {
            try {
              System.out.println("队列空了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          System.out.println("");
          // 每次移走队首元素
          Integer v = queue.poll();
          System.out.println("取到了数据 " + v);
          queue.notify();
          // System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  /**
  添加了数据: 45
  添加了数据: 58
  添加了数据: 82
  添加了数据: 20
  添加了数据: 29
  添加了数据: 18
  添加了数据: 39
  添加了数据: 83
  添加了数据: 82
  添加了数据: 41
  队列满了...
  取到了数据 45
  取到了数据 58
  取到了数据 82
  取到了数据 20
  取到了数据 29
  取到了数据 18
  取到了数据 39
  取到了数据 83
  取到了数据 82
  取到了数据 41
  队列空了...
  添加了数据: 51
  添加了数据: 91
  添加了数据: 76
  添加了数据: 46
  添加了数据: 34
  添加了数据: 38
  添加了数据: 19
  添加了数据: 36
  添加了数据: 86
  添加了数据: 14
  队列满了...
  取到了数据 51
  取到了数据 91
  取到了数据 76
  取到了数据 46
  取到了数据 34
  取到了数据 38
  取到了数据 19
  取到了数据 36
  取到了数据 86
  取到了数据 14
   */
}

非阻塞队列的-LIFO

public class NoneBlockingQTesterLIFO implements ITester {
  private static int MAX_SIZE = 10;
  private static LinkedList<Integer> queue = new LinkedList<>();
  private Consumer consumer;
  private Producer producer;
  private static Random random = new Random();

  public NoneBlockingQTesterLIFO() {
    consumer = new Consumer();
    producer = new Producer();
  }

  @Override
  public void test() {
    producer.start();
    consumer.start();
  }

  public static boolean isFull() {
    return queue.size() == MAX_SIZE;
  }

  public static boolean isEmpty() {
    return queue.size() == 0;
  }

  static class Producer extends Thread {

    int c = 0;

    @Override
    public void run() {
      System.out.println("Producer run consume ...");
      produce();
    }

    private void produce() {
      System.out.println("Producer run produce ...");
      while (c <= 20) {
        synchronized (queue) {
          while (queue.size() == MAX_SIZE) {
            try {
              System.out.println("队列满了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              // 通知其它线程取数据
              queue.notify();
            }
          }

          System.out.println("");
          // 每次插入一个元素
          int v = random.nextInt(100);
          queue.offerFirst(v);
          c++;
          System.out.println("添加了数据: " + v);
          queue.notify();
          // System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  static class Consumer extends Thread {

    @Override
    public void run() {
      System.out.println("Consumer run...");
      consume();
    }

    private void consume() {
      System.out.println("Consumer run consume ...");
      while (true) {
        synchronized (queue) {
          while (queue.size() == 0) {
            try {
              System.out.println("队列空了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          System.out.println("");
          // 每次移走队首元素
          Integer v = queue.pollFirst();
          System.out.println("取到了数据 " + v);
          queue.notify();
          // System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  /**
   *添加了数据: 39
    添加了数据: 80
    添加了数据: 58
    添加了数据: 87
    添加了数据: 73
    添加了数据: 94
    添加了数据: 0
    添加了数据: 52
    添加了数据: 23
    添加了数据: 59
    队列满了...
    取到了数据 59
    取到了数据 23
    取到了数据 52
    取到了数据 0
    取到了数据 94
    取到了数据 73
    取到了数据 87
    取到了数据 58
    取到了数据 80
    取到了数据 39
    队列空了...
    添加了数据: 38
    添加了数据: 42
    添加了数据: 87
    添加了数据: 53
    添加了数据: 33
    添加了数据: 32
    添加了数据: 68
    添加了数据: 13
    添加了数据: 59
    添加了数据: 43
    队列满了...
    取到了数据 43
    取到了数据 59
    取到了数据 13
    取到了数据 68
    取到了数据 32
    取到了数据 33
    取到了数据 53
    取到了数据 87
    取到了数据 42
    取到了数据 38
    队列空了...
    添加了数据: 39
    取到了数据 39
    队列空了...
   */

}

阻塞队列之双端队列的使用

public class LinkedBlockingDqTester implements Itester {

  int MAX_ITEM_SIZE = 10;
  private LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>(MAX_ITEM_SIZE);
  private int mFlag;

  public LinkedBlockingDqTester(int flag) {
    this.mFlag = flag;
  }

  @Override
  public void test() {
    switch (mFlag) {
      case 0:
        fifo();
        break;
      case 1:
        lifo();
        break;
    }
  }

  // put -> [9,8,7,6,5,4,3,2,1,0]
  // get -> [9]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [10,8,7,6,5,4,3,2,1,0]
  // get -> [10]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [11,8,7,6,5,4,3,2,1,0]
  // get -> [11]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [12,8,7,6,5,4,3,2,1,0]
  // get -> [12]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [13,8,7,6,5,4,3,2,1,0]
  // get -> [13]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [14,8,7,6,5,4,3,2,1,0]
  // get -> [14]
  // [8,7,6,5,4,3,2,1,0]
  // get -> [8]
  // get -> [7]
  // get -> [6]
  // get -> [5]
  // get -> [4]
  // get -> [3]
  // get -> [2]
  // get -> [1]
  // get -> [0]
  /**
   * put -> putFirst
   * take -> takeFirst
   */
  private void lifo() {
    new Thread() {
      public void run() {
        for (int i = 0; i < 15; i++) {
          try {
            // 解决避免线程执行过快,带来的问题
            System.out.println("");
            queue.putFirst(i);
            System.out.println("存数据 " + i);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(e);
          }
        }
      };
    }.start();

    new Thread() {
      public void run() {
        try {
          Thread.sleep(3000);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        while (true) {
          try {
            // 解决避免线程执行过快,带来的问题
            System.out.println("");
            Integer v = queue.takeFirst();
            System.out.println("取数据 = " + v);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(e);
          }
        }
      };
    }.start();
  }

  // put(putLast) -> [0,1,2,3,4,5,6,7,8,9]
  // get(getFirst) -> [0,1,2,3]
  // [4,5,6,7,8,9]
  // put(putLast) -> [4,5,6,7,8,9,10]
  // [4,5,6,7,8,9,10]
  // get(getFirst) -> [4,5,6,7]
  // [8,9,10]
  // put(putLast) -> [8,9,10,11]
  // get(getFirst) -> [8,9]
  // [10,11]
  // put(putLast) -> [10,11,12,13]
  // [10,11,12,13]
  // get(getFirst) -> [10]
  // [11,12,13]
  // put(putLast) -> [11,12,13,14]
  // get(getFirst) -> [11]
  // get(getFirst) -> [12]
  // get(getFirst) -> [13]
  // get(getFirst) -> [14]
  /**
   * put -> putLast
   * take -> takeFirst
   */
  private void fifo() {
    new Thread() {
      public void run() {
        for (int i = 0; i < 15; i++) {
          try {
            queue.put(i);
            System.out.println("存数据 " + i);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      };
    }.start();

    new Thread() {
      public void run() {
        try {
          Thread.sleep(3000);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        System.out.println(queue.toArray());
        while (true) {
          try {
            Integer v = queue.take();
            System.out.println("取数据 = " + v);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      };
    }.start();
  }

}

重点说明
本项目中的代码,有部分是参照网络上的资料进行的修改,如有雷同,敬请谅解。

    原文作者:草蜢的逆袭
    原文地址: https://www.jianshu.com/p/3abb8479e900
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞