译者序
用两三天的时间,跟着 Andy 大佬的细致讲解,从零开始实现一个从 SQL 解析到执行的完整的查询引擎,为什么不试试呢? :)
英文原版地址:howqueryengineswork.com
译者:teckick,开源 / 数据库爱好者。
如果对翻译有任何疑问和建议,欢迎通过 teckickin@gmail.com 与我联系!
致谢
如果没有我的家人的支持,这本书是不可能完成的,当我沉浸于另一个副业时,他们对我有极大的耐心。
特别感谢 Matthew Powers 或者称他为 Powers 先生,是他最初鼓励我写这本书。Matthew 是《编写漂亮的Apache Spark代码》一书的作者,该书也可以在 Leanpub 上找到。
我还要感谢过去几年在我做 DataFusion 项目时与我互动的无数人,特别是 Apache Arrow PMC、提交者和贡献者。
最后,我要感谢 Chris George 和 Joe Buszkiewic 在 RMS 工作时的支持和鼓励,在那里我进一步加深了对查询引擎的理解。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
序言
自从开始我在软件工程领域的第一份工作以来,我就一直对数据库和查询语言着迷。向计算机提出问题并有效地获得有意义的数据,这似乎是一种神奇的事情。在作为一个普通的软件开发人员和数据技术的终端用户工作多年后,我开始为一家创业公司工作,这让我进入了分布式数据库开发的深渊。这是我希望在我开始这段旅程时就有的一本书。虽然这只是一本入门级的书,但我希望能揭开查询引擎工作的神秘面纱。
我对查询引擎的兴趣最终导致我参与了 Apache Arrow 项目,我在 2018 年捐赠了最初的 Rust 实现,然后在 2019 年捐赠了 DataFusion 内存查询引擎,最后,在 2021 年捐赠了 Ballista 分布式计算项目。我不打算在 Arrow 项目之外构建其他东西,现在继续在 Arrow 内部为这些项目做贡献。
现在 Arrow 项目有很多活跃的提交者和贡献者在进行 Rust 的实现,与我最初的贡献相比,它已经有了很大的改进。
虽然 Rust 是高性能查询引擎的最佳选择,但它并不适合教授围绕查询引擎的概念,所以我最近在写这本书时建立了一个新的查询引擎,用 Kotlin 实现。Kotlin 是一种非常简洁的语言,而且易于阅读,因此有可能在本书中加入源代码的例子。希望你在阅读本书的过程中熟悉源代码,并考虑做出一些贡献。没有什么比获得一些实践经验更好的学习方法了!
本书中所涉及的查询引擎最初打算成为 Ballista 项目的一部分(而且曾经有过一段时间),但随着项目的发展,很明显,将查询引擎保留在 Rust 中,并通过 UDF 机制支持 Java 和其他语言,比在多种语言中重复大量的查询执行逻辑更有意义。
现在 Ballista 已经捐给了 Apache Arrow,我已经更新了这本书,在配套的 代码仓库 中把查询引擎简单地称为 “KQuery”,即 Kotlin 查询引擎的简称,但如果有人对更好的名字有建议,请告诉我!
反馈
本书的更新内容将在上市后免费提供,所以请偶尔回来看看,或者在 Twitter 上关注我(@andygrove_io),以便收到新内容的通知。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
什么是查询引擎?
查询引擎是一个软件,可以对数据执行查询以产生问题的答案,例如:
- 今年到目前为止,我每个月的平均销售额是多少?
- 在过去的一天,我的网站上最受欢迎的五个网页是什么?
- 网络流量与前一年的逐月对比情况如何?
最普遍的查询语言是 结构化查询语言(缩写为 SQL)。许多开发者在其职业生涯的某个阶段都会用到关系型数据库,如 MySQL、Postgres、Oracle 或 SQL Server。所有这些数据库都包含支持 SQL 的查询引擎。
以下是一些SQL查询的例子。
SQL示例:各月平均销售额
SELECT month, AVG(sales)
FROM product_sales
WHERE year = 2020
GROUP BY month;
SQL实例:昨天的前5个网页
SELECT page_url, COUNT(*) AS num_visits
FROM apache_log
WHERE event_date = yesterday()
GROUP BY page_url
ORDER BY num_visits DESC
LIMIT 5;
SQL 功能强大且被广泛理解,但在所谓的“大数据”世界中却有局限性,数据科学家往往需要在他们的查询中混入自定义代码。Apache Hadoop、Apache Hive 和 Apache Spark 等平台和工具现在被广泛用于查询和处理庞大的数据量。
这里有一个例子,演示了如何使用 Apache Spark 来对 Parquet 数据集进行简单的聚合查询。Spark 的真正威力在于,这个查询可以在一台笔记本电脑上运行,也可以在一个由数百台服务器组成的集群上运行,中间不需要修改代码。
使用数据帧进行 Apache Spark 查询的示例:
val spark: SparkSession = SparkSession.builder
.appName("Example")
.master("local[*]")
.getOrCreate()
val df = spark.read.parquet("/mnt/nyctaxi/parquet")
.groupBy("passenger_count")
.sum("fare_amount")
.orderBy("passenger_count")
df.show()
为什么查询引擎很流行?
数据正在以越来越快的速度增长,而且往往无法在一台计算机上容纳。编写适合分布式执行的查询数据的代码需要专业的工程技能,而且每次需要从数据中获得新的答案时都要编写自定义代码是不实际的。
查询引擎提供了一套标准的操作和转换,终端用户可以通过简单的查询语言或应用编程接口以不同的方式进行组合,并为达到良好的性能进行了调优。
本书内容
本书概述了构建通用查询引擎的每一个步骤。
本书讨论的查询引擎是一个专门为本书开发的、简单的查询引擎,代码是在编写本书内容的同时开发的,以确保我在面临设计决策时可以写出相关的主题。
源代码
本书中讨论的查询引擎的完整源代码位于以下 GitHub 仓库。
https://github.com/andygrove/how-query-engines-work
请参考项目中的README,了解使用 Gradle 构建本项目的最新说明。
为什么用Kotlin?
本书主要关注查询引擎的设计,这通常是与编程语言无关的。我为本书选择了 Kotlin 是因为它简明扼要而且易于理解。它还与 Java 100%兼容,这意味着你可以从 Java 以及其他基于 Java 的语言(如 Scala)调用 Kotlin 代码。
不过,Apache Arrow 项目中的 DataFusion 查询引擎也主要是基于本书的设计。相比于 JVM,对 Rust 更感兴趣的读者可以结合本书参考 DataFusion 的源代码。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
Apache Arrow
Apache Arrow 最初是一种列式数据的内存格式规范,在 Java 和 C++ 语言中都有实现。这种内存格式对于现代硬件上的向量化处理是有效的,比如支持 SIMD(单指令多数据)的 CPU 和 GPU。
拥有标准化的内存数据格式有几个好处:
- 高级语言(如 Python 或 Java)可以通过传递指向数据的指针的方式来调用使用低级语言(如 Rust 或 C++)编写的计算密集型任务,而不是以不同的格式复制数据,这将是非常昂贵的。
- 数据可以在进程之间有效地传输,没有太多的序列化开销,因为内存数据格式也是网络传输数据格式(尽管数据也可以被压缩)。
- 它会让在数据科学和数据分析领域的各种开源和商业项目之间建立连接器、驱动程序和集成变得更加容易,并允许开发人员使用他们最喜欢的语言来利用这些平台。
Apache Arrow 现在有许多编程语言的实现,包括 C, C++, C#, Go, Java, JavaScript, Julia, MATLAB, Python, R, Ruby, 和 Rust。
Arrow 内存模型
Arrow网站 上面有内存模型的详细描述,但从根本上来说,每一列都由一个持有原始数据的向量表示,同时还有单独的向量表示空值和可变宽度类型的原始数据中的偏移。
进程间通信 (IPC)
如前所述,数据可以通过传递一个指向数据的指针在进程之间传递。然而,接收进程需要知道如何解释这些数据,因此需要定义一种 IPC 格式,用于交换元数据,如表结构信息。Arrow 使用 Google Flatbuffers 来定义元数据格式。
计算内核
Apache Arrow 的使用范围已经扩大到提供针对数据进行表达式求值的计算库。Java、C++、C、Python、Ruby、Go、Rust 和JavaScript 的实现都包含了用于在 Arrow 内存上进行计算的计算库。
由于本书主要是使用 Java 的实现,值得指出的是,Dremio 最近捐赠了 Gandiva 项目,这是一个 Java 库,可以将表达式编译成 LLVM 并支持 SIMD。JVM 开发者可以将操作委托给 Gandiva 库,并获得性能提升,这在原生 Java 中是不可能的。
Arrow Flight 协议
最近,Arrow 定义了一个 Flight 协议,用于在网络上有效地传输 Arrow 数据。Flight 是基于 gRPC 和 Google Protocol Buffers 的。
Flight 协议定义了一个具有以下方法的 FlightService:
Handshake
客户端和服务器之间进行握手。根据服务器的情况,握手可能需要确定未来操作中需要使用的令牌。请求和响应都是流,以允许多次往返,这取决于认证机制。
ListFlights
根据特定的条件获取可用的数据流的列表。大多数 flight 服务将公开一个或多个随时可供检索的数据流。这个 API 允许列出可供消费的数据流。用户也可以提供一个查询条件,这个条件可以限制通过该接口列出的数据流的子集。每个 flight 服务都允许自定义处理这些条件的方式。
GetFlightInfo
对于给定的 FlightDescriptor,获得关于如何消费该 flight 的信息。如果该接口的消费者已经可以确定要消费的具体 flight,那么这将是一个有用的接口。这个接口还可以让消费者通过指定的描述符生成一个 flight 流。例如,flight 描述符可能是一个包括 SQL 语句或将被执行的 Pickled Python 操作的东西。这些情况下,描述符将不会在 ListFlights 方法提供的可用数据流列表中出现,但会允许在特定的 flight 服务定义的时间内消费。
GetSchema
对于一个给定的 FlightDescriptor,获得 Schema.fbs::Schema 中描述的表结构。当消费者需要 flight 数据流的表结构时就会用到这个接口。与 GetFlightInfo 类似,这个接口可能会生成一个之前在 ListFlights 中没有的新 flight。
DoGet
检索与 referenced ticket 关联的特定描述符相关的单一数据流。一个 flight 可以由一个或多个数据流组成,其中每个流可以使用单独的 opaque ticket 来检索,flight 服务会用它来管理一个数据流的集合。
DoPut
将一个数据流推送到与某个 flight 服务的 flight 数据流上面。这个接口允许 flight 服务的客户端上传一段流式数据。不同的 flight 服务将会允许客户端消费者对每个描述符上传一段流数据或者无限数量。在后一种情况下,flight 服务需要实现一个 "密封 "动作,一旦所有数据流被上传,就可以 apply 这个描述符。
DoExchange
为给定的描述符打开一个双向数据通道。这允许客户端在一个单一的逻辑数据流中发送和接收任意的 Arrow 数据和应用相关的元数据。与 DoGet / DoPut 相比,这更适合于客户端将计算(而不是存储)卸载给 Flight 服务。
DoAction
除了可能的 ListFlights、GetFlightInfo、DoGet、DoPut 操作外, flight 服务还可以支持任意数量的简单操作。DoAction 允许 flight 客户端对 flight 服务执行一个特定的操作。一个操作包括不透明的请求和响应对象,这些对象是针对正在进行的行动类型的。
ListActions
一个 flight 服务公开了它拥有的所有可用的操作类型及其描述。这使不同的 flight 消费者能够理解 flight 服务所能提供的能力。
Arrow Flight SQL
有一项提议是在 Arrow Flight 中增加 SQL 功能。在撰写本文时(2021年1月),有一个用 C++ 实现的 PR,其跟踪的问题是 ARROW-14698。
查询引擎
DataFusion
Arrow 的 Rust 实现包含一个名为 DataFusion 的内存查询引擎,该引擎在 2019 年被捐赠给该项目。这个项目正在迅速成熟,并且越来越受欢迎。例如,InfluxData 正在使用 DataFusion 构建下一代 InfluxDB 的内核。
Ballista
Ballista 是一个主要用 Rust 实现的分布式计算平台,由 Apache Arrow 驱动。它建立了一个架构,允许其他编程语言(如Python、C++和Java)作为一等公民被支持,而不需要付出序列化带来的成本损耗。
Ballista 的基础技术包括:
- Apache Arrow,用于内存模型和类型系统。
- Apache Arrow Flight 协议,用于进程之间的高效数据传输。
- Google Protocol Buffers,用于序列化查询计划。
- Docker,用于将执行器与用户自定义代码打包在一起。
- Kubernetes,用于部署和管理执行器docker容器。
Ballista 在 2021 年被捐赠给 Arrow 项目。尽管它能够以良好的性能运行流行的 TPC-H 基准的一些查询,但还没有准备好用于生产。
C++ 查询引擎
对于 C++ 实现,目前正在实现一个查询引擎,目前的重点是实现高效计算原语和 Dataset API。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
选择类型系统
本章讨论的源代码可以在 KQuery 项目的 datatypes 模块中找到。
构建查询引擎的第一步是选择一个类型系统来表示查询引擎处理的不同类型的数据。一种选择是发明一个专门用于查询引擎的专有类型系统。另一种选择是使用查询引擎所面向的数据源的类型系统。
如果查询引擎要支持查询多个数据源(通常是这样),那么在每个支持的数据源和查询引擎的类型系统之间可能需要一些转换,使用一个能够代表所有支持的数据源的所有数据类型的类型系统将是很重要的。
基于行还是基于列?
一个重要的考虑因素是查询引擎是否会逐行处理数据,或者是否会以列的格式表示数据。
现在的许多查询引擎都是基于 火山查询规划器 的,其中物理计划中的每一步基本上都是对行的迭代器。这种模型实现起来比较简单,但往往会引入每行的开销,在对数十亿行进行查询时,开销会迅速增加。这种开销可以通过对成批的数据进行迭代来减少。此外,如果这些批次代表列数据而不是行,就有可能使用“向量化处理”并利用 SIMD(单指令多数据)的优势,用一条 CPU 指令处理一列中的多个值。通过利用 GPU 来并行处理更大数量的数据,这个概念还可以更进一步。
互操作性
另一个考虑是,我们希望能够用多种编程语言访问查询引擎。查询引擎的用户通常会使用 Python、R 或 Java 等语言。也许我们还想开发 ODBC 或 JDBC 驱动,以便于建立集成。
基于这些要求,最好能找到一个行业标准来表示列式数据,并在进程之间有效地交换这些数据。
我相信 Apache Arrow 就提供了一个理想的基础,这可能并不令人惊讶。
类型系统
我们将使用 Apache Arrow 作为类型系统的基础。以下的 Arrow 类被用来表示表结构、字段和数据类型。
- Schema 为数据源或查询的结果提供元数据。一个 Schema 由一个或多个字段组成。
- Field 为 Schema 中的一个字段提供名称和数据类型,并指定它是否允许空值。
- FieldVector 为一个 Field 的数据提供列式存储。
- ArrowType 表示一种数据类型。
KQuery 引入了一些额外的类和帮助类,作为 Apache Arrow 类型系统的一个抽象。
KQuery 提供常量,可以被支持的Arrow数据类型引用。
object ArrowTypes {
val BooleanType = ArrowType.Bool()
val Int8Type = ArrowType.Int(8, true)
val Int16Type = ArrowType.Int(16, true)
val Int32Type = ArrowType.Int(32, true)
val Int64Type = ArrowType.Int(64, true)
val UInt8Type = ArrowType.Int(8, false)
val UInt16Type = ArrowType.Int(16, false)
val UInt32Type = ArrowType.Int(32, false)
val UInt64Type = ArrowType.Int(64, false)
val FloatType = ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
val DoubleType = ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
val StringType = ArrowType.Utf8()
}
KQuery 没有直接使用 FieldVector,而是引入了 ColumnVector 接口作为抽象,以提供更方便的访问方法,避免了为每个数据类型提供特定的 FieldVector 实现。
interface ColumnVector {
fun getType(): ArrowType
fun getValue(i: Int) : Any?
fun size(): Int
}
这种抽象也使得将标量值转换为向量成为可能,避免了创建和填充一个 FieldVector,为列中的每个索引重复一个字面量。
class LiteralValueVector(
val arrowType: ArrowType,
val value: Any?,
val size: Int) : ColumnVector {
override fun getType(): ArrowType {
return arrowType
}
override fun getValue(i: Int): Any? {
if (i<0 || i>=size) {
throw IndexOutOfBoundsException()
}
return value
}
override fun size(): Int {
return size
}
}
KQuery 还提供了一个 RecordBatch 类来表示一批列式数据。
class RecordBatch(val schema: Schema, val fields: List<ColumnVector>) {
fun rowCount() = fields.first().size()
fun columnCount() = fields.size
/** Access one column by index */
fun field(i: Int): ColumnVector {
return fields[i]
}
}
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
数据源
本章讨论的源代码可以在 KQuery 项目的 datasource 模块中找到。
如果无法从数据源读取数据,查询引擎就没有什么用处,我们希望能够支持多个数据源,所以创建一个查询引擎与数据源交互的接口是很重要的。这也允许用户将查询引擎用于自定义数据源。数据源通常是文件或数据库,但也可能是内存对象。
数据源接口
在构建查询计划时,了解数据源的表结构是很重要的,这样可以验证查询计划,以确保引用的列存在,并且数据类型与作用于它们的表达式兼容。在某些情况下,表结构可能不可用,因为有些数据源没有固定的表结构,通常被称为 “无表结构”。JSON文档就是无表结构数据源的一个例子。
在查询执行过程中,我们需要从数据源中获取数据的能力,并且需要能够指定哪些列要加载到内存中以提高效率。如果查询没有引用这些列,那么将其加载到内存中是没有意义的。
KQuery DataSource 接口
interface DataSource {
/** Return the schema for the underlying data source */
fun schema(): Schema
/** Scan the data source, selecting the specified columns */
fun scan(projection: List<String>): Sequence<RecordBatch>
}
数据源示例
在数据科学或分析中经常会遇到一些数据源。
逗号分隔值 (CSV)
CSV 文件是文本文件,每行有一条记录,字段用逗号分隔,因此被称为 "逗号分隔值"。CSV 文件不包含表结构信息(除了文件中第一行的可选列名),尽管有可能通过先读取文件来获得表结构。这可能是一个昂贵的操作。
JSON
JavaScript Object Notation 格式(JSON)是另一种流行的基于文本的文件格式。与 CSV 文件不同,JSON 文件是结构化的,可以存储复杂的嵌套数据类型。
Parquet
Parquet 是为提供一种压缩的、高效的列式数据表示而创建的,是 Hadoop 生态系统中一种流行的文件格式。Parquet 从一开始就考虑到复杂的嵌套数据结构,并使用 Dremel 论文中描述的 记录粉碎和组装算法。
Parquet 文件包含表结构信息,数据被存储在批次中(称为“行组”),每个批次由列组成。行组可以包含压缩的数据,也可以包含可选的元数据,如每一列的最小值和最大值。优化后的查询引擎可以使用这些元数据来决定在扫描过程中何时可以跳过行组。
Orc
优化行列(Orc)格式与 Parquet 类似。数据被存储在称为“条纹”的列式批次中。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
逻辑计划与表达式
本章讨论的源代码可以在 KQuery 项目的 logical-plan 模块中找到。
逻辑计划代表一个具有已知表结构的关系(元组的集合)。每个逻辑计划可以有零个或多个逻辑计划作为输入。对于一个逻辑计划来说,暴露它的子计划可以使得用访问者模式来遍历该计划变得很方便。
interface LogicalPlan {
fun schema(): Schema
fun children(): List<LogicalPlan>
}
打印逻辑计划
能够以人类可读的形式打印逻辑计划是很重要的,因为这样能帮助调试。逻辑计划通常被打印成一个带有子节点缩进的层次结构。
我们可以实现一个简单的递归辅助函数来格式化一个逻辑计划。
fun format(plan: LogicalPlan, indent: Int = 0): String {
val b = StringBuilder()
0.rangeTo(indent).forEach { b.append("\t") }
b.append(plan.toString()).append("\n")
plan.children().forEach { b.append(format(it, indent+1)) }
return b.toString()
}
下面是一个使用该方法格式化的逻辑计划的例子。
Projection: #id, #first_name, #last_name, #state, #salary
Filter: #state = 'CO'
Scan: employee.csv; projection=None
序列化
有时希望能够序列化查询计划,以便它们能够很容易地被转移到另一个进程。在早期添加序列化是一个很好的做法,可以防止意外地引用不能被序列化的数据结构(如文件句柄或数据库连接)。
一种方法是使用实现语言的默认机制,以JSON等格式对数据结构执行序列化 / 反序列化。在 Java 中,可以使用 Jackson 库。例如,Kotlin 有 kotlinx、 serialization 库,而 Rust 有 serde crate。
另一个选择是使用 Avro、Thrift 或 Protocol Buffers 定义一种语言无关的序列化格式,然后编写代码在这种格式和特定语言的实现之间进行转换。
自本书第一版出版以来,出现了一个名为 “substrait” 的新标准,其目标是为关系代数提供跨语言的序列化。我对这个项目感到兴奋,并预测它将成为代表查询计划的事实标准,并开辟许多集成的可能性。例如,可以使用一个成熟的基于 Java 的查询规划器,如 Apache Calcite,以 Substrait 格式序列化计划,然后在一个用低级语言(如 C++ 或 Rust)实现的查询引擎中执行该计划。欲了解更多信息,请访问https://substrait.io/。
逻辑表达式
查询计划的基本构件之一是表达式这个概念,它可以在运行时针对数据进行求值。
下面是一些在查询引擎中通常会支持的表达式的例子。
Expression | Examples |
---|---|
Literal Value | "hello", 12.34 |
Column Reference | user_id, first_name, last_name |
Math Expression | salary * state_tax |
Comparison Expression | x >= y |
Boolean Expression | birthday = today() AND age >= 21 |
Aggregate Expression | MIN(salary), MAX(salary), SUM(salary), AVG(salary), COUNT(*) |
Scalar Function | CONCAT(first_name, " ", last_name) |
Aliased Expression | salary * 0.02 AS pay_increase |
当然,所有这些表达式都可以被组合成深度嵌套的表达式树。表达式求值是递归编程的一个教科书式的案例。
当我们构建查询计划时,我们需要知道一些关于表达式输出的基本元数据。具体来说,我们需要有一个表达式的名称,以便其他表达式可以引用它,我们还需要知道表达式在求值时将产生的值的数据类型,以便我们可以验证查询计划是否有效。例如,如果我们有一个表达式 a + b
,那么只有当 a
和 b
都是数字类型时,它才是有效的。
还要注意的是,表达式的数据类型可能取决于输入数据。例如,一个列引用将具有它所引用的列的数据类型,但是一个比较表达式总是返回一个布尔值。
interface LogicalExpr {
fun toField(input: LogicalPlan): Field
}
列表达式
Column
表达式简单地表示对一个具名列的引用。这个表达式的元数据是通过在输入中找到具名列并返回该列的元数据而得到的。请注意,这里的“列”指的是由输入逻辑计划产生的列,可以表示数据源中的一个列,也可以表示对其他输入执行表达式求值结果。
class Column(val name: String): LogicalExpr {
override fun toField(input: LogicalPlan): Field {
return input.schema().fields.find { it.name == name } ?:
throw SQLException("No column named '$name'")
}
override fun toString(): String {
return "#$name"
}
}
字面量表达式
我们需要将字面值表示为表达式的能力,这样我们就可以写出诸如 salary * 0.05
这样的表达式。
下面是一个关于字符串字面量表达式的例子。
class LiteralString(val str: String): LogicalExpr {
override fun toField(input: LogicalPlan): Field {
return Field(str, ArrowTypes.StringType)
}
override fun toString(): String {
return "'$str'"
}
}
下面是一个关于 long 类型字面量表达式的例子。
class LiteralLong(val n: Long): LogicalExpr {
override fun toField(input: LogicalPlan): Field {
return Field(n.toString(), ArrowTypes.Int64Type)
}
override fun toString(): String {
return n.toString()
}
}
二进制表达式是接受两个输入的简单表达式。我们要实现的二进制表达式有三类,它们是比较表达式、布尔表达式和数学表达式。因为所有这些表达式的字符串表示都是一样的,我们可以使用一个提供 toString 方法的通用基类。变量 “l” 和 “r” 是指左边和右边的输入。
abstract class BinaryExpr(
val name: String,
val op: String,
val l: LogicalExpr,
val r: LogicalExpr) : LogicalExpr {
override fun toString(): String {
return "$l $op $r"
}
}
比较表达式,如 =
或 <
,用来比较两个相同数据类型的值并返回一个布尔值。我们还需要实现布尔运算符 AND 和 OR,它们也需要两个参数并产生一个布尔结果,所以我们也可以使用一个共同的基类来实现这些。
abstract class BooleanBinaryExpr(
name: String,
op: String,
l: LogicalExpr,
r: LogicalExpr) : BinaryExpr(name, op, l, r) {
override fun toField(input: LogicalPlan): Field {
return Field(name, ArrowTypes.BooleanType)
}
}
该基类提供了一种简洁的方式来实现具体的比较表达式。
比较表达式
/** Equality (`=`) comparison */
class Eq(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("eq", "=", l, r)
/** Inequality (`!=`) comparison */
class Neq(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("neq", "!=", l, r)
/** Greater than (`>`) comparison */
class Gt(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("gt", ">", l, r)
/** Greater than or equals (`>=`) comparison */
class GtEq(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("gteq", ">=", l, r)
/** Less than (`<`) comparison */
class Lt(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("lt", "<", l, r)
/** Less than or equals (`<=`) comparison */
class LtEq(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("lteq", "<=", l, r)
布尔表达式
该基类还提供了一种简明的方式来实现具体的布尔逻辑表达式。
/** Logical AND */
class And(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("and", "AND", l, r)
/** Logical OR */
class Or(l: LogicalExpr, r: LogicalExpr)
: BooleanBinaryExpr("or", "OR", l, r)
数学表达式
数学表达式是另一种具体的二进制表达式。数学表达式通常作用于相同数据类型的值,并产生相同数据类型的结果。
abstract class MathExpr(
name: String,
op: String,
l: LogicalExpr,
r: LogicalExpr) : BinaryExpr(name, op, l, r) {
override fun toField(input: LogicalPlan): Field {
return Field("mult", l.toField(input).dataType)
}
}
class Add(l: LogicalExpr, r: LogicalExpr) : MathExpr("add", "+", l, r)
class Subtract(l: LogicalExpr, r: LogicalExpr) : MathExpr("subtract", "-", l, r)
class Multiply(l: LogicalExpr, r: LogicalExpr) : MathExpr("mult", "*", l, r)
class Divide(l: LogicalExpr, r: LogicalExpr) : MathExpr("div", "/", l, r)
class Modulus(l: LogicalExpr, r: LogicalExpr) : MathExpr("mod", "%", l, r)
聚合表达式
聚合表达式在输入表达式上执行一个聚合函数,如 MIN、MAX、COUNT、SUM 或 AVG。
abstract class AggregateExpr(
val name: String,
val expr: LogicalExpr) : LogicalExpr {
override fun toField(input: LogicalPlan): Field {
return Field(name, expr.toField(input).dataType)
}
override fun toString(): String {
return "$name($expr)"
}
}
对于聚合的数据类型与输入类型相同的聚合表达式,我们可以简单地扩展这个基类。
class Sum(input: LogicalExpr) : AggregateExpr("SUM", input)
class Min(input: LogicalExpr) : AggregateExpr("MIN", input)
class Max(input: LogicalExpr) : AggregateExpr("MAX", input)
class Avg(input: LogicalExpr) : AggregateExpr("AVG", input)
对于数据类型不依赖于输入类型的聚合表达式,我们需要覆盖 toField 方法。例如,“COUNT” 聚合表达式总是产生一个整数,而不管被计算的值是什么数据类型。
class Count(input: LogicalExpr) : AggregateExpr("COUNT", input) {
override fun toField(input: LogicalPlan): Field {
return Field("COUNT", ArrowTypes.Int32Type)
}
override fun toString(): String {
return "COUNT($expr)"
}
}
有了逻辑表达式,我们就可以针对查询引擎将支持的各种转换来实现逻辑计划了。
扫描(Scan)
Scan
逻辑计划表示从一个带有可选的投影的 DataSource
获取数据。Scan
是我们的查询引擎中唯一一个没有其他逻辑计划作为输入的逻辑计划。它是查询树中的一个叶子节点。
class Scan(
val path: String,
val dataSource: DataSource,
val projection: List<String>): LogicalPlan {
val schema = deriveSchema()
override fun schema(): Schema {
return schema
}
private fun deriveSchema() : Schema {
val schema = dataSource.schema()
if (projection.isEmpty()) {
return schema
} else {
return schema.select(projection)
}
}
override fun children(): List<LogicalPlan> {
return listOf()
}
override fun toString(): String {
return if (projection.isEmpty()) {
"Scan: $path; projection=None"
} else {
"Scan: $path; projection=$projection"
}
}
}
投影(Projection)
Projection
逻辑计划会对其输入执行投影。投影是针对输入数据进行求值的一个表达式列表。有时这只是一个简单的列的列表,比如 SELECT a, b, c FROM foo
,但是它也可以包括任何其他支持的表达式类型。一个更复杂的例子是 SELECT (CAST(a AS float) * 3.141592)) AS my_float FROM foo
.
class Projection(
val input: LogicalPlan,
val expr: List<LogicalExpr>): LogicalPlan {
override fun schema(): Schema {
return Schema(expr.map { it.toField(input) })
}
override fun children(): List<LogicalPlan> {
return listOf(input)
}
override fun toString(): String {
return "Projection: ${ expr.map {
it.toString() }.joinToString(", ")
}"
}
}
选择(也叫过滤) (Selection (also known as Filter))
Selection
逻辑计划应用一个过滤表达式来决定哪些行应该被选择(包含)在其输出中。这是由 SQL 中的 WHERE 子句表示的。一个简单的例子是 SELECT * FROM foo WHERE a > 5
。过滤表达式需要求值得到一个布尔值的结果。
class Selection(
val input: LogicalPlan,
val expr: Expr): LogicalPlan {
override fun schema(): Schema {
// selection does not change the schema of the input
return input.schema()
}
override fun children(): List<LogicalPlan> {
return listOf(input)
}
override fun toString(): String {
return "Filter: $expr"
}
}
聚合(Aggregate)
Aggregate
逻辑计划比 Projection
、Selection
或 Scan
更复杂,它计算底层数据的聚合,如计算数据的最小、最大、平均值以及总和。聚合数据通常由其他列(或表达式)分组。一个简单的例子是 SELECT job_title, AVG(salary) FROM employee GROUP BY job_title
。
class Aggregate(
val input: LogicalPlan,
val groupExpr: List<LogicalExpr>,
val aggregateExpr: List<AggregateExpr>) : LogicalPlan {
override fun schema(): Schema {
return Schema(groupExpr.map { it.toField(input) } +
aggregateExpr.map { it.toField(input) })
}
override fun children(): List<LogicalPlan> {
return listOf(input)
}
override fun toString(): String {
return "Aggregate: groupExpr=$groupExpr, aggregateExpr=$aggregateExpr"
}
}
请注意,在这个实现中,聚合计划的输出是用分组列来组织的,后面是聚合表达式。通常有必要将聚合逻辑计划包装在一个投影中,以便按照原始查询中要求的顺序返回那些列。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
构建逻辑计划
本章讨论的源代码可以在 KQuery 项目的 logical-plan 模块中找到。
笨方法构建逻辑计划
现在我们已经为逻辑计划的一个子集定义了类,我们可以通过编程的方式将它们组合起来。
下面是一些粗略的代码,用于建立一个查询 SELECT * FROM employee WHERE state = 'CO'
的计划,该计划针对的是一个 CSV 文件,该文件包含 id,first_name,last_name,state,job_title,salary
等列。
// create a plan to represent the data source
val csv = CsvDataSource("employee.csv")
// create a plan to represent the scan of the data source (FROM)
val scan = Scan("employee", csv, listOf())
// create a plan to represent the selection (WHERE)
val filterExpr = Eq(Column("state"), LiteralString("CO"))
val selection = Selection(scan, filterExpr)
// create a plan to represent the projection (SELECT)
val projectionList = listOf(Column("id"),
Column("first_name"),
Column("last_name"),
Column("state"),
Column("salary"))
val plan = Projection(selection, projectionList)
// print the plan
println(format(plan))
打印出的计划如下:
Projection: #id, #first_name, #last_name, #state, #salary
Filter: #state = 'CO'
Scan: employee; projection=None
同样的代码也可以写得更简洁,就像这样:
val plan = Projection(
Selection(
Scan("employee", CsvDataSource("employee.csv"), listOf()),
Eq(Column(3), LiteralString("CO"))
),
listOf(Column("id"),
Column("first_name"),
Column("last_name"),
Column("state"),
Column("salary"))
)
println(format(plan))
虽然更简洁,但也更难解释,所以如果有一种更优雅的方式来创建逻辑计划就更好了。这就需要 DataFrame 接口来帮忙了。
使用 DataFrame 创建逻辑计划
实现 DataFrame 风格的 API 可以让我们以更友好的方式建立逻辑查询计划。DataFrame 只是逻辑查询计划的抽象,它有执行转换和操作的方法。它类似于流式风格的生成器 API。
下面是一个最小的 DataFrame 接口的初始版本,它允许我们对已有的 DataFrame 执行投影和选择。
interface DataFrame {
/** Apply a projection */
fun project(expr: List<LogicalExpr>): DataFrame
/** Apply a filter */
fun filter(expr: LogicalExpr): DataFrame
/** Aggregate */
fun aggregate(groupBy: List<LogicalExpr>,
aggregateExpr: List<AggregateExpr>): DataFrame
/** Returns the schema of the data that will be produced by this DataFrame. */
fun schema(): Schema
/** Get the logical plan */
fun logicalPlan() : LogicalPlan
}
以下是这个接口的实现。
class DataFrameImpl(private val plan: LogicalPlan) : DataFrame {
override fun project(expr: List<LogicalExpr>): DataFrame {
return DataFrameImpl(Projection(plan, expr))
}
override fun filter(expr: LogicalExpr): DataFrame {
return DataFrameImpl(Selection(plan, expr))
}
override fun aggregate(groupBy: List<LogicalExpr>,
aggregateExpr: List<AggregateExpr>): DataFrame {
return DataFrameImpl(Aggregate(plan, groupBy, aggregateExpr))
}
override fun schema(): Schema {
return plan.schema()
}
override fun logicalPlan(): LogicalPlan {
return plan
}
}
在我们应用投影或选择之前,我们需要一种方法来创建一个代表底层数据源的初始 DataFrame。这通常是通过一个执行上下文获得的。
这里是一个简单的执行上下文的初始版本,我们将在后面改进它。
class ExecutionContext {
fun csv(filename: String): DataFrame {
return DataFrameImpl(Scan(filename, CsvDataSource(filename), listOf()))
}
fun parquet(filename: String): DataFrame {
return DataFrameImpl(Scan(filename, ParquetDataSource(filename), listOf()))
}
}
有了这些基础工作,我们现在可以使用上下文和DataFrame API 创建逻辑查询计划。
val ctx = ExecutionContext()
val plan = ctx.csv("employee.csv")
.filter(Eq(Column("state"), LiteralString("CO")))
.select(listOf(Column("id"),
Column("first_name"),
Column("last_name"),
Column("state"),
Column("salary")))
这就比之前更简洁、更直观了,但我们还可以更进一步,增加一些方便的方法,使之更容易理解。这是 Kotlin 特有的,但其他语言也有类似的概念。
我们可以创建一些方便的方法来创建支持的表达式对象。
fun col(name: String) = Column(name)
fun lit(value: String) = LiteralString(value)
fun lit(value: Long) = LiteralLong(value)
fun lit(value: Double) = LiteralDouble(value)
我们还可以在 LogicalExpr 接口上定义中置运算符,用于构建二进制表达式。
infix fun LogicalExpr.eq(rhs: LogicalExpr): LogicalExpr { return Eq(this, rhs) }
infix fun LogicalExpr.neq(rhs: LogicalExpr): LogicalExpr { return Neq(this, rhs) }
infix fun LogicalExpr.gt(rhs: LogicalExpr): LogicalExpr { return Gt(this, rhs) }
infix fun LogicalExpr.gteq(rhs: LogicalExpr): LogicalExpr { return GtEq(this, rhs) }
infix fun LogicalExpr.lt(rhs: LogicalExpr): LogicalExpr { return Lt(this, rhs) }
infix fun LogicalExpr.lteq(rhs: LogicalExpr): LogicalExpr { return LtEq(this, rhs) }
有了这些方便的方法之后,就可以写出富有表现力的代码来建立逻辑查询计划了。
val df = ctx.csv(employeeCsv)
.filter(col("state") eq lit("CO"))
.select(listOf(
col("id"),
col("first_name"),
col("last_name"),
col("salary"),
(col("salary") mult lit(0.1)) alias "bonus"))
.filter(col("bonus") gt lit(1000))
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
物理计划与表达式
本章讨论的源代码可以在 KQuery 项目的 physical-plan 模块中找到。
第五章中定义的逻辑计划规定了要做什么,但没有规定如何做,有独立的逻辑计划和物理计划是很好的做法,尽管有可能将它们结合起来以降低复杂性。
将逻辑计划和物理计划分开的一个原因是,有时可能有多种方式来执行一个特定的操作,这意味着逻辑计划和物理计划之间存在一对多的关系。
例如,单进程与分布式执行,或 CPU 与 GPU 执行,都可能有单独的物理计划。
另外,像聚合和连接这样的操作可以用各种算法来实现,有不同的性能权衡。当聚合已经被分组键排序的数据时,使用组聚合(也称为排序聚合)是很有效的,它一次只需要为一组分组键保持状态,并且在一组分组键结束时就可以发出结果。如果数据没有被排序,那么通常会使用哈希聚合。哈希聚合通过分组键维护一个 HashMap 的累加器。
连接有更多的算法,包括嵌套循环连接、排序-合并连接和哈希连接。
物理计划会返回一个针对记录批次的迭代器。
interface PhysicalPlan {
fun schema(): Schema
fun execute(): Sequence<RecordBatch>
fun children(): List<PhysicalPlan>
}
物理表达式
我们已经定义了逻辑计划中所引用的逻辑表达式,但是我们现在需要实现物理表达式类,其中包含了在运行时对表达式求值的代码。
每个逻辑表达式可以有多个物理表达式的实现。例如,对于将两个数字相加的逻辑表达式 AddExpr,我们可以分别拥有使用 CPU 和 GPU 执行计算的实现。查询规划器可以根据代码运行的服务器的硬件能力来选择使用哪一个。
物理表达式是针对记录批次进行求值的,结果是一些列。
下面是我们用来表示物理表达式的接口。
interface Expression {
fun evaluate(input: RecordBatch): ColumnVector
}
列表达式
Column
表达式简单地对正在处理的 RecordBatch 中的ColumnVector 的引用进行求值。Column
逻辑表达式通过名称来引用输入,这对编写查询很方便,但是对于物理表达式,我们想避免每次表达式求值时查找名称的开销,所以它通过索引来引用列。
class ColumnExpression(val i: Int) : Expression {
override fun evaluate(input: RecordBatch): ColumnVector {
return input.field(i)
}
override fun toString(): String {
return "#$i"
}
}
字面量表达式
字面表达式的物理实现仅仅是一个包装在类中的字面量,该类实现了相应的 trait,并为列中的每个索引提供相同的值。
class LiteralValueVector(
val arrowType: ArrowType,
val value: Any?,
val size: Int) : ColumnVector {
override fun getType(): ArrowType {
return arrowType
}
override fun getValue(i: Int): Any? {
if (i<0 || i>=size) {
throw IndexOutOfBoundsException()
}
return value
}
override fun size(): Int {
return size
}
}
有了这个类,我们就可以为每个数据类型的字面量表达式创建物理表达式。
class LiteralLongExpression(val value: Long) : Expression {
override fun evaluate(input: RecordBatch): ColumnVector {
return LiteralValueVector(ArrowTypes.Int64Type,
value,
input.rowCount())
}
}
class LiteralDoubleExpression(val value: Double) : Expression {
override fun evaluate(input: RecordBatch): ColumnVector {
return LiteralValueVector(ArrowTypes.DoubleType,
value,
input.rowCount())
}
}
class LiteralStringExpression(val value: String) : Expression {
override fun evaluate(input: RecordBatch): ColumnVector {
return LiteralValueVector(ArrowTypes.StringType,
value.toByteArray(),
input.rowCount())
}
}
二进制表达式
对于二进制表达式,我们需要求出左和右的输入表达式的值,然后针对这些输入值用具体的二进制运算符求值,因此我们可以提供一个基类来简化每个运算符的实现。
abstract class BinaryExpression(val l: Expression, val r: Expression) : Expression {
override fun evaluate(input: RecordBatch): ColumnVector {
val ll = l.evaluate(input)
val rr = r.evaluate(input)
assert(ll.size() == rr.size())
if (ll.getType() != rr.getType()) {
throw IllegalStateException(
"Binary expression operands do not have the same type: " +
"${ll.getType()} != ${rr.getType()}")
}
return evaluate(ll, rr)
}
abstract fun evaluate(l: ColumnVector, r: ColumnVector) : ColumnVector
}
比较表达式
比较表达式只是比较两个输入列中的所有值,并产生一个包含结果的新列(一个位向量)。
下面是一个相等运算符的例子。
class EqExpression(l: Expression,
r: Expression): BooleanExpression(l,r) {
override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Boolean {
return when (arrowType) {
ArrowTypes.Int8Type -> (l as Byte) == (r as Byte)
ArrowTypes.Int16Type -> (l as Short) == (r as Short)
ArrowTypes.Int32Type -> (l as Int) == (r as Int)
ArrowTypes.Int64Type -> (l as Long) == (r as Long)
ArrowTypes.FloatType -> (l as Float) == (r as Float)
ArrowTypes.DoubleType -> (l as Double) == (r as Double)
ArrowTypes.StringType -> toString(l) == toString(r)
else -> throw IllegalStateException(
"Unsupported data type in comparison expression: $arrowType")
}
}
}
数学表达式
数学表达式的实现与比较表达式的代码非常相似。所有的数学表达式都使用同一个基类。
abstract class MathExpression(l: Expression,
r: Expression): BinaryExpression(l,r) {
override fun evaluate(l: ColumnVector, r: ColumnVector): ColumnVector {
val fieldVector = FieldVectorFactory.create(l.getType(), l.size())
val builder = ArrowVectorBuilder(fieldVector)
(0 until l.size()).forEach {
val value = evaluate(l.getValue(it), r.getValue(it), l.getType())
builder.set(it, value)
}
builder.setValueCount(l.size())
return builder.build()
}
abstract fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Any?
}
下面是一个扩展这个基类的具体数学表达式的例子。
class AddExpression(l: Expression,
r: Expression): MathExpression(l,r) {
override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType) : Any? {
return when (arrowType) {
ArrowTypes.Int8Type -> (l as Byte) + (r as Byte)
ArrowTypes.Int16Type -> (l as Short) + (r as Short)
ArrowTypes.Int32Type -> (l as Int) + (r as Int)
ArrowTypes.Int64Type -> (l as Long) + (r as Long)
ArrowTypes.FloatType -> (l as Float) + (r as Float)
ArrowTypes.DoubleType -> (l as Double) + (r as Double)
else -> throw IllegalStateException(
"Unsupported data type in math expression: $arrowType")
}
}
override fun toString(): String {
return "$l+$r"
}
}
聚合表达式
到目前为止,我们所看的表达式都是从每个批次的一个或多个输入列中产生一个输出列。聚合表达式更加复杂,因为它们在多批数据中聚合数值,然后产生一个最终值,所以我们需要引入累加器的概念,而且每个聚合表达式的物理表示需要知道如何产生一个适当的累加器,以便查询引擎将输入数据传递给它。
下面是表示聚合表达式和累加器的主要接口。
interface AggregateExpression {
fun inputExpression(): Expression
fun createAccumulator(): Accumulator
}
interface Accumulator {
fun accumulate(value: Any?)
fun finalValue(): Any?
}
对 Max 聚合表达式的实现将产生一个相应的 MaxAccumulator。
class MaxExpression(private val expr: Expression) : AggregateExpression {
override fun inputExpression(): Expression {
return expr
}
override fun createAccumulator(): Accumulator {
return MaxAccumulator()
}
override fun toString(): String {
return "MAX($expr)"
}
}
下面是一个 MaxAccumulator 实现的示例。
class MaxAccumulator : Accumulator {
var value: Any? = null
override fun accumulate(value: Any?) {
if (value != null) {
if (this.value == null) {
this.value = value
} else {
val isMax = when (value) {
is Byte -> value > this.value as Byte
is Short -> value > this.value as Short
is Int -> value > this.value as Int
is Long -> value > this.value as Long
is Float -> value > this.value as Float
is Double -> value > this.value as Double
is String -> value > this.value as String
else -> throw UnsupportedOperationException(
"MAX is not implemented for data type: ${value.javaClass.name}")
}
if (isMax) {
this.value = value
}
}
}
}
override fun finalValue(): Any? {
return value
}
}
物理计划
有了物理表达式,我们现在可以为查询引擎支持的各种转换实现物理计划了。
扫描(Scan)
扫描执行计划只是委托给一个数据源,传入一个投影来限制加载到内存的列数。没有执行额外的逻辑。
class ScanExec(val ds: DataSource, val projection: List<String>) : PhysicalPlan {
override fun schema(): Schema {
return ds.schema().select(projection)
}
override fun children(): List<PhysicalPlan> {
// Scan is a leaf node and has no child plans
return listOf()
}
override fun execute(): Sequence<RecordBatch> {
return ds.scan(projection);
}
override fun toString(): String {
return "ScanExec: schema=${schema()}, projection=$projection"
}
}
投影(Projection)
投影执行计划只是针对输入列执行投影表达式,然后产生一个包含派生列的记录批次。请注意,对于通过名称引用现有列的投影表达式的情况,派生列只是对输入列的一个指针或引用,底层数据值并没有被复制。
class ProjectionExec(
val input: PhysicalPlan,
val schema: Schema,
val expr: List<Expression>) : PhysicalPlan {
override fun schema(): Schema {
return schema
}
override fun children(): List<PhysicalPlan> {
return listOf(input)
}
override fun execute(): Sequence<RecordBatch> {
return input.execute().map { batch ->
val columns = expr.map { it.evaluate(batch) }
RecordBatch(schema, columns)
}
}
override fun toString(): String {
return "ProjectionExec: $expr"
}
}
选择(也叫过滤) (Selection (also known as Filter))
选择执行计划是第一个非平凡的计划,因为它有条件逻辑来确定输入记录批次中的哪些行应该包括在输出批次中。
对于每个输入批次,执行过滤表达式以返回一个位向量,包含代表表达式的布尔结果的位,每行有一个。然后这个位向量被用来过滤输入列以产生新的输出列。这是一个简单的实现,可以针对位向量包含所有 1 或所有 0 的情况进行优化,以避免将数据复制到新向量的开销。
class SelectionExec(
val input: PhysicalPlan,
val expr: Expression) : PhysicalPlan {
override fun schema(): Schema {
return input.schema()
}
override fun children(): List<PhysicalPlan> {
return listOf(input)
}
override fun execute(): Sequence<RecordBatch> {
val input = input.execute()
return input.map { batch ->
val result = (expr.evaluate(batch) as ArrowFieldVector).field as BitVector
val schema = batch.schema
val columnCount = batch.schema.fields.size
val filteredFields = (0 until columnCount).map {
filter(batch.field(it), result)
}
val fields = filteredFields.map { ArrowFieldVector(it) }
RecordBatch(schema, fields)
}
private fun filter(v: ColumnVector, selection: BitVector) : FieldVector {
val filteredVector = VarCharVector("v",
RootAllocator(Long.MAX_VALUE))
filteredVector.allocateNew()
val builder = ArrowVectorBuilder(filteredVector)
var count = 0
(0 until selection.valueCount).forEach {
if (selection.get(it) == 1) {
builder.set(count, v.getValue(it))
count++
}
}
filteredVector.valueCount = count
return filteredVector
}
}
哈希聚合
哈希聚合计划是最复杂的计划,因为它必须处理所有进入的批次,维护一个 HashMap 累加器,并为正在处理的每一行更新累加器。最后,用累加器结果来创建一个包含聚合查询结果的记录批次。
class HashAggregateExec(
val input: PhysicalPlan,
val groupExpr: List<PhysicalExpr>,
val aggregateExpr: List<PhysicalAggregateExpr>,
val schema: Schema) : PhysicalPlan {
override fun schema(): Schema {
return schema
}
override fun children(): List<PhysicalPlan> {
return listOf(input)
}
override fun toString(): String {
return "HashAggregateExec: groupExpr=$groupExpr, aggrExpr=$aggregateExpr"
}
override fun execute(): Sequence<RecordBatch> {
val map = HashMap<List<Any?>, List<Accumulator>>()
// for each batch from the input executor
input.execute().iterator().forEach { batch ->
// evaluate the grouping expressions
val groupKeys = groupExpr.map { it.evaluate(batch) }
// evaluate the expressions that are inputs to the aggregate functions
val aggrInputValues = aggregateExpr.map {
it.inputExpression().evaluate(batch)
}
// for each row in the batch
(0 until batch.rowCount()).forEach { rowIndex ->
// create the key for the hash map
val rowKey = groupKeys.map {
val value = it.getValue(rowIndex)
when (value) {
is ByteArray -> String(value)
else -> value
}
}
// get or create accumulators for this grouping key
val accumulators = map.getOrPut(rowKey) {
aggregateExpr.map { it.createAccumulator() }
}
// perform accumulation
accumulators.withIndex().forEach { accum ->
val value = aggrInputValues[accum.index].getValue(rowIndex)
accum.value.accumulate(value)
}
// create result batch containing final aggregate values
val allocator = RootAllocator(Long.MAX_VALUE)
val root = VectorSchemaRoot.create(schema.toArrow(), allocator)
root.allocateNew()
root.rowCount = map.size
val builders = root.fieldVectors.map { ArrowVectorBuilder(it) }
map.entries.withIndex().forEach { entry ->
val rowIndex = entry.index
val groupingKey = entry.value.key
val accumulators = entry.value.value
groupExpr.indices.forEach {
builders[it].set(rowIndex, groupingKey[it])
}
aggregateExpr.indices.forEach {
builders[groupExpr.size+it].set(rowIndex, accumulators[it].finalValue())
}
}
val outputBatch = RecordBatch(schema, root.fieldVectors.map {
ArrowFieldVector(it)
})
return listOf(outputBatch).asSequence()
}
}
有了物理计划,下一步就是建立一个查询规划器,从逻辑计划中创建物理计划。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
查询规划器
本章讨论的源代码可以在 KQuery 项目的 query-planner 模块中找到。
我们已经定义了逻辑和物理查询计划,现在我们需要一个能够将逻辑计划转化为物理计划的查询规划器。
查询规划器可以根据配置选项或基于目标平台的硬件能力选择不同的物理计划。例如,查询可以在 CPU 或 GPU 上执行,在单个节点上执行,或在集群中分布式执行。
翻译逻辑表达式
第一步是定义一个方法,将逻辑表达式递归地翻译成物理表达式。下面的代码示例是基于 switch 语句的实现,展示了翻译一个二进制表达式(它有两个输入表达式)如何导致代码递归到同一个翻译这些输入的方法。这种方法遍历整个逻辑表达式树,并创建一个相应的物理表达式树。
fun createPhysicalExpr(expr: LogicalExpr,
input: LogicalPlan): PhysicalExpr = when (expr) {
is ColumnIndex -> ColumnExpression(expr.i)
is LiteralString -> LiteralStringExpression(expr.str)
is BinaryExpr -> {
val l = createPhysicalExpr(expr.l, input)
val r = createPhysicalExpr(expr.r, input)
...
}
...
}
下面几节将解释每种类型的表达式实现。
列表达式
逻辑 Column
表达式通过名称引用列,但是物理表达式为了提高性能使用了列索引,所以查询规划器需要执行从列名到列索引的转换,如果列名无效就抛出一个异常。
这个简化的例子寻找第一个匹配的列名,而不检查是否有多个匹配的列,这应该是一个错误条件。
is Column -> {
val i = input.schema().fields.indexOfFirst { it.name == expr.name }
if (i == -1) {
throw SQLException("No column named '${expr.name}'")
}
ColumnExpression(i)
字面量表达式
字面值的物理表达式是很直观的,从逻辑表达式到物理表达式的映射是微不足道的,我们只需要将字面值复制过来。
is LiteralLong -> LiteralLongExpression(expr.n)
is LiteralDouble -> LiteralDoubleExpression(expr.n)
is LiteralString -> LiteralStringExpression(expr.str)
二进制表达式
要为二进制表达式创建物理表达式,我们首先需要为左右输入创建物理表达式,然后我们需要创建具体的物理表达式。
is BinaryExpr -> {
val l = createPhysicalExpr(expr.l, input)
val r = createPhysicalExpr(expr.r, input)
when (expr) {
// comparision
is Eq -> EqExpression(l, r)
is Neq -> NeqExpression(l, r)
is Gt -> GtExpression(l, r)
is GtEq -> GtEqExpression(l, r)
is Lt -> LtExpression(l, r)
is LtEq -> LtEqExpression(l, r)
// boolean
is And -> AndExpression(l, r)
is Or -> OrExpression(l, r)
// math
is Add -> AddExpression(l, r)
is Subtract -> SubtractExpression(l, r)
is Multiply -> MultiplyExpression(l, r)
is Divide -> DivideExpression(l, r)
else -> throw IllegalStateException(
"Unsupported binary expression: $expr")
}
}
翻译逻辑计划
需要实现一个递归函数来遍历逻辑计划树并将其翻译成物理计划,使用前面描述的翻译表达式的相同模式。
fun createPhysicalPlan(plan: LogicalPlan) : PhysicalPlan {
return when (plan) {
is Scan -> ...
is Selection -> ...
...
}
扫描(Scan)
翻译扫描计划只需要复制数据源的引用以及逻辑计划的投影。
is Projection -> {
val input = createPhysicalPlan(plan.input)
val projectionExpr = plan.expr.map { createPhysicalExpr(it, plan.input) }
val projectionSchema = Schema(plan.expr.map { it.toField(plan.input) })
ProjectionExec(input, projectionSchema, projectionExpr)
}
投影(Projection)
翻译投影有两个步骤。首先,我们需要为投影的输入创建一个物理计划,然后我们需要将投影的逻辑表达式转换成物理表达式。
is Projection -> {
val input = createPhysicalPlan(plan.input)
val projectionExpr = plan.expr.map { createPhysicalExpr(it, plan.input) }
val projectionSchema = Schema(plan.expr.map { it.toField(plan.input) })
ProjectionExec(input, projectionSchema, projectionExpr)
}
选择(也叫过滤) (Selection (also known as Filter))
选择查询计划的处理步骤与投影非常相似。
is Selection -> {
val input = createPhysicalPlan(plan.input)
val filterExpr = createPhysicalExpr(plan.expr, plan.input)
SelectionExec(input, filterExpr)
}
聚合(Aggregate)
聚合查询的查询计划步骤包括对定义可选分组键的表达式求值,和对聚合函数输入表达式求值,然后创建物理聚合表达式。
is Aggregate -> {
val input = createPhysicalPlan(plan.input)
val groupExpr = plan.groupExpr.map { createPhysicalExpr(it, plan.input) }
val aggregateExpr = plan.aggregateExpr.map {
when (it) {
is Max -> MaxExpression(createPhysicalExpr(it.expr, plan.input))
is Min -> MinExpression(createPhysicalExpr(it.expr, plan.input))
is Sum -> SumExpression(createPhysicalExpr(it.expr, plan.input))
else -> throw java.lang.IllegalStateException(
"Unsupported aggregate function: $it")
}
}
HashAggregateExec(input, groupExpr, aggregateExpr, plan.schema())
}
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
查询优化
本章讨论的源代码可以在 KQuery 项目的 optimizer 模块中找到。
我们现在有了实用的查询计划,但需要依靠终端用户以有效的方式构建计划。例如,我们希望用户在构建计划时尽可能早地进行过滤,特别是在连接之前,因为这减少了需要处理的数据量。
这是实现一个简单的基于规则的查询优化器的好时机,它可以重新安排查询计划,使其更有效率。
我们在第十一章开始支持 SQL 之后,这将变得更加重要,因为 SQL 语言只定义了查询的工作方式,不允许用户指定运算符和表达式的求值顺序。
基于规则的优化
基于规则的优化是一种简单而实用的方法,可以将常识性的优化应用到查询计划中。这些优化是在创建物理计划之前针对逻辑计划执行的。
这些优化工作是通过使用访问者模式遍历逻辑计划,并在计划中的每一步创建一个副本,并应用任何必要的修改。这种设计比在遍历计划时试图改变状态要简单得多,而且与期望不可变状态的函数式编程风格很一致。
我们将使用以下接口来表示优化器规则。
interface OptimizerRule {
fun optimize(plan: LogicalPlan) : LogicalPlan
}
投影下推
投影下推规则的目标是在从磁盘读取数据后,在查询执行的其他阶段之前尽快过滤掉列,以减少处理的数据量。
为了知道在查询中引用了哪些列,我们必须编写递归代码来检查表达式,并在累加器中建立一个列的列表。
fun extractColumns(expr: List<LogicalExpr>,
input: LogicalPlan,
accum: MutableSet<String>) {
expr.forEach { extractColumns(it, input, accum) }
}
fun extractColumns(expr: LogicalExpr,
input: LogicalPlan,
accum: MutableSet<String>) {
when (expr) {
is ColumnIndex -> accum.add(input.schema().fields[expr.i].name)
is Column -> accum.add(expr.name)
is BinaryExpr -> {
extractColumns(expr.l, input, accum)
extractColumns(expr.r, input, accum)
}
is Alias -> extractColumns(expr.expr, input, accum)
is CastExpr -> extractColumns(expr.expr, input, accum)
is LiteralString -> {}
is LiteralLong -> {}
is LiteralDouble -> {}
else -> throw IllegalStateException(
"extractColumns does not support expression: $expr")
}
}
有了这个实用的代码,我们就可以继续执行优化器规则了。请注意,对于投影
、选择
和聚合
计划,我们正在建立列名列表,但是当我们到达扫描(这是一个叶子节点)时,我们用一个有列名列表的扫描版本来替换它。
class ProjectionPushDownRule : OptimizerRule {
override fun optimize(plan: LogicalPlan): LogicalPlan {
return pushDown(plan, mutableSetOf())
}
private fun pushDown(plan: LogicalPlan,
columnNames: MutableSet<String>): LogicalPlan {
return when (plan) {
is Projection -> {
extractColumns(plan.expr, columnNames)
val input = pushDown(plan.input, columnNames)
Projection(input, plan.expr)
}
is Selection -> {
extractColumns(plan.expr, columnNames)
val input = pushDown(plan.input, columnNames)
Selection(input, plan.expr)
}
is Aggregate -> {
extractColumns(plan.groupExpr, columnNames)
extractColumns(plan.aggregateExpr.map { it.inputExpr() }, columnNames)
val input = pushDown(plan.input, columnNames)
Aggregate(input, plan.groupExpr, plan.aggregateExpr)
}
is Scan -> Scan(plan.name, plan.dataSource, columnNames.toList().sorted())
else -> throw new UnsupportedOperationException()
}
}
}
给出一个输入的逻辑计划:
Projection: #id, #first_name, #last_name
Filter: #state = 'CO'
Scan: employee; projection=None
这个优化器规则会把它转化为以下计划。
Projection: #id, #first_name, #last_name
Filter: #state = 'CO'
Scan: employee; projection=[first_name, id, last_name, state]
谓词下推
谓词下推优化的目的是在查询中尽可能早地过滤掉行,以避免重复处理。请看下面的例子,它连接了一个雇员表和部门表,然后过滤掉科罗拉多州的雇员。
Projection: #dept_name, #first_name, #last_name
Filter: #state = 'CO'
Join: #employee.dept_id = #dept.id
Scan: employee; projection=[first_name, id, last_name, state]
Scan: dept; projection=[id, dept_name]
这个查询将产生正确的结果,但是会有对所有雇员执行连接的开销,而不仅仅是那些在科罗拉多州的雇员。谓词下推规则将把过滤器下推到连接中,如下面的查询计划所示。
Projection: #dept_name, #first_name, #last_name
Join: #employee.dept_id = #dept.id
Filter: #state = 'CO'
Scan: employee; projection=[first_name, id, last_name, state]
Scan: dept; projection=[id, dept_name]
现在,连接将只处理雇员的子集,从而产生更好的性能。
基于成本的优化
基于成本的优化是指使用关于基础数据的统计数据来确定执行一个特定查询的成本,然后通过寻找一个低成本的执行计划来选择一个最佳执行计划。一个很好的例子是,根据被连接的表的大小来选择使用哪种连接算法。
基于成本的优化超出了本书的范围,尽管这个主题可能会被包括在未来的版本中。建议你看看 Apache Calcite 项目,以了解更多关于这个主题的信息。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
查询执行
我们现在能够编写代码来执行针对 CSV 文件的优化过的查询。
在我们用 KQuery 执行查询之前,使用一个可信赖的替代方案是有用的,这样我们就能知道正确的结果应该是什么,并获得一些基线性能指标进行比较。
Apache Spark 示例
本章讨论的源代码可以在 KQuery 项目的 spark 模块中找到。
首先,我们需要创建一个 Spark 上下文。注意,我们使用单线程执行,这样我们就可以与 KQuery 中的单线程实现的性能做一个相对公平的比较。
val spark = SparkSession.builder()
.master("local[1]")
.getOrCreate()
接下来,我们需要将 CSV 文件注册到上下文的 DataFrame 中。
val schema = StructType(Seq(
StructField("VendorID", DataTypes.IntegerType),
StructField("tpep_pickup_datetime", DataTypes.TimestampType),
StructField("tpep_dropoff_datetime", DataTypes.TimestampType),
StructField("passenger_count", DataTypes.IntegerType),
StructField("trip_distance", DataTypes.DoubleType),
StructField("RatecodeID", DataTypes.IntegerType),
StructField("store_and_fwd_flag", DataTypes.StringType),
StructField("PULocationID", DataTypes.IntegerType),
StructField("DOLocationID", DataTypes.IntegerType),
StructField("payment_type", DataTypes.IntegerType),
StructField("fare_amount", DataTypes.DoubleType),
StructField("extra", DataTypes.DoubleType),
StructField("mta_tax", DataTypes.DoubleType),
StructField("tip_amount", DataTypes.DoubleType),
StructField("tolls_amount", DataTypes.DoubleType),
StructField("improvement_surcharge", DataTypes.DoubleType),
StructField("total_amount", DataTypes.DoubleType)
))
val tripdata = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv")
tripdata.createOrReplaceTempView("tripdata")
最后,我们可以继续对 DataFrame 执行 SQL。
val start = System.currentTimeMillis()
val df = spark.sql(
"""SELECT passenger_count, MAX(fare_amount)
|FROM tripdata
|GROUP BY passenger_count""".stripMargin)
df.foreach(row => println(row))
val duration = System.currentTimeMillis() - start
println(s"Query took $duration ms")
在我的台式机上面执行这段代码会产生以下输出。
[1,623259.86]
[6,262.5]
[3,350.0]
[5,760.0]
[9,92.0]
[4,500.0]
[8,87.0]
[7,78.0]
[2,492.5]
[0,36090.3]
Query took 14418 ms
KQuery 示例
本章讨论的源代码可以在 KQuery 项目的 examples 模块中找到。
下面是用 KQuery 实现的等价查询。请注意,这段代码与 Spark 的示例不同,因为 KQuery 还没有指定 CSV 文件表结构的选项,所以所有的数据类型都是字符串,这意味着我们需要在查询计划中添加一个显式的转换,将 fare_amount
列转换成数值类型。
val time = measureTimeMillis {
val ctx = ExecutionContext()
val df = ctx.csv("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv", 1*1024)
.aggregate(
listOf(col("passenger_count")),
listOf(max(cast(col("fare_amount"), ArrowTypes.FloatType))))
val optimizedPlan = Optimizer().optimize(df.logicalPlan())
val results = ctx.execute(optimizedPlan)
results.forEach { println(it.toCSV()) }
println("Query took $time ms")
在我的台式机上面会产生以下输出。
Schema<passenger_count: Utf8, MAX: FloatingPoint(DOUBLE)>
1,623259.86
2,492.5
3,350.0
4,500.0
5,760.0
6,262.5
7,78.0
8,87.0
9,92.0
0,36090.3
Query took 6740 ms
可以看到,结果与 Apache Spark 产生的结果一致。还可以看到,对于这种规模的输入,其性能还不错。由于 Apache Spark 针对“大数据”进行了优化,因此它很可能在更大的数据集上胜过 KQuery。
去掉查询优化器
现在删除这些查询优化,看看它们对性能有多大帮助。
val time = measureTimeMillis {
val ctx = ExecutionContext()
val df = ctx.csv("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv", 1*1024)
.aggregate(
listOf(col("passenger_count")),
listOf(max(cast(col("fare_amount"), ArrowTypes.FloatType))))
val results = ctx.execute(df.logicalPlan())
results.forEach { println(it.toCSV()) }
println("Query took $time ms")
在我的台式机上面会产生以下输出。
1,623259.86
2,492.5
3,350.0
4,500.0
5,760.0
6,262.5
7,78.0
8,87.0
9,92.0
0,36090.3
Query took 36090 ms
结果是一样的,但是查询的执行时间大约是五倍。这清楚地表明了上一章中讨论的投影下推优化的好处。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
SQL 支持
本章讨论的源代码可以在 KQuery 项目的 sql 模块中找到。
除了拥有手工编码逻辑计划的能力外,在某些情况下,直接写 SQL 会更方便。本章将建立一个 SQL 解析器和查询规划器,将 SQL 查询翻译成逻辑计划。
分词器
第一步是将 SQL 查询字符串转换成代表关键字、字面量、标识符和操作符的分词列表。
这是所有可能的分词的一个子集,但对当前来说已经足够了。
interface Token
data class IdentifierToken(val s: String) : Token
data class LiteralStringToken(val s: String) : Token
data class LiteralLongToken(val s: String) : Token
data class KeywordToken(val s: String) : Token
data class OperatorToken(val s: String) : Token
然后我们将需要一个分词器类。在这里展示全部代码并不特别有趣,完整的源代码可以在配套的 GitHub 仓库中找到。
class Tokenizer {
fun tokenize(sql: String): List<Token> {
// see github repo for code
}
}
给定输入 "SELECT a + b FROM c"
,我们期望输出如下:
listOf(
KeywordToken("SELECT"),
IdentifierToken("a"),
OperatorToken("+"),
IdentifierToken("b"),
KeywordToken("FROM"),
IdentifierToken("c")
)
Pratt 解析器
我们将根据 Vaughan R. Pratt 在 1973 年发表的 Top Down Operator Precedence 论文来手工编码一个 SQL 解析器。虽然有其他的方法来构建 SQL 解析器,比如使用解析器生成器和解析器组合器,但我发现 Pratt 的方法效果很好,它产生的代码高效、易于理解,而且易于调试。
下面是一个 Pratt 解析器的基本实现。在我看来,它有一种简洁的美。表达式解析是由一个简单的循环来完成的,它解析一个“前缀”表达式,后面是一个可选的 “后缀”表达式,并且一直这样做,直到优先级发生变化,使解析器发现它已经完成了对表达式的解析。当然,parsePrefix
和 parseInfix
的实现可以递归地回调到 parse
方法中,这是它变得非常强大的地方。
interface PrattParser {
/** Parse an expression */
fun parse(precedence: Int = 0): SqlExpr? {
var expr = parsePrefix() ?: return null
while (precedence < nextPrecedence()) {
expr = parseInfix(expr, nextPrecedence())
}
return expr
}
/** Get the precedence of the next token */
fun nextPrecedence(): Int
/** Parse the next prefix expression */
fun parsePrefix(): SqlExpr?
/** Parse the next infix expression */
fun parseInfix(left: SqlExpr, precedence: Int): SqlExpr
}
这个接口用一个新的类 SqlExpr
表示,它是对解析后的表达式的表示,并且在很大程度上是对逻辑计划中定义的表达式的一对一映射,但是对于二进制表达式,我们可以使用一个更通用的结构,其中运算符是一个字符串,而不是为我们将要支持的所有不同的二进制表达式创建单独的数据结构。
下面是一些 SqlExpr
实现的例子。
/** SQL Expression */
interface SqlExpr
/** Simple SQL identifier such as a table or column name */
data class SqlIdentifier(val id: String) : SqlExpr {
override fun toString() = id
}
/** Binary expression */
data class SqlBinaryExpr(val l: SqlExpr, val op: String, val r: SqlExpr) : SqlExpr {
override fun toString(): String = "$l $op $r"
}
/** SQL literal string */
data class SqlString(val value: String) : SqlExpr {
override fun toString() = "'$value'"
}
有了这些类,就可以用以下代码来表示表达式 foo = 'bar'
。
val sqlExpr = SqlBinaryExpr(SqlIdentifier("foo"), "=", SqlString("bar"))
解析 SQL 表达式
让我们通过这种方法来解析一个简单的数学表达式,如 1 + 2 * 3
。这个表达式由以下分词组成。
listOf(
LiteralLongToken("1"),
OperatorToken("+"),
LiteralLongToken("2"),
OperatorToken("*"),
LiteralLongToken("3")
)
我们需要创建一个 PrattParser
trait 的实现,并将分词传到它的构造函数中。这些标记被包装在一个 TokenStream
类中,该类提供了一些便捷的方法,如用于消费下一个分词的 next
,以及用于当我们想在不消费分词的情况下进行回溯的 peek
。
class SqlParser(val tokens: TokenStream) : PrattParser {
}
实现 nextPrecedence
方法很简单,因为我们在这里只有少量的分词有任意优先级,我们需要让乘法和除法运算符比加法和减法运算符有更高的优先级。请注意,这个方法返回的具体数字并不重要,因为它们只是用来进行比较的。有一个关于运算符的优先级的很好的参考资料可以在 PostgreSQL 文档
中找到。
override fun nextPrecedence(): Int {
val token = tokens.peek()
return when (token) {
is OperatorToken -> {
when (token.s) {
"+", "-" -> 50
"*", "/" -> 60
else -> 0
}
}
else -> 0
}
}
前缀解析器只需要知道如何解析数值字面量。
override fun parsePrefix(): SqlExpr? {
val token = tokens.next() ?: return null
return when (token) {
is LiteralLongToken -> SqlLong(token.s.toLong())
else -> throw IllegalStateException("Unexpected token $token")
}
}
中缀解析器只需要知道如何解析运算符就可以了。请注意,在解析完一个运算符后,这个方法会递归调用顶层解析方法来解析运算符后面的表达式(二进制表达式的右侧)。
override fun parseInfix(left: SqlExpr, precedence: Int): SqlExpr {
val token = tokens.peek()
return when (token) {
is OperatorToken -> {
tokens.next()
SqlBinaryExpr(left, token.s, parse(precedence) ?:
throw SQLException("Error parsing infix"))
}
else -> throw IllegalStateException("Unexpected infix token $token")
}
}
优先级逻辑可以通过解析数学表达式 1 + 2 * 3
和 1 * 2 + 3
来证明,它们应该分别被解析为 1 + (2 * 3)
和 (1 * 2) + 3
。
示例:解析 1 + 2 * 3
以下这些是分词和它们的优先级。
Tokens: [1] [+] [2] [*] [3]
Precedence: [0] [50] [0] [60] [0]
最后的结果正确地表示为 1 + (2 * 3)
的表达式。
SqlBinaryExpr(
SqlLong(1),
"+",
SqlBinaryExpr(SqlLong(2), "*", SqlLong(3))
)
解析 SELECT 语句
现在已经有能力解析一些简单的表达式了,下一步是扩展解析器以支持将 SELECT 语句解析为具体语法树(CST)。请注意,在其他解析方法中,例如使用像 ANTLR 这样的解析器生成器,有一个中间阶段,称为抽象语法树(AST),然后需要翻译成具体语法树,但是 Pratt 解析器方法会直接将分词转换为 CST 。
这里有一个 CST 的示例,它可以表示一个简单的带有投影和选择的单表查询。后面的章节会进行扩展,支持更复杂的查询。
data class SqlSelect(
val projection: List<SqlExpr>,
val selection: SqlExpr,
val tableName: String) : SqlRelation
SQL 查询规划器
SQL 查询规划器将 SQL 查询树翻译成逻辑计划。由于 SQL 语言的灵活性,这比把逻辑计划翻译成物理计划要难得多。例如下面这个简单的查询。
SELECT id, first_name, last_name, salary/12 AS monthly_salary
FROM employee
WHERE state = 'CO' AND monthly_salary > 1000
虽然这对阅读查询的人来说是很直观的,但是查询的选择部分(WHERE子句)用到了一个表达式(state
),这个表达式不包括在投影的输出中,所以显然需要在投影之前应用,但是也用到了另一个表达式(salary/12 AS monthly_salary
),这个表达式只有在应用了投影之后才能使用。在 GROUP BY
、HAVING
和 ORDER BY
子句中面临类似的问题。
对这个问题有多种解决办法。一种方法是把这个查询翻译成下面的逻辑计划,把选择表达式分成两步,一个在投影之前,一个在投影之后。然而,这只是因为选择表达式是一个共轭谓词(只有当所有部分都是真的时候,表达式才是真的),对于更复杂的表达式,这种方法可能不可行。如果表达式是 state = 'CO' OR monthly_salary > 1000
,那么我们就不能这样做。
Filter: #monthly_salary > 1000
Projection: #id, #first_name, #last_name, #salary/12 AS monthly_salary
Filter: #state = 'CO'
Scan: table=employee
一个更简单、更通用的方法是将所有需要的表达式添加到投影中,这样就可以在投影后应用选择,然后通过将输出包装在另一个投影中来删除任何添加的列。
Projection: #id, #first_name, #last_name, #monthly_salary
Filter: #state = 'CO' AND #monthly_salary > 1000
Projection: #id, #first_name, #last_name, #salary/12 AS monthly_salary, #state
Scan: table=employee
值得注意的是,我们将在后面的章节中建立一个“谓词下推”的查询优化规则,它能够优化这个计划,并将谓词的 state = 'CO'
部分在计划中进一步下推,使其位于投影之前。
翻译 SQL 表达式
将 SQL 表达式转化为逻辑表达式是相当简单的,正如本例代码所演示的那样。
private fun createLogicalExpr(expr: SqlExpr, input: DataFrame) : LogicalExpr {
return when (expr) {
is SqlIdentifier -> Column(expr.id)
is SqlAlias -> Alias(createLogicalExpr(expr.expr, input), expr.alias.id)
is SqlString -> LiteralString(expr.value)
is SqlLong -> LiteralLong(expr.value)
is SqlDouble -> LiteralDouble(expr.value)
is SqlBinaryExpr -> {
val l = createLogicalExpr(expr.l, input)
val r = createLogicalExpr(expr.r, input)
when(expr.op) {
// comparison operators
"=" -> Eq(l, r)
"!=" -> Neq(l, r)
">" -> Gt(l, r)
">=" -> GtEq(l, r)
"<" -> Lt(l, r)
"<=" -> LtEq(l, r)
// boolean operators
"AND" -> And(l, r)
"OR" -> Or(l, r)
// math operators
"+" -> Add(l, r)
"-" -> Subtract(l, r)
"*" -> Multiply(l, r)
"/" -> Divide(l, r)
"%" -> Modulus(l, r)
else -> throw SQLException("Invalid operator ${expr.op}")
}
}
else -> throw new UnsupportedOperationException()
}
}
规划 SELECT
如果我们只想支持在选择中引用的所有列也存在于投影中的情况,我们可以用一些非常简单的逻辑来建立查询计划。
fun createDataFrame(select: SqlSelect, tables: Map<String, DataFrame>) : DataFrame {
// get a reference to the data source
var df = tables[select.tableName] ?:
throw SQLException("No table named '${select.tableName}'")
val projectionExpr = select.projection.map { createLogicalExpr(it, df) }
if (select.selection == null) {
// apply projection
return df.select(projectionExpr)
}
// apply projection then wrap in a selection (filter)
return df.select(projectionExpr)
.filter(createLogicalExpr(select.selection, df))
}
然而,由于选择可以同时引用投影的输入和投影的输出,需要用一个中间投影创建一个更复杂的计划。第一步是确定哪些列是由选择过滤器表达式引用的。为了做到这一点,需要使用访问者模式来遍历表达式树,并建立一个列名的可变集合。
下面遍历表达式树的工具方法。
private fun visit(expr: LogicalExpr, accumulator: MutableSet<String>) {
when (expr) {
is Column -> accumulator.add(expr.name)
is Alias -> visit(expr.expr, accumulator)
is BinaryExpr -> {
visit(expr.l, accumulator)
visit(expr.r, accumulator)
}
}
}
有了这些以后,就可以编写以下代码,将 SELECT 语句转换成有效的逻辑计划。这个代码样本并不完美,可能包含了一些边缘情况的错误,即数据源中的列和别名表达式之间存在名称冲突,但为保持代码简单,暂时忽略这一点。
fun createDataFrame(select: SqlSelect, tables: Map<String, DataFrame>) : DataFrame {
// get a reference to the data source
var df = tables[select.tableName] ?:
throw SQLException("No table named '${select.tableName}'")
// create the logical expressions for the projection
val projectionExpr = select.projection.map { createLogicalExpr(it, df) }
if (select.selection == null) {
// if there is no selection then we can just return the projection
return df.select(projectionExpr)
}
// create the logical expression to represent the selection
val filterExpr = createLogicalExpr(select.selection, df)
// get a list of columns references in the projection expression
val columnsInProjection = projectionExpr
.map { it.toField(df.logicalPlan()).name}
.toSet()
// get a list of columns referenced in the selection expression
val columnNames = mutableSetOf<String>()
visit(filterExpr, columnNames)
// determine if the selection references any columns not in the projection
val missing = columnNames - columnsInProjection
// if the selection only references outputs from the projection we can
// simply apply the filter expression to the DataFrame representing
// the projection
if (missing.size == 0) {
return df.select(projectionExpr)
.filter(filterExpr)
}
// because the selection references some columns that are not in the
// projection output we need to create an interim projection that has
// the additional columns and then we need to remove them after the
// selection has been applied
return df.select(projectionExpr + missing.map { Column(it) })
.filter(filterExpr)
.select(projectionExpr.map {
Column(it.toField(df.logicalPlan()).name)
})
}
规划聚合查询
正如你所看到的,SQL 查询规划器是相对复杂的,解析聚合查询的代码也相当复杂。如果你有兴趣了解更多,请参考源代码。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
并行查询执行
到目前为止,我们一直在使用单线程来执行对单个文件的查询。这种方法的可扩展性不强,因为对于较大的文件或多个文件而言,查询的运行时间会更长。下一步是实现分布式查询执行,以便查询执行可以利用多个 CPU 核心和多个服务器。
分布式查询执行的最简单形式是利用线程在单个节点上的多个 CPU 核心进行并行查询执行。
纽约出租车数据集已经进行了方便使用的分区,因为每年每个月都有一个 CSV 文件,例如 2019 年的数据集有 12 个分区。最直接的并行查询执行的方法是并行地在每个分区使用一个线程来执行相同的查询,然后合并结果。假设这段代码是在一台有六个 CPU 核心、支持超线程的计算机上运行。在假设每个月的数据量相似的情况下,这 12 个查询的执行时间应该与在一个单线程上运行其中一个查询的时间相同。
下面是一个在 12 个分区中并行运行一个聚合 SQL 查询的例子。这个例子是用Kotlin 协程实现的,而不是直接使用线程。
这个例子的源代码可以在 KQuery GitHub 仓库的 jvm/examples/src/main/kotlin/ParallelQuery.kt
中找到。
让我们从单线程代码开始,对每个分区运行一个查询。
fun executeQuery(path: String, month: Int, sql: String): List<RecordBatch> {
val monthStr = String.format("%02d", month);
val filename = "$path/yellow_tripdata_2019-$monthStr.csv"
val ctx = ExecutionContext()
ctx.registerCsv("tripdata", filename)
val df = ctx.sql(sql)
return ctx.execute(df).toList()
}
有了这些,就可以编写以下代码,并行地在这 12 个数据分区中运行这个查询。
val start = System.currentTimeMillis()
val deferred = (1..12).map {month ->
GlobalScope.async {
val sql = "SELECT passenger_count, " +
"MAX(CAST(fare_amount AS double)) AS max_fare " +
"FROM tripdata " +
"GROUP BY passenger_count"
val start = System.currentTimeMillis()
val result = executeQuery(path, month, sql)
val duration = System.currentTimeMillis() - start
println("Query against month $month took $duration ms")
result
}
}
val results: List<RecordBatch> = runBlocking {
deferred.flatMap { it.await() }
}
val duration = System.currentTimeMillis() - start
println("Collected ${results.size} batches in $duration ms")
下面是在一台 24 核心的台式机上运行这个示例的输出结果。
Query against month 8 took 17074 ms
Query against month 9 took 18976 ms
Query against month 7 took 20010 ms
Query against month 2 took 21417 ms
Query against month 11 took 21521 ms
Query against month 12 took 22082 ms
Query against month 6 took 23669 ms
Query against month 1 took 23735 ms
Query against month 10 took 23739 ms
Query against month 3 took 24048 ms
Query against month 5 took 24103 ms
Query against month 4 took 25439 ms
Collected 12 batches in 25505 ms
如你所见,总时长与最慢查询的时间差不多。
尽管我们已经成功地对分区执行了聚合查询,但我们的结果是一个具有重复值的数据批次的列表。例如,很可能在每个分区中都有一个 passenger_count=1
的结果。
合并结果
对于由投影和选择运算符组成的简单查询,并行查询的结果可以合并(类似于SQL的 UNION ALL
操作),并且不需要进一步处理。涉及聚合、排序或连接的更复杂的查询则需要在并行查询的结果上运行一个二级查询来合并结果。术语 “map” 和 “reduce” 经常被用来解释这个两步过程。“map” 步骤指的是在各分区中并行运行一个查询,而 “reduce” 步骤指的是将结果合并为单一的结果。
对于这个特定的例子,现在有必要运行一个二级聚合查询,几乎与针对分区执行的聚合查询相同。一个区别是,第二个查询可能需要应用不同的聚合函数。对于聚合函数 min
、max
和 sum
,在 map 和 reduce 步骤中使用相同的操作,以获得这些 min 的最小值或 sum 的总和。对于 count 表达式,我们不希望看到 count 的数量。我们希望看到的是计数的总和。
val sql = "SELECT passenger_count, " +
"MAX(max_fare) " +
"FROM tripdata " +
"GROUP BY passenger_count"
val ctx = ExecutionContext()
ctx.registerDataSource("tripdata", InMemoryDataSource(results.first().schema, results))
val df = ctx.sql(sql)
ctx.execute(df).forEach { println(it) }
这产生了最终的结果集:
1,671123.14
2,1196.35
3,350.0
4,500.0
5,760.0
6,262.5
7,80.52
8,89.0
9,97.5
0,90000.0
更智能的分区
虽然在这个例子中,每个文件使用一个线程的策略很有效,但它并不能作为分区的通用方法。如果一个数据源有数以千计的小分区,每个分区启动一个线程将是低效的。更好的方法是由查询规划器决定如何在指定数量的工作线程(或执行器)之间分享可用数据。
一些文件格式中已经有了自然的分区方案。例如,Apache Parquet 文件由多个 “行组”组成,包含成批的列数据。一个查询规划器可以检查可用的 Parquet 文件,建立一个行组列表,然后在固定数量的线程或执行器中进行调度,以读取这些行组。
甚至有可能将这种技术应用于非结构化文件,如 CSV 文件,但这并非易事。检查文件大小并将文件分成同等大小的块很容易,但一条记录很可能跨越两个块,因此有必要从一个边界向后或向前读取,以找到记录的开始或结束。寻找换行符是不够的,因为换行符经常出现在记录中,也被用来给记录划界。通常的做法是在处理管道的早期将 CSV 文件转换成结构化格式,如 Parquet,以提高后续处理的效率。
分区键
解决这个问题的一种办法是将文件放在目录中,使用由键值对组成的目录名来指定内容。
例如,我们可以按以下方式组织文件:
/mnt/nyxtaxi/csv/year=2019/month=1/tripdata.csv
/mnt/nyxtaxi/csv/year=2019/month=2/tripdata.csv
...
/mnt/nyxtaxi/csv/year=2019/month=12/tripdata.csv
基于这种结构,查询规划器现在可以实现一种“谓词下推”的形式,以限制物理查询计划中包含的分区数量。这种方法通常被称为“分区裁剪”。
并行连接
当用单线程执行内部连接时,一个简单的方法是将连接的一侧加载到内存中,然后扫描另一侧,对存储在内存中的数据进行查找。对于连接的一边可以装入内存的情况,这种经典的哈希连接算法是有效的。
它的并行版本被称为分区哈希连接或并行哈希连接。它涉及到根据连接键对两个输入进行分区,并在每一对输入分区上执行一个经典哈希连接。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
分布式查询执行
上一节关于并行查询执行的内容涵盖了一些基本的概念,比如分区,我们将在这一节中继续深入讨论这些概念。
在某种程度上过度简化了分布式查询执行的概念,其目标是能够创建一个物理查询计划,定义如何将工作分配给集群中的一些“执行器”。分布式查询计划通常包含新的运算符,描述在查询执行过程中各执行器之间如何交换数据。
让我们使用上一章平行查询执行中的 SQL 查询例子,看看这个查询的分布式计划的含义。
SELECT passenger_count,
MAX(max_fare)
FROM tripdata
GROUP BY passenger_count
我们可以在所有的数据分区上并行地执行这个查询,集群中的每个执行器都处理这些分区的一个子集。然而,我们需要把所有产生的聚合数据合并到一个节点上,然后应用最终的聚合查询,这样我们就可以得到一个没有重复分组键(本例中的 passenger_count
)的单一结果集。下面是一个可能的逻辑查询计划,用于表示这一点。注意新的 Exchange
操作符,它表示执行器之间的数据交换。交换的物理计划可以通过将中间结果写入共享存储来实现,也可以通过将数据直接发送到其他执行器来实现。
HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
Exchange:
HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
Scan: tripdata.parquet
新的 Exchange
操作符与我们迄今为止讨论过的其他操作符有根本的不同,因为我们不能只是在一个节点上建立一个操作符树并开始执行它。现在的查询需要跨执行器的协调,这意味着我们现在需要建立一个调度器。
分布式查询调度
在上层,分布式查询调度器的概念并不复杂。调度器需要检查整个查询,并将其分解为可以单独执行的阶段(通常是在各执行器之间并行执行),然后根据集群中的可用资源来安排这些阶段的执行。一旦每个查询阶段完成,就可以安排任何后续的附属查询阶段。这个过程重复进行,直到所有的查询阶段都被执行。
调度器也可以负责管理集群中的计算资源,这样就可以根据需要启动额外的执行器来处理查询负载。
在本章的剩余部分,我们将参照 Ballista 和该项目中正在实施的设计,讨论以下主题。
- 管理集群中的计算资源
- 序列化查询计划并在执行器之间交换查询计划
- 在执行器之间交换中间结果
- 优化分布式查询
管理计算资源
合理的起点是决定如何在网络环境中部署和管理执行器。现在有许多资源调度和协调工具,包括 YARN、Mesos 和 Kubernetes。也有一些项目旨在为这些资源调度器提供一个抽象,如 Apache YuniKorn。
Ballista 使用 Kubernetes 是出于以下原因:
所有主要的云供应商都对 Kubernetes 有最好的支持。亚马逊有 Elastic Kubernetes Service(EKS),微软有 Azure Kubernetes Service(AKS),谷歌有谷歌 Kubernetes Engine(GKE)。
Kubernetes 背后有巨大的动力,虽然有一个陡峭的学习曲线和一些原始的边缘,但相信随着时间的推移,它将变得更加容易。 MicroK8s 和 Minikube 等项目使得在本地开发机器上创建一个 Kubernetes 集群变得容易。 Kubernetes 是一个开源框架,使用声明式配置来管理容器化的工作负载和服务。这意味着,为了在 Kubernetes 中实现某些功能,我们只需要在 YAML 文件中声明所需的状态,并将其提交给 Kubernetes API。然后,Kubernetes 将执行必要的行动以实现所需的状态。
Kubernetes 使用术语 “job” 来表示运行的容器,其目的是启动、执行一些处理,然后停止。术语 “pod” 用于打算保持运行直到被要求停止的容器。
下面是一个运行作业的 YAML 文件的例子。
apiVersion: batch/v1
kind: Job
metadata:
name: parallel-aggregate
spec:
template:
spec:
containers:
- name: parallel-aggregate
image: ballistacompute/parallel-aggregate-rs
这个 YAML 文件可以使用 kubectl 命令行工具提交至 Kubernetes 集群。
kubectl apply -f parallel-aggregate-rs.yaml
下面是一个 YAML 文件的例子,用于创建一个由 12 个 Ballista Rust 执行器组成的集群。这里使用了 Kubernetes 的“StatefulSet”概念。请求的是 12 个副本,每个实例将在这个范围内被分配一个唯一的副本编号。
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: ballista
spec:
serviceName: "ballista"
replicas: 12
selector:
matchLabels:
app: ballista
template:
metadata:
labels:
app: ballista
ballista-cluster: ballista
spec:
containers:
- name: ballista
image: ballistacompute/rust:0.2.0-alpha-2
Kubernetes 使服务发现变得简单。在 Kubernetes 集群中运行的每个容器可以检查环境变量,以找到在同一集群中运行的其他服务的主机和端口。也可以配置 Kubernetes 为服务创建 DNS 条目。
序列化查询计划
查询调度器需要将整个查询计划的片段发送给执行器来执行。正如我们在前几章看到的,逻辑和物理查询计划是分层的数据结构,定义了执行查询所需的步骤。通过下面这个逻辑查询计划的例子来回忆一下。
Projection: #id, #first_name, #last_name, #state, #salary
Filter: #state = 'CO'
Scan: employee; projection=None
序列化查询计划有很多选择,以便它可以在进程之间传递。许多查询引擎选择了使用编程语言本地序列化支持的策略,如果没有要求能够在不同的编程语言之间交换查询计划,这是一个合适的选择,这通常是最简单的实现机制。
然而,使用一种与编程语言无关的序列化格式也有好处。Ballista 使用 Google Protocol Buffers 格式来定义查询计划。该项目通常被缩写为 “protobuf”。
下面是 Ballista protobuf 关于查询计划的定义的一部分。
完整的源代码可以在 Ballista github 仓库的 proto/ballista.proto
找到。
message LogicalPlanNode {
LogicalPlanNode input = 1;
FileNode file = 10;
ProjectionNode projection = 20;
SelectionNode selection = 21;
LimitNode limit = 22;
AggregateNode aggregate = 23;
}
message FileNode {
string filename = 1;
Schema schema = 2;
repeated string projection = 3;
}
message ProjectionNode {
repeated LogicalExprNode expr = 1;
}
message SelectionNode {
LogicalExprNode expr = 2;
}
message AggregateNode {
repeated LogicalExprNode group_expr = 1;
repeated LogicalExprNode aggr_expr = 2;
}
message LimitNode {
uint32 limit = 1;
}
protobuf 项目提供了生成特定语言源代码的工具,用于序列化和反序列化数据。
序列化数据
数据也必须被序列化,因为它在客户端和执行器之间以及执行器之间流动。
Apache Arrow 提供了一种 IPC(进程间通信)格式,用于在进程间交换数据。由于 Arrow 提供的标准化内存布局,原始字节可以直接在内存和输入 / 输出设备(磁盘、网络等)之间传输,而没有序列化开销。这实际上是一个零拷贝操作,因为数据不需要从其内存格式转换为单独的序列化格式。
然而,关于数据的元数据,如表结构(列名和数据类型)确实需要使用 Google Flatbuffers 进行编码。这种元数据很小,通常每个结果集或每个批次都会被序列化一次,所以开销很小。
使用 Arrow 的另一个好处是,它提供了不同编程语言之间非常有效的数据交换。
Apache Arrow IPC 定义了数据编码格式,但没有定义交换数据的机制。例如,Arrow IPC 可以用来将数据从 JVM 语言通过 JNI 传输到 C 或 Rust。
选择一种协议
现在我们已经为查询计划和数据选择了序列化格式,下一个问题是我们如何在分布式进程之间交换这些数据。
Apache Arrow 提供的 Flight 协议正是为了这个目的。Flight 是一个新的通用的客户端-服务器框架,用于简化大型数据集在网络接口上的高性能传输。
Arrow 的 Flight 库提供了一个开发框架,用于实现可以发送和接收数据流的服务。一个 Flight 服务器支持几种基本的请求。
- Handshake:一个简单的请求,以确定客户端是否被授权,在某些情况下,可以建立一个由实现方定义的会话令牌,用于之后的请求。
- ListFlights:返回一个可用数据流的列表。
- GetSchema:返回一个数据流的表结构。
- GetFlightInfo:返回一个感兴趣的数据集的“访问计划”,可能需要消费多个数据流。这个请求可以接受包含自定义序列化命令的内容,例如你的特定应用参数。
- DoGet:向客户端发送数据流。
- DoPut:从客户端接收数据流。
- DoAction:执行特定实现的动作并返回任意结果,即一个通用的函数调用。
- ListActions:返回一个可用的动作类型的列表。
GetFlightInfo
方法可以用来编译查询计划,并返回接收结果的必要信息,例如,在每个执行器上调用 DoGet
然后开始接收查询的结果。
流式处理
尽快提供查询结果,并以数据流的方式发送到处理该数据的下一个进程是非常重要的,否则将导致不可接受的延迟,因为每个操作都必须等待前一个操作的完成。
然而,有些操作需要在产生任何输出之前收到所有的输入数据。一个排序操作就是一个很好的例子。在收到整个数据集之前,不可能完全对一个数据集进行排序。这个问题可以通过增加分区的数量来缓解,从而对大量分区进行并行排序,然后使用合并运算符有效地合并排序的批次。
自定义代码
经常需要运行自定义代码作为分布式查询或计算的一部分。对于单一语言的查询引擎,通常可以使用语言内置的序列化机制,在查询执行时通过网络传输这些代码,这在开发过程中非常方便。另一种方法是将编译好的代码发布到资源库中,这样就可以在运行时将其下载到集群中。对于基于 JVM 的系统,可以使用 maven 资源库。一个更通用的方法是将所有运行时的依赖项打包成一个 Docker 镜像。
查询计划需要提供必要的信息,以便在运行时加载用户代码。对于基于 JVM 的系统,这可能是一个 classpath 和一个类名。对于基于 C 语言的系统,这可能是一个共享对象的路径。在任何一种情况下,用户代码都需要实现一些已知的 API。
分布式查询优化
分布式查询执行与单个主机上的并行查询执行相比有很多开销,只有在有好处的情况下才应该使用。我建议阅读论文 Scalability! But at what COST 以了解关于这个话题的一些有趣的观点。
另外,有很多方法可以分发同一个查询,那么我们如何知道该使用哪一种呢?
一个答案是建立一种机制来确定执行一个特定查询计划的成本,然后为这个给定的问题创建所有可能的查询计划组合的一些子集,并确定哪一个是最有效的。
在计算一个操作的成本时,涉及到很多因素,而且有不同的资源成本和限制。
- 内存:我们通常关注的是内存的可用性而不是性能。在内存中处理数据要比读写磁盘快好几个数量级。
- CPU:对于可并行化的工作负载,更多的 CPU 核心意味着更好的吞吐量。
- GPU:与 CPU 相比,某些操作在 GPU 上的速度要快几个数量级。
- 磁盘:磁盘的读写速度是有限的,云计算供应商通常会限制每秒的 I/O 操作数(IOPS)。不同类型的磁盘有不同的性能特点(旋转磁盘与 SSD 与 NVMe)。
- 网络:分布式查询的执行涉及节点之间的数据流。网络基础设施有一个吞吐量的限制。
- **分布式存储。源数据存储在分布式文件系统(HDFS)或对象存储(Amazon S3、Azure Blob Storage)中是非常常见的,在分布式存储和本地文件系统之间传输数据是有成本的。
- 数据大小:数据的大小很重要。当在两个表之间进行连接,并且需要通过网络传输数据时,最好是传输两个表中较小的那个。如果其中一个表可以装在内存中,那么可以使用更有效的连接操作。
- 货币成本:如果一个查询用 3 倍的成本提升了 10% 的计算速度,这值得吗?当然,这个问题最好由用户来回答。货币成本通常是通过限制可用计算资源的数量来控制的。
如果提前知道足够的数据信息,比如数据有多大,查询中使用的连接键分区的基数,分区的数量等等,那么查询成本可以通过算法预先计算出来。这都取决于被查询的数据集的某些统计数据。
另一种方法是直接开始运行查询,让每个执行器根据它收到的输入数据进行调整。Apache Spark 3.0.0 引入了一个自适应查询执行功能,就是这样做的。
连接重排序
在生成查询计划时,查询优化器有时可以根据可用的统计数据来确定最佳的连接顺序。然而,一旦我们开始执行查询,我们往往可以得到更准确的统计数据,并可以利用这个机会进一步调整查询。例如,如果查询包含一个哈希连接,那么我们希望使用连接的较小一侧作为构建侧。
缓存中间结果
同一个查询每天被执行多次,参数不同,但执行计划的某些步骤保持不变,这是非常常见的。坚持执行查询的中间结果,以避免重复相同的操作,可能是有益的。如果将数据缓存在快速的本地存储上,如 NVMe 驱动器,也会有好处,从而消除了从分布式存储中重复获取数据的需要。Dremio 的 Data Lake Engine 和Snowflake 的 Cloud Data Platform 是利用这些技术的复杂查询引擎的好例子。
当然,这种复杂程度带来了新的挑战,比如在规划查询的运行位置时要考虑到数据的地域性。
物化视图
另一种优化技术,特别是对于不断变化的数据,是建立物化视图,可以随着新数据的到来不断更新,以避免从头开始建立视图的成本。Materialize 是这个概念的好例子。
从分布式查询失败中恢复
分布式查询的执行涉及到很多移动部件,在查询执行过程中很有可能会出现故障,所以有某种恢复机制是很重要的。
最简单的解决方案是等待查询的其余部分完成或失败,然后再次尝试整个查询,但这显然非常昂贵,而且让用户感到沮丧。
Kubernetes 的好处之一是它会自动重新启动失败的 pod,但仍然需要查询调度器来监控执行器,以知道何时执行查询的下一阶段。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
测试
查询引擎很复杂,很容易在不经意间引入可能导致查询返回不正确结果的细微错误,因此必须有严格的测试。
单元测试
一个很好的起点是为各个运算符和表达式编写单元测试,断言它们对给定的输入产生正确的输出。覆盖到错误的情况也是至关重要的。
下面是一些关于编写单元测试时需要考虑的问题的建议。
- 如果使用了意外的数据类型会发生什么?例如,在输入的字符串上计算
SUM
。 - 测试应该涵盖边缘情况,如对数字数据类型使用最小值和最大值,对浮点类型使用NaN(非数字),以确保它们被正确处理。
- 需要针对下溢和上溢的场景进行测试。例如,当两个长(64位)整数类型相乘时会发生什么?
- 测试还应该确保正确处理空值。
在编写这些测试时,重要的是能够用任意的数据构造记录批次和列向量,作为运算符和表达式的输入。下面是这样一个实用方法的例子。
private fun createRecordBatch(schema: Schema,
columns: List<List<Any?>>): RecordBatch {
val rowCount = columns[0].size
val root = VectorSchemaRoot.create(schema.toArrow(),
RootAllocator(Long.MAX_VALUE))
root.allocateNew()
(0 until rowCount).forEach { row ->
(0 until columns.size).forEach { col ->
val v = root.getVector(col)
val value = columns[col][row]
when (v) {
is Float4Vector -> v.set(row, value as Float)
is Float8Vector -> v.set(row, value as Double)
...
}
}
}
root.rowCount = rowCount
return RecordBatch(schema, root.fieldVectors.map { ArrowFieldVector(it) })
}
下面是一个“大于等于”(>=)表达式的单元测试示例,该表达式针对一个包含两列双精度浮点值的记录批次进行求值。
@Test
fun `gteq doubles`() {
val schema = Schema(listOf(
Field("a", ArrowTypes.DoubleType),
Field("b", ArrowTypes.DoubleType)
))
val a: List<Double> = listOf(0.0, 1.0,
Double.MIN_VALUE, Double.MAX_VALUE, Double.NaN)
val b = a.reversed()
val batch = createRecordBatch(schema, listOf(a,b))
val expr = GtEqExpression(ColumnExpression(0), ColumnExpression(1))
val result = expr.evaluate(batch)
assertEquals(a.size, result.size())
(0 until result.size()).forEach {
assertEquals(if (a[it] >= b[it]) 1 else 0, result.getValue(it))
}
}
集成测试
一旦单元测试到位,下一步就是编写集成测试,执行由多个运算符和表达式组成的查询,并断言它们产生预期的输出。
对查询引擎的集成测试有几种流行的方法。
- 强制性测试:硬编码的查询和预期结果,要么写成代码,要么存储为包含查询和结果的文件。
- 比较测试:这种方法包括对另一个(受信任的)查询引擎执行查询,并断言两个查询引擎产生相同的结果。
- 模糊测试:生成随机运算符和表达式树,以捕获边缘情况,并获得全面的测试覆盖。
模糊测试
查询引擎的大部分复杂性来自于这样一个事实:由于运算符和表达式树的嵌套性质,运算符和表达式可以通过无限的组合,手工编码测试查询不太可能足够全面。
Fuzzing 是一种产生随机输入数据的技术。当应用于查询引擎时,这意味着创建随机查询计划。
下面是一个针对 DataFrame 创建随机表达式的例子。这是一个递归方法,可以产生深度嵌套的表达式树,所以建立一个最大深度机制是很重要的。
fun createExpression(input: DataFrame, depth: Int, maxDepth: Int): LogicalExpr {
return if (depth == maxDepth) {
// return a leaf node
when (rand.nextInt(4)) {
0 -> ColumnIndex(rand.nextInt(input.schema().fields.size))
1 -> LiteralDouble(rand.nextDouble())
2 -> LiteralLong(rand.nextLong())
3 -> LiteralString(randomString(rand.nextInt(64)))
else -> throw IllegalStateException()
}
} else {
// binary expressions
val l = createExpression(input, depth+1, maxDepth)
val r = createExpression(input, depth+1, maxDepth)
return when (rand.nextInt(8)) {
0 -> Eq(l, r)
1 -> Neq(l, r)
2 -> Lt(l, r)
3 -> LtEq(l, r)
4 -> Gt(l, r)
5 -> GtEq(l, r)
6 -> And(l, r)
7 -> Or(l, r)
else -> throw IllegalStateException()
}
}
}
下面是用这个方法生成的表达式的例子。请注意,这里的列引用是用哈希之后的索引表示的,例如,#1 代表索引为 1 的列。这个表达式几乎肯定是无效的(取决于查询引擎的实现),这在使用模糊引擎时是可以预期的。这仍然是有价值的,因为它将测试到一些错误条件,这在手动编写测试时不会被覆盖。
#5 > 0.5459397414890019 < 0.3511239641785846 OR 0.9137719758607572 > -6938650321297559787 < #0 AND #3 < #4 AND 'qn0NN' OR '1gS46UuarGz2CdeYDJDEW3Go6ScMmRhA3NgPJWMpgZCcML1Ped8haRxOkM9F' >= -8765295514236902140 < 4303905842995563233 OR 'IAseGJesQMOI5OG4KrkitichlFduZGtjXoNkVQI0Alaf2ELUTTIci' = 0.857970478666058 >= 0.8618195163699196 <= '9jaFR2kDX88qrKCh2BSArLq517cR8u2' OR 0.28624225053564 <= 0.6363627130199404 > 0.19648131921514966 >= -567468767705106376 <= #0 AND 0.6582592932801918 = 'OtJ0ryPUeSJCcMnaLngBDBfIpJ9SbPb6hC5nWqeAP1rWbozfkPjcKdaelzc' >= #0 >= -2876541212976899342 = #4 >= -3694865812331663204 = 'gWkQLswcU' != #3 > 'XiXzKNrwrWnQmr3JYojCVuncW9YaeFc' >= 0.5123788261193981 >= #2
在创建逻辑查询计划时可以采取类似的方法。
fun createPlan(input: DataFrame,
depth: Int,
maxDepth: Int,
maxExprDepth: Int): DataFrame {
return if (depth == maxDepth) {
input
} else {
// recursively create an input plan
val child = createPlan(input, depth+1, maxDepth, maxExprDepth)
// apply a transformation to the plan
when (rand.nextInt(2)) {
0 -> {
val exprCount = 1.rangeTo(rand.nextInt(1, 5))
child.project(exprCount.map {
createExpression(child, 0, maxExprDepth)
})
}
1 -> child.filter(createExpression(input, 0, maxExprDepth))
else -> throw IllegalStateException()
}
}
}
下面是这个代码产生的逻辑查询计划的示例。
Filter: 'VejBmVBpYp7gHxHIUB6UcGx' OR 0.7762591612853446
Filter: 'vHGbOKKqR' <= 0.41876514212913307
Filter: 0.9835090312561898 <= 3342229749483308391
Filter: -5182478750208008322 < -8012833501302297790
Filter: 0.3985688976088563 AND #1
Filter: #5 OR 'WkaZ54spnoI4MBtFpQaQgk'
Scan: employee.csv; projection=None
这种直接的模糊处理方法会产生很高比例的无效计划。可以对其进行改进,通过增加更多的上下文信息来减少创建无效逻辑计划和表达式的风险。例如,生成 AND 表达式可以生成返回布尔结果的左表达式和右表达式。然而,只创建正确的计划是有危险的,因为它可能限制测试覆盖率。理想情况下,应该可以用产生具有不同特征的查询计划的规则来配置模糊引擎。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
基准测试
每个查询引擎在性能、可扩展性和资源要求方面都是独特的,往往有不同的权衡。拥有良好的基准测试以了解性能和伸缩性的特点是很重要的。
衡量性能
性能通常是最简单的测量特征,通常指的是执行一个特定操作所需的时间。例如,可以建立基准来测量特定查询或查询类别的性能。
性能测试通常包括多次执行查询并测量耗费的时间。
衡量伸缩性
伸缩性可能是一个有过多含义的术语,有许多不同类型的伸缩性。术语伸缩性通常指的是性能如何随着影响性能的某些变量的不同值而变化。
一个例子是,当查询 10GB 的数据与 100GB 或 1TB 的数据时,随着总数据量的增加而测量伸缩性,以发现性能是如何受到影响的。一个常见的目标是展示线性伸缩性,这意味着查询 100GB 的数据所需的时间应该是查询 10GB 数据的10倍。线性伸缩性使用户很容易推理出预期行为。
影响性能的其他变量的例子包括:
- 并发用户、请求或查询的数量。
- 数据分区的数量。
- 物理磁盘的数量。
- CPU 核的数量。
- 节点的数量。
- 可用 RAM 数量。
- 硬件类型(例如,树莓派与桌面)。
并发
当根据并发请求的数量来衡量伸缩性时,我们通常对吞吐量(每段时间内执行的查询总数)更感兴趣,而不是单个查询的持续时间,尽管我们通常也会收集这一信息。
自动化
运行基准测试通常非常耗时,因此自动化是必不可少的,这样可以经常运行基准测试,也许是每天一次或每周一次,这样就能及早发现任何性能回退。
自动化对于确保一致地执行基准测试,以及收集结果和分析结果时可能需要的所有相关细节也很重要。
以下是执行基准测试时应收集的数据类型的一些示例。
硬件配置
- 硬件类型
- CPU核心的数量
- 可用的内存和磁盘空间
- 操作系统名称和版本
环境
- 环境变量(注意不要泄露秘钥)
基准配置
- 所用基准测试软件的版本
- 被测软件的版本
- 任何配置参数或文件
- 正在查询的数据文件的文件名
- 数据文件大小和校验和
- 执行查询的细节
基准测试结果
- 基准测试开始的日期 / 时间
- 每个查询的开始时间和结束时间
- 任何失败查询的错误信息
比较基准测试
在软件的不同版本之间比较基准测试是很重要的,这样性能特征的变化就很明显,可以进一步调查。基准测试产生了大量的数据,这些数据往往难以手工比较,因此,建立工具来帮助这一过程是有益的。
与其直接比较两组性能数据,工具可以对数据执行 “diff” 操作,并显示同一基准的两个或多个运行之间的百分比差异。能够制作显示多个基准测试运行的图表也很有用。
发布基准测试结果
下面是一些真实的基准测试结果的例子,比较了 Ballista 中 Rust 和 JVM 执行器的查询执行时间,与 Apache Spark 相比。虽然从这个数据中可以看出 Rust 执行器的表现很好,但通过制作图表可以更好地表现其优势。
CPU Cores | Ballista Rust | Ballista JVM | Apache Spark |
---|---|---|---|
3 | 21.431 | 51.143 | 56.557 |
6 | 9.855 | 26.002 | 30.184 |
9 | 6.51 | 24.435 | 26.401 |
12 | 5.435 | 17.529 | 18.281 |
与其绘制查询执行时间图,不如绘制吞吐量图。在这种情况下,可以用 60 秒除以执行时间来计算每分钟查询的吞吐量。如果一个查询在单线程上执行需要 5 秒,那么每分钟应该可以运行 12 个查询。
下面是一个示例图表,展示了吞吐量随着 CPU 核数的增加的伸缩性。
(原文也缺少这张图)
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。
更多资料
希望你觉得这本书很有用,现在你对查询引擎的内部原理有了更好的了解。如果你觉得有什么话题没有充分涉及,或者根本没有涉及,我很希望听到这个消息,这样我就可以考虑在本书未来的修订中增加这些些内容。
你可以在 Leanpub 网站 的公共论坛上发表反馈意见,也可以直接通过 twitter @andygrove_io 给我留言。
开源项目
有许多包含查询引擎的开源项目,学习这些项目是了解该主题的好方法。这里给出了主流开源查询引擎的几个例子。
- Apache Arrow
- Apache Calcite
- Apache Drill
- Apache Hadoop
- Apache Hive
- Apache Impala
- Apache Spark
- Facebook Presto
- NVIDIA RAPIDS Accelerator for Apache Spark
YouTube
我最近才发现 Andy Pavlo 的系列讲座,可以在 YouTube 上看到(链接)。这其中涵盖的内容远不止查询引擎,而是有大量关于查询优化和执行的内容。我强烈建议观看这些视频。
样本数据
前面的章节提到了纽约市出租车和豪华轿车委员会的行程记录数据集。黄色和绿色的出租车行程记录包括记录上车和下车日期 / 时间、上车和下车地点、行程距离、分项票价、费率类型、付款类型以及司机报告的乘客人数。这些数据以 CSV 格式提供。KQuery 项目包含将这些 CSV 文件转换为 Parquet 格式的源代码。
数据可以通过网站上的链接下载或直接从 S3 下载文件。例如,Linux 或 Mac 上的用户可以使用 curl 或 wget 下载 2019 年 1 月黄色出租车的数据,命令如下,并根据文件命名规则创建脚本来下载其他文件。
wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。