通過租戶id實現的SaaS方案
概況
專案開發到一半,使用者突然提出需要多個分公司共同使用,這種需要將系統設計成SaaS架構,將各個分公司的資料進行隔離。
SaaS實現的方案
-
獨立資料庫
每個企業 獨立的物理資料庫,隔離性好,成本高。
-
共享資料庫、獨立schema
就是一臺物理機,多個邏輯資料庫,oracle叫做schema,mysql叫做database,每個企業獨立的schema。
-
共享資料庫、資料庫表(本次採用):
在表中新增“企業”或者“租戶”欄位區分是哪個企業的資料。操作的時候根據“租戶”欄位去查詢相應的資料。
優點: 所有租戶使用同一資料庫,所以成本低廉。
缺點:隔離級別低,安全性低,需要在開發時加大對安全的開發量,資料備份和恢復最困難。
改造思路
- 本次採用
共享資料庫、資料庫表
的SaaS方案。改造時需要做以下工作:
- 建立租戶資訊表。
- 先要將所有的表新增租戶id欄位
tenant_id
。用於關聯租戶資訊表。 - 將
tenant_id
和原始表id建立聯合主鍵。注意主鍵的順序,原表主鍵必須在左邊。 - 將表修改為分割槽表。
- 改造後,在新增租戶資訊的時候,同時在所有表中新增該租戶的分割槽,分割槽用於儲存該租戶的資料。
- 在後續增加記錄時,需要
tenant_id
欄位的值,在刪改查中,都需要在where條件中以tenant_id
為條件來操作某個租戶的資料。
測試環境介紹
測試庫中有5張表,我下文使用sys_log
表進行測試。
sys_log
CREATE TABLE `sys_log` (
`log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主鍵',`type` TINYINT(1) DEFAULT NULL COMMENT '型別',`content` VARCHAR(255) DEFAULT NULL COMMENT '內容',`create_id` BIGINT(18) DEFAULT NULL COMMENT '建立人ID',`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',`tenant_id` INT NOT NULL,PRIMARY KEY (`log_id`,`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='系統日誌'
複製程式碼
表新增租戶id欄位
找出未新增租戶id(tenant_id)欄位
的表。
SELECT
table_name
FROM
INFORMATION_SCHEMA.TABLES
WHERE table_schema = 'my' -- my 是我的測試資料庫名稱
AND table_name NOT IN
(SELECT
t.table_name
FROM
(SELECT
table_name,column_name
FROM
information_schema.columns
WHERE table_name IN
(SELECT
table_name
FROM
INFORMATION_SCHEMA.TABLES
WHERE table_schema = 'my')) t
WHERE t.column_name = 'tenant_id') ;
複製程式碼
執行,找到兩個符合條件的表,在資料庫進行確認,確實表中沒tenant_id
欄位。
建立租戶資訊表
僅供參考,用於儲存租戶資訊
CREATE TABLE `t_tenant` (
`tenant_id` varchar(40) NOT NULL DEFAULT 'c12dee54f652452b88142a0267ec74b7' COMMENT '租戶id',`tenant_code` varchar(100) DEFAULT NULL COMMENT '租戶編碼',`name` varchar(50) DEFAULT NULL COMMENT '租戶名稱',`desc` varchar(500) DEFAULT NULL COMMENT '租戶描述',`logo` varchar(255) DEFAULT NULL COMMENT '公司logo地址',`status` smallint(6) DEFAULT NULL COMMENT '狀態1有效0無效',`create_by` varchar(100) DEFAULT NULL COMMENT '建立者',`create_time` datetime DEFAULT NULL COMMENT '建立時間',`last_update_by` varchar(100) DEFAULT NULL COMMENT '最後修改人',`last_update_time` datetime DEFAULT NULL COMMENT '最後修改時間',`street_address` varchar(200) DEFAULT NULL COMMENT '街道樓號地址',`province` varchar(20) DEFAULT NULL COMMENT '一級行政單位,如廣東省,上海市等',`city` varchar(20) DEFAULT NULL COMMENT '城市,如廣州市,佛山市等',`district` varchar(20) DEFAULT NULL COMMENT '行政區,如番禺區,天河區等',`link_man` varchar(50) DEFAULT NULL COMMENT '聯絡人',`link_phone` varchar(50) DEFAULT NULL COMMENT '聯絡電話',`longitude` decimal(10,6) DEFAULT NULL COMMENT '經度',`latitude` decimal(10,6) DEFAULT NULL COMMENT '緯度',`adcode` varchar(8) DEFAULT NULL COMMENT '區域編碼,用於通過區域id快速匹配後展示,如廣州是440100',PRIMARY KEY (`tenant_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='租戶的基本資訊表';
複製程式碼
將所有表新增tenant_id
欄位
DROP PROCEDURE IF EXISTS addColumn ;
DELIMITER $$
CREATE PROCEDURE addColumn ()
BEGIN
-- 定義表名變數
DECLARE s_tablename VARCHAR (100) ;
/*顯示錶的資料庫中的所有表
SELECT table_name FROM information_schema.tables WHERE table_schema='databasename' Order by table_name ;
*/
#顯示所有
DECLARE cur_table_structure CURSOR FOR
SELECT
table_name
FROM
INFORMATION_SCHEMA.TABLES
WHERE table_schema = 'my' -- my = 我的測試資料庫名稱
AND table_name NOT IN
(SELECT
t.table_name
FROM
(SELECT
table_name,column_name
FROM
information_schema.columns
WHERE table_name IN
(SELECT
table_name
FROM
INFORMATION_SCHEMA.TABLES
WHERE table_schema = 'my')) t
WHERE t.column_name = 'tenant_id') ;
DECLARE CONTINUE HANDLER FOR SQLSTATE '02000' SET s_tablename = NULL ;
OPEN cur_table_structure ;
FETCH cur_table_structure INTO s_tablename ;
WHILE
(s_tablename IS NOT NULL) DO SET @MyQuery = CONCAT(
"alter table `",s_tablename,"` add COLUMN `tenant_id` INT not null COMMENT '租戶id'"
) ;
PREPARE msql FROM @MyQuery ;
EXECUTE msql ;
#USING @c;
FETCH cur_table_structure INTO s_tablename ;
END WHILE ;
CLOSE cur_table_structure ;
END $$
DELIMITER ;
#執行儲存過程
CALL addColumn () ;
複製程式碼
實現表分割槽
實現的目標:在新增租戶的時候實現對所有表新增分割槽
需要的條件:
- 表必須是分割槽表,如果不是分割槽表,那麼需要改成分割槽表。
-
tenant_id
必須和原表log_id
主鍵組成聯合主鍵。
將表修改成分割槽表
表中新增分割槽有三種方式:
- 建立臨時分割槽表
sys_log_copy
,copy資料過來後,刪除舊的sys_log
,再將sys_log_copy
修改為sys_log
(本次採用,詳見下文) - 直接將表修改為分割槽表,需要原表中無資料,否則無法成功:
-- 如果表中沒資料,可以直接將表進行分割槽
ALTER TABLE sys_log PARTITION BY LIST COLUMNS (tenant_id)
(
PARTITION a1 VALUES IN (1) ENGINE = INNODB,PARTITION a2 VALUES IN (2) ENGINE = INNODB,PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
複製程式碼
- 在分割槽表中新增新分割槽,需要表已經是分割槽表,否則無法成功:
-- 已經是分割槽表中新增分割槽
ALTER TABLE sys_log_copy ADD PARTITION
(
PARTITION a4 VALUES IN (4) ENGINE = INNODB,PARTITION a5 VALUES IN (5) ENGINE = INNODB,PARTITION a6 VALUES IN (6) ENGINE = INNODB
);
複製程式碼
通過建立臨時分割槽表
的方式將原錶轉換成分割槽表
- 檢視錶建表語句:
SHOW CREATE TABLE `sys_log`;
複製程式碼
- 參考建表語句,建立copy表:
CREATE TABLE `sys_log_copy` (
`log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主鍵',`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='系統日誌'
PARTITION BY LIST COLUMNS (tenant_id)
(
PARTITION a1 VALUES IN (1) ENGINE = INNODB,PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
複製程式碼
注意上文中的
DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC
CHARSET=utf8mb4
是因為utf8在mysql中是不健全的編碼。ROW_FORMAT=DYNAMIC
是為了避免所以長度過大後導致如下報錯:ERROR 1709 (HY000): Index column size too large. The maximum column size is 767 bytes. 複製程式碼
也可以在my.ini配置檔案中設定為true解決這個問題,但是要重啟資料庫,會比較麻煩。
[mysqld] innodb_large_prefix=true 複製程式碼
- 驗證分割槽情況:
SELECT
partition_name part,partition_expression expr,partition_description descr,table_rows
FROM
information_schema.partitions
WHERE TABLE_SCHEMA = SCHEMA()
AND TABLE_NAME = 'sys_log_copy' ;
複製程式碼
可以檢視到新增的3個分割槽
- 將資料複製到copy表中
INSERT INTO `sys_log_copy` SELECT * FROM `sys_log`
複製程式碼
- 刪除表
sys_log
,再修改sys_log_copy
表中的名字為sys_log
。
編寫自動建立分割槽的倉儲過程
通過儲存過程實現,在分割槽表中新增分割槽
DELIMITER $$
USE `my`$$
DROP PROCEDURE IF EXISTS `add_table_partition`$$
CREATE DEFINER=`root`@`%` PROCEDURE `add_table_partition`(IN _tenantId INT)
BEGIN
DECLARE IS_FOUND INT DEFAULT 1 ;
-- 用於記錄遊標中存在分割槽的表名
DECLARE v_tablename VARCHAR (200) ;
-- 用於快取新增分割槽時候的sql
DECLARE v_sql VARCHAR (5000) ;
-- 分割槽名稱定義
DECLARE V_P_VALUE VARCHAR (100) DEFAULT CONCAT('P',REPLACE(_tenantId,'-','')) ;
DECLARE V_COUNT INT ;
DECLARE V_LOONUM INT DEFAULT 0 ;
DECLARE V_NUM INT DEFAULT 0 ;
-- 定義遊標,值是所有分割槽表的表名
DECLARE curr CURSOR FOR
(SELECT
t.TABLE_NAME
FROM
INFORMATION_SCHEMA.partitions t
WHERE TABLE_SCHEMA = SCHEMA()
AND t.partition_name IS NOT NULL
GROUP BY t.TABLE_NAME) ;
-- 如果沒影響的記錄,程式也繼續執行
DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0;
-- 獲取上一步中的遊標中獲取到的表名的個數
SELECT
COUNT(1) INTO V_LOONUM
FROM
(SELECT
t.TABLE_NAME
FROM
INFORMATION_SCHEMA.partitions t
WHERE TABLE_SCHEMA = SCHEMA()
AND t.partition_name IS NOT NULL
GROUP BY t.TABLE_NAME) A ;
-- 只有在存在分割槽表的時候才打開遊標
IF V_LOONUM > 0
THEN -- 開啟遊標
OPEN curr ;
-- 迴圈
read_loop :
LOOP
-- 宣告結束的時候
IF V_NUM >= V_LOONUM
THEN LEAVE read_loop ;
END IF ;
-- 取遊標的值給變數
FETCH curr INTO v_tablename ;
-- 依次判斷分割槽表是否存在改分割槽,如果不存在則新增分割槽
SET V_NUM = V_NUM + 1 ;
SELECT
COUNT(1) INTO V_COUNT
FROM
INFORMATION_SCHEMA.partitions t
WHERE LOWER(T.TABLE_NAME) = LOWER(v_tablename)
AND T.PARTITION_NAME = V_P_VALUE
AND T.TABLE_SCHEMA = SCHEMA() ;
IF V_COUNT <= 0
THEN SET v_sql = CONCAT(
' ALTER TABLE ',v_tablename,' ADD PARTITION (PARTITION ',V_P_VALUE,' VALUES IN(',_tenantId,') ENGINE = INNODB) '
) ;
SET @v_sql = v_sql ;
-- 預處理需要執行的動態SQL,其中stmt是一個變數
PREPARE stmt FROM @v_sql ;
-- 執行SQL語句
EXECUTE stmt ;
-- 釋放掉預處理段
DEALLOCATE PREPARE stmt ;
END IF ;
-- 結束迴圈
END LOOP read_loop;
-- 關閉遊標
CLOSE curr ;
END IF ;
END$$
DELIMITER ;
複製程式碼
呼叫儲存過程測試
CALL add_table_partition (8) ;
複製程式碼
- 如果表還不是分割槽表,那麼呼叫儲存過程會有如下報錯:
錯誤程式碼: 1505 Partition management on a not partitioned table is not possible 複製程式碼
翻譯出來的意思是:“在未分割槽的表上進行分割槽管理是不可能的”。
- 可能會報錯如下:
錯誤程式碼: 1329 No data - zero rows fetched,selected,or processed 複製程式碼
但如果通過查詢下面的
information_schema.partitions
無誤,那就是新增分割槽成功。可以通過在定義遊標後,開啟遊標之前,新增如下方式解決:
DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0; 複製程式碼
SELECT
partition_name part,table_rows
FROM
information_schema.partitions
WHERE TABLE_SCHEMA = SCHEMA()
AND TABLE_NAME = 'sys_log' ;
複製程式碼
通過mybatis
呼叫儲存過程
<select id="testProcedure" statementType="CALLABLE" useCache="false" parameterType="string">
<![CDATA[
call add_table_partition (
#{_tenantId,mode=IN,jdbcType=VARCHAR});
]]>
</select>
複製程式碼
實現簡單的資料許可權
我們可能需要這種場景需求
- 集團公司有多個子公司,集團公司和每個子公司各自是一個租戶,但是子公司下還有子公司。
- 不管是集團公司還是說旗下的子公司都有相應的使用者(t_user)。
- 使用者需要有檢視自己公司下的資料和下面子公司的資料的許可權。
從上面的場景需求中,我們知道,t_tenant
表需要設計成樹樁結構。下面我們進行測試。
修改上文的t_tenant
表為:
CREATE TABLE `t_tenant` (
`tenant_id` VARCHAR(40) NOT NULL DEFAULT '0' COMMENT '租戶id',`path` VARCHAR(200) DEFAULT NOT NULL COMMENT '從根節點開始的id路徑樹,如:0-2-21-211-2111,通過"-"隔開,最末尾為自己id',`tenant_code` VARCHAR(100) DEFAULT NULL COMMENT '租戶編碼',`name` VARCHAR(50) DEFAULT NULL COMMENT '租戶名稱',`logo` VARCHAR(255) DEFAULT NULL COMMENT '公司logo地址',`status` SMALLINT(6) DEFAULT NULL COMMENT '狀態1有效0無效',`create_by` VARCHAR(100) DEFAULT NULL COMMENT '建立者',`create_time` DATETIME DEFAULT NULL COMMENT '建立時間',`last_update_by` VARCHAR(100) DEFAULT NULL COMMENT '最後修改人',`last_update_time` DATETIME DEFAULT NULL COMMENT '最後修改時間',`street_address` VARCHAR(200) DEFAULT NULL COMMENT '街道樓號地址',PRIMARY KEY (`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT='租戶的基本資訊表'
複製程式碼
修改的地方有:
- 為了演示,刪除了些感覺沒是沒用的欄位
- 添加了
path
欄位,實現租戶和子租戶的樹形結構
新增測試資料
新增租戶資訊:
通過path快取著
t_tenant
樹的路徑。
建立使用者表(t_user
),新增測試使用者:
測試的使用者id和tenant_id需要對應
建立附件表(t_file
),新增測試業務資料:
建立人欄位(
create_by
)關聯使用者表(t_user
),也要關聯到租戶(tenant_id
),指明是哪個子公司的資料。
進行測試
-
檢視
tenant_id
是"211"的租戶資訊和其下的子租戶資訊SELECT tt.`tenant_id`,tt.path FROM t_tenant tt WHERE (SELECT INSTR(tt.path,"211")) ; 複製程式碼
-
檢視
tenant_id
是"211"的附件和其下的子租戶的附件資訊SELECT * FROM t_file tf WHERE tf.`tenant_id` IN (SELECT tt.`tenant_id` FROM t_tenant tt WHERE (SELECT INSTR(tt.path,"211"))) ; 複製程式碼
-
檢視
tenant_id
是"2"的附件和其下的子租戶的附件資訊
通過mybatis攔截器實現檢視子租戶的資料許可權
編寫攔截器:
package com.iee.orm.mybatis.common;
import com.baomidou.mybatisplus.core.toolkit.PluginUtils;
import com.iee.orm.mybatis.common.UserHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.mapping.StatementType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.springframework.context.annotation.Configuration;
import java.sql.Connection;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 實現 攔截select語句,實現尾部拼接sql來查詢本租戶和子租戶資訊
* @author [email protected]
*/
@Slf4j
@Configuration
@Intercepts({
@Signature(type = StatementHandler.class,method = "prepare",args = {Connection.class,Integer.class})
})
public class SqlInterceptor implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler = PluginUtils.realTarget(invocation.getTarget());
MetaObject metaObject = SystemMetaObject.forObject(statementHandler);
// 非select語句或者是儲存過程 則跳過)
MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedstatement");
if (SqlCommandType.SELECT != mappedStatement.getSqlCommandType()
|| StatementType.CALLABLE == mappedStatement.getStatementType()) {
return invocation.proceed();
}
// 拼接sql執行
getSqlByInvocation(metaObject,invocation);
return invocation.proceed();
}
/**
* 拼接sql執行
* @param metaObject
* @param invocation
* @return
*/
private String getSqlByInvocation(MetaObject metaObject,Invocation invocation) throws NoSuchFieldException,IllegalAccessException {
//在原始的sql中拼裝sql,方式一
String originalSql = (String) metaObject.getValue(PluginUtils.DELEGATE_BOUNDSQL_SQL);
metaObject.setValue(PluginUtils.DELEGATE_BOUNDSQL_SQL,originalSql);
String targetSql = addDataSql(originalSql);
return targetSql;
//在原始的sql中拼裝sql,方式二
// StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
// BoundSql boundSql = statementHandler.getBoundSql();
// String sql = boundSql.getSql();
// Field field = boundSql.getClass().getDeclaredField("sql");
// field.setAccessible(true);
// field.set(boundSql,addDataSql(sql));
// return sql;
}
/**
* 將原始sql進行拼接
* @param sql
* @return
*/
static String addDataSql(String sql) {
//需要去掉“;" 因為“;"表示sql結束,不去掉那麼後面的拼接會受到影響
sql = StringUtils.replace(sql,";","");
StringBuilder sb = new StringBuilder(sql);
String tenantId = UserHelper.getTenantId();
//用於檢視子租戶資訊的sql字尾
String suffSql = " `tenant_id` IN " +
"(SELECT " +
"tt.`tenant_id` " +
"FROM " +
"t_tenant tt " +
"WHERE " +
"(SELECT " +
"INSTR(tt.path," + tenantId + "))) ";
String regex = "(.*)(where)(.*)";
Pattern compile = Pattern.compile(regex);
Matcher matcher = compile.matcher(sql);
if (matcher.find()) {
String whereLastSql = matcher.group(matcher.groupCount());
//where 的條件存在 且是括號對的情況下,是不能再加“where”的,但是需要新增“and”
int left = StringUtils.countMatches(whereLastSql,"(");
int right = StringUtils.countMatches(whereLastSql,")");
if(left == right){
sb.append(" and ");
sb.append(suffSql);
log.info("資料許可權替換後sql:--->" + sb.toString());
return sb.toString();
}
}
//其他情況下需要新增where
sb.append(" where ");
sb.append(suffSql);
log.info("資料許可權替換後sql:--->" + sb.toString());
return sb.toString();
}
@Override
public Object plugin(Object target) {
return Plugin.wrap(target,new SqlInterceptor());
}
@Override
public void setProperties(Properties properties) {
}
}
複製程式碼
用到了mybatis-plus的工具類
/*
* Copyright (c) 2011-2020,baomidou ([email protected]).
* <p>
* Licensed under the Apache License,Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.baomidou.mybatisplus.core.toolkit;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import java.lang.reflect.Proxy;
import java.util.Properties;
/**
* 外掛工具類
*
* @author TaoYu,hubin
* @since 2017-06-20
*/
public final class PluginUtils {
public static final String DELEGATE_BOUNDSQL_SQL = "delegate.boundSql.sql";
private PluginUtils() {
// to do nothing
}
/**
* 獲得真正的處理物件,可能多層代理.
*/
@SuppressWarnings("unchecked")
public static <T> T realTarget(Object target) {
if (Proxy.isProxyClass(target.getClass())) {
MetaObject metaObject = SystemMetaObject.forObject(target);
return realTarget(metaObject.getValue("h.target"));
}
return (T) target;
}
/**
* 根據 key 獲取 Properties 的值
*/
public static String getProperty(Properties properties,String key) {
String value = properties.getProperty(key);
return StringUtils.isEmpty(value) ? null : value;
}
}
複製程式碼
測試的時候發現只要是select語句就會去關聯查詢出子租戶的資訊。
測試程式碼見: