資料庫中介軟體 MyCAT原始碼分析——跨庫兩表Join
- 1. 概述
- 2. 主流程
- 3. ShareJoin
- 3.1 JoinParser
- 3.2 ShareJoin.processSQL(...)
- 3.3 BatchSQLJob
- 3.4 ShareDBJoinHandler
- 3.5 ShareRowOutPutDataHandler
- 4. 彩蛋
1. 概述
MyCAT 支援跨庫表 Join,目前版本僅支援跨庫兩表 Join。雖然如此,已經能夠滿足我們大部分的業務場景。況且,Join 過多的表可能帶來的效能問題也是很麻煩的。
本文主要分享:
- 整體流程、呼叫順序圖
- 核心程式碼的分析
前置閱讀:《MyCAT 原始碼分析 —— 【單庫單表】查詢》。
OK,Let's Go。
2. 主流程
當執行跨庫兩表 Join SQL 時,經歷的大體流程如下:
SQL 上,需要添加註解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */${SQL}
。 RouteService#route(...)
解析註解 mycat:catlet
後,路由給 HintCatletHandler
作進一步處理。
HintCatletHandler
獲取註解對應的 Catlet
實現類, io.mycat.catlets.ShareJoin
就是其中一種實現(目前也只有這一種實現),提供了跨庫兩表 Join 的功能。從類命名上看, ShareJoin
核心程式碼如下:
// HintCatletHandler.java public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String realSQL, String charset, ServerConnection sc, LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap) throws SQLNonTransientException { String cateletClass = hintSQLValue; if (LOGGER.isDebugEnabled()) { LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL); } try { Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass); catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool); catlet.processSQL(realSQL, new EngineCtx(sc.getSession2())); } catch (Exception e) { LOGGER.warn("catlet error " + e); throw new SQLNonTransientException(e); } return null; }
3. ShareJoin
目前支援跨庫兩表 Join。 ShareJoin
將 SQL 拆分成左表 SQL 和 右表 SQL,傳送給各資料節點執行,彙總資料結果進行合後返回。
虛擬碼如下:
// SELECT u.id, o.id FROM t_order o
// INNER JOIN t_user u ON o.uid = u.id
// 【順序】查詢左表
String leftSQL = "SELECT o.id, u.id FROM t_order o";
List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
// 【並行】查詢右表
String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
for (dn : dns) { // 此處是並行執行,使用回撥邏輯
for (rightRecord : dn.select(rightSQL)) { // 查詢右表
// 合併結果
for (leftRecord : leftList) {
if (leftRecord.uid == rightRecord.id) {
write(leftRecord + leftRecord.uid 拼接結果);
}
}
}
}
實際情況會更加複雜,我們接下來一點點往下看。
3.1 JoinParser
JoinParser
負責對 SQL 進行解析。整體流程如下:
舉個例子, /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id;
解析後, TableFilter
結果如下:
- tName :表名
- tAlia :表自定義命名
- where :過濾條件
- order :排序條件
- parenTable :左連線的 Join 的表名。
t_user
表 在join
屬性 的parenTable
為 "o",即t_order
。 - joinParentkey :左連線的 Join 欄位
- joinKey :join 欄位。
t_user
表 在join
屬性 為id
。 - join :子 tableFilter。即,該表連線的右邊的表。
- parent :和
join
屬性 相對。
看到此處,大家可能有疑問,為什麼要把 SQL 解析成 TableFilter
。 JoinParser
根據 TableFilter
生成資料節點執行 SQL。程式碼如下:
// TableFilter.java
public String getSQL() {
String sql = "";
// fields
for (Entry<String, String> entry : fieldAliasMap.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
if (val == null) {
sql = unionsql(sql, getFieldfrom(key), ",");
} else {
sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
}
}
// where
if (parent == null) { // on/where 等於號左邊的表
String parentJoinKey = getJoinKey(true);
// fix sharejoin bug:
// (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
// 原因是左表的select列沒有包含 join 列,在獲取結果時報上面的錯誤
if (sql != null && parentJoinKey != null &&
!sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
sql += ", " + parentJoinKey;
}
sql = "select " + sql + " from " + tName;
if (!(where.trim().equals(""))) {
sql += " where " + where.trim();
}
} else { // on/where 等於號右邊邊的表
if (allField) {
sql = "select " + sql + " from " + tName;
} else {
sql = unionField("select " + joinKey, sql, ",");
sql = sql + " from " + tName;
//sql="select "+joinKey+","+sql+" from "+tName;
}
if (!(where.trim().equals(""))) {
sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
} else {
sql += " where " + joinKey + " in %s ";
}
}
// order
if (!(order.trim().equals(""))) {
sql += " order by " + order.trim();
}
// limit
if (parent == null) {
if ((rowCount > 0) && (offset > 0)) {
sql += " limit" + offset + "," + rowCount;
} else {
if (rowCount > 0) {
sql += " limit " + rowCount;
}
}
}
return sql;
}
- 當
parent
為空時,即on/where 等於號左邊的表。例如:selectid,uidfromt_order
。 - 當
parent
不為空時,即on/where 等於號右邊的表。例如:selectid,usernamefromt_userwhereidin(1,2,3)
。
3.2 ShareJoin.processSQL(...)
當 SQL 解析完後,生成左邊的表執行的 SQL,傳送給對應的資料節點查詢資料。大體流程如下:
當 SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id;
時, sql=getSql()
的返回結果為 selectid,uidfromt_order
。
生成左邊的表執行的 SQL 後,順序順序順序傳送給對應的資料節點查詢資料。具體順序查詢是怎麼實現的,我們來看下章 BatchSQLJob。
3.3 BatchSQLJob
EngineCtx
對 BatchSQLJob
封裝,提供上層兩個方法:
- executeNativeSQLSequnceJob :順序(非併發)在每個資料節點執行SQL任務
- executeNativeSQLParallJob :併發在每個資料節點執行SQL任務
核心程式碼如下:
// EngineCtx.java
public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
SQLJobHandler jobHandler) {
for (String dataNode : dataNodes) {
SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
jobHandler, this);
bachJob.addJob(job, false);
}
}
public void executeNativeSQLParallJob(String[] dataNodes, String sql,
SQLJobHandler jobHandler) {
for (String dataNode : dataNodes) {
SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
jobHandler, this);
bachJob.addJob(job, true);
}
}
BatchSQLJob
通過執行中任務列表、待執行任務列表來實現順序/併發執行任務。核心程式碼如下:
// BatchSQLJob.java
/**
* 執行中任務列表
*/
private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();
/**
* 待執行任務列表
*/
private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();
public void addJob(SQLJob newJob, boolean parallExecute) {
if (parallExecute) {
runJob(newJob);
} else {
waitingJobs.offer(newJob);
if (runningJobs.isEmpty()) { // 若無正在執行中的任務,則從等待佇列裡獲取任務進行執行。
SQLJob job = waitingJobs.poll();
if (job != null) {
runJob(job);
}
}
}
}
public boolean jobFinished(SQLJob sqlJob) {
runningJobs.remove(sqlJob.getId());
SQLJob job = waitingJobs.poll();
if (job != null) {
runJob(job);
return false;
} else {
if (noMoreJobInput) {
return runningJobs.isEmpty() && waitingJobs.isEmpty();
} else {
return false;
}
}
}
-
順序執行時,當
runningJobs
存在執行中的任務時,#addJob(...)
時,不立即執行,新增到waitingJobs
。當SQLJob
完成時,順序呼叫下一個任務。 -
併發執行時,
#addJob(...)
時,立即執行。
SQLJob
SQL 非同步執行任務。其 jobHandler(SQLJobHandler)
屬性,在 SQL 執行有返回結果時,會進行回撥,從而實現非同步執行。
在 ShareJoin
裡, SQLJobHandler
有兩個實現: ShareDBJoinHandler
、 ShareRowOutPutDataHandler
。前者,左邊的表執行的 SQL 回撥;後者,右邊的表執行的 SQL 回撥。
3.4 ShareDBJoinHandler
ShareDBJoinHandler
,左邊的表執行的 SQL 回撥。流程如下:
-
#fieldEofResponse(...)
:接收資料節點返回的 fields,放入記憶體。 -
#rowResponse(...)
:接收資料節點返回的 row,放入記憶體。 -
#rowEofResponse(...)
:接收完一個數據節點返回所有的 row。當所有資料節點都完成 SQL 執行時,提交右邊的表執行的 SQL 任務,並行執行,即圖中#createQryJob(...)。
當 SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id;
時, sql=getChildSQL()
的返回結果為 selectid,usernamefromt_userwhereidin(1,2,3)
。
核心程式碼如下:
// ShareJoin.java
private void createQryJob(int batchSize) {
int count = 0;
Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
String theId = null;
StringBuilder sb = new StringBuilder().append('(');
String svalue = "";
for (Map.Entry<String, String> e : ids.entrySet()) {
theId = e.getKey();
byte[] rowbyte = rows.remove(theId);
if (rowbyte != null) {
batchRows.put(theId, rowbyte);
}
if (!svalue.equals(e.getValue())) {
if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
|| joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 為varchar
sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
} else { // 預設joinkey為int/long
sb.append(e.getValue()).append(','); // (1,2,3)
}
}
svalue = e.getValue();
if (count++ > batchSize) {
break;
}
}
if (count == 0) {
return;
}
jointTableIsData = true;
sb.deleteCharAt(sb.length() - 1).append(')');
String sql = String.format(joinParser.getChildSQL(), sb);
getRoute(sql);
ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));
}
3.5 ShareRowOutPutDataHandler
ShareRowOutPutDataHandler
,右邊的表執行的 SQL 回撥。流程如下:
-
#fieldEofResponse(...)
:接收資料節點返回的 fields,返回 header 給 MySQL Client。 -
#rowResponse(...)
:接收資料節點返回的 row,匹配左表的記錄,返回合併後返回的 row 給 MySQL Client。 -
#rowEofResponse(...)
:當所有 row 都返回完後,返回 eof 給 MySQL Client。
核心程式碼如下:
// ShareRowOutPutDataHandler.java
public boolean onRowData(String dataNode, byte[] rowData) {
RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
//拷貝一份batchRows
Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();
batchRowsCopy.putAll(arows);
// 獲取Id欄位,
String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
// 查詢ID對應的A表的記錄
byte[] arow = getRow(batchRowsCopy, id, joinL);
while (arow != null) {
RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
// 設定b.name 欄位
byte[] bname = rowDataPkgold.fieldValues.get(i);
rowDataPkg.add(bname);
rowDataPkg.addFieldCount(1);
}
// huangyiming add
MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
if (null == middlerResultHandler) {
ctx.writeRow(rowDataPkg);
} else {
if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
byte[] columnData = rowDataPkg.fieldValues.get(0);
if (columnData != null && columnData.length > 0) {
String rowValue = new String(columnData);
middlerResultHandler.add(rowValue);
}
//}
}
}
arow = getRow(batchRowsCopy, id, joinL);
}
return false;
}
4. 彩蛋
如下是本文涉及到的核心類,有興趣的同學可以翻一翻。
ShareJoin
另外不支援的功能:
- 只支援 inner join,不支援 left join、right join 等等連線。
- 不支援 order by。
- 不支援 group by 以及 相關聚合函式。
- 即使 join 左表的欄位未宣告為返回 fields 也會返回。
恩,MyCAT 弱XA 原始碼繼續走起!