基於Netty實現的Android 訊息推送(即時通訊)的解決方案
根據Netty框架實現訊息推送(即時聊天)功能.
Netty框架,TCP長連線,心跳,阻塞訊息佇列,執行緒池處理訊息傳送, 基於Google ProtoBuf自定義的訊息協議, TCP粘包/拆包....
客戶端通過TCP連線到伺服器,並建立TCP長連線;當伺服器端收到新訊息後通過TCP連線推送給客戶端, 即訊息傳遞方式: 客戶端A -> 伺服器 -> 客戶端B.
不說了,直接上核心程式碼吧:
================ Server端核心程式碼 ===========================
//服務端主類.
public final class NettyServer {
public final int HEART_SYNC_TIME = 300;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public void bind(int port) throws InterruptedException{
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipe = socketChannel.pipeline();
pipe.addLast( new ProtobufVarint32FrameDecoder());
pipe.addLast(new ProtobufDecoder(MessageProto.Message .getDefaultInstance()));
pipe.addLast(new ProtobufVarint32LengthFieldPrepender());
pipe.addLast(new ProtobufEncoder());
pipe.addLast(new MessageServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = bootstrap.bind(port).sync();
if(f.isSuccess()){
}
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public synchronized void stop(){
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
public static void main(String []args) {
try {
server = new NettyServer();
if(args != null && args.length>0 && args[0].length() > 3){
PORT = Integer.parseInt(args[0]);
}
//message work thread.
new Thread(new Runnable(){
@Override
public void run() {
MessageManager.getInstance().start();
}
}).start();
server.bind(PORT);
} catch (InterruptedException e) {
MessageManager.getInstance().stop();
UserManager.getInstance().clearAll();
server.stop();
e.printStackTrace();
}
}
}
//訊息分發
public class MessageManager {
private static MessageManager manager;
private LinkedBlockingQueue<Message> mMessageQueue = new LinkedBlockingQueue<Message>();
private ThreadPoolExecutor mPoolExecutor = new ThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
private MessageManager(){
}
public static MessageManager getInstance(){
if(manager == null){
synchronized (MessageManager.class) {
if(manager == null){
manager = new MessageManager();
}
}
}
return manager;
}
public void putMessage(Message message){
Log.debug("MessageManager-> putMessage()..." + message.getClientID() + ", " + message.getBody());
try {
mMessageQueue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start(){
Log.debug("MessageManager-> start()... ");
while(true){
try {
Message message = mMessageQueue.take();
mPoolExecutor.execute(new SendMessageTask(message));
} catch (InterruptedException e) {
e.printStackTrace();
break;
}catch (RejectedExecutionException e){
Log.debug("MessageManager-> 伺服器訊息佇列已滿...延時2妙後繼續傳送...");
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
continue;
}
}
}
public void stop(){
Log.debug("MessageManager-> stop()... ");
mMessageQueue.clear();
mPoolExecutor.shutdownNow();
}
class SendMessageTask implements Runnable{
private Message message;
public SendMessageTask(Message message){
this.message = message;
}
@Override
public void run() {
if(message.getReceiveID().length() > 2){
Log.debug("MessageManager-> sendMessage... to client: " + message.getReceiveID() + ", " + message.getBody());
//傳送單聊訊息;
SocketChannel channel = UserManager.getInstance().getUserChannel(message.getReceiveID());
if(channel != null && channel.isActive()) {
channel.writeAndFlush(message);
}
}else{
Log.debug("MessageManager-> sendMessage... to group: " + message.getGroupID() + ", " + message.getBody());
//傳送群聊訊息;
CopyOnWriteArrayList<String> userList = UserManager.getInstance().getUserListInGroup(message.getGroupID());
for(String user:userList){
if(!user.equalsIgnoreCase(message.getClientID())){
SocketChannel channel = UserManager.getInstance().getUserChannel(user);
if(channel != null && channel.isActive()) {
channel.writeAndFlush(message);
}
}
}
}
}
}
}
//負責管理連線的客戶端.
public final class UserManager {
private static UserManager manager;
private static Map<String, SocketChannel> userList = new ConcurrentHashMap();
private static Map<String, CopyOnWriteArrayList<String>> groupList = new ConcurrentHashMap();
private UserManager(){
}
public static UserManager getInstance(){
if(manager == null){
synchronized (UserManager.class) {
if(manager == null){
manager = new UserManager();
}
}
}
return manager;
}
@SuppressWarnings("unchecked")
public void addUser(String groupID, String clientID, SocketChannel channel){
userList.put(clientID, channel);
if(groupList.get(groupID) == null){
Log.debug("addUser()... create new group-> " + groupID + ", " + clientID);
CopyOnWriteArrayList<String> users = new CopyOnWriteArrayList<String>();
users.add(clientID);
groupList.put(groupID, users);
}else{
Log.debug("addUser()... " + groupID + ", " + clientID);
groupList.get(groupID).add(clientID);
}
}
public SocketChannel getUserChannel(String clientID){
return (SocketChannel) userList.get(clientID);
}
public void removeUser(String groupID, String clientID){
Log.debug("removeUser()... " + groupID + ", " + clientID);
userList.remove(clientID);
CopyOnWriteArrayList<String> list = groupList.get(groupID);
int count = list.size();
for(int i=0; i<count; i++){
if(list.get(i).equalsIgnoreCase(clientID)){
groupList.get(groupID).remove(i);
break;
}
}
}
public void removeChannel(SocketChannel channel){
Iterator<Entry<String, SocketChannel>> entries = userList.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, SocketChannel> entry = entries.next();
if(entry.getValue().equals(channel)){
entries.remove();
Log.debug("removeChannel()... " + entry.getKey());
return;
}
}
}
/**
* work in single thread.
* @param groupID
* @return
*/
public CopyOnWriteArrayList<String> getUserListInGroup(String groupID){
return groupList.get(groupID);
}
public int getTotalUserCount(){
return userList.size();
}
public int getGroupSize(String groupID){
return groupList.get(groupID).size();
}
public void clearGroup(String groupID){
groupList.get(groupID).clear();
}
public void clearAll(){
groupList.clear();
userList.clear();
}
}
//連線的ChannelHandler
public class MessageServerHandler extends ChannelHandlerAdapter{
@Override
public void channelInactive(ChannelHandlerContext cxt) throws Exception {
Log.debug("channelInactive()...");
UserManager.getInstance().removeChannel((SocketChannel)cxt.channel());
}
@Override
public void channelRead(ChannelHandlerContext cxt, Object obj) throws Exception {
Log.debug("channelRead()... threadId: " + Thread.currentThread().getId());
MessageProto.Message message = (MessageProto.Message) obj;
switch(message.getMsgType()){
case PING:
Log.debug("received ping..." + message.getClientID());
cxt.channel().writeAndFlush(createResponseMsg(message, MessageType.PING, null));
break;
case LOGIN:
UserManager.getInstance().addUser(message.getGroupID(), message.getClientID(), (SocketChannel)cxt.channel());
int count = UserManager.getInstance().getGroupSize(message.getGroupID());
Log.debug("received login..." + message.getClientID() + ", count: " + count);
cxt.channel().writeAndFlush(createResponseMsg(message, MessageType.LOGIN, "userCount:" + count));
Log.debug("received login sended..." + message.getClientID());
MessageManager.getInstance().putMessage((createResponseMsg(message, MessageType.MESSAGE, "大家好! 我來了...")));
Log.debug("received login sended 222..." + message.getClientID());
cxt.channel().writeAndFlush(createResponseMsg(MessageProto.Message.newBuilder().setClientID("管理員").setGroupID(message.getGroupID()).setMsgType(MessageType.MESSAGE).build(), MessageType.MESSAGE, "歡迎 " + message.getClientID() + " 加入本群..."));
break;
case MESSAGE:
Log.debug("received message..." + message.getClientID() + ", " + message.getBody());
MessageManager.getInstance().putMessage(createResponseMsg(message, MessageType.MESSAGE, null));
break;
case LOGOUT:
UserManager.getInstance().removeUser(message.getGroupID(), message.getClientID());
Log.debug("received logout..." + message.getClientID() + ", count: " + UserManager.getInstance().getGroupSize(message.getGroupID()));
MessageManager.getInstance().putMessage(createResponseMsg(message, MessageType.MESSAGE, "大家聊! 我走了...."));
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt, Throwable cause) {
Log.debug("exceptionCaught()...");
UserManager.getInstance().removeChannel((SocketChannel)cxt.channel());
cause.printStackTrace();
cxt.close();
}
private Message createResponseMsg(Message receivedMsg, MessageType type, String body) {
MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
builder.setClientID(receivedMsg.getClientID());
builder.setMsgType(type);
builder.setGroupID(receivedMsg.getGroupID());
if(body != null){
builder.setBody(body);
}else{
builder.setBody(receivedMsg.getBody());
}
return builder.build();
}
}
===============================客戶端核心程式碼=======================================
public class NettyClient {
private String TAG = "ChatClient";
private static NettyClient client;
private EventLoopGroup eventLoopGroup;
private SocketChannel socketChannel;
private String clientID;
private String groupID;
private final int HEART_PING_TIME = 180;
private NettyClient(){
}
public static NettyClient getInstance(){
if(client == null){
synchronized (NettyClient.class) {
if(client == null){
client = new NettyClient();
}
}
}
return client;
}
public void connect(String serverIP, int port) {
eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap=new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(serverIP, port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(0, 0, HEART_PING_TIME));
socketChannel.pipeline().addLast( new ProtobufVarint32FrameDecoder());
socketChannel.pipeline().addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
socketChannel.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender());
socketChannel.pipeline().addLast(new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future =bootstrap.connect(serverIP, port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
Log.d(TAG, "connect server success...");
}
}catch(InterruptedException e) {
e.printStackTrace();
eventLoopGroup.shutdownGracefully();
}
}
public synchronized void sendMessage(MessageProto.Message message){
Log.d(TAG, "sendMessage()..." + message.getBody());
socketChannel.writeAndFlush(message);
}
public void sync() throws InterruptedException{
socketChannel.closeFuture().sync();
}
public synchronized void stop(){
socketChannel.writeAndFlush(createMessage(MessageType.LOGOUT, null));
eventLoopGroup.shutdownGracefully();
}
public void setClientID(String clientID){
this.clientID = clientID;
}
public String getClientID(){
return this.clientID;
}
public void setGroupID(String groupID){
this.groupID = groupID;
}
public String getGroupID(){
return this.groupID;
}
public Message createMessage(MessageType type, String body){
MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
builder.setClientID(clientID);
builder.setMsgType(type);
builder.setGroupID(groupID);
if(body != null){
builder.setBody(body);
}
return builder.build();
}
}
public class NettyClientHandler extends SimpleChannelInboundHandler<Message> {
private String TAG = "ChatClient";
private int pingCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext cxt, Object event) throws Exception {
Log.d(TAG, "userEventTriggered()...");
if (event instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) event;
switch (e.state()) {
case ALL_IDLE:
cxt.writeAndFlush(NettyClient.getInstance().createMessage(Message.MessageType.PING, null));
pingCount++;
Log.d(TAG, "send ping to server...");
if(pingCount > 3){
Log.d(TAG, "heart timeout, so disconnect...");
cxt.close();
}
break;
default:
break;
}
}
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, MessageProto.Message message) throws Exception {
Log.d(TAG, "messageReceived()...");
switch(message.getMsgType()){
case PING:
pingCount = 0;
break;
case MESSAGE:
Log.d(TAG, "Received message: " + message.getClientID() + ", " + message.getBody());
EventBus.getDefault().post(message);
break;
}
ReferenceCountUtil.release(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt, Throwable cause) {
Log.d(TAG, "messageReceived()...");
cause.printStackTrace();
cxt.close();
}
}