逻辑计划与表达式

本章讨论的源代码可以在 KQuery 项目的 logical-plan 模块中找到。

逻辑计划代表一个具有已知表结构的关系(元组的集合)。每个逻辑计划可以有零个或多个逻辑计划作为输入。对于一个逻辑计划来说,暴露它的子计划可以使得用访问者模式来遍历该计划变得很方便。

interface LogicalPlan {
    fun schema(): Schema
    fun children(): List<LogicalPlan>
}

打印逻辑计划

能够以人类可读的形式打印逻辑计划是很重要的,因为这样能帮助调试。逻辑计划通常被打印成一个带有子节点缩进的层次结构。

我们可以实现一个简单的递归辅助函数来格式化一个逻辑计划。

fun format(plan: LogicalPlan, indent: Int = 0): String {
    val b = StringBuilder()
    0.rangeTo(indent).forEach { b.append("\t") }
    b.append(plan.toString()).append("\n")
    plan.children().forEach { b.append(format(it, indent+1)) }
    return b.toString()
}

下面是一个使用该方法格式化的逻辑计划的例子。

Projection: #id, #first_name, #last_name, #state, #salary
  Filter: #state = 'CO'
    Scan: employee.csv; projection=None

序列化

有时希望能够序列化查询计划,以便它们能够很容易地被转移到另一个进程。在早期添加序列化是一个很好的做法,可以防止意外地引用不能被序列化的数据结构(如文件句柄或数据库连接)。

一种方法是使用实现语言的默认机制,以JSON等格式对数据结构执行序列化 / 反序列化。在 Java 中,可以使用 Jackson 库。例如,Kotlin 有 kotlinx、 serialization 库,而 Rust 有 serde crate。

另一个选择是使用 Avro、Thrift 或 Protocol Buffers 定义一种语言无关的序列化格式,然后编写代码在这种格式和特定语言的实现之间进行转换。

自本书第一版出版以来,出现了一个名为 “substrait” 的新标准,其目标是为关系代数提供跨语言的序列化。我对这个项目感到兴奋,并预测它将成为代表查询计划的事实标准,并开辟许多集成的可能性。例如,可以使用一个成熟的基于 Java 的查询规划器,如 Apache Calcite,以 Substrait 格式序列化计划,然后在一个用低级语言(如 C++ 或 Rust)实现的查询引擎中执行该计划。欲了解更多信息,请访问https://substrait.io/。

逻辑表达式

查询计划的基本构件之一是表达式这个概念,它可以在运行时针对数据进行求值。

下面是一些在查询引擎中通常会支持的表达式的例子。

ExpressionExamples
Literal Value"hello", 12.34
Column Referenceuser_id, first_name, last_name
Math Expressionsalary * state_tax
Comparison Expressionx >= y
Boolean Expressionbirthday = today() AND age >= 21
Aggregate ExpressionMIN(salary), MAX(salary), SUM(salary), AVG(salary), COUNT(*)
Scalar FunctionCONCAT(first_name, " ", last_name)
Aliased Expressionsalary * 0.02 AS pay_increase

当然,所有这些表达式都可以被组合成深度嵌套的表达式树。表达式求值是递归编程的一个教科书式的案例。

当我们构建查询计划时,我们需要知道一些关于表达式输出的基本元数据。具体来说,我们需要有一个表达式的名称,以便其他表达式可以引用它,我们还需要知道表达式在求值时将产生的值的数据类型,以便我们可以验证查询计划是否有效。例如,如果我们有一个表达式 a + b,那么只有当 ab 都是数字类型时,它才是有效的。

还要注意的是,表达式的数据类型可能取决于输入数据。例如,一个列引用将具有它所引用的列的数据类型,但是一个比较表达式总是返回一个布尔值。

interface LogicalExpr {
  fun toField(input: LogicalPlan): Field
}

列表达式

Column 表达式简单地表示对一个具名列的引用。这个表达式的元数据是通过在输入中找到具名列并返回该列的元数据而得到的。请注意,这里的“列”指的是由输入逻辑计划产生的列,可以表示数据源中的一个列,也可以表示对其他输入执行表达式求值结果。

