1. 程式人生 > >activemq queue開發(持久化方式)

activemq queue開發(持久化方式)

公司要求activemq通訊,我經過三天的努力,今天算是有所前進。現將我對activemq的認識總結如下:
                 要使用activemq進行通訊,就必須開啟一個broker, 他可以理解為管理通訊連線的東東。關於broker的啟動,有兩種方式:一是activemq檔案內的bin/activemq.bat檔案。這種啟動方式比較適合多使用者的開發。另一種就是在我們的java程式碼呼叫activemq相關的類來構造並啟動brokerService。
                 第二個問題就是資訊的持久化問題,這個要達到的最終效果是隻要資訊傳送成功,但沒有被消費掉,不論出現什麼情況(最終導致activemq關閉),當activemq重新啟動時沒有被消費的資訊仍然存在與訊息佇列裡。
                 要實現這個效果需要兩步:一是activemq在訊息被消費時要存進檔案或資料庫,而不是在記憶體裡。二是傳送到訊息要指明實現持久化。第一步的實現有多個方式(具體o就不說了),但大體上就是存進log檔案和存進本地資料庫兩種。關於activemq檔案配置的實現o就不提了。o要提的是通過java程式碼來實現資料的持久化。
程式碼如下:
        BrokerService broker=new BrokerService();

        broker.setBrokerName(brokerName);          //設定資訊持久化         

 //定義datasource

        DataSource orclds=(DataSource) new BasicDataSource();

       ((BasicDataSource)orclds).setPoolPreparedStatements(true);

        ((BasicDataSource)orclds).setMaxActive(200);

        ((BasicDataSource)orclds).setDriverClassName("     ");//資料庫驅動

         ((BasicDataSource)orclds).setUrl("       ");//資料庫地址

      ((BasicDataSource)orclds).setUsername("×××");//資料庫使用者名稱     
   ((BasicDataSource)orclds).setPassword("×××");//資料庫使用者密碼

       //定義JDBCPersistenceAdapter

       PersistenceAdapter jdbcperAdapter=new JDBCPersistenceAdapter();

      ((JDBCPersistenceAdapter)jdbcperAdapter).setBrokerService(broker);

         ((DataSourceSupport)jdbcperAdapter).setDataSource(orclds);

     broker.setPersistenceAdapter(jdbcperAdapter);

    broker.setPersistent(true);

      broker.addConnector("   ");//新增activemq連線

     broker.start();     

Object lock = new Object();                 

synchronized (lock) {

                         lock.wait();

                  }
如果要使用的是mysql資料庫可能還有幾點細節要注意。
在訊息傳送端,要設定訊息生產者為Persistence的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//訊息持久化  

當然,還可以通過把activemq檔案拿到我們本地工程下來配置:
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/activemq.xml"));
 
關於activemq獲取連線傳送訊息的方式:一個是工廠模式
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url);       connection = connectionFactory.createConnection();         

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

 destination = session.createQueue(subject);                  

  producer = session.createProducer(destination);          
                producer.send(messagep);
另一個是jndi方式:
InitialContext initCtx = new InitialContext();
               Context envContext = (Context) initCtx.lookup("java:comp/env");
             ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
               Connection connection = connectionFactory.createConnection();
               Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
               MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); 
               //設定持久方式
               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
關於訊息的管理activemq提供了三個message類TextMessage,MapMessage,ObjectMessage
extmessage:
  Message testMessage = jmsSession.createMessage();
               //釋出重新整理文章訊息
               testMessage.setStringProperty("RefreshArticleId", "2046");      
               producer.send(testMessage);
               //釋出重新整理帖子訊息
               testMessage.clearProperties();
               testMessage.setStringProperty("RefreshThreadId", "331");
mapmessage:
MapMessage messagep = session.createMapMessage();<BR>         //構造訊息頭               <BR>         //messagep.setStringProperty("SENSE", "BTNM 3.0");<BR>         //messagep.setStringProperty("CLASS"," ComputerSystem");<BR>         //構造訊息體<BR>         //messagep.setString("CPULOAD:NO=0, ID=182910293","0.1");
關於object的o還沒有研究。