Skip to content

Commit

Permalink
Fix ExchangeClientTest.sourceTimeout (#8052)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #8052

It's passing internally but failing on CircleCI

Reviewed By: kewang1024

Differential Revision: D52169779

fbshipit-source-id: e2f271719b0d631daece5eccd1fff69a2c3c1590
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 14, 2023
1 parent 4f3b800 commit a0ec8e4
Showing 1 changed file with 81 additions and 100 deletions.
181 changes: 81 additions & 100 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,116 +52,97 @@ class LocalExchangeSource : public exec::ExchangeSource {
VELOX_CHECK(requestPending_);
auto requestedSequence = sequence_;
auto self = shared_from_this();
auto hasBeenCalled = std::make_shared<bool>(false);
static std::mutex resultCallbackMutex;
// Since this lambda may outlive 'this', we need to capture a
// shared_ptr to the current object (self).
auto resultCallback =
[self, requestedSequence, buffers, hasBeenCalled, this](
std::vector<std::unique_ptr<folly::IOBuf>> data, int64_t sequence) {
{
std::lock_guard<std::mutex> l(resultCallbackMutex);
// This is called when data is found and when this times out. Only
// the first of the two runs the body of the function.
if (*hasBeenCalled) {
return;
}
*hasBeenCalled = true;
}
if (data.empty()) {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::timeout",
this);
VeloxPromise<Response> requestPromise;
{
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
}
if (!requestPromise.isFulfilled()) {
requestPromise.setValue(Response{0, false});
}
return;
}
if (requestedSequence > sequence) {
VLOG(2) << "Receives earlier sequence than requested: task "
<< taskId_ << ", destination " << destination_
<< ", requested " << sequence << ", received "
<< requestedSequence;
int64_t nExtra = requestedSequence - sequence;
VELOX_CHECK(nExtra < data.size());
data.erase(data.begin(), data.begin() + nExtra);
sequence = requestedSequence;
auto resultCallback = [self, requestedSequence, buffers, this](
std::vector<std::unique_ptr<folly::IOBuf>> data,
int64_t sequence) {
if (requestedSequence > sequence) {
VLOG(2) << "Receives earlier sequence than requested: task " << taskId_
<< ", destination " << destination_ << ", requested "
<< sequence << ", received " << requestedSequence;
int64_t nExtra = requestedSequence - sequence;
VELOX_CHECK(nExtra < data.size());
data.erase(data.begin(), data.begin() + nExtra);
sequence = requestedSequence;
}
std::vector<std::unique_ptr<SerializedPage>> pages;
bool atEnd = false;
int64_t totalBytes = 0;
for (auto& inputPage : data) {
if (!inputPage) {
atEnd = true;
// Keep looping, there could be extra end markers.
continue;
}
totalBytes += inputPage->length();
inputPage->unshare();
pages.push_back(std::make_unique<SerializedPage>(std::move(inputPage)));
inputPage = nullptr;
}
numPages_ += pages.size();
totalBytes_ += totalBytes;

try {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource", &numPages_);
} catch (const std::exception& e) {
queue_->setError(e.what());
checkSetRequestPromise();
return;
}

int64_t ackSequence;
VeloxPromise<Response> requestPromise;
{
std::vector<ContinuePromise> queuePromises;
{
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
for (auto& page : pages) {
queue_->enqueueLocked(std::move(page), queuePromises);
}
std::vector<std::unique_ptr<SerializedPage>> pages;
bool atEnd = false;
int64_t totalBytes = 0;
for (auto& inputPage : data) {
if (!inputPage) {
atEnd = true;
// Keep looping, there could be extra end markers.
continue;
}
totalBytes += inputPage->length();
inputPage->unshare();
pages.push_back(
std::make_unique<SerializedPage>(std::move(inputPage)));
inputPage = nullptr;
}
numPages_ += pages.size();
totalBytes_ += totalBytes;

try {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource", &numPages_);
} catch (const std::exception& e) {
queue_->setError(e.what());
checkSetRequestPromise();
return;
if (atEnd) {
queue_->enqueueLocked(nullptr, queuePromises);
atEnd_ = true;
}
ackSequence = sequence_ = sequence + pages.size();
}
for (auto& promise : queuePromises) {
promise.setValue();
}
}
// Outside of queue mutex.
if (atEnd_) {
buffers->deleteResults(taskId_, destination_);
} else {
buffers->acknowledge(taskId_, destination_, ackSequence);
}

if (!requestPromise.isFulfilled()) {
requestPromise.setValue(Response{totalBytes, atEnd_});
}
};

int64_t ackSequence;
// Call the callback in any case after timeout.
auto& exec = folly::QueuedImmediateExecutor::instance();
future = std::move(future).via(&exec).onTimeout(
std::chrono::seconds(maxWaitSeconds), [self, this] {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::timeout",
this);
VeloxPromise<Response> requestPromise;
{
std::vector<ContinuePromise> queuePromises;
{
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
for (auto& page : pages) {
queue_->enqueueLocked(std::move(page), queuePromises);
}
if (atEnd) {
queue_->enqueueLocked(nullptr, queuePromises);
atEnd_ = true;
}
ackSequence = sequence_ = sequence + pages.size();
}
for (auto& promise : queuePromises) {
promise.setValue();
}
}
// Outside of queue mutex.
if (atEnd_) {
buffers->deleteResults(taskId_, destination_);
} else {
buffers->acknowledge(taskId_, destination_, ackSequence);
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
}

Response response = {0, false};
if (!requestPromise.isFulfilled()) {
requestPromise.setValue(Response{totalBytes, atEnd_});
requestPromise.setValue(response);
}
};

// Call the callback in any case after timeout. 'future' returned
// from this will be realized with no error but empty data. Also,
// the future is a SemiFuture, so setting a timeout on the future
// in this function is not possible.
auto& exec = folly::QueuedImmediateExecutor::instance();
std::move(folly::futures::sleep(std::chrono::seconds(maxWaitSeconds)))
.via(&exec)
.thenValue([resultCallback, requestedSequence](auto /*ignore*/) {
resultCallback({}, requestedSequence);
return response;
});

buffers->getData(
Expand Down

0 comments on commit a0ec8e4

Please sign in to comment.