1. 程式人生 > 程式設計 >通過租戶id實現的SaaS方案

通過租戶id實現的SaaS方案

概況

專案開發到一半,使用者突然提出需要多個分公司共同使用,這種需要將系統設計成SaaS架構,將各個分公司的資料進行隔離。

SaaS實現的方案

  • 獨立資料庫

    每個企業 獨立的物理資料庫,隔離性好,成本高。

  • 共享資料庫、獨立schema

    就是一臺物理機,多個邏輯資料庫,oracle叫做schema,mysql叫做database,每個企業獨立的schema。

  • 共享資料庫、資料庫表(本次採用):

    在表中新增“企業”或者“租戶”欄位區分是哪個企業的資料。操作的時候根據“租戶”欄位去查詢相應的資料。

    優點: 所有租戶使用同一資料庫,所以成本低廉。

    缺點:隔離級別低,安全性低,需要在開發時加大對安全的開發量,資料備份和恢復最困難。

改造思路

  1. 本次採用共享資料庫、資料庫表的SaaS方案。改造時需要做以下工作:
  • 建立租戶資訊表。
  • 先要將所有的表新增租戶id欄位tenant_id。用於關聯租戶資訊表。
  • tenant_id和原始表id建立聯合主鍵。注意主鍵的順序,原表主鍵必須在左邊。
  • 將表修改為分割槽表。
  1. 改造後,在新增租戶資訊的時候,同時在所有表中新增該租戶的分割槽,分割槽用於儲存該租戶的資料。
  2. 在後續增加記錄時,需要tenant_id欄位的值,在刪改查中,都需要在where條件中以tenant_id為條件來操作某個租戶的資料。

測試環境介紹

測試庫中有5張表,我下文使用sys_log表進行測試。

sys_log
的建表語句為:

CREATE TABLE `sys_log` (
  `log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主鍵',`type` TINYINT(1) DEFAULT NULL COMMENT '型別',`content` VARCHAR(255) DEFAULT NULL COMMENT '內容',`create_id` BIGINT(18) DEFAULT NULL COMMENT '建立人ID',`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',`tenant_id`
INT NOT NULL,PRIMARY KEY (`log_id`,`tenant_id`) USING BTREE ) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='系統日誌' 複製程式碼

表新增租戶id欄位

找出未新增租戶id(tenant_id)欄位的表。

SELECT 
    table_name 
  FROM
    INFORMATION_SCHEMA.TABLES
  WHERE table_schema = 'my'   -- my 是我的測試資料庫名稱
    AND table_name NOT IN 
    (SELECT 
      t.table_name 
    FROM
      (SELECT 
        table_name,column_name 
      FROM
        information_schema.columns 
      WHERE table_name IN 
        (SELECT 
          table_name 
        FROM
          INFORMATION_SCHEMA.TABLES 
        WHERE table_schema = 'my')) t 
    WHERE t.column_name = 'tenant_id') ;
複製程式碼

執行,找到兩個符合條件的表,在資料庫進行確認,確實表中沒tenant_id欄位。

建立租戶資訊表

僅供參考,用於儲存租戶資訊

