分布式查询执行

上一节关于并行查询执行的内容涵盖了一些基本的概念,比如分区,我们将在这一节中继续深入讨论这些概念。

在某种程度上过度简化了分布式查询执行的概念,其目标是能够创建一个物理查询计划,定义如何将工作分配给集群中的一些“执行器”。分布式查询计划通常包含新的运算符,描述在查询执行过程中各执行器之间如何交换数据。

让我们使用上一章平行查询执行中的 SQL 查询例子,看看这个查询的分布式计划的含义。

SELECT passenger_count, MAX(max_fare) FROM tripdata GROUP BY passenger_count

我们可以在所有的数据分区上并行地执行这个查询,集群中的每个执行器都处理这些分区的一个子集。然而,我们需要把所有产生的聚合数据合并到一个节点上,然后应用最终的聚合查询,这样我们就可以得到一个没有重复分组键(本例中的 passenger_count)的单一结果集。下面是一个可能的逻辑查询计划,用于表示这一点。注意新的 Exchange 操作符,它表示执行器之间的数据交换。交换的物理计划可以通过将中间结果写入共享存储来实现,也可以通过将数据直接发送到其他执行器来实现。

HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)] Exchange: HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)] Scan: tripdata.parquet

新的 Exchange 操作符与我们迄今为止讨论过的其他操作符有根本的不同,因为我们不能只是在一个节点上建立一个操作符树并开始执行它。现在的查询需要跨执行器的协调,这意味着我们现在需要建立一个调度器。

分布式查询调度

在上层,分布式查询调度器的概念并不复杂。调度器需要检查整个查询,并将其分解为可以单独执行的阶段(通常是在各执行器之间并行执行),然后根据集群中的可用资源来安排这些阶段的执行。一旦每个查询阶段完成,就可以安排任何后续的附属查询阶段。这个过程重复进行,直到所有的查询阶段都被执行。

调度器也可以负责管理集群中的计算资源,这样就可以根据需要启动额外的执行器来处理查询负载。

在本章的剩余部分,我们将参照 Ballista 和该项目中正在实施的设计,讨论以下主题。

  • 管理集群中的计算资源
  • 序列化查询计划并在执行器之间交换查询计划
  • 在执行器之间交换中间结果
  • 优化分布式查询

管理计算资源

合理的起点是决定如何在网络环境中部署和管理执行器。现在有许多资源调度和协调工具,包括 YARN、Mesos 和 Kubernetes。也有一些项目旨在为这些资源调度器提供一个抽象,如 Apache YuniKorn

Ballista 使用 Kubernetes 是出于以下原因:

所有主要的云供应商都对 Kubernetes 有最好的支持。亚马逊有 Elastic Kubernetes Service(EKS),微软有 Azure Kubernetes Service(AKS),谷歌有谷歌 Kubernetes Engine(GKE)。

Kubernetes 背后有巨大的动力,虽然有一个陡峭的学习曲线和一些原始的边缘,但相信随着时间的推移,它将变得更加容易。 MicroK8s 和 Minikube 等项目使得在本地开发机器上创建一个 Kubernetes 集群变得容易。 Kubernetes 是一个开源框架,使用声明式配置来管理容器化的工作负载和服务。这意味着,为了在 Kubernetes 中实现某些功能,我们只需要在 YAML 文件中声明所需的状态,并将其提交给 Kubernetes API。然后,Kubernetes 将执行必要的行动以实现所需的状态。

Kubernetes 使用术语 “job” 来表示运行的容器,其目的是启动、执行一些处理,然后停止。术语 “pod” 用于打算保持运行直到被要求停止的容器。

下面是一个运行作业的 YAML 文件的例子。

apiVersion: batch/v1 kind: Job metadata: name: parallel-aggregate spec: template: spec: containers: - name: parallel-aggregate image: ballistacompute/parallel-aggregate-rs

这个 YAML 文件可以使用 kubectl 命令行工具提交至 Kubernetes 集群。

kubectl apply -f parallel-aggregate-rs.yaml

下面是一个 YAML 文件的例子,用于创建一个由 12 个 Ballista Rust 执行器组成的集群。这里使用了 Kubernetes 的“StatefulSet”概念。请求的是 12 个副本,每个实例将在这个范围内被分配一个唯一的副本编号。

apiVersion: apps/v1 kind: StatefulSet metadata: name: ballista spec: serviceName: "ballista" replicas: 12 selector: matchLabels: app: ballista template: metadata: labels: app: ballista ballista-cluster: ballista spec: containers: - name: ballista image: ballistacompute/rust:0.2.0-alpha-2

