选择类型系统

本章讨论的源代码可以在 KQuery 项目的 datatypes 模块中找到。

构建查询引擎的第一步是选择一个类型系统来表示查询引擎处理的不同类型的数据。一种选择是发明一个专门用于查询引擎的专有类型系统。另一种选择是使用查询引擎所面向的数据源的类型系统。

如果查询引擎要支持查询多个数据源(通常是这样),那么在每个支持的数据源和查询引擎的类型系统之间可能需要一些转换,使用一个能够代表所有支持的数据源的所有数据类型的类型系统将是很重要的。

基于行还是基于列?

一个重要的考虑因素是查询引擎是否会逐行处理数据,或者是否会以列的格式表示数据。

现在的许多查询引擎都是基于 火山查询规划器 的,其中物理计划中的每一步基本上都是对行的迭代器。这种模型实现起来比较简单,但往往会引入每行的开销,在对数十亿行进行查询时,开销会迅速增加。这种开销可以通过对成批的数据进行迭代来减少。此外,如果这些批次代表列数据而不是行,就有可能使用“向量化处理”并利用 SIMD(单指令多数据)的优势,用一条 CPU 指令处理一列中的多个值。通过利用 GPU 来并行处理更大数量的数据,这个概念还可以更进一步。

互操作性

另一个考虑是,我们希望能够用多种编程语言访问查询引擎。查询引擎的用户通常会使用 Python、R 或 Java 等语言。也许我们还想开发 ODBC 或 JDBC 驱动,以便于建立集成。

基于这些要求,最好能找到一个行业标准来表示列式数据,并在进程之间有效地交换这些数据。

我相信 Apache Arrow 就提供了一个理想的基础,这可能并不令人惊讶。

类型系统

我们将使用 Apache Arrow 作为类型系统的基础。以下的 Arrow 类被用来表示表结构、字段和数据类型。

  • Schema 为数据源或查询的结果提供元数据。一个 Schema 由一个或多个字段组成。
  • Field 为 Schema 中的一个字段提供名称和数据类型,并指定它是否允许空值。
  • FieldVector 为一个 Field 的数据提供列式存储。
  • ArrowType 表示一种数据类型。

KQuery 引入了一些额外的类和帮助类,作为 Apache Arrow 类型系统的一个抽象。

KQuery 提供常量,可以被支持的Arrow数据类型引用。

object ArrowTypes { val BooleanType = ArrowType.Bool() val Int8Type = ArrowType.Int(8, true) val Int16Type = ArrowType.Int(16, true) val Int32Type = ArrowType.Int(32, true) val Int64Type = ArrowType.Int(64, true) val UInt8Type = ArrowType.Int(8, false) val UInt16Type = ArrowType.Int(16, false) val UInt32Type = ArrowType.Int(32, false) val UInt64Type = ArrowType.Int(64, false) val FloatType = ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) val DoubleType = ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) val StringType = ArrowType.Utf8() }

KQuery 没有直接使用 FieldVector,而是引入了 ColumnVector 接口作为抽象,以提供更方便的访问方法,避免了为每个数据类型提供特定的 FieldVector 实现。

interface ColumnVector { fun getType(): ArrowType fun getValue(i: Int) : Any? fun size(): Int }

这种抽象也使得将标量值转换为向量成为可能,避免了创建和填充一个 FieldVector,为列中的每个索引重复一个字面量。

class LiteralValueVector( val arrowType: ArrowType, val value: Any?, val size: Int) : ColumnVector { override fun getType(): ArrowType { return arrowType } override fun getValue(i: Int): Any? { if (i<0 || i>=size) { throw IndexOutOfBoundsException() } return value } override fun size(): Int { return size } }

KQuery 还提供了一个 RecordBatch 类来表示一批列式数据。

class RecordBatch(val schema: Schema, val fields: List<ColumnVector>) { fun rowCount() = fields.first().size() fun columnCount() = fields.size /** Access one column by index */ fun field(i: Int): ColumnVector { return fields[i] } }

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。