Skip to content

Commit

Permalink
Add ZLib compression using System.IO.Compression (#238)
Browse files Browse the repository at this point in the history
* Add ZLib compression using System.IO.Compression

* Update benchmark with builtin zlib compression

* Add test for BuiltinZlibCompression

* Add Apache header
  • Loading branch information
aeons authored Nov 29, 2024
1 parent b95dc1e commit a39573d
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 1 deletion.
7 changes: 7 additions & 0 deletions benchmarks/Compression/Compress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public void IronSnappy()
_ = compressor.Compress(Data);
}

[Benchmark]
public void BuiltinZlib()
{
using var compressor = Factories.BuiltinZlibCompressionCompressorFactory.Create();
_ = compressor.Compress(Data);
}

[Benchmark]
public void DotNetZip()
{
Expand Down
10 changes: 10 additions & 0 deletions benchmarks/Compression/Decompress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public void Setup()
K4aosCompressionLz4Data = compressor.Compress(data);
using (var compressor = Factories.IronSnappyCompressorFactory.Create())
IronSnappyData = compressor.Compress(data);
using (var compressor = Factories.BuiltinZlibCompressionCompressorFactory.Create())
BuiltinZlibData = compressor.Compress(data);
using (var compressor = Factories.DotNetZipCompressorFactory.Create())
DotNetZipData = compressor.Compress(data);
using (var compressor = Factories.ZstdNetCompressorFactory.Create())
Expand All @@ -48,6 +50,7 @@ public void Setup()
public int DecompressedSize { get; private set; }
public ReadOnlySequence<byte> K4aosCompressionLz4Data { get; private set; }
public ReadOnlySequence<byte> IronSnappyData { get; private set; }
public ReadOnlySequence<byte> BuiltinZlibData { get; private set; }
public ReadOnlySequence<byte> DotNetZipData { get; private set; }
public ReadOnlySequence<byte> ZstdNetData { get; private set; }
public ReadOnlySequence<byte> ZstdSharpData { get; private set; }
Expand All @@ -66,6 +69,13 @@ public void IronSnappy()
_ = decompressor.Decompress(IronSnappyData, DecompressedSize);
}

[Benchmark]
public void BuiltinZlib()
{
using var decompressor = Factories.BuiltinZlibCompressionDecompressorFactory.Create();
_ = decompressor.Decompress(BuiltinZlibData, DecompressedSize);
}