Kubernetes 使服务发现变得简单。在 Kubernetes 集群中运行的每个容器可以检查环境变量,以找到在同一集群中运行的其他服务的主机和端口。也可以配置 Kubernetes 为服务创建 DNS 条目。

序列化查询计划

查询调度器需要将整个查询计划的片段发送给执行器来执行。正如我们在前几章看到的,逻辑和物理查询计划是分层的数据结构,定义了执行查询所需的步骤。通过下面这个逻辑查询计划的例子来回忆一下。

Projection: #id, #first_name, #last_name, #state, #salary Filter: #state = 'CO' Scan: employee; projection=None

序列化查询计划有很多选择,以便它可以在进程之间传递。许多查询引擎选择了使用编程语言本地序列化支持的策略,如果没有要求能够在不同的编程语言之间交换查询计划,这是一个合适的选择,这通常是最简单的实现机制。

然而,使用一种与编程语言无关的序列化格式也有好处。Ballista 使用 Google Protocol Buffers 格式来定义查询计划。该项目通常被缩写为 “protobuf”。

下面是 Ballista protobuf 关于查询计划的定义的一部分。

完整的源代码可以在 Ballista github 仓库的 proto/ballista.proto 找到。

message LogicalPlanNode { LogicalPlanNode input = 1; FileNode file = 10; ProjectionNode projection = 20; SelectionNode selection = 21; LimitNode limit = 22; AggregateNode aggregate = 23; } message FileNode { string filename = 1; Schema schema = 2; repeated string projection = 3; } message ProjectionNode { repeated LogicalExprNode expr = 1; } message SelectionNode { LogicalExprNode expr = 2; } message AggregateNode { repeated LogicalExprNode group_expr = 1; repeated LogicalExprNode aggr_expr = 2; } message LimitNode { uint32 limit = 1; }

protobuf 项目提供了生成特定语言源代码的工具,用于序列化和反序列化数据。

序列化数据

数据也必须被序列化,因为它在客户端和执行器之间以及执行器之间流动。

Apache Arrow 提供了一种 IPC(进程间通信)格式,用于在进程间交换数据。由于 Arrow 提供的标准化内存布局,原始字节可以直接在内存和输入 / 输出设备(磁盘、网络等)之间传输,而没有序列化开销。这实际上是一个零拷贝操作,因为数据不需要从其内存格式转换为单独的序列化格式。

然而,关于数据的元数据,如表结构(列名和数据类型)确实需要使用 Google Flatbuffers 进行编码。这种元数据很小,通常每个结果集或每个批次都会被序列化一次,所以开销很小。

使用 Arrow 的另一个好处是,它提供了不同编程语言之间非常有效的数据交换。

Apache Arrow IPC 定义了数据编码格式,但没有定义交换数据的机制。例如,Arrow IPC 可以用来将数据从 JVM 语言通过 JNI 传输到 C 或 Rust。

选择一种协议

现在我们已经为查询计划和数据选择了序列化格式,下一个问题是我们如何在分布式进程之间交换这些数据。

Apache Arrow 提供的 Flight 协议正是为了这个目的。Flight 是一个新的通用的客户端-服务器框架,用于简化大型数据集在网络接口上的高性能传输。

Arrow 的 Flight 库提供了一个开发框架,用于实现可以发送和接收数据流的服务。一个 Flight 服务器支持几种基本的请求。

  • Handshake:一个简单的请求,以确定客户端是否被授权,在某些情况下,可以建立一个由实现方定义的会话令牌,用于之后的请求。
  • ListFlights:返回一个可用数据流的列表。
  • GetSchema:返回一个数据流的表结构。
  • GetFlightInfo:返回一个感兴趣的数据集的“访问计划”,可能需要消费多个数据流。这个请求可以接受包含自定义序列化命令的内容,例如你的特定应用参数。
  • DoGet:向客户端发送数据流。
  • DoPut:从客户端接收数据流。
  • DoAction:执行特定实现的动作并返回任意结果,即一个通用的函数调用。
  • ListActions:返回一个可用的动作类型的列表。

GetFlightInfo 方法可以用来编译查询计划,并返回接收结果的必要信息,例如,在每个执行器上调用 DoGet 然后开始接收查询的结果。

流式处理

尽快提供查询结果,并以数据流的方式发送到处理该数据的下一个进程是非常重要的,否则将导致不可接受的延迟,因为每个操作都必须等待前一个操作的完成。

然而,有些操作需要在产生任何输出之前收到所有的输入数据。一个排序操作就是一个很好的例子。在收到整个数据集之前,不可能完全对一个数据集进行排序。这个问题可以通过增加分区的数量来缓解,从而对大量分区进行并行排序,然后使用合并运算符有效地合并排序的批次。

自定义代码

