Skip to content

Commit

Permalink
Move core functionality to QuicTransportBaseLite [15/n]
Browse files Browse the repository at this point in the history
Summary: See title.

Reviewed By: kvtsoy

Differential Revision: D64542784

fbshipit-source-id: a155c441b3d2cf0a8220e7ddebd94f0335cb9420
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Oct 23, 2024
1 parent bf25fd8 commit 520ae73
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 195 deletions.
59 changes: 0 additions & 59 deletions quic/api/QuicSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,21 +264,6 @@ class QuicSocket : virtual public QuicSocketLite {
virtual folly::Expected<folly::Unit, LocalErrorCode> resumeRead(
StreamId id) = 0;

/**
* Read from the given stream, up to maxLen bytes. If maxLen is 0, transport
* will return all available bytes.
*
* The return value is Expected. If the value hasError(), then a read error
* occurred and it can be obtained with error(). If the value hasValue(),
* then value() returns a pair of the data (if any) and the EOF marker.
*
* Calling read() when there is no data/eof to deliver will return an
* EAGAIN-like error code.
*/
virtual folly::Expected<std::pair<Buf, bool>, LocalErrorCode> read(
StreamId id,
size_t maxLen) = 0;

/**
* ===== Peek/Consume API =====
*/
Expand Down Expand Up @@ -358,30 +343,6 @@ class QuicSocket : virtual public QuicSocketLite {
StreamId id,
size_t amount) = 0;

/**
* ===== Write API =====
*/

/**
* Creates a bidirectional stream. This assigns a stream ID but does not
* send anything to the peer.
*
* If replaySafe is false, the transport will buffer (up to the send buffer
* limits) any writes on this stream until the transport is replay safe.
*/
virtual folly::Expected<StreamId, LocalErrorCode> createBidirectionalStream(
bool replaySafe = true) = 0;

/**
* Creates a unidirectional stream. This assigns a stream ID but does not
* send anything to the peer.
*
* If replaySafe is false, the transport will buffer (up to the send buffer
* limits) any writes on this stream until the transport is replay safe.
*/
virtual folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStream(
bool replaySafe = true) = 0;

/**
* Create a bidirectional stream group.
*/
Expand All @@ -406,16 +367,6 @@ class QuicSocket : virtual public QuicSocketLite {
virtual folly::Expected<StreamId, LocalErrorCode>
createUnidirectionalStreamInGroup(StreamGroupId groupId) = 0;

/**
* Returns the number of bidirectional streams that can be opened.
*/
virtual uint64_t getNumOpenableBidirectionalStreams() const = 0;

/**
* Returns the number of unidirectional streams that can be opened.
*/
virtual uint64_t getNumOpenableUnidirectionalStreams() const = 0;

/**
* Returns whether a stream ID represents a client-initiated stream.
*/
Expand All @@ -426,16 +377,6 @@ class QuicSocket : virtual public QuicSocketLite {
*/
virtual bool isServerStream(StreamId stream) noexcept = 0;

/**
* Returns whether a stream ID represents a unidirectional stream.
*/
virtual bool isUnidirectionalStream(StreamId stream) noexcept = 0;

/**
* Returns whether a stream ID represents a bidirectional stream.
*/
virtual bool isBidirectionalStream(StreamId stream) noexcept = 0;

/**
* Returns directionality (unidirectional or bidirectional) of a stream by ID.
*/
Expand Down
55 changes: 55 additions & 0 deletions quic/api/QuicSocketLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,46 @@ class QuicSocketLite {
receiveWindowMaxOffset(receiveWindowMaxOffsetIn) {}
};

/**
* Creates a bidirectional stream. This assigns a stream ID but does not
* send anything to the peer.
*
* If replaySafe is false, the transport will buffer (up to the send buffer
* limits) any writes on this stream until the transport is replay safe.
*/
virtual folly::Expected<StreamId, LocalErrorCode> createBidirectionalStream(
bool replaySafe = true) = 0;

/**
* Creates a unidirectional stream. This assigns a stream ID but does not
* send anything to the peer.
*
* If replaySafe is false, the transport will buffer (up to the send buffer
* limits) any writes on this stream until the transport is replay safe.
*/
virtual folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStream(
bool replaySafe = true) = 0;

/**
* Returns the number of bidirectional streams that can be opened.
*/
virtual uint64_t getNumOpenableBidirectionalStreams() const = 0;

/**
* Returns the number of unidirectional streams that can be opened.
*/
virtual uint64_t getNumOpenableUnidirectionalStreams() const = 0;

/**
* Returns whether a stream ID represents a unidirectional stream.
*/
virtual bool isUnidirectionalStream(StreamId stream) noexcept = 0;

/**
* Returns whether a stream ID represents a bidirectional stream.
*/
virtual bool isBidirectionalStream(StreamId stream) noexcept = 0;

/**
* ===== Read API ====
*/
Expand All @@ -240,6 +280,21 @@ class QuicSocketLite {
*/
using ReadCallback = StreamReadCallback;

/**
* Read from the given stream, up to maxLen bytes. If maxLen is 0, transport
* will return all available bytes.
*
* The return value is Expected. If the value hasError(), then a read error
* occurred and it can be obtained with error(). If the value hasValue(),
* then value() returns a pair of the data (if any) and the EOF marker.
*
* Calling read() when there is no data/eof to deliver will return an
* EAGAIN-like error code.
*/
virtual folly::Expected<std::pair<Buf, bool>, LocalErrorCode> read(
StreamId id,
size_t maxLen) = 0;

/**
* Set the read callback for the given stream. Note that read callback is
* expected to be set all the time. Removing read callback indicates that
Expand Down
121 changes: 0 additions & 121 deletions quic/api/QuicTransportBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,61 +484,6 @@ void QuicTransportBase::invokeStreamsAvailableCallbacks() {
}
}

folly::Expected<std::pair<Buf, bool>, LocalErrorCode> QuicTransportBase::read(
StreamId id,
size_t maxLen) {
if (isSendingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
updateReadLooper();
updatePeekLooper(); // read can affect "peek" API
updateWriteLooper(true);
};
try {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
auto result = readDataFromQuicStream(*stream, maxLen);
if (result.second) {
VLOG(10) << "Delivered eof to app for stream=" << stream->id << " "
<< *this;
auto it = readCallbacks_.find(id);
if (it != readCallbacks_.end()) {
// it's highly unlikely that someone called read() without having a read
// callback so we don't deal with the case of someone installing a read
// callback after reading the EOM.
it->second.deliveredEOM = true;
}
}
return folly::makeExpected<LocalErrorCode>(std::move(result));
} catch (const QuicTransportException& ex) {
VLOG(4) << "read() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << "read() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("read() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
}

folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::peek(
StreamId id,
const folly::Function<void(StreamId id, const folly::Range<PeekIterator>&)
Expand Down Expand Up @@ -1153,64 +1098,6 @@ void QuicTransportBase::onNetworkData(
}
}

uint64_t QuicTransportBase::getNumOpenableBidirectionalStreams() const {
return conn_->streamManager->openableLocalBidirectionalStreams();
}

uint64_t QuicTransportBase::getNumOpenableUnidirectionalStreams() const {
return conn_->streamManager->openableLocalUnidirectionalStreams();
}

folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createStreamInternal(
bool bidirectional,
const OptionalIntegral<StreamGroupId>& streamGroupId) {
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
folly::Expected<QuicStreamState*, LocalErrorCode> streamResult;
if (bidirectional) {
streamResult =
conn_->streamManager->createNextBidirectionalStream(streamGroupId);
} else {
streamResult =
conn_->streamManager->createNextUnidirectionalStream(streamGroupId);
}
if (streamResult) {
const StreamId streamId = streamResult.value()->id;

if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::streamEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::streamEvents>(
[event = SocketObserverInterface::StreamOpenEvent(
streamId,
getStreamInitiator(streamId),
getStreamDirectionality(streamId))](
auto observer, auto observed) {
observer->streamOpened(observed, event);
});
}

return streamId;
} else {
return folly::makeUnexpected(streamResult.error());
}
}

folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createBidirectionalStream(bool /*replaySafe*/) {
return createStreamInternal(true);
}

folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createUnidirectionalStream(bool /*replaySafe*/) {
return createStreamInternal(false);
}

folly::Expected<StreamGroupId, LocalErrorCode>
QuicTransportBase::createBidirectionalStreamGroup() {
if (closeState_ != CloseState::OPEN) {
Expand Down Expand Up @@ -1245,14 +1132,6 @@ bool QuicTransportBase::isServerStream(StreamId stream) noexcept {
return quic::isServerStream(stream);
}

bool QuicTransportBase::isUnidirectionalStream(StreamId stream) noexcept {
return quic::isUnidirectionalStream(stream);
}

bool QuicTransportBase::isBidirectionalStream(StreamId stream) noexcept {
return quic::isBidirectionalStream(stream);
}

StreamDirectionality QuicTransportBase::getStreamDirectionality(
StreamId stream) noexcept {
return quic::getStreamDirectionality(stream);
Expand Down
15 changes: 0 additions & 15 deletions quic/api/QuicTransportBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ class QuicTransportBase : public QuicSocket,
folly::Expected<folly::Unit, LocalErrorCode> pauseRead(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> resumeRead(StreamId id) override;

folly::Expected<std::pair<Buf, bool>, LocalErrorCode> read(
StreamId id,
size_t maxLen) override;

folly::Expected<folly::Unit, LocalErrorCode> setPeekCallback(
StreamId id,
PeekCallback* cb) override;
Expand All @@ -115,10 +111,6 @@ class QuicTransportBase : public QuicSocket,
folly::Expected<folly::Unit, std::pair<LocalErrorCode, Optional<uint64_t>>>
consume(StreamId id, uint64_t offset, size_t amount) override;

folly::Expected<StreamId, LocalErrorCode> createBidirectionalStream(
bool replaySafe = true) override;
folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStream(
bool replaySafe = true) override;
folly::Expected<StreamGroupId, LocalErrorCode>
createBidirectionalStreamGroup() override;
folly::Expected<StreamGroupId, LocalErrorCode>
Expand All @@ -127,12 +119,8 @@ class QuicTransportBase : public QuicSocket,
StreamGroupId groupId) override;
folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStreamInGroup(
StreamGroupId groupId) override;
uint64_t getNumOpenableBidirectionalStreams() const override;
uint64_t getNumOpenableUnidirectionalStreams() const override;
bool isClientStream(StreamId stream) noexcept override;
bool isServerStream(StreamId stream) noexcept override;
bool isUnidirectionalStream(StreamId stream) noexcept override;
bool isBidirectionalStream(StreamId stream) noexcept override;
StreamDirectionality getStreamDirectionality(
StreamId stream) noexcept override;

Expand Down Expand Up @@ -364,9 +352,6 @@ class QuicTransportBase : public QuicSocket,
folly::Expected<folly::Unit, LocalErrorCode> setPeekCallbackInternal(
StreamId id,
PeekCallback* cb) noexcept;
folly::Expected<StreamId, LocalErrorCode> createStreamInternal(
bool bidirectional,
const OptionalIntegral<StreamGroupId>& streamGroupId = std::nullopt);

void schedulePingTimeout(
PingCallback* callback,
Expand Down
Loading

0 comments on commit 520ae73

Please sign in to comment.