搭建 SpringBoot + Netty + WebSocket 通訊協議框架
阿新 • • 發佈:2019-01-05
運用場景:與機器裝置進行通訊或者其他場景;
pom 檔案就不上傳了,網上都可以找得到,主要是 SpringBoot 和 Netty 的依賴;下面直接上程式碼。
1.配置類
/**
 * Holder for the WebSocket server settings.
 *
 * <p>Values are bound from configuration properties that start with the
 * {@code ws} prefix (for example {@code ws.port}).
 */
@Component
@ConfigurationProperties(prefix = "ws")
public class WebSocketConfig {

    /** Value of {@code ws.host}. */
    private String host;

    /** Value of {@code ws.port}. */
    private int port;

    /** Value of {@code ws.ssl}. */
    private boolean ssl;

    /** Value of {@code ws.cert}. */
    private String cert;

    /** Value of {@code ws.key}. */
    private String key;

    /** @return the configured host */
    public String getHost() {
        return this.host;
    }

    /** @param host the host to store */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the configured port */
    public int getPort() {
        return this.port;
    }

    /** @param port the port to store */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the configured SSL flag */
    public boolean getSsl() {
        return this.ssl;
    }

    /** @param ssl the SSL flag to store */
    public void setSsl(boolean ssl) {
        this.ssl = ssl;
    }

    /** @return the configured certificate value */
    public String getCert() {
        return this.cert;
    }

    /** @param cert the certificate value to store */
    public void setCert(String cert) {
        this.cert = cert;
    }

    /** @return the configured key value */
    public String getKey() {
        return this.key;
    }

    /** @param key the key value to store */
    public void setKey(String key) {
        this.key = key;
    }
}
2.properties 配置檔案
# Bound to WebSocketConfig.ssl through the "ws" prefix.
ws.ssl=false
# Bound to WebSocketConfig.port; Server.start() binds the listener to this port.
ws.port=8082
# Bound to WebSocketConfig.host through the "ws" prefix.
ws.host=localhost
3.Action
/**
 * Pairs a target object with a reflective {@link Method} so that the method
 * can be invoked on that target at a later time.
 */
public class Action {

    /** Object the method is invoked on. */
    private Object target;

    /** Method to invoke. */
    private Method method;

    /**
     * Creates an action for the given target and method.
     *
     * @param target object the method will be invoked on
     * @param method method to invoke
     */
    public Action(Object target, Method method) {
        this.target = target;
        this.method = method;
    }

    /**
     * Invokes the stored method on the stored target.
     *
     * @param args arguments forwarded to the method
     * @return whatever the invoked method returns
     * @throws InvocationTargetException if the invoked method itself throws
     * @throws IllegalAccessException if the method is not accessible
     */
    public Object call(Object... args) throws InvocationTargetException, IllegalAccessException {
        final Object result = this.method.invoke(this.target, args);
        return result;
    }

    /** @return the object the method is invoked on */
    public Object getTarget() {
        return this.target;
    }

    /** @param target the object the method should be invoked on */
    public void setTarget(Object target) {
        this.target = target;
    }

    /** @return the method to invoke */
    public Method getMethod() {
        return this.method;
    }

    /** @param method the method to invoke */
    public void setMethod(Method method) {
        this.method = method;
    }

    /** @return a description listing the target and the method */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ActionMethod{");
        text.append("target=").append(this.target);
        text.append(", method=").append(this.method);
        text.append('}');
        return text.toString();
    }
}
4.定義Command註解
/**
 * Type-level annotation that associates a class with one numeric command id.
 *
 * <p>The annotation is retained at runtime so it can be read reflectively,
 * and it is itself annotated with {@code @Component}.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Command {

    /**
     * The numeric command id associated with the annotated type.
     *
     * @return the command id
     */
    int value();
}
5.路由類
public class Router { private static Map<Integer, Action> routers = new ConcurrentHashMap<Integer, Action>(); public static void add(int command, Action action) { if (routers.containsKey(command)) { // log.warn("Router: duplicate command " + command); } routers.put(command, action); } public static void dispatcher(Integer command, ChannelHandlerContext ctx, Object msg) throws Exception { Action action = routers.get(command); if (action == null) { // throw new Exception("Action Not Found: " + command); // log.warn("Router not found: " + command); return; } action.call(ctx, msg); } }
6.Handler掃描器
@Component
public class Scaner implements BeanPostProcessor {
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<? extends Object> clazz = bean.getClass();
clazz.getAnnotation(Command.class);
Command command = clazz.getAnnotation(Command.class);
if (command != null) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.getName().equals("handler")) {
Router.add(command.value(), new Action(bean, method));
}
}
}
return bean;
}
}
7.Netty服務
/**
 * Netty server that accepts WebSocket connections on the configured port.
 */
