查询执行

我们现在能够编写代码来执行针对 CSV 文件的优化过的查询。

在我们用 KQuery 执行查询之前,使用一个可信赖的替代方案是有用的,这样我们就能知道正确的结果应该是什么,并获得一些基线性能指标进行比较。

Apache Spark 示例

本章讨论的源代码可以在 KQuery 项目的 spark 模块中找到。

首先,我们需要创建一个 Spark 上下文。注意,我们使用单线程执行,这样我们就可以与 KQuery 中的单线程实现的性能做一个相对公平的比较。

val spark = SparkSession.builder() .master("local[1]") .getOrCreate()

接下来,我们需要将 CSV 文件注册到上下文的 DataFrame 中。

val schema = StructType(Seq( StructField("VendorID", DataTypes.IntegerType), StructField("tpep_pickup_datetime", DataTypes.TimestampType), StructField("tpep_dropoff_datetime", DataTypes.TimestampType), StructField("passenger_count", DataTypes.IntegerType), StructField("trip_distance", DataTypes.DoubleType), StructField("RatecodeID", DataTypes.IntegerType), StructField("store_and_fwd_flag", DataTypes.StringType), StructField("PULocationID", DataTypes.IntegerType), StructField("DOLocationID", DataTypes.IntegerType), StructField("payment_type", DataTypes.IntegerType), StructField("fare_amount", DataTypes.DoubleType), StructField("extra", DataTypes.DoubleType), StructField("mta_tax", DataTypes.DoubleType), StructField("tip_amount", DataTypes.DoubleType), StructField("tolls_amount", DataTypes.DoubleType), StructField("improvement_surcharge", DataTypes.DoubleType), StructField("total_amount", DataTypes.DoubleType) )) val tripdata = spark.read.format("csv") .option("header", "true") .schema(schema) .load("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv") tripdata.createOrReplaceTempView("tripdata")

最后,我们可以继续对 DataFrame 执行 SQL。

val start = System.currentTimeMillis() val df = spark.sql( """SELECT passenger_count, MAX(fare_amount) |FROM tripdata |GROUP BY passenger_count""".stripMargin) df.foreach(row => println(row)) val duration = System.currentTimeMillis() - start println(s"Query took $duration ms")

在我的台式机上面执行这段代码会产生以下输出。

[1,623259.86] [6,262.5] [3,350.0] [5,760.0] [9,92.0] [4,500.0] [8,87.0] [7,78.0] [2,492.5] [0,36090.3] Query took 14418 ms

KQuery 示例

本章讨论的源代码可以在 KQuery 项目的 examples 模块中找到。

下面是用 KQuery 实现的等价查询。请注意,这段代码与 Spark 的示例不同,因为 KQuery 还没有指定 CSV 文件表结构的选项,所以所有的数据类型都是字符串,这意味着我们需要在查询计划中添加一个显式的转换,将 fare_amount 列转换成数值类型。

val time = measureTimeMillis { val ctx = ExecutionContext() val df = ctx.csv("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv", 1*1024) .aggregate( listOf(col("passenger_count")), listOf(max(cast(col("fare_amount"), ArrowTypes.FloatType)))) val optimizedPlan = Optimizer().optimize(df.logicalPlan()) val results = ctx.execute(optimizedPlan) results.forEach { println(it.toCSV()) } println("Query took $time ms")

在我的台式机上面会产生以下输出。

Schema<passenger_count: Utf8, MAX: FloatingPoint(DOUBLE)> 1,623259.86 2,492.5 3,350.0 4,500.0 5,760.0 6,262.5 7,78.0 8,87.0 9,92.0 0,36090.3 Query took 6740 ms

可以看到,结果与 Apache Spark 产生的结果一致。还可以看到,对于这种规模的输入,其性能还不错。由于 Apache Spark 针对“大数据”进行了优化,因此它很可能在更大的数据集上胜过 KQuery。

去掉查询优化器

现在删除这些查询优化,看看它们对性能有多大帮助。

val time = measureTimeMillis { val ctx = ExecutionContext() val df = ctx.csv("/mnt/nyctaxi/csv/yellow_tripdata_2019-01.csv", 1*1024) .aggregate( listOf(col("passenger_count")), listOf(max(cast(col("fare_amount"), ArrowTypes.FloatType)))) val results = ctx.execute(df.logicalPlan()) results.forEach { println(it.toCSV()) } println("Query took $time ms")

在我的台式机上面会产生以下输出。

1,623259.86 2,492.5 3,350.0 4,500.0 5,760.0 6,262.5 7,78.0 8,87.0 9,92.0 0,36090.3 Query took 36090 ms

结果是一样的,但是查询的执行时间大约是五倍。这清楚地表明了上一章中讨论的投影下推优化的好处。

本书的电子版、MOBI和PDF格式也可从 https://leanpub.com/how-query-engines-work 购买。

Copyright © 2020-2022 Grove Enterprises, LLC。保留所有权利。