-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update Default Parquet Write Compression #7692
Conversation
I'm not sure about this, block compression is far from free, and many workloads particularly those using object storage would be willing to trade faster read performance for slightly larger objects. |
@tustvold Going from uncompressed to even zstd(1) is likely to bring a 50% reduction in file size (up to 80% is not uncommon depending on the data). I have some of my own DataFusion specific benchmarks, but Uber Engineering has a nice public analysis on this topic here. I added some benchmarks showing with/without compression here. Zstd(1)-Zstd(3) write speeds in the single threaded Another argument in favor of this change is that most other popular frameworks with parquet write support default to compression of either snappy (compatibility) or zstd (best performance), so users of DataFusion imo will not expect the default to be uncompressed. The Datafusion default is not all that important for systems/database developers, since those users almost certainly will tune the settings to their use case anyway. This is more important for efforts to gain adoption of direct users using Datafusion for analysis/ETL workloads, such as via the datafusion-cli or python bindings. |
Whats the performance hit like for read? A write speed reduction is not a huge deal to me, it is read performance that matters as this has will have a direct impact on query latency. FWIW LZ4_RAW is probably the "best" codec, but ecosystem support is limited... |
I ran a quick test using Query1, reading the uncompressed parquet file vs. zstd compressed. This is using local SSD based storage.
The performance is nearly identical. I averaged each over 50 runs for the above numbers and they are converging towards <1% performance difference. Variance run-to-run is ~5% so on a single run either one can be faster. Script: import time
from datafusion import SessionContext
t = time.time()
#uncompressed file, ~3.6Gb on disk
#file = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet"
#zstd compressed file, ~1.6Gb on disk
file = "/home/dev/arrow-datafusion/test_out/benchon.parquet"
# Create a DataFusion context
ctx = SessionContext()
# Register table with context
ctx.register_parquet('test', file)
times = []
for i in range(50):
t = time.time()
query = """
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
test
where
l_shipdate <= date '1998-09-02'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus
"""
# Execute SQL
df = ctx.sql(f"{query}")
df.show()
elapsed = time.time() - t
times.append(elapsed)
print(f"datafusion agg query {elapsed}s")
print(sum(times)/len(times)) |
Can you run a test that is actually bottlenecked on parquet, e.g. a predicated scan, as opposed to something with sorts and groups by in it that will dominate pretty much anything else other than joins. |
I was also a bit suspicious of how identical the performance was. I caught a mistake in my set up, both numbers above were for ZSTD not uncompressed. I corrected the mistake and ran two more tests below. Indeed, using local storage uncompressed reads are a good bit faster. I would be interested to compare this to remote object storage where bandwidth may be more of a bottleneck.. Removing the group by / order by:
New Query: select
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
test
where
l_shipdate <= date '1998-09-02' I also timed caching the entire parquet file into memory (select * from test), then df.cache(). I only did average over 5 runs this time.
|
It is more typically first-byte latency, not bandwidth that hurts with OS, so the size of the pages can end up being less relevant than you might expect... I dunno, I suspect there is no "correct" answer to this, which leads me to be tempted to just leave it as is, but don't feel strongly, so if other people do... |
I agree that there is no universally optimal choice and am also interested in more opinions. For the bandwidth concern, I'm thinking more about a user who installs the python bindings to their macbook pro and reads from S3. The bottleneck there may be their likely 1gbps network connection. Even in a server environment if you are running on a single node, bandwidth could be limiting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think defaulting to basic and fast block level compression is much less surprising than NO compression for most users and therefore I think this is a good change.
While there may be corner cases where no compression is desired as @tustvold mentions, I think they are somewhat specialized
Maybe some other contributors have opinions on the matter (@Dandandan perhaps?)
I took the liberty of merging up from main to get #7701 and solve the failing CI check |
I agree. Parquet is designed for block compression. And people want to optimize for long term storage cost as well. Zstd is probably one of the better defaults for use cases @devinjdangelo describes (data pipelines). |
Thank you @devinjdangelo |
* update compression default * fix tests --------- Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?
Closes #7691
Rationale for this change
See issue for discussion
What changes are included in this PR?
Set default parquet writer to use zstd level 3 compression.
Are these changes tested?
By existing tests
Are there any user-facing changes?
Much smaller parquet files for a minor write performance penalty.