1. 程式人生 > >java-消息中間件-基於內存的mq

java-消息中間件-基於內存的mq

null message main rate 鏈表實現 測試 模型 true 做到

  1. 如果用戶的請求比較費時,可以考慮將用戶的請求信息放到隊列中,立即返回給用戶處理中等信息,這樣可以給用戶比較流暢的體驗,後端可以利用單獨的服務消費消息,做到了解耦,提高了並發能力。

  2. 本文使用jdk為我們提供的阻塞隊列api,來實現一個基於內存的簡單消息隊列。主要涉及的接口BlockingQueue,以及它的實現類ArrayBlockingQueue(數組實現的)和LinkedBlockingQueue(鏈表實現的)。

  3. BlockingQueue的主要方法
    添加元素
    put() //往隊列裏插入元素,如果隊列已經滿,則會一直等待直到隊列為空插入新元素,或者線程被中斷拋出異常;
    offer() //往隊列添加元素如果隊列已滿直接返回false,隊列未滿則直接插入並返回true;


    add() //對offer()方法的簡單封裝.如果隊列已滿,拋出異常new IllegalStateException("Queue full");
    刪除元素
    remove() //方法直接刪除隊頭的元素;
    take() //取出並刪除隊頭的元素,當隊列為空,則會一直等待直到隊列有新元素可以取出,或者線程被中斷拋出異常;
    pool() //取出並刪除隊頭的元素,當隊列為空,返回null;
    peek() //直接取出隊頭的元素,並不刪除;
    element() //對peek方法進行簡單封裝,如果隊頭元素存在則取出並不刪除,如果不存在拋出異常NoSuchElementException();

  4. 基於內存的隊列,隊列的大小依賴於JVM內存的大小,一般如果是內存占用不大且處理相對較為及時的都可以采用此種方法。如果你在隊列處理的時候需要有失敗重試機制,那麽用此種隊列就不是特別合適了,可以使用基於數據庫的mq。

  5. 使用示例,簡單模仿基於生產者、消費者、消息隊列模型的內存mq

*生產者模型

    public class Produce {
    
    private int id ;
    
    private BlockingQueue<String> queue;
    
    public Produce() {
    }
    public Produce(int id, BlockingQueue<String> queue) {
        this.id = id;
        this.queue = queue;
        System.out.println("創建了生產者"+ id + "號");
    }
    
    public void produce(String message){
        boolean add = this.queue.add(message);
        if(add){
            System.out.println("生產者"+id+"號,生產了一條消息,"+message);  
        }
    }
}

*消費者模型

public class Consumer {
    
    private int id;

    private BlockingQueue<String> queue;
    
    private static ScheduledExecutorService  executors = Executors.newScheduledThreadPool(1);
    
    public Consumer() {
    }
    
    public Consumer(BlockingQueue<String> queue, int id){
        this.id = id;
        this.queue = queue;
        System.out.println("創建了消費者:" + id +"號");
        consumer();
    }
    public void consumer(){
        
            executors.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("消費者手裏的線程池核武器收到了一個命令,此時隊列中的任務數"+queue.size()+"個");
                    try {
                        String message = queue.take();
                        System.out.println("消費者: " + id + "號,開始消費了");
                        Thread.sleep(3000);
                        System.out.println("消費者: " + id + "消費結束了,"+message);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }                   
                }
            }, 0, 5, TimeUnit.SECONDS);
            
    }
}

*測試類

public class Test {

    public static void main(String[] args) {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(1000);
        Produce produce = new Produce(1, queue);
        new Consumer(queue, 1);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        for(int i=0;i<20;i++){
            produce.produce("@"+i+"@");
        }   
    }
}

ps:消息隊列無論在分布式還是在高並發概念中,都是一個非常重要的數據處理方式,本文只是我對消息中間件這個技術棧的第一次試水,後面會陸續增加基於數據庫的mq,以及現在已經比較成熟商業中間件產品rabbitmq、rocketmq等的學習筆記。

java-消息中間件-基於內存的mq