1. 程式人生 > >ActiveMQ學習筆記(12)----ActiveMQ的叢集

ActiveMQ學習筆記(12)----ActiveMQ的叢集

1. Queue consumer cluster

  ActiveMQ支援Consumer對訊息的高可靠性的負載均衡消費,如果一個Consumer死掉,該訊息會轉發到其他的Consumer消費的Queue。如果一個Consumer獲得訊息比其他Consumer快,那麼他將獲得更多的訊息。因此推薦ActiveMQ的Broker和Client使用failover://transport的方式來配置連線

2. Broker clusters

  大部分情況下是使用一系列的Broker和Client連線到一起。如果一個Broker死掉了,那麼Client可以自動連線到其他的Broker上。實現以上行為需用failover協作Client.

  如果啟動了多個Broker,Client可以使用static discovery或者Dynamic discovery容易從一個broker到另一個broker直接連線。這樣當一個broker上沒有Consumer的話,那麼它的訊息不會被消費,然而該broker會通過儲存和轉發的策略來把該訊息傳送到其他broker上。

  特別注意:ActiveMQ預設的兩個Broker, static連結後是單方向的,broker-A可以訪問消費Broker-b的訊息,如果要支援雙向通訊,需要在networConnector配置的時候,設定duplex=true。

  這裡的叢集使用的是靜態網路連線,訪問broker的時建立ConnectionFactory的路徑應寫為:

ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61716)?randomize=false");

  使用三個消費者,一個消費者連線埠61716,另外兩個消費者連線61616埠。

  三個消費者使用如下程式碼如下

package com.wangx.activemq.master;

import org.apache.activemq.ActiveMQConnectionFactory;

