Skip to content

Commit

Permalink
Miscellaneous fixes for streaming client
Browse files Browse the repository at this point in the history
Summary:
Move the cancel() operation (for fetches) into the mysql thread since we are releasing memory that is owned by that thread.

Make state_ atomic and provide a new `isCancelling()` function.

Make isPaused() work without TSAN issues when called from outside the MySQL thread.

Reviewed By: aditya-jalan

Differential Revision: D70333626

fbshipit-source-id: 1580830afdf37c0f04ec24ec8ae5db5d21e1627b
  • Loading branch information
Jay Edgar authored and facebook-github-bot committed Mar 5, 2025
1 parent 86521b1 commit 5bb0897
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
15 changes: 15 additions & 0 deletions third-party/squangle/src/squangle/mysql_client/FetchOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ const InternalConnection& FetchOperationImpl::getInternalConnection() const {
return conn().getInternalConnection();
}

void FetchOperationImpl::cancel() {
// Free any allocated results before the connection is closed
// We need to do this in the mysql_thread for async versions as the
// mysql_thread _might_ be using that memory
auto cancelFn = [&]() {
current_row_stream_ = folly::none;
OperationBase::cancel();
};
if (client_.isInCorrectThread(true)) {
cancelFn();
} else {
client_.runInThread(std::move(cancelFn), true /*wait*/);
}
}

uint64_t FetchOperationImpl::currentLastInsertId() const {
CHECK_THROW(isStreamAccessAllowed(), db::OperationStateException);
return current_last_insert_id_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ class FetchOperationImpl : virtual public OperationBase {
use_checksum_ = useChecksum;
}

void cancel() override {
// Free any allocated results before the connection is closed
current_row_stream_ = folly::none;
OperationBase::cancel();
}
void cancel() override;

uint64_t currentLastInsertId() const;
uint64_t currentAffectedRows() const;
Expand Down
10 changes: 7 additions & 3 deletions third-party/squangle/src/squangle/mysql_client/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ class OperationBase {
}

OperationState state() const {
return state_;
return state_.load(std::memory_order_relaxed);
}

bool isCancelling() const {
return state() == OperationState::Cancelling;
}

void setObserverCallback(ObserverCallback obs_cb);
Expand Down Expand Up @@ -217,7 +221,7 @@ class OperationBase {
void setConnectionContext(
std::shared_ptr<db::ConnectionContextBase> context) {
CHECK_THROW(
state_ == OperationState::Unstarted, db::OperationStateException);
state() == OperationState::Unstarted, db::OperationStateException);
connection_context_ = std::move(context);
}

Expand Down Expand Up @@ -454,7 +458,7 @@ class OperationBase {
friend class SyncConnection;

// Data members; subclasses freely interact with these.
OperationState state_{OperationState::Unstarted};
std::atomic<OperationState> state_{OperationState::Unstarted};
OperationResult result_{OperationResult::Unknown};

// Connection or query attributes (depending on the Operation type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,20 @@ bool MysqlFetchOperationImpl::isStreamAccessAllowed() const {
return isPaused() || isInEventBaseThread();
}

bool MysqlFetchOperationImpl::isPaused() const {
bool MysqlFetchOperationImpl::isPausedImpl() const {
return active_fetch_action_ == FetchAction::WaitForConsumer;
}

bool MysqlFetchOperationImpl::isPaused() const {
if (client_.isInCorrectThread(true)) {
return isPausedImpl();
}

bool isPaused = false;
client_.runInThread([&]() { isPaused = isPausedImpl(); }, true);
return isPaused;
}

void MysqlFetchOperationImpl::specializedRun() {
if (!conn().runInThread([&]() { specializedRunImpl(); })) {
completeOperationInner(OperationResult::Failed);
Expand Down Expand Up @@ -277,7 +287,7 @@ void MysqlFetchOperationImpl::pauseForConsumer() {
}

void MysqlFetchOperationImpl::resumeImpl() {
CHECK_THROW(isPaused(), db::OperationStateException);
CHECK_THROW(isPausedImpl(), db::OperationStateException);

// We should only allow pauses during fetch or between queries.
// If we come back as RowsFetched and the stream has completed the query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class MysqlFetchOperationImpl : public MysqlOperationImpl,

private:
void resumeImpl();
bool isPausedImpl() const;
// Checks if the current thread has access to stream, or result data.
bool isStreamAccessAllowed() const override;

Expand Down

0 comments on commit 5bb0897

Please sign in to comment.