Skip to content

Commit

Permalink
[refactor] WindowMap: Use shared_ptr for windows
Browse files Browse the repository at this point in the history
  • Loading branch information
mxmlnkn committed Jan 21, 2024
1 parent 686ae72 commit ae8785b
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 56 deletions.
37 changes: 20 additions & 17 deletions src/rapidgzip/GzipChunkFetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class GzipChunkFetcher :
using ChunkData = T_ChunkData;
using BaseType = BlockFetcher<GzipBlockFinder, ChunkData, FetchingStrategy, ENABLE_STATISTICS>;
using BitReader = rapidgzip::BitReader;
using SharedWindow = WindowMap::SharedWindow;
using WindowView = VectorView<uint8_t>;
using BlockFinder = typename BaseType::BlockFinder;

Expand Down Expand Up @@ -194,7 +195,7 @@ class GzipChunkFetcher :
<< formatBits( chunkData->encodedOffsetInBits ) << ", "
<< formatBits( chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits ) << "].\n"
<< " Window size for the chunk offset: "
<< ( lastWindow.has_value() ? std::to_string( lastWindow->size() ) : "no window" ) << ".";
<< ( lastWindow ? std::to_string( lastWindow->size() ) : "no window" ) << ".";
throw std::logic_error( std::move( message ).str() );
}
return std::make_pair( blockInfo.decodedOffsetInBytes, std::move( chunkData ) );
Expand Down Expand Up @@ -296,18 +297,19 @@ class GzipChunkFetcher :
/* Because this is a new block, it might contain markers that we have to replace with the window
* of the last block. The very first block should not contain any markers, ensuring that we
* can successively propagate the window through all blocks. */
auto lastWindow = m_windowMap->get( chunkData->encodedOffsetInBits );
if ( !lastWindow ) {
auto sharedLastWindow = m_windowMap->get( chunkData->encodedOffsetInBits );
if ( !sharedLastWindow ) {
std::stringstream message;
message << "The window of the last block at " << formatBits( chunkData->encodedOffsetInBits )
<< " should exist at this point!";
throw std::logic_error( std::move( message ).str() );
}
const auto& lastWindow = *sharedLastWindow;

if constexpr ( REPLACE_MARKERS_IN_PARALLEL ) {
waitForReplacedMarkers( chunkData, *lastWindow );
waitForReplacedMarkers( chunkData, lastWindow );
} else {
replaceMarkers( chunkData, *lastWindow );
replaceMarkers( chunkData, lastWindow );
}

size_t decodedOffsetInBlock{ 0 };
Expand All @@ -316,7 +318,7 @@ class GzipChunkFetcher :
const auto windowOffset = subblock.encodedOffset + subblock.encodedSize;
/* Avoid recalculating what we already emplaced in waitForReplacedMarkers when calling getLastWindow. */
if ( !m_windowMap->get( windowOffset ) ) {
m_windowMap->emplace( windowOffset, chunkData->getWindowAt( *lastWindow, decodedOffsetInBlock ) );
m_windowMap->emplace( windowOffset, chunkData->getWindowAt( lastWindow, decodedOffsetInBlock ) );
}
}
}
Expand Down Expand Up @@ -417,20 +419,21 @@ class GzipChunkFetcher :
}

