From efc08b503ec193ac201da168f5c801165373d789 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Thu, 9 Jul 2015 12:27:12 -0700 Subject: [PATCH] Redo admin coordinator link --- Makefile.am | 2 - admin/admin.cc | 145 +++++++++++++-------- admin/admin.h | 24 +++- admin/backup_state_machine.cc | 6 +- common/coordinator_link.cc | 229 ---------------------------------- common/coordinator_link.h | 102 --------------- 6 files changed, 119 insertions(+), 389 deletions(-) delete mode 100644 common/coordinator_link.cc delete mode 100644 common/coordinator_link.h diff --git a/Makefile.am b/Makefile.am index bed03819..f2413f62 100644 --- a/Makefile.am +++ b/Makefile.am @@ -116,7 +116,6 @@ noinst_HEADERS += common/attribute.h noinst_HEADERS += common/auth_wallet.h noinst_HEADERS += common/configuration_flags.h noinst_HEADERS += common/configuration.h -noinst_HEADERS += common/coordinator_link.h noinst_HEADERS += common/coordinator_returncode.h noinst_HEADERS += common/datatype_document.h noinst_HEADERS += common/datatype_float.h @@ -504,7 +503,6 @@ libhyperdex_admin_la_SOURCES = libhyperdex_admin_la_SOURCES += common/attribute.cc libhyperdex_admin_la_SOURCES += common/attribute_check.cc libhyperdex_admin_la_SOURCES += common/configuration.cc -libhyperdex_admin_la_SOURCES += common/coordinator_link.cc libhyperdex_admin_la_SOURCES += common/datatype_document.cc libhyperdex_admin_la_SOURCES += common/datatype_float.cc libhyperdex_admin_la_SOURCES += common/datatype_info.cc diff --git a/admin/admin.cc b/admin/admin.cc index cbddf0ee..54f9a253 100644 --- a/admin/admin.cc +++ b/admin/admin.cc @@ -60,9 +60,15 @@ using hyperdex::admin; admin :: admin(const char* coordinator, uint16_t port) - : m_coord(coordinator, port) - , m_busybee_mapper(m_coord.config()) + : m_coord(replicant_client_create(coordinator, port)) + , m_busybee_mapper(&m_config) , m_busybee(&m_busybee_mapper, 0) + , m_config() + , m_config_id(-1) + , m_config_status() + , m_config_state(0) + , m_config_data(NULL) + , m_config_data_sz(0) , m_next_admin_id(1) , m_next_server_nonce(1) , m_handle_coord_ops(false) @@ -93,7 +99,7 @@ admin :: dump_config(hyperdex_admin_returncode* status, int64_t id = m_next_admin_id; ++m_next_admin_id; - std::string tmp = m_coord.config()->dump(); + std::string tmp = m_config.dump(); e::intrusive_ptr op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, config); m_yieldable.push_back(op.get()); return op->admin_visible_id(); @@ -109,7 +115,7 @@ admin :: list_subspaces(const char* space, hyperdex_admin_returncode* status, co int64_t id = m_next_admin_id; ++m_next_admin_id; - std::string tmp = m_coord.config()->list_subspaces(space); + std::string tmp = m_config.list_subspaces(space); e::intrusive_ptr op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, subspaces); m_yieldable.push_back(op.get()); return op->admin_visible_id(); @@ -129,8 +135,8 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status) e::intrusive_ptr op = new coord_rpc_generic(id, status, (set ? "set read-only" : "set read-write")); char buf[sizeof(uint8_t)]; buf[0] = set ? 1 : 0; - int64_t cid = m_coord.rpc("read_only", buf, sizeof(uint8_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("read_only", buf, sizeof(uint8_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -155,7 +161,7 @@ admin :: wait_until_stable(enum hyperdex_admin_returncode* status) int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "wait for stability"); - int64_t cid = m_coord.wait("stable", m_coord.config()->version(), &op->repl_status); + int64_t cid = replicant_client_cond_wait(m_coord, "hyperdex", "stable", m_config.version(), &op->repl_status, NULL, NULL); if (cid >= 0) { @@ -186,8 +192,8 @@ admin :: fault_tolerance(const char* space, uint64_t ft, memcpy(&buf[0], space, space_sz); e::pack64be(ft, &buf[0] + space_sz); - int64_t cid = m_coord.rpc("fault_tolerance", &buf[0], space_sz + sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("fault_tolerance", &buf[0], space_sz + sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -259,8 +265,8 @@ admin :: add_space(const char* description, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "add space"); - int64_t cid = m_coord.rpc("space_add", reinterpret_cast(msg->data()), msg->size(), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("space_add", reinterpret_cast(msg->data()), msg->size(), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -286,8 +292,8 @@ admin :: rm_space(const char* name, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "rm space"); - int64_t cid = m_coord.rpc("space_rm", name, strlen(name) + 1, - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("space_rm", name, strlen(name) + 1, + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -320,8 +326,8 @@ admin :: mv_space(const char* source, const char* target, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "mv space"); - int64_t cid = m_coord.rpc("space_mv", &buf[0], buf.size(), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("space_mv", &buf[0], buf.size(), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -354,8 +360,8 @@ admin :: add_index(const char* space, const char* attr, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "add_index"); - int64_t cid = m_coord.rpc("index_add", &buf[0], buf.size(), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("index_add", &buf[0], buf.size(), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -383,8 +389,8 @@ admin :: rm_index(uint64_t idxid, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "rm_index"); - int64_t cid = m_coord.rpc("index_rm", buf, sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("index_rm", buf, sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -409,7 +415,7 @@ admin :: list_indices(const char* space, enum hyperdex_admin_returncode* status, int64_t id = m_next_admin_id; ++m_next_admin_id; - std::string tmp = m_coord.config()->list_indices(space); + std::string tmp = m_config.list_indices(space); e::intrusive_ptr op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, indexes); m_yieldable.push_back(op.get()); return op->admin_visible_id(); @@ -426,7 +432,7 @@ admin :: list_spaces(hyperdex_admin_returncode* status, int64_t id = m_next_admin_id; ++m_next_admin_id; - std::string tmp = m_coord.config()->list_spaces(); + std::string tmp = m_config.list_spaces(); e::intrusive_ptr op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, spaces); m_yieldable.push_back(op.get()); return op->admin_visible_id(); @@ -459,8 +465,8 @@ admin :: server_register(uint64_t token, const char* address, e::intrusive_ptr op = new coord_rpc_generic(id, status, "register server"); std::auto_ptr msg(e::buffer::create(sizeof(uint64_t) + pack_size(loc))); msg->pack() << sid << loc; - int64_t cid = m_coord.rpc("server_register", reinterpret_cast(msg->data()), msg->size(), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("server_register", reinterpret_cast(msg->data()), msg->size(), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -487,8 +493,8 @@ admin :: server_online(uint64_t token, enum hyperdex_admin_returncode* status) e::intrusive_ptr op = new coord_rpc_generic(id, status, "bring server online"); char buf[sizeof(uint64_t)]; e::pack64be(token, buf); - int64_t cid = m_coord.rpc("server_online", buf, sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("server_online", buf, sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -515,8 +521,8 @@ admin :: server_offline(uint64_t token, enum hyperdex_admin_returncode* status) e::intrusive_ptr op = new coord_rpc_generic(id, status, "bring server offline"); char buf[sizeof(uint64_t)]; e::pack64be(token, buf); - int64_t cid = m_coord.rpc("server_offline", buf, sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("server_offline", buf, sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -543,8 +549,8 @@ admin :: server_forget(uint64_t token, enum hyperdex_admin_returncode* status) e::intrusive_ptr op = new coord_rpc_generic(id, status, "forget server"); char buf[sizeof(uint64_t)]; e::pack64be(token, buf); - int64_t cid = m_coord.rpc("server_forget", buf, sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("server_forget", buf, sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -571,8 +577,8 @@ admin :: server_kill(uint64_t token, enum hyperdex_admin_returncode* status) e::intrusive_ptr op = new coord_rpc_generic(id, status, "kill server"); char buf[sizeof(uint64_t)]; e::pack64be(token, buf); - int64_t cid = m_coord.rpc("server_kill", buf, sizeof(uint64_t), - &op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = rpc("server_kill", buf, sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -619,7 +625,7 @@ admin :: coord_backup(const char* path, int64_t id = m_next_admin_id; ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_backup(id, status, path); - int64_t cid = m_coord.backup(&op->repl_status, &op->repl_output, &op->repl_output_sz); + int64_t cid = replicant_client_backup_object(m_coord, "hyperdex", &op->repl_status, &op->repl_output, &op->repl_output_sz); if (cid >= 0) { @@ -680,7 +686,7 @@ admin :: enable_perf_counters(hyperdex_admin_returncode* status, int64_t id = m_next_admin_id; ++m_next_admin_id; m_pcs = new pending_perf_counters(id, status, pc); - m_pcs->send_perf_reqs(this, m_coord.config(), status); + m_pcs->send_perf_reqs(this, &m_config, status); return m_pcs->admin_visible_id(); } } @@ -773,7 +779,7 @@ admin :: loop(int timeout, hyperdex_admin_returncode* status) if (t <= 0) { - m_pcs->send_perf_reqs(this, m_coord.config(), status); + m_pcs->send_perf_reqs(this, &m_config, status); t = m_pcs->millis_to_next_send(); } @@ -801,7 +807,7 @@ admin :: loop(int timeout, hyperdex_admin_returncode* status) { m_handle_coord_ops = false; replicant_returncode lrc = REPLICANT_GARBAGE; - int64_t lid = m_coord.loop(0, &lrc); + int64_t lid = replicant_client_loop(m_coord, 0, &lrc); if (lid < 0 && lrc != REPLICANT_TIMEOUT) { @@ -960,9 +966,7 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus, case REPLICANT_COND_NOT_FOUND: case REPLICANT_COND_DESTROYED: INTERPRET_ERROR(COORDFAIL) << "persistent coordinator error: " - << m_coord.error_message() - << " @ " - << m_coord.error_location(); + << replicant_client_error_message(m_coord); break; case REPLICANT_MAYBE: INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: " @@ -976,9 +980,7 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus, case REPLICANT_SERVER_ERROR: case REPLICANT_COMM_FAILED: INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: " - << m_coord.error_message() - << " @ " - << m_coord.error_location(); + << replicant_client_error_message(m_coord); break; case REPLICANT_TIMEOUT: INTERPRET_ERROR(TIMEOUT) << "operation timed out"; @@ -994,9 +996,7 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus, case REPLICANT_GARBAGE: default: INTERPRET_ERROR(INTERNAL) << "internal library error: " - << m_coord.error_message() - << " @ " - << m_coord.error_location(); + << replicant_client_error_message(m_coord); break; } } @@ -1004,23 +1004,66 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus, bool admin :: maintain_coord_connection(hyperdex_admin_returncode* status) { + if (m_config_status != REPLICANT_SUCCESS) + { + replicant_client_kill(m_coord, m_config_id); + m_config_id = -1; + } + replicant_returncode rc; - e::error err; - if (!m_coord.ensure_configuration(&rc)) + if (m_config_id < 0) { - interpret_replicant_returncode(rc, status, &m_last_error); + m_config_status = REPLICANT_SUCCESS; + m_config_id = replicant_client_cond_follow(m_coord, "hyperdex", "config", + &m_config_status, &m_config_state, + &m_config_data, &m_config_data_sz); + if (replicant_client_wait(m_coord, m_config_id, -1, &rc) < 0) + { + ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord); + return false; + } + } + + if (replicant_client_wait(m_coord, m_config_id, 0, &rc) < 0) + { + if (rc == REPLICANT_INTERRUPTED) + { + ERROR(INTERRUPTED) << "interrupted by a signal"; + return false; + } + else if (rc != REPLICANT_NONE_PENDING && rc != REPLICANT_TIMEOUT) + { + ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord); + return false; + } } - if (m_busybee.set_external_fd(m_coord.poll_fd()) != BUSYBEE_SUCCESS) + if (m_config.version() < m_config_state) { - ERROR(POLLFAILED) << "poll failed"; - return false; + configuration new_config; + e::unpacker up(m_config_data, m_config_data_sz); + up = up >> new_config; + + if (!up.error()) + { + m_config = new_config; + } } return true; } +int64_t +admin :: rpc(const char* func, + const char* data, size_t data_sz, + replicant_returncode* status, + char** output, size_t* output_sz) +{ + return replicant_client_call(m_coord, "hyperdex", func, data, data_sz, + REPLICANT_CALL_ROBUST, status, output, output_sz); +} + bool admin :: send(network_msgtype mt, server_id id, @@ -1031,7 +1074,7 @@ admin :: send(network_msgtype mt, { const uint8_t type = static_cast(mt); const uint8_t flags = 0; - const uint64_t version = m_coord.config()->version(); + const uint64_t version = m_config.version(); msg->pack_at(BUSYBEE_HEADER_SIZE) << type << flags << version << uint64_t(UINT64_MAX) << nonce; m_busybee.set_timeout(-1); diff --git a/admin/admin.h b/admin/admin.h index 7cd24e4c..4b1a2c8b 100644 --- a/admin/admin.h +++ b/admin/admin.h @@ -37,10 +37,12 @@ // BusyBee #include +// Replicant +#include + // HyperDex #include "include/hyperdex/admin.h" #include "namespace.h" -#include "common/coordinator_link.h" #include "common/mapper.h" #include "admin/coord_rpc.h" #include "admin/multi_yieldable.h" @@ -134,6 +136,10 @@ class admin private: bool maintain_coord_connection(hyperdex_admin_returncode* status); + int64_t rpc(const char* func, + const char* data, size_t data_sz, + replicant_returncode* status, + char** output, size_t* output_sz); bool send(network_msgtype mt, server_id id, uint64_t nonce, @@ -143,11 +149,20 @@ class admin void handle_disruption(const server_id& si); private: - coordinator_link m_coord; + replicant_client* m_coord; mapper m_busybee_mapper; busybee_st m_busybee; + // configuration + configuration m_config; + int64_t m_config_id; + replicant_returncode m_config_status; + uint64_t m_config_state; + char* m_config_data; + size_t m_config_data_sz; + // nonces int64_t m_next_admin_id; uint64_t m_next_server_nonce; + // operations bool m_handle_coord_ops; coord_rpc_map_t m_coord_ops; pending_map_t m_server_ops; @@ -157,7 +172,12 @@ class admin e::intrusive_ptr m_yielding; e::intrusive_ptr m_yielded; e::intrusive_ptr m_pcs; + // misc e::error m_last_error; + + private: + admin(const admin&); + admin& operator = (const admin&); }; END_HYPERDEX_NAMESPACE diff --git a/admin/backup_state_machine.cc b/admin/backup_state_machine.cc index ad37929c..f9221768 100644 --- a/admin/backup_state_machine.cc +++ b/admin/backup_state_machine.cc @@ -240,10 +240,10 @@ backup_state_machine :: callback_wait_to_quiesce(admin* adm, int64_t id) // at this point we know that: // 1. the cluster is in read-only mode // 2. every write initiated before setting it to read-only mode is complete - m_configuration_version = adm->m_coord.config()->version(); + m_configuration_version = adm->m_config.version(); // now figure out the servers to take a backup on - adm->m_coord.config()->get_all_addresses(&m_servers); + adm->m_config.get_all_addresses(&m_servers); std::sort(m_servers.rbegin(), m_servers.rend()); return callback_daemon_backup(adm, id); } @@ -319,7 +319,7 @@ backup_state_machine :: callback_wait_to_quiesce_again(admin* adm, int64_t id) return; } - if (m_configuration_version != adm->m_coord.config()->version()) + if (m_configuration_version != adm->m_config.version()) { YIELDING_ERROR(INTERNAL) << "configuration changed while taking backup"; backout(adm); diff --git a/common/coordinator_link.cc b/common/coordinator_link.cc deleted file mode 100644 index bee407cd..00000000 --- a/common/coordinator_link.cc +++ /dev/null @@ -1,229 +0,0 @@ -// Copyright (c) 2013-2015, Cornell University -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// * Neither the name of HyperDex nor the names of its contributors may be -// used to endorse or promote products derived from this software without -// specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. - -// HyperDex -#include "common/coordinator_link.h" - -using hyperdex::coordinator_link; - -coordinator_link :: coordinator_link(const char* coordinator, uint16_t port) - : m_repl(replicant_client_create(coordinator, port)) - , m_config() - , m_id(-1) - , m_status(REPLICANT_GARBAGE) - , m_output(NULL) - , m_output_sz(0) -{ - if (!m_repl) - { - throw std::bad_alloc(); - } -} - -coordinator_link :: coordinator_link(const char* conn_str) - : m_repl(replicant_client_create_conn_str(conn_str)) - , m_config() - , m_id(-1) - , m_status(REPLICANT_GARBAGE) - , m_output(NULL) - , m_output_sz(0) -{ - if (!m_repl) - { - throw std::bad_alloc(); - } -} - -coordinator_link :: ~coordinator_link() throw () -{ - reset(); - replicant_client_destroy(m_repl); -} - -bool -coordinator_link :: ensure_configuration(replicant_returncode* status) -{ - if (!prime_state_machine(status)) - { - return false; - } - - assert(m_id >= 0); - int timeout = m_config.cluster() == 0 ? -1 : 0; - int64_t lid = replicant_client_wait(m_repl, m_id, timeout, status); - - if (lid < 0) - { - return *status == REPLICANT_TIMEOUT && timeout == 0; - } - - assert(lid == m_id); - return process_new_configuration(status); -} - -int64_t -coordinator_link :: rpc(const char* func, - const char* data, size_t data_sz, - replicant_returncode* status, - char** output, size_t* output_sz) -{ - return replicant_client_call(m_repl, "hyperdex", func, data, data_sz, - REPLICANT_CALL_ROBUST, status, output, output_sz); -} - -int64_t -coordinator_link :: rpc_defended(const char* enter_func, - const char* enter_data, size_t enter_data_sz, - const char* exit_func, - const char* exit_data, size_t exit_data_sz, - replicant_returncode* status) -{ - return replicant_client_defended_call(m_repl, "hyperdex", enter_func, enter_data, enter_data_sz, - exit_func, exit_data, exit_data_sz, status); -} - -int64_t -coordinator_link :: backup(replicant_returncode* status, - char** output, size_t* output_sz) -{ - return replicant_client_backup_object(m_repl, "hyperdex", status, output, output_sz); -} - -int64_t -coordinator_link :: wait(const char* cond, uint64_t state, - replicant_returncode* status) -{ - return replicant_client_cond_wait(m_repl, "hyperdex", cond, state, status, NULL, NULL); -} - -int64_t -coordinator_link :: loop(int timeout, replicant_returncode* status) -{ - if (!prime_state_machine(status)) - { - return -1; - } - - int64_t lid = replicant_client_loop(m_repl, timeout, status); - - if (lid == m_id) - { - return process_new_configuration(status) ? INT64_MAX : -1; - } - else - { - return lid; - } -} - -int64_t -coordinator_link :: wait(int64_t id, int timeout, replicant_returncode* status) -{ - if (!prime_state_machine(status)) - { - return -1; - } - - if (id == INT64_MAX) - { - id = m_id; - } - - int64_t lid = replicant_client_wait(m_repl, id, timeout, status); - - if (lid == m_id) - { - return process_new_configuration(status) ? INT64_MAX : -1; - } - else - { - return lid; - } -} - -bool -coordinator_link :: prime_state_machine(replicant_returncode* status) -{ - if (m_id >= 0) - { - return true; - } - - m_id = replicant_client_cond_wait(m_repl, "hyperdex", "config", m_config.version() + 1, &m_status, &m_output, &m_output_sz); - *status = m_status; - - if (m_id >= 0) - { - return true; - } - else - { - reset(); - return false; - } -} - -bool -coordinator_link :: process_new_configuration(replicant_returncode* status) -{ - m_id = -1; - - if (m_status != REPLICANT_SUCCESS) - { - *status = m_status; - reset(); - return false; - } - - e::unpacker up(m_output, m_output_sz); - configuration new_config; - up = up >> new_config; - reset(); - - if (up.error()) - { - *status = REPLICANT_SERVER_ERROR; - return false; - } - - m_config = new_config; - return true; -} - -void -coordinator_link :: reset() -{ - if (m_output) - { - free(m_output); - } - - m_id = -1; - m_status = REPLICANT_GARBAGE; - m_output = NULL; - m_output_sz = 0; -} diff --git a/common/coordinator_link.h b/common/coordinator_link.h deleted file mode 100644 index 33d0f8b3..00000000 --- a/common/coordinator_link.h +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2013, Cornell University -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// * Neither the name of HyperDex nor the names of its contributors may be -// used to endorse or promote products derived from this software without -// specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. - -#ifndef hyperdex_common_coordinator_link_h_ -#define hyperdex_common_coordinator_link_h_ - -#define __STDC_LIMIT_MACROS - -// C -#include - -// STL -#include - -// e -#include - -// Replicant -#include - -// HyperDex -#include "namespace.h" -#include "common/configuration.h" - -BEGIN_HYPERDEX_NAMESPACE - -class coordinator_link -{ - public: - coordinator_link(const char* coordinator, uint16_t port); - coordinator_link(const char* conn_str); - ~coordinator_link() throw (); - - public: - const configuration* config() const { return &m_config; } - int poll_fd() { return replicant_client_poll_fd(m_repl); } - // true if there's a configuration to use - // false if there's an error to report - bool ensure_configuration(replicant_returncode* status); - int64_t rpc(const char* func, - const char* data, size_t data_sz, - replicant_returncode* status, - char** output, size_t* output_sz); - int64_t rpc_defended(const char* enter_func, - const char* enter_data, size_t enter_data_sz, - const char* exit_func, - const char* exit_data, size_t exit_data_sz, - replicant_returncode* status); - int64_t backup(replicant_returncode* status, - char** output, size_t* output_sz); - int64_t wait(const char* cond, uint64_t state, - replicant_returncode* status); - int64_t loop(int timeout, replicant_returncode* status); - int64_t wait(int64_t id, int timeout, replicant_returncode* status); - std::string error_message() const { return replicant_client_error_message(m_repl); } - std::string error_location() const { return replicant_client_error_location(m_repl); } - - private: - coordinator_link(const coordinator_link&); - coordinator_link& operator = (const coordinator_link&); - - private: - bool prime_state_machine(replicant_returncode* status); - bool process_new_configuration(replicant_returncode* status); - void reset(); - - private: - replicant_client* m_repl; - configuration m_config; - int64_t m_id; - replicant_returncode m_status; - char* m_output; - size_t m_output_sz; -}; - -END_HYPERDEX_NAMESPACE - -#endif // hyperdex_common_coordinator_link_h_