构建逻辑计划

本章讨论的源代码可以在 KQuery 项目的 logical-plan 模块中找到。

笨方法构建逻辑计划

现在我们已经为逻辑计划的一个子集定义了类,我们可以通过编程的方式将它们组合起来。

下面是一些粗略的代码,用于建立一个查询 SELECT * FROM employee WHERE state = 'CO' 的计划,该计划针对的是一个 CSV 文件,该文件包含 id,first_name,last_name,state,job_title,salary等列。

// create a plan to represent the data source val csv = CsvDataSource("employee.csv") // create a plan to represent the scan of the data source (FROM) val scan = Scan("employee", csv, listOf()) // create a plan to represent the selection (WHERE) val filterExpr = Eq(Column("state"), LiteralString("CO")) val selection = Selection(scan, filterExpr) // create a plan to represent the projection (SELECT) val projectionList = listOf(Column("id"), Column("first_name"), Column("last_name"), Column("state"), Column("salary")) val plan = Projection(selection, projectionList) // print the plan println(format(plan))

打印出的计划如下:

Projection: #id, #first_name, #last_name, #state, #salary Filter: #state = 'CO' Scan: employee; projection=None

同样的代码也可以写得更简洁,就像这样:

val plan = Projection( Selection( Scan("employee", CsvDataSource("employee.csv"), listOf()), Eq(Column(3), LiteralString("CO")) ), listOf(Column("id"), Column("first_name"), Column("last_name"), Column("state"), Column("salary")) ) println(format(plan))

虽然更简洁,但也更难解释,所以如果有一种更优雅的方式来创建逻辑计划就更好了。这就需要 DataFrame 接口来帮忙了。

使用 DataFrame 创建逻辑计划

实现 DataFrame 风格的 API 可以让我们以更友好的方式建立逻辑查询计划。DataFrame 只是逻辑查询计划的抽象,它有执行转换和操作的方法。它类似于流式风格的生成器 API。

下面是一个最小的 DataFrame 接口的初始版本,它允许我们对已有的 DataFrame 执行投影和选择。

interface DataFrame { /** Apply a projection */ fun project(expr: List<LogicalExpr>): DataFrame /** Apply a filter */ fun filter(expr: LogicalExpr): DataFrame /** Aggregate */ fun aggregate(groupBy: List<LogicalExpr>, aggregateExpr: List<AggregateExpr>): DataFrame /** Returns the schema of the data that will be produced by this DataFrame. */ fun schema(): Schema /** Get the logical plan */ fun logicalPlan() : LogicalPlan }

以下是这个接口的实现。

class DataFrameImpl(private val plan: LogicalPlan) : DataFrame { override fun project(expr: List<LogicalExpr>): DataFrame { return DataFrameImpl(Projection(plan, expr)) } override fun filter(expr: LogicalExpr): DataFrame { return DataFrameImpl(Selection(plan, expr)) } override fun aggregate(groupBy: List<LogicalExpr>, aggregateExpr: List<AggregateExpr>): DataFrame { return DataFrameImpl(Aggregate(plan, groupBy, aggregateExpr)) } override fun schema(): Schema { return plan.schema() } override fun logicalPlan(): LogicalPlan { return plan } }

在我们应用投影或选择之前,我们需要一种方法来创建一个代表底层数据源的初始 DataFrame。这通常是通过一个执行上下文获得的。

这里是一个简单的执行上下文的初始版本,我们将在后面改进它。

class ExecutionContext { fun csv(filename: String): DataFrame { return DataFrameImpl(Scan(filename, CsvDataSource(filename), listOf())) } fun parquet(filename: String): DataFrame { return DataFrameImpl(Scan(filename, ParquetDataSource(filename), listOf())) } }

有了这些基础工作,我们现在可以使用上下文和DataFrame API 创建逻辑查询计划。

val ctx = ExecutionContext() val plan = ctx.csv("employee.csv") .filter(Eq(Column("state"), LiteralString("CO"))) .select(listOf(Column("id"), Column("first_name"), Column("last_name"), Column("state"), Column("salary")))

这就比之前更简洁、更直观了,但我们还可以更进一步,增加一些方便的方法,使之更容易理解。这是 Kotlin 特有的,但其他语言也有类似的概念。

我们可以创建一些方便的方法来创建支持的表达式对象。

fun col(name: String) = Column(name) fun lit(value: String) = LiteralString(value) fun lit(value: Long) = LiteralLong(value) fun lit(value: Double) = LiteralDouble(value)

我们还可以在 LogicalExpr 接口上定义中置运算符,用于构建二进制表达式。

infix fun LogicalExpr.eq(rhs: LogicalExpr): LogicalExpr { return Eq(this, rhs) } infix fun LogicalExpr.neq(rhs: LogicalExpr): LogicalExpr { return Neq(this, rhs) } infix fun LogicalExpr.gt(rhs: LogicalExpr): LogicalExpr { return Gt(this, rhs) } infix fun LogicalExpr.gteq(rhs: LogicalExpr): LogicalExpr { return GtEq(this, rhs) } infix fun LogicalExpr.lt(rhs: LogicalExpr): LogicalExpr { return Lt(this, rhs) } infix fun LogicalExpr.lteq(rhs: LogicalExpr): LogicalExpr { return LtEq(this, rhs) }

有了这些方便的方法之后,就可以写出富有表现力的代码来建立逻辑查询计划了。

val df = ctx.csv(employeeCsv) .filter(col("state") eq lit("CO")) .select(listOf( col("id"), col("first_name"), col("last_name"), col("salary"), (col("salary") mult lit(0.1)) alias "bonus")) .filter(col("bonus") gt lit(1000))

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。