查询规划器

本章讨论的源代码可以在 KQuery 项目的 query-planner 模块中找到。

我们已经定义了逻辑和物理查询计划,现在我们需要一个能够将逻辑计划转化为物理计划的查询规划器。

查询规划器可以根据配置选项或基于目标平台的硬件能力选择不同的物理计划。例如,查询可以在 CPU 或 GPU 上执行,在单个节点上执行,或在集群中分布式执行。

翻译逻辑表达式

第一步是定义一个方法,将逻辑表达式递归地翻译成物理表达式。下面的代码示例是基于 switch 语句的实现,展示了翻译一个二进制表达式(它有两个输入表达式)如何导致代码递归到同一个翻译这些输入的方法。这种方法遍历整个逻辑表达式树,并创建一个相应的物理表达式树。

fun createPhysicalExpr(expr: LogicalExpr, 
                       input: LogicalPlan): PhysicalExpr = when (expr) {
  is ColumnIndex -> ColumnExpression(expr.i)
  is LiteralString -> LiteralStringExpression(expr.str)
  is BinaryExpr -> {
    val l = createPhysicalExpr(expr.l, input)
    val r = createPhysicalExpr(expr.r, input)
    ...
  }
  ...
}

下面几节将解释每种类型的表达式实现。

列表达式

逻辑 Column 表达式通过名称引用列,但是物理表达式为了提高性能使用了列索引,所以查询规划器需要执行从列名到列索引的转换,如果列名无效就抛出一个异常。

这个简化的例子寻找第一个匹配的列名,而不检查是否有多个匹配的列,这应该是一个错误条件。

is Column -> {
  val i = input.schema().fields.indexOfFirst { it.name == expr.name }
  if (i == -1) {
    throw SQLException("No column named '${expr.name}'")
  }
  ColumnExpression(i)

字面量表达式

字面值的物理表达式是很直观的,从逻辑表达式到物理表达式的映射是微不足道的,我们只需要将字面值复制过来。

is LiteralLong -> LiteralLongExpression(expr.n)
is LiteralDouble -> LiteralDoubleExpression(expr.n)
is LiteralString -> LiteralStringExpression(expr.str)

二进制表达式

要为二进制表达式创建物理表达式,我们首先需要为左右输入创建物理表达式,然后我们需要创建具体的物理表达式。

is BinaryExpr -> {
  val l = createPhysicalExpr(expr.l, input)
  val r = createPhysicalExpr(expr.r, input)
  when (expr) {
    // comparision
    is Eq -> EqExpression(l, r)
    is Neq -> NeqExpression(l, r)
    is Gt -> GtExpression(l, r)
    is GtEq -> GtEqExpression(l, r)
    is Lt -> LtExpression(l, r)
    is LtEq -> LtEqExpression(l, r)

    // boolean
    is And -> AndExpression(l, r)
    is Or -> OrExpression(l, r)

    // math
    is Add -> AddExpression(l, r)
    is Subtract -> SubtractExpression(l, r)
    is Multiply -> MultiplyExpression(l, r)
    is Divide -> DivideExpression(l, r)

    else -> throw IllegalStateException(
        "Unsupported binary expression: $expr")
    }
}

翻译逻辑计划

需要实现一个递归函数来遍历逻辑计划树并将其翻译成物理计划,使用前面描述的翻译表达式的相同模式。

fun createPhysicalPlan(plan: LogicalPlan) : PhysicalPlan {
  return when (plan) {
    is Scan -> ...
    is Selection -> ...
    ...
}

扫描(Scan)

翻译扫描计划只需要复制数据源的引用以及逻辑计划的投影。

is Projection -> {
  val input = createPhysicalPlan(plan.input)
  val projectionExpr = plan.expr.map { createPhysicalExpr(it, plan.input) }
  val projectionSchema = Schema(plan.expr.map { it.toField(plan.input) })
  ProjectionExec(input, projectionSchema, projectionExpr)
}

投影(Projection)

翻译投影有两个步骤。首先,我们需要为投影的输入创建一个物理计划,然后我们需要将投影的逻辑表达式转换成物理表达式。

is Projection -> {
  val input = createPhysicalPlan(plan.input)
  val projectionExpr = plan.expr.map { createPhysicalExpr(it, plan.input) }
  val projectionSchema = Schema(plan.expr.map { it.toField(plan.input) })
  ProjectionExec(input, projectionSchema, projectionExpr)
}

选择(也叫过滤) (Selection (also known as Filter))

选择查询计划的处理步骤与投影非常相似。

is Selection -> {
  val input = createPhysicalPlan(plan.input)
  val filterExpr = createPhysicalExpr(plan.expr, plan.input)
  SelectionExec(input, filterExpr)
}

聚合(Aggregate)

聚合查询的查询计划步骤包括对定义可选分组键的表达式求值,和对聚合函数输入表达式求值,然后创建物理聚合表达式。

is Aggregate -> {
  val input = createPhysicalPlan(plan.input)
  val groupExpr = plan.groupExpr.map { createPhysicalExpr(it, plan.input) }
  val aggregateExpr = plan.aggregateExpr.map {
    when (it) {
      is Max -> MaxExpression(createPhysicalExpr(it.expr, plan.input))
      is Min -> MinExpression(createPhysicalExpr(it.expr, plan.input))
      is Sum -> SumExpression(createPhysicalExpr(it.expr, plan.input))
      else -> throw java.lang.IllegalStateException(
          "Unsupported aggregate function: $it")
    }
  }
  HashAggregateExec(input, groupExpr, aggregateExpr, plan.schema())
}

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。