Skip to content

Commit

Permalink
Simplify client coordinator link
Browse files Browse the repository at this point in the history
  • Loading branch information
rescrv committed Jul 2, 2015
1 parent e07d04c commit 809a644
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 49 deletions.
1 change: 0 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ libhyperdex_client_la_SOURCES += common/attribute.cc
libhyperdex_client_la_SOURCES += common/attribute_check.cc
libhyperdex_client_la_SOURCES += common/auth_wallet.cc
libhyperdex_client_la_SOURCES += common/configuration.cc
libhyperdex_client_la_SOURCES += common/coordinator_link.cc
libhyperdex_client_la_SOURCES += common/datatype_document.cc
libhyperdex_client_la_SOURCES += common/datatype_float.cc
libhyperdex_client_la_SOURCES += common/datatype_info.cc
Expand Down
116 changes: 78 additions & 38 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,18 @@ using hyperdex::client;
using hyperdex::microtransaction;

client :: client(const char* coordinator, uint16_t port)
: m_coord(coordinator, port)
, m_busybee_mapper(m_coord.config())
: m_coord(replicant_client_create(coordinator, port))
, m_busybee_mapper(&m_config)
, m_busybee(&m_busybee_mapper, 0)
, m_config()
, m_config_id(-1)
, m_config_status()
, m_config_state(0)
, m_config_data(NULL)
, m_config_data_sz(0)
, m_next_client_id(1)
, m_next_server_nonce(1)
, m_flagfd()
, m_pending_ops()
, m_failed()
, m_yieldable()
Expand All @@ -97,15 +104,28 @@ client :: client(const char* coordinator, uint16_t port)
, m_macaroons_sz(0)
, m_convert_types(true)
{
if (!m_coord)
{
throw std::bad_alloc();
}

m_busybee.set_external_fd(replicant_client_poll_fd(m_coord));
m_busybee.set_external_fd(m_flagfd.poll_fd());
}

client :: client(const char* conn_str)
: m_coord(conn_str)
, m_busybee_mapper(m_coord.config())
: m_coord(replicant_client_create_conn_str(conn_str))
, m_busybee_mapper(&m_config)
, m_busybee(&m_busybee_mapper, 0)
, m_config()
, m_config_id(-1)
, m_config_status()
, m_config_state(0)
, m_config_data(NULL)
, m_config_data_sz(0)
, m_next_client_id(1)
, m_next_server_nonce(1)
, m_flagfd()
, m_pending_ops()
, m_failed()
, m_yieldable()
Expand All @@ -116,6 +136,12 @@ client :: client(const char* conn_str)
, m_macaroons_sz(0)
, m_convert_types(true)
{
if (!m_coord)
{
throw std::bad_alloc();
}

m_busybee.set_external_fd(replicant_client_poll_fd(m_coord));
m_busybee.set_external_fd(m_flagfd.poll_fd());
}

Expand All @@ -133,7 +159,7 @@ client :: get(const char* space, const char* _key, size_t _key_sz,
return -1;
}

const schema* sc = m_coord.config()->get_schema(space);
const schema* sc = m_config.get_schema(space);

