Skip to content

Commit

Permalink
Add option to coalesce on insertion to read buffer.
Browse files Browse the repository at this point in the history
Summary:
This diff adds the ability to coalesce adjoining buffers sometimes on insertion into the read buffer, using the tail room. This only happens in the "rightward" case. E.g. if there's an existing buffer at offset [0, 1200) with 5000 bytes of tail room, and we are inserting a 1200 byte buffer, it will copy it to the tail and the buffer will become [0, 2400).

To make this maximally useful we also have to make it so that the allocated packet buffers are over-allocated instead of packet-sized.

Reviewed By: afrind, kvtsoy

Differential Revision: D65394114

fbshipit-source-id: b92d466baa1d253f0c5854d679e97f14f523c7eb
  • Loading branch information
Matt Joras authored and facebook-github-bot committed Nov 4, 2024
1 parent 3705cba commit 41ce128
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 114 deletions.
15 changes: 10 additions & 5 deletions quic/client/QuicClientTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1680,16 +1680,21 @@ void QuicClientTransport::onNotifyDataAvailable(
conn_->transportSettings.maxRecvPacketSize * numGROBuffers_;
const uint16_t numPackets = conn_->transportSettings.maxRecvBatchSize;

const size_t readAllocSize =
conn_->transportSettings.readCoalescingSize > kDefaultUDPSendPacketLen
? conn_->transportSettings.readCoalescingSize
: readBufferSize;

if (conn_->transportSettings.shouldUseRecvfromForBatchRecv) {
readWithRecvfrom(sock, readBufferSize, numPackets);
readWithRecvfrom(sock, readAllocSize, numPackets);
} else if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) {
readWithRecvmmsgWrapper(sock, readBufferSize, numPackets);
readWithRecvmmsgWrapper(sock, readAllocSize, numPackets);
} else if (conn_->transportSettings.shouldUseRecvmmsgForBatchRecv) {
readWithRecvmmsg(sock, readBufferSize, numPackets);
readWithRecvmmsg(sock, readAllocSize, numPackets);
} else if (conn_->transportSettings.networkDataPerSocketRead) {
readWithRecvmsgSinglePacketLoop(sock, readBufferSize);
readWithRecvmsgSinglePacketLoop(sock, readAllocSize);
} else {
readWithRecvmsg(sock, readBufferSize, numPackets);
readWithRecvmsg(sock, readAllocSize, numPackets);
}
}

Expand Down
51 changes: 49 additions & 2 deletions quic/state/QuicStreamFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,42 @@ void writeDataToQuicStream(QuicCryptoStream& stream, Buf data) {
stream.writeBuffer.append(std::move(data));
}

// Helper function which appends a raw data range to what MUST be a "tail" of
// a logic IOBuf chain, buf. The function will pack data into the available
// tail agressively, and allocate in terms of appendLen until the push is
// complete.
static void pushToTail(folly::IOBuf* dst, Buf src, size_t allocSize) {
size_t appended = 0;
auto len = src->length();
auto data = src->data();
while (appended < len) {
// If there's no tail room or that buffer is shared, trying to use the tail
// will cause problems.
if (dst->tailroom() == 0 || dst->isSharedOne()) {
// If the buffer we are pushing has tail room, just use that one.
// Otherwise, we have to allocate one.
Buf newBuf;
if (src->tailroom() > 0 && !src->isSharedOne()) {
src->trimStart(appended);
dst->appendChain(std::move(src));
return;
}
newBuf = folly::IOBuf::createCombined(allocSize);
dst->appendChain(std::move(newBuf));
dst = dst->next();
}
auto toAppend = std::min(dst->tailroom(), len - appended);
memcpy(dst->writableTail(), data, toAppend);
dst->append(toAppend);
appended += toAppend;
data += toAppend;
}
}

void appendDataToReadBufferCommon(
QuicStreamLike& stream,
StreamBuffer buffer,
uint32_t coalescingSize,
folly::Function<void(uint64_t, uint64_t)>&& connFlowControlVisitor) {
auto& readBuffer = stream.readBuffer;
auto it = readBuffer.begin();
Expand Down Expand Up @@ -210,7 +243,20 @@ void appendDataToReadBufferCommon(
currentEnd > itEnd) {
// Right overlap. Not done.
current->data.trimStartAtMost(itEnd - current->offset);
it->data.append(current->data.move());
if (coalescingSize > kDefaultUDPSendPacketLen) {
auto itData = it->data.move();
auto itDataTail = itData->prev();
auto currentData = current->data.move();
while (currentData != nullptr) {
auto rest = currentData->pop();
pushToTail(itDataTail, std::move(currentData), coalescingSize);
itDataTail = itData->prev();
currentData = std::move(rest);
}
it->data.append(std::move(itData));
} else {
it->data.append(current->data.move());
}
current = &(*it);
currentAlreadyInserted = true;
DCHECK(!startOverlap);
Expand Down Expand Up @@ -245,6 +291,7 @@ void appendDataToReadBuffer(QuicStreamState& stream, StreamBuffer buffer) {
appendDataToReadBufferCommon(
stream,
std::move(buffer),
stream.conn.transportSettings.readCoalescingSize,
[&stream](uint64_t previousMaxOffsetObserved, uint64_t bufferEndOffset) {
updateFlowControlOnStreamData(
stream, previousMaxOffsetObserved, bufferEndOffset);
Expand All @@ -253,7 +300,7 @@ void appendDataToReadBuffer(QuicStreamState& stream, StreamBuffer buffer) {

void appendDataToReadBuffer(QuicCryptoStream& stream, StreamBuffer buffer) {
appendDataToReadBufferCommon(
stream, std::move(buffer), [](uint64_t, uint64_t) {});
stream, std::move(buffer), 0, [](uint64_t, uint64_t) {});
}

std::pair<Buf, bool> readDataInOrderFromReadBuffer(
Expand Down
1 change: 1 addition & 0 deletions quic/state/QuicStreamFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ uint64_t getNumPacketsTxWithNewData(const QuicStreamState& stream);
void appendDataToReadBufferCommon(
QuicStreamLike& stream,
StreamBuffer buffer,
uint32_t coalescingSize,
folly::Function<void(uint64_t, uint64_t)>&& connFlowControlVisitor);

/**
Expand Down
4 changes: 4 additions & 0 deletions quic/state/TransportSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ struct TransportSettings {
// reception.
bool unidirectionalStreamsReadCallbacksFirst{false};
bool immediatelyRetransmitInitialPackets{false};
// If > 0 this represents the coalescing of appends to the read buffer
// which will be applied. i.e. when used the underlying IOBufs in the read
// buffer will mostly be in chunks of this size.
uint32_t readCoalescingSize{0};
};

} // namespace quic
1 change: 1 addition & 0 deletions quic/state/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ cpp_unittest(
srcs = [
"QuicStreamFunctionsTest.cpp",
],
supports_static_listing = False,
deps = [
"fbsource//third-party/googletest:gmock",
"//quic/client:state_and_handshake",
Expand Down
Loading

0 comments on commit 41ce128

Please sign in to comment.