diff --git a/Makefile.am b/Makefile.am index 841ace1a..c6b611ab 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,4 +1,4 @@ -## Copyright (c) 2012-2014, Cornell University +## Copyright (c) 2012-2015, Cornell University ## All rights reserved. ## ## Redistribution and use in source and binary forms, with or without @@ -656,6 +656,7 @@ noinst_HEADERS += bindings/java/org_hyperdex_client_LengthGreaterEqual.h noinst_HEADERS += bindings/java/org_hyperdex_client_LengthLessEqual.h noinst_HEADERS += bindings/java/org_hyperdex_client_LessEqual.h noinst_HEADERS += bindings/java/org_hyperdex_client_LessThan.h +noinst_HEADERS += bindings/java/org_hyperdex_client_Microtransaction.h noinst_HEADERS += bindings/java/org_hyperdex_client_Range.h noinst_HEADERS += bindings/java/org_hyperdex_client_Regex.h @@ -1000,14 +1001,16 @@ endif # ENABLE_ADMIN EXTRA_DIST += $(shell_wrappers) EXTRA_DIST += test/doc.async-ops.py EXTRA_DIST += test/doc.atomic-ops.py +EXTRA_DIST += test/doc.authorization.py EXTRA_DIST += test/doc.data-types.py EXTRA_DIST += test/doc.documents.py -EXTRA_DIST += test/doc.authorization.py EXTRA_DIST += test/doc-extract.py +EXTRA_DIST += test/doc.mongo.py EXTRA_DIST += test/doc.quick-start.py EXTRA_DIST += test/doctest-runner.py EXTRA_DIST += test/env.sh EXTRA_DIST += test/runner.py +EXTRA_DIST += test/python/Admin.py EXTRA_DIST += test/python/Basic.py EXTRA_DIST += test/python/BasicSearch.py EXTRA_DIST += test/python/CondPut.py @@ -1033,8 +1036,8 @@ EXTRA_DIST += test/python/DataTypeString.py EXTRA_DIST += test/python/GroupAtomic.py EXTRA_DIST += test/python/HyperMongo.py EXTRA_DIST += test/python/LengthString.py -EXTRA_DIST += test/python/MultiAttribute.py EXTRA_DIST += test/python/Microtransactions.py +EXTRA_DIST += test/python/MultiAttribute.py EXTRA_DIST += test/python/RangeSearchInt.py EXTRA_DIST += test/python/RangeSearchString.py EXTRA_DIST += test/python/RegexSearch.py diff --git a/admin/admin.cc b/admin/admin.cc index 7c61e2f2..cbddf0ee 100644 --- a/admin/admin.cc +++ b/admin/admin.cc @@ -139,7 +139,7 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -147,14 +147,6 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status) int64_t admin :: wait_until_stable(enum hyperdex_admin_returncode* status) { - replicant_returncode rc; - - if (!m_coord.force_configuration_fetch(&rc)) - { - interpret_rpc_request_failure(rc, status); - return -1; - } - if (!maintain_coord_connection(status)) { return -1; @@ -172,7 +164,7 @@ admin :: wait_until_stable(enum hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -204,7 +196,7 @@ admin :: fault_tolerance(const char* space, uint64_t ft, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -277,7 +269,7 @@ admin :: add_space(const char* description, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -304,7 +296,7 @@ admin :: rm_space(const char* name, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -338,7 +330,7 @@ admin :: mv_space(const char* source, const char* target, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -372,7 +364,7 @@ admin :: add_index(const char* space, const char* attr, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -401,7 +393,7 @@ admin :: rm_index(uint64_t idxid, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -466,7 +458,7 @@ admin :: server_register(uint64_t token, const char* address, ++m_next_admin_id; e::intrusive_ptr op = new coord_rpc_generic(id, status, "register server"); std::auto_ptr msg(e::buffer::create(sizeof(uint64_t) + pack_size(loc))); - *msg << sid << loc; + msg->pack() << sid << loc; int64_t cid = m_coord.rpc("server_register", reinterpret_cast(msg->data()), msg->size(), &op->repl_status, &op->repl_output, &op->repl_output_sz); @@ -477,7 +469,7 @@ admin :: server_register(uint64_t token, const char* address, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -505,7 +497,7 @@ admin :: server_online(uint64_t token, enum hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -533,7 +525,7 @@ admin :: server_offline(uint64_t token, enum hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -561,7 +553,7 @@ admin :: server_forget(uint64_t token, enum hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -589,7 +581,7 @@ admin :: server_kill(uint64_t token, enum hyperdex_admin_returncode* status) } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -636,7 +628,7 @@ admin :: coord_backup(const char* path, } else { - interpret_rpc_request_failure(op->repl_status, status); + interpret_replicant_returncode(op->repl_status, status, &m_last_error); return -1; } } @@ -813,7 +805,7 @@ admin :: loop(int timeout, hyperdex_admin_returncode* status) if (lid < 0 && lrc != REPLICANT_TIMEOUT) { - interpret_rpc_loop_failure(lrc, status); + interpret_replicant_returncode(lrc, status, &m_last_error); return -1; } @@ -947,148 +939,66 @@ admin :: set_error_message(const char* msg) m_last_error.set_msg() << msg; } +#define INTERPRET_ERROR(CODE) \ + *status = HYPERDEX_ADMIN_ ## CODE; \ + err->set_loc(__FILE__, __LINE__); \ + err->set_msg() + void -admin :: interpret_rpc_request_failure(replicant_returncode rstatus, - hyperdex_admin_returncode* status) +admin :: interpret_replicant_returncode(replicant_returncode rstatus, + hyperdex_admin_returncode* status, + e::error* err) { - e::error err; - switch (rstatus) { - case REPLICANT_TIMEOUT: - ERROR(TIMEOUT) << "operation timed out"; - break; - case REPLICANT_INTERRUPTED: - ERROR(INTERRUPTED) << "signal received"; + case REPLICANT_SUCCESS: + *status = HYPERDEX_ADMIN_SUCCESS; break; - case REPLICANT_NAME_TOO_LONG: case REPLICANT_OBJ_NOT_FOUND: + case REPLICANT_OBJ_EXIST: case REPLICANT_FUNC_NOT_FOUND: - case REPLICANT_CLUSTER_JUMP: - err = m_coord.error(); - ERROR(COORDFAIL) << "persistent coordinator error: " << err.msg() << " @ " << err.loc(); - break; - case REPLICANT_SERVER_ERROR: - case REPLICANT_NEED_BOOTSTRAP: - case REPLICANT_BACKOFF: - err = m_coord.error(); - ERROR(COORDFAIL) << "transient coordinator error: " << err.msg() << " @ " << err.loc(); - break; - case REPLICANT_SUCCESS: - case REPLICANT_NONE_PENDING: - case REPLICANT_INTERNAL_ERROR: - case REPLICANT_MISBEHAVING_SERVER: - case REPLICANT_BAD_LIBRARY: - case REPLICANT_COND_DESTROYED: case REPLICANT_COND_NOT_FOUND: - case REPLICANT_OBJ_EXIST: - case REPLICANT_CTOR_FAILED: - case REPLICANT_GARBAGE: - default: - err = m_coord.error(); - ERROR(COORDFAIL) << "internal library error: " << err.msg() << " @ " << err.loc(); + case REPLICANT_COND_DESTROYED: + INTERPRET_ERROR(COORDFAIL) << "persistent coordinator error: " + << m_coord.error_message() + << " @ " + << m_coord.error_location(); break; - } -} - -void -admin :: interpret_rpc_loop_failure(replicant_returncode rstatus, - hyperdex_admin_returncode* status) -{ - e::error err; - - switch (rstatus) - { - case REPLICANT_TIMEOUT: - ERROR(TIMEOUT) << "operation timed out"; + case REPLICANT_MAYBE: + INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: " + << "operation may or may not have completed"; break; - case REPLICANT_INTERRUPTED: - ERROR(INTERRUPTED) << "signal received"; + case REPLICANT_SEE_ERRNO: + INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: " + << e::error::strerror(errno); break; - case REPLICANT_NAME_TOO_LONG: - case REPLICANT_OBJ_NOT_FOUND: - case REPLICANT_FUNC_NOT_FOUND: case REPLICANT_CLUSTER_JUMP: - err = m_coord.error(); - ERROR(COORDFAIL) << "persistent coordinator error: " << err.msg() << " @ " << err.loc(); - break; case REPLICANT_SERVER_ERROR: - case REPLICANT_NEED_BOOTSTRAP: - case REPLICANT_BACKOFF: - err = m_coord.error(); - ERROR(COORDFAIL) << "transient coordinator error: " << err.msg() << " @ " << err.loc(); - break; - case REPLICANT_NONE_PENDING: - ERROR(NONEPENDING) << "no outstanding operations to process"; - break; - case REPLICANT_SUCCESS: - case REPLICANT_INTERNAL_ERROR: - case REPLICANT_MISBEHAVING_SERVER: - case REPLICANT_BAD_LIBRARY: - case REPLICANT_COND_DESTROYED: - case REPLICANT_COND_NOT_FOUND: - case REPLICANT_OBJ_EXIST: - case REPLICANT_CTOR_FAILED: - case REPLICANT_GARBAGE: - default: - err = m_coord.error(); - ERROR(COORDFAIL) << "internal library error: " << err.msg() << " @ " << err.loc(); - break; - } -} - -void -admin :: interpret_rpc_response_failure(replicant_returncode rstatus, - hyperdex_admin_returncode* status, - e::error* ret_err) -{ - e::error err; - e::error tmp = m_last_error; - m_last_error = e::error(); - - switch (rstatus) - { - case REPLICANT_SUCCESS: - *status = HYPERDEX_ADMIN_SUCCESS; + case REPLICANT_COMM_FAILED: + INTERPRET_ERROR(COORDFAIL) << "transient coordinator error: " + << m_coord.error_message() + << " @ " + << m_coord.error_location(); break; case REPLICANT_TIMEOUT: - ERROR(TIMEOUT) << "operation timed out"; + INTERPRET_ERROR(TIMEOUT) << "operation timed out"; break; case REPLICANT_INTERRUPTED: - ERROR(INTERRUPTED) << "signal received"; - break; - case REPLICANT_NAME_TOO_LONG: - case REPLICANT_OBJ_NOT_FOUND: - case REPLICANT_FUNC_NOT_FOUND: - case REPLICANT_CLUSTER_JUMP: - err = m_coord.error(); - ERROR(COORDFAIL) << "persistent coordinator error: " << err.msg() << " @ " << err.loc(); - break; - case REPLICANT_SERVER_ERROR: - case REPLICANT_NEED_BOOTSTRAP: - case REPLICANT_BACKOFF: - err = m_coord.error(); - ERROR(COORDFAIL) << "transient coordinator error: " << err.msg() << " @ " << err.loc(); + INTERPRET_ERROR(INTERRUPTED) << "signal received"; break; case REPLICANT_NONE_PENDING: - ERROR(NONEPENDING) << "no outstanding operations to process"; + INTERPRET_ERROR(NONEPENDING) << "no operations pending"; break; - case REPLICANT_INTERNAL_ERROR: - case REPLICANT_MISBEHAVING_SERVER: - case REPLICANT_BAD_LIBRARY: - case REPLICANT_COND_DESTROYED: - case REPLICANT_COND_NOT_FOUND: - case REPLICANT_OBJ_EXIST: - case REPLICANT_CTOR_FAILED: + case REPLICANT_INTERNAL: + case REPLICANT_EXCEPTION: case REPLICANT_GARBAGE: default: - err = m_coord.error(); - ERROR(COORDFAIL) << "internal library error: " << err.msg() << " @ " << err.loc(); + INTERPRET_ERROR(INTERNAL) << "internal library error: " + << m_coord.error_message() + << " @ " + << m_coord.error_location(); break; } - - *ret_err = m_last_error; - m_last_error = tmp; } bool @@ -1099,41 +1009,7 @@ admin :: maintain_coord_connection(hyperdex_admin_returncode* status) if (!m_coord.ensure_configuration(&rc)) { - switch (rc) - { - case REPLICANT_TIMEOUT: - ERROR(TIMEOUT) << "operation timed out"; - return false; - case REPLICANT_INTERRUPTED: - ERROR(INTERRUPTED) << "signal received"; - return false; - case REPLICANT_NAME_TOO_LONG: - case REPLICANT_OBJ_NOT_FOUND: - case REPLICANT_FUNC_NOT_FOUND: - case REPLICANT_CLUSTER_JUMP: - err = m_coord.error(); - ERROR(COORDFAIL) << "persistent coordinator error: " << err.msg() << " @ " << err.loc(); - break; - case REPLICANT_MISBEHAVING_SERVER: - case REPLICANT_SERVER_ERROR: - case REPLICANT_NEED_BOOTSTRAP: - case REPLICANT_BACKOFF: - err = m_coord.error(); - ERROR(COORDFAIL) << "transient coordinator error: " << err.msg() << " @ " << err.loc(); - return false; - case REPLICANT_SUCCESS: - case REPLICANT_OBJ_EXIST: - case REPLICANT_COND_NOT_FOUND: - case REPLICANT_COND_DESTROYED: - case REPLICANT_BAD_LIBRARY: - case REPLICANT_INTERNAL_ERROR: - case REPLICANT_NONE_PENDING: - case REPLICANT_CTOR_FAILED: - case REPLICANT_GARBAGE: - default: - *status = HYPERDEX_ADMIN_INTERNAL; - return false; - } + interpret_replicant_returncode(rc, status, &m_last_error); } if (m_busybee.set_external_fd(m_coord.poll_fd()) != BUSYBEE_SUCCESS) @@ -1142,11 +1018,6 @@ admin :: maintain_coord_connection(hyperdex_admin_returncode* status) return false; } - if (m_coord.queued_responses() > 0) - { - m_handle_coord_ops = true; - } - return true; } diff --git a/admin/admin.h b/admin/admin.h index e11822b9..7cd24e4c 100644 --- a/admin/admin.h +++ b/admin/admin.h @@ -109,12 +109,7 @@ class admin const char* error_message(); const char* error_location(); void set_error_message(const char* msg); - // translate returncodes - void interpret_rpc_request_failure(replicant_returncode rstatus, - hyperdex_admin_returncode* status); - void interpret_rpc_loop_failure(replicant_returncode rstatus, - hyperdex_admin_returncode* status); - void interpret_rpc_response_failure(replicant_returncode rstatus, + void interpret_replicant_returncode(replicant_returncode rstatus, hyperdex_admin_returncode* status, e::error* err); diff --git a/admin/coord_rpc.cc b/admin/coord_rpc.cc index a61b1f96..131c2ccc 100644 --- a/admin/coord_rpc.cc +++ b/admin/coord_rpc.cc @@ -43,6 +43,6 @@ coord_rpc :: ~coord_rpc() throw () { if (repl_output) { - replicant_destroy_output(repl_output, repl_output_sz); + free(repl_output); } } diff --git a/admin/coord_rpc.h b/admin/coord_rpc.h index b6e09e00..f9210225 100644 --- a/admin/coord_rpc.h +++ b/admin/coord_rpc.h @@ -63,7 +63,7 @@ class coord_rpc : public yieldable public: replicant_returncode repl_status; - const char* repl_output; + char* repl_output; size_t repl_output_sz; protected: diff --git a/admin/coord_rpc_backup.cc b/admin/coord_rpc_backup.cc index 65227fba..18085700 100644 --- a/admin/coord_rpc_backup.cc +++ b/admin/coord_rpc_backup.cc @@ -72,7 +72,7 @@ coord_rpc_backup :: handle_response(admin* adm, *status = HYPERDEX_ADMIN_SUCCESS; hyperdex_admin_returncode resp_status; e::error err; - adm->interpret_rpc_response_failure(repl_status, &resp_status, &err); + adm->interpret_replicant_returncode(repl_status, &resp_status, &err); set_status(resp_status); set_error(err); diff --git a/admin/coord_rpc_generic.cc b/admin/coord_rpc_generic.cc index fca37027..4d7bfc37 100644 --- a/admin/coord_rpc_generic.cc +++ b/admin/coord_rpc_generic.cc @@ -72,7 +72,7 @@ coord_rpc_generic :: handle_response(admin* adm, *status = HYPERDEX_ADMIN_SUCCESS; hyperdex_admin_returncode resp_status; e::error err; - adm->interpret_rpc_response_failure(repl_status, &resp_status, &err); + adm->interpret_replicant_returncode(repl_status, &resp_status, &err); set_status(resp_status); set_error(err); diff --git a/admin/pending_perf_counters.cc b/admin/pending_perf_counters.cc index 68ea8ded..5c0222ee 100644 --- a/admin/pending_perf_counters.cc +++ b/admin/pending_perf_counters.cc @@ -208,7 +208,7 @@ pending_perf_counters :: handle_message(admin*, return true; } - e::slice rem = up.as_slice(); + e::slice rem = up.remainder(); char* ptr = const_cast(reinterpret_cast(rem.data())); char* end = ptr + rem.size() - 1; uint64_t max_time = 0; diff --git a/admin/raw_backup.cc b/admin/raw_backup.cc index 1ef32aee..f2c41d0e 100644 --- a/admin/raw_backup.cc +++ b/admin/raw_backup.cc @@ -73,7 +73,7 @@ hyperdex_admin_raw_backup(const char* host, uint16_t port, + sizeof(uint64_t) /*nonce*/ + pack_size(name_s); std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(BUSYBEE_HEADER_SIZE); + e::packer pa = msg->pack_at(BUSYBEE_HEADER_SIZE); pa = pa << type << flags << version << to << nonce << name_s; bbs.set_timeout(-1); diff --git a/bindings/c.py b/bindings/c.py index 5c5b1044..b76f886b 100644 --- a/bindings/c.py +++ b/bindings/c.py @@ -487,20 +487,15 @@ def generate_admin_doc(): hyperdex_client_returncode* status = &_status #define SIGNAL_PROTECT_ERR(X) \\ - do \\ + sigset_t old_sigs; \\ + sigset_t all_sigs; \\ + sigfillset(&all_sigs); \\ + if (pthread_sigmask(SIG_BLOCK, &all_sigs, &old_sigs) < 0) \\ { \\ - sigset_t old_sigs; \\ - sigset_t all_sigs; \\ - sigfillset(&all_sigs); \\ - if (pthread_sigmask(SIG_BLOCK, &all_sigs, &old_sigs) < 0) \\ - { \\ - *status = HYPERDEX_CLIENT_INTERNAL; \\ - return (X); \\ - } \\ - e::guard g = e::makeguard(pthread_sigmask, SIG_SETMASK, (sigset_t*)&old_sigs, (sigset_t*)NULL); \\ - g.use_variable(); \\ + *status = HYPERDEX_CLIENT_INTERNAL; \\ + return (X); \\ } \\ - while (0) + e::guard g = e::makeguard(pthread_sigmask, SIG_SETMASK, (sigset_t*)&old_sigs, (sigset_t*)NULL) #define SIGNAL_PROTECT SIGNAL_PROTECT_ERR(-1); inline void return_void() {} @@ -595,13 +590,6 @@ def generate_admin_doc(): delete reinterpret_cast(client); } -HYPERDEX_API void -hyperdex_client_set_type_conversion(hyperdex_client* _cl, bool enabled) -{ - hyperdex::client* cl = reinterpret_cast(_cl); - cl->set_type_conversion(enabled); -} - HYPERDEX_API const char* hyperdex_client_error_message(hyperdex_client* _cl) { diff --git a/client/c.cc b/client/c.cc index 555091f5..11ace5f2 100644 --- a/client/c.cc +++ b/client/c.cc @@ -43,20 +43,15 @@ #define FAKE_STATUS hyperdex_client_returncode _status; hyperdex_client_returncode* status = &_status #define SIGNAL_PROTECT_ERR(X) \ - do \ + sigset_t old_sigs; \ + sigset_t all_sigs; \ + sigfillset(&all_sigs); \ + if (pthread_sigmask(SIG_BLOCK, &all_sigs, &old_sigs) < 0) \ { \ - sigset_t old_sigs; \ - sigset_t all_sigs; \ - sigfillset(&all_sigs); \ - if (pthread_sigmask(SIG_BLOCK, &all_sigs, &old_sigs) < 0) \ - { \ - *status = HYPERDEX_CLIENT_INTERNAL; \ - return (X); \ - } \ - e::guard g = e::makeguard(pthread_sigmask, SIG_SETMASK, (sigset_t*)&old_sigs, (sigset_t*)NULL); \ - g.use_variable(); \ + *status = HYPERDEX_CLIENT_INTERNAL; \ + return (X); \ } \ - while (0) + e::guard g = e::makeguard(pthread_sigmask, SIG_SETMASK, (sigset_t*)&old_sigs, (sigset_t*)NULL) #define SIGNAL_PROTECT SIGNAL_PROTECT_ERR(-1); inline void return_void() {} diff --git a/client/client.cc b/client/client.cc index b60da47a..653fc649 100644 --- a/client/client.cc +++ b/client/client.cc @@ -151,7 +151,7 @@ client :: get(const char* space, const char* _key, size_t _key_sz, e::intrusive_ptr op; op = new pending_get(m_next_client_id++, status, attrs, attrs_sz); - size_t sz = HYPERDEX_CLIENT_HEADER_SIZE_REQ + sizeof(uint32_t) + key.size(); + size_t sz = HYPERDEX_CLIENT_HEADER_SIZE_REQ + pack_size(key); auth_wallet aw(m_macaroons, m_macaroons_sz); if (m_macaroons_sz) @@ -160,7 +160,7 @@ client :: get(const char* space, const char* _key, size_t _key_sz, } std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ) << key; + e::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ) << key; if (m_macaroons_sz) { @@ -227,8 +227,8 @@ client :: get_partial(const char* space, const char* _key, size_t _key_sz, e::intrusive_ptr op; op = new pending_get_partial(m_next_client_id++, status, attrs, attrs_sz); size_t sz = HYPERDEX_CLIENT_HEADER_SIZE_REQ - + sizeof(uint32_t) + key.size() - + sizeof(uint32_t) + attrnums.size() * sizeof(uint16_t); + + pack_size(key) + + sizeof(uint64_t) + attrnums.size() * sizeof(uint16_t); auth_wallet aw(m_macaroons, m_macaroons_sz); if (m_macaroons_sz) @@ -237,13 +237,8 @@ client :: get_partial(const char* space, const char* _key, size_t _key_sz, } std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); - pa = pa << key << uint32_t(attrnums.size()); - - for (size_t i = 0; i < attrnums.size(); ++i) - { - pa = pa << attrnums[i]; - } + e::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); + pa = pa << key << attrnums; if (m_macaroons_sz) { @@ -464,9 +459,9 @@ client :: perform_group_funcall(const hyperdex_client_keyop_info* opinfo, + pack_size(checks) + inner_msg->size(); std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); - pa = pa << checks; - pa.copy(inner_msg->as_slice()); + e::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); + e::slice ims = inner_msg->as_slice(); + pa = pa << checks << e::pack_memmove(ims.data(), ims.size()); return perform_aggregation(servers, op, REQ_GROUP_ATOMIC, msg, status); } @@ -1098,7 +1093,7 @@ client :: maintain_coord_connection(hyperdex_client_returncode* status) } else { - ERROR(COORDFAIL) << "coordinator failure: " << m_coord.error().msg(); + ERROR(COORDFAIL) << "coordinator failure: " << m_coord.error_message(); } return false; @@ -1410,9 +1405,9 @@ client :: uxact_group_commit(microtransaction *transaction, + pack_size(checks) + inner_msg->size(); std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); - pa = pa << checks; - pa.copy(inner_msg->as_slice()); + e::packer pa = msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ); + e::slice ims = inner_msg->as_slice(); + pa = pa << checks << e::pack_memmove(ims.data(), ims.size()); int64_t result = perform_aggregation(servers, op, REQ_GROUP_ATOMIC, msg, status); delete transaction; diff --git a/client/pending_search_describe.cc b/client/pending_search_describe.cc index 96a15c2d..2315e529 100644 --- a/client/pending_search_describe.cc +++ b/client/pending_search_describe.cc @@ -110,7 +110,7 @@ pending_search_describe :: handle_message(client* cl, return true; } - e::slice text = up.as_slice(); + e::slice text = up.remainder(); add_text(vsi, text); return true; } diff --git a/common/attribute_check.cc b/common/attribute_check.cc index 53eee57c..d5b60db7 100644 --- a/common/attribute_check.cc +++ b/common/attribute_check.cc @@ -241,3 +241,31 @@ hyperdex :: operator < (const attribute_check& lhs, const attribute_check& rhs) { return lhs.attr < rhs.attr; } + +e::packer +hyperdex :: operator << (e::packer lhs, const attribute_check& rhs) +{ + return lhs << rhs.attr + << rhs.value + << rhs.datatype + << rhs.predicate; +} + +e::unpacker +hyperdex :: operator >> (e::unpacker lhs, attribute_check& rhs) +{ + return lhs >> rhs.attr + >> rhs.value + >> rhs.datatype + >> rhs.predicate; +} + +size_t +hyperdex :: pack_size(const attribute_check& rhs) +{ + return sizeof(uint16_t) + + sizeof(uint32_t) + + rhs.value.size() + + pack_size(rhs.datatype) + + pack_size(rhs.predicate); +} diff --git a/common/attribute_check.h b/common/attribute_check.h index dbef3c64..7c231d31 100644 --- a/common/attribute_check.h +++ b/common/attribute_check.h @@ -30,6 +30,7 @@ // e #include +#include // HyperDex #include "namespace.h" @@ -75,6 +76,13 @@ bool operator < (const attribute_check& lhs, const attribute_check& rhs); +e::packer +operator << (e::packer lhs, const attribute_check& rhs); +e::unpacker +operator >> (e::unpacker lhs, attribute_check& rhs); +size_t +pack_size(const attribute_check& rhs); + END_HYPERDEX_NAMESPACE #endif // hyperdex_common_attribute_check_h_ diff --git a/common/auth_wallet.cc b/common/auth_wallet.cc index 8910e77e..3b20a18c 100644 --- a/common/auth_wallet.cc +++ b/common/auth_wallet.cc @@ -87,8 +87,8 @@ build_macaroons(const std::vector& in, } } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const auth_wallet& rhs) +e::packer +hyperdex :: operator << (e::packer lhs, const auth_wallet& rhs) { std::vector macaroons; build_macaroons(rhs.m_macaroons, &macaroons); diff --git a/common/auth_wallet.h b/common/auth_wallet.h index f1f2a221..3621c695 100644 --- a/common/auth_wallet.h +++ b/common/auth_wallet.h @@ -29,8 +29,7 @@ #define hyperdex_common_auth_wallet_h_ // e -#include -#include +#include // macaroons #include @@ -52,7 +51,7 @@ class auth_wallet bool get_macaroons(std::vector* macaroons); private: - friend e::buffer::packer operator << (e::buffer::packer lhs, const auth_wallet& rhs); + friend e::packer operator << (e::packer lhs, const auth_wallet& rhs); friend e::unpacker operator >> (e::unpacker lhs, auth_wallet& rhs); friend size_t pack_size(const auth_wallet& aw); @@ -65,8 +64,8 @@ class auth_wallet std::vector m_macaroons; }; -e::buffer::packer -operator << (e::buffer::packer lhs, const auth_wallet& rhs); +e::packer +operator << (e::packer lhs, const auth_wallet& rhs); e::unpacker operator >> (e::unpacker lhs, auth_wallet& rhs); size_t diff --git a/common/configuration.cc b/common/configuration.cc index 669d5213..0120ff8f 100644 --- a/common/configuration.cc +++ b/common/configuration.cc @@ -1025,7 +1025,7 @@ configuration :: dump() const if (idx.type == index::DOCUMENT) { - out << " " << std::string(idx.extra.c_str(), idx.extra.size()); + out << " " << idx.extra.str(); } out << "\n"; @@ -1180,9 +1180,9 @@ configuration :: refill_cache() e::unpacker hyperdex :: operator >> (e::unpacker up, configuration& c) { - uint64_t num_servers; - uint64_t num_spaces; - uint64_t num_transfers; + uint64_t num_servers = 0; + uint64_t num_spaces = 0; + uint64_t num_transfers = 0; up = up >> c.m_cluster >> c.m_version >> c.m_flags >> num_servers >> num_spaces >> num_transfers; diff --git a/common/configuration.h b/common/configuration.h index 2528242d..13c3d467 100644 --- a/common/configuration.h +++ b/common/configuration.h @@ -143,7 +143,7 @@ class configuration private: void refill_cache(); friend size_t pack_size(const configuration&); - friend e::buffer::packer operator << (e::buffer::packer, const configuration& s); + friend e::packer operator << (e::packer, const configuration& s); friend e::unpacker operator >> (e::unpacker, configuration& s); private: @@ -172,8 +172,8 @@ class configuration std::vector m_transfers; }; -e::buffer::packer -operator << (e::buffer::packer, const configuration& c); +e::packer +operator << (e::packer, const configuration& c); e::unpacker operator >> (e::unpacker, configuration& c); size_t diff --git a/common/coordinator_link.cc b/common/coordinator_link.cc index b1a3bde1..f0a14753 100644 --- a/common/coordinator_link.cc +++ b/common/coordinator_link.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2013, Cornell University +// Copyright (c) 2013-2015, Cornell University // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -31,265 +31,138 @@ using hyperdex::coordinator_link; coordinator_link :: coordinator_link(const char* coordinator, uint16_t port) - : m_repl(coordinator, port) + : m_repl(replicant_client_create(coordinator, port)) , m_config() - , m_state(NOTHING) , m_id(-1) , m_status(REPLICANT_GARBAGE) , m_output(NULL) , m_output_sz(0) - , m_pending_ids() { + if (!m_repl) + { + throw std::bad_alloc(); + } } coordinator_link :: coordinator_link(const char* conn_str) - : m_repl(conn_str) + : m_repl(replicant_client_create_conn_str(conn_str)) , m_config() - , m_state(NOTHING) , m_id(-1) , m_status(REPLICANT_GARBAGE) , m_output(NULL) , m_output_sz(0) - , m_pending_ids() { + if (!m_repl) + { + throw std::bad_alloc(); + } } coordinator_link :: ~coordinator_link() throw () { reset(); + replicant_client_destroy(m_repl); } bool -coordinator_link :: force_configuration_fetch(replicant_returncode* status) +coordinator_link :: ensure_configuration(replicant_returncode* status) { - if (m_id >= 0) + if (!prime_state_machine(status)) { - m_repl.kill(m_id); + return false; } - if (m_output) - { - replicant_destroy_output(m_output, m_output_sz); - } + assert(m_id >= 0); + int timeout = m_config.cluster() == 0 ? -1 : 0; + int64_t lid = replicant_client_wait(m_repl, m_id, timeout, status); - m_id = -1; - m_state = WAITING_ON_BROADCAST; - m_status = REPLICANT_SUCCESS; - m_output = NULL; - m_output_sz = 0; - return begin_fetching_config(status); -} - -bool -coordinator_link :: ensure_configuration(replicant_returncode* status) -{ - while (true) + if (lid < 0) { - if (!prime_state_machine(status)) - { - return false; - } - - assert(m_id >= 0); - assert(m_state > NOTHING); - int timeout = 0; - - switch (m_state) - { - case WAITING_ON_BROADCAST: - timeout = 0; - break; - case FETCHING_CONFIG: - timeout = -1; - break; - case NOTHING: - default: - abort(); - } - - int64_t lid = m_repl.loop(timeout, status); - - if (lid < 0 && *status == REPLICANT_TIMEOUT && timeout >= 0) - { - return true; - } - else if (lid < 0) - { - return false; - } - - if (lid == m_id) - { - bool failed = false; - - if (handle_internal_callback(status, &failed)) - { - return true; - } - - if (failed) - { - return false; - } - } - else - { - m_pending_ids.push_back(lid); - } + return *status == REPLICANT_TIMEOUT && timeout == 0; } + + assert(lid == m_id); + return process_new_configuration(status); } int64_t coordinator_link :: rpc(const char* func, const char* data, size_t data_sz, replicant_returncode* status, - const char** output, size_t* output_sz) + char** output, size_t* output_sz) { - return m_repl.send("hyperdex", func, data, data_sz, status, output, output_sz); + return replicant_client_call(m_repl, "hyperdex", func, data, data_sz, + REPLICANT_CALL_ROBUST, status, output, output_sz); } int64_t coordinator_link :: backup(replicant_returncode* status, - const char** output, size_t* output_sz) + char** output, size_t* output_sz) { - return m_repl.backup_object("hyperdex", status, output, output_sz); + return replicant_client_backup_object(m_repl, "hyperdex", status, output, output_sz); } int64_t coordinator_link :: wait(const char* cond, uint64_t state, replicant_returncode* status) { - return m_repl.wait("hyperdex", cond, state, status); + return replicant_client_cond_wait(m_repl, "hyperdex", cond, state, status, NULL, NULL); } int64_t coordinator_link :: loop(int timeout, replicant_returncode* status) { - if (!m_pending_ids.empty()) + if (!prime_state_machine(status)) { - int64_t ret = m_pending_ids.front(); - m_pending_ids.pop_front(); - return ret; + return -1; } - while (true) - { - if (!prime_state_machine(status)) - { - return -1; - } - - int64_t lid = m_repl.loop(timeout, status); - - if (lid == m_id) - { - bool failed = false; + int64_t lid = replicant_client_loop(m_repl, timeout, status); - if (handle_internal_callback(status, &failed)) - { - return INT64_MAX; - } - - if (failed) - { - return -1; - } - } - else - { - return lid; - } - } -} - -bool -coordinator_link :: prime_state_machine(replicant_returncode* status) -{ - if (m_id >= 0) + if (lid == m_id) { - return true; + return process_new_configuration(status) ? INT64_MAX : -1; } - - if (m_state == NOTHING) + else { - if (m_config.version() == 0) - { - m_state = WAITING_ON_BROADCAST; - m_status = REPLICANT_SUCCESS; - return begin_fetching_config(status); - } - else - { - return begin_waiting_on_broadcast(status); - } + return lid; } - - return true; } -bool -coordinator_link :: handle_internal_callback(replicant_returncode* status, bool* failed) +int64_t +coordinator_link :: wait(int64_t id, int timeout, replicant_returncode* status) { - m_id = -1; - - if (m_status != REPLICANT_SUCCESS) + if (!prime_state_machine(status)) { - *status = m_status; - *failed = true; - reset(); - return false; + return -1; } - assert(m_state == WAITING_ON_BROADCAST || - m_state == FETCHING_CONFIG); - - if (m_state == WAITING_ON_BROADCAST) + if (id == INT64_MAX) { - if (!begin_fetching_config(status)) - { - *failed = true; - return false; - } - - *failed = false; - return false; + id = m_id; } - e::unpacker up(m_output, m_output_sz); - configuration new_config; - up = up >> new_config; - reset(); + int64_t lid = replicant_client_wait(m_repl, id, timeout, status); - if (up.error()) + if (lid == m_id) { - *status = REPLICANT_MISBEHAVING_SERVER; - *failed = true; - return false; + return process_new_configuration(status) ? INT64_MAX : -1; } - - if (m_config.cluster() != 0 && - m_config.cluster() != new_config.cluster()) + else { - *status = REPLICANT_MISBEHAVING_SERVER; - *failed = true; - return false; + return lid; } - - m_config = new_config; - return true; } bool -coordinator_link :: begin_waiting_on_broadcast(replicant_returncode* status) +coordinator_link :: prime_state_machine(replicant_returncode* status) { - assert(m_id == -1); - assert(m_state == NOTHING); - assert(m_status == REPLICANT_GARBAGE); - assert(m_output == NULL); - assert(m_output_sz == 0); - m_id = m_repl.wait("hyperdex", "config", - m_config.version() + 1, &m_status); - m_state = WAITING_ON_BROADCAST; + if (m_id >= 0) + { + return true; + } + + m_id = replicant_client_cond_wait(m_repl, "hyperdex", "config", m_config.version() + 1, &m_status, &m_output, &m_output_sz); *status = m_status; if (m_id >= 0) @@ -304,27 +177,30 @@ coordinator_link :: begin_waiting_on_broadcast(replicant_returncode* status) } bool -coordinator_link :: begin_fetching_config(replicant_returncode* status) +coordinator_link :: process_new_configuration(replicant_returncode* status) { - assert(m_id == -1); - assert(m_state == WAITING_ON_BROADCAST); - assert(m_status == REPLICANT_SUCCESS); - assert(m_output == NULL); - assert(m_output_sz == 0); - m_id = m_repl.send("hyperdex", "config_get", "", 0, - &m_status, &m_output, &m_output_sz); - m_state = FETCHING_CONFIG; - *status = m_status; + m_id = -1; - if (m_id >= 0) + if (m_status != REPLICANT_SUCCESS) { - return true; + *status = m_status; + reset(); + return false; } - else + + e::unpacker up(m_output, m_output_sz); + configuration new_config; + up = up >> new_config; + reset(); + + if (up.error()) { - reset(); + *status = REPLICANT_SERVER_ERROR; return false; } + + m_config = new_config; + return true; } void @@ -332,11 +208,10 @@ coordinator_link :: reset() { if (m_output) { - replicant_destroy_output(m_output, m_output_sz); + free(m_output); } m_id = -1; - m_state = NOTHING; m_status = REPLICANT_GARBAGE; m_output = NULL; m_output_sz = 0; diff --git a/common/coordinator_link.h b/common/coordinator_link.h index 28e18dfb..11441a49 100644 --- a/common/coordinator_link.h +++ b/common/coordinator_link.h @@ -50,13 +50,6 @@ BEGIN_HYPERDEX_NAMESPACE class coordinator_link { - public: -#ifdef _MSC_VER - typedef fd_set* poll_fd_t; -#else - typedef int poll_fd_t; -#endif - public: coordinator_link(const char* coordinator, uint16_t port); coordinator_link(const char* conn_str); @@ -64,25 +57,22 @@ class coordinator_link public: const configuration* config() const { return &m_config; } - poll_fd_t poll_fd() { return m_repl.poll_fd(); } - bool force_configuration_fetch(replicant_returncode* status); + int poll_fd() { return replicant_client_poll_fd(m_repl); } // true if there's a configuration to use // false if there's an error to report - // - // blocks if there's progress to be made toward getting a config bool ensure_configuration(replicant_returncode* status); int64_t rpc(const char* func, const char* data, size_t data_sz, replicant_returncode* status, - const char** output, size_t* output_sz); + char** output, size_t* output_sz); int64_t backup(replicant_returncode* status, - const char** output, size_t* output_sz); + char** output, size_t* output_sz); int64_t wait(const char* cond, uint64_t state, replicant_returncode* status); int64_t loop(int timeout, replicant_returncode* status); - void enqueue_response(int64_t id) { m_pending_ids.push_back(id); } - uint64_t queued_responses() { return m_pending_ids.size(); } - e::error error() { return m_repl.last_error(); } + int64_t wait(int64_t id, int timeout, replicant_returncode* status); + std::string error_message() const { return replicant_client_error_message(m_repl); } + std::string error_location() const { return replicant_client_error_location(m_repl); } private: coordinator_link(const coordinator_link&); @@ -90,25 +80,16 @@ class coordinator_link private: bool prime_state_machine(replicant_returncode* status); - // call this when m_repl.loop returns m_id - // returns true if there's a new configuration to handle - // returns false otherwise and sets failed: - // failed == true: there's an error stored in *status; - // failed == false: no error, just working the state machine - bool handle_internal_callback(replicant_returncode* status, bool* failed); - bool begin_waiting_on_broadcast(replicant_returncode* status); - bool begin_fetching_config(replicant_returncode* status); + bool process_new_configuration(replicant_returncode* status); void reset(); private: - replicant_client m_repl; + replicant_client* m_repl; configuration m_config; - enum { NOTHING, WAITING_ON_BROADCAST, FETCHING_CONFIG } m_state; int64_t m_id; replicant_returncode m_status; - const char* m_output; + char* m_output; size_t m_output_sz; - std::list m_pending_ids; }; END_HYPERDEX_NAMESPACE diff --git a/common/funcall.cc b/common/funcall.cc index b6732a4c..d9378fd0 100644 --- a/common/funcall.cc +++ b/common/funcall.cc @@ -30,6 +30,7 @@ // HyperDex #include "common/datatype_info.h" #include "common/funcall.h" +#include "common/serialization.h" using hyperdex::funcall; @@ -188,3 +189,49 @@ hyperdex :: operator < (const funcall& lhs, const funcall& rhs) { return lhs.attr < rhs.attr; } + +e::packer +hyperdex :: operator << (e::packer lhs, const funcall_t& rhs) +{ + uint8_t name = static_cast(rhs); + return lhs << name; +} + +e::unpacker +hyperdex :: operator >> (e::unpacker lhs, funcall_t& rhs) +{ + uint8_t name; + lhs = lhs >> name; + rhs = static_cast(name); + return lhs; +} + +size_t +hyperdex :: pack_size(const funcall_t&) +{ + return sizeof(uint8_t); +} + +e::packer +hyperdex :: operator << (e::packer lhs, const funcall& rhs) +{ + return lhs << rhs.attr << rhs.name + << rhs.arg1 << rhs.arg1_datatype + << rhs.arg2 << rhs.arg2_datatype; +} + +e::unpacker +hyperdex :: operator >> (e::unpacker lhs, funcall& rhs) +{ + return lhs >> rhs.attr >> rhs.name + >> rhs.arg1 >> rhs.arg1_datatype + >> rhs.arg2 >> rhs.arg2_datatype; +} + +size_t +hyperdex :: pack_size(const funcall& m) +{ + return sizeof(uint16_t) + pack_size(m.name) + + pack_size(m.arg1) + pack_size(m.arg1_datatype) + + pack_size(m.arg2) + pack_size(m.arg2_datatype); +} diff --git a/common/funcall.h b/common/funcall.h index 15cffe93..d4760906 100644 --- a/common/funcall.h +++ b/common/funcall.h @@ -37,6 +37,7 @@ // e #include #include +#include // HyperDex #include "namespace.h" @@ -113,6 +114,20 @@ apply_funcs(const schema& sc, bool operator < (const funcall& lhs, const funcall& rhs); +e::packer +operator << (e::packer lhs, const funcall_t& rhs); +e::unpacker +operator >> (e::unpacker lhs, funcall_t& rhs); +size_t +pack_size(const funcall_t& f); + +e::packer +operator << (e::packer lhs, const funcall& rhs); +e::unpacker +operator >> (e::unpacker lhs, funcall& rhs); +size_t +pack_size(const funcall& f); + END_HYPERDEX_NAMESPACE #endif // hyperdex_common_funcall_h_ diff --git a/common/hyperspace.cc b/common/hyperspace.cc index b8e1c4bb..0c2e1a88 100644 --- a/common/hyperspace.cc +++ b/common/hyperspace.cc @@ -193,8 +193,8 @@ space :: reestablish_backing() } } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const space& s) +e::packer +hyperdex :: operator << (e::packer pa, const space& s) { e::slice name; uint16_t num_subspaces = s.subspaces.size(); @@ -329,8 +329,8 @@ subspace :: operator = (const subspace& rhs) return *this; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const subspace& s) +e::packer +hyperdex :: operator << (e::packer pa, const subspace& s) { uint16_t num_attrs = s.attrs.size(); uint32_t num_regions = s.regions.size(); @@ -421,8 +421,8 @@ region :: operator = (const region& rhs) return *this; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const region& r) +e::packer +hyperdex :: operator << (e::packer pa, const region& r) { uint16_t num_hashes = r.lower_coord.size(); uint8_t num_replicas = r.replicas.size(); @@ -513,8 +513,8 @@ replica :: operator = (const replica& rhs) return *this; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const replica& r) +e::packer +hyperdex :: operator << (e::packer pa, const replica& r) { return pa << r.si.get() << r.vsi.get(); } diff --git a/common/hyperspace.h b/common/hyperspace.h index 1ff8c56e..a785bc48 100644 --- a/common/hyperspace.h +++ b/common/hyperspace.h @@ -74,7 +74,7 @@ class space const attribute& get_attribute(uint16_t index) const; private: - friend e::buffer::packer operator << (e::buffer::packer, const space& s); + friend e::packer operator << (e::packer, const space& s); friend e::unpacker operator >> (e::unpacker, space& s); friend size_t pack_size(const space&); @@ -89,8 +89,8 @@ inline const attribute& space::get_attribute(uint16_t index) const return m_attrs[index]; } -e::buffer::packer -operator << (e::buffer::packer, const space& s); +e::packer +operator << (e::packer, const space& s); e::unpacker operator >> (e::unpacker, space& s); size_t @@ -112,8 +112,8 @@ class subspace std::vector regions; }; -e::buffer::packer -operator << (e::buffer::packer, const subspace& s); +e::packer +operator << (e::packer, const subspace& s); e::unpacker operator >> (e::unpacker, subspace& s); size_t @@ -136,8 +136,8 @@ class region std::vector replicas; }; -e::buffer::packer -operator << (e::buffer::packer, const region& r); +e::packer +operator << (e::packer, const region& r); e::unpacker operator >> (e::unpacker, region& r); size_t @@ -160,8 +160,8 @@ class replica virtual_server_id vsi; }; -e::buffer::packer -operator << (e::buffer::packer, const replica& r); +e::packer +operator << (e::packer, const replica& r); e::unpacker operator >> (e::unpacker, replica& r); size_t diff --git a/common/ids.cc b/common/ids.cc index 8744b5f9..39a1dab1 100644 --- a/common/ids.cc +++ b/common/ids.cc @@ -34,8 +34,8 @@ { \ return lhs << #TYPE "(" << rhs.get() << ")"; \ } \ - e::buffer::packer \ - operator << (e::buffer::packer pa, const TYPE ## _id& rhs) \ + e::packer \ + operator << (e::packer pa, const TYPE ## _id& rhs) \ { \ return pa << rhs.get(); \ } \ diff --git a/common/ids.h b/common/ids.h index 83d4e995..13720c12 100644 --- a/common/ids.h +++ b/common/ids.h @@ -36,6 +36,7 @@ // e #include +#include // HyperDex #include "namespace.h" @@ -68,8 +69,8 @@ { \ return sizeof(uint64_t); \ } \ - e::buffer::packer \ - operator << (e::buffer::packer pa, const TYPE ## _id& rhs); \ + e::packer \ + operator << (e::packer pa, const TYPE ## _id& rhs); \ e::unpacker \ operator >> (e::unpacker up, TYPE ## _id& rhs); \ OPERATOR(TYPE, <) \ diff --git a/common/index.cc b/common/index.cc index 91544ce7..c51b7c55 100644 --- a/common/index.cc +++ b/common/index.cc @@ -74,7 +74,7 @@ hyperdex :: operator << (std::ostream& lhs, const index& rhs) break; case index::DOCUMENT: lhs << "index(" << rhs.id.get() - << ", " << std::string(rhs.extra.c_str(), rhs.extra.size()) + << ", " << rhs.extra.str() << ", " << rhs.attr << ")"; break; default: @@ -84,8 +84,8 @@ hyperdex :: operator << (std::ostream& lhs, const index& rhs) return lhs; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const index& t) +e::packer +hyperdex :: operator << (e::packer pa, const index& t) { return pa << t.type << t.id << t.attr << t.extra; } @@ -104,8 +104,8 @@ hyperdex :: pack_size(const index& t) + sizeof(uint32_t) + t.extra.size(); } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const index::index_t& t) +e::packer +hyperdex :: operator << (e::packer pa, const index::index_t& t) { uint8_t x = t; return pa << x; diff --git a/common/index.h b/common/index.h index 9021f79a..f4458b3f 100644 --- a/common/index.h +++ b/common/index.h @@ -61,15 +61,15 @@ class index std::ostream& operator << (std::ostream& lhs, const index& rhs); -e::buffer::packer -operator << (e::buffer::packer, const index& t); +e::packer +operator << (e::packer, const index& t); e::unpacker operator >> (e::unpacker, index& t); size_t pack_size(const index& t); -e::buffer::packer -operator << (e::buffer::packer, const index::index_t& t); +e::packer +operator << (e::packer, const index::index_t& t); e::unpacker operator >> (e::unpacker, index::index_t& t); size_t diff --git a/common/key_change.cc b/common/key_change.cc index b39c07e3..1ec19b60 100644 --- a/common/key_change.cc +++ b/common/key_change.cc @@ -128,8 +128,8 @@ key_change :: operator = (const key_change& rhs) #define FLAG_FINF 1 #define FLAG_FIF 2 -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const key_change& td) +e::packer +hyperdex :: operator << (e::packer pa, const key_change& td) { uint8_t flags = (td.erase ? 0 : FLAG_WRITE) | (td.fail_if_not_found ? FLAG_FINF : 0) diff --git a/common/key_change.h b/common/key_change.h index fe2ea000..e81633ee 100644 --- a/common/key_change.h +++ b/common/key_change.h @@ -78,8 +78,8 @@ class key_change std::auto_ptr auth; }; -e::buffer::packer -operator << (e::buffer::packer, const key_change& td); +e::packer +operator << (e::packer, const key_change& td); e::unpacker operator >> (e::unpacker, key_change& td); size_t diff --git a/common/serialization.cc b/common/serialization.cc index 4539e6d6..e075954d 100644 --- a/common/serialization.cc +++ b/common/serialization.cc @@ -28,208 +28,8 @@ // HyperDex #include "common/serialization.h" -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const po6::net::ipaddr& rhs) -{ - assert(rhs.family() == AF_INET || rhs.family() == AF_INET6 || rhs.family() == AF_UNSPEC); - uint8_t type; - uint8_t data[16]; - memset(data, 0, 16); - - if (rhs.family() == AF_INET) - { - type = 4; - sockaddr_in sa; - rhs.pack(&sa, 0); - memmove(data, &sa.sin_addr.s_addr, 4); - } - else if (rhs.family() == AF_INET6) - { - type = 6; - sockaddr_in6 sa; - rhs.pack(&sa, 0); -#ifdef _MSC_VER - memmove(data, &sa.sin6_addr.u.Byte, 16); -#elif defined __APPLE__ - memmove(data, &sa.sin6_addr.__u6_addr.__u6_addr8, 16); -#else - memmove(data, &sa.sin6_addr.__in6_u.__u6_addr8, 16); -#endif - - } - else - { - type = 0; - } - - lhs = lhs << type; - return lhs.copy(e::slice(data, 16)); -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, po6::net::ipaddr& rhs) -{ - uint8_t type; - lhs = lhs >> type; - - if (lhs.remain() < 16) - { - return lhs.as_error(); - } - - e::slice rem = lhs.as_slice(); - - if (type == 4) - { - in_addr ia; - memmove(&ia.s_addr, rem.data(), 4); - rhs = po6::net::ipaddr(ia); - return lhs.advance(16); - } - else if (type == 6) - { - in6_addr ia; -#ifdef _MSC_VER - memmove(ia.u.Byte, rem.data(), 16); -#elif defined __APPLE__ - memmove(ia.__u6_addr.__u6_addr8, rem.data(), 16); -#else - memmove(ia.__in6_u.__u6_addr8, rem.data(), 16); -#endif - rhs = po6::net::ipaddr(ia); - return lhs.advance(16); - } - else if (type == 0) - { - return lhs.advance(16); - } - else - { - return lhs.as_error(); - } -} - -size_t -hyperdex :: pack_size(const po6::net::ipaddr&) -{ - return 17; // One byte for family, and 4/16 for address -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const po6::net::location& rhs) -{ - return lhs << rhs.address << rhs.port; -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, po6::net::location& rhs) -{ - return lhs >> rhs.address >> rhs.port; -} - -size_t -hyperdex :: pack_size(const po6::net::location& rhs) -{ - return pack_size(rhs.address) + sizeof(uint16_t); -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const po6::net::hostname& rhs) -{ - return lhs << e::slice(rhs.address.data(), rhs.address.size()) << rhs.port; -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, po6::net::hostname& rhs) -{ - e::slice address; - lhs = lhs >> address >> rhs.port; - rhs.address = std::string(reinterpret_cast(address.data()), address.size()); - return lhs; -} - -size_t -hyperdex :: pack_size(const po6::net::hostname& rhs) -{ - return sizeof(uint32_t) + rhs.address.size() + sizeof(uint16_t); -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const attribute_check& rhs) -{ - return lhs << rhs.attr - << rhs.value - << rhs.datatype - << rhs.predicate; -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, attribute_check& rhs) -{ - return lhs >> rhs.attr - >> rhs.value - >> rhs.datatype - >> rhs.predicate; -} - -size_t -hyperdex :: pack_size(const attribute_check& rhs) -{ - return sizeof(uint16_t) - + sizeof(uint32_t) - + rhs.value.size() - + pack_size(rhs.datatype) - + pack_size(rhs.predicate); -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const funcall_t& rhs) -{ - uint8_t name = static_cast(rhs); - return lhs << name; -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, funcall_t& rhs) -{ - uint8_t name; - lhs = lhs >> name; - rhs = static_cast(name); - return lhs; -} - -size_t -hyperdex :: pack_size(const funcall_t&) -{ - return sizeof(uint8_t); -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const funcall& rhs) -{ - return lhs << rhs.attr << rhs.name - << rhs.arg1 << rhs.arg1_datatype - << rhs.arg2 << rhs.arg2_datatype; -} - -e::unpacker -hyperdex :: operator >> (e::unpacker lhs, funcall& rhs) -{ - return lhs >> rhs.attr >> rhs.name - >> rhs.arg1 >> rhs.arg1_datatype - >> rhs.arg2 >> rhs.arg2_datatype; -} - -size_t -hyperdex :: pack_size(const funcall& m) -{ - return sizeof(uint16_t) + pack_size(m.name) - + sizeof(uint32_t) + m.arg1.size() + pack_size(m.arg1_datatype) - + sizeof(uint32_t) + m.arg2.size() + pack_size(m.arg2_datatype); -} - -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const hyperdatatype& rhs) +e::packer +hyperdex :: operator << (e::packer lhs, const hyperdatatype& rhs) { uint16_t r = static_cast(rhs); return lhs << r; @@ -250,8 +50,8 @@ hyperdex :: pack_size(const hyperdatatype&) return sizeof(uint16_t); } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const hyperpredicate& rhs) +e::packer +hyperdex :: operator << (e::packer lhs, const hyperpredicate& rhs) { uint16_t r = static_cast(rhs); return lhs << r; @@ -271,9 +71,3 @@ hyperdex :: pack_size(const hyperpredicate&) { return sizeof(uint16_t); } - -size_t -hyperdex :: pack_size(const e::slice& s) -{ - return sizeof(uint32_t) + s.size(); -} diff --git a/common/serialization.h b/common/serialization.h index 47ebea97..af8883f3 100644 --- a/common/serialization.h +++ b/common/serialization.h @@ -28,12 +28,8 @@ #ifndef hyperdex_common_serialization_h_ #define hyperdex_common_serialization_h_ -// po6 -#include -#include - // e -#include +#include // HyperDex #include "namespace.h" @@ -42,83 +38,23 @@ #include "common/funcall.h" BEGIN_HYPERDEX_NAMESPACE +using ::pack_size; +using e::pack_size; -e::buffer::packer -operator << (e::buffer::packer lhs, const po6::net::ipaddr& rhs); -e::unpacker -operator >> (e::unpacker lhs, po6::net::ipaddr& rhs); -size_t -pack_size(const po6::net::ipaddr& rhs); - -e::buffer::packer -operator << (e::buffer::packer lhs, const po6::net::location& rhs); -e::unpacker -operator >> (e::unpacker lhs, po6::net::location& rhs); -size_t -pack_size(const po6::net::location& rhs); - -e::buffer::packer -operator << (e::buffer::packer lhs, const po6::net::hostname& rhs); -e::unpacker -operator >> (e::unpacker lhs, po6::net::hostname& rhs); -size_t -pack_size(const po6::net::hostname& rhs); - -e::buffer::packer -operator << (e::buffer::packer lhs, const attribute_check& rhs); -e::unpacker -operator >> (e::unpacker lhs, attribute_check& rhs); -size_t -pack_size(const attribute_check& rhs); - -e::buffer::packer -operator << (e::buffer::packer lhs, const funcall_t& rhs); -e::unpacker -operator >> (e::unpacker lhs, funcall_t& rhs); -size_t -pack_size(const funcall_t& f); - -e::buffer::packer -operator << (e::buffer::packer lhs, const funcall& rhs); -e::unpacker -operator >> (e::unpacker lhs, funcall& rhs); -size_t -pack_size(const funcall& f); - -e::buffer::packer -operator << (e::buffer::packer lhs, const hyperdatatype& rhs); +e::packer +operator << (e::packer lhs, const hyperdatatype& rhs); e::unpacker operator >> (e::unpacker lhs, hyperdatatype& rhs); size_t pack_size(const hyperdatatype& h); -e::buffer::packer -operator << (e::buffer::packer lhs, const hyperpredicate& rhs); +e::packer +operator << (e::packer lhs, const hyperpredicate& rhs); e::unpacker operator >> (e::unpacker lhs, hyperpredicate& rhs); size_t pack_size(const hyperpredicate& p); -inline size_t -pack_size(uint64_t) { return sizeof(uint64_t); } - -size_t -pack_size(const e::slice& s); - -template -size_t -pack_size(const std::vector& v) -{ - size_t sz = sizeof(uint32_t); - - for (size_t i = 0; i < v.size(); ++i) - { - sz += pack_size(v[i]); - } - - return sz; -} - END_HYPERDEX_NAMESPACE #endif // hyperdex_common_serialization_h_ diff --git a/common/server.cc b/common/server.cc index 410185f2..b5dbfa75 100644 --- a/common/server.cc +++ b/common/server.cc @@ -71,8 +71,8 @@ hyperdex :: operator < (const server& lhs, const server& rhs) return lhs.id < rhs.id; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer lhs, const server& rhs) +e::packer +hyperdex :: operator << (e::packer lhs, const server& rhs) { uint8_t s = static_cast(rhs.state); return lhs << s << rhs.id << rhs.bind_to; diff --git a/common/server.h b/common/server.h index 01ffeba2..f74dabf0 100644 --- a/common/server.h +++ b/common/server.h @@ -63,8 +63,8 @@ class server bool operator < (const server& lhs, const server& rhs); -e::buffer::packer -operator << (e::buffer::packer lhs, const server& rhs); +e::packer +operator << (e::packer lhs, const server& rhs); e::unpacker operator >> (e::unpacker lhs, server& rhs); size_t diff --git a/common/transfer.cc b/common/transfer.cc index ebfaf3c3..efe11d8d 100644 --- a/common/transfer.cc +++ b/common/transfer.cc @@ -143,8 +143,8 @@ hyperdex :: operator << (std::ostream& lhs, const transfer& rhs) << ", vdst=" << rhs.vdst << ")"; } -e::buffer::packer -hyperdex :: operator << (e::buffer::packer pa, const transfer& t) +e::packer +hyperdex :: operator << (e::packer pa, const transfer& t) { return pa << t.id << t.rid << t.src << t.vsrc << t.dst << t.vdst; } diff --git a/common/transfer.h b/common/transfer.h index 02378e0a..88b852ad 100644 --- a/common/transfer.h +++ b/common/transfer.h @@ -66,8 +66,8 @@ class transfer std::ostream& operator << (std::ostream& lhs, const transfer& rhs); -e::buffer::packer -operator << (e::buffer::packer, const transfer& t); +e::packer +operator << (e::packer, const transfer& t); e::unpacker operator >> (e::unpacker, transfer& t); size_t diff --git a/coordinator/coordinator.cc b/coordinator/coordinator.cc index d12dd0f7..fa0a4b81 100644 --- a/coordinator/coordinator.cc +++ b/coordinator/coordinator.cc @@ -38,9 +38,6 @@ // e #include -// Replicant -#include - // HyperDex #include "common/configuration_flags.h" #include "common/serialization.h" @@ -170,40 +167,37 @@ coordinator :: ~coordinator() throw () } void -coordinator :: init(replicant_state_machine_context* ctx, uint64_t token) +coordinator :: init(rsm_context* ctx, uint64_t token) { - FILE* log = replicant_state_machine_log_stream(ctx); - if (m_cluster != 0) { - fprintf(log, "cannot initialize HyperDex cluster with id %" PRIu64 " " + rsm_log(ctx, "cannot initialize HyperDex cluster with id %" PRIu64 " " "because it is already initialized to %" PRIu64 "\n", token, m_cluster); // we lie to the client and pretend all is well return generate_response(ctx, COORD_SUCCESS); } - replicant_state_machine_alarm(ctx, "alarm", ALARM_INTERVAL); - fprintf(log, "initializing HyperDex cluster with id %" PRIu64 "\n", token); + rsm_tick_interval(ctx, "periodic", ALARM_INTERVAL); + rsm_log(ctx, "initializing HyperDex cluster with id %" PRIu64 "\n", token); m_cluster = token; generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: read_only(replicant_state_machine_context* ctx, bool ro) +coordinator :: read_only(rsm_context* ctx, bool ro) { - FILE* log = replicant_state_machine_log_stream(ctx); uint64_t old_flags = m_flags; if (ro) { if ((m_flags & HYPERDEX_CONFIG_READ_ONLY)) { - fprintf(log, "cluster already in read-only mode\n"); + rsm_log(ctx, "cluster already in read-only mode\n"); } else { - fprintf(log, "putting cluster into read-only mode\n"); + rsm_log(ctx, "putting cluster into read-only mode\n"); } m_flags |= HYPERDEX_CONFIG_READ_ONLY; @@ -212,11 +206,11 @@ coordinator :: read_only(replicant_state_machine_context* ctx, bool ro) { if ((m_flags & HYPERDEX_CONFIG_READ_ONLY)) { - fprintf(log, "putting cluster into read-write mode\n"); + rsm_log(ctx, "putting cluster into read-write mode\n"); } else { - fprintf(log, "cluster already in read-write mode\n"); + rsm_log(ctx, "cluster already in read-write mode\n"); } uint64_t mask = HYPERDEX_CONFIG_READ_ONLY; @@ -233,7 +227,7 @@ coordinator :: read_only(replicant_state_machine_context* ctx, bool ro) } void -coordinator :: fault_tolerance(replicant_state_machine_context* ctx, +coordinator :: fault_tolerance(rsm_context* ctx, const char* space, uint64_t ft) { uint64_t R = 0; @@ -245,12 +239,18 @@ coordinator :: fault_tolerance(replicant_state_machine_context* ctx, for (space_map_t::iterator it = m_spaces.begin(); it != m_spaces.end(); ++it) { - if (it->first == space) { + if (it->first == space) + { s = it->second; break; } } + if (!s) + { + return; + } + s->fault_tolerance = ft; R = ft + 1; @@ -266,17 +266,16 @@ coordinator :: fault_tolerance(replicant_state_machine_context* ctx, } void -coordinator :: server_register(replicant_state_machine_context* ctx, +coordinator :: server_register(rsm_context* ctx, const server_id& sid, const po6::net::location& bind_to) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (srv) { std::string str(to_string(srv->bind_to)); - fprintf(log, "cannot register server(%" PRIu64 ") because the id belongs to " + rsm_log(ctx, "cannot register server(%" PRIu64 ") because the id belongs to " "server(%" PRIu64 ", %s)\n", sid.get(), srv->id.get(), str.c_str()); return generate_response(ctx, hyperdex::COORD_DUPLICATE); } @@ -284,22 +283,21 @@ coordinator :: server_register(replicant_state_machine_context* ctx, srv = new_server(sid); srv->state = server::ASSIGNED; srv->bind_to = bind_to; - fprintf(log, "registered server(%" PRIu64 ")\n", sid.get()); + rsm_log(ctx, "registered server(%" PRIu64 ")\n", sid.get()); generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: server_online(replicant_state_machine_context* ctx, +coordinator :: server_online(rsm_context* ctx, const server_id& sid, const po6::net::location* bind_to) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot bring server(%" PRIu64 ") online because " + rsm_log(ctx, "cannot bring server(%" PRIu64 ") online because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } @@ -309,7 +307,7 @@ coordinator :: server_online(replicant_state_machine_context* ctx, srv->state != server::SHUTDOWN && srv->state != server::AVAILABLE) { - fprintf(log, "cannot bring server(%" PRIu64 ") online because the server is " + rsm_log(ctx, "cannot bring server(%" PRIu64 ") online because the server is " "%s\n", sid.get(), server::to_string(srv->state)); return generate_response(ctx, hyperdex::COORD_NO_CAN_DO); } @@ -326,7 +324,7 @@ coordinator :: server_online(replicant_state_machine_context* ctx, if (m_servers[i].id != sid && m_servers[i].bind_to == *bind_to) { - fprintf(log, "cannot change server(%" PRIu64 ") to %s " + rsm_log(ctx, "cannot change server(%" PRIu64 ") to %s " "because that address is in use by " "server(%" PRIu64 ")\n", sid.get(), to.c_str(), m_servers[i].id.get()); @@ -334,7 +332,7 @@ coordinator :: server_online(replicant_state_machine_context* ctx, } } - fprintf(log, "changing server(%" PRIu64 ")'s address from %s to %s\n", + rsm_log(ctx, "changing server(%" PRIu64 ")'s address from %s to %s\n", sid.get(), from.c_str(), to.c_str()); srv->bind_to = *bind_to; changed = true; @@ -342,7 +340,7 @@ coordinator :: server_online(replicant_state_machine_context* ctx, if (srv->state != server::AVAILABLE) { - fprintf(log, "changing server(%" PRIu64 ") from %s to %s\n", + rsm_log(ctx, "changing server(%" PRIu64 ") from %s to %s\n", sid.get(), server::to_string(srv->state), server::to_string(server::AVAILABLE)); srv->state = server::AVAILABLE; @@ -361,23 +359,24 @@ coordinator :: server_online(replicant_state_machine_context* ctx, generate_next_configuration(ctx); } +#if 0 char buf[sizeof(uint64_t)]; e::pack64be(sid.get(), buf); uint64_t client = replicant_state_machine_get_client(ctx); replicant_state_machine_suspect(ctx, client, "server_suspect", buf, sizeof(uint64_t)); +#endif return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: server_offline(replicant_state_machine_context* ctx, +coordinator :: server_offline(rsm_context* ctx, const server_id& sid) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot bring server(%" PRIu64 ") offline because " + rsm_log(ctx, "cannot bring server(%" PRIu64 ") offline because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } @@ -387,14 +386,14 @@ coordinator :: server_offline(replicant_state_machine_context* ctx, srv->state != server::AVAILABLE && srv->state != server::SHUTDOWN) { - fprintf(log, "cannot bring server(%" PRIu64 ") offline because the server is " + rsm_log(ctx, "cannot bring server(%" PRIu64 ") offline because the server is " "%s\n", sid.get(), server::to_string(srv->state)); return generate_response(ctx, hyperdex::COORD_NO_CAN_DO); } if (srv->state != server::NOT_AVAILABLE && srv->state != server::SHUTDOWN) { - fprintf(log, "changing server(%" PRIu64 ") from %s to %s\n", + rsm_log(ctx, "changing server(%" PRIu64 ") from %s to %s\n", sid.get(), server::to_string(srv->state), server::to_string(server::NOT_AVAILABLE)); srv->state = server::NOT_AVAILABLE; @@ -406,15 +405,14 @@ coordinator :: server_offline(replicant_state_machine_context* ctx, } void -coordinator :: server_shutdown(replicant_state_machine_context* ctx, +coordinator :: server_shutdown(rsm_context* ctx, const server_id& sid) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot shutdown server(%" PRIu64 ") because " + rsm_log(ctx, "cannot shutdown server(%" PRIu64 ") because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } @@ -424,14 +422,14 @@ coordinator :: server_shutdown(replicant_state_machine_context* ctx, srv->state != server::AVAILABLE && srv->state != server::SHUTDOWN) { - fprintf(log, "cannot shutdown server(%" PRIu64 ") because the server is " + rsm_log(ctx, "cannot shutdown server(%" PRIu64 ") because the server is " "%s\n", sid.get(), server::to_string(srv->state)); return generate_response(ctx, hyperdex::COORD_NO_CAN_DO); } if (srv->state != server::SHUTDOWN) { - fprintf(log, "changing server(%" PRIu64 ") from %s to %s\n", + rsm_log(ctx, "changing server(%" PRIu64 ") from %s to %s\n", sid.get(), server::to_string(srv->state), server::to_string(server::SHUTDOWN)); srv->state = server::SHUTDOWN; @@ -443,22 +441,21 @@ coordinator :: server_shutdown(replicant_state_machine_context* ctx, } void -coordinator :: server_kill(replicant_state_machine_context* ctx, +coordinator :: server_kill(rsm_context* ctx, const server_id& sid) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot kill server(%" PRIu64 ") because " + rsm_log(ctx, "cannot kill server(%" PRIu64 ") because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } if (srv->state != server::KILLED) { - fprintf(log, "changing server(%" PRIu64 ") from %s to %s\n", + rsm_log(ctx, "changing server(%" PRIu64 ") from %s to %s\n", sid.get(), server::to_string(srv->state), server::to_string(server::KILLED)); srv->state = server::KILLED; @@ -472,15 +469,14 @@ coordinator :: server_kill(replicant_state_machine_context* ctx, } void -coordinator :: server_forget(replicant_state_machine_context* ctx, +coordinator :: server_forget(rsm_context* ctx, const server_id& sid) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot forget server(%" PRIu64 ") because " + rsm_log(ctx, "cannot forget server(%" PRIu64 ") because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } @@ -503,15 +499,14 @@ coordinator :: server_forget(replicant_state_machine_context* ctx, } void -coordinator :: server_suspect(replicant_state_machine_context* ctx, +coordinator :: server_suspect(rsm_context* ctx, const server_id& sid) { - FILE* log = replicant_state_machine_log_stream(ctx); server* srv = get_server(sid); if (!srv) { - fprintf(log, "cannot suspect server(%" PRIu64 ") because " + rsm_log(ctx, "cannot suspect server(%" PRIu64 ") because " "the server doesn't exist\n", sid.get()); return generate_response(ctx, hyperdex::COORD_NOT_FOUND); } @@ -525,14 +520,14 @@ coordinator :: server_suspect(replicant_state_machine_context* ctx, srv->state != server::NOT_AVAILABLE && srv->state != server::AVAILABLE) { - fprintf(log, "cannot suspect server(%" PRIu64 ") because the server is " + rsm_log(ctx, "cannot suspect server(%" PRIu64 ") because the server is " "%s\n", sid.get(), server::to_string(srv->state)); return generate_response(ctx, hyperdex::COORD_NO_CAN_DO); } if (srv->state != server::NOT_AVAILABLE && srv->state != server::SHUTDOWN) { - fprintf(log, "changing server(%" PRIu64 ") from %s to %s because we suspect it failed\n", + rsm_log(ctx, "changing server(%" PRIu64 ") from %s to %s because we suspect it failed\n", sid.get(), server::to_string(srv->state), server::to_string(server::NOT_AVAILABLE)); srv->state = server::NOT_AVAILABLE; @@ -544,7 +539,7 @@ coordinator :: server_suspect(replicant_state_machine_context* ctx, } void -coordinator :: report_disconnect(replicant_state_machine_context* ctx, +coordinator :: report_disconnect(rsm_context* ctx, const server_id& sid, uint64_t version) { if (m_version != version) @@ -574,19 +569,18 @@ is_space_name(const char* str) } void -coordinator :: space_add(replicant_state_machine_context* ctx, const space& _s) +coordinator :: space_add(rsm_context* ctx, const space& _s) { - FILE* log = replicant_state_machine_log_stream(ctx); if (!_s.validate()) { - fprintf(log, "could not add space \"%s\" because the space does not validate\n", _s.name); + rsm_log(ctx, "could not add space \"%s\" because the space does not validate\n", _s.name); return generate_response(ctx, COORD_MALFORMED); } if (m_spaces.find(std::string(_s.name)) != m_spaces.end()) { - fprintf(log, "could not add space \"%s\" because there is already a space with that name\n", _s.name); + rsm_log(ctx, "could not add space \"%s\" because there is already a space with that name\n", _s.name); return generate_response(ctx, COORD_DUPLICATE); } @@ -618,28 +612,27 @@ coordinator :: space_add(replicant_state_machine_context* ctx, const space& _s) x = m_spaces.insert(std::make_pair(std::string(s->name), s)); assert(x.second); s->name = x.first->first.c_str(); - fprintf(log, "successfully added space \"%s\" with space(%" PRIu64 ")\n", s->name, s->id.get()); + rsm_log(ctx, "successfully added space \"%s\" with space(%" PRIu64 ")\n", s->name, s->id.get()); initial_space_layout(ctx, s.get()); generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: space_rm(replicant_state_machine_context* ctx, const char* name) +coordinator :: space_rm(rsm_context* ctx, const char* name) { - FILE* log = replicant_state_machine_log_stream(ctx); space_map_t::iterator it; it = m_spaces.find(std::string(name)); if (it == m_spaces.end()) { - fprintf(log, "could not remove space \"%s\" because it doesn't exist\n", name); + rsm_log(ctx, "could not remove space \"%s\" because it doesn't exist\n", name); return generate_response(ctx, COORD_NOT_FOUND); } else { space_id sid(it->second->id.get()); - fprintf(log, "successfully removed space \"%s\"/space(%" PRIu64 ")\n", name, sid.get()); + rsm_log(ctx, "successfully removed space \"%s\"/space(%" PRIu64 ")\n", name, sid.get()); std::vector rids; regions_in_space(it->second, &rids); std::sort(rids.begin(), rids.end()); @@ -678,32 +671,31 @@ coordinator :: space_rm(replicant_state_machine_context* ctx, const char* name) } void -coordinator :: space_mv(replicant_state_machine_context* ctx, const char* src, const char* dst) +coordinator :: space_mv(rsm_context* ctx, const char* src, const char* dst) { - FILE* log = replicant_state_machine_log_stream(ctx); space_map_t::iterator it; it = m_spaces.find(std::string(src)); if (it == m_spaces.end()) { - fprintf(log, "could not rename space \"%s\" because it doesn't exist\n", src); + rsm_log(ctx, "could not rename space \"%s\" because it doesn't exist\n", src); return generate_response(ctx, COORD_NOT_FOUND); } else if (m_spaces.find(std::string(dst)) != m_spaces.end()) { - fprintf(log, "could not rename space \"%s\" to \"%s\" because there is already a space \"%s\"\n", src, dst, dst); + rsm_log(ctx, "could not rename space \"%s\" to \"%s\" because there is already a space \"%s\"\n", src, dst, dst); return generate_response(ctx, COORD_DUPLICATE); } else if (!is_space_name(dst)) { - fprintf(log, "could not rename space \"%s\" to \"%s\" because \"%s\" is an invalid space name\n", src, dst, dst); + rsm_log(ctx, "could not rename space \"%s\" to \"%s\" because \"%s\" is an invalid space name\n", src, dst, dst); return generate_response(ctx, COORD_NO_CAN_DO); } else { space_ptr sp(it->second); space_id sid(sp->id.get()); - fprintf(log, "renaming space \"%s\" (%" PRIu64 ") to \"%s\"\n", src, sid.get(), dst); + rsm_log(ctx, "renaming space \"%s\" (%" PRIu64 ") to \"%s\"\n", src, sid.get(), dst); m_spaces.erase(it); std::pair x; x = m_spaces.insert(std::make_pair(std::string(dst), sp)); @@ -715,16 +707,15 @@ coordinator :: space_mv(replicant_state_machine_context* ctx, const char* src, c } void -coordinator :: index_add(replicant_state_machine_context* ctx, +coordinator :: index_add(rsm_context* ctx, const char* space, const char* what) { - FILE* log = replicant_state_machine_log_stream(ctx); space_map_t::iterator it; it = m_spaces.find(std::string(space)); if (it == m_spaces.end()) { - fprintf(log, "could not create index on \"%s\" on space \"%s\" because the space doesn't exist\n", what, space); + rsm_log(ctx, "could not create index on \"%s\" on space \"%s\" because the space doesn't exist\n", what, space); return generate_response(ctx, COORD_NOT_FOUND); } @@ -754,14 +745,14 @@ coordinator :: index_add(replicant_state_machine_context* ctx, if (attr_num >= sp->sc.attrs_sz) { - fprintf(log, "could not create index on \"%s\" on space \"%s\" because the attribute doesn't exist\n", what, space); + rsm_log(ctx, "could not create index on \"%s\" on space \"%s\" because the attribute doesn't exist\n", what, space); return generate_response(ctx, COORD_NOT_FOUND); } if (type == index::NORMAL && sp->sc.attrs[attr_num].type == HYPERDATATYPE_DOCUMENT) { - fprintf(log, "could not create index on \"%s\" on space \"%s\" because " + rsm_log(ctx, "could not create index on \"%s\" on space \"%s\" because " "it is a document and no dotted path was provided\n", what, space); return generate_response(ctx, COORD_NO_CAN_DO); } @@ -769,7 +760,7 @@ coordinator :: index_add(replicant_state_machine_context* ctx, if (type == index::DOCUMENT && sp->sc.attrs[attr_num].type != HYPERDATATYPE_DOCUMENT) { - fprintf(log, "could not create index on \"%s\" on space \"%s\" because " + rsm_log(ctx, "could not create index on \"%s\" on space \"%s\" because " "it is a not document and a dotted path was provided\n", what, space); return generate_response(ctx, COORD_NO_CAN_DO); } @@ -780,12 +771,12 @@ coordinator :: index_add(replicant_state_machine_context* ctx, sp->indices[i].attr == attr_num && sp->indices[i].extra == e::slice(dotpath)) { - fprintf(log, "did not create index on \"%s\" on space \"%s\" because it is already indexed\n", what, space); + rsm_log(ctx, "did not create index on \"%s\" on space \"%s\" because it is already indexed\n", what, space); return generate_response(ctx, COORD_DUPLICATE); } } - fprintf(log, "creating index on \"%s\" on space \"%s\"\n", what, space); + rsm_log(ctx, "creating index on \"%s\" on space \"%s\"\n", what, space); index_id id(m_counter); ++m_counter; sp->indices.push_back(index(type, id, attr_num, e::slice(dotpath))); @@ -795,9 +786,8 @@ coordinator :: index_add(replicant_state_machine_context* ctx, } void -coordinator :: index_rm(replicant_state_machine_context* ctx, index_id ii) +coordinator :: index_rm(rsm_context* ctx, index_id ii) { - FILE* log = replicant_state_machine_log_stream(ctx); e::compat::shared_ptr s; index* idx; @@ -813,7 +803,7 @@ coordinator :: index_rm(replicant_state_machine_context* ctx, index_id ii) if (!s.get() || !idx) { - fprintf(log, "could not remove index %lu because it doesn't exist\n", ii.get()); + rsm_log(ctx, "could not remove index %lu because it doesn't exist\n", ii.get()); return generate_response(ctx, COORD_NOT_FOUND); } @@ -824,7 +814,7 @@ coordinator :: index_rm(replicant_state_machine_context* ctx, index_id ii) { if (it->attrs[a] == idx->attr) { - fprintf(log, "could not remove index %lu because it's in use by subspace \"%lu\"\n", + rsm_log(ctx, "could not remove index %lu because it's in use by subspace \"%lu\"\n", ii.get(), it->id.get()); return generate_response(ctx, COORD_NO_CAN_DO); } @@ -832,17 +822,16 @@ coordinator :: index_rm(replicant_state_machine_context* ctx, index_id ii) } remove_id(ii, &s->indices); - fprintf(log, "removed index %lu from space \"%s\"\n", ii.get(), s->name); + rsm_log(ctx, "removed index %lu from space \"%s\"\n", ii.get(), s->name); generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: transfer_go_live(replicant_state_machine_context* ctx, +coordinator :: transfer_go_live(rsm_context* ctx, uint64_t version, const transfer_id& xid) { - FILE* log = replicant_state_machine_log_stream(ctx); transfer* xfer = get_transfer(xid); if (!xfer) @@ -852,7 +841,7 @@ coordinator :: transfer_go_live(replicant_state_machine_context* ctx, return; } - fprintf(log, "cannot make transfer(%" PRIu64 ") live because it doesn't exist\n", xid.get()); + rsm_log(ctx, "cannot make transfer(%" PRIu64 ") live because it doesn't exist\n", xid.get()); return generate_response(ctx, COORD_SUCCESS); } @@ -860,7 +849,7 @@ coordinator :: transfer_go_live(replicant_state_machine_context* ctx, if (!reg) { - fprintf(log, "cannot make transfer(%" PRIu64 ") live because it doesn't exist\n", xid.get()); + rsm_log(ctx, "cannot make transfer(%" PRIu64 ") live because it doesn't exist\n", xid.get()); INVARIANT_BROKEN("transfer refers to nonexistent region"); return generate_response(ctx, COORD_SUCCESS); } @@ -880,17 +869,16 @@ coordinator :: transfer_go_live(replicant_state_machine_context* ctx, } reg->replicas.push_back(replica(xfer->dst, xfer->vdst)); - fprintf(log, "transfer(%" PRIu64 ") is live\n", xid.get()); + rsm_log(ctx, "transfer(%" PRIu64 ") is live\n", xid.get()); generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: transfer_complete(replicant_state_machine_context* ctx, +coordinator :: transfer_complete(rsm_context* ctx, uint64_t version, const transfer_id& xid) { - FILE* log = replicant_state_machine_log_stream(ctx); transfer* xfer = get_transfer(xid); if (!xfer) @@ -900,7 +888,7 @@ coordinator :: transfer_complete(replicant_state_machine_context* ctx, return; } - fprintf(log, "cannot complete transfer(%" PRIu64 ") because it doesn't exist\n", xid.get()); + rsm_log(ctx, "cannot complete transfer(%" PRIu64 ") because it doesn't exist\n", xid.get()); return generate_response(ctx, COORD_SUCCESS); } @@ -908,7 +896,7 @@ coordinator :: transfer_complete(replicant_state_machine_context* ctx, if (!reg) { - fprintf(log, "cannot complete transfer(%" PRIu64 ") because it doesn't exist\n", xid.get()); + rsm_log(ctx, "cannot complete transfer(%" PRIu64 ") because it doesn't exist\n", xid.get()); INVARIANT_BROKEN("transfer refers to nonexistent region"); return generate_response(ctx, COORD_SUCCESS); } @@ -917,29 +905,29 @@ coordinator :: transfer_complete(replicant_state_machine_context* ctx, reg->replicas[reg->replicas.size() - 2].si == xfer->src && reg->replicas[reg->replicas.size() - 1].si == xfer->dst)) { - fprintf(log, "cannot complete transfer(%" PRIu64 ") because it is not live\n", xid.get()); + rsm_log(ctx, "cannot complete transfer(%" PRIu64 ") because it is not live\n", xid.get()); return generate_response(ctx, COORD_SUCCESS); } del_transfer(xfer->id); - fprintf(log, "transfer(%" PRIu64 ") is complete\n", xid.get()); + rsm_log(ctx, "transfer(%" PRIu64 ") is complete\n", xid.get()); converge_intent(ctx, reg); generate_next_configuration(ctx); return generate_response(ctx, COORD_SUCCESS); } void -coordinator :: config_get(replicant_state_machine_context* ctx) +coordinator :: config_get(rsm_context* ctx) { assert(m_cluster != 0 && m_version != 0); assert(m_latest_config.get()); const char* output = reinterpret_cast(m_latest_config->data()); size_t output_sz = m_latest_config->size(); - replicant_state_machine_set_response(ctx, output, output_sz); + rsm_set_output(ctx, output, output_sz); } void -coordinator :: config_ack(replicant_state_machine_context* ctx, +coordinator :: config_ack(rsm_context* ctx, const server_id& sid, uint64_t version) { m_config_ack_barrier.pass(version, sid); @@ -947,7 +935,7 @@ coordinator :: config_ack(replicant_state_machine_context* ctx, } void -coordinator :: config_stable(replicant_state_machine_context* ctx, +coordinator :: config_stable(rsm_context* ctx, const server_id& sid, uint64_t version) { m_config_stable_barrier.pass(version, sid); @@ -955,19 +943,11 @@ coordinator :: config_stable(replicant_state_machine_context* ctx, } void -coordinator :: checkpoint(replicant_state_machine_context* ctx) +coordinator :: checkpoint(rsm_context* ctx) { - FILE* log = replicant_state_machine_log_stream(ctx); - uint64_t cond_state = 0; - - if (replicant_state_machine_condition_broadcast(ctx, "checkp", &cond_state) < 0) - { - fprintf(log, "could not broadcast on \"checkp\" condition\n"); - } - + rsm_cond_broadcast(ctx, "checkp"); ++m_checkpoint; - fprintf(log, "establishing checkpoint %" PRIu64 "\n", m_checkpoint); - assert(cond_state == m_checkpoint); + rsm_log(ctx, "establishing checkpoint %" PRIu64 "\n", m_checkpoint); assert(m_checkpoint_stable_through <= m_checkpoint); std::vector sids; servers_in_configuration(&sids); @@ -976,7 +956,7 @@ coordinator :: checkpoint(replicant_state_machine_context* ctx) } void -coordinator :: checkpoint_stable(replicant_state_machine_context* ctx, +coordinator :: checkpoint_stable(rsm_context* ctx, const server_id& sid, uint64_t config, uint64_t number) @@ -992,7 +972,7 @@ coordinator :: checkpoint_stable(replicant_state_machine_context* ctx, } void -coordinator :: checkpoints(replicant_state_machine_context* ctx) +coordinator :: checkpoints(rsm_context* ctx) { const size_t sz = sizeof(uint16_t) + 3 * sizeof(uint64_t); m_response.reset(e::buffer::create(sz)); @@ -1001,90 +981,88 @@ coordinator :: checkpoints(replicant_state_machine_context* ctx) ptr = e::pack64be(m_checkpoint, ptr); ptr = e::pack64be(m_checkpoint_stable_through, ptr); ptr = e::pack64be(m_checkpoint_gc_through, ptr); - replicant_state_machine_set_response(ctx, reinterpret_cast(m_response->data()), sz); + rsm_set_output(ctx, reinterpret_cast(m_response->data()), sz); } void -coordinator :: alarm(replicant_state_machine_context* ctx) +coordinator :: periodic(rsm_context* ctx) { - replicant_state_machine_alarm(ctx, "alarm", ALARM_INTERVAL); checkpoint(ctx); } void -coordinator :: debug_dump(replicant_state_machine_context* ctx) +coordinator :: debug_dump(rsm_context* ctx) { - FILE* log = replicant_state_machine_log_stream(ctx); - fprintf(log, "=== begin debug dump ===========================================================\n"); - fprintf(log, "permutation:\n"); + rsm_log(ctx, "=== begin debug dump ===========================================================\n"); + rsm_log(ctx, "permutation:\n"); for (size_t i = 0; i < m_permutation.size(); ++i) { - fprintf(log, " - %" PRIu64 "\n", m_permutation[i].get()); + rsm_log(ctx, " - %" PRIu64 "\n", m_permutation[i].get()); } - fprintf(log, "spares (desire %" PRIu64 "):\n", m_desired_spares); + rsm_log(ctx, "spares (desire %" PRIu64 "):\n", m_desired_spares); for (size_t i = 0; i < m_spares.size(); ++i) { - fprintf(log, " - %" PRIu64 "\n", m_spares[i].get()); + rsm_log(ctx, " - %" PRIu64 "\n", m_spares[i].get()); } - fprintf(log, "intents:\n"); + rsm_log(ctx, "intents:\n"); for (size_t i = 0; i < m_intents.size(); ++i) { - fprintf(log, " - region=%" PRIu64 ", checkpoint=%" PRIu64 " replicas=[", m_intents[i].id.get(), m_intents[i].checkpoint); + rsm_log(ctx, " - region=%" PRIu64 ", checkpoint=%" PRIu64 " replicas=[", m_intents[i].id.get(), m_intents[i].checkpoint); for (size_t j = 0; j < m_intents[i].replicas.size(); ++j) { if (j == 0) { - fprintf(log, "%" PRIu64 "", m_intents[i].replicas[j].get()); + rsm_log(ctx, "%" PRIu64 "", m_intents[i].replicas[j].get()); } else { - fprintf(log, ", %" PRIu64 "", m_intents[i].replicas[j].get()); + rsm_log(ctx, ", %" PRIu64 "", m_intents[i].replicas[j].get()); } } - fprintf(log, "]\n"); + rsm_log(ctx, "]\n"); } - fprintf(log, "transfers:\n"); + rsm_log(ctx, "transfers:\n"); for (size_t i = 0; i < m_transfers.size(); ++i) { - fprintf(log, " - id=%" PRIu64 " rid=%" PRIu64 " src=%" PRIu64 " vsrc=%" PRIu64 " dst=%" PRIu64 " vdst=%" PRIu64 "\n", + rsm_log(ctx, " - id=%" PRIu64 " rid=%" PRIu64 " src=%" PRIu64 " vsrc=%" PRIu64 " dst=%" PRIu64 " vdst=%" PRIu64 "\n", m_transfers[i].id.get(), m_transfers[i].rid.get(), m_transfers[i].src.get(), m_transfers[i].vsrc.get(), m_transfers[i].dst.get(), m_transfers[i].vdst.get()); } - fprintf(log, "offline servers:\n"); + rsm_log(ctx, "offline servers:\n"); for (size_t i = 0; i < m_offline.size(); ++i) { - fprintf(log, " - rid=%" PRIu64 " sid=%" PRIu64 "\n", + rsm_log(ctx, " - rid=%" PRIu64 " sid=%" PRIu64 "\n", m_offline[i].id.get(), m_offline[i].sid.get()); } - fprintf(log, "config ack through: %" PRIu64 "\n", m_config_ack_through); - fprintf(log, "config stable through: %" PRIu64 "\n", m_config_stable_through); - fprintf(log, "checkpoint: latest=%" PRIu64 ", stable=%" PRIu64 ", gc=%" PRIu64 "\n", + rsm_log(ctx, "config ack through: %" PRIu64 "\n", m_config_ack_through); + rsm_log(ctx, "config stable through: %" PRIu64 "\n", m_config_stable_through); + rsm_log(ctx, "checkpoint: latest=%" PRIu64 ", stable=%" PRIu64 ", gc=%" PRIu64 "\n", m_checkpoint, m_checkpoint_stable_through, m_checkpoint_gc_through); - fprintf(log, "=== end debug dump =============================================================\n"); + rsm_log(ctx, "=== end debug dump =============================================================\n"); } coordinator* -coordinator :: recreate(replicant_state_machine_context* ctx, +coordinator :: recreate(rsm_context* ctx, const char* data, size_t data_sz) { std::auto_ptr c(new coordinator()); if (!c.get()) { - fprintf(replicant_state_machine_log_stream(ctx), "memory allocation failed\n"); + rsm_log(ctx, "memory allocation failed\n"); return NULL; } @@ -1107,7 +1085,7 @@ coordinator :: recreate(replicant_state_machine_context* ctx, if (up.error()) { - fprintf(replicant_state_machine_log_stream(ctx), "unpacking failed\n"); + rsm_log(ctx, "unpacking failed\n"); return NULL; } @@ -1160,13 +1138,12 @@ coordinator :: recreate(replicant_state_machine_context* ctx, } c->generate_cached_configuration(ctx); - replicant_state_machine_alarm(ctx, "alarm", ALARM_INTERVAL); return c.release(); } -void -coordinator :: snapshot(replicant_state_machine_context* /*ctx*/, - const char** data, size_t* data_sz) +int +coordinator :: snapshot(rsm_context* /*ctx*/, + char** data, size_t* data_sz) { size_t sz = sizeof(m_cluster) + sizeof(m_counter) @@ -1197,7 +1174,7 @@ coordinator :: snapshot(replicant_state_machine_context* /*ctx*/, } std::auto_ptr buf(e::buffer::create(sz)); - e::buffer::packer pa = buf->pack_at(0); + e::packer pa = buf->pack_at(0); pa = pa << m_cluster << m_counter << m_version << m_flags << m_servers << m_permutation << m_spares << m_desired_spares << m_intents << m_deferred_init << m_offline << m_transfers @@ -1221,6 +1198,8 @@ coordinator :: snapshot(replicant_state_machine_context* /*ctx*/, { memmove(ptr, buf->data(), buf->size()); } + + return 0; } server* @@ -1341,7 +1320,7 @@ compare_space_ptr_by_r_p(const e::compat::shared_ptr& lhs, } // namespace void -coordinator :: rebalance_replica_sets(replicant_state_machine_context* ctx) +coordinator :: rebalance_replica_sets(rsm_context* ctx) { uint64_t R = 0; uint64_t P = 0; @@ -1384,7 +1363,7 @@ coordinator :: rebalance_replica_sets(replicant_state_machine_context* ctx) } void -coordinator :: initial_space_layout(replicant_state_machine_context* ctx, +coordinator :: initial_space_layout(rsm_context* ctx, space* s) { if (m_permutation.empty()) @@ -1429,7 +1408,7 @@ coordinator :: get_region(const region_id& rid) } void -coordinator :: setup_intents(replicant_state_machine_context* ctx, +coordinator :: setup_intents(rsm_context* ctx, const std::vector& replica_sets, space* s, bool skip_transfers) { @@ -1504,7 +1483,7 @@ coordinator :: setup_intents(replicant_state_machine_context* ctx, } void -coordinator :: converge_intent(replicant_state_machine_context* ctx, +coordinator :: converge_intent(rsm_context* ctx, region* reg) { region_intent* ri = get_region_intent(reg->id); @@ -1516,10 +1495,9 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, } void -coordinator :: converge_intent(replicant_state_machine_context* ctx, +coordinator :: converge_intent(rsm_context* ctx, region* reg, region_intent* ri) { - FILE* log = replicant_state_machine_log_stream(ctx); // if there is a transfer transfer* xfer = get_transfer(reg->id); @@ -1568,13 +1546,13 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, } else if (reg->replicas.size() == 1) { - fprintf(log, "refusing to remove the last server from " + rsm_log(ctx, "refusing to remove the last server from " "region(%" PRIu64 ") because it was not a clean shutdown\n", reg->id.get()); return; } - fprintf(log, "removing server(%" PRIu64 ") from region(%" PRIu64 ") " + rsm_log(ctx, "removing server(%" PRIu64 ") from region(%" PRIu64 ") " "because it is in state %s\n", reg->replicas[i].si.get(), reg->id.get(), server::to_string(s->state)); @@ -1602,13 +1580,13 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, { if (reg->replicas.size() == 1) { - fprintf(log, "refusing to remove the last server from " + rsm_log(ctx, "refusing to remove the last server from " "region(%" PRIu64 ") because we need it to transfer data\n", reg->id.get()); return; } - fprintf(log, "removing server(%" PRIu64 ") from region(%" PRIu64 ") " + rsm_log(ctx, "removing server(%" PRIu64 ") from region(%" PRIu64 ") " "to make progress toward desired state\n", reg->replicas[i].si.get(), reg->id.get()); shift_and_pop(i, ®->replicas); @@ -1640,7 +1618,7 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, { reg->replicas.push_back(replica(m_offline[i].sid, virtual_server_id(m_counter))); ++m_counter; - fprintf(log, "restoring offline server(%" PRIu64 ") to region(%" PRIu64 ")\n", + rsm_log(ctx, "restoring offline server(%" PRIu64 ") to region(%" PRIu64 ")\n", m_offline[i].sid.get(), reg->id.get()); remove_offline(reg->id); break; @@ -1650,7 +1628,7 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, if (reg->replicas.empty()) { - fprintf(log, "cannot transfer state to new servers in " + rsm_log(ctx, "cannot transfer state to new servers in " "region(%" PRIu64 ") because all servers are offline\n", reg->id.get()); return; @@ -1677,7 +1655,7 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, xfer = new_transfer(reg, ri->replicas[i]); assert(xfer); - fprintf(log, "adding server(%" PRIu64 ") to region(%" PRIu64 ") " + rsm_log(ctx, "adding server(%" PRIu64 ") to region(%" PRIu64 ") " "copying from server(%" PRIu64 ")/virtual_server(%" PRIu64 ") " "using transfer(%" PRIu64 ")/virtual_server(%" PRIu64 ")\n", xfer->dst.get(), reg->id.get(), @@ -1708,7 +1686,7 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, if (ri->checkpoint >= m_checkpoint_stable_through) { - fprintf(log, "postponing convergence until after checkpoint %" PRIu64 " is stable\n", ri->checkpoint); + rsm_log(ctx, "postponing convergence until after checkpoint %" PRIu64 " is stable\n", ri->checkpoint); return; } @@ -1720,7 +1698,7 @@ coordinator :: converge_intent(replicant_state_machine_context* ctx, ri->checkpoint = 0; xfer = new_transfer(reg, sid); assert(xfer); - fprintf(log, "rolling server(%" PRIu64 ") to the back of region(%" PRIu64 ") " + rsm_log(ctx, "rolling server(%" PRIu64 ") to the back of region(%" PRIu64 ") " "using transfer(%" PRIu64 ")/virtual_server(%" PRIu64 ")\n", xfer->dst.get(), reg->id.get(), xfer->id.get(), xfer->vdst.get()); @@ -1868,50 +1846,41 @@ coordinator :: del_transfer(const transfer_id& xid) } void -coordinator :: check_ack_condition(replicant_state_machine_context* ctx) +coordinator :: check_ack_condition(rsm_context* ctx) { if (m_config_ack_through < m_config_ack_barrier.min_version()) { - FILE* log = replicant_state_machine_log_stream(ctx); - fprintf(log, "acked through version %" PRIu64 "\n", m_config_ack_barrier.min_version()); + rsm_log(ctx, "acked through version %" PRIu64 "\n", m_config_ack_barrier.min_version()); } while (m_config_ack_through < m_config_ack_barrier.min_version()) { - replicant_state_machine_condition_broadcast(ctx, "ack", &m_config_ack_through); + rsm_cond_broadcast(ctx, "ack"); + ++m_config_ack_through; } } void -coordinator :: check_stable_condition(replicant_state_machine_context* ctx) +coordinator :: check_stable_condition(rsm_context* ctx) { if (m_config_stable_through < m_config_stable_barrier.min_version()) { - FILE* log = replicant_state_machine_log_stream(ctx); - fprintf(log, "stable through version %" PRIu64 "\n", m_config_stable_barrier.min_version()); + rsm_log(ctx, "stable through version %" PRIu64 "\n", m_config_stable_barrier.min_version()); } while (m_intents.empty() && m_deferred_init.empty() && m_config_stable_through < m_config_stable_barrier.min_version()) { - replicant_state_machine_condition_broadcast(ctx, "stable", &m_config_stable_through); + rsm_cond_broadcast(ctx, "stable"); + ++m_config_stable_through; } } void -coordinator :: generate_next_configuration(replicant_state_machine_context* ctx) +coordinator :: generate_next_configuration(rsm_context* ctx) { - FILE* log = replicant_state_machine_log_stream(ctx); - uint64_t cond_state; - - if (replicant_state_machine_condition_broadcast(ctx, "config", &cond_state) < 0) - { - fprintf(log, "could not broadcast on \"config\" condition\n"); - } - ++m_version; - fprintf(log, "issuing new configuration version %" PRIu64 "\n", m_version); - assert(cond_state == m_version); + rsm_log(ctx, "issuing new configuration version %" PRIu64 "\n", m_version); std::vector sids; servers_in_configuration(&sids); m_config_ack_barrier.new_version(m_version, sids); @@ -1919,10 +1888,11 @@ coordinator :: generate_next_configuration(replicant_state_machine_context* ctx) check_ack_condition(ctx); check_stable_condition(ctx); generate_cached_configuration(ctx); + rsm_cond_broadcast_data(ctx, "config", m_latest_config->cdata(), m_latest_config->size()); } void -coordinator :: generate_cached_configuration(replicant_state_machine_context*) +coordinator :: generate_cached_configuration(rsm_context*) { m_latest_config.reset(); size_t sz = 7 * sizeof(uint64_t); @@ -1947,7 +1917,7 @@ coordinator :: generate_cached_configuration(replicant_state_machine_context*) prioritized_transfer_subset(&transfers_subset); std::auto_ptr new_config(e::buffer::create(sz)); - e::buffer::packer pa = new_config->pack_at(0); + e::packer pa = new_config->pack_at(0); pa = pa << m_cluster << m_version << m_flags << uint64_t(m_servers.size()) << uint64_t(m_spaces.size()) @@ -2081,14 +2051,13 @@ coordinator :: regions_in_space(space_ptr s, std::vector* rids) } void -coordinator :: check_checkpoint_stable_condition(replicant_state_machine_context* ctx, bool reissue) +coordinator :: check_checkpoint_stable_condition(rsm_context* ctx, bool reissue) { - FILE* log = replicant_state_machine_log_stream(ctx); assert(m_checkpoint_stable_through <= m_checkpoint); if (m_checkpoint_stable_through < m_checkpoint_stable_barrier.min_version()) { - fprintf(log, "checkpoint %" PRIu64 " done\n", m_checkpoint_stable_barrier.min_version()); + rsm_log(ctx, "checkpoint %" PRIu64 " done\n", m_checkpoint_stable_barrier.min_version()); } bool stabilized = false; @@ -2096,7 +2065,8 @@ coordinator :: check_checkpoint_stable_condition(replicant_state_machine_context while (m_checkpoint_stable_through < m_checkpoint_stable_barrier.min_version()) { stabilized = true; - replicant_state_machine_condition_broadcast(ctx, "checkps", &m_checkpoint_stable_through); + rsm_cond_broadcast(ctx, "checkps"); + ++m_checkpoint_stable_through; } bool gc = false; @@ -2124,12 +2094,13 @@ coordinator :: check_checkpoint_stable_condition(replicant_state_machine_context while (m_checkpoint_gc_through + outstanding_checkpoints < m_checkpoint_stable_barrier.min_version()) { gc = true; - replicant_state_machine_condition_broadcast(ctx, "checkpgc", &m_checkpoint_gc_through); + rsm_cond_broadcast(ctx, "checkpgc"); + ++m_checkpoint_gc_through; } if (gc && m_checkpoint_gc_through > 0) { - fprintf(log, "garbage collect <= checkpoint %" PRIu64 "\n", m_checkpoint_gc_through); + rsm_log(ctx, "garbage collect <= checkpoint %" PRIu64 "\n", m_checkpoint_gc_through); } assert(m_checkpoint_gc_through <= m_checkpoint_stable_through); diff --git a/coordinator/coordinator.h b/coordinator/coordinator.h index 763357b8..2f90bd84 100644 --- a/coordinator/coordinator.h +++ b/coordinator/coordinator.h @@ -38,7 +38,7 @@ #include // Replicant -#include +#include // HyperDex #include "namespace.h" @@ -61,87 +61,87 @@ class coordinator // identity public: - void init(replicant_state_machine_context* ctx, uint64_t token); + void init(rsm_context* ctx, uint64_t token); uint64_t cluster() const { return m_cluster; } // cluster management public: - void read_only(replicant_state_machine_context* ctx, bool ro); - void fault_tolerance(replicant_state_machine_context* ctx, + void read_only(rsm_context* ctx, bool ro); + void fault_tolerance(rsm_context* ctx, const char* space, uint64_t consistency); // server management public: - void server_register(replicant_state_machine_context* ctx, + void server_register(rsm_context* ctx, const server_id& sid, const po6::net::location& bind_to); - void server_online(replicant_state_machine_context* ctx, + void server_online(rsm_context* ctx, const server_id& sid, const po6::net::location* bind_to); - void server_offline(replicant_state_machine_context* ctx, + void server_offline(rsm_context* ctx, const server_id& sid); - void server_shutdown(replicant_state_machine_context* ctx, + void server_shutdown(rsm_context* ctx, const server_id& sid); - void server_kill(replicant_state_machine_context* ctx, + void server_kill(rsm_context* ctx, const server_id& sid); - void server_forget(replicant_state_machine_context* ctx, + void server_forget(rsm_context* ctx, const server_id& sid); - void server_suspect(replicant_state_machine_context* ctx, + void server_suspect(rsm_context* ctx, const server_id& sid); - void report_disconnect(replicant_state_machine_context* ctx, + void report_disconnect(rsm_context* ctx, const server_id& sid, uint64_t version); // space management public: - void space_add(replicant_state_machine_context* ctx, const space& s); - void space_rm(replicant_state_machine_context* ctx, const char* name); - void space_mv(replicant_state_machine_context* ctx, const char* src, const char* dst); + void space_add(rsm_context* ctx, const space& s); + void space_rm(rsm_context* ctx, const char* name); + void space_mv(rsm_context* ctx, const char* src, const char* dst); // index management public: - void index_add(replicant_state_machine_context* ctx, const char* space, const char* attr); - void index_rm(replicant_state_machine_context* ctx, index_id ii); + void index_add(rsm_context* ctx, const char* space, const char* attr); + void index_rm(rsm_context* ctx, index_id ii); // transfers management public: - void transfer_go_live(replicant_state_machine_context* ctx, + void transfer_go_live(rsm_context* ctx, uint64_t version, const transfer_id& xid); - void transfer_complete(replicant_state_machine_context* ctx, + void transfer_complete(rsm_context* ctx, uint64_t version, const transfer_id& xid); // config management public: - void config_get(replicant_state_machine_context* ctx); - void config_ack(replicant_state_machine_context* ctx, + void config_get(rsm_context* ctx); + void config_ack(rsm_context* ctx, const server_id& sid, uint64_t version); - void config_stable(replicant_state_machine_context* ctx, + void config_stable(rsm_context* ctx, const server_id& sid, uint64_t version); // checkpoint management public: - void checkpoint(replicant_state_machine_context* ctx); - void checkpoint_stable(replicant_state_machine_context* ctx, + void checkpoint(rsm_context* ctx); + void checkpoint_stable(rsm_context* ctx, const server_id& sid, uint64_t config, uint64_t number); - void checkpoints(replicant_state_machine_context* ctx); + void checkpoints(rsm_context* ctx); - // alarm + // periodic public: - void alarm(replicant_state_machine_context* ctx); + void periodic(rsm_context* ctx); // debug public: - void debug_dump(replicant_state_machine_context* ctx); + void debug_dump(rsm_context* ctx); // backup/restore public: - static coordinator* recreate(replicant_state_machine_context* ctx, + static coordinator* recreate(rsm_context* ctx, const char* data, size_t data_sz); - void snapshot(replicant_state_machine_context* ctx, - const char** data, size_t* data_sz); + int snapshot(rsm_context* ctx, + char** data, size_t* data_sz); private: typedef e::compat::shared_ptr space_ptr; @@ -157,21 +157,21 @@ class coordinator bool in_permutation(const server_id& sid); void add_permutation(const server_id& sid); void remove_permutation(const server_id& sid); - void rebalance_replica_sets(replicant_state_machine_context* ctx); + void rebalance_replica_sets(rsm_context* ctx); // hyperspace - void initial_space_layout(replicant_state_machine_context* ctx, + void initial_space_layout(rsm_context* ctx, space* s); region* get_region(const region_id& rid); // intents - void setup_intents(replicant_state_machine_context* ctx, + void setup_intents(rsm_context* ctx, const std::vector& replica_sets, space* s, bool skip_transfers); // looks up region_intent* ri, removing any possibility of the user // using an invalid pointer - void converge_intent(replicant_state_machine_context* ctx, + void converge_intent(rsm_context* ctx, region* reg); // ri and m_intents may be changed after this call - void converge_intent(replicant_state_machine_context* ctx, + void converge_intent(rsm_context* ctx, region* reg, region_intent* ri); region_intent* new_region_intent(const region_id& rid); region_intent* get_region_intent(const region_id& rid); @@ -184,15 +184,15 @@ class coordinator transfer* get_transfer(const transfer_id& xid); void del_transfer(const transfer_id& xid); // configuration - void check_ack_condition(replicant_state_machine_context* ctx); - void check_stable_condition(replicant_state_machine_context* ctx); - void generate_next_configuration(replicant_state_machine_context* ctx); - void generate_cached_configuration(replicant_state_machine_context* ctx); + void check_ack_condition(rsm_context* ctx); + void check_stable_condition(rsm_context* ctx); + void generate_next_configuration(rsm_context* ctx); + void generate_cached_configuration(rsm_context* ctx); void prioritized_transfer_subset(std::vector* transfers); void servers_in_configuration(std::vector* sids); void regions_in_space(space_ptr s, std::vector* rids); // checkpoints - void check_checkpoint_stable_condition(replicant_state_machine_context* ctx, bool reissue); + void check_checkpoint_stable_condition(rsm_context* ctx, bool reissue); private: // meta state diff --git a/coordinator/offline_server.h b/coordinator/offline_server.h index 3857c7dd..ded70552 100644 --- a/coordinator/offline_server.h +++ b/coordinator/offline_server.h @@ -51,8 +51,8 @@ pack_size(const offline_server& os) return pack_size(os.id) + pack_size(os.sid); } -inline e::buffer::packer -operator << (e::buffer::packer pa, const offline_server& os) +inline e::packer +operator << (e::packer pa, const offline_server& os) { return pa << os.id << os.sid; } diff --git a/coordinator/region_intent.h b/coordinator/region_intent.h index 11a953e8..270dc3ba 100644 --- a/coordinator/region_intent.h +++ b/coordinator/region_intent.h @@ -55,8 +55,8 @@ pack_size(const region_intent& ri) return pack_size(ri.id) + pack_size(ri.replicas) + sizeof(ri.checkpoint); } -inline e::buffer::packer -operator << (e::buffer::packer pa, const region_intent& ri) +inline e::packer +operator << (e::packer pa, const region_intent& ri) { return pa << ri.id << ri.replicas << ri.checkpoint; } diff --git a/coordinator/server_barrier.h b/coordinator/server_barrier.h index e0e5b9e6..38a2ae21 100644 --- a/coordinator/server_barrier.h +++ b/coordinator/server_barrier.h @@ -57,7 +57,7 @@ class server_barrier private: friend size_t pack_size(const server_barrier& ri); - friend e::buffer::packer operator << (e::buffer::packer pa, const server_barrier& ri); + friend e::packer operator << (e::packer pa, const server_barrier& ri); friend e::unpacker operator >> (e::unpacker up, server_barrier& ri); typedef std::pair > version_t; typedef std::list version_list_t; @@ -79,8 +79,8 @@ pack_size(const server_barrier& ri) return sz; } -inline e::buffer::packer -operator << (e::buffer::packer pa, const server_barrier& ri) +inline e::packer +operator << (e::packer pa, const server_barrier& ri) { typedef server_barrier::version_list_t version_list_t; uint32_t x = ri.m_versions.size(); diff --git a/coordinator/symtable.c b/coordinator/symtable.c index 4433c04b..ae124b71 100644 --- a/coordinator/symtable.c +++ b/coordinator/symtable.c @@ -27,7 +27,7 @@ */ /* Replicant */ -#include +#include /* HyperDex */ #include "visibility.h" @@ -36,10 +36,9 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-pedantic" -struct replicant_state_machine HYPERDEX_API rsm = { +struct state_machine HYPERDEX_API rsm = { hyperdex_coordinator_create, hyperdex_coordinator_recreate, - hyperdex_coordinator_destroy, hyperdex_coordinator_snapshot, {{"config_get", hyperdex_coordinator_config_get}, {"config_ack", hyperdex_coordinator_config_ack}, @@ -60,7 +59,7 @@ struct replicant_state_machine HYPERDEX_API rsm = { {"transfer_go_live", hyperdex_coordinator_transfer_go_live}, {"transfer_complete", hyperdex_coordinator_transfer_complete}, {"checkpoint_stable", hyperdex_coordinator_checkpoint_stable}, - {"alarm", hyperdex_coordinator_alarm}, + {"periodic", hyperdex_coordinator_periodic}, {"read_only", hyperdex_coordinator_read_only}, {"fault_tolerance", hyperdex_coordinator_fault_tolerance}, {"checkpoints", hyperdex_coordinator_checkpoints}, diff --git a/coordinator/transitions.cc b/coordinator/transitions.cc index 9c287c69..70ff2708 100644 --- a/coordinator/transitions.cc +++ b/coordinator/transitions.cc @@ -41,25 +41,13 @@ using namespace hyperdex; -#define PROTECT_NULL \ - do \ - { \ - if (!obj) \ - { \ - fprintf(replicant_state_machine_log_stream(ctx), "cannot operate on NULL object\n"); \ - return generate_response(ctx, hyperdex::COORD_UNINITIALIZED); \ - } \ - } \ - while (0) - #define PROTECT_UNINITIALIZED \ + coordinator* c = static_cast(obj); \ do \ { \ - PROTECT_NULL; \ - coordinator* c = static_cast(obj); \ if (c->cluster() == 0) \ { \ - fprintf(replicant_state_machine_log_stream(ctx), "cluster not initialized\n"); \ + rsm_log(ctx, "cluster not initialized\n"); \ return generate_response(ctx, hyperdex::COORD_UNINITIALIZED); \ } \ } \ @@ -70,7 +58,7 @@ using namespace hyperdex; { \ if (up.error() || up.remain()) \ { \ - fprintf(log, "received malformed \"" #MSGTYPE "\" message\n"); \ + rsm_log(ctx, "received malformed \"" #MSGTYPE "\" message\n"); \ return generate_response(ctx, hyperdex::COORD_MALFORMED); \ } \ } while (0) @@ -79,84 +67,36 @@ extern "C" { void* -hyperdex_coordinator_create(struct replicant_state_machine_context* ctx) +hyperdex_coordinator_create(struct rsm_context* ctx) { - if (replicant_state_machine_condition_create(ctx, "config") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"config\"\n"); - return NULL; - } - - if (replicant_state_machine_condition_create(ctx, "ack") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"ack\"\n"); - return NULL; - } - - if (replicant_state_machine_condition_create(ctx, "stable") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"stable\"\n"); - return NULL; - } - - if (replicant_state_machine_condition_create(ctx, "checkp") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"checkp\"\n"); - return NULL; - } - - if (replicant_state_machine_condition_create(ctx, "checkps") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"checkps\"\n"); - return NULL; - } - - if (replicant_state_machine_condition_create(ctx, "checkpgc") < 0) - { - fprintf(replicant_state_machine_log_stream(ctx), "could not create condition \"checkpgc\"\n"); - return NULL; - } - - coordinator* c = new (std::nothrow) coordinator(); - - if (!c) - { - fprintf(replicant_state_machine_log_stream(ctx), "memory allocation failed\n"); - } - - return c; + rsm_cond_create(ctx, "config"); + rsm_cond_create(ctx, "ack"); + rsm_cond_create(ctx, "stable"); + rsm_cond_create(ctx, "checkp"); + rsm_cond_create(ctx, "checkps"); + rsm_cond_create(ctx, "checkpgc"); + return new (std::nothrow) coordinator(); } void* -hyperdex_coordinator_recreate(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_recreate(struct rsm_context* ctx, const char* data, size_t data_sz) { return coordinator::recreate(ctx, data, data_sz); } -void -hyperdex_coordinator_destroy(struct replicant_state_machine_context*, void* obj) +int +hyperdex_coordinator_snapshot(struct rsm_context* ctx, + void* obj, char** data, size_t* data_sz) { - if (obj) - { - delete static_cast(obj); - } -} - -void -hyperdex_coordinator_snapshot(struct replicant_state_machine_context* ctx, - void* obj, const char** data, size_t* data_sz) -{ - PROTECT_NULL; coordinator* c = static_cast(obj); - c->snapshot(ctx, data, data_sz); + return c->snapshot(ctx, data, data_sz); } void -hyperdex_coordinator_init(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_init(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { - PROTECT_NULL; coordinator* c = static_cast(obj); std::string id(data, data_sz); @@ -165,12 +105,10 @@ hyperdex_coordinator_init(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_read_only(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_read_only(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); uint8_t set; e::unpacker up(data, data_sz); up = up >> set; @@ -179,12 +117,10 @@ hyperdex_coordinator_read_only(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_fault_tolerance(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_fault_tolerance(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); uint64_t ft; const char* ft_ptr = data + data_sz - sizeof(uint64_t); e::unpacker up(ft_ptr, sizeof(uint64_t)); @@ -194,21 +130,18 @@ hyperdex_coordinator_fault_tolerance(struct replicant_state_machine_context* ctx } void -hyperdex_coordinator_config_get(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_config_get(struct rsm_context* ctx, void* obj, const char*, size_t) { PROTECT_UNINITIALIZED; - coordinator* c = static_cast(obj); c->config_get(ctx); } void -hyperdex_coordinator_config_ack(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_config_ack(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; uint64_t version; e::unpacker up(data, data_sz); @@ -218,12 +151,10 @@ hyperdex_coordinator_config_ack(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_config_stable(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_config_stable(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; uint64_t version; e::unpacker up(data, data_sz); @@ -233,12 +164,10 @@ hyperdex_coordinator_config_stable(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_server_register(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_register(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; po6::net::location bind_to; e::unpacker up(data, data_sz); @@ -248,12 +177,10 @@ hyperdex_coordinator_server_register(struct replicant_state_machine_context* ctx } void -hyperdex_coordinator_server_online(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_online(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; po6::net::location bind_to; e::unpacker up(data, data_sz); @@ -272,12 +199,10 @@ hyperdex_coordinator_server_online(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_server_offline(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_offline(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; e::unpacker up(data, data_sz); up = up >> sid; @@ -286,12 +211,10 @@ hyperdex_coordinator_server_offline(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_server_shutdown(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_shutdown(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; e::unpacker up(data, data_sz); up = up >> sid; @@ -300,12 +223,10 @@ hyperdex_coordinator_server_shutdown(struct replicant_state_machine_context* ctx } void -hyperdex_coordinator_server_kill(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_kill(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; e::unpacker up(data, data_sz); up = up >> sid; @@ -314,12 +235,10 @@ hyperdex_coordinator_server_kill(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_server_forget(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_forget(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; e::unpacker up(data, data_sz); up = up >> sid; @@ -328,12 +247,10 @@ hyperdex_coordinator_server_forget(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_server_suspect(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_server_suspect(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; e::unpacker up(data, data_sz); up = up >> sid; @@ -342,12 +259,10 @@ hyperdex_coordinator_server_suspect(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_report_disconnect(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_report_disconnect(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; uint64_t version; e::unpacker up(data, data_sz); @@ -357,12 +272,10 @@ hyperdex_coordinator_report_disconnect(struct replicant_state_machine_context* c } void -hyperdex_coordinator_space_add(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_space_add(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); space s; e::unpacker up(data, data_sz); up = up >> s; @@ -371,16 +284,14 @@ hyperdex_coordinator_space_add(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_space_rm(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_space_rm(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); if (data_sz == 0 || data[data_sz - 1] != '\0') { - fprintf(log, "received malformed \"rm_space\" message\n"); + rsm_log(ctx, "received malformed \"rm_space\" message\n"); return generate_response(ctx, COORD_MALFORMED); } @@ -388,16 +299,14 @@ hyperdex_coordinator_space_rm(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_space_mv(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_space_mv(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); if (data_sz < 2 || data[data_sz - 1] != '\0') { - fprintf(log, "received malformed \"mv_space\" message\n"); + rsm_log(ctx, "received malformed \"mv_space\" message\n"); return generate_response(ctx, COORD_MALFORMED); } @@ -406,7 +315,7 @@ hyperdex_coordinator_space_mv(struct replicant_state_machine_context* ctx, if (dst >= data + data_sz) { - fprintf(log, "received malformed \"mv_space\" message\n"); + rsm_log(ctx, "received malformed \"mv_space\" message\n"); return generate_response(ctx, COORD_MALFORMED); } @@ -414,18 +323,16 @@ hyperdex_coordinator_space_mv(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_index_add(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_index_add(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); const char* space = data; size_t space_sz = strnlen(data, data_sz); if (data_sz == 0 || data[data_sz - 1] != '\0' || space_sz + 2 >= data_sz) { - fprintf(log, "received malformed \"add_index\" message\n"); + rsm_log(ctx, "received malformed \"add_index\" message\n"); return generate_response(ctx, COORD_MALFORMED); } @@ -434,7 +341,7 @@ hyperdex_coordinator_index_add(struct replicant_state_machine_context* ctx, if (space_sz + attr_sz + 2 != data_sz) { - fprintf(log, "received malformed \"add_index\" message\n"); + rsm_log(ctx, "received malformed \"add_index\" message\n"); return generate_response(ctx, COORD_MALFORMED); } @@ -442,12 +349,10 @@ hyperdex_coordinator_index_add(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_index_rm(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_index_rm(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); index_id ii; e::unpacker up(data, data_sz); up = up >> ii; @@ -456,12 +361,10 @@ hyperdex_coordinator_index_rm(struct replicant_state_machine_context* ctx, } void -hyperdex_coordinator_transfer_go_live(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_transfer_go_live(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); transfer_id xid; uint64_t version; e::unpacker up(data, data_sz); @@ -471,12 +374,10 @@ hyperdex_coordinator_transfer_go_live(struct replicant_state_machine_context* ct } void -hyperdex_coordinator_transfer_complete(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_transfer_complete(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); transfer_id xid; uint64_t version; e::unpacker up(data, data_sz); @@ -486,12 +387,10 @@ hyperdex_coordinator_transfer_complete(struct replicant_state_machine_context* c } void -hyperdex_coordinator_checkpoint_stable(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_checkpoint_stable(struct rsm_context* ctx, void* obj, const char* data, size_t data_sz) { PROTECT_UNINITIALIZED; - FILE* log = replicant_state_machine_log_stream(ctx); - coordinator* c = static_cast(obj); server_id sid; uint64_t config; uint64_t checkpoint; @@ -502,29 +401,26 @@ hyperdex_coordinator_checkpoint_stable(struct replicant_state_machine_context* c } void -hyperdex_coordinator_checkpoints(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_checkpoints(struct rsm_context* ctx, void* obj, const char*, size_t) { PROTECT_UNINITIALIZED; - coordinator* c = static_cast(obj); c->checkpoints(ctx); } void -hyperdex_coordinator_alarm(struct replicant_state_machine_context* ctx, - void* obj, const char*, size_t) +hyperdex_coordinator_periodic(struct rsm_context* ctx, + void* obj, const char*, size_t) { PROTECT_UNINITIALIZED; - coordinator* c = static_cast(obj); - c->alarm(ctx); + c->periodic(ctx); } void -hyperdex_coordinator_debug_dump(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_debug_dump(struct rsm_context* ctx, void* obj, const char*, size_t) { PROTECT_UNINITIALIZED; - coordinator* c = static_cast(obj); c->debug_dump(ctx); } diff --git a/coordinator/transitions.h b/coordinator/transitions.h index ae9b22c2..08199fc3 100644 --- a/coordinator/transitions.h +++ b/coordinator/transitions.h @@ -34,25 +34,21 @@ extern "C" #endif /* __cplusplus */ /* Replicant */ -#include +#include void* -hyperdex_coordinator_create(struct replicant_state_machine_context* ctx); +hyperdex_coordinator_create(struct rsm_context* ctx); void* -hyperdex_coordinator_recreate(struct replicant_state_machine_context* ctx, +hyperdex_coordinator_recreate(struct rsm_context* ctx, const char* data, size_t data_sz); -void -hyperdex_coordinator_destroy(struct replicant_state_machine_context* ctx, - void* f); - -void -hyperdex_coordinator_snapshot(struct replicant_state_machine_context* ctx, - void* obj, const char** data, size_t* sz); +int +hyperdex_coordinator_snapshot(struct rsm_context* ctx, + void* obj, char** data, size_t* sz); #define TRANSITION(X) void \ - hyperdex_coordinator_ ## X(struct replicant_state_machine_context* ctx, \ + hyperdex_coordinator_ ## X(struct rsm_context* ctx, \ void* obj, const char* data, size_t data_sz) TRANSITION(init); @@ -86,7 +82,7 @@ TRANSITION(transfer_complete); TRANSITION(checkpoint_stable); TRANSITION(checkpoints); -TRANSITION(alarm); +TRANSITION(periodic); TRANSITION(debug_dump); diff --git a/coordinator/util.h b/coordinator/util.h index 568e9ba2..72ccda06 100644 --- a/coordinator/util.h +++ b/coordinator/util.h @@ -35,7 +35,7 @@ BEGIN_HYPERDEX_NAMESPACE static inline void -generate_response(replicant_state_machine_context* ctx, coordinator_returncode x) +generate_response(rsm_context* ctx, coordinator_returncode x) { const char* ptr = NULL; @@ -64,11 +64,11 @@ generate_response(replicant_state_machine_context* ctx, coordinator_returncode x break; } - replicant_state_machine_set_response(ctx, ptr, 2); + rsm_set_output(ctx, ptr, 2); } #define INVARIANT_BROKEN(X) \ - fprintf(log, "invariant broken at " __FILE__ ":%d: %s\n", __LINE__, X "\n") + rsm_log(ctx, "invariant broken at " __FILE__ ":%d: %s\n", __LINE__, X "\n") END_HYPERDEX_NAMESPACE diff --git a/daemon/coordinator_link_wrapper.cc b/daemon/coordinator_link_wrapper.cc index 76976fe7..2d4833c8 100644 --- a/daemon/coordinator_link_wrapper.cc +++ b/daemon/coordinator_link_wrapper.cc @@ -59,7 +59,7 @@ class coordinator_link_wrapper::coord_rpc public: replicant_returncode status; - const char* output; + char* output; size_t output_sz; std::ostringstream msg; @@ -89,7 +89,7 @@ coordinator_link_wrapper :: coord_rpc :: ~coord_rpc() throw () { if (output) { - replicant_destroy_output(output, output_sz); + free(output); } } @@ -98,9 +98,9 @@ coordinator_link_wrapper :: coord_rpc :: callback(coordinator_link_wrapper* clw) { if (status != REPLICANT_SUCCESS) { - e::error err = clw->m_coord->error(); LOG(ERROR) << "coordinator error: " << msg.str() - << ": " << err.msg() << " @ " << err.loc(); + << ": " << clw->m_coord->error_message() + << " @ " << clw->m_coord->error_location(); } if (status == REPLICANT_CLUSTER_JUMP) @@ -173,7 +173,7 @@ bool coordinator_link_wrapper :: register_id(server_id us, const po6::net::location& bind_to) { std::auto_ptr buf(e::buffer::create(sizeof(uint64_t) + pack_size(bind_to))); - e::buffer::packer pa = buf->pack_at(0); + e::packer pa = buf->pack_at(0); pa = pa << us << bind_to; std::auto_ptr rpc(new coord_rpc()); int64_t rid = m_coord->rpc("server_register", @@ -184,18 +184,20 @@ coordinator_link_wrapper :: register_id(server_id us, const po6::net::location& if (rid < 0) { - e::error err = m_coord->error(); - LOG(ERROR) << "could not register as " << us << ": " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "could not register as " << us + << ": " << m_coord->error_message() + << " @ " << m_coord->error_location(); return false; } replicant_returncode lrc = REPLICANT_GARBAGE; - int64_t lid = m_coord->loop(-1, &lrc); + int64_t lid = m_coord->wait(rid, -1, &lrc); if (lid < 0) { - e::error err = m_coord->error(); - LOG(ERROR) << "could not register as " << us << ": " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "could not register as " << us + << ": " << m_coord->error_message() + << " @ " << m_coord->error_location(); return false; } @@ -207,8 +209,9 @@ coordinator_link_wrapper :: register_id(server_id us, const po6::net::location& if (rpc->status != REPLICANT_SUCCESS) { - e::error err = m_coord->error(); - LOG(ERROR) << "could not register as " << us << ": " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "could not register as " << us + << ": " << m_coord->error_message() + << " @ " << m_coord->error_location(); return false; } @@ -221,7 +224,7 @@ coordinator_link_wrapper :: register_id(server_id us, const po6::net::location& switch (rc) { case COORD_SUCCESS: - return true; + break; case COORD_DUPLICATE: LOG(ERROR) << "could not register as " << us << ": another server has this ID"; return false; @@ -241,6 +244,19 @@ coordinator_link_wrapper :: register_id(server_id us, const po6::net::location& LOG(ERROR) << "could not register as " << us << ": coordinator returned invalid message"; return false; } + + while (!m_coord->config()->exists(us)) + { + lid = m_coord->wait(INT64_MAX, -1, &lrc); + + if (lid < 0) + { + LOG(ERROR) << "could not register as " << us << ": coordinator loop malfunction"; + return false; + } + } + + return true; } bool @@ -253,42 +269,30 @@ coordinator_link_wrapper :: initialize_checkpoints(uint64_t* _checkpoint, if (id < 0) { - e::error err = m_coord->error(); - LOG(ERROR) << "could not retrieve checkpoints: " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "could not retrieve checkpoints: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); return false; } replicant_returncode lrc = REPLICANT_GARBAGE; - std::vector irrelevant_for_now; + int64_t lid = m_coord->wait(id, -1, &lrc); - while (true) + if (lid < 0) { - int64_t lid = m_coord->loop(-1, &lrc); - - if (lid < 0) - { - e::error err = m_coord->error(); - LOG(ERROR) << "could not retrieve checkpoints: " << err.msg() << " @ " << err.loc(); - return false; - } - - if (lid == id) - { - break; - } - - irrelevant_for_now.push_back(lid); + LOG(ERROR) << "could not retrieve checkpoints: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); + return false; } - for (size_t i = 0; i < irrelevant_for_now.size(); ++i) - { - m_coord->enqueue_response(irrelevant_for_now[i]); - } + assert(lid == id); if (rpc->status != REPLICANT_SUCCESS) { - e::error err = m_coord->error(); - LOG(ERROR) << "could not retrieve checkpoints: " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "could not retrieve checkpoints: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); return false; } @@ -386,28 +390,30 @@ coordinator_link_wrapper :: maintain_link() exit_status = false; break; } - else if (id < 0 && (status == REPLICANT_BACKOFF || - status == REPLICANT_NEED_BOOTSTRAP)) + else if (id < 0 && status == REPLICANT_COMM_FAILED) { - e::error err = m_coord->error(); LOG(ERROR) << "coordinator disconnected: backing off before retrying"; - LOG(ERROR) << "details: " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "details: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); do_sleep(); exit_status = false; break; } else if (id < 0 && status == REPLICANT_CLUSTER_JUMP) { - e::error err = m_coord->error(); - LOG(ERROR) << "cluster jump: " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "cluster jump: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); do_sleep(); exit_status = false; break; } else if (id < 0) { - e::error err = m_coord->error(); - LOG(ERROR) << "coordinator error: " << err.msg() << " @ " << err.loc(); + LOG(ERROR) << "coordinator error: " + << m_coord->error_message() + << " @ " << m_coord->error_location(); do_sleep(); exit_status = false; break; @@ -766,7 +772,7 @@ coordinator_link_wrapper :: ensure_available() size_t sz = sizeof(uint64_t) + pack_size(m_daemon->m_bind_to); std::auto_ptr buf(e::buffer::create(sz)); - *buf << m_daemon->m_us << m_daemon->m_bind_to; + buf->pack() << m_daemon->m_us << m_daemon->m_bind_to; e::intrusive_ptr rpc = new coord_rpc_available(); rpc->msg << "server online"; m_online_id = make_rpc_nosync("server_online", @@ -1038,9 +1044,9 @@ coordinator_link_wrapper :: make_rpc_nosync(const char* func, if (id < 0) { - e::error err = m_coord->error(); LOG(ERROR) << "coordinator error: " << rpc->msg.str() - << ": " << err.msg() << " @ " << err.loc(); + << ": " << m_coord->error_message() + << " @ " << m_coord->error_location(); } else { @@ -1058,9 +1064,9 @@ coordinator_link_wrapper :: wait_nosync(const char* cond, uint64_t state, if (id < 0) { - e::error err = m_coord->error(); LOG(ERROR) << "coordinator error: " << rpc->msg.str() - << ": " << err.msg() << " @ " << err.loc(); + << ": " << m_coord->error_message() + << " @ " << m_coord->error_location(); } else { diff --git a/daemon/coordinator_link_wrapper.h b/daemon/coordinator_link_wrapper.h index bda55997..92d66338 100644 --- a/daemon/coordinator_link_wrapper.h +++ b/daemon/coordinator_link_wrapper.h @@ -32,6 +32,7 @@ #include // STL +#include #include // po6 @@ -40,6 +41,9 @@ #include #include +// e +#include + // HyperDex #include "namespace.h" #include "common/configuration.h" diff --git a/daemon/daemon.cc b/daemon/daemon.cc index 5c6e5865..6a1c26ea 100644 --- a/daemon/daemon.cc +++ b/daemon/daemon.cc @@ -795,7 +795,7 @@ daemon :: process_req_get(server_id from, + sizeof(uint16_t) + pack_size(value); msg.reset(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); + e::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); pa = pa << nonce << static_cast(result); if (result == NET_SUCCESS) @@ -879,7 +879,7 @@ daemon :: process_req_get_partial(server_id from, + pack_size(value) + value.size() * sizeof(uint16_t); msg.reset(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); + e::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); pa = pa << nonce << static_cast(result); if (result == NET_SUCCESS) @@ -1057,7 +1057,7 @@ daemon :: process_req_group_atomic(server_id from, } // Only forward the actual atomic operation - e::slice sl = up.as_slice(); + e::slice sl = up.remainder(); m_sm.group_keyop(from, vto, nonce, &checks, REQ_ATOMIC, sl, RESP_GROUP_ATOMIC); } @@ -1293,7 +1293,7 @@ daemon :: process_backup(server_id from, + sizeof(uint16_t) + pack_size(path); msg.reset(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); + e::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); pa = pa << nonce << static_cast(result) << path; m_comm.send_client(vto, from, BACKUP, msg); } @@ -1345,9 +1345,8 @@ daemon :: process_perf_counters(server_id from, + sizeof(uint64_t) + out.size() + 1; msg.reset(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); - pa = pa << nonce; - pa.copy(e::slice(out.data(), out.size() + 1)); + msg->pack_at(HYPERDEX_HEADER_SIZE_VC) + << nonce << e::pack_memmove(out.c_str(), out.size() + 1); m_comm.send_client(vto, from, PERF_COUNTERS, msg); } diff --git a/daemon/datalayer.cc b/daemon/datalayer.cc index d84be31a..ff175498 100644 --- a/daemon/datalayer.cc +++ b/daemon/datalayer.cc @@ -234,7 +234,7 @@ datalayer :: save_state(const server_id& us, + pack_size(bind_to) + pack_size(coordinator); std::auto_ptr state(e::buffer::create(sz)); - *state << us << bind_to << coordinator; + state->pack() << us << bind_to << coordinator; leveldb::Status st = m_db->Put(wopts, leveldb::Slice("state", 5), leveldb::Slice(reinterpret_cast(state->data()), state->size())); diff --git a/daemon/index_document.cc b/daemon/index_document.cc index 429512a6..886455af 100644 --- a/daemon/index_document.cc +++ b/daemon/index_document.cc @@ -227,7 +227,7 @@ index_document :: parse_path(const index* idx, { hyperdatatype type; - if (m_di.extract_value(idx->extra.c_str(), document, &type, scratch, value)) + if (m_di.extract_value(idx->extra.cdata(), document, &type, scratch, value)) { if (type == HYPERDATATYPE_STRING) { diff --git a/daemon/search_manager.cc b/daemon/search_manager.cc index 5ac8a9aa..a34f9098 100644 --- a/daemon/search_manager.cc +++ b/daemon/search_manager.cc @@ -488,7 +488,7 @@ search_manager :: sorted_search(const server_id& from, } std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); + e::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC); pa = pa << nonce << static_cast(top_n.size()); for (size_t i = 0; i < top_n.size(); ++i) @@ -551,9 +551,8 @@ search_manager :: group_keyop(const server_id& from, + pack_size(key) + remain.size(); std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_SV); - pa = pa << static_cast(0) << key; - pa = pa.copy(remain); + msg->pack_at(HYPERDEX_HEADER_SIZE_SV) + << uint64_t(0) << key << e::pack_memmove(remain.data(), remain.size()); virtual_server_id vsi = m_daemon->m_config.point_leader(ri, key); if (vsi != virtual_server_id()) @@ -685,8 +684,8 @@ search_manager :: search_describe(const server_id& from, + sizeof(uint64_t) + text_sz; std::auto_ptr msg(e::buffer::create(sz)); - e::buffer::packer pa = msg->pack_at(HYPERDEX_HEADER_SIZE_VC) << nonce; - pa.copy(e::slice(text, text_sz)); + msg->pack_at(HYPERDEX_HEADER_SIZE_VC) + << nonce << e::pack_memmove(text, text_sz); m_daemon->m_comm.send_client(to, from, RESP_SEARCH_DESCRIBE, msg); } diff --git a/doc/async-ops.tex b/doc/async-ops.tex index 7097ce52..dc762b96 100644 --- a/doc/async-ops.tex +++ b/doc/async-ops.tex @@ -211,6 +211,12 @@ \section{Potential Pitfalls} is committed, it really is committed to sufficiently many replicas to withstand the number of simultaneous failures in the space specification). +A more subtle pitfall with asynchronous operations is that the order of +concurrent asynchronous operations is not guaranteed, even if they are issued by +the same client. If a client requires that operations be ordered in a +particular manner, it must do so itself by ensuring that the dependent operation +is not issued until all its dependencies complete successfully. + \section{A Common Window Pattern} \label{sec:async-ops:window} diff --git a/test/search-stress-test.cc b/test/search-stress-test.cc index 55ca0c8e..3e3d1062 100644 --- a/test/search-stress-test.cc +++ b/test/search-stress-test.cc @@ -732,6 +732,10 @@ all_search_tests(size_t testno, hyperdex::Client* cl, const std::vector& expecting) { + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 100 * 1000ULL * 1000ULL; + nanosleep(&ts, NULL); search(testno, cl, NULL, 0, expecting); sorted_search(testno, cl, NULL, 0, expecting); size_t num = 0;