import
javax.jms.*; public class QR1 { public static void main(String[] args) { //建立連結工廠 ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61716)?randomize=false"); Connection connection = null; try{ //建立連結 connection = factory.createConnection(); //啟動連結 connection.start(); //獲取會話 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立佇列 Destination queue = session.createQueue("myQueue"); MessageConsumer consumer1 = session.createConsumer(queue); MessageConsumer consumer2 = session.createConsumer(queue); new Thread(new MasterRunnable(consumer1, session, "consumer1")).start(); new Thread(new MasterRunnable(consumer2, session, "consumer2")).start(); }catch (Exception e) { e.printStackTrace(); } } }

  執行緒任務類:開兩個執行緒同時使用兩個消費者同時監聽61616埠

package com.wangx.activemq.master;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class MasterRunnable implements Runnable {

    private MessageConsumer consumer;

    private Session session;

    private String name;
    public  MasterRunnable(MessageConsumer consumer, Session session, String name) {
        this.consumer = consumer;
        this.session = session;
        this.name = name;
    }
    @Override
    public void run() {
        try {
            consumer.setMessageListener(new MyMessageListener(session, name));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  訊息監聽類:

package com.wangx.activemq.master;

import javax.jms.*;

public class MyMessageListener implements MessageListener {

    private Session session = null;
    private String name;
    public MyMessageListener(Session session, String name) {
        this.session = session;
        this.name = name;
    }
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(name + "接受到訊息:" + textMessage.getText());
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  連線61716埠的consumer

package com.wangx.activemq.master;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QR2 {

    public static void main(String[] args) {
        //建立連結工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61716)?randomize=false");
        Connection connection = null;
        try{
            //建立連結
            connection =  factory.createConnection();
            //啟動連結
            connection.start();
            //獲取會話
            final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立佇列
            Destination queue = session.createQueue("myQueue");
            //建立消費者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            //監聽訊息
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("QR2 接受到訊息:" + textMessage.getText());
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }catch (Exception e) {

        }
    }
}

  向埠61716傳送訊息

package com.wangx.activemq.master;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MessageSender {

    public static void main(String[] args) throws JMSException {
        //建立連結工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61716");

        Connection connection = null;
        Session session = null;
        try{
            //建立連結
            connection =  factory.createConnection();
            //啟動連結
            connection.start();
            //獲取會話
            session = connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
            //建立佇列
            Destination queue = session.createQueue("myQueue");
            //建立生產者物件
            MessageProducer messageProducer = session.createProducer(queue);

            for (int i = 0; i < 30; i++) {
                //建立訊息物件
                TextMessage textMessage = session.createTextMessage("hello:" + i);
                //傳送訊息
                messageProducer.send(textMessage);
                System.out.println(textMessage.getText());
            }
            session.commit();
            session.close();
            connection.close();
        }catch (Exception e) {

        }finally {
        }

    }
}

  分別啟動兩個訊息監聽類,可以發現,此時兩個不同broker所連線的消費者所消費的訊息是均分的,儘管此時有一個broker中有兩個consumer,這是因為ActiveMQ預設的認為網路上的broker作為一個consumer,此時將conduitSubscriptions設定為false即可是整個叢集上的所有consumer都有均分消費訊息的可能。

2. Master Slave

  在5.9的版本中,廢除了Pure Master Slave的方式,目前支援

  1. Shared File System Master Slave:基於共享儲存的Master-Slave;多個broker例項使用一個儲存檔案,誰拿到檔案鎖誰就是master,其他處於待啟動狀態,如果master掛掉了,某個搶到檔案鎖的slave變成master

  2. JDBC Master Slave: 基於JDBC的Master-Slave:使用同一個資料庫,拿到LOCK表的寫鎖的broker成為master.

  3. Replicated LeveDB Store:基於zookeeper複製LeveDB儲存的Master-Slave機制,這個是5.9新家的機制。

  具體的可以檢視官方文件:

  http://activemq.apache.org/masterslave.html

3. JDBC Master Slave方式

  利用資料庫作為資料來源,採用Master/Slave模式,其中啟動的時候Master首先獲得獨有鎖,其他Slaves Broker等待獲取獨有鎖。

  推薦客戶端使用Failover來連線Brokers.

  具體如下圖:

  

  3.1. Master失敗

    如果master失敗,則它釋放獨有鎖,其他Slave獲獨有鎖,其他Slave立即獲得獨有鎖後它將變成Master,並且啟動所有的傳輸連線。同時,Client將停止連線之前的Master並且它將會輪詢其他可以利用到的Broker,即新的Master.如上中圖所示。

  3.2 Master重啟

    任何時候啟動新的Broker,都會作為新的Slave來加入叢集,如上右圖所示。

   3.3 JDBC Master Slave的配置

    使用<jdbcPersistenceAdapter dataSource="#mysql-ds"/>來配置訊息持久化,自動就會使用MasterSlave的方式。具體配置如下:

  

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <!--<policyEntry topic=">" >
                     The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>-->
                <policyEntry queue=">" enableAudit="false">
                    <networkBridgeFilterFactory>
                        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                    </networkBridgeFilterFactory>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <!--<kahaDB directory="${activemq.data}/kahadb_2"/>-->
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:9999?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/activemq?useSSL=false"/>
        <property name="username" value="wangx"/>
        <property name="password" value="wangx"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

  去掉靜態網路連線配置,將持久化方式改為jdbc資料的方式,去掉訊息迴流的配置。這裡每個Broker都需要如此配置,只需要埠不一致即可,啟動兩個broker,使用如上連線61716埠的Client像Broker傳送訊息,此時停掉埠為61716的Broker,啟動消費者接受訊息,使用了容錯配置,此時仍然可以接收到前面一個Broker存活時接收到的訊息並進行消費。其實這是由於當61716接受到訊息之後做了持久化,所以當它死掉後,61616升級為Master,它會獲取到前一個master存活時未被消費的訊息,所以我們在訪問61616埠的broker時仍然可以消費訊息。

  說明:使用了failover協議之後,當61716死掉後,client會停止與61716的連線。自動輪詢叢集中可用的broker,即新的master(61616埠的broker)