@Component
public class Server {

    /** Handler appended as the last element of every channel pipeline. */
    @Autowired
    WebSocket webSocket;

    /** Bootstrap used to configure and bind the server. */
    private ServerBootstrap serverBootstrap;

    /** Boss event loop group. */
    private EventLoopGroup bossGroup;

    /** Worker event loop group. */
    private EventLoopGroup workerGroup;

    /** Server configuration; the port is read from it. */
    @Resource
    private WebSocketConfig webSocketConfig;

    /**
     * Shuts both event loop groups down gracefully. Safe to call when the
     * server was never started.
     */
    @PreDestroy
    public void close() {
        shutdown(bossGroup);
        shutdown(workerGroup);
    }

    /**
     * Configures the server, binds it to the configured port and blocks the
     * calling thread until the server channel is closed.
     *
     * <p>The event loop groups are shut down when this method exits, whether
     * the channel closed normally, binding failed, or the thread was
     * interrupted.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void start() throws Exception {
        int port = webSocketConfig.getPort();
        serverBootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_LINGER, 0)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Per-connection pipeline.
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline
                        .addLast(new IdleStateHandler(50, 50, 50, TimeUnit.SECONDS))
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(65536))
                        .addLast(new ChunkedWriteHandler())
                        // WebSocket endpoint path.
                        .addLast(new WebSocketServerProtocolHandler("/sweeper", null, true))
                        .addLast(webSocket);
            }
        });
        try {
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Release the event loop threads on every exit path, including a
            // failed bind, so they cannot outlive the server.
            close();
        }
    }

    /** Gracefully shuts down a group if it has been created. */
    private static void shutdown(EventLoopGroup group) {
        if (group != null) {
            group.shutdownGracefully();
        }
    }
}
8.websocket核心類
/**
 * Inbound handler for WebSocket traffic: parses text frames into
 * {@link Protocol} messages and hands them to the {@link Router}, and sends
 * a ping frame when the connection goes read-idle or write-idle.
 */
@Service()
@Scope("prototype")
@ChannelHandler.Sharable
public class WebSocket extends SimpleChannelInboundHandler<Object> {

    /** Takes no action when a channel becomes active; the event is not forwarded. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    /** Takes no action when a channel becomes inactive; the event is not forwarded. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    /**
     * Writes a ping text frame on reader-idle and writer-idle events. Other
     * idle states are ignored; events that are not idle-state events are
     * passed to the superclass.
     *
     * @param ctx channel context
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleState idleState = ((IdleStateEvent) evt).state();
        boolean readOrWriteIdle = idleState == IdleState.READER_IDLE || idleState == IdleState.WRITER_IDLE;
        if (readOrWriteIdle) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(Protocol.PING));
        }
    }

    /**
     * Dispatches non-empty text frames by command id; every other message is
     * ignored.
     *
     * @param ctx channel context
     * @param msg the received message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof TextWebSocketFrame)) {
            return;
        }
        String payload = ((TextWebSocketFrame) msg).text();
        if (payload == null || payload.isEmpty()) {
            return;
        }
        Protocol parsed = new Protocol(payload);
        Router.dispatcher(parsed.getCmd(), ctx, parsed);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx channel context
     * @param cause the failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
9.定義Handler介面
/**
 * Contract implemented by classes that handle one protocol command.
 */
public interface Handler {

