From 5b510928e89d1944806e973c4874b28c84b28cad Mon Sep 17 00:00:00 2001
From: ritchie
Date: Thu, 28 Nov 2024 15:26:46 +0100
Subject: [PATCH] add parquet source

---
 polars/query.py | 120 ++++++++++++++++++++++++++----------------------
 1 file changed, 65 insertions(+), 55 deletions(-)

diff --git a/polars/query.py b/polars/query.py
index 82a97aa01..347c6e662 100755
--- a/polars/query.py
+++ b/polars/query.py
@@ -5,23 +5,7 @@
 from datetime import date
 import json
 
-start = timeit.default_timer()
-df = pl.scan_parquet("hits.parquet").collect()
-stop = timeit.default_timer()
-load_time = stop - start
-
-
-# fix some types
-df = df.with_columns(
-    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
-    pl.col("EventDate").cast(pl.Date),
-)
-assert df["EventTime"][0].year == 2013
-df = df.rechunk()
-
-lf = df.lazy()
-
-# 0: No., 1: SQL, 3: Polars
+# 0: No., 1: SQL, 2: Polars
 queries = [
     ("Q0", "SELECT COUNT(*) FROM hits;", lambda x: x.select(pl.len()).collect().height),
     (
@@ -452,7 +436,6 @@
     (
         "Q42",
         "SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;",
-        lambda x: None,
         lambda x: x.filter(
             (pl.col("CounterID") == 62)
             & (pl.col("EventDate") >= date(2013, 7, 14))
@@ -460,47 +443,74 @@
             & (pl.col("IsRefresh") == 0)
             & (pl.col("DontCountHits") == 0)
         )
-        .group_by(pl.col("EventTime").dt.truncate("minute"))
+        .group_by(pl.col("EventTime").dt.truncate("1m"))
         .agg(pl.len().alias("PageViews"))
         .slice(1000, 10).collect(),
     ),
 ]
 
 
-queries_times = []
-for q in queries:
-    print(q[0])
-    times = []
-    for _ in range(3):
-        start = timeit.default_timer()
-        result = q[2](lf)
-        end = timeit.default_timer()
-        if result is None:
-            times.append(None)
-        else:
-            times.append(end - start)
-    queries_times.append(times)
-result_json = {
-    "system": "Polars (DataFrame)",
-    "date": date.today().strftime("%Y-%m-%d"),
-    "machine": "c6a.metal, 500gb gp2",
-    "cluster_size": 1,
-    "comment": "",
-    "tags": [
-        "column-oriented",
-        "dataframe",
-    ],
-    "load_time": float(load_time),
-    "data_size": int(lf.collect().estimated_size()),
-    "result": queries_times,
-}
+def run_timings(lf: pl.LazyFrame, name: str, src: str, load_time: int | None) -> None:
+    queries_times = []
+    for q in queries:
+        print(q[0])
+        times = []
+        for _ in range(3):
+            start = timeit.default_timer()
+            result = q[2](lf)
+            end = timeit.default_timer()
+            if result is None:
+                times.append(None)
+            else:
+                times.append(end - start)
+        queries_times.append(times)
+
+    result_json = {
+        "system": name,
+        "date": date.today().strftime("%Y-%m-%d"),
+        "machine": "c6a.metal, 500gb gp2",
+        "cluster_size": 1,
+        "comment": "",
+        "tags": [
+            "column-oriented",
+            src,
+        ],
+        "load_time": float(load_time) if load_time is not None else None,
+        "result": queries_times,
+    }
+    # if cpuinfo contains "AMD EPYC 9654" update machine and write result into results/epyc-9654.json
+    if "AMD EPYC 9654" in open("/proc/cpuinfo").read():
+        result_json["machine"] = "EPYC 9654, 384G"
+        with open(f"results/{src}_epyc-9654.json", "w") as f:
+            f.write(json.dumps(result_json, indent=4))
+    else:
+        # write result into results/c6a.metal.json
+        with open(f"results/{src}_c6a.metal.json", "w") as f:
+            f.write(json.dumps(result_json, indent=4))
 
-# if cpuinfo contains "AMD EPYC 9654" update machine and write result into results/epyc-9654.json
-if "AMD EPYC 9654" in open("/proc/cpuinfo").read():
-    result_json["machine"] = "EPYC 9654, 384G"
-    with open("results/epyc-9654.json", "w") as f:
-        f.write(json.dumps(result_json, indent=4))
-else:
-    # write result into results/c6a.metal.json
-    with open("results/c6a.metal.json", "w") as f:
-        f.write(json.dumps(result_json, indent=4))
+
+# Run from Parquet
+lf = pl.scan_parquet("hits.parquet").with_columns(
+    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
+    pl.col("EventDate").cast(pl.Date),
+)
+print("run parquet queries")
+run_timings(lf, "Polars (Parquet)", "parquet", None)
+
+
+print("run DataFrame (in-memory) queries, this loads all data in memory!")
+start = timeit.default_timer()
+df = pl.scan_parquet("hits.parquet").collect()
+stop = timeit.default_timer()
+load_time = stop - start
+
+# fix some types
+df = df.with_columns(
+    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
+    pl.col("EventDate").cast(pl.Date),
+)
+assert df["EventTime"][0].year == 2013
+df = df.rechunk()
+
+lf = df.lazy()
+run_timings(lf, "Polars (DataFrame)", "DataFrame", load_time)