Dubbo原始碼解析(十五)遠端通訊——Mina
遠端通訊——Mina
目標:介紹基於Mina的來實現的遠端通訊、介紹dubbo-remoting-mina內的原始碼解析。
前言
Apache MINA是一個網路應用程式框架,可幫助使用者輕鬆開發高效能和高可擴充套件性的網路應用程式。它通過Java NIO在各種傳輸(如TCP / IP和UDP / IP)上提供抽象的事件驅動非同步API。它通常被稱為NIO框架庫、客戶端伺服器框架庫或者網路套接字型檔。那麼本問就要講解在dubbo專案中,基於mina的API實現服務端和客戶端來完成遠端通訊這件事情。
下面是mina實現的包結構:
原始碼分析
(一)MinaChannel
該類繼承了AbstractChannel,是基於mina實現的通道。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
/**
* 通道的key
*/
private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";
/**
* mina中的一個控制程式碼,表示兩個端點之間的連線,與傳輸型別無關
*/
private final IoSession session;
複製程式碼
該類的屬性除了封裝了一個CHANNEL_KEY以外,還用到了mina中的IoSession,它封裝著一個連線所需要的方法,比如獲得遠端地址等。
2.getOrAddChannel
static MinaChannel getOrAddChannel(IoSession session,URL url,ChannelHandler handler) {
// 如果連線session為空,則返回空
if (session == null) {
return null;
}
// 獲得MinaChannel例項
MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
// 如果不存在,則建立
if (ret == null ) {
// 建立一個MinaChannel例項
ret = new MinaChannel(session,url,handler);
// 如果兩個端點連線
if (session.isConnected()) {
// 把新建立的MinaChannel新增到session 中
MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY,ret);
// 如果屬性的舊值不為空,則重新設定舊值
if (old != null) {
session.setAttribute(CHANNEL_KEY,old);
ret = old;
}
}
}
return ret;
}
複製程式碼
該方法是一個獲得MinaChannel物件的方法,其中每一個MinaChannel都會被放在session的屬性值中。
3.removeChannelIfDisconnected
static void removeChannelIfDisconnected(IoSession session) {
if (session != null && !session.isConnected()) {
session.removeAttribute(CHANNEL_KEY);
}
}
複製程式碼
該方法是當沒有連線時移除該通道,比較簡單。
4.send
@Override
public void send(Object message,boolean sent) throws RemotingException {
super.send(message,sent);
boolean success = true;
int timeout = 0;
try {
// 傳送訊息,返回future
WriteFuture future = session.write(message);
// 如果已經傳送過了
if (sent) {
// 獲得延遲時間
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
// 等待timeout的連線時間後檢視是否傳送成功
success = future.join(timeout);
}
} catch (Throwable e) {
throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress() + ",cause: " + e.getMessage(),e);
}
if (!success) {
throw new RemotingException(this,"Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
複製程式碼
該方法是最關鍵的傳送訊息,其中呼叫到了session的write方法,就是mina封裝的傳送訊息。並且根據返回的WriteFuture物件來判斷是否傳送成功。
(二)MinaHandler
該類繼承了IoHandlerAdapter,是通道處理器實現類,其中就是mina專案中IoHandler介面的幾個 方法。
/**
* url物件
*/
private final URL url;
/**
* 通道處理器物件
*/
private final ChannelHandler handler;
複製程式碼
該類有兩個屬性,上述提到的實現IoHandler介面方法都是呼叫了handler來實現的,我就舉例講一個,其他的都一樣的寫法:
@Override
public void sessionOpened(IoSession session) throws Exception {
// 獲得MinaChannel物件
MinaChannel channel = MinaChannel.getOrAddChannel(session,handler);
try {
// 呼叫接連該通道
handler.connected(channel);
} finally {
// 如果沒有連線則移除通道
MinaChannel.removeChannelIfDisconnected(session);
}
}
複製程式碼
該方法在IoHandler中叫做sessionOpened,其實就是連線方法,所以呼叫的是handler.connected。其他方法也一樣,請自行檢視。
(三)MinaClient
該類繼承了AbstractClient類,是基於mina實現的客戶端類。
1.屬性
/**
* 套接字連線集合
*/
private static final Map<String,SocketConnector> connectors = new ConcurrentHashMap<String,SocketConnector>();
/**
* 連線的key
*/
private String connectorKey;
/**
* 套接字連線者
*/
private SocketConnector connector;
/**
* 一個控制程式碼
*/
private volatile IoSession session; // volatile,please copy reference to use
複製程式碼
該類中的屬性都跟mina專案中封裝類有關係。
2.doOpen
@Override
protected void doOpen() throws Throwable {
// 用url來作為key
connectorKey = getUrl().toFullString();
// 先從集合中取套接字連線
SocketConnector c = connectors.get(connectorKey);
if (c != null) {
connector = c;
// 如果為空
} else {
// set thread pool. 設定執行緒池
connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker",true)));
// config 獲得套接字連線配置
SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// 啟用TCP_NODELAY
cfg.getSessionConfig().setTcpNoDelay(true);
// 啟用SO_KEEPALIVE
cfg.getSessionConfig().setKeepAlive(true);
int timeout = getConnectTimeout();
// 設定連線超時時間
cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
// set codec.
// 設定編解碼器
connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(),getUrl(),this)));
// 加入集合
connectors.put(connectorKey,connector);
}
}
複製程式碼
該方法是開啟客戶端,在mina中用套接字連線者connector來表示。其中的操作就是新建一個connector,並且設定相應的屬性,然後加入集合。
3.doConnect
@Override
protected void doConnect() throws Throwable {
// 連線伺服器
ConnectFuture future = connector.connect(getConnectAddress(),new MinaHandler(getUrl(),this));
long start = System.currentTimeMillis();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
// 用於對執行緒的阻塞和喚醒
final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
// 加入監聽器
future.addListener(new IoFutureListener() {
@Override
public void operationComplete(IoFuture future) {
try {
// 如果已經讀完了
if (future.isReady()) {
// 建立獲得該連線的IoSession例項
IoSession newSession = future.getSession();
try {
// Close old channel 關閉舊的session
IoSession oldSession = MinaClient.this.session; // copy reference
if (oldSession != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
}
// 關閉連線
oldSession.close();
} finally {
// 移除通道
MinaChannel.removeChannelIfDisconnected(oldSession);
}
}
} finally {
// 如果MinaClient關閉了
if (MinaClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new mina channel " + newSession + ",because the client closed.");
}
// 關閉session
newSession.close();
} finally {
MinaClient.this.session = null;
MinaChannel.removeChannelIfDisconnected(newSession);
}
} else {
// 設定新的session
MinaClient.this.session = newSession;
}
}
}
} catch (Exception e) {
exception.set(e);
} finally {
// 減少數量,釋放所有等待的執行緒
finish.countDown();
}
}
});
try {
// 當前執行緒等待,直到鎖存器倒計數到零,除非執行緒被中斷,或者指定的等待時間過去
finish.await(getConnectTimeout(),TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(this,"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+ Version.getVersion() + ",e);
}
Throwable e = exception.get();
if (e != null) {
throw e;
}
}
複製程式碼
該方法是客戶端連線伺服器的實現方法。其中用到了CountDownLatch來代表完成完成事件,它來做一個執行緒等待,直到1個執行緒完成上述的動作,也就是連線完成結束,才釋放等待的執行緒。保證每次只有一條執行緒去連線,解決future.awaitUninterruptibly()死鎖問題。
其他方法請自行檢視我寫的註釋。
(四)MinaServer
該類繼承了AbstractServer,是基於mina實現的服務端實現類。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
/**
* 套接字接收者物件
*/
private SocketAcceptor acceptor;
複製程式碼
2.doOpen
@Override
protected void doOpen() throws Throwable {
// set thread pool.
// 建立套接字接收者物件
acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS),Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",true)));
// config
// 設定配置
SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// set codec. 設定編解碼器
acceptor.getFilterChain().addLast("codec",this)));
// 開啟伺服器
acceptor.bind(getBindAddress(),this));
}
複製程式碼
該方法是建立伺服器,並且開啟伺服器。關鍵就是呼叫了acceptor的方法。
3.doClose
@Override
protected void doClose() throws Throwable {
try {
if (acceptor != null) {
// 取消繫結,也就是關閉伺服器
acceptor.unbind(getBindAddress());
}
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
}
複製程式碼
該方法是關閉伺服器,就是呼叫了acceptor.unbind方法。
4.getChannels
@Override
public Collection<Channel> getChannels() {
// 獲得連線到該伺服器到所有連線控制程式碼
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
Collection<Channel> channels = new HashSet<Channel>();
for (IoSession session : sessions) {
if (session.isConnected()) {
// 每次都用一個連線控制程式碼建立一個通道
channels.add(MinaChannel.getOrAddChannel(session,this));
}
}
return channels;
}
複製程式碼
該方法是獲得所有連線該伺服器的通道。
5.getChannel
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
// 獲得連線到該伺服器到所有連線控制程式碼
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
// 遍歷所有控制程式碼,找到要找的通道
for (IoSession session : sessions) {
if (session.getRemoteAddress().equals(remoteAddress)) {
return MinaChannel.getOrAddChannel(session,this);
}
}
return null;
}
複製程式碼
該方法是獲得地址對應的單個通道。
(五)MinaTransporter
public class MinaTransporter implements Transporter {
public static final String NAME = "mina";
@Override
public Server bind(URL url,ChannelHandler handler) throws RemotingException {
// 建立MinaServer例項
return new MinaServer(url,handler);
}
@Override
public Client connect(URL url,ChannelHandler handler) throws RemotingException {
// 建立MinaClient例項
return new MinaClient(url,handler);
}
}
複製程式碼
該類實現了Transporter介面,是基於mina的傳輸層實現。可以看到,bind和connect方法分別就是建立了MinaServer和MinaClient例項。這裡我建議檢視一下《dubbo原始碼解析(九)遠端通訊——Transport層》。
(六)MinaCodecAdapter
該類是基於mina實現的編解碼類,實現了ProtocolCodecFactory。
1.屬性
/**
* 編碼物件
*/
private final ProtocolEncoder encoder = new InternalEncoder();
/**
* 解碼物件
*/
private final ProtocolDecoder decoder = new InternalDecoder();
/**
* 編解碼器
*/
private final Codec2 codec;
/**
* url物件
*/
private final URL url;
/**
* 通道處理器物件
*/
private final ChannelHandler handler;
/**
* 緩衝區大小
*/
private final int bufferSize;
複製程式碼
屬性比較好理解,該編解碼器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder兩個類是該類的內部類,它們實現了ProtocolEncoder和ProtocolDecoder,關鍵的編解碼邏輯在這兩個類中實現。
2.構造方法
public MinaCodecAdapter(Codec2 codec,ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY,Constants.DEFAULT_BUFFER_SIZE);
// 如果快取區大小在16位元組以內,則設定配置大小,如果不是,則設定8位元組的緩衝區大小
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
複製程式碼
3.InternalEncoder
private class InternalEncoder implements ProtocolEncoder {
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void encode(IoSession session,Object msg,ProtocolEncoderOutput out) throws Exception {
// 動態分配一個1k的緩衝區
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
// 獲得通道
MinaChannel channel = MinaChannel.getOrAddChannel(session,handler);
try {
// 編碼
codec.encode(channel,buffer,msg);
} finally {
// 檢測是否斷開連線,如果斷開,則移除
MinaChannel.removeChannelIfDisconnected(session);
}
// 寫資料到out中
out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
out.flush();
}
}
複製程式碼
該內部類是編碼類,其中的encode方法中寫到了編碼核心呼叫的是codec.encode。
4.InternalDecoder
private class InternalDecoder implements ProtocolDecoder {
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
@Override
public void decode(IoSession session,ByteBuffer in,ProtocolDecoderOutput out) throws Exception {
int readable = in.limit();
if (readable <= 0) return;
ChannelBuffer frame;
// 如果緩衝區還有可讀位元組數
if (buffer.readable()) {
// 如果緩衝區是DynamicChannelBuffer型別的
if (buffer instanceof DynamicChannelBuffer) {
// 往buffer中寫入資料
buffer.writeBytes(in.buf());
frame = buffer;
} else {
// 緩衝區大小
int size = buffer.readableBytes() + in.remaining();
// 動態分配一個緩衝區
frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
// buffer的資料把寫到frame
frame.writeBytes(buffer,buffer.readableBytes());
// 把流中的資料寫到frame
frame.writeBytes(in.buf());
}
} else {
// 否則是基於Java NIO的ByteBuffer生成的緩衝區
frame = ChannelBuffers.wrappedBuffer(in.buf());
}
// 獲得通道
Channel channel = MinaChannel.getOrAddChannel(session,handler);
Object msg;
int savedReadIndex;
try {
do {
// 獲得讀索引
savedReadIndex = frame.readerIndex();
try {
// 解碼
msg = codec.decode(channel,frame);
} catch (Exception e) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw e;
}
// 拆包
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
frame.readerIndex(savedReadIndex);
break;
} else {
if (savedReadIndex == frame.readerIndex()) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw new Exception("Decode without read data.");
}
if (msg != null) {
// 把資料寫到輸出流裡面
out.write(msg);
}
}
} while (frame.readable());
} finally {
// 如果frame還有可讀資料
if (frame.readable()) {
//丟棄可讀資料
frame.discardReadBytes();
buffer = frame;
} else {
buffer = ChannelBuffers.EMPTY_BUFFER;
}
MinaChannel.removeChannelIfDisconnected(session);
}
}
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void finishDecode(IoSession session,ProtocolDecoderOutput out) throws Exception {
}
}
複製程式碼
該內部類是解碼類,其中decode方法中關鍵的是呼叫了codec.decode,其餘的操作是利用緩衝區對資料的沖刷流轉。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了基於mina的來實現的遠端通訊、介紹dubbo-remoting-mina內的原始碼解析,關鍵需要對mina有所瞭解。下一篇我會講解基於netty3實現遠端通訊部分。