Skip to content

Commit

Permalink
[fix] Propagate EPIPE (broken pipe) to a normal return code instead o…
Browse files Browse the repository at this point in the history
…f an exception

This already worked out of the box for the C++ build but didn't for the
Python build, maybe because it overwrote the signal handler?
  • Loading branch information
mxmlnkn committed Jan 16, 2024
1 parent 51502c6 commit 21ba498
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 118 deletions.
40 changes: 40 additions & 0 deletions src/benchmarks/benchmarkIOWrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,46 @@ enum class FileInitialization
};


void
pwriteAllToFdVector( const int outputFileDescriptor,
const std::vector<::iovec>& dataToWrite,
size_t fileOffset )
{
for ( size_t i = 0; i < dataToWrite.size(); ) {
const auto segmentCount = std::min( static_cast<size_t>( IOV_MAX ), dataToWrite.size() - i );
auto nBytesWritten = ::pwritev( outputFileDescriptor, &dataToWrite[i], segmentCount, fileOffset );

if ( nBytesWritten < 0 ) {
std::stringstream message;
message << "Failed to write all bytes because of: " << strerror( errno ) << " (" << errno << ")";
throw std::runtime_error( std::move( message.str() ) );
}

fileOffset += nBytesWritten;

/* Skip over buffers that were written fully. */
for ( ; ( i < dataToWrite.size() ) && ( dataToWrite[i].iov_len <= static_cast<size_t>( nBytesWritten ) );
++i ) {
nBytesWritten -= dataToWrite[i].iov_len;
}

/* Write out last partially written buffer if necessary so that we can resume full vectorized writing
* from the next iovec buffer. */
if ( ( i < dataToWrite.size() ) && ( nBytesWritten > 0 ) ) {
const auto& iovBuffer = dataToWrite[i];

assert( iovBuffer.iov_len < static_cast<size_t>( nBytesWritten ) );
const auto remainingSize = iovBuffer.iov_len - nBytesWritten;
const auto remainingData = reinterpret_cast<char*>( iovBuffer.iov_base ) + nBytesWritten;
pwriteAllToFd( outputFileDescriptor, remainingData, remainingSize, fileOffset );
fileOffset += remainingSize;

++i;
}
}
}


