Netty IoT in Practice (Part 1)
阿新 • Published: 2019-01-05
I recently took on a smartwatch project, which was a good opportunity to learn Netty by putting it into practice.
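Given Netty's I/O model, the design avoids running time-consuming business logic on EventLoopGroup threads wherever possible. To that end I introduced an MQ (RabbitMQ). When the Netty server receives a command from a client it does no business processing: it only extracts the information that identifies the client's channel and forwards the command to the MQ, where a dedicated business thread pool handles it. When a message has to be pushed to a client proactively, the server listens on an MQ queue, parses the command, and pushes the message to the client that should receive it.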
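The hand-off described above can be sketched without Netty or RabbitMQ at all. The following is a minimal illustration only, assuming an in-memory BlockingQueue as a stand-in for the MQ; the class name OffloadSketch, its methods, and the device ids are hypothetical and do not come from the project.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class OffloadSketch {
    // Stand-in for the RabbitMQ queue (assumption: in-memory, single process).
    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
    // Dedicated business pool, kept separate from the I/O threads.
    private final ExecutorService businessPool = Executors.newFixedThreadPool(2);
    // Results recorded by the business threads.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Called on the I/O thread: tag the command with its sender and enqueue it, nothing more. */
    void onCommand(String deviceId, String command) {
        queue.offer(new String[] {deviceId, command});
    }

    /** Starts consumers that take commands off the queue and run the business logic. */
    void startWorkers(int workers, CountDownLatch done) {
        for (int i = 0; i < workers; i++) {
            businessPool.execute(() -> {
                try {
                    while (true) {
                        String[] cmd = queue.take();          // blocks until a command arrives
                        processed.add(cmd[0] + ":" + cmd[1]); // placeholder for real business logic
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();       // the pool is shutting down
                }
            });
        }
    }

    /** Feeds two commands through the queue and returns what the business pool handled. */
    static Set<String> demo() {
        OffloadSketch sketch = new OffloadSketch();
        CountDownLatch done = new CountDownLatch(2);
        sketch.startWorkers(2, done);
        sketch.onCommand("watch-1", "heartbeat");
        sketch.onCommand("watch-2", "location");
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.businessPool.shutdownNow();
        }
        return sketch.processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the split is that onCommand returns immediately, so the thread that reads from the socket is never blocked by slow business code. A real broker adds what the in-memory queue cannot: persistence and consumers running in other processes.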
The preliminary design is roughly as shown in the flowchart:
[Figure: flowchart of the preliminary design]
First, the example code for the Netty server:
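import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by peng.wang on 2017/3/31.
 */
public class NettyServer {

    /**
     * the core handler
     */
    private MessageHandler messageHandler;

    public void bind(int port) {
        // used to accept client connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // used to handle all the events and I/O for each SocketChannel
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // set keepalive
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //ch.pipeline().addLast(new IdleStateHandler(0,0,300));
                            // split the byte stream into lines of at most 1024 bytes
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            // the same handler instance is shared by every channel
                            ch.pipeline().addLast(messageHandler);
                        }
                    });
            // bind the port and wait for the bind to complete
            ChannelFuture f = bootstrap.bind(port).sync();
            // block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // Setter for injection (e.g. from Spring); not shown in the original listing.
    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }
}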
The part we care about most is the handler's processing logic. Here is a simple example:
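// JSON and JSONObject are assumed to be the fastjson classes.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by peng.wang on 2017/3/31.
 */
// One instance is added to every channel's pipeline, so it must be marked @Sharable.
// In Netty 4.x the inbound callbacks live on ChannelInboundHandlerAdapter
// (the original listing extended ChannelHandlerAdapter, as in the Netty 5 alpha API).
@ChannelHandler.Sharable
public class MessageHandler extends ChannelInboundHandlerAdapter implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(MessageHandler.class);

    /** default thread number */
    private static final int DEFAULT_THREADS = 100;

    /** send message pool */
    private ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREADS);

    /** channel container: maps the client id to its channel, and the channel hash back to the id */
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Integer, String> channelCodeMap = new ConcurrentHashMap<>();

    /** MQ template, used to publish messages to RabbitMQ */
    private AmqpTemplate amqpTemplate;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        if (msg.equals("exit") || msg.equals("bye") || msg.equals("quit")) {
            ctx.close();
            return; // do not parse or forward a quit command
        }
        Optional.ofNullable(TcpMessage.parse(msg.toString())).ifPresent((m) -> {
            if (log.isDebugEnabled()) {
                log.debug(String.format("\r\nrec msg %s \r\nrsp msg -> 1", msg));
            }
            // remember which channel this client id arrived on
            channelMap.put(m.getId(), ctx.channel());
            channelCodeMap.put(ctx.channel().hashCode(), m.getId());
            System.out.println("==============================");
            System.out.println(JSON.toJSONString(m));
            // forward the command to the MQ; no business logic here
            amqpTemplate.convertAndSend("queueTest2", m);
            //ctx.writeAndFlush("1\r\n");
        });
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage());
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // remove the inactive channel; the id is null if the client never sent a valid message,
        // and ConcurrentHashMap rejects null keys
        String id = channelCodeMap.remove(ctx.channel().hashCode());
        if (id != null) {
            channelMap.remove(id, ctx.channel());
        }
        super.channelInactive(ctx);
    }

    /**
     * listen for RabbitMQ messages and send them to the AI watch
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        executorService.submit(() -> {
            doWork(message);
        });
    }

    /**
     * handle the business logic
     * @param message
     */
    private void doWork(Message message) {
        // System.out.println("==============********************" + new String(message.getBody()));
        JSONObject object = JSON.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
        String id = object.get("id").toString();
        String msg = object.get("msg").toString();
        Channel channel = channelMap.get(id);
        if (channel == null || !channel.isActive()) {
            // the target client is offline or was never registered
            log.warn("no active channel for id {}, message dropped", id);
            return;
        }
        channel.writeAndFlush(Unpooled.copiedBuffer(msg + System.lineSeparator(), StandardCharsets.UTF_8));
    }

    public AmqpTemplate getAmqpTemplate() {
        return amqpTemplate;
    }

    public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
}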
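After receiving a client message, the handler does very little processing and forwards it straight to the MQ. Here I integrate the MQ through Spring; the configuration file is as follows: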
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Configure the connection-factory with the parameters for connecting to the rabbit server -->
    <rabbit:connection-factory id="connectionFactory" username="wangpeng" password="wangpeng" host="localhost" port="5672" />

    <!-- Define the rabbit template used to send and receive data -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!-- With the admin below, the exchanges and queues declared by this producer are created automatically on the rabbitmq server -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Define the queues -->
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" />

    <!-- Define a direct exchange and bind queueTest2 -->
    <rabbit:direct-exchange name="exchangeTest2" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest2" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Define a direct exchange and bind queueTest -->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message receiver -->
    <!-- JSON converter for message objects -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Queue listener (observer pattern): when a message arrives, the listener registered on that queue is notified.
         The messageHandler bean referenced here is not defined in this file and must be defined elsewhere. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="queueTest" ref="messageHandler"/>
    </rabbit:listener-container>
</beans>
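Messages are pushed by listening on the MQ queue: when a message arrives, the onMessage method is triggered, the target client is resolved from the message, and the message is pushed through that client's channel.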
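The lookup step in the push path is worth isolating, because the target client may already be offline by the time the MQ message arrives. The following is a rough sketch of a two-way registry, assuming a generic connection type C in place of io.netty.channel.Channel so that it runs without Netty. DeviceRegistry and its methods are hypothetical names, not part of the project, and the two maps are not updated atomically as a pair.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Two-way mapping between device ids and connections; C stands in for a Netty Channel. */
final class DeviceRegistry<C> {
    private final Map<String, C> byId = new ConcurrentHashMap<>();
    private final Map<C, String> byConn = new ConcurrentHashMap<>();

    /** Remember which connection a device id arrived on; the latest registration wins. */
    void register(String deviceId, C conn) {
        C previousConn = byId.put(deviceId, conn);
        if (previousConn != null && previousConn != conn) {
            byConn.remove(previousConn, deviceId);  // the device reconnected on a new connection
        }
        String previousId = byConn.put(conn, deviceId);
        if (previousId != null && !previousId.equals(deviceId)) {
            byId.remove(previousId, conn);          // the connection now reports a different id
        }
    }

    /** Drop both mappings when a connection closes; harmless for unknown connections. */
    void unregister(C conn) {
        String deviceId = byConn.remove(conn);
        if (deviceId != null) {
            byId.remove(deviceId, conn);            // only if the id still points at this connection
        }
    }

    /** Push one line to a device; returns false instead of throwing when it is offline. */
    boolean push(String deviceId, String msg, BiConsumer<C, String> writer) {
        C conn = byId.get(deviceId);
        if (conn == null) {
            return false;
        }
        writer.accept(conn, msg + "\n");
        return true;
    }

    public static void main(String[] args) {
        DeviceRegistry<StringBuilder> registry = new DeviceRegistry<>();
        StringBuilder conn = new StringBuilder();   // pretend connection that records what is written
        registry.register("watch-1", conn);
        System.out.println(registry.push("watch-1", "hello", (c, m) -> c.append(m)));
        registry.unregister(conn);
        System.out.println(registry.push("watch-1", "hello", (c, m) -> c.append(m)));
    }
}
```

Running main prints true for the first push and false for the second, since the device is no longer registered. With Netty, the writer argument would be something like a call to writeAndFlush on the channel, and unregister would be called from channelInactive.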