diff --git a/example/counter/counter_hostname_test/CMakeLists.txt b/example/counter/counter_hostname_test/CMakeLists.txt new file mode 100644 index 00000000..42785c8b --- /dev/null +++ b/example/counter/counter_hostname_test/CMakeLists.txt @@ -0,0 +1,134 @@ +cmake_minimum_required(VERSION 2.8.10) +project(counter C CXX) + +option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF) +option(LINK_TCMALLOC "Link tcmalloc if possible" ON) + +execute_process( + COMMAND bash -c "find ${CMAKE_SOURCE_DIR}/../.. -type d -path \"*output/include/braft\" | xargs dirname | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) + +if (NOT PROTOBUF_PROTOC_EXECUTABLE) + get_filename_component(PROTO_LIB_DIR ${PROTOBUF_LIBRARY} DIRECTORY) + set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc") +endif() + +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(EXAMPLE_LINK_SO) + find_library(BRPC_LIB NAMES brpc) + find_library(BRAFT_LIB NAMES braft) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) + find_library(BRAFT_LIB NAMES libbraft.a braft) +endif() + +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(BRAFT_INCLUDE_PATH NAMES braft/raft.h) +if ((NOT BRAFT_INCLUDE_PATH) OR (NOT BRAFT_LIB)) + message (FATAL_ERROR "Fail to find braft") +endif() +include_directories(${BRAFT_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() + +if (LINK_TCMALLOC) + find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h) + find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler) + if (GPERFTOOLS_INCLUDE_DIR AND GPERFTOOLS_LIBRARIES) + set(CMAKE_CXX_FLAGS "-DBRPC_ENABLE_CPU_PROFILER") + include_directories(${GPERFTOOLS_INCLUDE_DIR}) + else () + set (GPERFTOOLS_LIBRARIES "") + endif () +endif () + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CPP_FLAGS} -DGFLAGS_NS=${GFLAGS_NS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") +if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + # require at least gcc 4.8 + if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.8) + message(FATAL_ERROR "GCC is too old, please install a newer version supporting C++11") + endif() +elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + # require at least clang 3.3 + if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.3) + message(FATAL_ERROR "Clang is too old, please install a newer version supporting C++11") + endif() +else() + message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.") +endif() + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +add_executable(counter_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARY} + ${GPERFTOOLS_LIBRARIES} + ${LEVELDB_LIB} + ${BRAFT_LIB} + ${BRPC_LIB} + rt + ssl + crypto + dl + z + ) + +target_link_libraries(counter_client + "-Xlinker \"-(\"" + ${DYNAMIC_LIB} + "-Xlinker \"-)\"") +target_link_libraries(counter_server + "-Xlinker \"-(\"" + ${DYNAMIC_LIB} + "-Xlinker \"-)\"") diff --git a/example/counter/counter_hostname_test/client.cpp b/example/counter/counter_hostname_test/client.cpp new file mode 100644 index 00000000..f63bc871 --- /dev/null +++ b/example/counter/counter_hostname_test/client.cpp @@ -0,0 +1,163 @@ +// Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include "counter.pb.h" + +DEFINE_bool(log_each_request, false, "Print log for each request"); +DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_int32(add_percentage, 100, "Percentage of fetch_add"); +DEFINE_int64(added_by, 1, "Num added to each peer"); +DEFINE_int32(thread_num, 1, "Number of threads sending requests"); +DEFINE_int32(timeout_ms, 1000, "Timeout for each request"); +DEFINE_string(conf, "", "Configuration of the raft group"); +DEFINE_string(group, "Counter", "Id of the replication group"); + +bvar::LatencyRecorder g_latency_recorder("counter_client"); + +static void* sender(void* arg) { + while (!brpc::IsAskedToQuit()) { + braft::PeerId leader; + // Select leader of the target group from RouteTable + if (braft::rtb::select_leader(FLAGS_group, &leader) != 0) { + // Leader is unknown in RouteTable. Ask RouteTable to refresh leader + // by sending RPCs. + butil::Status st = braft::rtb::refresh_leader( + FLAGS_group, FLAGS_timeout_ms); + if (!st.ok()) { + // Not sure about the leader, sleep for a while and the ask again. + LOG(WARNING) << "Fail to refresh_leader : " << st; + bthread_usleep(FLAGS_timeout_ms * 1000L); + } + continue; + } + + // Now we known who is the leader, construct Stub and then sending + // rpc + brpc::Channel channel; + if (leader.type_ == braft::PeerId::Type::EndPoint) { + if (channel.Init(leader.addr, NULL) != 0) { + LOG(ERROR) << "Fail to init channel to " << leader; + bthread_usleep(FLAGS_timeout_ms * 1000L); + continue; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + LOG(ERROR) << "Fail to init channel to " << leader; + bthread_usleep(FLAGS_timeout_ms * 1000L); + continue; + } + + } + example::CounterService_Stub stub(&channel); + + brpc::Controller cntl; + cntl.set_timeout_ms(FLAGS_timeout_ms); + // Randomly select which request we want send; + example::CounterResponse response; + + if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) { + example::FetchAddRequest request; + request.set_value(FLAGS_added_by); + stub.fetch_add(&cntl, &request, &response, NULL); + } else { + example::GetRequest request; + stub.get(&cntl, &request, &response, NULL); + } + if (cntl.Failed()) { + LOG(WARNING) << "Fail to send request to " << leader + << " : " << cntl.ErrorText(); + // Clear leadership since this RPC failed. + braft::rtb::update_leader(FLAGS_group, braft::PeerId()); + bthread_usleep(FLAGS_timeout_ms * 1000L); + continue; + } + if (!response.success()) { + LOG(WARNING) << "Fail to send request to " << leader + << ", redirecting to " + << (response.has_redirect() + ? response.redirect() : "nowhere"); + // Update route table since we have redirect information + braft::rtb::update_leader(FLAGS_group, response.redirect()); + continue; + } + g_latency_recorder << cntl.latency_us(); + if (FLAGS_log_each_request) { + LOG(INFO) << "Received response from " << leader + << " value=" << response.value() + << " latency=" << cntl.latency_us(); + bthread_usleep(1000L * 1000L); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + butil::AtExitManager exit_manager; + + // Register configuration of target group to RouteTable + if (braft::rtb::update_configuration(FLAGS_group, FLAGS_conf) != 0) { + LOG(ERROR) << "Fail to register configuration " << FLAGS_conf + << " of group " << FLAGS_group; + return -1; + } + + std::vector tids; + tids.resize(FLAGS_thread_num); + if (!FLAGS_use_bthread) { + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (pthread_create(&tids[i], NULL, sender, NULL) != 0) { + LOG(ERROR) << "Fail to create pthread"; + return -1; + } + } + } else { + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (bthread_start_background(&tids[i], NULL, sender, NULL) != 0) { + LOG(ERROR) << "Fail to create bthread"; + return -1; + } + } + } + + while (!brpc::IsAskedToQuit()) { + sleep(1); + LOG_IF(INFO, !FLAGS_log_each_request) + << "Sending Request to " << FLAGS_group + << " (" << FLAGS_conf << ')' + << " at qps=" << g_latency_recorder.qps(1) + << " latency=" << g_latency_recorder.latency(1); + } + + LOG(INFO) << "Counter client is going to quit"; + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (!FLAGS_use_bthread) { + pthread_join(tids[i], NULL); + } else { + bthread_join(tids[i], NULL); + } + } + + return 0; +} diff --git a/example/counter/counter_hostname_test/counter.proto b/example/counter/counter_hostname_test/counter.proto new file mode 100644 index 00000000..8407a47b --- /dev/null +++ b/example/counter/counter_hostname_test/counter.proto @@ -0,0 +1,25 @@ +syntax="proto2"; +package example; +option cc_generic_services = true; + +message Snapshot { + required int64 value = 1; +}; + +message FetchAddRequest { + required int64 value = 1; +}; + +message CounterResponse { + required bool success = 1; + optional int64 value = 2; + optional string redirect = 3; +}; + +message GetRequest { +}; + +service CounterService { + rpc fetch_add(FetchAddRequest) returns (CounterResponse); + rpc get(GetRequest) returns (CounterResponse); +}; diff --git a/example/counter/counter_hostname_test/run_client.sh b/example/counter/counter_hostname_test/run_client.sh new file mode 100644 index 00000000..4763e098 --- /dev/null +++ b/example/counter/counter_hostname_test/run_client.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# source shflags from current directory +mydir="${BASH_SOURCE%/*}" +if [[ ! -d "$mydir" ]]; then mydir="$PWD"; fi +. $mydir/../shflags + + +# define command-line flags +DEFINE_boolean clean 1 'Remove old "runtime" dir before running' +DEFINE_integer add_percentage 100 'Percentage of fetch_add operation' +DEFINE_integer bthread_concurrency '8' 'Number of worker pthreads' +DEFINE_integer server_port 8100 "Port of the first server" +DEFINE_integer server_num '3' 'Number of servers' +DEFINE_integer thread_num 3 'Number of sending thread' +DEFINE_string crash_on_fatal 'true' 'Crash on fatal log' +DEFINE_string log_each_request 'true' 'Print log for each request' +DEFINE_string valgrind 'false' 'Run in valgrind' +DEFINE_string use_bthread "true" "Use bthread to send request" + +FLAGS "$@" || exit 1 + +# hostname prefers ipv6 +# IP=`hostname -i | awk '{print $NF}'` +IP=`hostname` + +if [ "$FLAGS_valgrind" == "true" ] && [ $(which valgrind) ] ; then + VALGRIND="valgrind --tool=memcheck --leak-check=full" +fi + +raft_peers="" +for ((i=0; i<$FLAGS_server_num; ++i)); do + raft_peers="${raft_peers}${IP}:$((${FLAGS_server_port}+i)):0," +done + +export TCMALLOC_SAMPLE_PARAMETER=524288 + +${VALGRIND} ./counter_client \ + --add_percentage=${FLAGS_add_percentage} \ + --bthread_concurrency=${FLAGS_bthread_concurrency} \ + --conf="${raft_peers}" \ + --crash_on_fatal_log=${FLAGS_crash_on_fatal} \ + --log_each_request=${FLAGS_log_each_request} \ + --thread_num=${FLAGS_thread_num} \ + --use_bthread=${FLAGS_use_bthread} \ + diff --git a/example/counter/counter_hostname_test/run_server.sh b/example/counter/counter_hostname_test/run_server.sh new file mode 100644 index 00000000..19903e2b --- /dev/null +++ b/example/counter/counter_hostname_test/run_server.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +# Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# source shflags from current directory +mydir="${BASH_SOURCE%/*}" +if [[ ! -d "$mydir" ]]; then mydir="$PWD"; fi +. $mydir/../shflags + +# define command-line flags +DEFINE_string crash_on_fatal 'true' 'Crash on fatal log' +DEFINE_integer bthread_concurrency '18' 'Number of worker pthreads' +DEFINE_string sync 'true' 'fsync each time' +DEFINE_string valgrind 'false' 'Run in valgrind' +DEFINE_integer max_segment_size '8388608' 'Max segment size' +DEFINE_integer server_num '3' 'Number of servers' +DEFINE_boolean clean 1 'Remove old "runtime" dir before running' +DEFINE_integer port 8100 "Port of the first server" + +# parse the command-line +FLAGS "$@" || exit 1 +eval set -- "${FLAGS_ARGV}" + +# The alias for printing to stderr +alias error=">&2 echo counter: " + +# hostname prefers ipv6 +# IP=`hostname -i | awk '{print $NF}'` +IP=`hostname` + +if [ "$FLAGS_valgrind" == "true" ] && [ $(which valgrind) ] ; then + VALGRIND="valgrind --tool=memcheck --leak-check=full" +fi + +raft_peers="" +for ((i=0; i<$FLAGS_server_num; ++i)); do + raft_peers="${raft_peers}${IP}:$((${FLAGS_port}+i)):0," +done + +if [ "$FLAGS_clean" == "0" ]; then + rm -rf runtime +fi + +export TCMALLOC_SAMPLE_PARAMETER=524288 + +for ((i=0; i<$FLAGS_server_num; ++i)); do + mkdir -p runtime/$i + cp ./counter_server runtime/$i + cd runtime/$i + ${VALGRIND} ./counter_server \ + -bthread_concurrency=${FLAGS_bthread_concurrency}\ + -crash_on_fatal_log=${FLAGS_crash_on_fatal} \ + -raft_max_segment_size=${FLAGS_max_segment_size} \ + -raft_sync=${FLAGS_sync} \ + -port=$((${FLAGS_port}+i)) -conf="${raft_peers}" > std.log 2>&1 & + cd ../.. +done diff --git a/example/counter/counter_hostname_test/server.cpp b/example/counter/counter_hostname_test/server.cpp new file mode 100644 index 00000000..f8a605ee --- /dev/null +++ b/example/counter/counter_hostname_test/server.cpp @@ -0,0 +1,414 @@ +// Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include // DEFINE_* +#include // brpc::Controller +#include // brpc::Server +#include // braft::Node braft::StateMachine +#include // braft::SnapshotWriter +#include // braft::AsyncClosureGuard +#include // braft::ProtoBufFile +#include "counter.pb.h" // CounterService + +DEFINE_bool(check_term, true, "Check if the leader changed to another term"); +DEFINE_bool(disable_cli, false, "Don't allow raft_cli access this node"); +DEFINE_bool(log_applied_task, false, "Print notice log when a task is applied"); +DEFINE_int32(election_timeout_ms, 5000, + "Start election in such milliseconds if disconnect with the leader"); +DEFINE_int32(port, 8100, "Listen port of this peer"); +DEFINE_int32(snapshot_interval, 30, "Interval between each snapshot"); +DEFINE_string(conf, "", "Initial configuration of the replication group"); +DEFINE_string(data_path, "./data1", "Path of data stored on"); +DEFINE_string(group, "Counter", "Id of the replication group"); +DEFINE_string(node_hostname, "counter", "Dns name of this node."); + +namespace example { +class Counter; + +// Implements Closure which encloses RPC stuff +class FetchAddClosure : public braft::Closure { +public: + FetchAddClosure(Counter* counter, + const FetchAddRequest* request, + CounterResponse* response, + google::protobuf::Closure* done) + : _counter(counter) + , _request(request) + , _response(response) + , _done(done) {} + ~FetchAddClosure() {} + + const FetchAddRequest* request() const { return _request; } + CounterResponse* response() const { return _response; } + void Run(); + +private: + Counter* _counter; + const FetchAddRequest* _request; + CounterResponse* _response; + google::protobuf::Closure* _done; +}; + +// Implementation of example::Counter as a braft::StateMachine. +class Counter : public braft::StateMachine { +public: + Counter() + : _node(NULL) + , _value(0) + , _leader_term(-1) + {} + ~Counter() { + delete _node; + } + + // Starts this node + int start() { + butil::EndPoint addr(butil::my_ip(), FLAGS_port); + braft::NodeOptions node_options; + if (node_options.initial_conf.parse_from(FLAGS_conf) != 0) { + LOG(ERROR) << "Fail to parse configuration `" << FLAGS_conf << '\''; + return -1; + } + node_options.election_timeout_ms = FLAGS_election_timeout_ms; + node_options.fsm = this; + node_options.node_owns_fsm = false; + node_options.snapshot_interval_s = FLAGS_snapshot_interval; + std::string prefix = "local://" + FLAGS_data_path; + node_options.log_uri = prefix + "/log"; + node_options.raft_meta_uri = prefix + "/raft_meta"; + node_options.snapshot_uri = prefix + "/snapshot"; + node_options.disable_cli = FLAGS_disable_cli; + // braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr)); + braft::PeerId node_host(addr); + node_host.hostname_.append(FLAGS_node_hostname); + node_host.hostname_.append(":"); + node_host.hostname_.append(std::to_string(FLAGS_port)); + node_host.type_ = braft::PeerId::Type::HostName; + braft::Node* node = new braft::Node(FLAGS_group, node_host); + if (node->init(node_options) != 0) { + LOG(ERROR) << "Fail to init raft node"; + delete node; + return -1; + } + _node = node; + return 0; + } + + // Impelements Service methods + void fetch_add(const FetchAddRequest* request, + CounterResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + // Serialize request to the replicated write-ahead-log so that all the + // peers in the group receive this request as well. + // Notice that _value can't be modified in this routine otherwise it + // will be inconsistent with others in this group. + + // Serialize request to IOBuf + const int64_t term = _leader_term.load(butil::memory_order_relaxed); + if (term < 0) { + return redirect(response); + } + butil::IOBuf log; + butil::IOBufAsZeroCopyOutputStream wrapper(&log); + if (!request->SerializeToZeroCopyStream(&wrapper)) { + LOG(ERROR) << "Fail to serialize request"; + response->set_success(false); + return; + } + // Apply this log as a braft::Task + braft::Task task; + task.data = &log; + // This callback would be iovoked when the task actually excuted or + // fail + task.done = new FetchAddClosure(this, request, response, + done_guard.release()); + if (FLAGS_check_term) { + // ABA problem can be avoid if expected_term is set + task.expected_term = term; + } + // Now the task is applied to the group, waiting for the result. + return _node->apply(task); + } + + void get(CounterResponse* response) { + // In consideration of consistency. GetRequest to follower should be + // rejected. + if (!is_leader()) { + // This node is a follower or it's not up-to-date. Redirect to + // the leader if possible. + return redirect(response); + } + + // This is the leader and is up-to-date. It's safe to respond client + response->set_success(true); + response->set_value(_value.load(butil::memory_order_relaxed)); + } + + bool is_leader() const + { return _leader_term.load(butil::memory_order_acquire) > 0; } + + // Shut this node down. + void shutdown() { + if (_node) { + _node->shutdown(NULL); + } + } + + // Blocking this thread until the node is eventually down. + void join() { + if (_node) { + _node->join(); + } + } + +private: +friend class FetchAddClosure; + + void redirect(CounterResponse* response) { + response->set_success(false); + if (_node) { + braft::PeerId leader = _node->leader_id(); + if (!leader.is_empty()) { + response->set_redirect(leader.to_string()); + } + } + } + + // @braft::StateMachine + void on_apply(braft::Iterator& iter) { + // A batch of tasks are committed, which must be processed through + // |iter| + for (; iter.valid(); iter.next()) { + int64_t detal_value = 0; + CounterResponse* response = NULL; + // This guard helps invoke iter.done()->Run() asynchronously to + // avoid that callback blocks the StateMachine. + braft::AsyncClosureGuard closure_guard(iter.done()); + if (iter.done()) { + // This task is applied by this node, get value from this + // closure to avoid additional parsing. + FetchAddClosure* c = dynamic_cast(iter.done()); + response = c->response(); + detal_value = c->request()->value(); + } else { + // Have to parse FetchAddRequest from this log. + butil::IOBufAsZeroCopyInputStream wrapper(iter.data()); + FetchAddRequest request; + CHECK(request.ParseFromZeroCopyStream(&wrapper)); + detal_value = request.value(); + } + + // Now the log has been parsed. Update this state machine by this + // operation. + const int64_t prev = _value.fetch_add(detal_value, + butil::memory_order_relaxed); + if (response) { + response->set_success(true); + response->set_value(prev); + } + + // The purpose of following logs is to help you understand the way + // this StateMachine works. + // Remove these logs in performance-sensitive servers. + LOG_IF(INFO, FLAGS_log_applied_task) + << "Added value=" << prev << " by detal=" << detal_value + << " at log_index=" << iter.index(); + } + } + + struct SnapshotArg { + int64_t value; + braft::SnapshotWriter* writer; + braft::Closure* done; + }; + + static void *save_snapshot(void* arg) { + SnapshotArg* sa = (SnapshotArg*) arg; + std::unique_ptr arg_guard(sa); + // Serialize StateMachine to the snapshot + brpc::ClosureGuard done_guard(sa->done); + std::string snapshot_path = sa->writer->get_path() + "/data"; + LOG(INFO) << "Saving snapshot to " << snapshot_path; + // Use protobuf to store the snapshot for backward compatibility. + Snapshot s; + s.set_value(sa->value); + braft::ProtoBufFile pb_file(snapshot_path); + if (pb_file.save(&s, true) != 0) { + sa->done->status().set_error(EIO, "Fail to save pb_file"); + return NULL; + } + // Snapshot is a set of files in raft. Add the only file into the + // writer here. + if (sa->writer->add_file("data") != 0) { + sa->done->status().set_error(EIO, "Fail to add file to writer"); + return NULL; + } + return NULL; + } + + void on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { + // Save current StateMachine in memory and starts a new bthread to avoid + // blocking StateMachine since it's a bit slow to write data to disk + // file. + SnapshotArg* arg = new SnapshotArg; + arg->value = _value.load(butil::memory_order_relaxed); + arg->writer = writer; + arg->done = done; + bthread_t tid; + bthread_start_urgent(&tid, NULL, save_snapshot, arg); + } + + int on_snapshot_load(braft::SnapshotReader* reader) { + // Load snasphot from reader, replacing the running StateMachine + CHECK(!is_leader()) << "Leader is not supposed to load snapshot"; + if (reader->get_file_meta("data", NULL) != 0) { + LOG(ERROR) << "Fail to find `data' on " << reader->get_path(); + return -1; + } + std::string snapshot_path = reader->get_path() + "/data"; + braft::ProtoBufFile pb_file(snapshot_path); + Snapshot s; + if (pb_file.load(&s) != 0) { + LOG(ERROR) << "Fail to load snapshot from " << snapshot_path; + return -1; + } + _value.store(s.value(), butil::memory_order_relaxed); + return 0; + } + + void on_leader_start(int64_t term) { + _leader_term.store(term, butil::memory_order_release); + LOG(INFO) << "Node becomes leader"; + } + void on_leader_stop(const butil::Status& status) { + _leader_term.store(-1, butil::memory_order_release); + LOG(INFO) << "Node stepped down : " << status; + } + + void on_shutdown() { + LOG(INFO) << "This node is down"; + } + void on_error(const ::braft::Error& e) { + LOG(ERROR) << "Met raft error " << e; + } + void on_configuration_committed(const ::braft::Configuration& conf) { + LOG(INFO) << "Configuration of this group is " << conf; + } + void on_stop_following(const ::braft::LeaderChangeContext& ctx) { + LOG(INFO) << "Node stops following " << ctx; + } + void on_start_following(const ::braft::LeaderChangeContext& ctx) { + LOG(INFO) << "Node start following " << ctx; + } + // end of @braft::StateMachine + +private: + braft::Node* volatile _node; + butil::atomic _value; + butil::atomic _leader_term; +}; + +void FetchAddClosure::Run() { + // Auto delete this after Run() + std::unique_ptr self_guard(this); + // Repsond this RPC. + brpc::ClosureGuard done_guard(_done); + if (status().ok()) { + return; + } + // Try redirect if this request failed. + _counter->redirect(_response); +} + +// Implements example::CounterService if you are using brpc. +class CounterServiceImpl : public CounterService { +public: + explicit CounterServiceImpl(Counter* counter) : _counter(counter) {} + void fetch_add(::google::protobuf::RpcController* controller, + const ::example::FetchAddRequest* request, + ::example::CounterResponse* response, + ::google::protobuf::Closure* done) { + return _counter->fetch_add(request, response, done); + } + void get(::google::protobuf::RpcController* controller, + const ::example::GetRequest* request, + ::example::CounterResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + return _counter->get(response); + } +private: + Counter* _counter; +}; + +} // namespace example + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + butil::AtExitManager exit_manager; + + // Generally you only need one Server. + brpc::Server server; + example::Counter counter; + example::CounterServiceImpl service(&counter); + + // Add your service into RPC server + if (server.AddService(&service, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + // raft can share the same RPC server. Notice the second parameter, because + // adding services into a running server is not allowed and the listen + // address of this server is impossible to get before the server starts. You + // have to specify the address of the server. + if (braft::add_service(&server, FLAGS_port) != 0) { + LOG(ERROR) << "Fail to add raft service"; + return -1; + } + + // It's recommended to start the server before Counter is started to avoid + // the case that it becomes the leader while the service is unreacheable by + // clients. + // Notice the default options of server is used here. Check out details from + // the doc of brpc if you would like change some options; + if (server.Start(FLAGS_port, NULL) != 0) { + LOG(ERROR) << "Fail to start Server"; + return -1; + } + + // It's ok to start Counter; + if (counter.start() != 0) { + LOG(ERROR) << "Fail to start Counter"; + return -1; + } + + LOG(INFO) << "Counter service is running on " << server.listen_address(); + // Wait until 'CTRL-C' is pressed. then Stop() and Join() the service + while (!brpc::IsAskedToQuit()) { + sleep(1); + } + + LOG(INFO) << "Counter service is going to quit"; + + // Stop counter before server + counter.shutdown(); + server.Stop(0); + + // Wait until all the processing tasks are over. + counter.join(); + server.Join(); + return 0; +} diff --git a/example/counter/counter_hostname_test/stop.sh b/example/counter/counter_hostname_test/stop.sh new file mode 100644 index 00000000..30024062 --- /dev/null +++ b/example/counter/counter_hostname_test/stop.sh @@ -0,0 +1,21 @@ +#!/bin/bash +#=============================================================================== +# +# FILE: stop.sh +# +# USAGE: ./stop.sh +# +# DESCRIPTION: +# +# OPTIONS: --- +# REQUIREMENTS: --- +# BUGS: --- +# NOTES: --- +# AUTHOR: WangYao (), wangyao02@baidu.com +# COMPANY: Baidu.com, Inc +# VERSION: 1.0 +# CREATED: 2015年10月30日 17时50分43秒 CST +# REVISION: --- +#=============================================================================== + +killall -9 counter_server diff --git a/src/braft/cli.cpp b/src/braft/cli.cpp index c0447812..cf2777ae 100644 --- a/src/braft/cli.cpp +++ b/src/braft/cli.cpp @@ -35,9 +35,19 @@ static butil::Status get_leader(const GroupId& group_id, const Configuration& co for (Configuration::const_iterator iter = conf.begin(); iter != conf.end(); ++iter) { brpc::Channel channel; - if (channel.Init(iter->addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - iter->to_string().c_str()); + if (iter->type_ == PeerId::Type::EndPoint) { + if (channel.Init(iter->addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + iter->to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(iter->hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + iter->to_string().c_str()); + } } CliService_Stub stub(&channel); GetLeaderRequest request; @@ -73,9 +83,19 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf, butil::Status st = get_leader(group_id, conf, &leader_id); BRAFT_RETURN_IF(!st.ok(), st); brpc::Channel channel; - if (channel.Init(leader_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - leader_id.to_string().c_str()); + if (leader_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } } AddPeerRequest request; request.set_group_id(group_id); @@ -111,9 +131,19 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf, butil::Status st = get_leader(group_id, conf, &leader_id); BRAFT_RETURN_IF(!st.ok(), st); brpc::Channel channel; - if (channel.Init(leader_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - leader_id.to_string().c_str()); + if (leader_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } } RemovePeerRequest request; request.set_group_id(group_id); @@ -150,9 +180,19 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id, return butil::Status(EINVAL, "new_conf is empty"); } brpc::Channel channel; - if (channel.Init(peer_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - peer_id.to_string().c_str()); + if (peer_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(peer_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + peer_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(peer_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + peer_id.to_string().c_str()); + } } brpc::Controller cntl; cntl.set_timeout_ms(options.timeout_ms); @@ -176,9 +216,19 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id, butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id, const CliOptions& options) { brpc::Channel channel; - if (channel.Init(peer_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - peer_id.to_string().c_str()); + if (peer_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(peer_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + peer_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(peer_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + peer_id.to_string().c_str()); + } } brpc::Controller cntl; cntl.set_timeout_ms(options.timeout_ms); @@ -204,9 +254,19 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf, LOG(INFO) << "conf=" << conf << " leader=" << leader_id << " new_peers=" << new_peers; brpc::Channel channel; - if (channel.Init(leader_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - leader_id.to_string().c_str()); + if (leader_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } } ChangePeersRequest request; @@ -250,9 +310,19 @@ butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf return butil::Status::OK(); } brpc::Channel channel; - if (channel.Init(leader_id.addr, NULL) != 0) { - return butil::Status(-1, "Fail to init channel to %s", - leader_id.to_string().c_str()); + if (leader_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader_id.addr, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + return butil::Status(-1, "Fail to init channel to %s", + leader_id.to_string().c_str()); + } } TransferLeaderRequest request; request.set_group_id(group_id); diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 310539ae..14d8123c 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -40,76 +40,154 @@ enum Role { }; // Represent a participant in a replicating group. +// Conf like: 172-17-0-1.default.pod.cluster.local:8002:0,172-17-0-2.default.pod.cluster.local:8002:0,172-17-0-3.default.pod.cluster.local:8002:0 struct PeerId { butil::EndPoint addr; // ip+port. int idx; // idx in same addr, default 0 Role role = REPLICA; - - PeerId() : idx(0), role(REPLICA) {} - explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {} - PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {} - PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_) { + std::string hostname_; // hostname+port, e.g. www.foo.com:8765 + enum class Type { + EndPoint = 0, + HostName + }; + Type type_; + + PeerId() : idx(0), role(REPLICA), type_(Type::EndPoint) {} + explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA), type_(Type::EndPoint) {} + PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA), type_(Type::EndPoint) {} + PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_), type_(Type::EndPoint) { if (witness) { this->role = WITNESS; } } - /*intended implicit*/PeerId(const std::string& str) { CHECK_EQ(0, parse(str)); } - PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {} + PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role), hostname_(id.hostname_), type_(id.type_) {} + PeerId(PeerId&& id) : addr(std::move(id.addr)), idx(std::move(id.idx)), role(std::move(id.role)), hostname_(std::move(id.hostname_)), + type_(std::move(id.type_)) {} + + PeerId& operator=(const PeerId& id) { + addr = id.addr; + idx = id.idx; + hostname_ = id.hostname_; + type_ = id.type_; + role = id.role + + return *this; + } + + PeerId& operator=(PeerId&& id) { + addr = std::move(id.addr); + idx = std::move(id.idx); + hostname_ = std::move(id.hostname_); + type_ = std::move(id.type_); + role = std::move(id.role) + + return *this; + } void reset() { - addr.ip = butil::IP_ANY; - addr.port = 0; + if (type_ == Type::EndPoint) { + addr.ip = butil::IP_ANY; + addr.port = 0; + } + else { + hostname_.clear(); + } idx = 0; role = REPLICA; } bool is_empty() const { - return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0); + if (type_ == Type::EndPoint) { + return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0); + } else { + return (hostname_.empty() && idx == 0); + } } bool is_witness() const { return role == WITNESS; } int parse(const std::string& str) { reset(); - char ip_str[64]; + char temp_str[265]; // max length of DNS Name < 255 int value = REPLICA; - if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) { + int port; + if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", temp_str, &port, &idx, &value)) { reset(); return -1; } role = (Role)value; if (role > WITNESS) { - reset(); + reset() return -1; } - if (0 != butil::str2ip(ip_str, &addr.ip)) { - reset(); - return -1; + if (0 != butil::str2ip(temp_str, &addr.ip)) { + type_ = Type::HostName; + hostname_.append(temp_str); + hostname_.append(":"); + hostname_.append(std::to_string(port)); + } else { + type_ = Type::EndPoint; + addr.port = port; } return 0; } std::string to_string() const { - char str[128]; - snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role)); + char str[265]; // max length of DNS Name < 255 + if (type_ == Type::EndPoint) { + snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role)); + } else { + snprintf(str, sizeof(str), "%s:%d:%d", hostname_.c_str(), idx, int(role)); + } return std::string(str); } - PeerId& operator=(const PeerId& rhs) = default; }; inline bool operator<(const PeerId& id1, const PeerId& id2) { - if (id1.addr < id2.addr) { - return true; + if (id1.type_ != id2.type_) { + LOG(WARNING) << "PeerId id1 and PeerId id2 do not have same type(IP Addr or Hostname)."; + if (id1.type_ == PeerId::Type::EndPoint) { + if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_.c_str()) < 0) { + return true; + } else { + return false; + } + } else { + if (strcmp(id1.hostname_.c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) { + return true; + } else { + return false; + } + } } else { - return id1.addr == id2.addr && id1.idx < id2.idx; + if (id1.type_ == PeerId::Type::EndPoint) { + if (id1.addr < id2.addr) { + return true; + } else { + return id1.addr == id2.addr && id1.idx < id2.idx; + } + } else { + if (id1.hostname_ < id2.hostname_) { + return true; + } else { + return id1.hostname_ == id2.hostname_ && id1.idx < id2.idx; + } + } } } inline bool operator==(const PeerId& id1, const PeerId& id2) { - return (id1.addr == id2.addr && id1.idx == id2.idx); + if (id1.type_ != id2.type_) { + return false; + } + if (id1.type_ == PeerId::Type::EndPoint) { + return (id1.addr == id2.addr && id1.idx == id2.idx); + } else { + return (id1.hostname_ == id2.hostname_ && id1.idx == id2.idx); + } } inline bool operator!=(const PeerId& id1, const PeerId& id2) { @@ -117,7 +195,11 @@ inline bool operator!=(const PeerId& id1, const PeerId& id2) { } inline std::ostream& operator << (std::ostream& os, const PeerId& id) { - return os << id.addr << ':' << id.idx << ':' << int(id.role); + if (id.type_ == PeerId::Type::EndPoint) { + return os << id.addr << ':' << id.idx << ':' << int(id.role); + } else { + return os << id.hostname_ << ':' << id.idx << ':' << int(id.role); + } } struct NodeId { diff --git a/src/braft/node.cpp b/src/braft/node.cpp index b5ba39eb..687e46a4 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -1654,12 +1654,22 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { options.max_retry = 0; options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; brpc::Channel channel; - if (0 != channel.Init(iter->addr, &options)) { - LOG(WARNING) << "node " << _group_id << ":" << _server_id + if (iter->type_ == PeerId::Type::EndPoint) { + if (0 != channel.Init(iter->addr, &options)) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id << " channel init failed, addr " << iter->addr; - continue; + continue; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(iter->hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id + << " channel init failed, hostname " << iter->hostname_; + continue; + } } - OnPreVoteRPCDone* done = new OnPreVoteRPCDone( *iter, _current_term, _pre_vote_ctx.version(), this); done->cntl.set_timeout_ms(_options.election_timeout_ms); @@ -1760,10 +1770,21 @@ void NodeImpl::request_peers_to_vote(const std::set& peers, options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; options.max_retry = 0; brpc::Channel channel; - if (0 != channel.Init(iter->addr, &options)) { - LOG(WARNING) << "node " << _group_id << ":" << _server_id - << " channel init failed, addr " << iter->addr; - continue; + if (iter->type_ == PeerId::Type::EndPoint) { + if (0 != channel.Init(iter->addr, &options)) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id + << " channel init failed, addr " << iter->addr; + continue; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(iter->hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id + << " channel init failed, addr " << iter->hostname_; + continue; + } } OnRequestVoteRPCDone* done = diff --git a/src/braft/raft_meta.cpp b/src/braft/raft_meta.cpp index fc791e70..065888c7 100644 --- a/src/braft/raft_meta.cpp +++ b/src/braft/raft_meta.cpp @@ -531,6 +531,11 @@ int FileBasedSingleMetaStorage::save() { StablePBMeta meta; meta.set_term(_term); + // if _votedfor's hostname_ is empty, the raft meta file(format ":0") could not be parsed. + // make some tricky fix. + if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_.empty()) { + _votedfor.hostname_.append("localhost:0"); + } meta.set_votedfor(_votedfor.to_string()); std::string path(_path); diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 65aea0df..6e78493e 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -116,11 +116,23 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) { brpc::ChannelOptions channel_opt; channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; channel_opt.timeout_ms = -1; // We don't need RPC timeout - if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) { - LOG(ERROR) << "Fail to init sending channel" - << ", group " << options.group_id; - delete r; - return -1; + if (options.peer_id.type_ == PeerId::Type::EndPoint) { + if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) { + LOG(ERROR) << "Fail to init sending channel" + << ", group " << options.group_id; + delete r; + return -1; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(options.peer_id.hostname_); + if (r->_sending_channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &channel_opt) != 0) { + LOG(ERROR) << "Fail to init sending channel" + << ", group " << options.group_id; + delete r; + return -1; + } } // bind lifecycle with node, AddRef diff --git a/src/braft/route_table.cpp b/src/braft/route_table.cpp index 000abd55..9f79c66a 100644 --- a/src/braft/route_table.cpp +++ b/src/braft/route_table.cpp @@ -23,6 +23,10 @@ #include #include #include "braft/cli.pb.h" +#include "braft/util.h" + +#include +#include namespace braft { namespace rtb { @@ -78,6 +82,23 @@ DISALLOW_COPY_AND_ASSIGN(RouteTable); return 0; } + std::pair InitAndGetChannelTo(std::string name) { + std::unique_lock lk(_channel_mux); + auto it = _channels.find(name); + if (it != _channels.end()) { + return {true, it->second.get()}; + } + + std::unique_ptr chan_ptr(new brpc::Channel); + if (chan_ptr->Init(name.c_str(), LOAD_BALANCER_NAME, nullptr) != 0) { + LOG(ERROR) << "Fail to init channel to " << name; + return {false, nullptr}; + } + brpc::Channel* chan = chan_ptr.get(); + _channels.emplace(name, std::move(chan_ptr)); + return {true, chan}; + } + private: friend struct DefaultSingletonTraits; RouteTable() { @@ -130,6 +151,9 @@ friend struct DefaultSingletonTraits; } DbMap _map; + + std::unordered_map> _channels; + std::mutex _channel_mux; }; int update_configuration(const GroupId& group, const Configuration& conf) { @@ -174,24 +198,38 @@ butil::Status refresh_leader(const GroupId& group, int timeout_ms) { for (Configuration::const_iterator iter = conf.begin(); iter != conf.end(); ++iter) { brpc::Channel channel; - if (channel.Init(iter->addr, NULL) != 0) { - if (error.ok()) { - error.set_error(-1, "Fail to init channel to %s", - iter->to_string().c_str()); - } else { - std::string saved_et = error.error_str(); - error.set_error(-1, "%s, Fail to init channel to %s", - saved_et.c_str(), - iter->to_string().c_str()); + brpc::Channel *chan_ptr = &channel; + if (iter->type_ == PeerId::Type::EndPoint) { + if (channel.Init(iter->addr, NULL) != 0) { + if (error.ok()) { + error.set_error(-1, "Fail to init EndPoint channel to %s", + iter->to_string().c_str()); + } else { + std::string saved_et = error.error_str(); + error.set_error(-1, "%s, Fail to init EndPoint channel to %s", + saved_et.c_str(), + iter->to_string().c_str()); + } + continue; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(iter->hostname_); + auto [success, chan] = rtb->InitAndGetChannelTo(naming_service_url); + if (!success) { + error.set_error(-1, "Fail to init HostName channel to %s", + iter->hostname_.c_str()); + continue; } - continue; + chan_ptr = chan; } brpc::Controller cntl; cntl.set_timeout_ms(timeout_ms); GetLeaderRequest request; request.set_group_id(group); GetLeaderResponse respones; - CliService_Stub stub(&channel); + CliService_Stub stub(chan_ptr); stub.get_leader(&cntl, &request, &respones, NULL); if (!cntl.Failed()) { update_leader(group, respones.leader_id()); diff --git a/src/braft/util.h b/src/braft/util.h index 50bec34b..902914f1 100644 --- a/src/braft/util.h +++ b/src/braft/util.h @@ -172,6 +172,10 @@ std::ostream& operator<<(std::ostream& os, const CounterRecorder&); namespace braft { class Closure; +// for Brpc Channel Init Naming Service API +inline const char *PROTOCOL_PREFIX = "http://"; +inline const char *LOAD_BALANCER_NAME = "rr"; + // http://stackoverflow.com/questions/1493936/faster-approach-to-checking-for-an-all-zero-buffer-in-c inline bool is_zero(const char* buff, const size_t size) { if (size >= sizeof(uint64_t)) { diff --git a/test/test_node.cpp b/test/test_node.cpp index 407e16af..c90454d6 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -266,9 +266,17 @@ TEST_P(NodeTest, TripleNode) { brpc::Channel channel; brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_HTTP; - - if (channel.Init(leader->node_id().peer_id.addr, &options) != 0) { - LOG(ERROR) << "Fail to initialize channel"; + if (leader->node_id().peer_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader->node_id().peer_id.addr, &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader->node_id().peer_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + } } { @@ -703,8 +711,17 @@ TEST_P(NodeTest, Leader_step_down_during_install_snapshot) { brpc::Channel channel; brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_HTTP; - if (channel.Init(leader->node_id().peer_id.addr, &options) != 0) { - LOG(ERROR) << "Fail to initialize channel"; + if (leader->node_id().peer_id.type_ == PeerId::Type::EndPoint) { + if (channel.Init(leader->node_id().peer_id.addr, &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + } + } else { + std::string naming_service_url; + naming_service_url.append(PROTOCOL_PREFIX); + naming_service_url.append(leader->node_id().peer_id.hostname_); + if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + } } { brpc::Controller cntl;