Skip to content

Commit

Permalink
fic memory errors and optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Feb 2, 2024
1 parent ea41fe1 commit 67bc2f4
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
68 changes: 38 additions & 30 deletions src/frame_compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ const BUF_SIZE = 16*1024
const LZ4_FOOTER_SIZE = 4

mutable struct LZ4FrameCompressor <: TranscodingStreams.Codec
ctx::Ref{Ptr{LZ4F_cctx}}
prefs::Ref{LZ4F_preferences_t}
header::Memory
ctx::Ptr{LZ4F_cctx}
prefs::Base.RefValue{LZ4F_preferences_t}
header::Vector{UInt8}
write_header::Bool
end

Expand All @@ -29,10 +29,10 @@ Creates an LZ4 compression codec.
"""
function LZ4FrameCompressor(; kwargs...)
x, y = splitkwargs(kwargs, (:compressionlevel, :autoflush))
ctx = Ref{Ptr{LZ4F_cctx}}(C_NULL)
ctx = Ptr{LZ4F_cctx}(C_NULL)
frame = LZ4F_frameInfo_t(; y...)
prefs = Ref(LZ4F_preferences_t(frame; x...))
return LZ4FrameCompressor(ctx, prefs, Memory(Vector{UInt8}(undef, LZ4F_HEADER_SIZE_MAX)), false)
return LZ4FrameCompressor(ctx, prefs, Vector{UInt8}(undef, LZ4F_HEADER_SIZE_MAX), false)
end

const LZ4FrameCompressorStream{S} = TranscodingStream{LZ4FrameCompressor,S} where S<:IO
Expand All @@ -53,7 +53,7 @@ end
Returns the expected size of the transcoded data.
"""
function TranscodingStreams.expectedsize(codec::LZ4FrameCompressor, input::Memory)::Int
LZ4F_compressBound(input.size, codec.prefs) + LZ4F_HEADER_SIZE_MAX + LZ4_FOOTER_SIZE
LZ4F_compressBound(length(input), codec.prefs) + LZ4F_HEADER_SIZE_MAX + LZ4_FOOTER_SIZE
end

"""
Expand All @@ -62,7 +62,7 @@ end
Returns the minimum output size of `process`.
"""
function TranscodingStreams.minoutsize(codec::LZ4FrameCompressor, input::Memory)::Int
LZ4F_compressBound(input.size, codec.prefs)
LZ4F_compressBound(length(input), codec.prefs)
end

"""
Expand All @@ -71,7 +71,9 @@ end
Initializes the LZ4F Compression Codec.
"""
function TranscodingStreams.initialize(codec::LZ4FrameCompressor)::Nothing
LZ4F_createCompressionContext(codec.ctx, LZ4F_getVersion())
cctxPtr = Ref{Ptr{LZ4F_cctx}}(C_NULL)
LZ4F_createCompressionContext(cctxPtr, LZ4F_getVersion())
codec.ctx = cctxPtr[]
nothing
end