    /**
     * Handles one incoming message.
     *
     * @param ctx channel context of the connection the message arrived on
     * @param msg the parsed message
     */
    void handler(ChannelHandlerContext ctx, Protocol msg);
}
10.幀
@Command(Protocol.CMD_PING)
public class Ping implements Handler {
public void handler(ChannelHandlerContext ctx, Protocol msg) {
ctx.writeAndFlush(new TextWebSocketFrame(Protocol.PONG));
}
}
11.定製協議
/**
 * Message of the custom text protocol: a JSON array whose first element is
 * the numeric command id.
 */
public class Protocol {

    // Command ids.
    public static final int CMD_PING = 0;
    public static final int CMD_PONG = 1;
    public static final int CMD_TEST = 123;

    // Ready-made frames: a JSON array holding only the command id.
    public static final String PING = "[" + CMD_PING + "]";
    public static final String PONG = "[" + CMD_PONG + "]";

    // Status codes placed in a Response.
    public static final int RESPONSE_OK = 0;
    public static final int RESPONSE_ERROR = 1;

    /** Command id read from the first array element. */
    private Integer cmd;

    /** Raw message text as received. */
    private String msg;

    /** Parsed form of the raw message. */
    private JSONArray json;

    /** Creates an empty message with no command, text or parsed content. */
    public Protocol() {
    }

    /**
     * Parses a raw message.
     *
     * @param msg raw text, expected to be a JSON array whose first element
     *            is the command id; parse failures propagate to the caller
     */
    public Protocol(String msg) {
        JSONArray parsed = JSON.parseArray(msg);
        this.json = parsed;
        this.cmd = parsed.getInteger(0);
        this.msg = msg;
    }

    /** @return the command id, or {@code null} for an empty message */
    public Integer getCmd() {
        return this.cmd;
    }

    /** @return the raw message text */
    public String getMsg() {
        return this.msg;
    }

    /**
     * Placeholder for the message body.
     *
     * @return always {@code null} in this implementation
     */
    public Object getBody() {
        return null;
    }

    /**
     * Outgoing reply, serialized as a two-element JSON array: the command id
     * followed by a status code.
     */
    public static class Response {

        private JSONArray jsonArray = new JSONArray();

        /**
         * Creates a reply with status {@link Protocol#RESPONSE_OK}.
         *
         * @param cmd command id being answered
         */
        public Response(int cmd) {
            this(cmd, Protocol.RESPONSE_OK);
        }

        /**
         * Creates a reply with an explicit status code.
         *
         * @param cmd command id being answered
         * @param error status code to report
         */
        public Response(int cmd, int error) {
            jsonArray.add(0, cmd);
            jsonArray.add(1, error);
        }

        /** @return the reply serialized as JSON text */
        public String toJSONString() {
            return jsonArray.toJSONString();
        }
    }
}
12.啟動類
@SpringBootApplication
@EnableAutoConfiguration
@MapperScan("com.xxx.dao")
public class Application implements CommandLineRunner{
Logger Log = LoggerFactory.getLogger(Application.class);
@Autowired
Server server;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
public void run(String... args) throws Exception {
// TODO Auto-generated method stub
Log.info("-------------------開啟websocket----------------------");
server.start();
}
}
測試:
/**
 * Sample handler for {@link Protocol#CMD_TEST}: replies with a fixed text
 * frame.
 */
@Command(Protocol.CMD_TEST)
public class Test implements Handler {

    /**
     * Writes the fixed reply frame back on the same channel.
     *
     * @param ctx channel context used to send the reply
     * @param msg the received message (not inspected)
     */
    @Override
    public void handler(ChannelHandlerContext ctx, Protocol msg) {
        TextWebSocketFrame reply = new TextWebSocketFrame("[生病][生病][生病][生病]");
        ctx.writeAndFlush(reply);
    }
}
這裡必須要實現 Handler 介面並重寫 handler 方法,同時在類別上標註 @Command;因為掃描器是依照方法名稱 handler 來註冊路由的,方法名稱不同就不會被註冊。
測試效果