物理计划与表达式

本章讨论的源代码可以在 KQuery 项目的 physical-plan 模块中找到。

第五章中定义的逻辑计划规定了要做什么,但没有规定如何做,有独立的逻辑计划和物理计划是很好的做法,尽管有可能将它们结合起来以降低复杂性。

将逻辑计划和物理计划分开的一个原因是,有时可能有多种方式来执行一个特定的操作,这意味着逻辑计划和物理计划之间存在一对多的关系。

例如,单进程与分布式执行,或 CPU 与 GPU 执行,都可能有单独的物理计划。

另外,像聚合和连接这样的操作可以用各种算法来实现,有不同的性能权衡。当聚合已经被分组键排序的数据时,使用组聚合(也称为排序聚合)是很有效的,它一次只需要为一组分组键保持状态,并且在一组分组键结束时就可以发出结果。如果数据没有被排序,那么通常会使用哈希聚合。哈希聚合通过分组键维护一个 HashMap 的累加器。

连接有更多的算法,包括嵌套循环连接、排序-合并连接和哈希连接。

物理计划会返回一个针对记录批次的迭代器。

interface PhysicalPlan {
  fun schema(): Schema
  fun execute(): Sequence<RecordBatch>
  fun children(): List<PhysicalPlan>
}

物理表达式

我们已经定义了逻辑计划中所引用的逻辑表达式,但是我们现在需要实现物理表达式类,其中包含了在运行时对表达式求值的代码。

每个逻辑表达式可以有多个物理表达式的实现。例如,对于将两个数字相加的逻辑表达式 AddExpr,我们可以分别拥有使用 CPU 和 GPU 执行计算的实现。查询规划器可以根据代码运行的服务器的硬件能力来选择使用哪一个。

物理表达式是针对记录批次进行求值的,结果是一些列。

下面是我们用来表示物理表达式的接口。

interface Expression {
  fun evaluate(input: RecordBatch): ColumnVector
}

列表达式

Column 表达式简单地对正在处理的 RecordBatch 中的ColumnVector 的引用进行求值。Column 逻辑表达式通过名称来引用输入,这对编写查询很方便,但是对于物理表达式,我们想避免每次表达式求值时查找名称的开销,所以它通过索引来引用列。

class ColumnExpression(val i: Int) : Expression {

  override fun evaluate(input: RecordBatch): ColumnVector {
    return input.field(i)
  }

  override fun toString(): String {
    return "#$i"
  }
}

字面量表达式

字面表达式的物理实现仅仅是一个包装在类中的字面量,该类实现了相应的 trait,并为列中的每个索引提供相同的值。

class LiteralValueVector(
    val arrowType: ArrowType, 
    val value: Any?, 
    val size: Int) : ColumnVector {

  override fun getType(): ArrowType {
    return arrowType
  }

  override fun getValue(i: Int): Any? {
    if (i<0 || i>=size) {
      throw IndexOutOfBoundsException()
    }
    return value
  }

  override fun size(): Int {
    return size
  }

}

有了这个类,我们就可以为每个数据类型的字面量表达式创建物理表达式。

class LiteralLongExpression(val value: Long) : Expression {
  override fun evaluate(input: RecordBatch): ColumnVector {
    return LiteralValueVector(ArrowTypes.Int64Type, 
                              value, 
                              input.rowCount())
  }
}

class LiteralDoubleExpression(val value: Double) : Expression {
  override fun evaluate(input: RecordBatch): ColumnVector {
    return LiteralValueVector(ArrowTypes.DoubleType, 
                              value, 
                              input.rowCount())
  }
}

class LiteralStringExpression(val value: String) : Expression {
  override fun evaluate(input: RecordBatch): ColumnVector {
    return LiteralValueVector(ArrowTypes.StringType, 
                              value.toByteArray(), 
                              input.rowCount())
  }
}

二进制表达式

对于二进制表达式,我们需要求出左和右的输入表达式的值,然后针对这些输入值用具体的二进制运算符求值,因此我们可以提供一个基类来简化每个运算符的实现。

abstract class BinaryExpression(val l: Expression, val r: Expression) : Expression {
  override fun evaluate(input: RecordBatch): ColumnVector {
    val ll = l.evaluate(input)
    val rr = r.evaluate(input)
    assert(ll.size() == rr.size())
    if (ll.getType() != rr.getType()) {
      throw IllegalStateException(
          "Binary expression operands do not have the same type: " +
          "${ll.getType()} != ${rr.getType()}")
    }
    return evaluate(ll, rr)
  }

  abstract fun evaluate(l: ColumnVector, r: ColumnVector) : ColumnVector
}

比较表达式

比较表达式只是比较两个输入列中的所有值,并产生一个包含结果的新列(一个位向量)。

下面是一个相等运算符的例子。

