并行查询执行

到目前为止,我们一直在使用单线程来执行对单个文件的查询。这种方法的可扩展性不强,因为对于较大的文件或多个文件而言,查询的运行时间会更长。下一步是实现分布式查询执行,以便查询执行可以利用多个 CPU 核心和多个服务器。

分布式查询执行的最简单形式是利用线程在单个节点上的多个 CPU 核心进行并行查询执行。

纽约出租车数据集已经进行了方便使用的分区,因为每年每个月都有一个 CSV 文件,例如 2019 年的数据集有 12 个分区。最直接的并行查询执行的方法是并行地在每个分区使用一个线程来执行相同的查询,然后合并结果。假设这段代码是在一台有六个 CPU 核心、支持超线程的计算机上运行。在假设每个月的数据量相似的情况下,这 12 个查询的执行时间应该与在一个单线程上运行其中一个查询的时间相同。

下面是一个在 12 个分区中并行运行一个聚合 SQL 查询的例子。这个例子是用Kotlin 协程实现的,而不是直接使用线程。

这个例子的源代码可以在 KQuery GitHub 仓库的 jvm/examples/src/main/kotlin/ParallelQuery.kt 中找到。

让我们从单线程代码开始,对每个分区运行一个查询。

fun executeQuery(path: String, month: Int, sql: String): List<RecordBatch> { val monthStr = String.format("%02d", month); val filename = "$path/yellow_tripdata_2019-$monthStr.csv" val ctx = ExecutionContext() ctx.registerCsv("tripdata", filename) val df = ctx.sql(sql) return ctx.execute(df).toList() }

有了这些,就可以编写以下代码,并行地在这 12 个数据分区中运行这个查询。

val start = System.currentTimeMillis() val deferred = (1..12).map {month -> GlobalScope.async { val sql = "SELECT passenger_count, " + "MAX(CAST(fare_amount AS double)) AS max_fare " + "FROM tripdata " + "GROUP BY passenger_count" val start = System.currentTimeMillis() val result = executeQuery(path, month, sql) val duration = System.currentTimeMillis() - start println("Query against month $month took $duration ms") result } } val results: List<RecordBatch> = runBlocking { deferred.flatMap { it.await() } } val duration = System.currentTimeMillis() - start println("Collected ${results.size} batches in $duration ms")

下面是在一台 24 核心的台式机上运行这个示例的输出结果。

Query against month 8 took 17074 ms Query against month 9 took 18976 ms Query against month 7 took 20010 ms Query against month 2 took 21417 ms Query against month 11 took 21521 ms Query against month 12 took 22082 ms Query against month 6 took 23669 ms Query against month 1 took 23735 ms Query against month 10 took 23739 ms Query against month 3 took 24048 ms Query against month 5 took 24103 ms Query against month 4 took 25439 ms Collected 12 batches in 25505 ms

如你所见,总时长与最慢查询的时间差不多。

尽管我们已经成功地对分区执行了聚合查询,但我们的结果是一个具有重复值的数据批次的列表。例如,很可能在每个分区中都有一个 passenger_count=1 的结果。

合并结果

对于由投影和选择运算符组成的简单查询,并行查询的结果可以合并(类似于SQL的 UNION ALL 操作),并且不需要进一步处理。涉及聚合、排序或连接的更复杂的查询则需要在并行查询的结果上运行一个二级查询来合并结果。术语 “map” 和 “reduce” 经常被用来解释这个两步过程。“map” 步骤指的是在各分区中并行运行一个查询,而 “reduce” 步骤指的是将结果合并为单一的结果。

对于这个特定的例子,现在有必要运行一个二级聚合查询,几乎与针对分区执行的聚合查询相同。一个区别是,第二个查询可能需要应用不同的聚合函数。对于聚合函数 minmaxsum,在 map 和 reduce 步骤中使用相同的操作,以获得这些 min 的最小值或 sum 的总和。对于 count 表达式,我们不希望看到 count 的数量。我们希望看到的是计数的总和。

val sql = "SELECT passenger_count, " + "MAX(max_fare) " + "FROM tripdata " + "GROUP BY passenger_count" val ctx = ExecutionContext() ctx.registerDataSource("tripdata", InMemoryDataSource(results.first().schema, results)) val df = ctx.sql(sql) ctx.execute(df).forEach { println(it) }

这产生了最终的结果集:

1,671123.14 2,1196.35 3,350.0 4,500.0 5,760.0 6,262.5 7,80.52 8,89.0 9,97.5 0,90000.0

更智能的分区

虽然在这个例子中,每个文件使用一个线程的策略很有效,但它并不能作为分区的通用方法。如果一个数据源有数以千计的小分区,每个分区启动一个线程将是低效的。更好的方法是由查询规划器决定如何在指定数量的工作线程(或执行器)之间分享可用数据。

一些文件格式中已经有了自然的分区方案。例如,Apache Parquet 文件由多个 “行组”组成,包含成批的列数据。一个查询规划器可以检查可用的 Parquet 文件,建立一个行组列表,然后在固定数量的线程或执行器中进行调度,以读取这些行组。

甚至有可能将这种技术应用于非结构化文件,如 CSV 文件,但这并非易事。检查文件大小并将文件分成同等大小的块很容易,但一条记录很可能跨越两个块,因此有必要从一个边界向后或向前读取,以找到记录的开始或结束。寻找换行符是不够的,因为换行符经常出现在记录中,也被用来给记录划界。通常的做法是在处理管道的早期将 CSV 文件转换成结构化格式,如 Parquet,以提高后续处理的效率。

分区键

解决这个问题的一种办法是将文件放在目录中,使用由键值对组成的目录名来指定内容。

例如,我们可以按以下方式组织文件:

/mnt/nyxtaxi/csv/year=2019/month=1/tripdata.csv /mnt/nyxtaxi/csv/year=2019/month=2/tripdata.csv ... /mnt/nyxtaxi/csv/year=2019/month=12/tripdata.csv

基于这种结构,查询规划器现在可以实现一种“谓词下推”的形式,以限制物理查询计划中包含的分区数量。这种方法通常被称为“分区裁剪”。

并行连接

当用单线程执行内部连接时,一个简单的方法是将连接的一侧加载到内存中,然后扫描另一侧,对存储在内存中的数据进行查找。对于连接的一边可以装入内存的情况,这种经典的哈希连接算法是有效的。

它的并行版本被称为分区哈希连接或并行哈希连接。它涉及到根据连接键对两个输入进行分区,并在每一对输入分区上执行一个经典哈希连接。

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。