1. 程式人生 > 程式設計 >TiKV 原始碼解析系列文章(十五)表示式計算框架

TiKV 原始碼解析系列文章(十五)表示式計算框架

作者:駱迪安

上一篇 《TiKV 原始碼解析系列文章(十四)Coprocessor 概覽》講到了 TiDB 為了最大化利用分散式計算能力,會盡量將 Selection 運算元、聚合運算元等運算元下推到 TiKV 節點上。本文將繼續介紹 Coprocessor 中表達式計算框架的原始碼架構,帶大家看看 SQL 中的表示式是如何在 Coprocessor 中執行的。

什麼是表示式

比如說我們有這個 SQL 作為例子:

SELECT (count * price) AS sum FROM orders WHERE order_id < 100 
複製程式碼

其中 order_id < 10 就是一個表示式,它有一個列輸入引數: order_id

,輸出:Bool

RPN 表示式

因為 TiDB 下推的是樹狀結構表示式,所以我們需要選擇一種樹的遍歷方式, 這裡 Coprocessor 選擇了由下而上遞推的 RPN(逆波蘭表示法)。RPN 是樹的後序遍歷,後序遍歷在每個節點知道自己有幾個子節點的時候等價於原本的樹結構。

比如說我們有一個數學算式 2 *(3 + 4)+ 5

由於數學上習慣寫法是中序遍歷,我們通常要加上括號消除歧義(比如加減和乘除的順序)。通過把操作符後移 我們得到 RPN:2 3 4 + * 5 +,這樣我們無需括號就能無歧義地遍歷這個表示式:

  1. 執行 RPN 的過程需要一個棧來快取中間結果,比如說對於 2 3 4 + * 5 +
    ,我們從左到右遍歷表示式,遇到值就壓入棧中。直到 + 操作符,棧中已經壓入了 2 3 4

  1. 因為 + 是二元操作符,需要從棧中彈出兩個值 3 4,結果為 7,重新壓入棧中:

  1. 此時棧中的值為 2 7

  1. 下一個是 * 運運算元,也需要彈出兩個值 2 7,結果為 14 壓入棧中。

  1. 接著壓入 5

  1. 最後 + 運運算元彈出 14 5,結果為 19,壓入棧。

  1. 最後留在棧裡的就是表示式的結果。

構建 RPN 表示式

以表示式 order_id < 10 下推為例,其下推的樹狀表示式如下圖所示,其中 ColumnRef(2) 表示列 order_id

2 表示 order_id 列在該表結構中對應的 offset:

轉化為 RPN 表示式:

Coprocessor 中表達式的定義:

/// An expression in Reverse Polish notation,which is simply a list of RPN expression nodes.
///
/// You may want to build it using `RpnExpressionBuilder`.
#[derive(Debug,Clone)]
pub struct RpnExpression(Vec<RpnExpressionNode>);

