1. 程式人生 > >netty物聯網實踐之(一)

netty物聯網實踐之(一)

           最近接了個智慧手錶專案,正好學習實踐下netty

           對於netty的I/O模型,設計上儘量不在EventLoopGroup執行緒中處理耗時的業務邏輯,這裡我引入了MQ(rabbitMQ),netty服務端在接收到客戶端指令以後,不作邏輯處理

只提取出客戶端channel標記資訊,就轉發指令到MQ,通過業務執行緒池專門處理業務。當需要向客戶端主動推送訊息時候,監聽MQ佇列,解析指令,主動推送訊息到需要推送的客戶端。

           初步設計流程圖大體如下:

                      

      首先netty服務端例項程式碼:

/**
 * Created by peng.wang on 2017/3/31.
 */
public class NettyServer {


    /**
     *  the core handler
     */
    private MessageHandler messageHandler;


   public void bind(int port){

       /**
        * used to accept client connection
        */
       EventLoopGroup bossGroup = new NioEventLoopGroup();
       /**
        * used to handle all the events and IO for socketChannel
        */
       EventLoopGroup workerGroup = new NioEventLoopGroup(4);

       try{
           ServerBootstrap bootstrap = new ServerBootstrap();
           bootstrap.group(bossGroup,workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .option(ChannelOption.SO_BACKLOG,1024)
                   // set keepalive
                   .childOption(ChannelOption.SO_KEEPALIVE,true)
                   .childHandler(new ChannelInitializer<SocketChannel>() {
                       @Override
                       protected void initChannel(SocketChannel ch) throws Exception {
                           //ch.pipeline().addLast(new IdleStateHandler(0,0,300));
                           ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                           ch.pipeline().addLast(new StringDecoder());
                           ch.pipeline().addLast(messageHandler);
                       }
                   });

           /**
            * binding port. sync wait
            */
           ChannelFuture f = bootstrap.bind(port).sync();

           /**
            * sync wait for server close
            */
           f.channel().closeFuture().sync();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           bossGroup.shutdownGracefully();
           workerGroup.shutdownGracefully();
       }

   }

        其中我們最關心的是handler的處理邏輯,這裡給出個簡單例子:
/**
 * Created by peng.wang on 2017/3/31.
 */
public class MessageHandler extends ChannelHandlerAdapter implements MessageListener{


    private static final Logger log = LoggerFactory.getLogger(MessageHandler.class);

    /**
     * default thread number
     */
    private static final int DEFAULT_THREADS = 100;

    /**
     * send message pool
     */
    private ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREADS);

    /**
     *  channel container . store the channel user mapped
     */
    private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();

    private ConcurrentHashMap<Integer,String> channelCodeMap = new ConcurrentHashMap<>();

    /**
     * MQ template. used to publish message to rabbitMQ
     */
    private AmqpTemplate amqpTemplate;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {

        if (msg.equals("exit") || msg.equals("bye") || msg.equals("quit")) {
            ctx.close();
        }

        Optional.ofNullable(TcpMessage.parse(msg.toString())).ifPresent((m) -> {
            if (log.isDebugEnabled()) {
                log.debug(String.format("\r\nrec msg %s \r\nrsp msg -> 1", msg));
            }
            channelMap.put(m.getId(),ctx.channel());
            channelCodeMap.put(ctx.channel().hashCode(),m.getId());
            System.out.println("==============================");
            System.out.println(JSON.toJSONString(m));
            amqpTemplate.convertAndSend("queueTest2",m);
            //ctx.writeAndFlush("1\r\n");
        });

    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        log.error(cause.getMessage());
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        /** remove inactive channel */
        channelMap.remove(channelCodeMap.get(ctx.channel().hashCode()));
        channelCodeMap.remove(ctx.channel().hashCode());
    }

    /**
     * listen the rabbitMQ message. send message to AI watch
     * @param message
     */
    @Override
    public void onMessage(Message message) {

        executorService.submit(() -> {
            doWork(message);
        });

    }

    /**
     * handler business
     * @param message
     */
    private void doWork(Message message){
       // System.out.println("==============********************" + new String(message.getBody()));
        JSONObject object = JSON.parseObject(new String(message.getBody()));
        String id = object.get("id").toString();
        String msg = object.get("msg").toString();
        channelMap.get(id).writeAndFlush(Unpooled.copiedBuffer((msg+System.getProperty("line.separator")).getBytes()));
    }

    public AmqpTemplate getAmqpTemplate() {
        return amqpTemplate;
    }

    public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
}

在接收到客戶端訊息以後沒做過多處理,直接轉發到MQ,這裡我採用Spring整合MQ的方式,具體配置檔案如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <!-- Connection factory: parameters for connecting to the RabbitMQ server. -->
    <!-- NOTE(review): credentials are hard-coded here; consider externalising them. -->
    <rabbit:connection-factory id="connectionFactory"
                               username="wangpeng" password="wangpeng" host="localhost" port="5672" />

    <!-- Rabbit template used to send and receive messages; payloads are converted with jsonMessageConverter. -->
    <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"
                      message-converter="jsonMessageConverter"/>

    <!-- With this admin declared, the exchanges and queues defined below are created on the RabbitMQ server automatically. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue definitions -->
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />

    <rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" />

    <!-- Direct exchange bound to queueTest2 -->
    <rabbit:direct-exchange name="exchangeTest2" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest2" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Direct exchange bound to queueTest -->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message consumer side -->
    <!-- Converter between message objects and JSON -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Queue listener: when a message arrives on queueTest, the messageHandler bean listening on it is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="queueTest" ref="messageHandler"/>
    </rabbit:listener-container>

</beans>

對於訊息的推送,通過監聽MQ佇列,當有訊息到達時會觸發onMessage方法,解析出訊息需要推送到的客戶端,再通過獲取對應的channel推送訊息