Skip to content

Commit

Permalink
Allow moving from AsyncSocket, transfer of FD read state from AsyncFd…
Browse files Browse the repository at this point in the history
…Socket

Summary: The current APIs are a bit clunky, and are intended only for usage in TLS code. If this keeps coming up, it is possible to get closer to real move semantics that works on both `AsyncFdSocket` (with pre-existing read and/or write state) and non-FD `AsyncSocket`. However, figuring out the correct details for the more general thing is considerably more involved, and is just not worth it yet.

Reviewed By: AjanthanAsogamoorthy

Differential Revision: D48087183

fbshipit-source-id: fb9cf0484733a16648f276fbb5873375d7a1f0c4
  • Loading branch information
Alexey Spiridonov authored and facebook-github-bot committed Aug 7, 2023
1 parent 0c113c0 commit 52c192e
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 14 deletions.
30 changes: 30 additions & 0 deletions folly/io/async/fdsock/AsyncFdSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ AsyncFdSocket::AsyncFdSocket(
}
#endif

AsyncFdSocket::AsyncFdSocket(
AsyncFdSocket::DoesNotMoveFdSocketState, AsyncSocket* sock)
: AsyncSocket(sock)
#if !defined(_WIN32)
,
readAncillaryDataCob_(this) {
setUpCallbacks();
}
#else
{
}
#endif

AsyncFdSocket::AsyncFdSocket(
AsyncFdSocket::DoesNotMoveFdSocketState tag, AsyncSocket::UniquePtr sock)
: AsyncFdSocket(tag, sock.get()) {}