[[nodiscard]] const char*
toStringFile( const FileInitialization fileInitialization )
{
Expand Down
8 changes: 7 additions & 1 deletion src/benchmarks/benchmarkSequential2023.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,13 @@ benchmarkWrite( const std::string& filePath,
uint64_t sum{ 0 };
for ( size_t i = 0; i < data.size(); i += chunkSize ) {
const auto sizeToWrite = std::min( chunkSize, data.size() - i );
writeAllToFd( *ufd, &data[i], sizeToWrite );
const auto errorCode = writeAllToFd( *ufd, &data[i], sizeToWrite );
if ( errorCode != 0 ) {
std::stringstream message;
message << "Failed to write all bytes because of: " << strerror( errorCode )
<< " (" << errorCode << ")";
throw std::runtime_error( std::move( message.str() ) );
}
sum += sizeToWrite;
}

Expand Down
125 changes: 48 additions & 77 deletions src/core/FileUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,10 @@ findParentFolderContaining( const std::string& folder,
* Or maybe try setting the pipe buffer size to some forced value and
* then only free the last data after pipe size more has been written.
*
* @note Throws if some splice calls were successful followed by an unsucessful one before finishing.
* @return true if successful and false if it could not be spliced from the beginning, e.g., because the file
* descriptor is not a pipe.
* @return 0 if successful and errno if it could not be spliced from the beginning, e.g., because the file
* descriptor is not a pipe or because the file descriptor triggered a SIGPIPE.
*/
[[nodiscard]] inline bool
[[nodiscard]] inline int
writeAllSpliceUnsafe( [[maybe_unused]] const int outputFileDescriptor,
[[maybe_unused]] const void* const dataToWrite,
[[maybe_unused]] const size_t dataToWriteSize )
Expand All @@ -367,22 +366,21 @@ writeAllSpliceUnsafe( [[maybe_unused]] const int outputFileDescriptor,
dataToSplice.iov_base = const_cast<void*>( reinterpret_cast<const void*>( dataToWrite ) );
dataToSplice.iov_len = dataToWriteSize;
while ( dataToSplice.iov_len > 0 ) {
/* Note: On a broken pipe signal (EPIPE), the C++ CLI will directly exit and will not resume
* on the following line while with the Python wrapper, it will resume!
* @see https://stackoverflow.com/a/18963142/2191065 */
const auto nBytesWritten = ::vmsplice( outputFileDescriptor, &dataToSplice, 1, /* flags */ 0 );
if ( nBytesWritten < 0 ) {
if ( dataToSplice.iov_len == dataToWriteSize ) {
return false;
}
std::cerr << "error: " << errno << "\n";
throw std::runtime_error( "Failed to write to pipe" );
return errno;
}
dataToSplice.iov_base = reinterpret_cast<char*>( dataToSplice.iov_base ) + nBytesWritten;
dataToSplice.iov_len -= nBytesWritten;
}
return true;
return 0;
}


[[nodiscard]] inline bool
[[nodiscard]] inline int
writeAllSpliceUnsafe( [[maybe_unused]] const int outputFileDescriptor,
[[maybe_unused]] const std::vector<::iovec>& dataToWrite )
{
Expand All @@ -392,7 +390,7 @@ writeAllSpliceUnsafe( [[maybe_unused]] const int outputFileDes

if ( nBytesWritten < 0 ) {
if ( i == 0 ) {
return false;
return errno;
}

std::stringstream message;
Expand All @@ -415,14 +413,15 @@ writeAllSpliceUnsafe( [[maybe_unused]] const int outputFileDes
const auto size = iovBuffer.iov_len - nBytesWritten;

const auto remainingData = reinterpret_cast<char*>( iovBuffer.iov_base ) + nBytesWritten;
if ( !writeAllSpliceUnsafe( outputFileDescriptor, remainingData, size ) ) {
throw std::runtime_error( "Failed to write to pipe subsequently." );
const auto errorCode = writeAllSpliceUnsafe( outputFileDescriptor, remainingData, size );
if ( errorCode != 0 ) {
return errorCode;
}
++i;
}
}

return true;
return 0;
}


Expand Down Expand Up @@ -460,39 +459,47 @@ class SpliceVault
* the pipe.
*/
template<typename T>
[[nodiscard]] bool
[[nodiscard]] int
splice( const void* const dataToWrite,
size_t const dataToWriteSize,
const std::shared_ptr<T>& splicedData )
{
if ( ( m_pipeBufferSize < 0 )
|| !writeAllSpliceUnsafe( m_fileDescriptor, dataToWrite, dataToWriteSize ) ) {
return false;
if ( m_pipeBufferSize < 0 ) {
return -1;
}

const auto errorCode = writeAllSpliceUnsafe( m_fileDescriptor, dataToWrite, dataToWriteSize );
if ( errorCode != 0 ) {
return errorCode;
}

account( splicedData, dataToWriteSize );
return true;
return 0;
}

/**
* Overload that works for iovec structures directly.
*/
template<typename T>
[[nodiscard]] bool
[[nodiscard]] int
splice( const std::vector<::iovec>& buffersToWrite,
const std::shared_ptr<T>& splicedData )
{
if ( ( m_pipeBufferSize < 0 )
|| !writeAllSpliceUnsafe( m_fileDescriptor, buffersToWrite ) ) {
return false;
if ( m_pipeBufferSize < 0 ) {
return -1;
}

const auto errorCode = writeAllSpliceUnsafe( m_fileDescriptor, buffersToWrite );
if ( errorCode != 0 ) {
return errorCode;
}

const auto dataToWriteSize = std::accumulate(
buffersToWrite.begin(), buffersToWrite.end(), size_t( 0 ),
[] ( size_t sum, const auto& buffer ) { return sum + buffer.iov_len; } );

account( splicedData, dataToWriteSize );
return true;
return 0;
}

private:
Expand Down Expand Up @@ -557,7 +564,7 @@ class SpliceVault
* Posix write is not guaranteed to write everything and in fact was encountered to not write more than
* 0x7ffff000 (2'147'479'552) B. To avoid this, it has to be looped over.
*/
inline void
[[nodiscard]] inline int
writeAllToFd( const int outputFileDescriptor,
const void* const dataToWrite,
const uint64_t dataToWriteSize )
Expand All @@ -573,13 +580,12 @@ writeAllToFd( const int outputFileDescriptor,

const auto nBytesWritten = ::write( outputFileDescriptor, currentBufferPosition, nBytesToWritePerCall );
if ( nBytesWritten <= 0 ) {
std::stringstream message;
message << "Unable to write all data to the given file descriptor. Wrote " << nTotalWritten << " out of "
<< dataToWriteSize << " (" << strerror( errno ) << ").";
throw std::runtime_error( std::move( message ).str() );
return errno;
}
nTotalWritten += static_cast<uint64_t>( nBytesWritten );
}

return 0;
}


Expand Down Expand Up @@ -609,7 +615,7 @@ pwriteAllToFd( const int outputFileDescriptor,
}


inline void
[[nodiscard]] inline int
writeAllToFdVector( const int outputFileDescriptor,
const std::vector<::iovec>& dataToWrite )
{
Expand All @@ -618,9 +624,7 @@ writeAllToFdVector( const int outputFileDescriptor,
auto nBytesWritten = ::writev( outputFileDescriptor, &dataToWrite[i], segmentCount );

if ( nBytesWritten < 0 ) {
std::stringstream message;
message << "Failed to write all bytes because of: " << strerror( errno ) << " (" << errno << ")";
throw std::runtime_error( std::move( message.str() ) );
return errno;
}

/* Skip over buffers that were written fully. */
Expand All @@ -637,67 +641,32 @@ writeAllToFdVector( const int outputFileDescriptor,
assert( iovBuffer.iov_len < static_cast<size_t>( nBytesWritten ) );
const auto remainingSize = iovBuffer.iov_len - nBytesWritten;
const auto remainingData = reinterpret_cast<char*>( iovBuffer.iov_base ) + nBytesWritten;
writeAllToFd( outputFileDescriptor, remainingData, remainingSize );
const auto errorCode = writeAllToFd( outputFileDescriptor, remainingData, remainingSize );
if ( errorCode != 0 ) {
return errorCode;
}

++i;
}
}
}


inline void
pwriteAllToFdVector( const int outputFileDescriptor,
const std::vector<::iovec>& dataToWrite,
size_t fileOffset )
{
for ( size_t i = 0; i < dataToWrite.size(); ) {
const auto segmentCount = std::min( static_cast<size_t>( IOV_MAX ), dataToWrite.size() - i );
auto nBytesWritten = ::pwritev( outputFileDescriptor, &dataToWrite[i], segmentCount, fileOffset );

if ( nBytesWritten < 0 ) {
std::stringstream message;
message << "Failed to write all bytes because of: " << strerror( errno ) << " (" << errno << ")";
throw std::runtime_error( std::move( message.str() ) );
}

fileOffset += nBytesWritten;

/* Skip over buffers that were written fully. */
for ( ; ( i < dataToWrite.size() ) && ( dataToWrite[i].iov_len <= static_cast<size_t>( nBytesWritten ) );
++i ) {
nBytesWritten -= dataToWrite[i].iov_len;
}

/* Write out last partially written buffer if necessary so that we can resume full vectorized writing
* from the next iovec buffer. */
if ( ( i < dataToWrite.size() ) && ( nBytesWritten > 0 ) ) {
const auto& iovBuffer = dataToWrite[i];

assert( iovBuffer.iov_len < static_cast<size_t>( nBytesWritten ) );
const auto remainingSize = iovBuffer.iov_len - nBytesWritten;
const auto remainingData = reinterpret_cast<char*>( iovBuffer.iov_base ) + nBytesWritten;
pwriteAllToFd( outputFileDescriptor, remainingData, remainingSize, fileOffset );
fileOffset += remainingSize;

++i;
}
}
return 0;
}
#endif // HAVE_IOVEC


inline void
[[nodiscard]] inline int
writeAll( const int outputFileDescriptor,
void* const outputBuffer,
const void* const dataToWrite,
const uint64_t dataToWriteSize )
{
if ( dataToWriteSize == 0 ) {
return;
return 0;
}

if ( outputFileDescriptor >= 0 ) {
writeAllToFd( outputFileDescriptor, dataToWrite, dataToWriteSize );
return writeAllToFd( outputFileDescriptor, dataToWrite, dataToWriteSize );
}

if ( outputBuffer != nullptr ) {
Expand All @@ -706,6 +675,8 @@ writeAll( const int outputFileDescriptor,
}
std::memcpy( outputBuffer, dataToWrite, dataToWriteSize );
}

return 0;
}


Expand Down
8 changes: 7 additions & 1 deletion src/indexed_bzip2/BZ2ReaderInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ class BZ2ReaderInterface :
uint64_t const size ) mutable
{
auto* const currentBufferPosition = outputBuffer == nullptr ? nullptr : outputBuffer + nBytesDecoded;
writeAll( outputFileDescriptor, currentBufferPosition, buffer, size );
const auto errorCode = writeAll( outputFileDescriptor, currentBufferPosition, buffer, size );
if ( errorCode != 0 ) {
std::stringstream message;
message << "Failed to write all bytes because of: " << strerror( errorCode )
<< " (" << errorCode << ")";
throw std::runtime_error( std::move( message.str() ) );
}
nBytesDecoded += size;
};

Expand Down
Loading

0 comments on commit 21ba498

Please sign in to comment.