分布式查询执行
上一节关于并行查询执行的内容涵盖了一些基本的概念,比如分区,我们将在这一节中继续深入讨论这些概念。
在某种程度上过度简化了分布式查询执行的概念,其目标是能够创建一个物理查询计划,定义如何将工作分配给集群中的一些“执行器”。分布式查询计划通常包含新的运算符,描述在查询执行过程中各执行器之间如何交换数据。
让我们使用上一章平行查询执行中的 SQL 查询例子,看看这个查询的分布式计划的含义。
SELECT passenger_count,
MAX(max_fare)
FROM tripdata
GROUP BY passenger_count
我们可以在所有的数据分区上并行地执行这个查询,集群中的每个执行器都处理这些分区的一个子集。然而,我们需要把所有产生的聚合数据合并到一个节点上,然后应用最终的聚合查询,这样我们就可以得到一个没有重复分组键(本例中的 passenger_count
)的单一结果集。下面是一个可能的逻辑查询计划,用于表示这一点。注意新的 Exchange
操作符,它表示执行器之间的数据交换。交换的物理计划可以通过将中间结果写入共享存储来实现,也可以通过将数据直接发送到其他执行器来实现。
HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
Exchange:
HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
Scan: tripdata.parquet
新的 Exchange
操作符与我们迄今为止讨论过的其他操作符有根本的不同,因为我们不能只是在一个节点上建立一个操作符树并开始执行它。现在的查询需要跨执行器的协调,这意味着我们现在需要建立一个调度器。
分布式查询调度
在上层,分布式查询调度器的概念并不复杂。调度器需要检查整个查询,并将其分解为可以单独执行的阶段(通常是在各执行器之间并行执行),然后根据集群中的可用资源来安排这些阶段的执行。一旦每个查询阶段完成,就可以安排任何后续的附属查询阶段。这个过程重复进行,直到所有的查询阶段都被执行。
调度器也可以负责管理集群中的计算资源,这样就可以根据需要启动额外的执行器来处理查询负载。
在本章的剩余部分,我们将参照 Ballista 和该项目中正在实施的设计,讨论以下主题。
- 管理集群中的计算资源
- 序列化查询计划并在执行器之间交换查询计划
- 在执行器之间交换中间结果
- 优化分布式查询
管理计算资源
合理的起点是决定如何在网络环境中部署和管理执行器。现在有许多资源调度和协调工具,包括 YARN、Mesos 和 Kubernetes。也有一些项目旨在为这些资源调度器提供一个抽象,如 Apache YuniKorn。
Ballista 使用 Kubernetes 是出于以下原因:
所有主要的云供应商都对 Kubernetes 有最好的支持。亚马逊有 Elastic Kubernetes Service(EKS),微软有 Azure Kubernetes Service(AKS),谷歌有谷歌 Kubernetes Engine(GKE)。
Kubernetes 背后有巨大的动力,虽然有一个陡峭的学习曲线和一些原始的边缘,但相信随着时间的推移,它将变得更加容易。 MicroK8s 和 Minikube 等项目使得在本地开发机器上创建一个 Kubernetes 集群变得容易。 Kubernetes 是一个开源框架,使用声明式配置来管理容器化的工作负载和服务。这意味着,为了在 Kubernetes 中实现某些功能,我们只需要在 YAML 文件中声明所需的状态,并将其提交给 Kubernetes API。然后,Kubernetes 将执行必要的行动以实现所需的状态。
Kubernetes 使用术语 “job” 来表示运行的容器,其目的是启动、执行一些处理,然后停止。术语 “pod” 用于打算保持运行直到被要求停止的容器。
下面是一个运行作业的 YAML 文件的例子。
apiVersion: batch/v1
kind: Job
metadata:
name: parallel-aggregate
spec:
template:
spec:
containers:
- name: parallel-aggregate
image: ballistacompute/parallel-aggregate-rs
这个 YAML 文件可以使用 kubectl 命令行工具提交至 Kubernetes 集群。
kubectl apply -f parallel-aggregate-rs.yaml
下面是一个 YAML 文件的例子,用于创建一个由 12 个 Ballista Rust 执行器组成的集群。这里使用了 Kubernetes 的“StatefulSet”概念。请求的是 12 个副本,每个实例将在这个范围内被分配一个唯一的副本编号。
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: ballista
spec:
serviceName: "ballista"
replicas: 12
selector:
matchLabels:
app: ballista
template:
metadata:
labels:
app: ballista
ballista-cluster: ballista
spec:
containers:
- name: ballista
image: ballistacompute/rust:0.2.0-alpha-2
Kubernetes 使服务发现变得简单。在 Kubernetes 集群中运行的每个容器可以检查环境变量,以找到在同一集群中运行的其他服务的主机和端口。也可以配置 Kubernetes 为服务创建 DNS 条目。
序列化查询计划
查询调度器需要将整个查询计划的片段发送给执行器来执行。正如我们在前几章看到的,逻辑和物理查询计划是分层的数据结构,定义了执行查询所需的步骤。通过下面这个逻辑查询计划的例子来回忆一下。
Projection: #id, #first_name, #last_name, #state, #salary
Filter: #state = 'CO'
Scan: employee; projection=None
序列化查询计划有很多选择,以便它可以在进程之间传递。许多查询引擎选择了使用编程语言本地序列化支持的策略,如果没有要求能够在不同的编程语言之间交换查询计划,这是一个合适的选择,这通常是最简单的实现机制。
然而,使用一种与编程语言无关的序列化格式也有好处。Ballista 使用 Google Protocol Buffers 格式来定义查询计划。该项目通常被缩写为 “protobuf”。
下面是 Ballista protobuf 关于查询计划的定义的一部分。
完整的源代码可以在 Ballista github 仓库的 proto/ballista.proto
找到。
message LogicalPlanNode {
LogicalPlanNode input = 1;
FileNode file = 10;
ProjectionNode projection = 20;
SelectionNode selection = 21;
LimitNode limit = 22;
AggregateNode aggregate = 23;
}
message FileNode {
string filename = 1;
Schema schema = 2;
repeated string projection = 3;
}
message ProjectionNode {
repeated LogicalExprNode expr = 1;
}
message SelectionNode {
LogicalExprNode expr = 2;
}
message AggregateNode {
repeated LogicalExprNode group_expr = 1;
repeated LogicalExprNode aggr_expr = 2;
}
message LimitNode {
uint32 limit = 1;
}
protobuf 项目提供了生成特定语言源代码的工具,用于序列化和反序列化数据。
序列化数据
数据也必须被序列化,因为它在客户端和执行器之间以及执行器之间流动。
Apache Arrow 提供了一种 IPC(进程间通信)格式,用于在进程间交换数据。由于 Arrow 提供的标准化内存布局,原始字节可以直接在内存和输入 / 输出设备(磁盘、网络等)之间传输,而没有序列化开销。这实际上是一个零拷贝操作,因为数据不需要从其内存格式转换为单独的序列化格式。
然而,关于数据的元数据,如表结构(列名和数据类型)确实需要使用 Google Flatbuffers 进行编码。这种元数据很小,通常每个结果集或每个批次都会被序列化一次,所以开销很小。
使用 Arrow 的另一个好处是,它提供了不同编程语言之间非常有效的数据交换。
Apache Arrow IPC 定义了数据编码格式,但没有定义交换数据的机制。例如,Arrow IPC 可以用来将数据从 JVM 语言通过 JNI 传输到 C 或 Rust。
选择一种协议
现在我们已经为查询计划和数据选择了序列化格式,下一个问题是我们如何在分布式进程之间交换这些数据。
Apache Arrow 提供的 Flight 协议正是为了这个目的。Flight 是一个新的通用的客户端-服务器框架,用于简化大型数据集在网络接口上的高性能传输。
Arrow 的 Flight 库提供了一个开发框架,用于实现可以发送和接收数据流的服务。一个 Flight 服务器支持几种基本的请求。
- Handshake:一个简单的请求,以确定客户端是否被授权,在某些情况下,可以建立一个由实现方定义的会话令牌,用于之后的请求。
- ListFlights:返回一个可用数据流的列表。
- GetSchema:返回一个数据流的表结构。
- GetFlightInfo:返回一个感兴趣的数据集的“访问计划”,可能需要消费多个数据流。这个请求可以接受包含自定义序列化命令的内容,例如你的特定应用参数。
- DoGet:向客户端发送数据流。
- DoPut:从客户端接收数据流。
- DoAction:执行特定实现的动作并返回任意结果,即一个通用的函数调用。
- ListActions:返回一个可用的动作类型的列表。
GetFlightInfo
方法可以用来编译查询计划,并返回接收结果的必要信息,例如,在每个执行器上调用 DoGet
然后开始接收查询的结果。
流式处理
尽快提供查询结果,并以数据流的方式发送到处理该数据的下一个进程是非常重要的,否则将导致不可接受的延迟,因为每个操作都必须等待前一个操作的完成。
然而,有些操作需要在产生任何输出之前收到所有的输入数据。一个排序操作就是一个很好的例子。在收到整个数据集之前,不可能完全对一个数据集进行排序。这个问题可以通过增加分区的数量来缓解,从而对大量分区进行并行排序,然后使用合并运算符有效地合并排序的批次。
自定义代码
经常需要运行自定义代码作为分布式查询或计算的一部分。对于单一语言的查询引擎,通常可以使用语言内置的序列化机制,在查询执行时通过网络传输这些代码,这在开发过程中非常方便。另一种方法是将编译好的代码发布到资源库中,这样就可以在运行时将其下载到集群中。对于基于 JVM 的系统,可以使用 maven 资源库。一个更通用的方法是将所有运行时的依赖项打包成一个 Docker 镜像。
查询计划需要提供必要的信息,以便在运行时加载用户代码。对于基于 JVM 的系统,这可能是一个 classpath 和一个类名。对于基于 C 语言的系统,这可能是一个共享对象的路径。在任何一种情况下,用户代码都需要实现一些已知的 API。
分布式查询优化
分布式查询执行与单个主机上的并行查询执行相比有很多开销,只有在有好处的情况下才应该使用。我建议阅读论文 Scalability! But at what COST 以了解关于这个话题的一些有趣的观点。
另外,有很多方法可以分发同一个查询,那么我们如何知道该使用哪一种呢?
一个答案是建立一种机制来确定执行一个特定查询计划的成本,然后为这个给定的问题创建所有可能的查询计划组合的一些子集,并确定哪一个是最有效的。
在计算一个操作的成本时,涉及到很多因素,而且有不同的资源成本和限制。
- 内存:我们通常关注的是内存的可用性而不是性能。在内存中处理数据要比读写磁盘快好几个数量级。
- CPU:对于可并行化的工作负载,更多的 CPU 核心意味着更好的吞吐量。
- GPU:与 CPU 相比,某些操作在 GPU 上的速度要快几个数量级。
- 磁盘:磁盘的读写速度是有限的,云计算供应商通常会限制每秒的 I/O 操作数(IOPS)。不同类型的磁盘有不同的性能特点(旋转磁盘与 SSD 与 NVMe)。
- 网络:分布式查询的执行涉及节点之间的数据流。网络基础设施有一个吞吐量的限制。
- **分布式存储。源数据存储在分布式文件系统(HDFS)或对象存储(Amazon S3、Azure Blob Storage)中是非常常见的,在分布式存储和本地文件系统之间传输数据是有成本的。
- 数据大小:数据的大小很重要。当在两个表之间进行连接,并且需要通过网络传输数据时,最好是传输两个表中较小的那个。如果其中一个表可以装在内存中,那么可以使用更有效的连接操作。
- 货币成本:如果一个查询用 3 倍的成本提升了 10% 的计算速度,这值得吗?当然,这个问题最好由用户来回答。货币成本通常是通过限制可用计算资源的数量来控制的。
如果提前知道足够的数据信息,比如数据有多大,查询中使用的连接键分区的基数,分区的数量等等,那么查询成本可以通过算法预先计算出来。这都取决于被查询的数据集的某些统计数据。
另一种方法是直接开始运行查询,让每个执行器根据它收到的输入数据进行调整。Apache Spark 3.0.0 引入了一个自适应查询执行功能,就是这样做的。
连接重排序
在生成查询计划时,查询优化器有时可以根据可用的统计数据来确定最佳的连接顺序。然而,一旦我们开始执行查询,我们往往可以得到更准确的统计数据,并可以利用这个机会进一步调整查询。例如,如果查询包含一个哈希连接,那么我们希望使用连接的较小一侧作为构建侧。
缓存中间结果
同一个查询每天被执行多次,参数不同,但执行计划的某些步骤保持不变,这是非常常见的。坚持执行查询的中间结果,以避免重复相同的操作,可能是有益的。如果将数据缓存在快速的本地存储上,如 NVMe 驱动器,也会有好处,从而消除了从分布式存储中重复获取数据的需要。Dremio 的 Data Lake Engine 和Snowflake 的 Cloud Data Platform 是利用这些技术的复杂查询引擎的好例子。
当然,这种复杂程度带来了新的挑战,比如在规划查询的运行位置时要考虑到数据的地域性。
物化视图
另一种优化技术,特别是对于不断变化的数据,是建立物化视图,可以随着新数据的到来不断更新,以避免从头开始建立视图的成本。Materialize 是这个概念的好例子。
从分布式查询失败中恢复
分布式查询的执行涉及到很多移动部件,在查询执行过程中很有可能会出现故障,所以有某种恢复机制是很重要的。
最简单的解决方案是等待查询的其余部分完成或失败,然后再次尝试整个查询,但这显然非常昂贵,而且让用户感到沮丧。
Kubernetes 的好处之一是它会自动重新启动失败的 pod,但仍然需要查询调度器来监控执行器,以知道何时执行查询的下一阶段。
本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。
Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。