CREATE TABLE `t_tenant` (
  `tenant_id` varchar(40) NOT NULL DEFAULT 'c12dee54f652452b88142a0267ec74b7' COMMENT '租戶id',`tenant_code` varchar(100) DEFAULT NULL COMMENT '租戶編碼',`name` varchar(50) DEFAULT NULL COMMENT '租戶名稱',`desc` varchar(500) DEFAULT NULL COMMENT '租戶描述',`logo` varchar(255) DEFAULT NULL COMMENT '公司logo地址',`status` smallint(6) DEFAULT NULL COMMENT '狀態1有效0無效',`create_by` varchar(100) DEFAULT NULL COMMENT '建立者',`create_time` datetime DEFAULT NULL COMMENT '建立時間',`last_update_by` varchar(100) DEFAULT NULL COMMENT '最後修改人',`last_update_time` datetime DEFAULT NULL COMMENT '最後修改時間',`street_address` varchar(200) DEFAULT NULL COMMENT '街道樓號地址',`province` varchar(20) DEFAULT NULL COMMENT '一級行政單位,如廣東省,上海市等',`city` varchar(20) DEFAULT NULL COMMENT '城市,如廣州市,佛山市等',`district` varchar(20) DEFAULT NULL COMMENT '行政區,如番禺區,天河區等',`link_man` varchar(50) DEFAULT NULL COMMENT '聯絡人',`link_phone` varchar(50) DEFAULT NULL COMMENT '聯絡電話',`longitude` decimal(10,6) DEFAULT NULL COMMENT '經度',`latitude` decimal(10,6) DEFAULT NULL COMMENT '緯度',`adcode` varchar(8) DEFAULT NULL COMMENT '區域編碼,用於通過區域id快速匹配後展示,如廣州是440100',PRIMARY KEY (`tenant_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='租戶的基本資訊表';
複製程式碼

將所有表新增tenant_id欄位

DROP PROCEDURE IF EXISTS addColumn ;

DELIMITER $$

CREATE PROCEDURE addColumn () 
BEGIN
  -- 定義表名變數
  DECLARE s_tablename VARCHAR (100) ;
  /*顯示錶的資料庫中的所有表
 SELECT table_name FROM information_schema.tables WHERE table_schema='databasename' Order by table_name ;
 */
  #顯示所有
  DECLARE cur_table_structure CURSOR FOR 
  SELECT 
    table_name 
  FROM
    INFORMATION_SCHEMA.TABLES
  WHERE table_schema = 'my'     -- my = 我的測試資料庫名稱
    AND table_name NOT IN 
    (SELECT 
      t.table_name 
    FROM
      (SELECT 
        table_name,column_name 
      FROM
        information_schema.columns 
      WHERE table_name IN 
        (SELECT 
          table_name 
        FROM
          INFORMATION_SCHEMA.TABLES 
        WHERE table_schema = 'my')) t 
    WHERE t.column_name = 'tenant_id') ;
  DECLARE CONTINUE HANDLER FOR SQLSTATE '02000' SET s_tablename = NULL ;
  OPEN cur_table_structure ;
  FETCH cur_table_structure INTO s_tablename ;
  WHILE
    (s_tablename IS NOT NULL) DO SET @MyQuery = CONCAT(
      "alter table `",s_tablename,"` add COLUMN `tenant_id` INT not null COMMENT '租戶id'"
    ) ;
    PREPARE msql FROM @MyQuery ;
    EXECUTE msql ;
    #USING @c; 
    FETCH cur_table_structure INTO s_tablename ;
  END WHILE ;
  CLOSE cur_table_structure ;
END $$

DELIMITER ;

#執行儲存過程
CALL addColumn () ;
複製程式碼

實現表分割槽

實現的目標:在新增租戶的時候實現對所有表新增分割槽

需要的條件:

  • 表必須是分割槽表,如果不是分割槽表,那麼需要改成分割槽表。
  • tenant_id必須和原表log_id主鍵組成聯合主鍵。

將表修改成分割槽表

表中新增分割槽有三種方式:

  • 建立臨時分割槽表sys_log_copy,copy資料過來後,刪除舊的sys_log,再將sys_log_copy修改為sys_log(本次採用,詳見下文)
  • 直接將表修改為分割槽表,需要原表中無資料,否則無法成功:
-- 如果表中沒資料,可以直接將表進行分割槽
ALTER TABLE sys_log PARTITION BY LIST COLUMNS (tenant_id)
(
    PARTITION a1 VALUES IN (1) ENGINE = INNODB,PARTITION a2 VALUES IN (2) ENGINE = INNODB,PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
複製程式碼
  • 在分割槽表中新增新分割槽,需要表已經是分割槽表,否則無法成功:
-- 已經是分割槽表中新增分割槽
ALTER TABLE sys_log_copy ADD PARTITION
(
    PARTITION a4 VALUES IN (4) ENGINE = INNODB,PARTITION a5 VALUES IN (5) ENGINE = INNODB,PARTITION a6 VALUES IN (6) ENGINE = INNODB
);
複製程式碼

通過建立臨時分割槽表的方式將原錶轉換成分割槽表

  1. 檢視錶建表語句:
SHOW CREATE TABLE `sys_log`;
複製程式碼
  1. 參考建表語句,建立copy表:
CREATE TABLE `sys_log_copy` (
  `log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主鍵',`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='系統日誌'
PARTITION BY LIST COLUMNS (tenant_id)
(
    PARTITION a1 VALUES IN (1) ENGINE = INNODB,PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
複製程式碼

注意上文中的DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC

  • CHARSET=utf8mb4是因為utf8在mysql中是不健全的編碼。
  • ROW_FORMAT=DYNAMIC是為了避免所以長度過大後導致如下報錯:
ERROR 1709 (HY000): Index column size too large. The maximum column size is 767 bytes.
複製程式碼

也可以在my.ini配置檔案中設定為true解決這個問題,但是要重啟資料庫,會比較麻煩。

[mysqld]
innodb_large_prefix=true
複製程式碼
  1. 驗證分割槽情況:
SELECT 
  partition_name part,partition_expression expr,partition_description descr,table_rows 
FROM
  information_schema.partitions 
WHERE TABLE_SCHEMA = SCHEMA() 
  AND TABLE_NAME = 'sys_log_copy' ;
複製程式碼

可以檢視到新增的3個分割槽

  1. 將資料複製到copy表中
INSERT INTO `sys_log_copy` SELECT * FROM `sys_log`
複製程式碼
  1. 刪除表sys_log,再修改sys_log_copy表中的名字為sys_log

編寫自動建立分割槽的倉儲過程

通過儲存過程實現,在分割槽表中新增分割槽

DELIMITER $$

USE `my`$$

DROP PROCEDURE IF EXISTS `add_table_partition`$$

CREATE DEFINER=`root`@`%` PROCEDURE `add_table_partition`(IN _tenantId INT)
BEGIN
  DECLARE IS_FOUND INT DEFAULT 1 ;
  -- 用於記錄遊標中存在分割槽的表名
  DECLARE v_tablename VARCHAR (200) ;
  -- 用於快取新增分割槽時候的sql
  DECLARE v_sql VARCHAR (5000) ;
  -- 分割槽名稱定義
  DECLARE V_P_VALUE VARCHAR (100) DEFAULT CONCAT('P',REPLACE(_tenantId,'-','')) ;
  DECLARE V_COUNT INT ;
  DECLARE V_LOONUM INT DEFAULT 0 ;
  DECLARE V_NUM INT DEFAULT 0 ;
  -- 定義遊標,值是所有分割槽表的表名
  DECLARE curr CURSOR FOR 
  (SELECT 
    t.TABLE_NAME 
  FROM
    INFORMATION_SCHEMA.partitions t 
  WHERE TABLE_SCHEMA = SCHEMA() 
    AND t.partition_name IS NOT NULL 
  GROUP BY t.TABLE_NAME) ;
  -- 如果沒影響的記錄,程式也繼續執行
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0;
  -- 獲取上一步中的遊標中獲取到的表名的個數
  SELECT 
    COUNT(1) INTO V_LOONUM 
  FROM
    (SELECT 
      t.TABLE_NAME 
    FROM
      INFORMATION_SCHEMA.partitions t 
    WHERE TABLE_SCHEMA = SCHEMA() 
      AND t.partition_name IS NOT NULL 
    GROUP BY t.TABLE_NAME) A ;
  -- 只有在存在分割槽表的時候才打開遊標
  IF V_LOONUM > 0 
  THEN -- 開啟遊標
  OPEN curr ;
  -- 迴圈
  read_loop :
  LOOP
    -- 宣告結束的時候
    IF V_NUM >= V_LOONUM 
    THEN LEAVE read_loop ;
    END IF ;
    -- 取遊標的值給變數
    FETCH curr INTO v_tablename ;
    -- 依次判斷分割槽表是否存在改分割槽,如果不存在則新增分割槽
    SET V_NUM = V_NUM + 1 ;
    SELECT 
      COUNT(1) INTO V_COUNT 
    FROM
      INFORMATION_SCHEMA.partitions t 
    WHERE LOWER(T.TABLE_NAME) = LOWER(v_tablename) 
      AND T.PARTITION_NAME = V_P_VALUE 
      AND T.TABLE_SCHEMA = SCHEMA() ;
    IF V_COUNT <= 0 
    THEN SET v_sql = CONCAT(
      '  ALTER TABLE ',v_tablename,' ADD PARTITION (PARTITION ',V_P_VALUE,' VALUES IN(',_tenantId,') ENGINE = INNODB) '
    ) ;
    SET @v_sql = v_sql ;
    -- 預處理需要執行的動態SQL,其中stmt是一個變數
    PREPARE stmt FROM @v_sql ;
    -- 執行SQL語句
    EXECUTE stmt ;
    -- 釋放掉預處理段
    DEALLOCATE PREPARE stmt ;
    END IF ;
    -- 結束迴圈
  END LOOP read_loop;
  -- 關閉遊標
  CLOSE curr ;
  END IF ;
END$$

DELIMITER ;
複製程式碼

呼叫儲存過程測試

CALL add_table_partition (8) ;
複製程式碼
  • 如果表還不是分割槽表,那麼呼叫儲存過程會有如下報錯:
錯誤程式碼: 1505
Partition management on a not partitioned table is not possible
複製程式碼

翻譯出來的意思是:“在未分割槽的表上進行分割槽管理是不可能的”。

  • 可能會報錯如下:
錯誤程式碼: 1329
No data - zero rows fetched,selected,or processed
複製程式碼

但如果通過查詢下面的information_schema.partitions無誤,那就是新增分割槽成功。

可以通過在定義遊標後,開啟遊標之前,新增如下方式解決:

DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0;
複製程式碼
SELECT 
  partition_name part,table_rows 
FROM
  information_schema.partitions 
WHERE TABLE_SCHEMA = SCHEMA() 
  AND TABLE_NAME = 'sys_log' ;
複製程式碼

通過mybatis呼叫儲存過程

<select id="testProcedure" statementType="CALLABLE" useCache="false" parameterType="string">
        <![CDATA[
                call add_table_partition (
                        #{_tenantId,mode=IN,jdbcType=VARCHAR});
        ]]>
</select>
複製程式碼

實現簡單的資料許可權

我們可能需要這種場景需求

  1. 集團公司有多個子公司,集團公司和每個子公司各自是一個租戶,但是子公司下還有子公司。
  2. 不管是集團公司還是說旗下的子公司都有相應的使用者(t_user)。
  3. 使用者需要有檢視自己公司下的資料和下面子公司的資料的許可權。

從上面的場景需求中,我們知道,t_tenant表需要設計成樹樁結構。下面我們進行測試。

修改上文的t_tenant表為:

CREATE TABLE `t_tenant` (
  `tenant_id` VARCHAR(40) NOT NULL DEFAULT '0' COMMENT '租戶id',`path` VARCHAR(200) DEFAULT NOT NULL COMMENT '從根節點開始的id路徑樹,如:0-2-21-211-2111,通過"-"隔開,最末尾為自己id',`tenant_code` VARCHAR(100) DEFAULT NULL COMMENT '租戶編碼',`name` VARCHAR(50) DEFAULT NULL COMMENT '租戶名稱',`logo` VARCHAR(255) DEFAULT NULL COMMENT '公司logo地址',`status` SMALLINT(6) DEFAULT NULL COMMENT '狀態1有效0無效',`create_by` VARCHAR(100) DEFAULT NULL COMMENT '建立者',`create_time` DATETIME DEFAULT NULL COMMENT '建立時間',`last_update_by` VARCHAR(100) DEFAULT NULL COMMENT '最後修改人',`last_update_time` DATETIME DEFAULT NULL COMMENT '最後修改時間',`street_address` VARCHAR(200) DEFAULT NULL COMMENT '街道樓號地址',PRIMARY KEY (`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT='租戶的基本資訊表'
複製程式碼

修改的地方有:

  • 為了演示,刪除了些感覺沒是沒用的欄位
  • 添加了path欄位,實現租戶和子租戶的樹形結構

新增測試資料

新增租戶資訊:

通過path快取著t_tenant樹的路徑。

建立使用者表(t_user),新增測試使用者:

測試的使用者id和tenant_id需要對應

建立附件表(t_file),新增測試業務資料:

建立人欄位(create_by)關聯使用者表(t_user),也要關聯到租戶(tenant_id),指明是哪個子公司的資料。

進行測試

  • 檢視tenant_id是"211"的租戶資訊和其下的子租戶資訊

    SELECT 
      tt.`tenant_id`,tt.path 
    FROM
      t_tenant tt 
    WHERE 
      (SELECT 
        INSTR(tt.path,"211")) ;
    複製程式碼
  • 檢視tenant_id是"211"的附件和其下的子租戶的附件資訊

    SELECT 
      * 
    FROM
      t_file tf 
    WHERE tf.`tenant_id` IN 
      (SELECT 
        tt.`tenant_id` 
      FROM
        t_tenant tt 
      WHERE 
        (SELECT 
          INSTR(tt.path,"211"))) ;
    複製程式碼

  • 檢視tenant_id是"2"的附件和其下的子租戶的附件資訊

通過mybatis攔截器實現檢視子租戶的資料許可權

編寫攔截器:

package com.iee.orm.mybatis.common;

import com.baomidou.mybatisplus.core.toolkit.PluginUtils;
import com.iee.orm.mybatis.common.UserHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.mapping.StatementType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.springframework.context.annotation.Configuration;

import java.sql.Connection;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 實現 攔截select語句,實現尾部拼接sql來查詢本租戶和子租戶資訊
 * @author [email protected]
 */
@Slf4j
@Configuration
@Intercepts({
        @Signature(type = StatementHandler.class,method = "prepare",args = {Connection.class,Integer.class})
})
public class SqlInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = PluginUtils.realTarget(invocation.getTarget());
        MetaObject metaObject = SystemMetaObject.forObject(statementHandler);
        // 非select語句或者是儲存過程 則跳過)
        MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedstatement");
        if (SqlCommandType.SELECT != mappedStatement.getSqlCommandType()
                || StatementType.CALLABLE == mappedStatement.getStatementType()) {
            return invocation.proceed();
        }
        // 拼接sql執行
        getSqlByInvocation(metaObject,invocation);
        return invocation.proceed();
    }

    /**
     * 拼接sql執行
     * @param metaObject
     * @param invocation
     * @return
     */
    private String getSqlByInvocation(MetaObject metaObject,Invocation invocation) throws NoSuchFieldException,IllegalAccessException {
        //在原始的sql中拼裝sql,方式一
        String originalSql = (String) metaObject.getValue(PluginUtils.DELEGATE_BOUNDSQL_SQL);
        metaObject.setValue(PluginUtils.DELEGATE_BOUNDSQL_SQL,originalSql);
        String targetSql = addDataSql(originalSql);
        return targetSql;

        //在原始的sql中拼裝sql,方式二
//        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
//        BoundSql boundSql = statementHandler.getBoundSql();
//        String sql = boundSql.getSql();
//        Field field = boundSql.getClass().getDeclaredField("sql");
//        field.setAccessible(true);
//        field.set(boundSql,addDataSql(sql));
//        return sql;
    }

    /**
     * 將原始sql進行拼接
     * @param sql
     * @return
     */
    static String addDataSql(String sql) {
        //需要去掉“;" 因為“;"表示sql結束,不去掉那麼後面的拼接會受到影響
        sql = StringUtils.replace(sql,";","");
        StringBuilder sb = new StringBuilder(sql);
        String tenantId = UserHelper.getTenantId();

        //用於檢視子租戶資訊的sql字尾
        String suffSql = " `tenant_id` IN " +
                "(SELECT " +
                "tt.`tenant_id` " +
                "FROM " +
                "t_tenant tt " +
                "WHERE " +
                "(SELECT " +
                "INSTR(tt.path," + tenantId + "))) ";

        String regex = "(.*)(where)(.*)";
        Pattern compile = Pattern.compile(regex);
        Matcher matcher = compile.matcher(sql);
        if (matcher.find()) {
            String whereLastSql = matcher.group(matcher.groupCount());
            //where 的條件存在 且是括號對的情況下,是不能再加“where”的,但是需要新增“and”
            int left = StringUtils.countMatches(whereLastSql,"(");
            int right = StringUtils.countMatches(whereLastSql,")");
            if(left == right){
                sb.append(" and ");
                sb.append(suffSql);
                log.info("資料許可權替換後sql:--->" + sb.toString());
                return sb.toString();
            }
        }
        //其他情況下需要新增where
        sb.append(" where ");
        sb.append(suffSql);
        log.info("資料許可權替換後sql:--->" + sb.toString());
        return sb.toString();
    }

    @Override
    public Object plugin(Object target) {
        return Plugin.wrap(target,new SqlInterceptor());
    }

    @Override
    public void setProperties(Properties properties) {
    }
}
複製程式碼

用到了mybatis-plus的工具類

/*
 * Copyright (c) 2011-2020,baomidou ([email protected]).
 * <p>
 * Licensed under the Apache License,Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * <p>
 * https://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing,software
 * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.baomidou.mybatisplus.core.toolkit;

import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;

import java.lang.reflect.Proxy;
import java.util.Properties;

/**
 * 外掛工具類
 *
 * @author TaoYu,hubin
 * @since 2017-06-20
 */
public final class PluginUtils {
    public static final String DELEGATE_BOUNDSQL_SQL = "delegate.boundSql.sql";

    private PluginUtils() {
        // to do nothing
    }

    /**
     * 獲得真正的處理物件,可能多層代理.
     */
    @SuppressWarnings("unchecked")
    public static <T> T realTarget(Object target) {
        if (Proxy.isProxyClass(target.getClass())) {
            MetaObject metaObject = SystemMetaObject.forObject(target);
            return realTarget(metaObject.getValue("h.target"));
        }
        return (T) target;
    }

    /**
     * 根據 key 獲取 Properties 的值
     */
    public static String getProperty(Properties properties,String key) {
        String value = properties.getProperty(key);
        return StringUtils.isEmpty(value) ? null : value;
    }
}
複製程式碼

測試的時候發現只要是select語句就會去關聯查詢出子租戶的資訊。

測試程式碼見:

github.com/longxiaonan…

您的鼓勵我的動力,請點贊支援下,謝謝!