1. 程式人生 > >搭建SpringBoot+ Netty + WebSocket 通訊協議框架

搭建SpringBoot+ Netty + WebSocket 通訊協議框架

運用場景:與機器裝置進行通訊,或其他需要長連線的場景。
pom 檔案就不貼出來了,網路上都找得到,主要是 Spring Boot 和 Netty 的依賴;以下直接上程式碼。

1.配置類

/**
 * Settings holder for the WebSocket server.
 *
 * <p>Registered as a Spring component and bound to configuration properties
 * that use the {@code ws} prefix (for example {@code ws.port}).
 */
@Component
@ConfigurationProperties(prefix = "ws")
public class WebSocketConfig {

    /** Bound from {@code ws.host}. */
    private String host;
    /** Bound from {@code ws.port}. */
    private int port;
    /** Bound from {@code ws.ssl}. */
    private boolean ssl;
    /** Bound from {@code ws.cert}; presumably the certificate location - TODO confirm. */
    private String cert;
    /** Bound from {@code ws.key}; presumably the private key location - TODO confirm. */
    private String key;

    // ---- host ----

    public String getHost() { return host; }

    public void setHost(String host) { this.host = host; }

    // ---- port ----

    public int getPort() { return port; }

    public void setPort(int port) { this.port = port; }

    // ---- ssl ----

    public boolean getSsl() { return ssl; }

    public void setSsl(boolean ssl) { this.ssl = ssl; }

    // ---- cert ----

    public String getCert() { return cert; }

    public void setCert(String cert) { this.cert = cert; }

    // ---- key ----

    public String getKey() { return key; }

    public void setKey(String key) { this.key = key; }
}

2.properties 配置檔案

# Values bound to WebSocketConfig through the "ws" prefix.
# SSL flag (WebSocketConfig.ssl).
ws.ssl=false
# Port the Netty server binds to (read by Server.start()).
ws.port=8082
# Host name (WebSocketConfig.host).
# NOTE(review): the Server class shown in this article binds by port only and does not read ws.host.
ws.host=localhost

3.Action

/**
 * Pairs a target object with a reflective {@link Method} so that the method
 * can be invoked on that object at a later time.
 */
public class Action {

    /** Object the method is invoked on. */
    private Object target;

    /** Method that {@link #call(Object...)} invokes. */
    private Method method;

    /**
     * Creates an action for the given receiver and method.
     *
     * @param target object the method is invoked on
     * @param method method to invoke
     */
    public Action(Object target, Method method) {
        this.method = method;
        this.target = target;
    }

    /**
     * Invokes the stored method on the stored target.
     *
     * @param args arguments handed through to the method unchanged
     * @return the value returned by the invoked method
     * @throws InvocationTargetException if the invoked method itself throws
     * @throws IllegalAccessException if the method is not accessible
     */
    public Object call(Object... args) throws InvocationTargetException, IllegalAccessException {
        final Object result = method.invoke(target, args);
        return result;
    }

    public Object getTarget() {
        return target;
    }

    public void setTarget(Object target) {
        this.target = target;
    }

    public Method getMethod() {
        return method;
    }

    public void setMethod(Method method) {
        this.method = method;
    }

    /** Renders the target and the method for diagnostics. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ActionMethod{");
        text.append("target=").append(target);
        text.append(", method=").append(method);
        text.append('}');
        return text.toString();
    }
}

4.定義Command註解

/**
 * Marks a class with an integer command code.
 *
 * <p>Applicable to types only and retained at runtime, so it can be read
 * reflectively. It is itself annotated with {@code @Component}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface Command {
    /** The command code associated with the annotated class. */
    int value();
}

5.路由類

/**
 * Static registry that maps integer command codes to the {@link Action}
 * registered for them and forwards incoming messages to that action.
 */
public class Router {

    /** Command code to action; concurrent because registration and dispatch may run on different threads. */
    private static final Map<Integer, Action> routers = new ConcurrentHashMap<Integer, Action>();

    /**
     * Registers the action for a command code.
     *
     * <p>Registering the same code again replaces the earlier action.
     *
     * @param command command code
     * @param action action invoked for that code
     */
    public static void add(int command, Action action) {
        // NOTE(review): duplicate registrations are silently replaced; consider logging a warning here.
        routers.put(command, action);
    }

    /**
     * Invokes the action registered for {@code command} with {@code ctx} and {@code msg}.
     *
     * <p>Does nothing when {@code command} is {@code null} or no action is registered for it.
     *
     * @param command command code of the incoming message, may be {@code null}
     * @param ctx channel context handed to the action as first argument
     * @param msg message handed to the action as second argument
     * @throws Exception whatever the reflective invocation of the action throws
     */
    public static void dispatcher(Integer command, ChannelHandlerContext ctx, Object msg) throws Exception {
        // ConcurrentHashMap rejects null keys with a NullPointerException,
        // so a message without a command code is treated like an unknown command.
        if (command == null) {
            return;
        }

        Action action = routers.get(command);
        if (action == null) {
            // NOTE(review): unknown commands are dropped silently; consider logging a warning here.
            return;
        }

        action.call(ctx, msg);
    }
}

6.Handler掃描器

@Component
public class Scaner implements BeanPostProcessor {

    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        Class<? extends Object> clazz = bean.getClass();
        clazz.getAnnotation(Command.class);
        Command command = clazz.getAnnotation(Command.class);

        if (command != null) {
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                if (method.getName().equals("handler")) {
                    Router.add(command.value(), new Action(bean, method));
                }
            }
        }

        return bean;
    }
}

7.Netty服務

/**
 * Netty-based WebSocket server that is started explicitly through
 * {@link #start()} and shut down when the Spring context is destroyed.
 */
@Component
public class Server {

    /** Handler appended at the end of every channel pipeline. */
    @Autowired
    WebSocket webSocket;
    /**
     * Server bootstrap, created in {@link #start()}.
     */
    private ServerBootstrap serverBootstrap;
    /**
     * Boss event loop group.
     */
    private EventLoopGroup bossGroup;
    /**
     * Worker event loop group.
     */
    private EventLoopGroup workerGroup;
    /**
     * Netty server configuration.
     */
    @Resource
    private WebSocketConfig webSocketConfig;

    /**
     * Shuts both event loop groups down gracefully.
     *
     * <p>Safe to call when {@link #start()} was never invoked.
     */
    @PreDestroy
    public void close() {
        shutdown(bossGroup);
        shutdown(workerGroup);
    }

    /**
     * Binds the server to the configured port and blocks the calling thread
     * until the server channel is closed.
     *
     * <p>The event loop groups are released on every exit path, including a
     * failed bind. If the waiting thread is interrupted, its interrupt flag
     * is restored and the method returns normally.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void start() throws Exception {
        int port = webSocketConfig.getPort();

        serverBootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();

        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_LINGER, 0)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Configure the per-connection pipeline.
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline
                            .addLast(new IdleStateHandler(50, 50, 50, TimeUnit.SECONDS))
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(65536))
                            .addLast(new ChunkedWriteHandler())
                            // WebSocket endpoint path
                            .addLast(new WebSocketServerProtocolHandler("/sweeper", null, true))
                            .addLast(webSocket);
                }
            });

            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Release the event loop threads on every exit path (bind failure included).
            close();
        }
    }

    /** Shuts the group down gracefully if it has been created. */
    private static void shutdown(EventLoopGroup group) {
        if (group != null) {
            group.shutdownGracefully();
        }
    }
}

8.websocket核心類

/**
 * Inbound handler for the WebSocket channel: answers idle events with a ping
 * frame and routes incoming text frames through the {@link Router}.
 */
@Service()
@Scope("prototype")
@ChannelHandler.Sharable
public class WebSocket extends SimpleChannelInboundHandler<Object> {

    /** Empty override: nothing is done when the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    /** Empty override: nothing is done when the channel becomes inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    /**
     * Sends a ping text frame when the channel has been reader-idle or
     * writer-idle; any event that is not an idle-state event goes to the superclass.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleState idle = ((IdleStateEvent) evt).state();
        if (idle == IdleState.READER_IDLE || idle == IdleState.WRITER_IDLE) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(Protocol.PING));
        }
    }

    /**
     * Parses a non-empty text frame into a {@link Protocol} message and
     * dispatches it by command code; every other message is ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof TextWebSocketFrame)) {
            return;
        }

        String payload = ((TextWebSocketFrame) msg).text();
        if (payload == null || payload.length() == 0) {
            return;
        }

        Protocol parsed = new Protocol(payload);
        Router.dispatcher(parsed.getCmd(), ctx, parsed);
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

9.定義Handler介面

/**
 * Contract for classes that process one command of the protocol.
 */
public interface Handler {

