[fix] Avoid segfault when using ParallelGzipReader in an externally started thread
mxmlnkn committed Jan 6, 2024
1 parent 59b2023 commit c15259f
Showing 4 changed files with 66 additions and 29 deletions.
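
For context, the segfault happened when a ParallelGzipReader built with rpmalloc support was read from a thread that the caller had started themselves: rpmalloc's per-thread initialization only ran inside threads created via rapidgzip's JoiningThread, so reading from an externally started thread could crash on allocation. This commit moves the per-thread initialization into FasterVector.hpp, next to the rpmalloc allocator, so any thread using that allocator is covered. The sketch below only illustrates the previously problematic usage pattern and is modeled on the new testMultiThreadedUsage test further down; the include paths are assumptions, not part of this commit.

#include <memory>
#include <thread>
#include <utility>
#include <vector>

/* Include paths are assumptions; adjust them to the actual project layout. */
#include <filereader/BufferView.hpp>            // provides BufferViewFileReader
#include <rapidgzip/ParallelGzipReader.hpp>

std::vector<char>
decompressInExternalThread( const std::vector<char>& compressedData )
{
    auto reader = std::make_unique<rapidgzip::ParallelGzipReader<rapidgzip::ChunkData, /* ENABLE_STATISTICS */ false> >(
        std::make_unique<BufferViewFileReader>( compressedData ),
        /* parallelization */ 4 );

    std::vector<char> result;
    /* This thread is created by the caller, not by rapidgzip's JoiningThread, so before
     * this commit no rpmalloc per-thread initialization ever happened for it. */
    std::thread externalThread( [&result, gzipReader = std::move( reader )] () {
        std::vector<char> buffer( 64 * 1024 );
        while ( true ) {
            const auto nBytesRead = gzipReader->read( buffer.data(), buffer.size() );
            if ( nBytesRead == 0 ) {
                break;
            }
            result.insert( result.end(), buffer.begin(), buffer.begin() + nBytesRead );
        }
    } );
    externalThread.join();
    return result;
}
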
18 changes: 18 additions & 0 deletions src/core/FasterVector.hpp
@@ -30,6 +30,24 @@ class RpmallocInit
inline static const RpmallocInit rpmallocInit{};


class RpmallocThreadInit
{
public:
RpmallocThreadInit()
{
rpmalloc_thread_initialize();
}

~RpmallocThreadInit()
{
rpmalloc_thread_finalize( /* release caches */ true );
}
};


static const thread_local RpmallocThreadInit rpmallocThreadInit{};


template<typename ElementType>
class RpmallocAllocator
{
28 changes: 0 additions & 28 deletions src/core/JoiningThread.hpp
@@ -3,27 +3,6 @@
#include <thread>
#include <utility>

#ifdef WITH_RPMALLOC
#include <rpmalloc.h>
#endif


#ifdef WITH_RPMALLOC
class RpmallocThreadInit
{
public:
RpmallocThreadInit()
{
rpmalloc_thread_initialize();
}

~RpmallocThreadInit()
{
rpmalloc_thread_finalize( /* release caches */ true );
}
};
#endif


/**
* Similar to the planned C++20 std::jthread, this class joins in the destructor.
@@ -35,14 +14,7 @@ class JoiningThread
template<class Function, class... Args>
explicit
JoiningThread( Function&& function, Args&&... args ) :
#ifdef WITH_RPMALLOC
m_thread( [=] () {
static const thread_local RpmallocThreadInit rpmallocThreadInit{};
function( std::forward<Args>( args )... );
} )
#else
m_thread( std::forward<Function>( function ), std::forward<Args>( args )... )
#endif
{}

JoiningThread( JoiningThread&& ) = default;
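
With the rpmalloc handling removed, JoiningThread is again a plain jthread-like wrapper. A minimal usage sketch, purely illustrative and not part of this commit:

#include "JoiningThread.hpp"

void
runInBackground()
{
    JoiningThread worker( [] () {
        /* background work */
    } );
    /* No explicit join needed: as the header's comment says, the destructor
     * joins when `worker` goes out of scope, like the C++20 std::jthread. */
}
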
34 changes: 34 additions & 0 deletions src/tests/rapidgzip/testParallelGzipReader.cpp
@@ -745,6 +745,39 @@ testPrefetchingAfterSplit()
}


void
testMultiThreadedUsage()
{
static constexpr size_t DATA_SIZE = 64_Mi;

/* As there are 4 symbols, 2 bits per symbol should suffice and as the data is random, almost no backreferences
* should be viable. This leads to a compression ratio of ~4, which is large enough for splitting and benign
* enough to have multiple chunks with fairly little uncompressed data. */
const auto compressedRandomDNA = compressWithZlib( createRandomData( DATA_SIZE, DNA_SYMBOLS ),
CompressionStrategy::HUFFMAN_ONLY );

auto reader = std::make_unique<rapidgzip::ParallelGzipReader<rapidgzip::ChunkData, /* ENABLE_STATISTICS */ true> >(
std::make_unique<BufferViewFileReader>( compressedRandomDNA ),
/* parallelization */ 6 );
reader->setCRC32Enabled( true );

std::vector<char> result;
std::thread thread( [&result, gzipReader = std::move( reader )] () {
std::vector<char> buffer( 1024ULL );
while ( true ) {
const auto nBytesRead = gzipReader->read( buffer.data(), buffer.size() );
if ( nBytesRead == 0 ) {
break;
}
result.insert( result.end(), buffer.begin(), buffer.begin() + nBytesRead );
}
} );

thread.join();
REQUIRE_EQUAL( result.size(), DATA_SIZE );
}


int
main( int argc,
char** argv )
@@ -765,6 +798,7 @@
findParentFolderContaining( binaryFolder, "src/tests/data/base64-256KiB.bgz" )
) / "src" / "tests" / "data";

testMultiThreadedUsage();
testCRC32AndCleanUnmarkedData();
testPrefetchingAfterSplit();
testCachedChunkReuseAfterSplit();
15 changes: 14 additions & 1 deletion src/tests/testPythonWrappers.py
@@ -14,6 +14,7 @@
import subprocess
import sys
import tempfile
import threading
import time

if __name__ == '__main__' and __package__ is None:
@@ -357,7 +358,7 @@ def testTriggerDeadlock(filePath):
def testDeadlock(encoder):
print("Create test file...")
# We need at least something larger than the chunk size.
rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, 'pygzip')
rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, encoder)

task = multiprocessing.Process(target=testTriggerDeadlock, args = (compressedFile.name,))
task.start()
@@ -368,13 +369,25 @@ def testDeadlock(encoder):
task.join()
sys.exit(1)

def readAndPrintFirstBytes(file):
print(file.read(8))


def testRpmallocThreadSafety(encoder):
rawFile, compressedFile = createRandomCompressedFile(1024 * 1024, 6, encoder)
with rapidgzip.open(compressedFile.name) as gzipReader:
thread = threading.Thread(target=readAndPrintFirstBytes, args=[gzipReader])
thread.start()
thread.join()


if __name__ == '__main__':
print("indexed_bzip2 version:", indexed_bzip2.__version__)
print("rapidgzip version:", rapidgzip.__version__)
print("Cores:", os.cpu_count())

testDeadlock('pygzip')
testRpmallocThreadSafety('pygzip')

def test(openIndexedFileFromName, closeUnderlyingFile=None):
testPythonInterface(