[Benchmark]
public void DotNetZip()
{
Expand Down
8 changes: 8 additions & 0 deletions benchmarks/Compression/Factories.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ static Factories()
IronSnappyCompressorFactory = compressor!;
IronSnappyDecompressorFactory = decompressor!;

if (!BuiltinZlibCompression.TryLoading(out compressor, out decompressor))
throw new Exception("Could not load BuiltinZlibCompression");
BuiltinZlibCompressionCompressorFactory = compressor!;
BuiltinZlibCompressionDecompressorFactory = decompressor!;

if (!ZlibCompression.TryLoading(out compressor, out decompressor))
throw new Exception("Could not load DotNetZip");
DotNetZipCompressorFactory = compressor!;
Expand All @@ -53,6 +58,9 @@ static Factories()
public static ICompressorFactory IronSnappyCompressorFactory { get; }
public static IDecompressorFactory IronSnappyDecompressorFactory { get; }

public static ICompressorFactory BuiltinZlibCompressionCompressorFactory { get; }
public static IDecompressorFactory BuiltinZlibCompressionDecompressorFactory { get; }

public static ICompressorFactory DotNetZipCompressorFactory { get; }
public static IDecompressorFactory DotNetZipDecompressorFactory { get; }

Expand Down
2 changes: 2 additions & 0 deletions benchmarks/Compression/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static void OutputCompressionInfo(MessageSize size, MessageType type)
Console.WriteLine($"\tCompressed with IronSnappy: {compressor.Compress(data).Length}");
using (var compressor = Factories.DotNetZipCompressorFactory.Create())
Console.WriteLine($"\tCompressed with DotNetZip: {compressor.Compress(data).Length}");
using (var compressor = Factories.BuiltinZlibCompressionCompressorFactory.Create())
Console.WriteLine($"\tCompressed with System.IO.Compression: {compressor.Compress(data).Length}");
using (var compressor = Factories.ZstdNetCompressorFactory.Create())
Console.WriteLine($"\tCompressed with ZstdNet: {compressor.Compress(data).Length}");
using (var compressor = Factories.ZstdSharpCompressorFactory.Create())
Expand Down
77 changes: 77 additions & 0 deletions src/DotPulsar/Internal/Compression/BuiltinZlibCompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Compression;

using DotPulsar.Internal.Abstractions;
using System.Buffers;
using System.Reflection;

public static class BuiltinZlibCompression
{
public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
{
try
{
var assembly = Assembly.Load("System.IO.Compression");
var types = assembly.GetTypes();

// Find the ZLibStream
var zlibStreamType = types.FirstOrDefault(t => t.FullName == "System.IO.Compression.ZLibStream");

// Find the enum values for the ZLibStream constructors
var compressionLevelType = types.FirstOrDefault(t => t.FullName == "System.IO.Compression.CompressionLevel");
var compressionLevelOptimal = compressionLevelType?.GetEnumValues().GetValue(0);
var compressionModeType = types.FirstOrDefault(t => t.FullName == "System.IO.Compression.CompressionMode");
var compressionModeDecompress = compressionModeType?.GetEnumValues().GetValue(0);

compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zlib, () => new Compressor(CreateCompressor(zlibStreamType, compressionLevelOptimal)));
decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zlib, () => new Decompressor(CreateDecompressor(zlibStreamType, compressionModeDecompress)));

return CompressionTester.TestCompression(compressorFactory, decompressorFactory);
}
catch
{
// Ignore
}

compressorFactory = null;
decompressorFactory = null;

return false;
}

private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(Type? zlibStreamType, object? compressionLevelOptimal)
=> data =>
{
using var dataStream = new MemoryStream(data.ToArray());
using var compressedStream = new MemoryStream();
using var compressor = (Stream) Activator.CreateInstance(zlibStreamType!, new object[] { compressedStream, compressionLevelOptimal! })!;
dataStream.CopyTo(compressor);
compressor.Close();
return new ReadOnlySequence<byte>(compressedStream.ToArray());
};

private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(Type? zlibStreamType, object? compressionModeDecompress)
=> (data, _) =>
{
using var dataStream = new MemoryStream(data.ToArray());
using var decompressedStream = new MemoryStream();
using var decompressor = (Stream) Activator.CreateInstance(zlibStreamType!, new object[] { dataStream, compressionModeDecompress! })!;
decompressor.CopyTo(decompressedStream);
decompressor.Close();
decompressedStream.Position = 0;
return new ReadOnlySequence<byte>(decompressedStream.ToArray());
};
}
4 changes: 3 additions & 1 deletion src/DotPulsar/Internal/Compression/CompressionFactories.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ private static void LoadSupportForSnappy()

private static void LoadSupportForZlib()
{
if (ZlibCompression.TryLoading(out var compressorFactory, out var decompressorFactory))
if (BuiltinZlibCompression.TryLoading(out var compressorFactory, out var decompressorFactory))
Add(compressorFactory, decompressorFactory);
else if (ZlibCompression.TryLoading(out compressorFactory, out decompressorFactory))
Add(compressorFactory, decompressorFactory);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Tests.Internal.Compression;

using DotPulsar.Internal.Compression;

[Trait("Category", "Unit")]
public class BuiltinZlibCompressionTests
{
[Fact]
public void Compression_GivenDataToCompressAndDecompress_ShouldReturnOriginalData()
{
// Arrange
var couldLoad = BuiltinZlibCompression.TryLoading(out var compressorFactory, out var decompressorFactory);
couldLoad.Should().BeTrue();
using var compressor = compressorFactory!.Create();
using var decompressor = decompressorFactory!.Create();

// Act
var compressionWorks = CompressionTester.TestCompression(compressorFactory, decompressorFactory);

// Assert
compressionWorks.Should().BeTrue();
}
}

0 comments on commit a39573d

Please sign in to comment.