class EqExpression(l: Expression, 
                   r: Expression): BooleanExpression(l,r) {
    
  override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Boolean {
    return when (arrowType) {
      ArrowTypes.Int8Type -> (l as Byte) == (r as Byte)
      ArrowTypes.Int16Type -> (l as Short) == (r as Short)
      ArrowTypes.Int32Type -> (l as Int) == (r as Int)
      ArrowTypes.Int64Type -> (l as Long) == (r as Long)
      ArrowTypes.FloatType -> (l as Float) == (r as Float)
      ArrowTypes.DoubleType -> (l as Double) == (r as Double)
      ArrowTypes.StringType -> toString(l) == toString(r)
      else -> throw IllegalStateException(
          "Unsupported data type in comparison expression: $arrowType")
    }
  }
}

数学表达式

数学表达式的实现与比较表达式的代码非常相似。所有的数学表达式都使用同一个基类。

abstract class MathExpression(l: Expression, 
                              r: Expression): BinaryExpression(l,r) {

  override fun evaluate(l: ColumnVector, r: ColumnVector): ColumnVector {
    val fieldVector = FieldVectorFactory.create(l.getType(), l.size())
    val builder = ArrowVectorBuilder(fieldVector)
    (0 until l.size()).forEach {
      val value = evaluate(l.getValue(it), r.getValue(it), l.getType())
      builder.set(it, value)
    }
    builder.setValueCount(l.size())
    return builder.build()
  }

  abstract fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Any?
}

下面是一个扩展这个基类的具体数学表达式的例子。

class AddExpression(l: Expression, 
                    r: Expression): MathExpression(l,r) {
    
  override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Any? {
      return when (arrowType) {
        ArrowTypes.Int8Type -> (l as Byte) + (r as Byte)
        ArrowTypes.Int16Type -> (l as Short) + (r as Short)
        ArrowTypes.Int32Type -> (l as Int) + (r as Int)
        ArrowTypes.Int64Type -> (l as Long) + (r as Long)
        ArrowTypes.FloatType -> (l as Float) + (r as Float)
        ArrowTypes.DoubleType -> (l as Double) + (r as Double)
        else -> throw IllegalStateException(
            "Unsupported data type in math expression: $arrowType")
      }
  }

  override fun toString(): String {
    return "$l+$r"
  }
}

聚合表达式

到目前为止,我们所看的表达式都是从每个批次的一个或多个输入列中产生一个输出列。聚合表达式更加复杂,因为它们在多批数据中聚合数值,然后产生一个最终值,所以我们需要引入累加器的概念,而且每个聚合表达式的物理表示需要知道如何产生一个适当的累加器,以便查询引擎将输入数据传递给它。

下面是表示聚合表达式和累加器的主要接口。

interface AggregateExpression {
  fun inputExpression(): Expression
  fun createAccumulator(): Accumulator
}

interface Accumulator {
  fun accumulate(value: Any?)
  fun finalValue(): Any?
}

对 Max 聚合表达式的实现将产生一个相应的 MaxAccumulator。

class MaxExpression(private val expr: Expression) : AggregateExpression {

  override fun inputExpression(): Expression {
    return expr
  }

  override fun createAccumulator(): Accumulator {
    return MaxAccumulator()
  }

  override fun toString(): String {
    return "MAX($expr)"
  }
}

下面是一个 MaxAccumulator 实现的示例。

class MaxAccumulator : Accumulator {

  var value: Any? = null

  override fun accumulate(value: Any?) {
    if (value != null) {
      if (this.value == null) {
        this.value = value
      } else {
        val isMax = when (value) {
          is Byte -> value > this.value as Byte
          is Short -> value > this.value as Short
          is Int -> value > this.value as Int
          is Long -> value > this.value as Long
          is Float -> value > this.value as Float
          is Double -> value > this.value as Double
          is String -> value > this.value as String
          else -> throw UnsupportedOperationException(
            "MAX is not implemented for data type: ${value.javaClass.name}")        
        }
            
        if (isMax) {
          this.value = value
        }
      }
    }
  }

  override fun finalValue(): Any? {
    return value
  }
}

物理计划

有了物理表达式,我们现在可以为查询引擎支持的各种转换实现物理计划了。

扫描(Scan)

扫描执行计划只是委托给一个数据源,传入一个投影来限制加载到内存的列数。没有执行额外的逻辑。

class ScanExec(val ds: DataSource, val projection: List<String>) : PhysicalPlan {

  override fun schema(): Schema {
    return ds.schema().select(projection)
  }

  override fun children(): List<PhysicalPlan> {
    // Scan is a leaf node and has no child plans
    return listOf()
  }

  override fun execute(): Sequence<RecordBatch> {
    return ds.scan(projection);
  }

  override fun toString(): String {
    return "ScanExec: schema=${schema()}, projection=$projection"
  }
}

投影(Projection)

投影执行计划只是针对输入列执行投影表达式,然后产生一个包含派生列的记录批次。请注意,对于通过名称引用现有列的投影表达式的情况,派生列只是对输入列的一个指针或引用,底层数据值并没有被复制。

