add parquet source
ritchie46 committed Nov 28, 2024
1 parent c461342 commit 5b51092
Showing 1 changed file with 65 additions and 55 deletions.
polars/query.py: 65 additions & 55 deletions
@@ -5,23 +5,7 @@
 from datetime import date
 import json
 
-start = timeit.default_timer()
-df = pl.scan_parquet("hits.parquet").collect()
-stop = timeit.default_timer()
-load_time = stop - start
-
-
-# fix some types
-df = df.with_columns(
-    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
-    pl.col("EventDate").cast(pl.Date),
-)
-assert df["EventTime"][0].year == 2013
-df = df.rechunk()
-
-lf = df.lazy()
-
-# 0: No., 1: SQL, 3: Polars
+# 0: No., 1: SQL, 2: Polars
 queries = [
     ("Q0", "SELECT COUNT(*) FROM hits;", lambda x: x.select(pl.len()).collect().height),
     (
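For context: EventTime in hits.parquet holds integer Unix seconds, which is why the script multiplies by 1e6 and casts to a microsecond Datetime (the same fix appears in both the block removed above and the new code at the bottom of the diff). A minimal sketch of that conversion, with a made-up timestamp:

import polars as pl

# Hypothetical value: 1373241600 s is 2013-07-08 00:00:00 UTC.
df = pl.DataFrame({"EventTime": [1373241600]})
df = df.with_columns(
    # seconds -> microseconds, then reinterpret the integers as a Datetime
    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us"))
)
assert df["EventTime"][0].year == 2013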
@@ -452,55 +436,81 @@
     (
         "Q42",
         "SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;",
-        lambda x: None,
         lambda x: x.filter(
             (pl.col("CounterID") == 62)
             & (pl.col("EventDate") >= date(2013, 7, 14))
             & (pl.col("EventDate") <= date(2013, 7, 15))
             & (pl.col("IsRefresh") == 0)
             & (pl.col("DontCountHits") == 0)
         )
-        .group_by(pl.col("EventTime").dt.truncate("minute"))
+        .group_by(pl.col("EventTime").dt.truncate("1m"))
         .agg(pl.len().alias("PageViews"))
         .slice(1000, 10).collect(),
     ),
 ]
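A note on the Q42 change: Polars' dt.truncate expects an interval string such as "1m" rather than a SQL unit name, hence the "minute" -> "1m" fix, and LIMIT 10 OFFSET 1000 becomes slice(1000, 10). The lambda does not reproduce the SQL ORDER BY, and group_by output order is not guaranteed, so the sliced window is not deterministic. A minimal sketch of the same pattern on a toy frame (timestamps invented, offset shrunk to fit the toy data):

import polars as pl
from datetime import datetime

# Toy stand-in for hits; timestamps are made up.
lf = pl.LazyFrame({
    "EventTime": [
        datetime(2013, 7, 14, 12, 0, 30),
        datetime(2013, 7, 14, 12, 0, 45),
        datetime(2013, 7, 14, 12, 1, 5),
    ],
})
out = (
    lf.group_by(pl.col("EventTime").dt.truncate("1m"))  # DATE_TRUNC('minute', EventTime)
    .agg(pl.len().alias("PageViews"))                   # COUNT(*) AS PageViews
    .sort("EventTime")                                  # ORDER BY (absent from the lambda above)
    .slice(0, 10)                                       # LIMIT 10 OFFSET 0
    .collect()
)
print(out)  # 12:00 -> 2 page views, 12:01 -> 1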

-queries_times = []
-for q in queries:
-    print(q[0])
-    times = []
-    for _ in range(3):
-        start = timeit.default_timer()
-        result = q[2](lf)
-        end = timeit.default_timer()
-        if result is None:
-            times.append(None)
-        else:
-            times.append(end - start)
-    queries_times.append(times)
-
-result_json = {
-    "system": "Polars (DataFrame)",
-    "date": date.today().strftime("%Y-%m-%d"),
-    "machine": "c6a.metal, 500gb gp2",
-    "cluster_size": 1,
-    "comment": "",
-    "tags": [
-        "column-oriented",
-        "dataframe",
-    ],
-    "load_time": float(load_time),
-    "data_size": int(lf.collect().estimated_size()),
-    "result": queries_times,
-}
+def run_timings(lf: pl.LazyFrame, name: str, src: str, load_time: float | None) -> None:
+    queries_times = []
+    for q in queries:
+        print(q[0])
+        times = []
+        for _ in range(3):
+            start = timeit.default_timer()
+            result = q[2](lf)
+            end = timeit.default_timer()
+            if result is None:
+                times.append(None)
+            else:
+                times.append(end - start)
+        queries_times.append(times)
+
+    result_json = {
+        "system": name,
+        "date": date.today().strftime("%Y-%m-%d"),
+        "machine": "c6a.metal, 500gb gp2",
+        "cluster_size": 1,
+        "comment": "",
+        "tags": [
+            "column-oriented",
+            src,
+        ],
+        "load_time": float(load_time) if load_time is not None else None,
+        "result": queries_times,
+    }
+    # if cpuinfo contains "AMD EPYC 9654", update machine and write the result into results/{src}_epyc-9654.json
+    if "AMD EPYC 9654" in open("/proc/cpuinfo").read():
+        result_json["machine"] = "EPYC 9654, 384G"
+        with open(f"results/{src}_epyc-9654.json", "w") as f:
+            f.write(json.dumps(result_json, indent=4))
+    else:
+        # write the result into results/{src}_c6a.metal.json
+        with open(f"results/{src}_c6a.metal.json", "w") as f:
+            f.write(json.dumps(result_json, indent=4))
+
-# if cpuinfo contains "AMD EPYC 9654" update machine and write result into results/epyc-9654.json
-if "AMD EPYC 9654" in open("/proc/cpuinfo").read():
-    result_json["machine"] = "EPYC 9654, 384G"
-    with open("results/epyc-9654.json", "w") as f:
-        f.write(json.dumps(result_json, indent=4))
-else:
-    # write result into results/c6a.metal.json
-    with open("results/c6a.metal.json", "w") as f:
-        f.write(json.dumps(result_json, indent=4))
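One caveat on the result writing above: the script reads /proc/cpuinfo unconditionally, which exists only on Linux. A slightly defensive variant (illustrative only, not part of the commit) would guard the read:

import os

def detect_machine(default: str = "c6a.metal, 500gb gp2") -> str:
    # /proc/cpuinfo is Linux-only; keep the default label elsewhere.
    if os.path.exists("/proc/cpuinfo"):
        with open("/proc/cpuinfo") as f:
            if "AMD EPYC 9654" in f.read():
                return "EPYC 9654, 384G"
    return default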

+# Run from Parquet
+lf = pl.scan_parquet("hits.parquet").with_columns(
+    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
+    pl.col("EventDate").cast(pl.Date),
+)
+print("run parquet queries")
+run_timings(lf, "Polars (Parquet)", "parquet", None)
+
+
+print("run DataFrame (in-memory) queries, this loads all data in memory!")
+start = timeit.default_timer()
+df = pl.scan_parquet("hits.parquet").collect()
+stop = timeit.default_timer()
+load_time = stop - start
+
+# fix some types
+df = df.with_columns(
+    (pl.col("EventTime") * int(1e6)).cast(pl.Datetime(time_unit="us")),
+    pl.col("EventDate").cast(pl.Date),
+)
+assert df["EventTime"][0].year == 2013
+df = df.rechunk()
+
+lf = df.lazy()
+run_timings(lf, "Polars (DataFrame)", "DataFrame", load_time)
