Skip to content

Commit

Permalink
Add heartbeat manager
Browse files Browse the repository at this point in the history
  • Loading branch information
pranjalssh committed Aug 29, 2023
1 parent 20090bc commit 3145f4e
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 146 deletions.
116 changes: 11 additions & 105 deletions presto-native-execution/presto_cpp/main/Announcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <folly/Random.h>
#include <folly/futures/Retrying.h>
#include <velox/common/memory/Memory.h>
#include "presto_cpp/external/json/json.hpp"
#include "presto_cpp/main/http/HttpClient.h"

namespace facebook::presto {
namespace {
Expand Down Expand Up @@ -82,13 +80,17 @@ Announcer::Announcer(
const std::string& nodeId,
const std::string& nodeLocation,
const std::vector<std::string>& connectorIds,
const uint64_t minFrequencyMs,
const uint64_t maxFrequencyMs,
const std::string& clientCertAndKeyPath,
const std::string& ciphers)
: coordinatorDiscoverer_(coordinatorDiscoverer),
minFrequencyMs_(minFrequencyMs),
maxFrequencyMs_(maxFrequencyMs),
: PeriodicServiceInventoryManager(
address,
port,
coordinatorDiscoverer,
clientCertAndKeyPath,
ciphers,
"Announcement",
maxFrequencyMs),
announcementBody_(announcementBody(
address,
useHttps,
Expand All @@ -98,106 +100,10 @@ Announcer::Announcer(
nodeLocation,
connectorIds)),
announcementRequest_(
announcementRequest(address, port, nodeId, announcementBody_)),
pool_(velox::memory::addDefaultLeafMemoryPool("Announcer")),
eventBaseThread_(false /*autostart*/),
clientCertAndKeyPath_(clientCertAndKeyPath),
ciphers_(ciphers) {}

void Announcer::start() {
eventBaseThread_.start("Announcer");
stopped_ = false;
auto* eventBase = eventBaseThread_.getEventBase();
eventBase->runOnDestruction([this] { client_.reset(); });
eventBase->schedule([this]() { return makeAnnouncement(); });
}

void Announcer::stop() {
stopped_ = true;
eventBaseThread_.stop();
}

void Announcer::makeAnnouncement() {
// stop() calls EventBase's destructor which executed all pending callbacks;
// make sure not to do anything if that's the case
if (stopped_) {
return;
}

try {
auto newAddress = coordinatorDiscoverer_->updateAddress();
if (newAddress != address_) {
LOG(INFO) << "Discovery service changed to " << newAddress.getAddressStr()
<< ":" << newAddress.getPort();
std::swap(address_, newAddress);
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
address_,
std::chrono::milliseconds(10'000),
pool_,
clientCertAndKeyPath_,
ciphers_);
}
} catch (const std::exception& ex) {
LOG(WARNING) << "Error occurred during announcement run: " << ex.what();
scheduleNext();
return;
}

client_->sendRequest(announcementRequest_, announcementBody_)
.via(eventBaseThread_.getEventBase())
.thenValue([this](auto response) {
auto message = response->headers();
if (message->getStatusCode() != http::kHttpAccepted) {
++failedAttempts_;
LOG(WARNING) << "Announcement failed: HTTP "
<< message->getStatusCode() << " - "
<< response->dumpBodyChain();
} else if (response->hasError()) {
++failedAttempts_;
LOG(ERROR) << "Announcement failed: " << response->error();
} else {
failedAttempts_ = 0;
LOG(INFO) << "Announcement succeeded: HTTP "
<< message->getStatusCode();
}
})
.thenError(
folly::tag_t<std::exception>{},
[this](const std::exception& e) {
++failedAttempts_;
LOG(WARNING) << "Announcement failed: " << e.what();
})
.thenTry([this](auto /*unused*/) { scheduleNext(); });
}

uint64_t Announcer::getAnnouncementDelay() const {
if (failedAttempts_ > 0) {
// For announcement failure cases, execute exponential back off to ping
// coordinator with max back off time cap at 'maxFrequencyMs_'.
auto rng = folly::ThreadLocalPRNG();
return folly::futures::detail::retryingJitteredExponentialBackoffDur(
failedAttempts_,
std::chrono::milliseconds(minFrequencyMs_),
std::chrono::milliseconds(maxFrequencyMs_),
backOffjitterParam_,
rng)
.count();
}

// Adds some jitter for successful cases so that all workers does not ping
// coordinator at the same time
return maxFrequencyMs_ + folly::Random::rand32(2000) - 1000;
}
announcementRequest(address, port, nodeId, announcementBody_)) {}

void Announcer::scheduleNext() {
if (stopped_) {
return;
}
eventBaseThread_.getEventBase()->scheduleAt(
[this]() { return makeAnnouncement(); },
std::chrono::steady_clock::now() +
std::chrono::milliseconds(getAnnouncementDelay()));
std::tuple<proxygen::HTTPMessage, std::string> Announcer::httpRequest() {
return {announcementRequest_, announcementBody_};
}

} // namespace facebook::presto
29 changes: 4 additions & 25 deletions presto-native-execution/presto_cpp/main/Announcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
#include <folly/io/async/EventBaseThread.h>
#include <presto_cpp/main/http/HttpClient.h>
#include "presto_cpp/main/CoordinatorDiscoverer.h"
#include "presto_cpp/main/PeriodicServiceInventoryManager.h"

namespace facebook::presto {

class Announcer {
class Announcer : public PeriodicServiceInventoryManager {
public:
Announcer(
const std::string& address,
Expand All @@ -31,40 +32,18 @@ class Announcer {
const std::string& nodeId,
const std::string& nodeLocation,
const std::vector<std::string>& connectorIds,
const uint64_t minFrequencyMs,
const uint64_t maxFrequencyMs_,
const std::string& clientCertAndKeyPath = "",
const std::string& ciphers = "");

~Announcer() = default;

void start();

void stop();
protected:
std::tuple<proxygen::HTTPMessage, std::string> httpRequest() override;

private:
void makeAnnouncement();

uint64_t getAnnouncementDelay() const;

void scheduleNext();

const std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
const uint64_t minFrequencyMs_;
const uint64_t maxFrequencyMs_;
const std::string announcementBody_;
const proxygen::HTTPMessage announcementRequest_;
const std::shared_ptr<velox::memory::MemoryPool> pool_;
folly::EventBaseThread eventBaseThread_;
const std::string clientCertAndKeyPath_;
const std::string ciphers_;
/// jitter value for backoff delay time in case of announcment failure
const double backOffjitterParam_{0.1};

folly::SocketAddress address_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
uint64_t failedAttempts_{0};
};

} // namespace facebook::presto
4 changes: 3 additions & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ add_library(
ServerOperation.cpp
SignalHandler.cpp
TaskManager.cpp
TaskResource.cpp)
TaskResource.cpp
PeriodicHeartbeatManager.cpp
PeriodicServiceInventoryManager.cpp)

add_dependencies(presto_server_lib presto_operators presto_protocol
presto_types presto_thrift-cpp2 presto_thrift_extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

namespace facebook::presto {

// TODO: Rename to ServiceDiscoverer as this may talk to either
// Coordinator or Resource Manager.
class CoordinatorDiscoverer {
public:
virtual ~CoordinatorDiscoverer() = default;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include <velox/common/memory/Memory.h>

namespace facebook::presto {
PeriodicHeartbeatManager::PeriodicHeartbeatManager(
const std::string& address,
int port,
const std::shared_ptr<CoordinatorDiscoverer>& coordinatorDiscoverer,
const std::string& clientCertAndKeyPath,
const std::string& ciphers,
std::function<protocol::NodeStatus()> nodeStatusFetcher,
uint64_t frequencyMs)
: PeriodicServiceInventoryManager(
address,
port,
coordinatorDiscoverer,
clientCertAndKeyPath,
ciphers,
"Heartbeat",
frequencyMs),
nodeStatusFetcher_(std::move(nodeStatusFetcher)) {}

std::tuple<proxygen::HTTPMessage, std::string>
PeriodicHeartbeatManager::httpRequest() {
nlohmann::json j;
to_json(j, nodeStatusFetcher_());
std::string body = j.dump();
proxygen::HTTPMessage request;
request.setMethod(proxygen::HTTPMethod::PUT);
request.setURL("/v1/heartbeat");
request.getHeaders().set(
proxygen::HTTP_HEADER_HOST, fmt::format("{}:{}", address_, port_));
request.getHeaders().set(
proxygen::HTTP_HEADER_CONTENT_TYPE, "application/json");
request.getHeaders().set(
proxygen::HTTP_HEADER_CONTENT_LENGTH, std::to_string(body.size()));
return {request, body};
}

} // namespace facebook::presto
46 changes: 46 additions & 0 deletions presto-native-execution/presto_cpp/main/PeriodicHeartbeatManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "presto_cpp/main/PeriodicServiceInventoryManager.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"

namespace facebook::presto {

class PeriodicHeartbeatManager : public PeriodicServiceInventoryManager {
public:
PeriodicHeartbeatManager(
const std::string& address,
int port,
const std::shared_ptr<CoordinatorDiscoverer>& coordinatorDiscoverer,
const std::string& clientCertAndKeyPath,
const std::string& ciphers,
std::function<protocol::NodeStatus()> nodeStatusFetcher,
uint64_t frequencyMs);

protected:
bool retryFailed() override {
return false;
}

int updateServiceTimes() override {
return 10;
}

std::tuple<proxygen::HTTPMessage, std::string> httpRequest() override;

private:
std::function<protocol::NodeStatus()> nodeStatusFetcher_;
};
} // namespace facebook::presto
Loading

0 comments on commit 3145f4e

Please sign in to comment.