class ProjectionExec(
    val input: PhysicalPlan,
    val schema: Schema,
    val expr: List<Expression>) : PhysicalPlan {

  override fun schema(): Schema {
    return schema
  }

  override fun children(): List<PhysicalPlan> {
    return listOf(input)
  }

  override fun execute(): Sequence<RecordBatch> {
    return input.execute().map { batch ->
      val columns = expr.map { it.evaluate(batch) }
        RecordBatch(schema, columns)
      }
  }

  override fun toString(): String {
    return "ProjectionExec: $expr"
  }
}

选择(也叫过滤) (Selection (also known as Filter))

选择执行计划是第一个非平凡的计划,因为它有条件逻辑来确定输入记录批次中的哪些行应该包括在输出批次中。

对于每个输入批次,执行过滤表达式以返回一个位向量,包含代表表达式的布尔结果的位,每行有一个。然后这个位向量被用来过滤输入列以产生新的输出列。这是一个简单的实现,可以针对位向量包含所有 1 或所有 0 的情况进行优化,以避免将数据复制到新向量的开销。

class SelectionExec(
    val input: PhysicalPlan, 
    val expr: Expression) : PhysicalPlan {

  override fun schema(): Schema {
    return input.schema()
  }

  override fun children(): List<PhysicalPlan> {
    return listOf(input)
  }

  override fun execute(): Sequence<RecordBatch> {
    val input = input.execute()
    return input.map { batch ->
      val result = (expr.evaluate(batch) as ArrowFieldVector).field as BitVector
      val schema = batch.schema
      val columnCount = batch.schema.fields.size
      val filteredFields = (0 until columnCount).map { 
          filter(batch.field(it), result) 
      }
      val fields = filteredFields.map { ArrowFieldVector(it) }
      RecordBatch(schema, fields)
    }

  private fun filter(v: ColumnVector, selection: BitVector) : FieldVector {
    val filteredVector = VarCharVector("v", 
                                       RootAllocator(Long.MAX_VALUE))
    filteredVector.allocateNew()

    val builder = ArrowVectorBuilder(filteredVector)

    var count = 0
    (0 until selection.valueCount).forEach {
      if (selection.get(it) == 1) {
        builder.set(count, v.getValue(it))
        count++
      }
    }
    filteredVector.valueCount = count
    return filteredVector
  }
}

哈希聚合

哈希聚合计划是最复杂的计划,因为它必须处理所有进入的批次,维护一个 HashMap 累加器,并为正在处理的每一行更新累加器。最后,用累加器结果来创建一个包含聚合查询结果的记录批次。

class HashAggregateExec(
    val input: PhysicalPlan,
    val groupExpr: List<PhysicalExpr>,
    val aggregateExpr: List<PhysicalAggregateExpr>,
    val schema: Schema) : PhysicalPlan {

  override fun schema(): Schema {
    return schema
  }
  
  override fun children(): List<PhysicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "HashAggregateExec: groupExpr=$groupExpr, aggrExpr=$aggregateExpr"
  }

  override fun execute(): Sequence<RecordBatch> {
    val map = HashMap<List<Any?>, List<Accumulator>>()

    // for each batch from the input executor
    input.execute().iterator().forEach { batch ->

    // evaluate the grouping expressions
    val groupKeys = groupExpr.map { it.evaluate(batch) }

    // evaluate the expressions that are inputs to the aggregate functions
    val aggrInputValues = aggregateExpr.map { 
        it.inputExpression().evaluate(batch) 
    }
                                         
    // for each row in the batch                                         
    (0 until batch.rowCount()).forEach { rowIndex ->
      // create the key for the hash map                                          
      val rowKey = groupKeys.map {
      val value = it.getValue(rowIndex)
      when (value) {
        is ByteArray -> String(value)
        else -> value
      }
    }

    // get or create accumulators for this grouping key
    val accumulators = map.getOrPut(rowKey) {
        aggregateExpr.map { it.createAccumulator() }
    }

    // perform accumulation
    accumulators.withIndex().forEach { accum ->
      val value = aggrInputValues[accum.index].getValue(rowIndex)
      accum.value.accumulate(value)
    }
 
    // create result batch containing final aggregate values
    val allocator = RootAllocator(Long.MAX_VALUE)
    val root = VectorSchemaRoot.create(schema.toArrow(), allocator)
    root.allocateNew()
    root.rowCount = map.size

    val builders = root.fieldVectors.map { ArrowVectorBuilder(it) }

    map.entries.withIndex().forEach { entry ->
      val rowIndex = entry.index
      val groupingKey = entry.value.key
      val accumulators = entry.value.value
      groupExpr.indices.forEach {
        builders[it].set(rowIndex, groupingKey[it])
      }
      aggregateExpr.indices.forEach {
        builders[groupExpr.size+it].set(rowIndex, accumulators[it].finalValue())
      }
    }

    val outputBatch = RecordBatch(schema, root.fieldVectors.map { 
       ArrowFieldVector(it) 
    })
                                         
    return listOf(outputBatch).asSequence()
  }

}

有了物理计划,下一步就是建立一个查询规划器,从逻辑计划中创建物理计划。

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。