From c15259f966ef654fba74a20ebfa60e2816ea9fcc Mon Sep 17 00:00:00 2001 From: mxmlnkn Date: Sat, 6 Jan 2024 19:28:40 +0100 Subject: [PATCH] [fix] Avoid segfault when using ParallelGzipReader into an externally started thread --- src/core/FasterVector.hpp | 18 ++++++++++ src/core/JoiningThread.hpp | 28 --------------- .../rapidgzip/testParallelGzipReader.cpp | 34 +++++++++++++++++++ src/tests/testPythonWrappers.py | 15 +++++++- 4 files changed, 66 insertions(+), 29 deletions(-) diff --git a/src/core/FasterVector.hpp b/src/core/FasterVector.hpp index 8c4847f4..e08eab40 100644 --- a/src/core/FasterVector.hpp +++ b/src/core/FasterVector.hpp @@ -30,6 +30,24 @@ class RpmallocInit inline static const RpmallocInit rpmallocInit{}; +class RpmallocThreadInit +{ +public: + RpmallocThreadInit() + { + rpmalloc_thread_initialize(); + } + + ~RpmallocThreadInit() + { + rpmalloc_thread_finalize( /* release caches */ true ); + } +}; + + +static const thread_local RpmallocThreadInit rpmallocThreadInit{}; + + template class RpmallocAllocator { diff --git a/src/core/JoiningThread.hpp b/src/core/JoiningThread.hpp index 0040ff6e..5107b4ba 100644 --- a/src/core/JoiningThread.hpp +++ b/src/core/JoiningThread.hpp @@ -3,27 +3,6 @@ #include #include -#ifdef WITH_RPMALLOC - #include -#endif - - -#ifdef WITH_RPMALLOC -class RpmallocThreadInit -{ -public: - RpmallocThreadInit() - { - rpmalloc_thread_initialize(); - } - - ~RpmallocThreadInit() - { - rpmalloc_thread_finalize( /* release caches */ true ); - } -}; -#endif - /** * Similar to the planned C++20 std::jthread, this class joins in the destructor. @@ -35,14 +14,7 @@ class JoiningThread template explicit JoiningThread( Function&& function, Args&&... args ) : -#ifdef WITH_RPMALLOC - m_thread( [=] () { - static const thread_local RpmallocThreadInit rpmallocThreadInit{}; - function( std::forward( args )... ); - } ) -#else m_thread( std::forward( function ), std::forward( args )... ) -#endif {} JoiningThread( JoiningThread&& ) = default; diff --git a/src/tests/rapidgzip/testParallelGzipReader.cpp b/src/tests/rapidgzip/testParallelGzipReader.cpp index c25127e8..38005502 100644 --- a/src/tests/rapidgzip/testParallelGzipReader.cpp +++ b/src/tests/rapidgzip/testParallelGzipReader.cpp @@ -745,6 +745,39 @@ testPrefetchingAfterSplit() } +void +testMultiThreadedUsage() +{ + static constexpr size_t DATA_SIZE = 64_Mi; + + /* As there are 4 symbols, 2 bits per symbol should suffice and as the data is random, almost no backreferences + * should be viable. This leads to a compression ratio of ~4, which is large enough for splitting and benign + * enough to have multiple chunks with fairly little uncompressed data. */ + const auto compressedRandomDNA = compressWithZlib( createRandomData( DATA_SIZE, DNA_SYMBOLS ), + CompressionStrategy::HUFFMAN_ONLY ); + + auto reader = std::make_unique >( + std::make_unique( compressedRandomDNA ), + /* parallelization */ 6 ); + reader->setCRC32Enabled( true ); + + std::vector result; + std::thread thread( [&result, gzipReader = std::move( reader )] () { + std::vector buffer( 1024ULL ); + while ( true ) { + const auto nBytesRead = gzipReader->read( buffer.data(), buffer.size() ); + if ( nBytesRead == 0 ) { + break; + } + result.insert( result.end(), buffer.begin(), buffer.begin() + nBytesRead ); + } + } ); + + thread.join(); + REQUIRE_EQUAL( result.size(), DATA_SIZE ); +} + + int main( int argc, char** argv ) @@ -765,6 +798,7 @@ main( int argc, findParentFolderContaining( binaryFolder, "src/tests/data/base64-256KiB.bgz" ) ) / "src" / "tests" / "data"; + testMultiThreadedUsage(); testCRC32AndCleanUnmarkedData(); testPrefetchingAfterSplit(); testCachedChunkReuseAfterSplit(); diff --git a/src/tests/testPythonWrappers.py b/src/tests/testPythonWrappers.py index 0289ce8e..c00d6f81 100644 --- a/src/tests/testPythonWrappers.py +++ b/src/tests/testPythonWrappers.py @@ -14,6 +14,7 @@ import subprocess import sys import tempfile +import threading import time if __name__ == '__main__' and __package__ is None: @@ -357,7 +358,7 @@ def testTriggerDeadlock(filePath): def testDeadlock(encoder): print("Create test file...") # We need at least something larger than the chunk size. - rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, 'pygzip') + rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, encoder) task = multiprocessing.Process(target=testTriggerDeadlock, args = (compressedFile.name,)) task.start() @@ -368,6 +369,17 @@ def testDeadlock(encoder): task.join() sys.exit(1) +def readAndPrintFirstBytes(file): + print(file.read(8)) + + +def testRpmallocThreadSafety(encoder): + rawFile, compressedFile = createRandomCompressedFile(1024 * 1024, 6, encoder) + with rapidgzip.open(compressedFile.name) as gzipReader: + thread = threading.Thread(target=readAndPrintFirstBytes, args=[gzipReader]) + thread.start() + thread.join() + if __name__ == '__main__': print("indexed_bzip2 version:", indexed_bzip2.__version__) @@ -375,6 +387,7 @@ def testDeadlock(encoder): print("Cores:", os.cpu_count()) testDeadlock('pygzip') + testRpmallocThreadSafety('pygzip') def test(openIndexedFileFromName, closeUnderlyingFile=None): testPythonInterface(