/* Check for previous window. */
const auto previousWindow = m_windowMap->get( chunkData->encodedOffsetInBits );
if ( !previousWindow ) {
const auto sharedPreviousWindow = m_windowMap->get( chunkData->encodedOffsetInBits );
if ( !sharedPreviousWindow ) {
continue;
}
const auto& previousWindow = *sharedPreviousWindow;

const auto windowOffset = chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits;
if ( !m_windowMap->get( windowOffset ) ) {
m_windowMap->emplace( windowOffset, chunkData->getLastWindow( *previousWindow ) );
m_windowMap->emplace( windowOffset, chunkData->getLastWindow( previousWindow ) );
}

m_markersBeingReplaced.emplace(
chunkData->encodedOffsetInBits,
this->submitTaskWithHighPriority(
[this, chunkData, previousWindow] () { replaceMarkers( chunkData, *previousWindow ); }
[this, chunkData, sharedPreviousWindow] () { replaceMarkers( chunkData, *sharedPreviousWindow ); }
)
);
}
Expand Down Expand Up @@ -615,7 +618,7 @@ class GzipChunkFetcher :
* indexes were created by us and follow that scheme.
*/
( m_isBgzfFile && !m_blockFinder->finalized()
? std::make_optional( WindowView{} )
? std::make_shared<WindowMap::Window>()
: m_windowMap->get( blockOffset ) ),
/* decodedSize */ blockInfo ? blockInfo->decodedSizeInBytes : std::optional<size_t>{},
m_cancelThreads,
Expand All @@ -637,7 +640,7 @@ class GzipChunkFetcher :
decodeBlock( BitReader const& originalBitReader,
size_t const blockOffset,
size_t const untilOffset,
std::optional<WindowView> const initialWindow,
SharedWindow const initialWindow,
std::optional<size_t> const decodedSize,
std::atomic<bool> const& cancelThreads,
FileType const fileType = FileType::GZIP,
Expand All @@ -653,6 +656,7 @@ class GzipChunkFetcher :

if ( initialWindow && untilOffsetIsExact ) {
const auto fileSize = originalBitReader.size();
const auto& window = *initialWindow;

ChunkData result;
result.setCRC32Enabled( crc32Enabled );
Expand All @@ -661,7 +665,7 @@ class GzipChunkFetcher :
result = decodeBlockWithInflateWrapper<InflateWrapper>(
originalBitReader,
fileSize ? std::min( untilOffset, *fileSize ) : untilOffset,
*initialWindow,
window,
decodedSize,
std::move( result ) );

Expand All @@ -675,9 +679,7 @@ class GzipChunkFetcher :
<< " Decoded size : " << result.decodedSizeInBytes << " B\n"
<< " Expected decoded size : " << *decodedSize << " B\n"
<< " Until offset is exact : " << untilOffsetIsExact << "\n"
<< " Initial Window : " << ( initialWindow
? std::to_string( initialWindow->size() )
: std::string( "None" ) ) << "\n";
<< " Initial Window : " << std::to_string( window.size() ) << "\n";
throw std::runtime_error( std::move( message ).str() );
}

Expand All @@ -687,11 +689,12 @@ class GzipChunkFetcher :
BitReader bitReader( originalBitReader );
if ( initialWindow ) {
bitReader.seek( blockOffset );
const auto& window = *initialWindow;

ChunkData result;
result.setCRC32Enabled( crc32Enabled );
result.fileType = fileType;
return decodeBlockWithRapidgzip( &bitReader, untilOffset, initialWindow, maxDecompressedChunkSize,
return decodeBlockWithRapidgzip( &bitReader, untilOffset, window, maxDecompressedChunkSize,
std::move( result ) );
}

Expand Down
4 changes: 2 additions & 2 deletions src/rapidgzip/IndexFileFormat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,10 @@ writeGzipIndex( const GzipIndex& in

if ( !std::all_of( checkpoints.begin(), checkpoints.end(), [&index, windowSizeInBytes] ( const auto& checkpoint ) {
const auto window = index.windows->get( checkpoint.compressedOffsetInBits );
return window.has_value() && ( window->empty() || ( window->size() >= windowSizeInBytes ) );
return window && ( window->empty() || ( window->size() >= windowSizeInBytes ) );
} ) )
{
throw std::invalid_argument( "All window sizes must be at least 32 KiB!" );
throw std::invalid_argument( "All window sizes must be at least 32 KiB or empty!" );
}

checkedWrite( "GZIDX", 5 );
Expand Down
22 changes: 5 additions & 17 deletions src/rapidgzip/ParallelGzipReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,20 +723,7 @@ class ParallelGzipReader final :
setBlockOffsets( const GzipIndex& index )
{
const auto result = index.windows->data();
setBlockOffsets( index, [&windows = result.second] ( size_t offset ) -> Window {
return windows->at( offset );
} );
}

void
setBlockOffsets( GzipIndex&& index )
{
const auto result = index.windows->data();
setBlockOffsets( std::move( index ), [&windows = result.second] ( size_t offset ) -> Window {
Window window;
std::swap( windows->at( offset ), window );
return window;
} );
setBlockOffsets( index, [&windows = result.second] ( size_t offset ) { return windows->at( offset ); } );
}

/**
Expand All @@ -745,8 +732,8 @@ class ParallelGzipReader final :
* This makes it possible to destructively return the Window to avoid a copy!
*/
void
setBlockOffsets( const GzipIndex& index,
const std::function<Window( size_t )>& getWindow )
setBlockOffsets( const GzipIndex& index,
const std::function<WindowMap::SharedWindow( size_t )>& getWindow )
{
if ( index.checkpoints.empty() || !index.windows || !getWindow ) {
return;
Expand Down Expand Up @@ -795,7 +782,8 @@ class ParallelGzipReader final :
* For some reason, indexed_gzip also stores windows for the very last checkpoint at the end of the file,
* which is useless because there is nothing thereafter. But, don't filter it here so that exportIndex
* mirrors importIndex better. */
m_windowMap->emplace( checkpoint.compressedOffsetInBits, getWindow( checkpoint.compressedOffsetInBits ) );
m_windowMap->emplaceShared( checkpoint.compressedOffsetInBits,
getWindow( checkpoint.compressedOffsetInBits ) );
}

/* Input file-end offset if not included in checkpoints. */
Expand Down
59 changes: 44 additions & 15 deletions src/rapidgzip/WindowMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>

#include <DecodedData.hpp>
#include <FasterVector.hpp>
#include <VectorView.hpp>


class WindowMap
{
public:
using Window = rapidgzip::deflate::DecodedVector;
using Window = FasterVector<std::uint8_t>;
using WindowView = VectorView<std::uint8_t>;
using Windows = std::map</* encoded block offset */ size_t, Window>;
using SharedWindow = std::shared_ptr<const Window>;
using Windows = std::map</* encoded block offset */ size_t, SharedWindow>;

public:
WindowMap() = default;
Expand All @@ -30,37 +32,49 @@ class WindowMap
}

void
emplace( size_t encodedBlockOffset,
Window window )
emplace( const size_t encodedBlockOffset,
WindowView window )
{
emplaceShared( encodedBlockOffset, std::make_shared<Window>( window.begin(), window.end() ) );
}

void
emplaceShared( const size_t encodedBlockOffset,
SharedWindow sharedWindow )
{
if ( !sharedWindow ) {
return;
}

std::scoped_lock lock( m_mutex );

if ( m_windows.empty() ) {
m_windows.emplace( encodedBlockOffset, std::move( window ) );
m_windows.emplace( encodedBlockOffset, std::move( sharedWindow ) );
} else if ( m_windows.rbegin()->first < encodedBlockOffset ) {
/* Last value is smaller, so it is given that there is no collision and we can "append"
* the new value with a hint in constant time. This should be the common case as windows
* should be inserted in order of the offset! */
m_windows.emplace_hint( m_windows.end(), encodedBlockOffset, std::move( window ) );
m_windows.emplace_hint( m_windows.end(), encodedBlockOffset, std::move( sharedWindow ) );
} else {
const auto match = m_windows.find( encodedBlockOffset );
if ( ( match != m_windows.end() ) && ( match->second != window ) ) {
throw std::invalid_argument( "Window data to insert is inconsistent and may not be changed!" );
if ( ( match != m_windows.end() ) && ( *match->second != *sharedWindow ) ) {
throw std::invalid_argument( "Window offset to insert already exists and may not be changed!" );
}
m_windows.emplace( encodedBlockOffset, std::move( window ) );
m_windows.emplace( encodedBlockOffset, std::move( sharedWindow ) );
}
}

[[nodiscard]] std::optional<WindowView>
[[nodiscard]] SharedWindow
get( size_t encodedOffsetInBits ) const
{
/* Note that insertions might invalidate iterators but not references to values and especially not the
* internal pointers of the vectors we are storing in the values. Meaning, it is safe to simply return
* a WindowView without a corresponding lock. */
std::scoped_lock lock( m_mutex );
if ( const auto match = m_windows.find( encodedOffsetInBits ); match != m_windows.end() ) {
return WindowView( match->second.data(), match->second.size() );
return match->second;
}
return std::nullopt;
return nullptr;
}

[[nodiscard]] bool
Expand Down Expand Up @@ -104,8 +118,23 @@ class WindowMap
[[nodiscard]] bool
operator==( const WindowMap& other ) const
{
std::scoped_lock lock( m_mutex );
return m_windows == other.m_windows;
std::scoped_lock lock( m_mutex, other.m_mutex );

if ( m_windows.size() != other.m_windows.size() ) {
return false;
}

for ( const auto& [offset, window] : m_windows ) {
const auto otherWindow = other.m_windows.find( offset );
if ( ( otherWindow == other.m_windows.end() )
|| ( static_cast<bool>( window ) != static_cast<bool>( otherWindow->second ) )
|| ( static_cast<bool>( window ) && static_cast<bool>( otherWindow->second )
&& ( *window != *otherWindow->second ) ) ) {
return false;
}
}

return true;
}

private:
Expand Down
6 changes: 3 additions & 3 deletions src/tests/rapidgzip/testGzipChunkFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ testAutomaticMarkerResolution( const std::filesystem::path& filePath,
bitReader,
blockOffset,
/* untilOffset */ std::numeric_limits<size_t>::max(),
/* window */ std::nullopt,
/* window */ {},
/* decodedSize */ std::nullopt,
cancel );

Expand Down Expand Up @@ -208,7 +208,7 @@ testIsalBug()
bitReader,
blockOffset,
untilOffset,
/* window */ window,
/* window */ std::make_shared<WindowMap::Window>( window ),
/* decodedSize */ 4171816,
cancel,
FileType::GZIP,
Expand Down Expand Up @@ -349,7 +349,7 @@ decodeWithDecodeBlock( UniqueFileReader&& fileReader )
bitReader,
bitReader.tell(),
/* untilOffset */ std::numeric_limits<size_t>::max(),
/* window */ std::nullopt,
/* window */ {},
/* decodedSize */ std::nullopt,
cancel );
}
Expand Down
34 changes: 34 additions & 0 deletions src/tests/rapidgzip/testGzipIndexFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,40 @@ testIndexReadWrite( const std::filesystem::path& compressedPath,
writeGzipIndex( index, checkedWrite );
}
const auto rereadIndex = readGzipIndex( std::make_unique<StandardFileReader>( gzipIndexPath ) );

REQUIRE_EQUAL( rereadIndex.compressedSizeInBytes, index.compressedSizeInBytes );
REQUIRE_EQUAL( rereadIndex.uncompressedSizeInBytes, index.uncompressedSizeInBytes );
REQUIRE_EQUAL( rereadIndex.checkpointSpacing, index.checkpointSpacing );
REQUIRE_EQUAL( rereadIndex.windowSizeInBytes, index.windowSizeInBytes );

REQUIRE( rereadIndex.checkpoints == index.checkpoints );

REQUIRE_EQUAL( static_cast<bool>( rereadIndex.windows ), static_cast<bool>( index.windows ) );
if ( rereadIndex.windows && index.windows ) {
REQUIRE_EQUAL( rereadIndex.windows->size(), index.windows->size() );
const auto& [_, windows] = index.windows->data();
for ( const auto& [offset, window] : *windows ) {
const auto rereadWindow = rereadIndex.windows->get( offset );
if ( !rereadWindow ) {
std::cerr << "Failed to find offset " << offset << " in reread index!\n";
continue;
}

if ( static_cast<bool>( window ) != static_cast<bool>( rereadWindow ) ) {
std::stringstream message;
message << std::boolalpha << "Shared window has value: " << static_cast<bool>( window )
<< " while reread shared window has value: " << static_cast<bool>( rereadWindow );
std::cerr << std::move( message ).str() << "\n";
continue;
}

if ( *window != *rereadWindow ) {
std::cerr << "Window contents for offset " << offset << " differ!\n";
continue;
}
}
}

REQUIRE( rereadIndex == index );
}
catch ( const std::exception& exception )
Expand Down
4 changes: 2 additions & 2 deletions src/tests/rapidgzip/testParallelGzipReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ testParallelDecodingWithIndex( const TemporaryDirectory& tmpFolder )

const auto reconstructedWindow = reconstructedIndex.windows->get( reconstructed.compressedOffsetInBits );
const auto realWindow = realIndex.windows->get( real.compressedOffsetInBits );
REQUIRE( reconstructedWindow.has_value() );
REQUIRE( realWindow.has_value() );
REQUIRE( static_cast<bool>( reconstructedWindow ) );
REQUIRE( static_cast<bool>( realWindow ) );
}
}
REQUIRE( *reconstructedIndex.windows == *realIndex.windows );
Expand Down

0 comments on commit ae8785b

Please sign in to comment.