Skip to content

Commit

Permalink
Redo admin coordinator link
Browse files Browse the repository at this point in the history
  • Loading branch information
rescrv committed Jul 9, 2015
1 parent 4b8329d commit efc08b5
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 389 deletions.
2 changes: 0 additions & 2 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ noinst_HEADERS += common/attribute.h
noinst_HEADERS += common/auth_wallet.h
noinst_HEADERS += common/configuration_flags.h
noinst_HEADERS += common/configuration.h
noinst_HEADERS += common/coordinator_link.h
noinst_HEADERS += common/coordinator_returncode.h
noinst_HEADERS += common/datatype_document.h
noinst_HEADERS += common/datatype_float.h
Expand Down Expand Up @@ -504,7 +503,6 @@ libhyperdex_admin_la_SOURCES =
libhyperdex_admin_la_SOURCES += common/attribute.cc
libhyperdex_admin_la_SOURCES += common/attribute_check.cc
libhyperdex_admin_la_SOURCES += common/configuration.cc
libhyperdex_admin_la_SOURCES += common/coordinator_link.cc
libhyperdex_admin_la_SOURCES += common/datatype_document.cc
libhyperdex_admin_la_SOURCES += common/datatype_float.cc
libhyperdex_admin_la_SOURCES += common/datatype_info.cc
Expand Down
145 changes: 94 additions & 51 deletions admin/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@
using hyperdex::admin;