if (!sc)
{
Expand Down Expand Up @@ -183,7 +209,7 @@ client :: get_partial(const char* space, const char* _key, size_t _key_sz,
return -1;
}

const schema* sc = m_coord.config()->get_schema(space);
const schema* sc = m_config.get_schema(space);

if (!sc)
{
Expand Down Expand Up @@ -255,7 +281,7 @@ client :: get_partial(const char* space, const char* _key, size_t _key_sz,
{ \
return -1; \
} \
const schema* sc = m_coord.config()->get_schema(space); \
const schema* sc = m_config.get_schema(space); \
if (!sc) \
{ \
ERROR(UNKNOWNSPACE) << "space \"" << e::strescape(space) << "\" does not exist"; \
Expand Down Expand Up @@ -378,7 +404,7 @@ client :: perform_funcall(const hyperdex_client_keyop_info* opinfo,
return -1;
}

const schema* sc = m_coord.config()->get_schema(space);
const schema* sc = m_config.get_schema(space);

if (!sc)
{
Expand Down Expand Up @@ -598,7 +624,7 @@ client :: loop(int timeout, hyperdex_client_returncode* status)

if (vfrom == psp.vsi &&
id == psp.si &&
m_coord.config()->get_server_id(vfrom) == id)
m_config.get_server_id(vfrom) == id)
{
if (!op->handle_message(this, id, vfrom, msg_type, msg, up, status, &m_last_error))
{
Expand All @@ -615,7 +641,7 @@ client :: loop(int timeout, hyperdex_client_returncode* status)
<< "; it came from "
<< vfrom << "/" << id
<< "; our config says that virtual_id should map to "
<< m_coord.config()->get_server_id(vfrom);
<< m_config.get_server_id(vfrom);
return -1;
}
}
Expand Down Expand Up @@ -716,7 +742,7 @@ client :: attribute_type(const char* space, const char* name,
return HYPERDATATYPE_GARBAGE;
}

const hyperdex::schema* sc = m_coord.config()->get_schema(space);
const hyperdex::schema* sc = m_config.get_schema(space);

if (!sc)
{
Expand Down Expand Up @@ -1010,7 +1036,7 @@ client :: prepare_searchop(const schema& sc,
}

std::stable_sort(checks->begin(), checks->end());
m_coord.config()->lookup_search(space, *checks, servers); // XXX search guaranteed empty vs. search encounters offline server
m_config.lookup_search(space, *checks, servers); // XXX search guaranteed empty vs. search encounters offline server

if (servers->empty())
{
Expand Down Expand Up @@ -1090,7 +1116,7 @@ client :: perform_aggregation(const std::vector<virtual_server_id>& servers,
for (size_t i = 0; i < servers.size(); ++i)
{
uint64_t nonce = m_next_server_nonce++;
pending_server_pair psp(m_coord.config()->get_server_id(servers[i]), servers[i], op);
pending_server_pair psp(m_config.get_server_id(servers[i]), servers[i], op);
std::auto_ptr<e::buffer> msg_copy(msg->copy());

if (!send(mt, psp.vsi, nonce, msg_copy, op, status))
Expand All @@ -1105,49 +1131,63 @@ client :: perform_aggregation(const std::vector<virtual_server_id>& servers,
bool
client :: maintain_coord_connection(hyperdex_client_returncode* status)
{
if (m_config_status != REPLICANT_SUCCESS)
{
replicant_client_kill(m_coord, m_config_id);
m_config_id = -1;
}

replicant_returncode rc;
uint64_t old_version = m_coord.config()->version();

if (!m_coord.ensure_configuration(&rc))
if (m_config_id < 0)
{
if (rc == REPLICANT_INTERRUPTED)
m_config_status = REPLICANT_SUCCESS;
m_config_id = replicant_client_cond_follow(m_coord, "hyperdex", "config",
&m_config_status, &m_config_state,
&m_config_data, &m_config_data_sz);
if (replicant_client_wait(m_coord, m_config_id, -1, &rc) < 0)
{
ERROR(INTERRUPTED) << "signal received";
ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord);
return false;
}
else if (rc == REPLICANT_TIMEOUT)
}

if (replicant_client_loop(m_coord, 0, &rc) < 0)
{
if (rc == REPLICANT_INTERRUPTED)
{
ERROR(TIMEOUT) << "operation timed out";
ERROR(INTERRUPTED) << "interrupted by a signal";
return false;
}
else
else if (rc != REPLICANT_NONE_PENDING && rc != REPLICANT_TIMEOUT)
{
ERROR(COORDFAIL) << "coordinator failure: " << m_coord.error_message();
ERROR(COORDFAIL) << "coordinator failure: " << replicant_client_error_message(m_coord);
return false;
}

return false;
}

if (m_busybee.set_external_fd(m_coord.poll_fd()) != BUSYBEE_SUCCESS)
if (m_config.version() < m_config_state)
{
*status = HYPERDEX_CLIENT_POLLFAILED;
return false;
}
configuration new_config;
e::unpacker up(m_config_data, m_config_data_sz);
up = up >> new_config;

uint64_t new_version = m_coord.config()->version();
if (!up.error())
{
m_config = new_config;
}

if (old_version < new_version)
{
pending_map_t::iterator it = m_pending_ops.begin();

while (it != m_pending_ops.end())
{
// If the mapping that was true when the operation started is no
// longer true, we fail the operation with a RECONFIGURE.
if (m_coord.config()->get_server_id(it->second.vsi) != it->second.si)
if (m_config.get_server_id(it->second.vsi) != it->second.si)
{
m_failed.push_back(it->second);
pending_map_t::iterator tmp = it;
++it;
m_pending_ops.erase(tmp);
m_pending_ops.erase(it);
it = m_pending_ops.begin();
}
else
{
Expand All @@ -1169,10 +1209,10 @@ client :: send(network_msgtype mt,
{
const uint8_t type = static_cast<uint8_t>(mt);
const uint8_t flags = 0;
const uint64_t version = m_coord.config()->version();
const uint64_t version = m_config.version();
msg->pack_at(BUSYBEE_HEADER_SIZE)
<< type << flags << version << to << nonce;
server_id id = m_coord.config()->get_server_id(to);
server_id id = m_config.get_server_id(to);
m_busybee.set_timeout(-1);
busybee_returncode rc = m_busybee.send(id.get(), msg);

Expand Down Expand Up @@ -1207,7 +1247,7 @@ client :: send_keyop(const char* space,
e::intrusive_ptr<pending> op,
hyperdex_client_returncode* status)
{
virtual_server_id vsi = m_coord.config()->point_leader(space, key);
virtual_server_id vsi = m_config.point_leader(space, key);

if (vsi == virtual_server_id())
{
Expand Down Expand Up @@ -1261,7 +1301,7 @@ microtransaction* client::uxact_init(const char* space, hyperdex_client_returnco
return NULL;
}

const schema* sc = m_coord.config()->get_schema(space);
const schema* sc = m_config.get_schema(space);

if (!sc)
{
Expand Down
16 changes: 14 additions & 2 deletions client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
// BusyBee
#include <busybee_st.h>

// Replicant
#include <replicant.h>

// HyperDex
#include <hyperdex/client.h>
#include "namespace.h"
#include "common/configuration.h"
#include "common/coordinator_link.h"
#include "common/mapper.h"
#include "client/keyop_info.h"
#include "client/pending.h"
Expand Down Expand Up @@ -249,17 +251,27 @@ class client
void handle_disruption(const server_id& si);

private:
coordinator_link m_coord;
replicant_client* m_coord;
mapper m_busybee_mapper;
busybee_st m_busybee;
// configuration
configuration m_config;
int64_t m_config_id;
replicant_returncode m_config_status;
uint64_t m_config_state;
char* m_config_data;
size_t m_config_data_sz;
// nonces
int64_t m_next_client_id;
uint64_t m_next_server_nonce;
e::flagfd m_flagfd;
// operations
pending_map_t m_pending_ops;
pending_queue_t m_failed;
std::list<e::intrusive_ptr<pending> > m_yieldable;
e::intrusive_ptr<pending> m_yielding;
e::intrusive_ptr<pending> m_yielded;
// misc
e::error m_last_error;
const char** m_macaroons;
size_t m_macaroons_sz;
Expand Down
4 changes: 2 additions & 2 deletions client/pending_get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ pending_get :: handle_message(client* cl,
hyperdex_client_returncode op_status;
e::error op_error;

if (!value_to_attributes(*cl->m_coord.config(),
cl->m_coord.config()->get_region_id(vsi),
if (!value_to_attributes(cl->m_config,
cl->m_config.get_region_id(vsi),
NULL, 0, value, &op_status, &op_error,
m_attrs, m_attrs_sz, cl->m_convert_types))
{
Expand Down
4 changes: 2 additions & 2 deletions client/pending_get_partial.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ pending_get_partial :: handle_message(client* cl,
hyperdex_client_returncode op_status;
e::error op_error;

if (!value_to_attributes(*cl->m_coord.config(),
cl->m_coord.config()->get_region_id(vsi),
if (!value_to_attributes(cl->m_config,
cl->m_config.get_region_id(vsi),
value, &op_status, &op_error,
m_attrs, m_attrs_sz, cl->m_convert_types))
{
Expand Down
4 changes: 2 additions & 2 deletions client/pending_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ pending_search :: handle_message(client* cl,
hyperdex_client_returncode op_status;
e::error op_error;

if (!value_to_attributes(*cl->m_coord.config(),
cl->m_coord.config()->get_region_id(vsi),
if (!value_to_attributes(cl->m_config,
cl->m_config.get_region_id(vsi),
key.data(), key.size(), value,
&op_status, &op_error, m_attrs, m_attrs_sz, cl->m_convert_types))
{
Expand Down
4 changes: 2 additions & 2 deletions client/pending_sorted_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pending_sorted_search :: yield(hyperdex_client_returncode* status, e::error* err
const std::vector<e::slice>& value(m_results[m_results_idx].value);
++m_results_idx;

if (!value_to_attributes(*m_cl->m_coord.config(), m_ri, key.data(), key.size(),
if (!value_to_attributes(m_cl->m_config, m_ri, key.data(), key.size(),
value, &op_status, &op_error, m_attrs, m_attrs_sz, m_cl->m_convert_types))
{
set_status(op_status);
Expand All @@ -111,7 +111,7 @@ pending_sorted_search :: handle_sent_to(const server_id& si,
{
if (m_ri == region_id())
{
m_ri = m_cl->m_coord.config()->get_region_id(vsi);
m_ri = m_cl->m_config.get_region_id(vsi);
}

return pending_aggregation::handle_sent_to(si, vsi);
Expand Down

0 comments on commit 809a644

Please sign in to comment.