    /**
     * Processes a message.
     *
     * @param ctx channel context of the connection
     * @param msg the message to process
     */
    void handler(ChannelHandlerContext ctx, Protocol msg);
}

10.幀

@Command(Protocol.CMD_PING)
public class Ping implements Handler {

    public void handler(ChannelHandlerContext ctx, Protocol msg) {
        ctx.writeAndFlush(new TextWebSocketFrame(Protocol.PONG));
    }
}

11.定製協議

/**
 * Message of the custom protocol: a JSON array whose first element is the
 * integer command code.
 */
public class Protocol {

	public static final int CMD_PING = 0;
	public static final int CMD_PONG = 1;

	public static final int CMD_TEST = 123;

	/** Text form of a ping message: a JSON array holding only the command code. */
	public static final String PING = "[" + CMD_PING + "]";
	/** Text form of a pong message: a JSON array holding only the command code. */
	public static final String PONG = "[" + CMD_PONG + "]";

	public static final int RESPONSE_OK = 0;
	public static final int RESPONSE_ERROR = 1;

	/** Command code read from index 0 of the parsed array. */
	private Integer cmd;
	/** The raw message text as received. */
	private String msg;
	/** The parsed message. */
	private JSONArray json;

	/** Creates an empty message; all fields stay {@code null}. */
	public Protocol() {

	}

	/**
	 * Parses a raw message.
	 *
	 * @param msg JSON array text whose first element is the command code
	 * @throws IllegalArgumentException if {@code msg} does not yield a non-empty array
	 */
	public Protocol(String msg) {
		JSONArray parsed = JSON.parseArray(msg);
		// The parser can hand back null (presumably for null or blank input - TODO confirm)
		// and "[]" has no element 0; fail with a clear message instead of an opaque
		// NullPointerException / IndexOutOfBoundsException.
		if (parsed == null || parsed.isEmpty()) {
			throw new IllegalArgumentException("Protocol message must be a non-empty JSON array: " + msg);
		}
		this.json = parsed;
		this.cmd = parsed.getInteger(0);
		this.msg = msg;
	}

	/** @return the command code, or {@code null} for a message created with the no-arg constructor */
	public Integer getCmd() {
		return cmd;
	}

	/** @return the raw message text */
	public String getMsg() {
		return msg;
	}

	/**
	 * Placeholder for the message body.
	 *
	 * @return always {@code null} in the current implementation
	 */
	public Object getBody() {
		return null;
	}

	/**
	 * Outgoing response, serialized as the JSON array {@code [cmd, status]}.
	 */
	public static class Response {

		private final JSONArray jsonArray = new JSONArray();

		/**
		 * Creates a response with status {@link Protocol#RESPONSE_OK}.
		 *
		 * @param cmd command code the response belongs to
		 */
		public Response(int cmd) {
			this(cmd, Protocol.RESPONSE_OK);
		}

		/**
		 * Creates a response with an explicit status.
		 *
		 * @param cmd command code the response belongs to
		 * @param error status code placed after the command code
		 */
		public Response(int cmd, int error) {
			jsonArray.add(cmd);
			jsonArray.add(error);
		}

		/** @return the response as JSON text */
		public String toJSONString() {
			return jsonArray.toJSONString();
		}
	}

}

12.啟動類

@SpringBootApplication
@EnableAutoConfiguration
@MapperScan("com.xxx.dao")
public class Application implements CommandLineRunner{
	
	Logger Log = LoggerFactory.getLogger(Application.class);
	
	@Autowired
	Server server;

	public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

	public void run(String... args) throws Exception {
		// TODO Auto-generated method stub
		Log.info("-------------------開啟websocket----------------------");
		server.start();
	}
}

測試:

/**
 * Sample handler registered for {@code Protocol.CMD_TEST}.
 */
@Command(Protocol.CMD_TEST)
public class Test implements Handler {

	/**
	 * Answers every message with a fixed text frame.
	 *
	 * @param ctx channel context the reply is written to
	 * @param msg the received message (unused)
	 */
	@Override
	public void handler(ChannelHandlerContext ctx, Protocol msg) {
		TextWebSocketFrame reply = new TextWebSocketFrame("[生病][生病][生病][生病]");
		ctx.writeAndFlush(reply);
	}

}

注意:處理類必須加上 @Command 註解,並實作 Handler 介面、覆寫 handler 方法,Scaner 才會把它註冊到 Router。

測試效果

在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述