From ae8785bcfaf92460722b971772dc5ef3d5b48cab Mon Sep 17 00:00:00 2001 From: mxmlnkn Date: Sun, 21 Jan 2024 20:09:08 +0100 Subject: [PATCH] [refactor] WindowMap: Use shared_ptr for windows --- src/rapidgzip/GzipChunkFetcher.hpp | 37 ++++++------ src/rapidgzip/IndexFileFormat.hpp | 4 +- src/rapidgzip/ParallelGzipReader.hpp | 22 ++----- src/rapidgzip/WindowMap.hpp | 59 ++++++++++++++----- src/tests/rapidgzip/testGzipChunkFetcher.cpp | 6 +- src/tests/rapidgzip/testGzipIndexFormat.cpp | 34 +++++++++++ .../rapidgzip/testParallelGzipReader.cpp | 4 +- 7 files changed, 110 insertions(+), 56 deletions(-) diff --git a/src/rapidgzip/GzipChunkFetcher.hpp b/src/rapidgzip/GzipChunkFetcher.hpp index 5fa5dc07..b90e902c 100644 --- a/src/rapidgzip/GzipChunkFetcher.hpp +++ b/src/rapidgzip/GzipChunkFetcher.hpp @@ -66,6 +66,7 @@ class GzipChunkFetcher : using ChunkData = T_ChunkData; using BaseType = BlockFetcher; using BitReader = rapidgzip::BitReader; + using SharedWindow = WindowMap::SharedWindow; using WindowView = VectorView; using BlockFinder = typename BaseType::BlockFinder; @@ -194,7 +195,7 @@ class GzipChunkFetcher : << formatBits( chunkData->encodedOffsetInBits ) << ", " << formatBits( chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits ) << "].\n" << " Window size for the chunk offset: " - << ( lastWindow.has_value() ? std::to_string( lastWindow->size() ) : "no window" ) << "."; + << ( lastWindow ? std::to_string( lastWindow->size() ) : "no window" ) << "."; throw std::logic_error( std::move( message ).str() ); } return std::make_pair( blockInfo.decodedOffsetInBytes, std::move( chunkData ) ); @@ -296,18 +297,19 @@ class GzipChunkFetcher : /* Because this is a new block, it might contain markers that we have to replace with the window * of the last block. The very first block should not contain any markers, ensuring that we * can successively propagate the window through all blocks. */ - auto lastWindow = m_windowMap->get( chunkData->encodedOffsetInBits ); - if ( !lastWindow ) { + auto sharedLastWindow = m_windowMap->get( chunkData->encodedOffsetInBits ); + if ( !sharedLastWindow ) { std::stringstream message; message << "The window of the last block at " << formatBits( chunkData->encodedOffsetInBits ) << " should exist at this point!"; throw std::logic_error( std::move( message ).str() ); } + const auto& lastWindow = *sharedLastWindow; if constexpr ( REPLACE_MARKERS_IN_PARALLEL ) { - waitForReplacedMarkers( chunkData, *lastWindow ); + waitForReplacedMarkers( chunkData, lastWindow ); } else { - replaceMarkers( chunkData, *lastWindow ); + replaceMarkers( chunkData, lastWindow ); } size_t decodedOffsetInBlock{ 0 }; @@ -316,7 +318,7 @@ class GzipChunkFetcher : const auto windowOffset = subblock.encodedOffset + subblock.encodedSize; /* Avoid recalculating what we already emplaced in waitForReplacedMarkers when calling getLastWindow. */ if ( !m_windowMap->get( windowOffset ) ) { - m_windowMap->emplace( windowOffset, chunkData->getWindowAt( *lastWindow, decodedOffsetInBlock ) ); + m_windowMap->emplace( windowOffset, chunkData->getWindowAt( lastWindow, decodedOffsetInBlock ) ); } } } @@ -417,20 +419,21 @@ class GzipChunkFetcher : } /* Check for previous window. */ - const auto previousWindow = m_windowMap->get( chunkData->encodedOffsetInBits ); - if ( !previousWindow ) { + const auto sharedPreviousWindow = m_windowMap->get( chunkData->encodedOffsetInBits ); + if ( !sharedPreviousWindow ) { continue; } + const auto& previousWindow = *sharedPreviousWindow; const auto windowOffset = chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits; if ( !m_windowMap->get( windowOffset ) ) { - m_windowMap->emplace( windowOffset, chunkData->getLastWindow( *previousWindow ) ); + m_windowMap->emplace( windowOffset, chunkData->getLastWindow( previousWindow ) ); } m_markersBeingReplaced.emplace( chunkData->encodedOffsetInBits, this->submitTaskWithHighPriority( - [this, chunkData, previousWindow] () { replaceMarkers( chunkData, *previousWindow ); } + [this, chunkData, sharedPreviousWindow] () { replaceMarkers( chunkData, *sharedPreviousWindow ); } ) ); } @@ -615,7 +618,7 @@ class GzipChunkFetcher : * indexes were created by us and follow that scheme. */ ( m_isBgzfFile && !m_blockFinder->finalized() - ? std::make_optional( WindowView{} ) + ? std::make_shared() : m_windowMap->get( blockOffset ) ), /* decodedSize */ blockInfo ? blockInfo->decodedSizeInBytes : std::optional{}, m_cancelThreads, @@ -637,7 +640,7 @@ class GzipChunkFetcher : decodeBlock( BitReader const& originalBitReader, size_t const blockOffset, size_t const untilOffset, - std::optional const initialWindow, + SharedWindow const initialWindow, std::optional const decodedSize, std::atomic const& cancelThreads, FileType const fileType = FileType::GZIP, @@ -653,6 +656,7 @@ class GzipChunkFetcher : if ( initialWindow && untilOffsetIsExact ) { const auto fileSize = originalBitReader.size(); + const auto& window = *initialWindow; ChunkData result; result.setCRC32Enabled( crc32Enabled ); @@ -661,7 +665,7 @@ class GzipChunkFetcher : result = decodeBlockWithInflateWrapper( originalBitReader, fileSize ? std::min( untilOffset, *fileSize ) : untilOffset, - *initialWindow, + window, decodedSize, std::move( result ) ); @@ -675,9 +679,7 @@ class GzipChunkFetcher : << " Decoded size : " << result.decodedSizeInBytes << " B\n" << " Expected decoded size : " << *decodedSize << " B\n" << " Until offset is exact : " << untilOffsetIsExact << "\n" - << " Initial Window : " << ( initialWindow - ? std::to_string( initialWindow->size() ) - : std::string( "None" ) ) << "\n"; + << " Initial Window : " << std::to_string( window.size() ) << "\n"; throw std::runtime_error( std::move( message ).str() ); } @@ -687,11 +689,12 @@ class GzipChunkFetcher : BitReader bitReader( originalBitReader ); if ( initialWindow ) { bitReader.seek( blockOffset ); + const auto& window = *initialWindow; ChunkData result; result.setCRC32Enabled( crc32Enabled ); result.fileType = fileType; - return decodeBlockWithRapidgzip( &bitReader, untilOffset, initialWindow, maxDecompressedChunkSize, + return decodeBlockWithRapidgzip( &bitReader, untilOffset, window, maxDecompressedChunkSize, std::move( result ) ); } diff --git a/src/rapidgzip/IndexFileFormat.hpp b/src/rapidgzip/IndexFileFormat.hpp index 78ea92ae..92ebf077 100644 --- a/src/rapidgzip/IndexFileFormat.hpp +++ b/src/rapidgzip/IndexFileFormat.hpp @@ -483,10 +483,10 @@ writeGzipIndex( const GzipIndex& in if ( !std::all_of( checkpoints.begin(), checkpoints.end(), [&index, windowSizeInBytes] ( const auto& checkpoint ) { const auto window = index.windows->get( checkpoint.compressedOffsetInBits ); - return window.has_value() && ( window->empty() || ( window->size() >= windowSizeInBytes ) ); + return window && ( window->empty() || ( window->size() >= windowSizeInBytes ) ); } ) ) { - throw std::invalid_argument( "All window sizes must be at least 32 KiB!" ); + throw std::invalid_argument( "All window sizes must be at least 32 KiB or empty!" ); } checkedWrite( "GZIDX", 5 ); diff --git a/src/rapidgzip/ParallelGzipReader.hpp b/src/rapidgzip/ParallelGzipReader.hpp index 09174e5a..639d2837 100644 --- a/src/rapidgzip/ParallelGzipReader.hpp +++ b/src/rapidgzip/ParallelGzipReader.hpp @@ -723,20 +723,7 @@ class ParallelGzipReader final : setBlockOffsets( const GzipIndex& index ) { const auto result = index.windows->data(); - setBlockOffsets( index, [&windows = result.second] ( size_t offset ) -> Window { - return windows->at( offset ); - } ); - } - - void - setBlockOffsets( GzipIndex&& index ) - { - const auto result = index.windows->data(); - setBlockOffsets( std::move( index ), [&windows = result.second] ( size_t offset ) -> Window { - Window window; - std::swap( windows->at( offset ), window ); - return window; - } ); + setBlockOffsets( index, [&windows = result.second] ( size_t offset ) { return windows->at( offset ); } ); } /** @@ -745,8 +732,8 @@ class ParallelGzipReader final : * This makes it possible to destructively return the Window to avoid a copy! */ void - setBlockOffsets( const GzipIndex& index, - const std::function& getWindow ) + setBlockOffsets( const GzipIndex& index, + const std::function& getWindow ) { if ( index.checkpoints.empty() || !index.windows || !getWindow ) { return; @@ -795,7 +782,8 @@ class ParallelGzipReader final : * For some reason, indexed_gzip also stores windows for the very last checkpoint at the end of the file, * which is useless because there is nothing thereafter. But, don't filter it here so that exportIndex * mirrors importIndex better. */ - m_windowMap->emplace( checkpoint.compressedOffsetInBits, getWindow( checkpoint.compressedOffsetInBits ) ); + m_windowMap->emplaceShared( checkpoint.compressedOffsetInBits, + getWindow( checkpoint.compressedOffsetInBits ) ); } /* Input file-end offset if not included in checkpoints. */ diff --git a/src/rapidgzip/WindowMap.hpp b/src/rapidgzip/WindowMap.hpp index 354d8bc4..6a083f83 100644 --- a/src/rapidgzip/WindowMap.hpp +++ b/src/rapidgzip/WindowMap.hpp @@ -2,22 +2,24 @@ #include #include +#include #include -#include #include #include #include #include +#include #include class WindowMap { public: - using Window = rapidgzip::deflate::DecodedVector; + using Window = FasterVector; using WindowView = VectorView; - using Windows = std::map; + using SharedWindow = std::shared_ptr; + using Windows = std::map; public: WindowMap() = default; @@ -30,27 +32,39 @@ class WindowMap } void - emplace( size_t encodedBlockOffset, - Window window ) + emplace( const size_t encodedBlockOffset, + WindowView window ) { + emplaceShared( encodedBlockOffset, std::make_shared( window.begin(), window.end() ) ); + } + + void + emplaceShared( const size_t encodedBlockOffset, + SharedWindow sharedWindow ) + { + if ( !sharedWindow ) { + return; + } + std::scoped_lock lock( m_mutex ); + if ( m_windows.empty() ) { - m_windows.emplace( encodedBlockOffset, std::move( window ) ); + m_windows.emplace( encodedBlockOffset, std::move( sharedWindow ) ); } else if ( m_windows.rbegin()->first < encodedBlockOffset ) { /* Last value is smaller, so it is given that there is no collision and we can "append" * the new value with a hint in constant time. This should be the common case as windows * should be inserted in order of the offset! */ - m_windows.emplace_hint( m_windows.end(), encodedBlockOffset, std::move( window ) ); + m_windows.emplace_hint( m_windows.end(), encodedBlockOffset, std::move( sharedWindow ) ); } else { const auto match = m_windows.find( encodedBlockOffset ); - if ( ( match != m_windows.end() ) && ( match->second != window ) ) { - throw std::invalid_argument( "Window data to insert is inconsistent and may not be changed!" ); + if ( ( match != m_windows.end() ) && ( *match->second != *sharedWindow ) ) { + throw std::invalid_argument( "Window offset to insert already exists and may not be changed!" ); } - m_windows.emplace( encodedBlockOffset, std::move( window ) ); + m_windows.emplace( encodedBlockOffset, std::move( sharedWindow ) ); } } - [[nodiscard]] std::optional + [[nodiscard]] SharedWindow get( size_t encodedOffsetInBits ) const { /* Note that insertions might invalidate iterators but not references to values and especially not the @@ -58,9 +72,9 @@ class WindowMap * a WindowView without a corresponding lock. */ std::scoped_lock lock( m_mutex ); if ( const auto match = m_windows.find( encodedOffsetInBits ); match != m_windows.end() ) { - return WindowView( match->second.data(), match->second.size() ); + return match->second; } - return std::nullopt; + return nullptr; } [[nodiscard]] bool @@ -104,8 +118,23 @@ class WindowMap [[nodiscard]] bool operator==( const WindowMap& other ) const { - std::scoped_lock lock( m_mutex ); - return m_windows == other.m_windows; + std::scoped_lock lock( m_mutex, other.m_mutex ); + + if ( m_windows.size() != other.m_windows.size() ) { + return false; + } + + for ( const auto& [offset, window] : m_windows ) { + const auto otherWindow = other.m_windows.find( offset ); + if ( ( otherWindow == other.m_windows.end() ) + || ( static_cast( window ) != static_cast( otherWindow->second ) ) + || ( static_cast( window ) && static_cast( otherWindow->second ) + && ( *window != *otherWindow->second ) ) ) { + return false; + } + } + + return true; } private: diff --git a/src/tests/rapidgzip/testGzipChunkFetcher.cpp b/src/tests/rapidgzip/testGzipChunkFetcher.cpp index 799056c4..20253e5d 100644 --- a/src/tests/rapidgzip/testGzipChunkFetcher.cpp +++ b/src/tests/rapidgzip/testGzipChunkFetcher.cpp @@ -63,7 +63,7 @@ testAutomaticMarkerResolution( const std::filesystem::path& filePath, bitReader, blockOffset, /* untilOffset */ std::numeric_limits::max(), - /* window */ std::nullopt, + /* window */ {}, /* decodedSize */ std::nullopt, cancel ); @@ -208,7 +208,7 @@ testIsalBug() bitReader, blockOffset, untilOffset, - /* window */ window, + /* window */ std::make_shared( window ), /* decodedSize */ 4171816, cancel, FileType::GZIP, @@ -349,7 +349,7 @@ decodeWithDecodeBlock( UniqueFileReader&& fileReader ) bitReader, bitReader.tell(), /* untilOffset */ std::numeric_limits::max(), - /* window */ std::nullopt, + /* window */ {}, /* decodedSize */ std::nullopt, cancel ); } diff --git a/src/tests/rapidgzip/testGzipIndexFormat.cpp b/src/tests/rapidgzip/testGzipIndexFormat.cpp index 686c8a51..69851363 100644 --- a/src/tests/rapidgzip/testGzipIndexFormat.cpp +++ b/src/tests/rapidgzip/testGzipIndexFormat.cpp @@ -53,6 +53,40 @@ testIndexReadWrite( const std::filesystem::path& compressedPath, writeGzipIndex( index, checkedWrite ); } const auto rereadIndex = readGzipIndex( std::make_unique( gzipIndexPath ) ); + + REQUIRE_EQUAL( rereadIndex.compressedSizeInBytes, index.compressedSizeInBytes ); + REQUIRE_EQUAL( rereadIndex.uncompressedSizeInBytes, index.uncompressedSizeInBytes ); + REQUIRE_EQUAL( rereadIndex.checkpointSpacing, index.checkpointSpacing ); + REQUIRE_EQUAL( rereadIndex.windowSizeInBytes, index.windowSizeInBytes ); + + REQUIRE( rereadIndex.checkpoints == index.checkpoints ); + + REQUIRE_EQUAL( static_cast( rereadIndex.windows ), static_cast( index.windows ) ); + if ( rereadIndex.windows && index.windows ) { + REQUIRE_EQUAL( rereadIndex.windows->size(), index.windows->size() ); + const auto& [_, windows] = index.windows->data(); + for ( const auto& [offset, window] : *windows ) { + const auto rereadWindow = rereadIndex.windows->get( offset ); + if ( !rereadWindow ) { + std::cerr << "Failed to find offset " << offset << " in reread index!\n"; + continue; + } + + if ( static_cast( window ) != static_cast( rereadWindow ) ) { + std::stringstream message; + message << std::boolalpha << "Shared window has value: " << static_cast( window ) + << " while reread shared window has value: " << static_cast( rereadWindow ); + std::cerr << std::move( message ).str() << "\n"; + continue; + } + + if ( *window != *rereadWindow ) { + std::cerr << "Window contents for offset " << offset << " differ!\n"; + continue; + } + } + } + REQUIRE( rereadIndex == index ); } catch ( const std::exception& exception ) diff --git a/src/tests/rapidgzip/testParallelGzipReader.cpp b/src/tests/rapidgzip/testParallelGzipReader.cpp index cdcb88f5..37e95b58 100644 --- a/src/tests/rapidgzip/testParallelGzipReader.cpp +++ b/src/tests/rapidgzip/testParallelGzipReader.cpp @@ -271,8 +271,8 @@ testParallelDecodingWithIndex( const TemporaryDirectory& tmpFolder ) const auto reconstructedWindow = reconstructedIndex.windows->get( reconstructed.compressedOffsetInBits ); const auto realWindow = realIndex.windows->get( real.compressedOffsetInBits ); - REQUIRE( reconstructedWindow.has_value() ); - REQUIRE( realWindow.has_value() ); + REQUIRE( static_cast( reconstructedWindow ) ); + REQUIRE( static_cast( realWindow ) ); } } REQUIRE( *reconstructedIndex.windows == *realIndex.windows );