From cc75eadc60abc7b1d550e440c5d30db2d35e71f1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 24 Nov 2024 10:50:53 +0100 Subject: [PATCH] Add Spark --- spark/benchmark.sh | 22 +++++++++++++ spark/queries.sql | 43 +++++++++++++++++++++++++ spark/query.py | 25 +++++++++++++++ spark/results/c6a.4xlarge.json | 58 ++++++++++++++++++++++++++++++++++ spark/run.sh | 8 +++++ 5 files changed, 156 insertions(+) create mode 100755 spark/benchmark.sh create mode 100644 spark/queries.sql create mode 100755 spark/query.py create mode 100644 spark/results/c6a.4xlarge.json create mode 100755 spark/run.sh diff --git a/spark/benchmark.sh b/spark/benchmark.sh new file mode 100755 index 000000000..5484486cd --- /dev/null +++ b/spark/benchmark.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Install + +sudo apt-get update +sudo apt-get install -y python3-pip openjdk-17-jdk + +export JAVA_HOME="/usr/lib/jvm/java-17-openjdk-amd64/" +export PATH=$JAVA_HOME/bin:$PATH + +pip install --break-system-packages pyspark psutil + +# Load the data + +wget --no-verbose --continue 'https://datasets.clickhouse.com/hits_compatible/hits.parquet' + +# Run the queries + +./run.sh 2>&1 | tee log.txt + +cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | + awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }' diff --git a/spark/queries.sql b/spark/queries.sql new file mode 100644 index 000000000..e3ca1b12c --- /dev/null +++ b/spark/queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0; +SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits; +SELECT AVG(UserID) FROM hits; +SELECT COUNT(DISTINCT UserID) FROM hits; +SELECT COUNT(DISTINCT SearchPhrase) FROM hits; +SELECT MIN(EventDate), MAX(EventDate) FROM hits; +SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC; +SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; +SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10; +SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10; +SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10; +SELECT UserID, extract(minute FROM TIMESTAMP(EventTime)) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, extract(minute FROM TIMESTAMP(EventTime)), SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID FROM hits WHERE UserID = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'; +SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10; +SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits; +SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10; +SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; +SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; +SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 1010; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END, URL ORDER BY PageViews DESC LIMIT 1010; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 110; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15887 AND EventDate <= 15917 AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10010; +SELECT DATE_TRUNC('minute', TIMESTAMP(EventTime)) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= 15900 AND EventDate <= 15901 AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', TIMESTAMP(EventTime)) ORDER BY DATE_TRUNC('minute', TIMESTAMP(EventTime)) LIMIT 1010; diff --git a/spark/query.py b/spark/query.py new file mode 100755 index 000000000..80c9b2387 --- /dev/null +++ b/spark/query.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +from pyspark.sql import SparkSession + +import timeit +import psutil +import sys + +query = sys.stdin.read() +print(query) + +spark = SparkSession.builder.appName("ClickBench").getOrCreate() +df = spark.read.parquet("hits.parquet") +df.createOrReplaceTempView("hits") + +for try_num in range(3): + try: + start = timeit.default_timer() + result = spark.sql(query) + result.show() + end = timeit.default_timer() + print("Time: ", end - start) + except Exception as e: + print(e); + print("Failure!") diff --git a/spark/results/c6a.4xlarge.json b/spark/results/c6a.4xlarge.json new file mode 100644 index 000000000..0fd082a71 --- /dev/null +++ b/spark/results/c6a.4xlarge.json @@ -0,0 +1,58 @@ +{ + "system": "Spark", + "date": "2024-11-24", + "machine": "c6a.4xlarge, 500gb gp2", + "cluster_size": 1, + "comment": "", + + "tags": ["Java", "column-oriented", "embedded", "stateless"], + + "load_time": 0, + "data_size": 14737666736, + + "result": [ +[3.451568882999709,1.5099017709999316,1.4457615579995036], +[3.6331949529994745,1.5100484389995472,1.4306618689997777], +[3.8691103789997214,1.4875263670001004,1.4309303390000423], +[3.898077081000338,1.7829432369999267,1.4163864700003614], +[8.497431836999567,5.491745006999736,5.221632884999963], +[9.437205449999965,5.365197217999594,5.351961569000196], +[4.4380845669993505,1.529258092999953,1.453303806999429], +[4.237463717999162,1.667103736000172,1.6277630399999907], +[10.763717450000513,7.253228950999983,6.920655083999918], +[null,null,null], +[6.172259834000215,2.829899385999852,2.691536891999931], +[6.373072564999347,3.1935174989994266,2.7825274629994965], +[8.945597008000732,5.497667528999955,5.295739161999336], +[null,null,null], +[9.564496040999984,5.930335183000352,5.790437358000418], +[9.215286374000243,5.703157988000385,5.49577771299937], +[null,null,null], +[10.59676237900021,7.282936314999461,7.215708122999786], +[null,null,null], +[3.8846778059996723,2.0206755600001998,2.169723668000188], +[11.782155242000044,4.769403714999498,4.605500732999644], +[13.540546769000684,6.075493086999813,5.741942548000225], +[null,null,null], +[null,null,null], +[5.327587396999661,2.7010835809996934,2.6526427419994434], +[4.6284881000001405,2.3514357849999215,2.246716489999926], +[5.139321022000331,2.6851238569997804,2.615352394999718], +[12.611963741000181,6.204201994999494,6.084104303999993], +[28.934751748999588,null,null], +[7.1145374830002766,4.032988468999974,3.9520326800002294], +[9.205675344000156,5.727411702999234,5.090095773999565], +[10.557396175999202,6.476196016999893,6.509062041999641], +[null,null,null], +[null,null,null], +[null,null,null], +[9.532622613000058,5.988163566000367,5.835901048999403], +[4.609914306000064,2.430121431999396,1.941113203999521], +[4.321152522000375,1.8502409359998637,1.5165435509998133], +[4.113692834000176,1.7422720820004542,1.6487724720000188], +[5.191798449000089,3.2760622159994455,2.884947004999958], +[4.11264138200022,1.8089530379993448,1.5781850240000495], +[4.053875679999692,1.728480698000567,1.6408681860002616], +[4.013041234000411,1.6797796270002436,1.5398138239997934] +] +} diff --git a/spark/run.sh b/spark/run.sh new file mode 100755 index 000000000..64df8c608 --- /dev/null +++ b/spark/run.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +cat queries.sql | while read query; do + sync + echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null + + ./query.py <<< "${query}" +done