diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index f311c7a2e..39f2598de 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -1293,6 +1293,68 @@ void QuicClientTransport::recvMsg( networkData.getPackets().size(), networkData.getTotalData()); } +void QuicClientTransport::recvFrom( + QuicAsyncUDPSocket& sock, + uint64_t readBufferSize, + int numPackets, + NetworkData& networkData, + Optional& server, + size_t& totalData) { + for (int packetNum = 0; packetNum < numPackets; ++packetNum) { + // We create 1 buffer per packet so that it is not shared, this enables + // us to decrypt in place. If the fizz decrypt api could decrypt in-place + // even if shared, then we could allocate one giant IOBuf here. + Buf readBuffer = folly::IOBuf::createCombined(readBufferSize); + struct iovec vec; + vec.iov_base = readBuffer->writableData(); + vec.iov_len = readBufferSize; + + sockaddr* rawAddr{nullptr}; + struct sockaddr_storage addrStorage {}; + if (!server) { + rawAddr = reinterpret_cast(&addrStorage); + rawAddr->sa_family = sock.getLocalAddressFamily(); + } + + ssize_t ret = + sock.recvfrom(readBuffer->writableData(), readBufferSize, &addrStorage); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // If we got a retriable error, let us continue. + if (conn_->loopDetectorCallback) { + conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR; + } + break; + } + // If we got a non-retriable error, we might have received + // a packet that we could process, however let's just quit early. + sock.pauseRead(); + if (conn_->loopDetectorCallback) { + conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR; + } + return onReadError(folly::AsyncSocketException( + folly::AsyncSocketException::INTERNAL_ERROR, + "::recvmsg() failed", + errno)); + } else if (ret == 0) { + break; + } + + size_t bytesRead = size_t(ret); + totalData += bytesRead; + if (!server) { + server = folly::SocketAddress(); + server->setFromSockaddr(rawAddr, kAddrLen); + } + VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead; + readBuffer->append(bytesRead); + + networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); + } + trackDatagramsReceived( + networkData.getPackets().size(), networkData.getTotalData()); +} + void QuicClientTransport::recvMmsg( QuicAsyncUDPSocket& sock, uint64_t readBufferSize, @@ -1598,6 +1660,18 @@ void QuicClientTransport::readWithRecvmsg( processPackets(std::move(networkData), server); } +void QuicClientTransport::readWithRecvfrom( + QuicAsyncUDPSocket& sock, + uint64_t readBufferSize, + uint16_t numPackets) { + NetworkData networkData; + networkData.reserve(numPackets); + size_t totalData = 0; + Optional server; + recvFrom(sock, readBufferSize, numPackets, networkData, server, totalData); + processPackets(std::move(networkData), server); +} + void QuicClientTransport::onNotifyDataAvailable( QuicAsyncUDPSocket& sock) noexcept { auto self = this->shared_from_this(); @@ -1606,7 +1680,9 @@ void QuicClientTransport::onNotifyDataAvailable( conn_->transportSettings.maxRecvPacketSize * numGROBuffers_; const uint16_t numPackets = conn_->transportSettings.maxRecvBatchSize; - if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) { + if (conn_->transportSettings.shouldUseRecvfromForBatchRecv) { + readWithRecvfrom(sock, readBufferSize, numPackets); + } else if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) { readWithRecvmmsgWrapper(sock, readBufferSize, numPackets); } else if (conn_->transportSettings.shouldUseRecvmmsgForBatchRecv) { readWithRecvmmsg(sock, readBufferSize, numPackets); diff --git a/quic/client/QuicClientTransport.h b/quic/client/QuicClientTransport.h index 4160f0b88..41556f41e 100644 --- a/quic/client/QuicClientTransport.h +++ b/quic/client/QuicClientTransport.h @@ -263,6 +263,14 @@ class QuicClientTransport bool shouldOnlyNotify() override; void onNotifyDataAvailable(QuicAsyncUDPSocket& sock) noexcept override; + void recvFrom( + QuicAsyncUDPSocket& sock, + uint64_t readBufferSize, + int numPackets, + NetworkData& networkData, + Optional& server, + size_t& totalData); + void recvMsg( QuicAsyncUDPSocket& sock, uint64_t readBufferSize, @@ -327,6 +335,11 @@ class QuicClientTransport NetworkData&& networkData, const Optional& server); + void readWithRecvfrom( + QuicAsyncUDPSocket& sock, + uint64_t readBufferSize, + uint16_t numPackets); + void readWithRecvmmsgWrapper( QuicAsyncUDPSocket& sock, uint64_t readBufferSize, diff --git a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp index 09f4f28a5..43ebf708b 100644 --- a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp +++ b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp @@ -401,6 +401,20 @@ int LibevQuicAsyncUDPSocket::getGRO() { return -1; } +ssize_t LibevQuicAsyncUDPSocket::recvfrom( + uint8_t* buf, + size_t bufSize, + sockaddr_storage* sockaddrStorage) { + socklen_t addrlen = sizeof(*sockaddrStorage); + return ::recvfrom( + fd_, + buf, + bufSize, + MSG_DONTWAIT, + (struct sockaddr*)sockaddrStorage, + &addrlen); +} + ssize_t LibevQuicAsyncUDPSocket::recvmsg(struct msghdr* msg, int flags) { return ::recvmsg(fd_, msg, flags); } diff --git a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h index aca282253..f1876aa0d 100644 --- a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h +++ b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h @@ -69,6 +69,11 @@ class LibevQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl { LOG(FATAL) << __func__ << " not supported in LibevQuicAsyncUDPSocket"; } + ssize_t recvfrom( + uint8_t* buf, + size_t bufSize, + sockaddr_storage* sockaddrStorage) override; + ssize_t recvmsg(struct msghdr* msg, int flags) override; int recvmmsg( diff --git a/quic/common/udpsocket/QuicAsyncUDPSocket.h b/quic/common/udpsocket/QuicAsyncUDPSocket.h index 8089562f8..22777dc3a 100644 --- a/quic/common/udpsocket/QuicAsyncUDPSocket.h +++ b/quic/common/udpsocket/QuicAsyncUDPSocket.h @@ -209,6 +209,13 @@ class QuicAsyncUDPSocket { size_t count, const WriteOptions* options) = 0; + virtual ssize_t recvfrom( + uint8_t* /* buf */, + size_t /* bufSize */, + sockaddr_storage* /* sockaddrStorage */) { + return -1; + } + virtual ssize_t recvmsg(struct msghdr* /* msg */, int /* flags */) = 0; virtual int recvmmsg( diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index 71f21f5f7..e39c392b0 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -255,6 +255,8 @@ struct TransportSettings { bool shouldUseWrapperRecvmmsgForBatchRecv{false}; // Whether or not use recvmmsg. bool shouldUseRecvmmsgForBatchRecv{false}; + // Whether or not use recvfrom. + bool shouldUseRecvfromForBatchRecv{false}; // Config struct for congestion controllers CongestionControlConfig ccaConfig; // A packet is considered loss when a packet that's sent later by at least