diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index 8ac465d83..ce2d03454 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -264,21 +264,6 @@ class QuicSocket : virtual public QuicSocketLite { virtual folly::Expected resumeRead( StreamId id) = 0; - /** - * Read from the given stream, up to maxLen bytes. If maxLen is 0, transport - * will return all available bytes. - * - * The return value is Expected. If the value hasError(), then a read error - * occurred and it can be obtained with error(). If the value hasValue(), - * then value() returns a pair of the data (if any) and the EOF marker. - * - * Calling read() when there is no data/eof to deliver will return an - * EAGAIN-like error code. - */ - virtual folly::Expected, LocalErrorCode> read( - StreamId id, - size_t maxLen) = 0; - /** * ===== Peek/Consume API ===== */ @@ -358,30 +343,6 @@ class QuicSocket : virtual public QuicSocketLite { StreamId id, size_t amount) = 0; - /** - * ===== Write API ===== - */ - - /** - * Creates a bidirectional stream. This assigns a stream ID but does not - * send anything to the peer. - * - * If replaySafe is false, the transport will buffer (up to the send buffer - * limits) any writes on this stream until the transport is replay safe. - */ - virtual folly::Expected createBidirectionalStream( - bool replaySafe = true) = 0; - - /** - * Creates a unidirectional stream. This assigns a stream ID but does not - * send anything to the peer. - * - * If replaySafe is false, the transport will buffer (up to the send buffer - * limits) any writes on this stream until the transport is replay safe. - */ - virtual folly::Expected createUnidirectionalStream( - bool replaySafe = true) = 0; - /** * Create a bidirectional stream group. */ @@ -406,16 +367,6 @@ class QuicSocket : virtual public QuicSocketLite { virtual folly::Expected createUnidirectionalStreamInGroup(StreamGroupId groupId) = 0; - /** - * Returns the number of bidirectional streams that can be opened. - */ - virtual uint64_t getNumOpenableBidirectionalStreams() const = 0; - - /** - * Returns the number of unidirectional streams that can be opened. - */ - virtual uint64_t getNumOpenableUnidirectionalStreams() const = 0; - /** * Returns whether a stream ID represents a client-initiated stream. */ @@ -426,16 +377,6 @@ class QuicSocket : virtual public QuicSocketLite { */ virtual bool isServerStream(StreamId stream) noexcept = 0; - /** - * Returns whether a stream ID represents a unidirectional stream. - */ - virtual bool isUnidirectionalStream(StreamId stream) noexcept = 0; - - /** - * Returns whether a stream ID represents a bidirectional stream. - */ - virtual bool isBidirectionalStream(StreamId stream) noexcept = 0; - /** * Returns directionality (unidirectional or bidirectional) of a stream by ID. */ diff --git a/quic/api/QuicSocketLite.h b/quic/api/QuicSocketLite.h index 1f9b7ef62..c7a759101 100644 --- a/quic/api/QuicSocketLite.h +++ b/quic/api/QuicSocketLite.h @@ -231,6 +231,46 @@ class QuicSocketLite { receiveWindowMaxOffset(receiveWindowMaxOffsetIn) {} }; + /** + * Creates a bidirectional stream. This assigns a stream ID but does not + * send anything to the peer. + * + * If replaySafe is false, the transport will buffer (up to the send buffer + * limits) any writes on this stream until the transport is replay safe. + */ + virtual folly::Expected createBidirectionalStream( + bool replaySafe = true) = 0; + + /** + * Creates a unidirectional stream. This assigns a stream ID but does not + * send anything to the peer. + * + * If replaySafe is false, the transport will buffer (up to the send buffer + * limits) any writes on this stream until the transport is replay safe. + */ + virtual folly::Expected createUnidirectionalStream( + bool replaySafe = true) = 0; + + /** + * Returns the number of bidirectional streams that can be opened. + */ + virtual uint64_t getNumOpenableBidirectionalStreams() const = 0; + + /** + * Returns the number of unidirectional streams that can be opened. + */ + virtual uint64_t getNumOpenableUnidirectionalStreams() const = 0; + + /** + * Returns whether a stream ID represents a unidirectional stream. + */ + virtual bool isUnidirectionalStream(StreamId stream) noexcept = 0; + + /** + * Returns whether a stream ID represents a bidirectional stream. + */ + virtual bool isBidirectionalStream(StreamId stream) noexcept = 0; + /** * ===== Read API ==== */ @@ -240,6 +280,21 @@ class QuicSocketLite { */ using ReadCallback = StreamReadCallback; + /** + * Read from the given stream, up to maxLen bytes. If maxLen is 0, transport + * will return all available bytes. + * + * The return value is Expected. If the value hasError(), then a read error + * occurred and it can be obtained with error(). If the value hasValue(), + * then value() returns a pair of the data (if any) and the EOF marker. + * + * Calling read() when there is no data/eof to deliver will return an + * EAGAIN-like error code. + */ + virtual folly::Expected, LocalErrorCode> read( + StreamId id, + size_t maxLen) = 0; + /** * Set the read callback for the given stream. Note that read callback is * expected to be set all the time. Removing read callback indicates that diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index c7cf76446..170136a45 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -484,61 +484,6 @@ void QuicTransportBase::invokeStreamsAvailableCallbacks() { } } -folly::Expected, LocalErrorCode> QuicTransportBase::read( - StreamId id, - size_t maxLen) { - if (isSendingStream(conn_->nodeType, id)) { - return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); - } - if (closeState_ != CloseState::OPEN) { - return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); - } - [[maybe_unused]] auto self = sharedGuard(); - SCOPE_EXIT { - updateReadLooper(); - updatePeekLooper(); // read can affect "peek" API - updateWriteLooper(true); - }; - try { - if (!conn_->streamManager->streamExists(id)) { - return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); - } - auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); - auto result = readDataFromQuicStream(*stream, maxLen); - if (result.second) { - VLOG(10) << "Delivered eof to app for stream=" << stream->id << " " - << *this; - auto it = readCallbacks_.find(id); - if (it != readCallbacks_.end()) { - // it's highly unlikely that someone called read() without having a read - // callback so we don't deal with the case of someone installing a read - // callback after reading the EOM. - it->second.deliveredEOM = true; - } - } - return folly::makeExpected(std::move(result)); - } catch (const QuicTransportException& ex) { - VLOG(4) << "read() error " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl( - QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error"))); - return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR); - } catch (const QuicInternalException& ex) { - VLOG(4) << __func__ << " " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl( - QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error"))); - return folly::makeUnexpected(ex.errorCode()); - } catch (const std::exception& ex) { - VLOG(4) << "read() error " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), - std::string("read() error"))); - return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR); - } -} - folly::Expected QuicTransportBase::peek( StreamId id, const folly::Function&) @@ -1153,64 +1098,6 @@ void QuicTransportBase::onNetworkData( } } -uint64_t QuicTransportBase::getNumOpenableBidirectionalStreams() const { - return conn_->streamManager->openableLocalBidirectionalStreams(); -} - -uint64_t QuicTransportBase::getNumOpenableUnidirectionalStreams() const { - return conn_->streamManager->openableLocalUnidirectionalStreams(); -} - -folly::Expected -QuicTransportBase::createStreamInternal( - bool bidirectional, - const OptionalIntegral& streamGroupId) { - if (closeState_ != CloseState::OPEN) { - return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); - } - folly::Expected streamResult; - if (bidirectional) { - streamResult = - conn_->streamManager->createNextBidirectionalStream(streamGroupId); - } else { - streamResult = - conn_->streamManager->createNextUnidirectionalStream(streamGroupId); - } - if (streamResult) { - const StreamId streamId = streamResult.value()->id; - - if (getSocketObserverContainer() && - getSocketObserverContainer() - ->hasObserversForEvent< - SocketObserverInterface::Events::streamEvents>()) { - getSocketObserverContainer() - ->invokeInterfaceMethod< - SocketObserverInterface::Events::streamEvents>( - [event = SocketObserverInterface::StreamOpenEvent( - streamId, - getStreamInitiator(streamId), - getStreamDirectionality(streamId))]( - auto observer, auto observed) { - observer->streamOpened(observed, event); - }); - } - - return streamId; - } else { - return folly::makeUnexpected(streamResult.error()); - } -} - -folly::Expected -QuicTransportBase::createBidirectionalStream(bool /*replaySafe*/) { - return createStreamInternal(true); -} - -folly::Expected -QuicTransportBase::createUnidirectionalStream(bool /*replaySafe*/) { - return createStreamInternal(false); -} - folly::Expected QuicTransportBase::createBidirectionalStreamGroup() { if (closeState_ != CloseState::OPEN) { @@ -1245,14 +1132,6 @@ bool QuicTransportBase::isServerStream(StreamId stream) noexcept { return quic::isServerStream(stream); } -bool QuicTransportBase::isUnidirectionalStream(StreamId stream) noexcept { - return quic::isUnidirectionalStream(stream); -} - -bool QuicTransportBase::isBidirectionalStream(StreamId stream) noexcept { - return quic::isBidirectionalStream(stream); -} - StreamDirectionality QuicTransportBase::getStreamDirectionality( StreamId stream) noexcept { return quic::getStreamDirectionality(stream); diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index b2933ca74..ed359c726 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -92,10 +92,6 @@ class QuicTransportBase : public QuicSocket, folly::Expected pauseRead(StreamId id) override; folly::Expected resumeRead(StreamId id) override; - folly::Expected, LocalErrorCode> read( - StreamId id, - size_t maxLen) override; - folly::Expected setPeekCallback( StreamId id, PeekCallback* cb) override; @@ -115,10 +111,6 @@ class QuicTransportBase : public QuicSocket, folly::Expected>> consume(StreamId id, uint64_t offset, size_t amount) override; - folly::Expected createBidirectionalStream( - bool replaySafe = true) override; - folly::Expected createUnidirectionalStream( - bool replaySafe = true) override; folly::Expected createBidirectionalStreamGroup() override; folly::Expected @@ -127,12 +119,8 @@ class QuicTransportBase : public QuicSocket, StreamGroupId groupId) override; folly::Expected createUnidirectionalStreamInGroup( StreamGroupId groupId) override; - uint64_t getNumOpenableBidirectionalStreams() const override; - uint64_t getNumOpenableUnidirectionalStreams() const override; bool isClientStream(StreamId stream) noexcept override; bool isServerStream(StreamId stream) noexcept override; - bool isUnidirectionalStream(StreamId stream) noexcept override; - bool isBidirectionalStream(StreamId stream) noexcept override; StreamDirectionality getStreamDirectionality( StreamId stream) noexcept override; @@ -364,9 +352,6 @@ class QuicTransportBase : public QuicSocket, folly::Expected setPeekCallbackInternal( StreamId id, PeekCallback* cb) noexcept; - folly::Expected createStreamInternal( - bool bidirectional, - const OptionalIntegral& streamGroupId = std::nullopt); void schedulePingTimeout( PingCallback* callback, diff --git a/quic/api/QuicTransportBaseLite.cpp b/quic/api/QuicTransportBaseLite.cpp index 9175fbc34..3b51d74fa 100644 --- a/quic/api/QuicTransportBaseLite.cpp +++ b/quic/api/QuicTransportBaseLite.cpp @@ -99,6 +99,32 @@ folly::Expected QuicTransportBaseLite::stopSending( return folly::unit; } +folly::Expected +QuicTransportBaseLite::createBidirectionalStream(bool /*replaySafe*/) { + return createStreamInternal(true); +} + +folly::Expected +QuicTransportBaseLite::createUnidirectionalStream(bool /*replaySafe*/) { + return createStreamInternal(false); +} + +uint64_t QuicTransportBaseLite::getNumOpenableBidirectionalStreams() const { + return conn_->streamManager->openableLocalBidirectionalStreams(); +} + +uint64_t QuicTransportBaseLite::getNumOpenableUnidirectionalStreams() const { + return conn_->streamManager->openableLocalUnidirectionalStreams(); +} + +bool QuicTransportBaseLite::isUnidirectionalStream(StreamId stream) noexcept { + return quic::isUnidirectionalStream(stream); +} + +bool QuicTransportBaseLite::isBidirectionalStream(StreamId stream) noexcept { + return quic::isBidirectionalStream(stream); +} + folly::Expected QuicTransportBaseLite::notifyPendingWriteOnConnection( ConnectionWriteCallback* wcb) { @@ -425,6 +451,60 @@ QuicTransportBaseLite::setReadCallback( return setReadCallbackInternal(id, cb, err); } +folly::Expected, LocalErrorCode> +QuicTransportBaseLite::read(StreamId id, size_t maxLen) { + if (isSendingStream(conn_->nodeType, id)) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + [[maybe_unused]] auto self = sharedGuard(); + SCOPE_EXIT { + updateReadLooper(); + updatePeekLooper(); // read can affect "peek" API + updateWriteLooper(true); + }; + try { + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); + auto result = readDataFromQuicStream(*stream, maxLen); + if (result.second) { + VLOG(10) << "Delivered eof to app for stream=" << stream->id << " " + << *this; + auto it = readCallbacks_.find(id); + if (it != readCallbacks_.end()) { + // it's highly unlikely that someone called read() without having a read + // callback so we don't deal with the case of someone installing a read + // callback after reading the EOM. + it->second.deliveredEOM = true; + } + } + return folly::makeExpected(std::move(result)); + } catch (const QuicTransportException& ex) { + VLOG(4) << "read() error " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl( + QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error"))); + return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR); + } catch (const QuicInternalException& ex) { + VLOG(4) << __func__ << " " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl( + QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error"))); + return folly::makeUnexpected(ex.errorCode()); + } catch (const std::exception& ex) { + VLOG(4) << "read() error " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + std::string("read() error"))); + return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR); + } +} + folly::Expected QuicTransportBaseLite::setStreamPriority(StreamId id, Priority priority) { if (closeState_ != CloseState::OPEN) { @@ -1211,6 +1291,46 @@ void QuicTransportBaseLite::closeUdpSocket() { sock->close(); } +folly::Expected +QuicTransportBaseLite::createStreamInternal( + bool bidirectional, + const OptionalIntegral& streamGroupId) { + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + folly::Expected streamResult; + if (bidirectional) { + streamResult = + conn_->streamManager->createNextBidirectionalStream(streamGroupId); + } else { + streamResult = + conn_->streamManager->createNextUnidirectionalStream(streamGroupId); + } + if (streamResult) { + const StreamId streamId = streamResult.value()->id; + + if (getSocketObserverContainer() && + getSocketObserverContainer() + ->hasObserversForEvent< + SocketObserverInterface::Events::streamEvents>()) { + getSocketObserverContainer() + ->invokeInterfaceMethod< + SocketObserverInterface::Events::streamEvents>( + [event = SocketObserverInterface::StreamOpenEvent( + streamId, + getStreamInitiator(streamId), + getStreamDirectionality(streamId))]( + auto observer, auto observed) { + observer->streamOpened(observed, event); + }); + } + + return streamId; + } else { + return folly::makeUnexpected(streamResult.error()); + } +} + void QuicTransportBaseLite::cancelTimeout(QuicTimerCallback* callback) { callback->cancelTimerCallback(); } diff --git a/quic/api/QuicTransportBaseLite.h b/quic/api/QuicTransportBaseLite.h index e4e43bf8f..3709e01e0 100644 --- a/quic/api/QuicTransportBaseLite.h +++ b/quic/api/QuicTransportBaseLite.h @@ -61,6 +61,15 @@ class QuicTransportBaseLite : virtual public QuicSocketLite, StreamId id, ApplicationErrorCode error) override; + folly::Expected createBidirectionalStream( + bool replaySafe = true) override; + folly::Expected createUnidirectionalStream( + bool replaySafe = true) override; + uint64_t getNumOpenableBidirectionalStreams() const override; + uint64_t getNumOpenableUnidirectionalStreams() const override; + bool isUnidirectionalStream(StreamId stream) noexcept override; + bool isBidirectionalStream(StreamId stream) noexcept override; + folly::Expected notifyPendingWriteOnStream( StreamId id, StreamWriteCallback* wcb) override; @@ -137,6 +146,10 @@ class QuicTransportBaseLite : virtual public QuicSocketLite, Optional err = GenericApplicationErrorCode::NO_ERROR) override; + folly::Expected, LocalErrorCode> read( + StreamId id, + size_t maxLen) override; + void setReceiveWindow(StreamId, size_t /*recvWindowSize*/) override {} void setSendBuffer(StreamId, size_t /*maxUnacked*/, size_t /*maxUnsent*/) @@ -455,6 +468,10 @@ class QuicTransportBaseLite : virtual public QuicSocketLite, void closeUdpSocket(); + folly::Expected createStreamInternal( + bool bidirectional, + const OptionalIntegral& streamGroupId = std::nullopt); + void runOnEvbAsync( folly::Function)> func);