Expand All @@ -81,7 +83,8 @@ end
Finalizes the LZ4F Compression Codec.
"""
function TranscodingStreams.finalize(codec::LZ4FrameCompressor)::Nothing
LZ4F_freeCompressionContext(codec.ctx[])
LZ4F_freeCompressionContext(codec.ctx)
codec.ctx = C_NULL
nothing
end

Expand All @@ -93,9 +96,10 @@ Creates the LZ4F header to be written to the output.
"""
function TranscodingStreams.startproc(codec::LZ4FrameCompressor, mode::Symbol, error::Error)::Symbol
try
header = Vector{UInt8}(undef, LZ4F_HEADER_SIZE_MAX)
headerSize = LZ4F_compressBegin(codec.ctx[], header, convert(Csize_t, LZ4F_HEADER_SIZE_MAX), codec.prefs)
codec.header = Memory(resize!(header, headerSize))
header = codec.header
resize!(header, LZ4F_HEADER_SIZE_MAX)
headerSize = LZ4F_compressBegin(codec.ctx, header, convert(Csize_t, sizeof(header)), codec.prefs)
resize!(header, headerSize)
codec.write_header = true
:ok
catch err
Expand All @@ -115,33 +119,35 @@ function TranscodingStreams.process(codec::LZ4FrameCompressor, input::Memory, ou
data_read = 0
data_written = 0
if codec.write_header
if output.size < codec.header.size
if length(output) < sizeof(codec.header)
error[] = ErrorException("Output buffer too small for header.")
return (data_read, data_written, :error)
end
unsafe_copyto!(output.ptr, codec.header.ptr, codec.header.size)
data_written = codec.header.size
data_written = sizeof(codec.header)
unsafe_copyto!(output.ptr, pointer(codec.header), data_written)
codec.write_header = false
end

data_read = length(input)
try
if input.size == 0
data_written += LZ4F_compressEnd(codec.ctx[], output.ptr + data_written, output.size - data_written, C_NULL)
if data_read == 0
data_written += LZ4F_compressEnd(codec.ctx, output.ptr + data_written, length(output) - data_written, C_NULL)
(data_read, data_written, :end)
else
data_written += LZ4F_compressUpdate(codec.ctx[], output.ptr + data_written, output.size - data_written, input.ptr, input.size, C_NULL)
(input.size, data_written, :ok)
data_written += LZ4F_compressUpdate(codec.ctx, output.ptr + data_written, length(output) - data_written, input.ptr, data_read, C_NULL)
(data_read, data_written, :ok)
end

catch err
error[] = err
data_read = 0
(data_read, data_written, :error)
end

end

struct LZ4FrameDecompressor <: TranscodingStreams.Codec
dctx::Ref{Ptr{LZ4F_dctx}}
mutable struct LZ4FrameDecompressor <: TranscodingStreams.Codec
dctx::Ptr{LZ4F_dctx}
end

"""
Expand All @@ -150,7 +156,7 @@ end
Creates an LZ4 decompression codec.
"""
function LZ4FrameDecompressor()
dctx = Ref{Ptr{LZ4F_dctx}}(C_NULL)
dctx = Ptr{LZ4F_dctx}(C_NULL)
return LZ4FrameDecompressor(dctx)
end

Expand All @@ -171,7 +177,9 @@ end
Initializes the LZ4F Decompression Codec.
"""
function TranscodingStreams.initialize(codec::LZ4FrameDecompressor)::Nothing
LZ4F_createDecompressionContext(codec.dctx, LZ4F_getVersion())
dctxPtr = Ref{Ptr{LZ4F_dctx}}(C_NULL)
LZ4F_createDecompressionContext(dctxPtr, LZ4F_getVersion())
codec.dctx = dctxPtr[]
nothing
end

Expand All @@ -181,7 +189,8 @@ end
Finalizes the LZ4F Decompression Codec.
"""
function TranscodingStreams.finalize(codec::LZ4FrameDecompressor)::Nothing
LZ4F_freeDecompressionContext(codec.dctx[])
LZ4F_freeDecompressionContext(codec.dctx)
codec.dctx = C_NULL
nothing
end

Expand All @@ -196,18 +205,17 @@ function TranscodingStreams.process(codec::LZ4FrameDecompressor, input::Memory,
data_written = 0

try
if input.size == 0
if length(input) == 0
(data_read, data_written, :end)
else
src_size = Ref{Csize_t}(input.size)
dst_size = Ref{Csize_t}(output.size)
LZ4F_decompress(codec.dctx[], output.ptr, dst_size, input.ptr, src_size, C_NULL)
src_size = Ref{Csize_t}(length(input))
dst_size = Ref{Csize_t}(length(output))
LZ4F_decompress(codec.dctx, output.ptr, dst_size, input.ptr, src_size, C_NULL)
(src_size[], dst_size[], :ok)
end

catch err
if isa(err, LZ4Exception) && err.msg == "ERROR_frameType_unknown"
codec.dctx[] = C_NULL
codec.dctx = C_NULL
end
error[] = err
(data_read, data_written, :error)
Expand Down
6 changes: 3 additions & 3 deletions src/hc_compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ end
Returns the minimum output size of `process`.
"""
function TranscodingStreams.minoutsize(codec::LZ4HCCompressor, input::Memory)::Int
LZ4_compressBound(input.size) + CINT_SIZE
LZ4_compressBound(length(input)) + CINT_SIZE
end

"""
Expand Down Expand Up @@ -146,9 +146,9 @@ function TranscodingStreams.process(
error::Error
)::Tuple{Int,Int,Symbol}

input.size == 0 && return (0, 0, :end)
length(input) == 0 && return (0, 0, :end)
try
data_size = min(input.size, codec.block_size)
data_size = min(length(input), codec.block_size)

in_buffer = copy_data!(codec.buffer, input, data_size)
out_buffer = Vector{UInt8}(undef, LZ4_compressBound(data_size))
Expand Down
12 changes: 6 additions & 6 deletions src/lz4_compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ end
Returns the minimum output size of `process`.
"""
function TranscodingStreams.minoutsize(codec::LZ4FastCompressor, input::Memory)::Int
LZ4_compressBound(input.size) + CINT_SIZE
LZ4_compressBound(length(input)) + CINT_SIZE
end

"""
Expand Down Expand Up @@ -141,11 +141,11 @@ function TranscodingStreams.process(
error::Error
)::Tuple{Int,Int,Symbol}

input.size == 0 && return (0, 0, :end)
length(input) == 0 && return (0, 0, :end)
try
in_buffer = get_buffer!(codec.buffer)

data_size = min(input.size, codec.block_size)
data_size = min(length(input), codec.block_size)
out_buffer = Vector{UInt8}(undef, LZ4_compressBound(data_size))
GC.@preserve in_buffer unsafe_copyto!(pointer(in_buffer), input.ptr, data_size)

Expand Down Expand Up @@ -206,7 +206,7 @@ end
Returns the expected size of the transcoded data.
"""
function TranscodingStreams.expectedsize(codec::LZ4SafeDecompressor, input::Memory)::Int
max(input.size * 2, codec.block_size)
max(length(input) * 2, codec.block_size)
end

"""
Expand All @@ -215,7 +215,7 @@ end
Returns the minimum output size of `process`.
"""
function TranscodingStreams.minoutsize(codec::LZ4SafeDecompressor, input::Memory)::Int
max(input.size * 2, codec.block_size)
max(length(input) * 2, codec.block_size)
end

"""
Expand Down Expand Up @@ -274,7 +274,7 @@ function TranscodingStreams.process(
error::Error
)::Tuple{Int,Int,Symbol}

input.size == 0 && return (0, 0, :end)
length(input) == 0 && return (0, 0, :end)
try
out_buffer = get_buffer!(codec.buffer)
data_size = readint(input)
Expand Down

0 comments on commit 67bc2f4

Please sign in to comment.