class Column(val name: String): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return input.schema().fields.find { it.name == name } ?: 
      throw SQLException("No column named '$name'")
  }

  override fun toString(): String {
    return "#$name"
  }

}

字面量表达式

我们需要将字面值表示为表达式的能力,这样我们就可以写出诸如 salary * 0.05 这样的表达式。

下面是一个关于字符串字面量表达式的例子。

class LiteralString(val str: String): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return Field(str, ArrowTypes.StringType)
  }

  override fun toString(): String {
    return "'$str'"
  }

}

下面是一个关于 long 类型字面量表达式的例子。

class LiteralLong(val n: Long): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
      return Field(n.toString(), ArrowTypes.Int64Type)
  }

  override fun toString(): String {
      return n.toString()
  }

}

二进制表达式是接受两个输入的简单表达式。我们要实现的二进制表达式有三类,它们是比较表达式、布尔表达式和数学表达式。因为所有这些表达式的字符串表示都是一样的,我们可以使用一个提供 toString 方法的通用基类。变量 “l” 和 “r” 是指左边和右边的输入。

abstract class BinaryExpr(
    val name: String,
    val op: String,
    val l: LogicalExpr,
    val r: LogicalExpr) : LogicalExpr {

  override fun toString(): String {
    return "$l $op $r"
  }
}

比较表达式,如 =<,用来比较两个相同数据类型的值并返回一个布尔值。我们还需要实现布尔运算符 AND 和 OR,它们也需要两个参数并产生一个布尔结果,所以我们也可以使用一个共同的基类来实现这些。

abstract class BooleanBinaryExpr(
    name: String,
    op: String,
    l: LogicalExpr,
    r: LogicalExpr) : BinaryExpr(name, op, l, r) {

  override fun toField(input: LogicalPlan): Field {
      return Field(name, ArrowTypes.BooleanType)
  }

}

该基类提供了一种简洁的方式来实现具体的比较表达式。

比较表达式