admin :: admin(const char* coordinator, uint16_t port)
: m_coord(coordinator, port)
, m_busybee_mapper(m_coord.config())
: m_coord(replicant_client_create(coordinator, port))
, m_busybee_mapper(&m_config)
, m_busybee(&m_busybee_mapper, 0)
, m_config()
, m_config_id(-1)
, m_config_status()
, m_config_state(0)
, m_config_data(NULL)
, m_config_data_sz(0)
, m_next_admin_id(1)
, m_next_server_nonce(1)
, m_handle_coord_ops(false)
Expand Down Expand Up @@ -93,7 +99,7 @@ admin :: dump_config(hyperdex_admin_returncode* status,

int64_t id = m_next_admin_id;
++m_next_admin_id;
std::string tmp = m_coord.config()->dump();
std::string tmp = m_config.dump();
e::intrusive_ptr<pending> op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, config);
m_yieldable.push_back(op.get());
return op->admin_visible_id();
Expand All @@ -109,7 +115,7 @@ admin :: list_subspaces(const char* space, hyperdex_admin_returncode* status, co

int64_t id = m_next_admin_id;
++m_next_admin_id;
std::string tmp = m_coord.config()->list_subspaces(space);
std::string tmp = m_config.list_subspaces(space);
e::intrusive_ptr<pending> op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, subspaces);
m_yieldable.push_back(op.get());
return op->admin_visible_id();
Expand All @@ -129,8 +135,8 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status)
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, (set ? "set read-only" : "set read-write"));
char buf[sizeof(uint8_t)];
buf[0] = set ? 1 : 0;
int64_t cid = m_coord.rpc("read_only", buf, sizeof(uint8_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("read_only", buf, sizeof(uint8_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -155,7 +161,7 @@ admin :: wait_until_stable(enum hyperdex_admin_returncode* status)
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "wait for stability");
int64_t cid = m_coord.wait("stable", m_coord.config()->version(), &op->repl_status);
int64_t cid = replicant_client_cond_wait(m_coord, "hyperdex", "stable", m_config.version(), &op->repl_status, NULL, NULL);

if (cid >= 0)
{
Expand Down Expand Up @@ -186,8 +192,8 @@ admin :: fault_tolerance(const char* space, uint64_t ft,
memcpy(&buf[0], space, space_sz);
e::pack64be(ft, &buf[0] + space_sz);

int64_t cid = m_coord.rpc("fault_tolerance", &buf[0], space_sz + sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("fault_tolerance", &buf[0], space_sz + sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -259,8 +265,8 @@ admin :: add_space(const char* description,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "add space");
int64_t cid = m_coord.rpc("space_add", reinterpret_cast<const char*>(msg->data()), msg->size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("space_add", reinterpret_cast<const char*>(msg->data()), msg->size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -286,8 +292,8 @@ admin :: rm_space(const char* name,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "rm space");
int64_t cid = m_coord.rpc("space_rm", name, strlen(name) + 1,
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("space_rm", name, strlen(name) + 1,
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -320,8 +326,8 @@ admin :: mv_space(const char* source, const char* target,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "mv space");
int64_t cid = m_coord.rpc("space_mv", &buf[0], buf.size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("space_mv", &buf[0], buf.size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -354,8 +360,8 @@ admin :: add_index(const char* space, const char* attr,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "add_index");
int64_t cid = m_coord.rpc("index_add", &buf[0], buf.size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("index_add", &buf[0], buf.size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -383,8 +389,8 @@ admin :: rm_index(uint64_t idxid,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "rm_index");
int64_t cid = m_coord.rpc("index_rm", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("index_rm", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -409,7 +415,7 @@ admin :: list_indices(const char* space, enum hyperdex_admin_returncode* status,

int64_t id = m_next_admin_id;
++m_next_admin_id;
std::string tmp = m_coord.config()->list_indices(space);
std::string tmp = m_config.list_indices(space);
e::intrusive_ptr<pending> op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, indexes);
m_yieldable.push_back(op.get());
return op->admin_visible_id();
Expand All @@ -426,7 +432,7 @@ admin :: list_spaces(hyperdex_admin_returncode* status,

int64_t id = m_next_admin_id;
++m_next_admin_id;
std::string tmp = m_coord.config()->list_spaces();
std::string tmp = m_config.list_spaces();
e::intrusive_ptr<pending> op = new pending_string(id, status, HYPERDEX_ADMIN_SUCCESS, tmp, spaces);
m_yieldable.push_back(op.get());
return op->admin_visible_id();
Expand Down Expand Up @@ -459,8 +465,8 @@ admin :: server_register(uint64_t token, const char* address,
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "register server");
std::auto_ptr<e::buffer> msg(e::buffer::create(sizeof(uint64_t) + pack_size(loc)));
msg->pack() << sid << loc;
int64_t cid = m_coord.rpc("server_register", reinterpret_cast<const char*>(msg->data()), msg->size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("server_register", reinterpret_cast<const char*>(msg->data()), msg->size(),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -487,8 +493,8 @@ admin :: server_online(uint64_t token, enum hyperdex_admin_returncode* status)
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "bring server online");
char buf[sizeof(uint64_t)];
e::pack64be(token, buf);
int64_t cid = m_coord.rpc("server_online", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("server_online", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -515,8 +521,8 @@ admin :: server_offline(uint64_t token, enum hyperdex_admin_returncode* status)
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "bring server offline");
char buf[sizeof(uint64_t)];
e::pack64be(token, buf);
int64_t cid = m_coord.rpc("server_offline", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("server_offline", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -543,8 +549,8 @@ admin :: server_forget(uint64_t token, enum hyperdex_admin_returncode* status)
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "forget server");
char buf[sizeof(uint64_t)];
e::pack64be(token, buf);
int64_t cid = m_coord.rpc("server_forget", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("server_forget", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand All @@ -571,8 +577,8 @@ admin :: server_kill(uint64_t token, enum hyperdex_admin_returncode* status)
e::intrusive_ptr<coord_rpc> op = new coord_rpc_generic(id, status, "kill server");
char buf[sizeof(uint64_t)];
e::pack64be(token, buf);
int64_t cid = m_coord.rpc("server_kill", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = rpc("server_kill", buf, sizeof(uint64_t),
&op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -619,7 +625,7 @@ admin :: coord_backup(const char* path,
int64_t id = m_next_admin_id;
++m_next_admin_id;
e::intrusive_ptr<coord_rpc> op = new coord_rpc_backup(id, status, path);
int64_t cid = m_coord.backup(&op->repl_status, &op->repl_output, &op->repl_output_sz);
int64_t cid = replicant_client_backup_object(m_coord, "hyperdex", &op->repl_status, &op->repl_output, &op->repl_output_sz);

if (cid >= 0)
{
Expand Down Expand Up @@ -680,7 +686,7 @@ admin :: enable_perf_counters(hyperdex_admin_returncode* status,
int64_t id = m_next_admin_id;
++m_next_admin_id;
m_pcs = new pending_perf_counters(id, status, pc);
m_pcs->send_perf_reqs(this, m_coord.config(), status);
m_pcs->send_perf_reqs(this, &m_config, status);
return m_pcs->admin_visible_id();
}
}
Expand Down Expand Up @@ -773,7 +779,7 @@ admin :: loop(int timeout, hyperdex_admin_returncode* status)

if (t <= 0)
{
m_pcs->send_perf_reqs(this, m_coord.config(), status);
m_pcs->send_perf_reqs(this, &m_config, status);
t = m_pcs->millis_to_next_send();
}

Expand Down Expand Up @@ -801,7 +807,7 @@ admin :: loop(int timeout, hyperdex_admin_returncode* status)
{
m_handle_coord_ops = false;
replicant_returncode lrc = REPLICANT_GARBAGE;
int64_t lid = m_coord.loop(0, &lrc);
int64_t lid = replicant_client_loop(m_coord, 0, &lrc);

if (lid < 0 && lrc != REPLICANT_TIMEOUT)
{
Expand Down Expand Up @@ -960,9 +966,7 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus,
case REPLICANT_COND_NOT_FOUND:
case REPLICANT_COND_DESTROYED:
INTERPRET_ERROR(COORDFAIL) << "persistent coordinator error: "
<< m_coord.error_message()
<< " @ "
<< m_coord.error_location();
<< replicant_client_error_message(m_coord);
break;
case REPLICANT_MAYBE:
INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: "
Expand All @@ -976,9 +980,7 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus,
case REPLICANT_SERVER_ERROR:
case REPLICANT_COMM_FAILED:
INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: "
<< m_coord.error_message()
<< " @ "
<< m_coord.error_location();
<< replicant_client_error_message(m_coord);
break;
case REPLICANT_TIMEOUT:
INTERPRET_ERROR(TIMEOUT) << "operation timed out";
Expand All @@ -994,33 +996,74 @@ admin :: interpret_replicant_returncode(replicant_returncode rstatus,
case REPLICANT_GARBAGE:
default:
INTERPRET_ERROR(INTERNAL) << "internal library error: "
<< m_coord.error_message()
<< " @ "
<< m_coord.error_location();
<< replicant_client_error_message(m_coord);
break;
}
}

bool
admin :: maintain_coord_connection(hyperdex_admin_returncode* status)
{
if (m_config_status != REPLICANT_SUCCESS)
{
replicant_client_kill(m_coord, m_config_id);
m_config_id = -1;
}

replicant_returncode rc;
e::error err;

if (!m_coord.ensure_configuration(&rc))
if (m_config_id < 0)
{
interpret_replicant_returncode(rc, status, &m_last_error);
m_config_status = REPLICANT_SUCCESS;
m_config_id = replicant_client_cond_follow(m_coord, "hyperdex", "config",
&m_config_status, &m_config_state,
&m_config_data, &m_config_data_sz);
if (replicant_client_wait(m_coord, m_config_id, -1, &rc) < 0)
{
ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord);
return false;
}
}

if (replicant_client_wait(m_coord, m_config_id, 0, &rc) < 0)
{
if (rc == REPLICANT_INTERRUPTED)
{
ERROR(INTERRUPTED) << "interrupted by a signal";
return false;
}
else if (rc != REPLICANT_NONE_PENDING && rc != REPLICANT_TIMEOUT)
{
ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord);
return false;
}
}

if (m_busybee.set_external_fd(m_coord.poll_fd()) != BUSYBEE_SUCCESS)
if (m_config.version() < m_config_state)
{
ERROR(POLLFAILED) << "poll failed";
return false;
configuration new_config;
e::unpacker up(m_config_data, m_config_data_sz);
up = up >> new_config;

if (!up.error())
{
m_config = new_config;
}
}

return true;
}

int64_t
admin :: rpc(const char* func,
const char* data, size_t data_sz,
replicant_returncode* status,
char** output, size_t* output_sz)
{
return replicant_client_call(m_coord, "hyperdex", func, data, data_sz,
REPLICANT_CALL_ROBUST, status, output, output_sz);
}

bool
admin :: send(network_msgtype mt,
server_id id,
Expand All @@ -1031,7 +1074,7 @@ admin :: send(network_msgtype mt,
{
const uint8_t type = static_cast<uint8_t>(mt);
const uint8_t flags = 0;
const uint64_t version = m_coord.config()->version();
const uint64_t version = m_config.version();
msg->pack_at(BUSYBEE_HEADER_SIZE)
<< type << flags << version << uint64_t(UINT64_MAX) << nonce;
m_busybee.set_timeout(-1);
Expand Down
Loading

0 comments on commit efc08b5

Please sign in to comment.