自定义阻塞队列和自定义线程池

自定义阻塞队列

package jucdemo;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** * @author wardseptember * @create 2021-02-27 17:12 * * 这里只实现了几个接口,其他的太多了,不实现了,感兴趣的朋友可以自行去实现 */
public class MyBlockingQueue<E> implements BlockingQueue<E> { 

    private LinkedList<E> queue = new LinkedList<>();

    private AtomicInteger count = new AtomicInteger(0);

    private final Object lock = new Object();

    private final int maxSize;

    public MyBlockingQueue(int maxSize) { 
        this.maxSize = maxSize;
    }

    private void enqueue(E e) { 
        queue.offer(e);
    }

    private E dequeue() { 
        return queue.poll();
    }

    @Override
    public boolean add(E e) { 
        synchronized (lock) { 
            if (count.get() == maxSize) { 
                return false;
            }
            count.incrementAndGet();
            enqueue(e);
            return true;
        }
    }

    @Override
    public boolean offer(E e) { 
        return this.add(e);
    }

    @Override
    public E remove() { 
        synchronized (lock) { 
            if (count.get() == 0) { 
                return null;
            }
            count.decrementAndGet();
            return dequeue();
        }
    }

    @Override
    public E poll() { 
        return remove();
    }

    @Override
    public E element() { 
        return null;
    }

    @Override
    public E peek() { 
        return null;
    }

    @Override
    public void put(E e) throws InterruptedException { 
        synchronized (lock) { 
            while (count.get() == maxSize) { 
                try { 
                    lock.wait();
                } catch (InterruptedException ex) { 
                    ex.printStackTrace();
                }
            }
            enqueue(e);
            count.incrementAndGet();
            lock.notify();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
        return false;
    }

    @Override
    public E take() throws InterruptedException { 
        synchronized (lock) { 
            while (count.get() == 0) { 
                try { 
                    lock.wait();
                } catch (InterruptedException ex) { 
                    ex.printStackTrace();
                }
            }
            count.decrementAndGet();
            lock.notify();
            return dequeue();
        }
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        return null;
    }

    @Override
    public int remainingCapacity() { 
        return 0;
    }

    @Override
    public boolean remove(Object o) { 
        return false;
    }

    @Override
    public boolean containsAll(Collection<?> c) { 
        return false;
    }

    @Override
    public boolean addAll(Collection<? extends E> c) { 
        return false;
    }

    @Override
    public boolean removeAll(Collection<?> c) { 
        return false;
    }

    @Override
    public boolean retainAll(Collection<?> c) { 
        return false;
    }

    @Override
    public void clear() { 

    }

    @Override
    public int size() { 
        return 0;
    }

    @Override
    public boolean isEmpty() { 
        return false;
    }

    @Override
    public boolean contains(Object o) { 
        return false;
    }

    @Override
    public Iterator<E> iterator() { 
        return null;
    }

    @Override
    public Object[] toArray() { 
        return new Object[0];
    }

    @Override
    public <T> T[] toArray(T[] a) { 
        return null;
    }

    @Override
    public int drainTo(Collection<? super E> c) { 
        return 0;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) { 
        return 0;
    }
}

自定义线程池

package jucdemo;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** * @author wardseptember * @create 2021-02-27 16:49 */
public class ThreadPoolExecutorDemo { 
    public static void main(String[] args) throws IOException { 
// ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(2);

        MyBlockingQueue<Runnable> myBlockingQueue = new MyBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, myBlockingQueue, new NameThreadFactory(), new MyIgnorePolicy());
        for (int i = 1; i < 10; i++) { 
            MyTask task = new MyTask(String.valueOf(i));
            threadPoolExecutor.execute(task);
        }
        System.in.read();
        threadPoolExecutor.shutdown();
    }

    static class NameThreadFactory implements ThreadFactory { 
        private final AtomicInteger atomicInteger = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) { 
            Thread t = new Thread(r, "my-thread-" + atomicInteger.getAndIncrement());
            System.out.println(t.getName() + "被创建");
            return t;
        }
    }

    static class MyIgnorePolicy implements RejectedExecutionHandler { 

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
            doLog(r, executor);
        }

        private void doLog(Runnable r, ThreadPoolExecutor executor) { 
            System.out.println(r.toString() + "被拒绝");
        }
    }

    static class MyTask implements Runnable { 

        private String name;

        public MyTask(String name) { 
            this.name = name;
        }

        @Override
        public void run() { 
            try { 
                System.out.println(this.toString() + "正在运行");
                Thread.sleep(3000);
            } catch (InterruptedException e) { 
                e.printStackTrace();
            }
        }

        public String getName() { 
            return name;
        }

        @Override
        public String toString() { 
            return "MyTask [name=" + name + "]";
        }
    }
}

推荐阅读

欢迎关注我的公众号呦,率先更新内容,并且后续还有一些源码级的免费教程推出。

《自定义阻塞队列和自定义线程池》

    原文作者:wardseptember
    原文地址: https://blog.csdn.net/wardseptember/article/details/114183029
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