The World

scribble

Ralph YY's Blog

05 Jun 2021
Java Blocking Queue

Multiple ways to implement the blocking queue in Java in multiple threads environment.

生产者消费者问题是一个很经典的问题,这里用多种方式展示了Java中写Blocking Queue的方式

1.使用Java自带的Blocking Queue结构,最简单

class BoundedBlockingQueue {
    private LinkedBlockingQueue<Integer> queue;
    public BoundedBlockingQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }
    
    public void enqueue(int element) throws InterruptedException {
        queue.put(element);
    }
    
    public int dequeue() throws InterruptedException {
        return queue.take();
    }
    
    public int size() {
        return queue.size();
    }
}

2.使用信号量的方式
信号量就是标准的多锁模式,可以设置锁的数量, 应用比较简单,
一个信号量equeueSema对enqueue进行阻塞,一个信号量dequeueSema对dequeue进行阻塞。

class BoundedBlockingQueue {
    private Queue<Integer> queue;
    private Semaphore equeueSema;
    private Semaphore dequeueSema;
    public BoundedBlockingQueue(int capacity) {
        this.queue = new LinkedList<>();
        this.equeueSema = new Semaphore(capacity);
        this.dequeueSema = new Semaphore(0);
    }
    public void enqueue(int element) throws InterruptedException {
        equeueSema.acquire();
        queue.offer(element);
        dequeueSema.release();
    }
    public int dequeue() throws InterruptedException {
        dequeueSema.acquire();
        int val = queue.poll();
        equeueSema.release();
        return val;
    }
    public int size() {
        return queue.size();
    }
}

3.最古老传统的synchronized的方式
比较重量级的锁,需要使用wait和notifyAll的方法来切换线程。

class BoundedBlockingQueue {
    private Queue<Integer> queue;
    private int cap;
    public BoundedBlockingQueue(int capacity) {
        this.cap = capacity;
        this.queue = new LinkedList<>();
    }
    public synchronized void enqueue(int element) throws InterruptedException {
        while(queue.size() == cap){
            wait();
        }
        queue.offer(element);
        notifyAll();
    }
    public synchronized int dequeue() throws InterruptedException {
        while(queue.isEmpty()){
            wait(); 
        }
        int val = queue.poll();
        notifyAll();
        return val;
    }
    public int size() {
        return queue.size();
    }
}

4.使用轻量级的可重入锁ReenTrantLock方法
和synchronized的思路是完全一样的,不过是基于CAS的所以特别快,而且减少了死锁的可能性,
对应的lock操作是condition,lock,unlock和await功能, 模板就是如下:

lock.lock();
try {
	//do something
} finally {
	lock.unlock();
}

ReenTrantLock是使用Condition对象来实现wait和notify的功能。
和synchronized对比的话就是:
Signal = Notify
SignalAll = NotifyAll
Await = Wait

class BoundedBlockingQueue {
    private ReentrantLock lock = new ReentrantLock();
    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();
    private Queue<Integer> queue;
    private int cap;
    public BoundedBlockingQueue(int capacity) {
        this.queue = new LinkedList<>();
        this.cap = capacity;
    }
    public void enqueue(int element) throws InterruptedException {
        lock.lock();
        try{
            while(queue.size() == cap){
                full.await();
            }
            queue.offer(element);
            empty.signal();
        } finally {
            lock.unlock();
        }
    }
    public int dequeue() throws InterruptedException {
        lock.lock();
        try {
            while(queue.isEmpty()){
                empty.await();
            }
            int val = queue.poll();
            full.signal();
            return val;
        } finally{
            lock.unlock();
        }
    }
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally{
            lock.unlock();
        }
    }
}

5.更优雅的读写锁
使用了ReentrantReadWriteLock读写锁,因为在调用size()的时候只是读取,无需阻塞。
写的比较复杂,要从ReadWriteLock分出read锁和write锁,而且其中的write锁还要根据queue的大小来锁定。
读写锁的特点是:
1)write的时候阻塞,只有一个线程能够执行,用于enqueue和dequeue
2)read的时候可以独立执行,随便读,用于获取queue.size()
在写比较多的场景下,性能是最好的。

class BoundedBlockingQueue {
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    private Lock read = lock.readLock();
    private Lock write = lock.writeLock();
    private Condition full = write.newCondition();
    private Condition empty = write.newCondition();
    private Queue<Integer> queue;
    private int cap;
    public BoundedBlockingQueue(int capacity) {
        this.queue = new LinkedList<>();
        this.cap = capacity;
    }
    public void enqueue(int element) throws InterruptedException {
        write.lock();
        try{
            while(queue.size() == cap){
                full.await();
            }
            queue.offer(element);
            empty.signal();
        } finally {
            write.unlock();
        }
    }
    public int dequeue() throws InterruptedException {
        write.lock();
        try {
            while(queue.isEmpty()){
                empty.await();
            }
            int val = queue.poll();
            full.signal();
            return val;
        } finally{
            write.unlock();
        }
    }
    public int size() {
        read.lock();
        try {
            return queue.size();
        } finally{
            read.unlock();
        }
    }
}

Reference

1.使用ReadWriteLock


Til next time,
at 00:00

scribble

comments powered by Disqus