经常需要运行自定义代码作为分布式查询或计算的一部分。对于单一语言的查询引擎,通常可以使用语言内置的序列化机制,在查询执行时通过网络传输这些代码,这在开发过程中非常方便。另一种方法是将编译好的代码发布到资源库中,这样就可以在运行时将其下载到集群中。对于基于 JVM 的系统,可以使用 maven 资源库。一个更通用的方法是将所有运行时的依赖项打包成一个 Docker 镜像。

查询计划需要提供必要的信息,以便在运行时加载用户代码。对于基于 JVM 的系统,这可能是一个 classpath 和一个类名。对于基于 C 语言的系统,这可能是一个共享对象的路径。在任何一种情况下,用户代码都需要实现一些已知的 API。

分布式查询优化

分布式查询执行与单个主机上的并行查询执行相比有很多开销,只有在有好处的情况下才应该使用。我建议阅读论文 Scalability! But at what COST 以了解关于这个话题的一些有趣的观点。

另外,有很多方法可以分发同一个查询,那么我们如何知道该使用哪一种呢?

一个答案是建立一种机制来确定执行一个特定查询计划的成本,然后为这个给定的问题创建所有可能的查询计划组合的一些子集,并确定哪一个是最有效的。

在计算一个操作的成本时,涉及到很多因素,而且有不同的资源成本和限制。

  • 内存:我们通常关注的是内存的可用性而不是性能。在内存中处理数据要比读写磁盘快好几个数量级。
  • CPU:对于可并行化的工作负载,更多的 CPU 核心意味着更好的吞吐量。
  • GPU:与 CPU 相比,某些操作在 GPU 上的速度要快几个数量级。
  • 磁盘:磁盘的读写速度是有限的,云计算供应商通常会限制每秒的 I/O 操作数(IOPS)。不同类型的磁盘有不同的性能特点(旋转磁盘与 SSD 与 NVMe)。
  • 网络:分布式查询的执行涉及节点之间的数据流。网络基础设施有一个吞吐量的限制。
  • **分布式存储。源数据存储在分布式文件系统(HDFS)或对象存储(Amazon S3、Azure Blob Storage)中是非常常见的,在分布式存储和本地文件系统之间传输数据是有成本的。
  • 数据大小:数据的大小很重要。当在两个表之间进行连接,并且需要通过网络传输数据时,最好是传输两个表中较小的那个。如果其中一个表可以装在内存中,那么可以使用更有效的连接操作。
  • 货币成本:如果一个查询用 3 倍的成本提升了 10% 的计算速度,这值得吗?当然,这个问题最好由用户来回答。货币成本通常是通过限制可用计算资源的数量来控制的。

如果提前知道足够的数据信息,比如数据有多大,查询中使用的连接键分区的基数,分区的数量等等,那么查询成本可以通过算法预先计算出来。这都取决于被查询的数据集的某些统计数据。

另一种方法是直接开始运行查询,让每个执行器根据它收到的输入数据进行调整。Apache Spark 3.0.0 引入了一个自适应查询执行功能,就是这样做的。

连接重排序

在生成查询计划时,查询优化器有时可以根据可用的统计数据来确定最佳的连接顺序。然而,一旦我们开始执行查询,我们往往可以得到更准确的统计数据,并可以利用这个机会进一步调整查询。例如,如果查询包含一个哈希连接,那么我们希望使用连接的较小一侧作为构建侧。

缓存中间结果

同一个查询每天被执行多次,参数不同,但执行计划的某些步骤保持不变,这是非常常见的。坚持执行查询的中间结果,以避免重复相同的操作,可能是有益的。如果将数据缓存在快速的本地存储上,如 NVMe 驱动器,也会有好处,从而消除了从分布式存储中重复获取数据的需要。Dremio 的 Data Lake Engine 和Snowflake 的 Cloud Data Platform 是利用这些技术的复杂查询引擎的好例子。

当然,这种复杂程度带来了新的挑战,比如在规划查询的运行位置时要考虑到数据的地域性。

物化视图

另一种优化技术,特别是对于不断变化的数据,是建立物化视图,可以随着新数据的到来不断更新,以避免从头开始建立视图的成本。Materialize 是这个概念的好例子。

从分布式查询失败中恢复

分布式查询的执行涉及到很多移动部件,在查询执行过程中很有可能会出现故障,所以有某种恢复机制是很重要的。

最简单的解决方案是等待查询的其余部分完成或失败,然后再次尝试整个查询,但这显然非常昂贵,而且让用户感到沮丧。

Kubernetes 的好处之一是它会自动重新启动失败的 pod,但仍然需要查询调度器来监控执行器,以知道何时执行查询的下一阶段。

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。