-
Notifications
You must be signed in to change notification settings - Fork 446
/
tpch_parquet.scala
90 lines (83 loc) · 3.42 KB
/
tpch_parquet.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
//Configurations:
var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
var gluten_root = "/PATH/TO/GLUTEN"
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1000000000.0 + " seconds")
result
}
//Read TPC-H Table from DWRF files
val lineitem = spark.read.format("parquet").load("file://" + parquet_file_path + "/lineitem")
val part = spark.read.format("parquet").load("file://" + parquet_file_path + "/part")
val orders = spark.read.format("parquet").load("file://" + parquet_file_path + "/orders")
val customer = spark.read.format("parquet").load("file://" + parquet_file_path + "/customer")
val supplier = spark.read.format("parquet").load("file://" + parquet_file_path + "/supplier")
val partsupp = spark.read.format("parquet").load("file://" + parquet_file_path + "/partsupp")
val region = spark.read.format("parquet").load("file://" + parquet_file_path + "/region")
val nation = spark.read.format("parquet").load("file://" + parquet_file_path + "/nation")
//Create DWRF based TPC-H Table View
lineitem.createOrReplaceTempView("lineitem")
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")
part.createOrReplaceTempView("part")
supplier.createOrReplaceTempView("supplier")
partsupp.createOrReplaceTempView("partsupp")
nation.createOrReplaceTempView("nation")
region.createOrReplaceTempView("region")
def getListOfFiles(dir: String):List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
//You can run a specific query by using below line
//d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
d.listFiles.filter(_.isFile).toList
} else {
List[File]()
}
}
val fileLists = getListOfFiles(gluten_root + "/gluten-core/src/test/resources/tpch-queries/")
val sorted = fileLists.sortBy {
f => f.getName match {
case name =>
var str = name
str = str.replaceFirst("a", ".1")
str = str.replaceFirst("b", ".2")
str = str.replaceFirst(".sql", "")
str = str.replaceFirst("q", "")
str.toDouble
}}
// Main program to run TPC-H testing
for (t <- sorted) {
println(t)
val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
println(fileContents)
try {
time{spark.sql(fileContents).show}
//spark.sql(fileContents).explain
Thread.sleep(2000)
} catch {
case e: Exception => None
}
}