/** Equality (`=`) comparison */
class Eq(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("eq", "=", l, r)

/** Inequality (`!=`) comparison */
class Neq(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("neq", "!=", l, r)

/** Greater than (`>`) comparison */
class Gt(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("gt", ">", l, r)

/** Greater than or equals (`>=`) comparison */
class GtEq(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("gteq", ">=", l, r)

/** Less than (`<`) comparison */
class Lt(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("lt", "<", l, r)

/** Less than or equals (`<=`) comparison */
class LtEq(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("lteq", "<=", l, r)

布尔表达式

该基类还提供了一种简明的方式来实现具体的布尔逻辑表达式。

/** Logical AND */
class And(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("and", "AND", l, r)

/** Logical OR */
class Or(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("or", "OR", l, r)

数学表达式

数学表达式是另一种具体的二进制表达式。数学表达式通常作用于相同数据类型的值,并产生相同数据类型的结果。

abstract class MathExpr(
    name: String,
    op: String,
    l: LogicalExpr,
    r: LogicalExpr) : BinaryExpr(name, op, l, r) {

  override fun toField(input: LogicalPlan): Field {
      return Field("mult", l.toField(input).dataType)
  }

}

class Add(l: LogicalExpr, r: LogicalExpr) : MathExpr("add", "+", l, r)
class Subtract(l: LogicalExpr, r: LogicalExpr) : MathExpr("subtract", "-", l, r)
class Multiply(l: LogicalExpr, r: LogicalExpr) : MathExpr("mult", "*", l, r)
class Divide(l: LogicalExpr, r: LogicalExpr) : MathExpr("div", "/", l, r)
class Modulus(l: LogicalExpr, r: LogicalExpr) : MathExpr("mod", "%", l, r)

聚合表达式

聚合表达式在输入表达式上执行一个聚合函数,如 MIN、MAX、COUNT、SUM 或 AVG。

abstract class AggregateExpr(
    val name: String, 
    val expr: LogicalExpr) : LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return Field(name, expr.toField(input).dataType)
  }

  override fun toString(): String {
    return "$name($expr)"
  }
}

对于聚合的数据类型与输入类型相同的聚合表达式,我们可以简单地扩展这个基类。

class Sum(input: LogicalExpr) : AggregateExpr("SUM", input)
class Min(input: LogicalExpr) : AggregateExpr("MIN", input)
class Max(input: LogicalExpr) : AggregateExpr("MAX", input)
class Avg(input: LogicalExpr) : AggregateExpr("AVG", input)

对于数据类型不依赖于输入类型的聚合表达式,我们需要覆盖 toField 方法。例如,“COUNT” 聚合表达式总是产生一个整数,而不管被计算的值是什么数据类型。

class Count(input: LogicalExpr) : AggregateExpr("COUNT", input) {

  override fun toField(input: LogicalPlan): Field {
    return Field("COUNT", ArrowTypes.Int32Type)
  }

  override fun toString(): String {
    return "COUNT($expr)"
  }
}

有了逻辑表达式,我们就可以针对查询引擎将支持的各种转换来实现逻辑计划了。

扫描(Scan)

Scan 逻辑计划表示从一个带有可选的投影的 DataSource 获取数据。Scan 是我们的查询引擎中唯一一个没有其他逻辑计划作为输入的逻辑计划。它是查询树中的一个叶子节点。

class Scan(
    val path: String, 
    val dataSource: DataSource, 
    val projection: List<String>): LogicalPlan {

  val schema = deriveSchema()

  override fun schema(): Schema {
    return schema
  }

  private fun deriveSchema() : Schema {
    val schema = dataSource.schema()
    if (projection.isEmpty()) {
      return schema
    } else {
      return schema.select(projection)
    }
  }

  override fun children(): List<LogicalPlan> {
    return listOf()
  }

  override fun toString(): String {
    return if (projection.isEmpty()) {
      "Scan: $path; projection=None"
    } else {
      "Scan: $path; projection=$projection"
    }
  }

}

投影(Projection)

Projection 逻辑计划会对其输入执行投影。投影是针对输入数据进行求值的一个表达式列表。有时这只是一个简单的列的列表,比如 SELECT a, b, c FROM foo,但是它也可以包括任何其他支持的表达式类型。一个更复杂的例子是 SELECT (CAST(a AS float) * 3.141592)) AS my_float FROM foo.

class Projection(
    val input: LogicalPlan, 
    val expr: List<LogicalExpr>): LogicalPlan {
    
  override fun schema(): Schema {
    return Schema(expr.map { it.toField(input) })
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Projection: ${ expr.map { 
        it.toString() }.joinToString(", ") 
    }"
  }
}

选择(也叫过滤) (Selection (also known as Filter))

Selection 逻辑计划应用一个过滤表达式来决定哪些行应该被选择(包含)在其输出中。这是由 SQL 中的 WHERE 子句表示的。一个简单的例子是 SELECT * FROM foo WHERE a > 5。过滤表达式需要求值得到一个布尔值的结果。

class Selection(
    val input: LogicalPlan, 
    val expr: Expr): LogicalPlan {

  override fun schema(): Schema {
    // selection does not change the schema of the input
    return input.schema()
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Filter: $expr"
  }
}

聚合(Aggregate)

Aggregate 逻辑计划比 ProjectionSelectionScan 更复杂,它计算底层数据的聚合,如计算数据的最小、最大、平均值以及总和。聚合数据通常由其他列(或表达式)分组。一个简单的例子是 SELECT job_title, AVG(salary) FROM employee GROUP BY job_title

class Aggregate(
    val input: LogicalPlan, 
    val groupExpr: List<LogicalExpr>, 
    val aggregateExpr: List<AggregateExpr>) : LogicalPlan {

  override fun schema(): Schema {
    return Schema(groupExpr.map { it.toField(input) } + 
         		  aggregateExpr.map { it.toField(input) })
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Aggregate: groupExpr=$groupExpr, aggregateExpr=$aggregateExpr"
  }
}

请注意,在这个实现中,聚合计划的输出是用分组列来组织的,后面是聚合表达式。通常有必要将聚合逻辑计划包装在一个投影中,以便按照原始查询中要求的顺序返回那些列。

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。