void AsyncFdSocket::writeChainWithFds(
WriteCallback* callback,
std::unique_ptr<folly::IOBuf> buf,
Expand Down Expand Up @@ -130,6 +147,19 @@ void AsyncFdSocket::setUpCallbacks() noexcept {
AsyncSocket::setReadAncillaryDataCB(&readAncillaryDataCob_);
}

void AsyncFdSocket::swapFdReadStateWith(AsyncFdSocket* other) {
// We don't need these write-state assertions to correctly swap read
// state, but since the only use-case is `moveToPlaintext`, they help.
DCHECK_EQ(0, other->allocatedToSendFdsSeqNum_);
DCHECK_EQ(0, other->sentFdsSeqNum_);
DCHECK_EQ(0, other->sendMsgCob_.writeTagToFds_.size());

fdsQueue_.swap(other->fdsQueue_);
std::swap(receivedFdsSeqNum_, other->receivedFdsSeqNum_);
// Do NOT swap `readAncillaryDataCob_` since its internal members are not
// "state", but plumbing that does not change.
}

void AsyncFdSocket::releaseIOBuf(
std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback) {
sendMsgCob_.destroyFdsForWriteTag(WriteRequestTag{buf.get()});
Expand Down
31 changes: 30 additions & 1 deletion folly/io/async/fdsock/AsyncFdSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ class AsyncFdSocket : public AsyncSocket {
NetworkSocket fd,
const folly::SocketAddress* peerAddress = nullptr);

/**
* EXPERIMENTAL / TEMPORARY: These move-like constructors should not be
* used to go from one AsyncFdSocket to another because this will not
* correctly preserve read & write state. Full move is not implemented
* since its trickier, and was not yet needed -- see `swapFdReadStateWith`.
*/
struct DoesNotMoveFdSocketState {};

protected:
_FRIEND_TEST_FOR_ASYNC_FD_SOCKET(
AsyncFdSocketSequenceRoundtripTest, WithDataSize);
// Protected since it's easy to accidentally pass an `AsyncFdSocket` here,
// a scenario that's extremely easy to use incorrectly.
AsyncFdSocket(DoesNotMoveFdSocketState, AsyncSocket*);

public:
AsyncFdSocket(DoesNotMoveFdSocketState, AsyncSocket::UniquePtr);

/**
* `AsyncSocket::writeChain` analog that passes FDs as ancillary data over
* the socket (see `man cmsg`).
Expand Down Expand Up @@ -136,8 +154,19 @@ class AsyncFdSocket : public AsyncSocket {
LOG(DFATAL) << "AsyncFdSocket::setReadAncillaryDataCB is forbidden";
}

// This uses no ancillary data callbacks on Windows, they wouldn't compile.
// This class has no ancillary data callbacks on Windows, they wouldn't compile
#if !defined(_WIN32)
/**
* EXPERIMENTAL / TEMPORARY: This just does what is required for
* `moveToPlaintext` to support StopTLS. That use-case could later be
* covered by full move-construct or move-assign support, but both would
* be more complex to support.
*
* Swaps "read FDs" state (receive queue & sequence numbers) with `other`.
* DFATALs if `other` had any "write FDs" state.
*/
void swapFdReadStateWith(AsyncFdSocket* other);

protected:
void releaseIOBuf(
std::unique_ptr<folly::IOBuf>, ReleaseIOBufCallback*) override;
Expand Down
52 changes: 39 additions & 13 deletions folly/io/async/fdsock/test/AsyncFdSocketTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ struct AsyncFdSocketTest : public testing::Test {
}()} {}

explicit AsyncFdSocketTest(std::array<NetworkSocket, 2> fds)
: sendSock_{&evb_, fds[0]}, recvSock_{&evb_, fds[1]} {
recvSock_.setReadCB(&rcb_);
: sendSock_{&evb_, fds[0]},
recvSock_(std::make_unique<AsyncFdSocket>(&evb_, fds[1])) {
recvSock_->setReadCB(&rcb_);
}

EventBase evb_;
Expand All @@ -100,7 +101,7 @@ struct AsyncFdSocketTest : public testing::Test {
AsyncFdSocket sendSock_;

ReadCallback rcb_; // NB: `~AsyncSocket` calls `rcb.readEOF`
AsyncFdSocket recvSock_;
std::unique_ptr<AsyncFdSocket> recvSock_;
};

TEST_F(AsyncFdSocketTest, TestAddSeqNum) {
Expand Down Expand Up @@ -171,7 +172,7 @@ TEST_P(AsyncFdSocketSimpleRoundtripTest, WithNumFds) {
rcb_.verifyData(&data, sizeof(data));
rcb_.clearData();

checkFdsMatch(sendFds, sendSeqNum, recvSock_.popNextReceivedFds());
checkFdsMatch(sendFds, sendSeqNum, recvSock_->popNextReceivedFds());
}

// Round-trip & verify various numbers of FDs with 1 byte of data.
Expand Down Expand Up @@ -225,7 +226,7 @@ TEST_F(AsyncFdSocketTest, MultiPartSend) {

// FDs are sent with the first send & received by the first receive
evb_.loopOnce();
checkFdsMatch(sendFds, sendSeqNum, recvSock_.popNextReceivedFds());
checkFdsMatch(sendFds, sendSeqNum, recvSock_->popNextReceivedFds());
EXPECT_EQ(1, sendSock.numWrites_);

// Receive the rest of the data.
Expand All @@ -239,23 +240,23 @@ TEST_F(AsyncFdSocketTest, MultiPartSend) {
// There are no more data or FDs
evb_.loopOnce(EVLOOP_NONBLOCK);
EXPECT_EQ(0, rcb_.dataRead()) << "Leftover reads";
EXPECT_TRUE(recvSock_.popNextReceivedFds().empty()) << "Extra FDs";
EXPECT_TRUE(recvSock_->popNextReceivedFds().empty()) << "Extra FDs";
}

struct AsyncFdSocketSequenceRoundtripTest
: public AsyncFdSocketTest,
public testing::WithParamInterface<int> {};
public testing::WithParamInterface<std::tuple<bool, int>> {};

TEST_P(AsyncFdSocketSequenceRoundtripTest, WithDataSize) {
size_t dataSize = GetParam();
auto [swapSocket, dataSize] = GetParam();

// The default `ReadCallback` has special-snowflake buffer management
// that's annoying for this test. Secondarily, this exercises the
// "ReadVec" path.
ReadvCallback rcb(128, 3);
// Avoid `readEOF` use-after-stack-scope in `~AsyncSocket`.
SCOPE_EXIT { recvSock_.setReadCB(nullptr); };
recvSock_.setReadCB(&rcb);
SCOPE_EXIT { recvSock_->setReadCB(nullptr); };
recvSock_->setReadCB(&rcb);

std::queue<
std::tuple<int, std::string, folly::SocketFds::ToSend, SocketFds::SeqNum>>
Expand Down Expand Up @@ -288,6 +289,23 @@ TEST_P(AsyncFdSocketSequenceRoundtripTest, WithDataSize) {
// The max expected steps is ~3k: 1234567 / (3 * 128)
for (int i = 0; i < 10000 && !sentQueue.empty(); ++i) {
evb_.loopOnce(EVLOOP_NONBLOCK);
// Validate that "move from AsyncSocket" and "swap read state" interrupt
// neither the reading of data nor of FDs.
if (swapSocket) {
AsyncFdSocket prevReadStateSock{nullptr};
prevReadStateSock.swapFdReadStateWith(recvSock_.get());

// Test moving the non-FD parts of the socket, while reading.
struct EnableMakeUnique : public AsyncFdSocket {
EnableMakeUnique(AsyncSocket* sock)
: AsyncFdSocket(AsyncFdSocket::DoesNotMoveFdSocketState{}, sock) {}
};
recvSock_ = std::make_unique<EnableMakeUnique>(recvSock_.get());
recvSock_->setReadCB(&rcb);

// Test moving the FD read state.
recvSock_->swapFdReadStateWith(&prevReadStateSock);
}
size_t dataRead = rcb.buf_->computeChainDataLength();
if (!dataRead) {
continue;
Expand All @@ -305,7 +323,7 @@ TEST_P(AsyncFdSocketSequenceRoundtripTest, WithDataSize) {
// FDs, which would fail in `checkFdsMatch`.
if (!sendFds.empty()) {
const auto sendSeqNum = std::get<3>(sentQueue.front());
checkFdsMatch(sendFds, sendSeqNum, recvSock_.popNextReceivedFds());
checkFdsMatch(sendFds, sendSeqNum, recvSock_->popNextReceivedFds());
}
}

Expand All @@ -331,14 +349,22 @@ TEST_P(AsyncFdSocketSequenceRoundtripTest, WithDataSize) {
EXPECT_TRUE(sentQueue.empty()) << "Stuck reading?";
evb_.loopOnce(EVLOOP_NONBLOCK);
EXPECT_EQ(0, rcb.buf_->computeChainDataLength()) << "Leftover reads";
EXPECT_TRUE(recvSock_.popNextReceivedFds().empty()) << "Extra FDs";
EXPECT_TRUE(recvSock_->popNextReceivedFds().empty()) << "Extra FDs";
}

// Vary the data size to (hopefully) get a variety of chunking behaviors.
INSTANTIATE_TEST_SUITE_P(
VaryDataSize,
AsyncFdSocketSequenceRoundtripTest,
testing::Values(1, 12, 123, 1234, 12345, 123456, 1234567));
testing::Combine(
testing::Values(false, true),
testing::Values(1, 12, 123, 1234, 12345, 123456, 1234567)),
[](const auto& info) {
return fmt::format(
"{}{}",
std::get<0>(info.param) ? "SwapSocket_" : "",
std::get<1>(info.param));
});

#endif // !Windows

Expand Down

0 comments on commit 52c192e

Please sign in to comment.