/// A type for each node in the RPN expression list.
#[derive(Debug,Clone)]
pub enum RpnExpressionNode {
    /// Represents a function call.
    FnCall {
        func_meta: RpnFnMeta,args_len: usize,field_type: FieldType,implicit_args: Vec<ScalarValue>,},/// Represents a scalar constant value.
    Constant {
        value: ScalarValue,/// Represents a reference to a column in the columns specified in evaluation.
    ColumnRef { offset: usize },}
複製程式碼

執行 RPN 表示式

有了表示式後,接下來我們需要執行表示式,為此我們要使用一個棧結構來快取中間值。由於表示式中的操作符(RpnExpressionNode::FnCall)不會被存入棧,我們定義了只包含值的 RpnStackNode 儲存中間值:

// A type for each node in the RPN evaluation stack. It can be one of a scalar value node or a
/// vector value node. The vector value node can be either an owned vector value or a reference.
#[derive(Debug)]
pub enum RpnStackNode<'a> {
    /// Represents a scalar value. Comes from a constant node in expression list.
    Scalar {
        value: &'a ScalarValue,field_type: &'a FieldType,/// Represents a vector value. Comes from a column reference or evaluated result.
    Vector {
        value: RpnStackNodeVectorValue<'a>,}
複製程式碼

注意,Coprocessor 中表達式是向量化計算的,每次都儘量會計算多行,通常為 1024 行,即 op([]value,[]value) 而不是 op(value,value),從而減少分支並提高 Cache Locality。但運算數並不總是一個來自列的向量,還可能是使用者直接指定的常量(例如 SELECT a+1a 是向量,但 1 只是標量)。因此,RpnStackNode 分兩種:

  • 標量:由 Constant 生成。
  • 向量:執行 ColumnRe f 生成,或是 FnCall 呼叫返回的結果。

另外為了避免 Selection 運算元移動大量的資料,向量使用了間接的儲存方式,每個向量有真實資料和邏輯索引,只有邏輯索引中對應的真實資料才是邏輯有效的,這樣 Selection 運算元便可以只需改動邏輯索引而不需搬動大量的真實資料:

/// Represents a vector value node in the RPN stack.
///
/// It can be either an owned node or a reference node.
///
/// When node comes from a column reference,it is a reference node (both value and field_type
/// are references).
///
/// When nodes comes from an evaluated result,it is an owned node.
#[derive(Debug)]
pub enum RpnStackNodeVectorValue<'a> {
    Generated {
        physical_value: VectorValue,logical_rows: Arc<[usize]>,Ref {
        physical_value: &'a VectorValue,logical_rows: &'a [usize],}
複製程式碼

接下來我們用上面的 order_id < 100 作為例子看看錶達式是如何執行的。

  1. 首先我們準備好一個棧結構:

  1. 接著逐一遍歷表示式,第一個取出的是 ColumnRef,我們取出輸入 Selection 運算元的資料中對應 offset 的列的向量資料,並將向量壓入棧:

  1. 接著是 Constant,轉化為標量然後壓入棧:

  1. 最後一個是 LT 運運算元,它需要兩個入參,因此我們從棧中彈出兩個值作為引數呼叫 LTLT 會生成一個新的向量,將結果壓入棧:

  1. 最後留在棧裡的就是表示式的執行結果。

  1. Selection 運算元根據結果的布林值過濾原輸入的邏輯索引:

  1. 這樣就間接的過濾出有效資料而不用改變 Physical Vector:

實現 RPN 表示式函式

實現表示式函式(FnCall)是比較繁瑣的。比如對於二元操作符加法, 它既可以接受其中一元輸入常量,也可以接受來自列資料的向量。一種解決方法是將標量都重複填充為向量,這樣所有函式運算都是向量引數,但這個方法會有額外的標量拷貝開銷。為了避免這個開銷,Coprocessor 直接實現了向量與標量的運算,rpn_expr_codegen 提供了過程巨集 #[rpn_fn] ,我們只需定義標量邏輯,過程巨集將自動生成剩下帶有向量的邏輯。

下面我們來試著定義一個整數加法操作符,這裡入參和返回值都為標量即可,原始碼的實現引入了泛型更進一步將其抽象為所有數值型別間的加法:

#[rpn_fn]
#[inline]
pub fn int_plus_int(
    lhs: &Option<Int>,rhs: &Option<Int>,) -> Result<Option<Int>> {
    if let (Some(lhs),Some(rhs)) = (arg0,arg1) {
        lhs.checked_add(*rhs)
            .ok_or_else(|| Error::overflow("BIGINT",&format!("({} + {})",lhs,rhs)).into())
            .map(Some)
    } else {
        Ok(None)
    }
}
複製程式碼

#[rpn_fn] 巨集會分析這個操作符定義的引數數量和型別,自動生成既可以處理標量也可以處理向量的 int_plus_int_fn_meta(),這個函式將可以放進 FnCall 被用於表示式計算:

pub fn int_plus_int_fn_meta(
        _ctx: &mut EvalContext,output_rows: usize,args: &[RpnStackNode<'_>],_extra: &mut RpnFnCallExtra<'_>,) -> Result<VectorValue>
{
    assert!(args.len() >= 2);

    let lhs = args[0];
    let rhs = args[1];

    let mut result: Vec<Int> = Vec::with_capacity(output_rows);

    match lhs {
        RpnStackNode::Scalar { value: ScalarValue::Int(lhs),.. } => {
            match rhs {
                RpnStackNode::Scalar { value: ScalarValue::Int(rhs),.. } => {
                    let value = int_plus_int(lhs,rhs);
                    result.push(result);
                }
                RpnStackNode::Vector { value: VectorValue::Int(rhs_vector),.. } => {
                    for rhs_row in rhs_vector.logical_rows() {
                        let rhs = rhs_vector.physical_value[rhs_row];
                        let value = int_plus_int(lhs,rhs);
                        result.push(result);
                    }
                }
                _ => panic!("invalid expression")
            }
        }
        RpnStackNode::Vector { value: VectorValue::Int(lhs_vector),.. } => {
                    for lhs in lhs_vector {
                        let value = int_plus_int(lhs,rhs);
                        result.push(result);
                    }
                }
                RpnStackNode::Vector { value: VectorValue::Int(rhs_vector),.. } => {
                    for (lhs,rhs) in lhs_vector.logical_rows().iter().zip(rhs_vector.logical_rows()) {
                        let lhs = lhs_vector.physical_value[lhs_row];
                        let rhs = rhs_vector.physical_value[rhs_row];
                        let value = int_plus_int(lhs,rhs);
                        result.push(result);
                    }
                }
                _ => panic!("invalid expression")
            }
        }
        _ => panic!("invalid expression")
    }

    result
}
複製程式碼

注意:TiKV 原始碼使用泛型展開生成邏輯程式碼,較為複雜,因此上面給出的這段是展開後的等價虛擬碼。

小結

以上就是 Coprocessor 表示式框架實現解析。下一篇我們將詳細介紹各運算元的內部實現。

原文閱讀pingcap.com/blog-cn/tik…