diff --git a/.gitignore b/.gitignore index 31b8dbb1..b4cdb176 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ *.lo *.o *.pyc +*.py +*.sh /bindings/java/org/hyperdex/*/*.class /test/java/*.class /man/hyperdex*.1 @@ -80,6 +82,9 @@ /hyperdex.upack /hyperdex-validate-space /hyperdex-wait-until-stable +/hyperdex-set-backup-cluster +/hyperdex-set-primary-cluster +/hyperdex-set-backup-affinity /install-sh /leveldb-dump /libtool @@ -94,3 +99,10 @@ /test/search-stress-test /test/simple-consistency-stress-test /ylwrap +MENG_TODO.txt +ac/* +ad/* +bc/* +bd/* +cc/* +cdae/* diff --git a/Makefile.am b/Makefile.am index bf9c84c6..ca681723 100644 --- a/Makefile.am +++ b/Makefile.am @@ -187,6 +187,10 @@ noinst_HEADERS += daemon/auth.h noinst_HEADERS += daemon/background_thread.h noinst_HEADERS += daemon/communication.h noinst_HEADERS += daemon/coordinator_link_wrapper.h +noinst_HEADERS += daemon/wan_manager.h +noinst_HEADERS += daemon/wan_manager_pending.h +noinst_HEADERS += daemon/wan_manager_transfer_in_state.h +noinst_HEADERS += daemon/wan_manager_transfer_out_state.h noinst_HEADERS += daemon/daemon.h noinst_HEADERS += daemon/datalayer_checkpointer_thread.h noinst_HEADERS += daemon/datalayer_encodings.h @@ -266,6 +270,10 @@ hyperdex_daemon_SOURCES += daemon/auth.cc hyperdex_daemon_SOURCES += daemon/background_thread.cc hyperdex_daemon_SOURCES += daemon/communication.cc hyperdex_daemon_SOURCES += daemon/coordinator_link_wrapper.cc +hyperdex_daemon_SOURCES += daemon/wan_manager.cc +hyperdex_daemon_SOURCES += daemon/wan_manager_pending.cc +hyperdex_daemon_SOURCES += daemon/wan_manager_transfer_in_state.cc +hyperdex_daemon_SOURCES += daemon/wan_manager_transfer_out_state.cc hyperdex_daemon_SOURCES += daemon/daemon.cc hyperdex_daemon_SOURCES += daemon/datalayer.cc hyperdex_daemon_SOURCES += daemon/datalayer_checkpointer_thread.cc @@ -1143,6 +1151,9 @@ hyperdexexec_PROGRAMS += hyperdex-wait-until-stable hyperdexexec_PROGRAMS += hyperdex-backup hyperdexexec_PROGRAMS += 
hyperdex-backup-manager hyperdexexec_PROGRAMS += hyperdex-raw-backup +hyperdexexec_PROGRAMS += hyperdex-set-backup-cluster +hyperdexexec_PROGRAMS += hyperdex-set-primary-cluster +hyperdexexec_PROGRAMS += hyperdex-set-backup-affinity hyperdexexec_SCRIPTS += hyperdex-noc dist_man_MANS += man/hyperdex-add-space.1 dist_man_MANS += man/hyperdex-rm-space.1 @@ -1300,6 +1311,30 @@ hyperdex_set_read_only_LDADD = libhyperdex-admin.la -lpopt man/hyperdex-set-read-only.1: man/hyperdex-set-read-only.1.h2m tools/set-read-only.cc | hyperdex-set-read-only$(EXEEXT) $(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-read-only$(EXEEXT) +# hyperdex-set-primary-cluster +EXTRA_DIST += man/hyperdex-set-primary-cluster.1.md +EXTRA_DIST += man/hyperdex-set-primary-cluster.1.h2m +hyperdex_set_primary_cluster_SOURCES = tools/set-primary-cluster.cc +hyperdex_set_primary_cluster_LDADD = libhyperdex-admin.la -lpopt +man/hyperdex-set-primary-cluster.1: man/hyperdex-set-primary-cluster.1.h2m tools/set-primary-cluster.cc | hyperdex-set-primary-cluster$(EXEEXT) + $(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-primary-cluster$(EXEEXT) + +# hyperdex-set-backup-affinity +EXTRA_DIST += man/hyperdex-set-backup-affinity.1.md +EXTRA_DIST += man/hyperdex-set-backup-affinity.1.h2m +hyperdex_set_backup_affinity_SOURCES = tools/set-backup-affinity.cc +hyperdex_set_backup_affinity_LDADD = libhyperdex-admin.la -lpopt +man/hyperdex-set-backup-affinity.1: man/hyperdex-set-backup-affinity.1.h2m tools/set-backup-affinity.cc | hyperdex-set-backup-affinity$(EXEEXT) + $(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-backup-affinity$(EXEEXT) + +# hyperdex-set-backup-cluster +EXTRA_DIST += man/hyperdex-set-backup-cluster.1.md +EXTRA_DIST += man/hyperdex-set-backup-cluster.1.h2m +hyperdex_set_backup_cluster_SOURCES = 
tools/set-backup-cluster.cc +hyperdex_set_backup_cluster_LDADD = libhyperdex-admin.la -lpopt +man/hyperdex-set-backup-cluster.1: man/hyperdex-set-backup-cluster.1.h2m tools/set-backup-cluster.cc | hyperdex-set-backup-cluster$(EXEEXT) + $(help2man_verbose)help2man $(HELP2MAN_FLAGS) --section 1 --output $@ --include $< ${abs_top_builddir}/hyperdex-set-backup-cluster$(EXEEXT) + # hyperdex-set-read-write EXTRA_DIST += man/hyperdex-set-read-write.1.md EXTRA_DIST += man/hyperdex-set-read-write.1.h2m diff --git a/admin/admin.cc b/admin/admin.cc index 76bb9336..4b772ba5 100644 --- a/admin/admin.cc +++ b/admin/admin.cc @@ -148,6 +148,97 @@ admin :: read_only(int ro, hyperdex_admin_returncode* status) } } +int64_t +admin :: set_primary_cluster(int prim, hyperdex_admin_returncode* status) +{ + if (!maintain_coord_connection(status)) { + return -1; + } + + int64_t id = m_next_admin_id; + ++m_next_admin_id; + e::intrusive_ptr op = new coord_rpc_generic(id, status, "set primary-cluster"); + + char buf[sizeof(uint8_t)]; + buf[0] = prim ? 
1 : 0; + + int64_t cid = m_coord.rpc("set_primary_cluster", buf, sizeof(uint8_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); + + if (cid >= 0) + { + m_coord_ops[cid] = op; + return op->admin_visible_id(); + } + else + { + interpret_rpc_request_failure(op->repl_status, status); + return -1; + } +} + +int64_t +admin :: set_backup_affinity(const char* host, int64_t port, hyperdex_admin_returncode* status) +{ + if (!maintain_coord_connection(status)) { + return -1; + } + + int64_t id = m_next_admin_id; + ++m_next_admin_id; + e::intrusive_ptr op = new coord_rpc_generic(id, status, "set backup-affinity"); + int64_t host_sz = strlen(host); + + std::vector buf(host_sz + sizeof(uint64_t)); + memcpy(&buf[0], host, host_sz); + e::pack64be(port, &buf[0] + host_sz); + + int64_t cid = m_coord.rpc("set_backup_affinity", &buf[0], host_sz + sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); + + if (cid >= 0) + { + m_coord_ops[cid] = op; + return op->admin_visible_id(); + } + else + { + interpret_rpc_request_failure(op->repl_status, status); + return -1; + } +} + +int64_t +admin :: set_backup_cluster(const char* host, int64_t port, hyperdex_admin_returncode* status) +{ + if (!maintain_coord_connection(status)) { + return -1; + } + + int64_t id = m_next_admin_id; + ++m_next_admin_id; + e::intrusive_ptr op = new coord_rpc_generic(id, status, "set backup-cluster"); + int64_t host_sz = strlen(host); + + std::vector buf(host_sz + sizeof(uint64_t)); + memcpy(&buf[0], host, host_sz); + e::pack64be(port, &buf[0] + host_sz); + + int64_t cid = m_coord.rpc("set_backup_cluster", &buf[0], host_sz + sizeof(uint64_t), + &op->repl_status, &op->repl_output, &op->repl_output_sz); + + if (cid >= 0) + { + m_coord_ops[cid] = op; + return op->admin_visible_id(); + } + else + { + interpret_rpc_request_failure(op->repl_status, status); + return -1; + } +} + int64_t admin :: wait_until_stable(enum hyperdex_admin_returncode* status) { diff --git a/admin/admin.h 
b/admin/admin.h index 5ea804d1..0e0b4651 100644 --- a/admin/admin.h +++ b/admin/admin.h @@ -62,6 +62,12 @@ class admin // cluster int64_t read_only(int ro, enum hyperdex_admin_returncode* status); + int64_t set_primary_cluster(int prim, + enum hyperdex_admin_returncode* status); + int64_t set_backup_cluster(const char* host, const int64_t port, + enum hyperdex_admin_returncode* status); + int64_t set_backup_affinity(const char* host, const int64_t port, + enum hyperdex_admin_returncode* status); int64_t wait_until_stable(enum hyperdex_admin_returncode* status); int64_t fault_tolerance(const char* space, uint64_t ft, enum hyperdex_admin_returncode* status); diff --git a/admin/c.cc b/admin/c.cc index 4bc8ab6a..791bb8ae 100644 --- a/admin/c.cc +++ b/admin/c.cc @@ -126,6 +126,7 @@ hyperdex_admin_dump_config(struct hyperdex_admin* _adm, return adm->dump_config(status, config); ); } + HYPERDEX_API int64_t hyperdex_admin_read_only(struct hyperdex_admin* _adm, int ro, @@ -137,6 +138,39 @@ hyperdex_admin_read_only(struct hyperdex_admin* _adm, ); } +HYPERDEX_API int64_t +hyperdex_admin_set_primary_cluster(struct hyperdex_admin* _adm, + int prim, + enum hyperdex_admin_returncode* status) +{ + C_WRAP_EXCEPT( + hyperdex::admin* adm = reinterpret_cast(_adm); + return adm->set_primary_cluster(prim, status); + ); +} + +HYPERDEX_API int64_t +hyperdex_admin_set_backup_affinity(struct hyperdex_admin* _adm, + const char* host, const int64_t port, + enum hyperdex_admin_returncode* status) +{ + C_WRAP_EXCEPT( + hyperdex::admin* adm = reinterpret_cast(_adm); + return adm->set_backup_affinity(host, port, status); + ); +} + +HYPERDEX_API int64_t +hyperdex_admin_set_backup_cluster(struct hyperdex_admin* _adm, + const char* host, const int64_t port, + enum hyperdex_admin_returncode* status) +{ + C_WRAP_EXCEPT( + hyperdex::admin* adm = reinterpret_cast(_adm); + return adm->set_backup_cluster(host, port, status); + ); +} + HYPERDEX_API int64_t hyperdex_admin_wait_until_stable(struct 
hyperdex_admin* _adm, enum hyperdex_admin_returncode* status) diff --git a/common/attribute.cc b/common/attribute.cc index 5d8bd1a7..673a5001 100644 --- a/common/attribute.cc +++ b/common/attribute.cc @@ -25,6 +25,9 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. +// C +#include + // HyperDex #include "common/attribute.h" @@ -55,3 +58,15 @@ attribute :: operator = (const attribute& rhs) type = rhs.type; return *this; } + +bool +attribute :: operator == (const attribute& rhs) const +{ + return ((strcmp(name, rhs.name) == 0) && type == rhs.type); +} + +bool +attribute :: operator != (const attribute& rhs) const +{ + return ((strcmp(name, rhs.name) != 0) || type != rhs.type); +} diff --git a/common/attribute.h b/common/attribute.h index 46cbd9cb..0977f5a0 100644 --- a/common/attribute.h +++ b/common/attribute.h @@ -43,6 +43,8 @@ class attribute public: attribute& operator = (const attribute& rhs); + bool operator == (const attribute& rhs) const; + bool operator != (const attribute& rhs) const; public: const char* name; diff --git a/common/configuration.cc b/common/configuration.cc index 669d5213..4987592f 100644 --- a/common/configuration.cc +++ b/common/configuration.cc @@ -67,6 +67,7 @@ configuration :: configuration() , m_point_leaders_by_virtual() , m_spaces() , m_transfers() + , m_primary_coord() { refill_cache(); } @@ -89,6 +90,7 @@ configuration :: configuration(const configuration& other) , m_point_leaders_by_virtual(other.m_point_leaders_by_virtual) , m_spaces(other.m_spaces) , m_transfers(other.m_transfers) + , m_primary_coord(other.m_primary_coord) { refill_cache(); } @@ -115,6 +117,18 @@ configuration :: read_only() const return m_flags & HYPERDEX_CONFIG_READ_ONLY; } +bool +configuration :: is_backup_cluster() const +{ + return (m_flags & HYPERDEX_BACKUP_CLUSTER) >> 1; +} + +po6::net::location +configuration :: get_primary_location() +{ + return m_primary_coord; +} + void configuration :: 
get_all_addresses(std::vector >* addrs) const { @@ -201,6 +215,20 @@ configuration :: get_server_id(const virtual_server_id& id) const return server_id(); } +virtual_server_id +configuration :: get_virtual(const server_id& si) const +{ + for (std::vector::const_iterator it = m_server_ids_by_virtual.begin(); + it != m_server_ids_by_virtual.end(); + ++it) { + if (it->second == si.get()) { + return virtual_server_id(it->first); + } + } + + return virtual_server_id(); +} + const schema* configuration :: get_schema(const char* sname) const { @@ -395,6 +423,12 @@ configuration :: point_leaders(const server_id& si, std::vector* serv } } +std::vector +configuration :: get_spaces() +{ + return m_spaces; +} + void configuration :: key_regions(const server_id& si, std::vector* regions) const { @@ -1064,6 +1098,7 @@ configuration :: operator = (const configuration& rhs) m_cluster = rhs.m_cluster; m_version = rhs.m_version; m_flags = rhs.m_flags; + m_primary_coord = rhs.m_primary_coord; m_servers = rhs.m_servers; m_region_ids_by_virtual = rhs.m_region_ids_by_virtual; m_server_ids_by_virtual = rhs.m_server_ids_by_virtual; @@ -1183,7 +1218,7 @@ hyperdex :: operator >> (e::unpacker up, configuration& c) uint64_t num_servers; uint64_t num_spaces; uint64_t num_transfers; - up = up >> c.m_cluster >> c.m_version >> c.m_flags + up = up >> c.m_cluster >> c.m_version >> c.m_flags >> c.m_primary_coord >> num_servers >> num_spaces >> num_transfers; c.m_servers.clear(); diff --git a/common/configuration.h b/common/configuration.h index 2528242d..df122a99 100644 --- a/common/configuration.h +++ b/common/configuration.h @@ -62,6 +62,8 @@ class configuration uint64_t cluster() const; uint64_t version() const; bool read_only() const; + bool is_backup_cluster() const; + po6::net::location get_primary_location(); // membership metadata public: @@ -78,6 +80,7 @@ class configuration const schema* get_schema(const region_id& ri) const; const subspace* get_subspace(const region_id& ri) const; 
virtual_server_id get_virtual(const region_id& ri, const server_id& si) const; + virtual_server_id get_virtual(const server_id& si) const; subspace_id subspace_of(const region_id& ri) const; subspace_id subspace_prev(const subspace_id& ss) const; subspace_id subspace_next(const subspace_id& ss) const; @@ -88,6 +91,7 @@ class configuration void key_regions(const server_id& s, std::vector* servers) const; bool is_point_leader(const virtual_server_id& e) const; virtual_server_id point_leader(const char* space, const e::slice& key) const; + std::vector get_spaces(); // point leader for this key in the same space as ri virtual_server_id point_leader(const region_id& ri, const e::slice& key) const; // lhs and rhs are in adjacent subspaces such that lhs sends CHAIN_PUT @@ -170,6 +174,7 @@ class configuration std::vector m_point_leaders_by_virtual; std::vector m_spaces; std::vector m_transfers; + po6::net::location m_primary_coord; }; e::buffer::packer diff --git a/common/configuration_flags.h b/common/configuration_flags.h index 5ae306d8..df8892cb 100644 --- a/common/configuration_flags.h +++ b/common/configuration_flags.h @@ -29,5 +29,6 @@ #define hyperdex_common_configuration_flags_h_ #define HYPERDEX_CONFIG_READ_ONLY 1 +#define HYPERDEX_BACKUP_CLUSTER 2 #endif // hyperdex_common_configuration_flags_h_ diff --git a/common/hyperspace.cc b/common/hyperspace.cc index b8e1c4bb..da7dc602 100644 --- a/common/hyperspace.cc +++ b/common/hyperspace.cc @@ -147,6 +147,27 @@ space :: operator = (const space& rhs) return *this; } +bool +space :: operator== (const space& rhs) +{ + int i; + bool verify_name = (strcmp(name, rhs.name) == 0); + if (verify_name) { + if (sc.attrs_sz != rhs.sc.attrs_sz) { + return false; + } + for (i = 0; i < sc.attrs_sz; i++) { + if (sc.attrs[i] != rhs.sc.attrs[i]) { + return false; + } + } + return true; + } else { + return false; + } + +} + void space :: reestablish_backing() { diff --git a/common/hyperspace.h b/common/hyperspace.h index 
1ff8c56e..ac1b3bb4 100644 --- a/common/hyperspace.h +++ b/common/hyperspace.h @@ -61,6 +61,7 @@ class space public: space& operator = (const space&); + bool operator== (const space&); public: space_id id; diff --git a/common/network_msgtype.cc b/common/network_msgtype.cc index 01259ada..7b25b393 100644 --- a/common/network_msgtype.cc +++ b/common/network_msgtype.cc @@ -62,6 +62,10 @@ hyperdex :: operator << (std::ostream& lhs, const network_msgtype& rhs) STRINGIFY(XFER_HSA); STRINGIFY(XFER_HA); STRINGIFY(XFER_HW); + STRINGIFY(WAN_HS); + STRINGIFY(WAN_XFER); + STRINGIFY(WAN_MORE); + STRINGIFY(WAN_ACK); STRINGIFY(BACKUP); STRINGIFY(PERF_COUNTERS); STRINGIFY(CONFIGMISMATCH); diff --git a/common/network_msgtype.h b/common/network_msgtype.h index dbfbc647..f52a7d1c 100644 --- a/common/network_msgtype.h +++ b/common/network_msgtype.h @@ -79,6 +79,11 @@ enum network_msgtype XFER_HA = 84, // handshake ack XFER_HW = 85, // wiped + WAN_HS = 101, + WAN_XFER = 102, + WAN_MORE = 103, + WAN_ACK = 104, + BACKUP = 126, PERF_COUNTERS = 127, diff --git a/coordinator/coordinator.cc b/coordinator/coordinator.cc index d12dd0f7..ef1d162a 100644 --- a/coordinator/coordinator.cc +++ b/coordinator/coordinator.cc @@ -159,6 +159,7 @@ coordinator :: coordinator() , m_checkpoint_stable_barrier() , m_latest_config() , m_response() + , m_primary_coord() { assert(m_config_ack_through == m_config_ack_barrier.min_version()); assert(m_config_stable_through == m_config_stable_barrier.min_version()); @@ -232,6 +233,83 @@ coordinator :: read_only(replicant_state_machine_context* ctx, bool ro) return generate_response(ctx, COORD_SUCCESS); } + + +void +coordinator :: set_primary_cluster(replicant_state_machine_context* ctx, bool prim) +{ + FILE* log = replicant_state_machine_log_stream(ctx); + uint64_t old_flags = m_flags; + + if (!prim) { + fprintf(log, "error in set_primary_cluster call\n"); + } + + if (!(m_flags & HYPERDEX_BACKUP_CLUSTER)) { + fprintf(log, "cluster already primary cluster\n"); + } 
else { + fprintf(log, "putting cluster into primary mode\n"); + uint64_t mask = HYPERDEX_BACKUP_CLUSTER; + mask = ~mask; + m_flags &= mask; + m_primary_coord = po6::net::location(); // clear coordinator + fprintf(log, "trying to put cluster into primary mode\n"); + // XXX robustness + } + + if (old_flags != m_flags) + { + generate_next_configuration(ctx); + } + + return generate_response(ctx, COORD_SUCCESS); +} + +void +coordinator :: set_backup_affinity(replicant_state_machine_context* ctx, + const char* host, const int64_t port) +{ + FILE* log = replicant_state_machine_log_stream(ctx); + // Re-points a backup cluster at a new primary; m_flags is never modified here. + + if (m_flags & HYPERDEX_BACKUP_CLUSTER) { + fprintf(log, "changing backup cluster affinity\n"); + m_primary_coord = po6::net::location(host, port); + assert(m_primary_coord != po6::net::location()); + fprintf(log, "trying to change backup cluster affinity to host %s and port %lld\n", host, static_cast<long long>(port)); + generate_next_configuration(ctx); + return generate_response(ctx, COORD_SUCCESS); + } else { + fprintf(log, "cluster not in backup mode\n"); + return generate_response(ctx, COORD_NO_CAN_DO); // always answer the RPC, even when refusing it + } +} + +void +coordinator :: set_backup_cluster(replicant_state_machine_context* ctx, + const char* host, const int64_t port) +{ + FILE* log = replicant_state_machine_log_stream(ctx); + uint64_t old_flags = m_flags; + + if (m_flags & HYPERDEX_BACKUP_CLUSTER) { + fprintf(log, "cluster already backing up another coordinator\n"); + } else { + fprintf(log, "putting cluster into backup mode\n"); + m_flags |= HYPERDEX_BACKUP_CLUSTER; + m_primary_coord = po6::net::location(host, port); + assert(m_primary_coord != po6::net::location()); + fprintf(log, "trying to put up backup cluster for host %s and port %lld\n", host, static_cast<long long>(port)); + // XXX robustness: the assert above aborts the coordinator if the location is default-valued + } + + if (old_flags != m_flags) + { + generate_next_configuration(ctx); + } + + return generate_response(ctx, COORD_SUCCESS); +} + void coordinator :: fault_tolerance(replicant_state_machine_context* ctx, const char* space, uint64_t ft) @@ -1095,7 +1173,7 @@ 
coordinator :: recreate(replicant_state_machine_context* ctx, >> c->m_config_ack_through >> c->m_config_ack_barrier >> c->m_config_stable_through >> c->m_config_stable_barrier >> c->m_checkpoint >> c->m_checkpoint_stable_through - >> c->m_checkpoint_gc_through >> c->m_checkpoint_stable_barrier; + >> c->m_checkpoint_gc_through >> c->m_checkpoint_stable_barrier >> c->m_primary_coord; while (!up.error() && up.remain()) { @@ -1187,7 +1265,8 @@ coordinator :: snapshot(replicant_state_machine_context* /*ctx*/, + sizeof(m_checkpoint) + sizeof(m_checkpoint_stable_through) + sizeof(m_checkpoint_gc_through) - + pack_size(m_checkpoint_stable_barrier); + + pack_size(m_checkpoint_stable_barrier) + + pack_size(m_primary_coord); for (space_map_t::iterator it = m_spaces.begin(); it != m_spaces.end(); ++it) @@ -1204,7 +1283,7 @@ coordinator :: snapshot(replicant_state_machine_context* /*ctx*/, << m_config_ack_through << m_config_ack_barrier << m_config_stable_through << m_config_stable_barrier << m_checkpoint << m_checkpoint_stable_through - << m_checkpoint_gc_through << m_checkpoint_stable_barrier; + << m_checkpoint_gc_through << m_checkpoint_stable_barrier << m_primary_coord; for (space_map_t::iterator it = m_spaces.begin(); it != m_spaces.end(); ++it) @@ -1925,7 +2004,7 @@ void coordinator :: generate_cached_configuration(replicant_state_machine_context*) { m_latest_config.reset(); - size_t sz = 7 * sizeof(uint64_t); + size_t sz = 7 * sizeof(uint64_t) + pack_size(m_primary_coord); for (size_t i = 0; i < m_servers.size(); ++i) { @@ -1948,7 +2027,7 @@ coordinator :: generate_cached_configuration(replicant_state_machine_context*) std::auto_ptr new_config(e::buffer::create(sz)); e::buffer::packer pa = new_config->pack_at(0); - pa = pa << m_cluster << m_version << m_flags + pa = pa << m_cluster << m_version << m_flags << m_primary_coord << uint64_t(m_servers.size()) << uint64_t(m_spaces.size()) << uint64_t(transfers_subset.size()); diff --git a/coordinator/coordinator.h 
b/coordinator/coordinator.h index 763357b8..84cf3097 100644 --- a/coordinator/coordinator.h +++ b/coordinator/coordinator.h @@ -67,6 +67,9 @@ class coordinator // cluster management public: void read_only(replicant_state_machine_context* ctx, bool ro); + void set_backup_cluster(replicant_state_machine_context* ctx, const char* host, const int64_t port); + void set_backup_affinity(replicant_state_machine_context* ctx, const char* host, const int64_t port); + void set_primary_cluster(replicant_state_machine_context* ctx, bool prim); void fault_tolerance(replicant_state_machine_context* ctx, const char* space, uint64_t consistency); @@ -227,6 +230,8 @@ class coordinator // cached config std::auto_ptr m_latest_config; std::auto_ptr m_response; + // backup cluster + po6::net::location m_primary_coord; private: coordinator(const coordinator&); diff --git a/coordinator/symtable.c b/coordinator/symtable.c index 4433c04b..a1c556ad 100644 --- a/coordinator/symtable.c +++ b/coordinator/symtable.c @@ -62,6 +62,9 @@ struct replicant_state_machine HYPERDEX_API rsm = { {"checkpoint_stable", hyperdex_coordinator_checkpoint_stable}, {"alarm", hyperdex_coordinator_alarm}, {"read_only", hyperdex_coordinator_read_only}, + {"set_backup_cluster", hyperdex_coordinator_set_backup_cluster}, + {"set_backup_affinity", hyperdex_coordinator_set_backup_affinity}, + {"set_primary_cluster", hyperdex_coordinator_set_primary_cluster}, {"fault_tolerance", hyperdex_coordinator_fault_tolerance}, {"checkpoints", hyperdex_coordinator_checkpoints}, {"debug_dump", hyperdex_coordinator_debug_dump}, diff --git a/coordinator/transitions.cc b/coordinator/transitions.cc index 9c287c69..abe9dacd 100644 --- a/coordinator/transitions.cc +++ b/coordinator/transitions.cc @@ -178,6 +178,52 @@ hyperdex_coordinator_read_only(struct replicant_state_machine_context* ctx, c->read_only(ctx, set != 0); } +void +hyperdex_coordinator_set_backup_affinity(struct replicant_state_machine_context* ctx, + void* obj, const char* 
data, size_t data_sz) +{ + PROTECT_UNINITIALIZED; + FILE* log = replicant_state_machine_log_stream(ctx); + coordinator* c = static_cast<coordinator*>(obj); + int64_t port; + // Clamp the offset: a payload shorter than the port must fail the unpack (see CHECK_UNPACK), not read before data. + const size_t port_off = data_sz < sizeof(int64_t) ? 0 : data_sz - sizeof(int64_t); + e::unpacker up(data + port_off, data_sz - port_off); + up = up >> port; + CHECK_UNPACK(set_backup_affinity); + c->set_backup_affinity(ctx, data, port); +} + +void +hyperdex_coordinator_set_backup_cluster(struct replicant_state_machine_context* ctx, + void* obj, const char* data, size_t data_sz) +{ + PROTECT_UNINITIALIZED; + FILE* log = replicant_state_machine_log_stream(ctx); + coordinator* c = static_cast<coordinator*>(obj); + int64_t port; + // Clamp the offset: a payload shorter than the port must fail the unpack (see CHECK_UNPACK), not read before data. + const size_t port_off = data_sz < sizeof(int64_t) ? 0 : data_sz - sizeof(int64_t); + e::unpacker up(data + port_off, data_sz - port_off); + up = up >> port; + CHECK_UNPACK(set_backup_cluster); + c->set_backup_cluster(ctx, data, port); +} + +void +hyperdex_coordinator_set_primary_cluster(struct replicant_state_machine_context* ctx, + void* obj, const char* data, size_t data_sz) +{ + PROTECT_UNINITIALIZED; + FILE* log = replicant_state_machine_log_stream(ctx); + coordinator* c = static_cast<coordinator*>(obj); + uint8_t set; + e::unpacker up(data, data_sz); + up = up >> set; + CHECK_UNPACK(set_primary_cluster); + c->set_primary_cluster(ctx, set != 0); +} + void hyperdex_coordinator_fault_tolerance(struct replicant_state_machine_context* ctx, void* obj, const char* data, size_t data_sz) diff --git a/coordinator/transitions.h b/coordinator/transitions.h index ae9b22c2..6077fef9 100644 --- a/coordinator/transitions.h +++ b/coordinator/transitions.h @@ -58,6 +58,9 @@ hyperdex_coordinator_snapshot(struct replicant_state_machine_context* ctx, TRANSITION(init); TRANSITION(read_only); +TRANSITION(set_backup_cluster); +TRANSITION(set_backup_affinity); +TRANSITION(set_primary_cluster); TRANSITION(fault_tolerance); TRANSITION(config_get); diff --git a/daemon/communication.cc b/daemon/communication.cc index 32d2e378..1a28d257 100644 --- a/daemon/communication.cc +++ 
b/daemon/communication.cc @@ -182,6 +182,59 @@ communication :: send_client(const virtual_server_id& from, return true; } +bool +communication :: send_wan(const virtual_server_id& from, + const server_id& to, + network_msgtype msg_type, + std::auto_ptr msg) +{ + assert(msg->size() >= HYPERDEX_HEADER_SIZE_SV); + + if (m_daemon->m_us != m_daemon->m_config.get_server_id(from) && + from != virtual_server_id(UINT64_MAX)) + { + return false; + } + + uint8_t mt = static_cast(msg_type); + uint8_t flags = 0; + virtual_server_id vto = virtual_server_id(UINT64_MAX); + msg->pack_at(BUSYBEE_HEADER_SIZE) << mt << flags << m_daemon->m_config.version() << vto.get(); + +#ifdef HD_LOG_ALL_MESSAGES + LOG(INFO) << "SEND " << from << "->" << to << " " << msg_type << " " << msg->hex(); +#endif + + if (to == m_daemon->m_us) + { + m_busybee->deliver(to.get(), msg); + } + else + { + busybee_returncode rc = m_busybee->send(to.get(), msg); + + switch (rc) + { + case BUSYBEE_SUCCESS: + break; + case BUSYBEE_DISRUPTED: + handle_disruption(to.get()); + return false; + case BUSYBEE_SHUTDOWN: + case BUSYBEE_POLLFAILED: + case BUSYBEE_ADDFDFAIL: + case BUSYBEE_TIMEOUT: + case BUSYBEE_EXTERNAL: + case BUSYBEE_INTERRUPTED: + default: + LOG(ERROR) << "BusyBee unexpectedly returned " << rc; + return false; + } + } + + return true; +} + bool communication :: send(const virtual_server_id& from, const server_id& to, diff --git a/daemon/communication.h b/daemon/communication.h index 77a1976b..0e2c834a 100644 --- a/daemon/communication.h +++ b/daemon/communication.h @@ -91,6 +91,10 @@ class communication const server_id& to, network_msgtype msg_type, std::auto_ptr msg); + bool send_wan(const virtual_server_id& from, + const server_id& to, + network_msgtype msg_type, + std::auto_ptr msg); bool send(const virtual_server_id& from, const server_id& to, network_msgtype msg_type, diff --git a/daemon/coordinator_link_wrapper.cc b/daemon/coordinator_link_wrapper.cc index 76976fe7..d3e7be2b 100644 --- 
a/daemon/coordinator_link_wrapper.cc +++ b/daemon/coordinator_link_wrapper.cc @@ -470,6 +470,25 @@ coordinator_link_wrapper :: request_shutdown() make_rpc("server_shutdown", buf, sizeof(uint64_t), rpc); } +void +coordinator_link_wrapper :: add_space(hyperdex::space space) +{ + std::auto_ptr msg(e::buffer::create(pack_size(space))); + msg->pack_at(0) << space; + e::intrusive_ptr rpc = new coord_rpc(); + rpc->msg << "add space"; + make_rpc("space_add", reinterpret_cast(msg->data()), msg->size(), rpc); +} + +void coordinator_link_wrapper :: rm_space(hyperdex::space space) +{ + const char* sp = space.name; + uint64_t sz = strlen(sp); + e::intrusive_ptr rpc = new coord_rpc(); + rpc->msg << "rm space"; + make_rpc("space_rm", sp, sz, rpc); +} + uint64_t coordinator_link_wrapper :: checkpoint() { diff --git a/daemon/coordinator_link_wrapper.h b/daemon/coordinator_link_wrapper.h index bda55997..593f4073 100644 --- a/daemon/coordinator_link_wrapper.h +++ b/daemon/coordinator_link_wrapper.h @@ -70,6 +70,8 @@ class coordinator_link_wrapper void copy_config(configuration* config); uint64_t config_version(); void request_shutdown(); + void add_space(hyperdex::space space); + void rm_space(hyperdex::space space); uint64_t checkpoint(); uint64_t checkpoint_stable(); uint64_t checkpoint_gc(); diff --git a/daemon/daemon.cc b/daemon/daemon.cc index 0a76c726..5e4c1ff9 100644 --- a/daemon/daemon.cc +++ b/daemon/daemon.cc @@ -61,6 +61,7 @@ using po6::threads::make_thread_wrapper; using hyperdex::daemon; int s_interrupts = 0; +int s_kick = 1; bool s_debug = false; static void @@ -83,6 +84,12 @@ exit_after_timeout(int /*signum*/) RAW_LOG(ERROR, "took too long to shutdown; just exiting"); } +static void +kick_after_timeout(int /*signum*/) +{ + __sync_fetch_and_add(&s_kick, 1); +} + static void handle_debug(int /*signum*/) { @@ -101,6 +108,7 @@ daemon :: daemon() , m_gc() , m_gc_ts() , m_coord(this) + , m_wan((new wan_manager(this))) , m_data_dir() , m_data(this) , m_comm(this) @@ -111,6 
+119,7 @@ daemon :: daemon() , m_protect_pause() , m_can_pause(&m_protect_pause) , m_paused(false) + , m_wan_reconfigured(false) , m_perf_req_get() , m_perf_req_get_partial() , m_perf_req_atomic() @@ -334,6 +343,8 @@ daemon :: run(bool daemonize, m_bind_to = bind_to; m_coord.set_coordinator_address(coordinator.address.c_str(), coordinator.port); + m_wan->set_coordinator_address(coordinator.address.c_str(), coordinator.port); + m_wan->set_is_backup(false); if (!saved) { @@ -380,6 +391,7 @@ daemon :: run(bool daemonize, m_repl.setup(); m_stm.setup(); m_sm.setup(); + m_wan->setup(coordinator.address.c_str(), coordinator.port + 71); for (size_t i = 0; i < threads; ++i) { @@ -409,6 +421,19 @@ daemon :: run(bool daemonize, LOG(INFO) << "end debug dump"; } + if (m_config.is_backup_cluster()) { + if (s_kick > 0 && !requested_exit) { + if (!install_signal_handler(SIGALRM, kick_after_timeout)) { + __sync_fetch_and_add(&s_interrupts, 2); + break; + } + m_wan->kick(); + // ualarm(3000000, 0); + alarm(3); + s_kick = 0; + } + } + if (s_interrupts > 0 && !requested_exit) { if (!install_signal_handler(SIGALRM, exit_after_timeout)) @@ -482,6 +507,9 @@ daemon :: run(bool daemonize, LOG(INFO) << "moving to configuration version=" << new_config.version() << "; pausing all activity while we reconfigure"; this->pause(); + LOG(INFO) << "coordinator tells us that the host we are supposed to backup is " << new_config.get_primary_location(); + LOG(INFO) << "coordinator tells us that backup_cluster = " << new_config.is_backup_cluster(); + reconfigure_wan(old_config, new_config); m_comm.reconfigure(old_config, new_config, m_us); m_data.reconfigure(old_config, new_config, m_us); m_repl.reconfigure(old_config, new_config, m_us); @@ -523,6 +551,7 @@ daemon :: run(bool daemonize, m_threads[i]->join(); } + m_wan->teardown(); m_sm.teardown(); m_stm.teardown(); m_repl.teardown(); @@ -549,6 +578,7 @@ daemon :: pause() m_repl.pause(); m_data.pause(); m_comm.pause(); + m_wan->pause(); } void @@ -560,6 
+590,11 @@ daemon :: unpause() m_repl.unpause(); m_stm.unpause(); m_sm.unpause(); + if (!m_wan_reconfigured) { + m_wan->unpause(); + } else { + m_wan_reconfigured = false; + } assert(m_paused); m_paused = false; m_can_pause.signal(); @@ -703,6 +738,22 @@ daemon :: loop(size_t thread) process_perf_counters(from, vfrom, vto, msg, up); m_perf_perf_counters.tap(); break; + case WAN_HS: + // XXX process_wan_handshake + m_wan->handle_handshake(from, vfrom, vto, msg, up); + break; + case WAN_XFER: + // XXX process wan xfer + m_wan->recv_data(from, vfrom, vto, msg, up); + break; + case WAN_MORE: + // XXX process_wan_more + m_wan->send_more_data(from, vfrom, vto, msg, up); + break; + case WAN_ACK: + m_wan->handle_ack(from, vfrom, vto, msg, up); + // XXX process_wan_ack + break; case RESP_GET: case RESP_GET_PARTIAL: case RESP_ATOMIC: @@ -1670,3 +1721,58 @@ daemon :: collect_stats_io(std::ostringstream* ret) *ret << " io.time_in_queue=" << time_in_queue; } } + +void +daemon :: reconfigure_wan(configuration old_config, configuration new_config) +{ + bool new_flags = new_config.is_backup_cluster(); + bool old_flags = old_config.is_backup_cluster(); + po6::net::location new_loc = new_config.get_primary_location(); + po6::net::location old_loc = old_config.get_primary_location(); + std::stringstream ss; + + if (new_flags != old_flags) { + wan_manager* tmp = m_wan; + if (new_flags) { // moving to backup + tmp->rm_all_spaces(); + tmp->set_teardown(); + tmp->unpause(); + delete(tmp); + m_wan = new wan_manager(this); + + LOG(INFO) << "moving cluster to backup " << new_loc; + ss << new_loc.address; + LOG(INFO) << "setting coordinator to host = " << ss.str() << ", port = " << new_loc.port; + m_wan->set_coordinator_address(ss.str().c_str(), new_loc.port); + } else { // moving to primary + tmp->set_teardown(); + tmp->unpause(); + delete(tmp); + + m_wan = new wan_manager(this); + LOG(INFO) << "moving cluster to primary"; + ss << m_bind_to.address; + LOG(INFO) << "setting coordinator to 
host = " << ss.str() << ", port = " << m_bind_to.port; + m_wan->set_coordinator_address(ss.str().c_str(), m_bind_to.port); + } + m_wan->set_is_backup(new_flags); + m_wan->setup(ss.str().c_str(), m_bind_to.port + 71); + m_wan_reconfigured = true; + } else if (new_loc != old_loc && new_flags == old_flags + && new_loc != po6::net::location() + && new_flags) { // in backup, changing coordinators + LOG(INFO) << "changing backup affiliation"; + wan_manager* tmp = m_wan; + tmp->rm_all_spaces(); + tmp->set_teardown(); + tmp->unpause(); + delete(tmp); + m_wan = new wan_manager(this); + ss << new_loc.address; + LOG(INFO) << "setting coordinator to host = " << ss.str() << ", port = " << new_loc.port; + m_wan->set_coordinator_address(ss.str().c_str(), new_loc.port); + m_wan->set_is_backup(new_flags); + m_wan->setup(ss.str().c_str(), new_loc.port + 711); + m_wan_reconfigured = true; + } +} diff --git a/daemon/daemon.h b/daemon/daemon.h index 823cdb20..1e722a6b 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -46,6 +46,7 @@ #include "common/ids.h" #include "daemon/communication.h" #include "daemon/coordinator_link_wrapper.h" +#include "daemon/wan_manager.h" #include "daemon/datalayer.h" #include "daemon/performance_counter.h" #include "daemon/replication_manager.h" @@ -101,6 +102,7 @@ class daemon void process_xfer_ack(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr msg, e::unpacker up); void process_backup(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr msg, e::unpacker up); void process_perf_counters(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr msg, e::unpacker up); + void reconfigure_wan(configuration old_config, configuration new_config); private: void collect_stats(); @@ -113,6 +115,7 @@ class daemon friend class background_thread; friend class communication; friend class coordinator_link_wrapper; + friend class wan_manager; friend class datalayer; friend class key_state; 
friend class replication_manager; @@ -126,6 +129,7 @@ class daemon e::garbage_collector m_gc; e::garbage_collector::thread_state m_gc_ts; coordinator_link_wrapper m_coord; + wan_manager* m_wan; std::string m_data_dir; datalayer m_data; communication m_comm; @@ -137,6 +141,7 @@ class daemon po6::threads::mutex m_protect_pause; po6::threads::cond m_can_pause; bool m_paused; + bool m_wan_reconfigured; // counters performance_counter m_perf_req_get; performance_counter m_perf_req_get_partial; diff --git a/daemon/main.cc b/daemon/main.cc index c7f7d63c..b32e6943 100644 --- a/daemon/main.cc +++ b/daemon/main.cc @@ -96,6 +96,7 @@ main(int argc, const char* argv[]) .description("immediately flush all log output") .set_true(&log_immediate).hidden(); + if (!ap.parse(argc, argv)) { return EXIT_FAILURE; @@ -201,7 +202,8 @@ main(int argc, const char* argv[]) po6::pathname(log ? log : data), po6::pathname(pidfile), has_pidfile, listen, bind_to, - coordinator, po6::net::hostname(coordinator_host, coordinator_port), + coordinator, + po6::net::hostname(coordinator_host, coordinator_port), threads); } catch (std::exception& e) diff --git a/daemon/replication_manager.cc b/daemon/replication_manager.cc index 38f8b8db..1d8d8982 100644 --- a/daemon/replication_manager.cc +++ b/daemon/replication_manager.cc @@ -232,6 +232,52 @@ replication_manager :: debug_dump() m_retransmitter->trigger(); } +void +replication_manager :: wan_atomic(const server_id& from, + const virtual_server_id& to, + uint64_t nonce, + uint64_t version, + std::auto_ptr kc, + std::auto_ptr backing) +{ + const region_id ri(m_daemon->m_config.get_region_id(to)); + const schema& sc(*m_daemon->m_config.get_schema(ri)); + + if (m_daemon->m_config.read_only()) + { + respond_to_client(to, from, nonce, NET_READONLY); + return; + } + + if (!kc->validate(sc)) + { + LOG(ERROR) << "dropping nonce=" << nonce << " from client=" << from + << " because the key, checks, or funcs don't validate"; + respond_to_client(to, from, nonce, 
NET_BADDIMSPEC); + return; + } + + if (m_daemon->m_config.point_leader(ri, kc->key) != to) + { + LOG(ERROR) << "dropping nonce=" << nonce << " from client=" << from + << " because it doesn't map to " << ri; + respond_to_client(to, from, nonce, NET_NOTUS); + return; + } + + key_map_t::state_reference ksr; + key_state* ks = get_or_create_key_state(ri, kc->key, &ksr); + + if (version % datalayer::REGION_PERIODIC == 0) + { + m_daemon->m_data.bump_version(ri, version); + } + + // we hava a valid keychange. put it into the workloop + ks->enqueue_key_change(from, nonce, version, kc, backing); + ks->work_state_machine(this, to, sc); +} + void replication_manager :: client_atomic(const server_id& from, const virtual_server_id& to, diff --git a/daemon/replication_manager.h b/daemon/replication_manager.h index a9d40f06..f081f29f 100644 --- a/daemon/replication_manager.h +++ b/daemon/replication_manager.h @@ -91,6 +91,12 @@ class replication_manager uint64_t nonce, std::auto_ptr kc, std::auto_ptr backing); + void wan_atomic(const server_id& from, + const virtual_server_id& to, + uint64_t nonce, + uint64_t version, + std::auto_ptr kc, + std::auto_ptr backing); // These are called in response to messages from other hosts. void chain_op(const virtual_server_id& from, const virtual_server_id& to, diff --git a/daemon/wan_manager.cc b/daemon/wan_manager.cc new file mode 100644 index 00000000..584c4d0d --- /dev/null +++ b/daemon/wan_manager.cc @@ -0,0 +1,1320 @@ +// Copyright (c) 2015, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. 
+// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. 
+ +#define __STDC_LIMIT_MACROS + +// C +#include +#include + +// POSIX +#include + +// STL +#include + +// Google Log +#include +#include + +// e +#include + +// HyperDex +#include "client/constants.h" +#include "common/coordinator_returncode.h" +#include "common/serialization.h" +#include +#include "daemon/daemon.h" +#include "daemon/datalayer_iterator.h" +#include "daemon/wan_manager.h" +#include "daemon/wan_manager_pending.h" +#include "daemon/wan_manager_transfer_in_state.h" +#include "daemon/wan_manager_transfer_out_state.h" + +using po6::threads::make_thread_wrapper; +using hyperdex::configuration; +using hyperdex::wan_manager; +using hyperdex::transfer_id; +using hyperdex::reconfigure_returncode; + +class wan_manager::background_thread : public ::hyperdex::background_thread +{ + public: + background_thread(wan_manager* wm); + ~background_thread() throw (); + + public: + virtual const char* thread_name(); + virtual bool have_work(); + virtual void copy_work(); + virtual void do_work(); + + public: + void kick(); + + private: + background_thread(const background_thread&); + background_thread& operator = (const background_thread&); + + private: + wan_manager* m_wm; + bool m_need_kickstart; +}; + +wan_manager :: wan_manager(daemon* d) + : m_daemon(d) + , m_poller(make_thread_wrapper(&wan_manager::background_maintenance, this)) + , m_coord() + , m_mtx() + , m_cond(&m_mtx) + , m_is_backup(false) + , m_poller_started(false) + , m_teardown(false) + , m_manual_teardown(false) + , m_deferred() + , m_locked(false) + , m_kill(false) + , m_to_kill() + , m_waiting(0) + , m_sleep(1000ULL * 1000ULL) + , m_online_id(-1) + , m_shutdown_requested(false) + , m_transfer_vids() + , m_xid(0) + , m_transfers_in() + , m_transfers_out() + , m_background_thread(new background_thread(this)) + , m_config() + , m_busybee_mapper(&m_config) + , m_busybee() + , m_link_thread(make_thread_wrapper(&wan_manager::run, this)) + , m_threads() + , m_paused(false) + , m_protect_pause() + , 
m_can_pause(&m_protect_pause) + , m_has_config(false) + , m_busybee_running(false) +{ +} + +wan_manager :: ~wan_manager() throw () +{ + if (m_manual_teardown) { + m_teardown = true; + m_busybee->shutdown(); + m_busybee_running = false; + for (size_t i = 0; i < m_threads.size(); ++i) { + m_threads[i]->join(); + } + m_background_thread->shutdown(); + m_transfers_in.clear(); + m_transfers_out.clear(); + m_link_thread.join(); + if (m_poller_started) + { + m_poller.join(); + } + } +} + +bool +wan_manager :: setup(const char* host, int64_t port) +{ + // XXX please fix this, this is really hacky + po6::net::location bind_to(host, port); + m_busybee.reset(new busybee_mta(&m_daemon->m_gc, &m_busybee_mapper, bind_to, + m_daemon->m_us.get(), 4)); + m_busybee_running = true; + m_link_thread.start(); + m_background_thread->start(); + for (size_t i = 0; i < 4; ++i) { + using namespace po6::threads; + e::compat::shared_ptr t(new thread(make_thread_wrapper(&wan_manager::loop, this))); + m_threads.push_back(t); + t->start(); + } + return true; +} + +void +wan_manager :: teardown() +{ + enter_critical_section(); + m_teardown = true; + exit_critical_section(); + m_busybee->shutdown(); + m_busybee_running = false; + for (size_t i = 0; i < m_threads.size(); ++i) { + m_threads[i]->join(); + } + m_background_thread->shutdown(); + m_transfers_in.clear(); + m_transfers_out.clear(); + m_link_thread.join(); + if (m_poller_started) + { + m_poller.join(); + } +} + +void +wan_manager :: wake_one() +{ + m_busybee->wake_one(); +} + +void +wan_manager :: set_coordinator_address(const char* host, uint16_t port) +{ + assert(!m_coord.get()); + m_coord.reset(new coordinator_link(host, port)); +} + +void +wan_manager :: set_is_backup(bool isbackup) +{ + m_is_backup = isbackup; +} + +void +wan_manager :: pause() +{ + if (m_teardown) { + LOG(INFO) << "should teardown, not pausing..."; + return; + } + + po6::threads::mutex::hold hold(&m_protect_pause); + + while (m_paused) { + if (m_teardown) { + 
LOG(INFO) << "should teardown, not pausing..."; + return; + } + m_can_pause.wait(); + } + + if (!m_teardown) { + m_paused = true; + m_background_thread->initiate_pause(); + m_background_thread->wait_until_paused(); + if (m_busybee_running) { + m_busybee->pause(); + } + } +} + +void +wan_manager :: unpause() +{ + po6::threads::mutex::hold hold(&m_protect_pause); + + m_background_thread->unpause(); + m_busybee->unpause(); + + assert(m_paused); + m_paused = false; + m_can_pause.signal(); +} + +void +wan_manager :: set_teardown() +{ + enter_critical_section(); + m_teardown = true; + m_manual_teardown = true; + exit_critical_section(); +} + +void +wan_manager :: kick() +{ + m_background_thread->kick(); +} + +void +wan_manager :: reconfigure(configuration *config) +{ + // XXX busybee deliver early messages + m_config = *config; + std::vector their_spaces = m_config.get_spaces(); + std::vector our_spaces = m_daemon->m_config.get_spaces(); + std::vector::iterator their; + std::vector::iterator our; + + for (their = their_spaces.begin(); their != their_spaces.end(); ++their) { + for (our = our_spaces.begin(); our != our_spaces.end(); ++our) { + if (*our == *their) { + continue; + } + } + m_daemon->m_coord.add_space(*their); + } +} + +void +wan_manager :: rm_all_spaces() +{ + std::vector spaces = m_daemon->m_config.get_spaces(); + std::vector::iterator it; + + for (it = spaces.begin(); it != spaces.end(); ++it) { + m_daemon->m_coord.rm_space(*it); + } +} + +wan_manager::transfer_in_state* +wan_manager :: get_tis(const transfer_id& xid) +{ + for (size_t i = 0; i < m_transfers_in.size(); ++i) + { + if (m_transfers_in[i]->xfer.id == xid) + { + return m_transfers_in[i].get(); + } + } + + return NULL; +} + +wan_manager::transfer_out_state* +wan_manager :: get_tos(const transfer_id& xid) +{ + for (size_t i = 0; i < m_transfers_out.size(); ++i) + { + if (m_transfers_out[i]->xfer.id == xid) + { + return m_transfers_out[i].get(); + } + } + + return NULL; +} + +void +wan_manager :: 
send_handshake_syn(const transfer& xfer) +{ + uint64_t timestamp(0); + size_t sz = HYPERDEX_HEADER_SIZE_SV + + sizeof(uint64_t) + + sizeof(uint64_t); + std::auto_ptr msg(e::buffer::create(sz)); + msg->pack_at(HYPERDEX_HEADER_SIZE_SV) << xfer.id.get() << timestamp; + // LOG(INFO) << "xfer id = " << xfer.id.get() << " timestamp = " << timestamp; + send(xfer.vdst, WAN_HS, msg); +} + +void +wan_manager :: send_ask_for_more(const transfer& xfer) +{ + size_t sz = HYPERDEX_HEADER_SIZE_SV + + sizeof(uint64_t); + std::auto_ptr msg(e::buffer::create(sz)); + msg->pack_at(HYPERDEX_HEADER_SIZE_SV) << xfer.id.get(); + // LOG(INFO) << "xfer id = " << xfer.id.get(); + send(xfer.vdst, WAN_MORE, msg); +} + +void +wan_manager :: send_object(const transfer& xfer, + pending* op) +{ + uint8_t flags = (op->has_value ? 1 : 0); + // LOG(INFO) << "op has value = " << op->has_value; + // LOG(INFO) << "flags= " << flags << " xfer id= " << xfer.id.get() << " seq_no= " << op->seq_no << " version= " << op->version; + size_t sz = HYPERDEX_HEADER_SIZE_SV + + sizeof(uint8_t) + + sizeof(uint64_t) + + sizeof(uint64_t) + + sizeof(uint64_t) + + sizeof(uint64_t) + + sizeof(uint32_t) + op->key.size() + + pack_size(op->value); + std::auto_ptr msg(e::buffer::create(sz)); + msg->pack_at(HYPERDEX_HEADER_SIZE_SV) << flags << xfer.id.get() << xfer.rid.get() << op->seq_no + << op->version << op->key << op->value; + m_daemon->m_comm.send_wan(xfer.vsrc, xfer.dst, WAN_XFER, msg); +} + +void +wan_manager :: send_ack(const transfer& xfer, uint64_t seq_no) +{ + uint8_t flags = 0; + size_t sz = HYPERDEX_HEADER_SIZE_SV + + sizeof(uint8_t) + + sizeof(uint64_t) + + sizeof(uint64_t); + std::auto_ptr msg(e::buffer::create(sz)); + msg->pack_at(HYPERDEX_HEADER_SIZE_SV) << flags << xfer.id.get() << seq_no; + // send_exact(xfer.vdst, xfer.vsrc, XFER_ACK, msg); +} + +wan_manager :: background_thread :: background_thread(wan_manager* wm) + : hyperdex::background_thread(wm->m_daemon) + , m_wm(wm) + , m_need_kickstart(false) +{ +} 
+ +wan_manager :: background_thread :: ~background_thread() throw () +{ +} + +const char* +wan_manager :: background_thread :: thread_name() +{ + return "wan manager"; +} + +bool +wan_manager :: background_thread :: have_work() +{ + return m_need_kickstart; +} + +void +wan_manager :: background_thread :: copy_work() +{ + m_need_kickstart = false; +} + +void +wan_manager :: background_thread :: do_work() +{ + if (m_wm->m_is_backup) { + if (m_wm->m_has_config && m_wm->m_daemon->m_config.version() > 0) { + std::vector overlap = m_wm->config_space_overlap(m_wm->m_config, + m_wm->m_daemon->m_config); + m_wm->setup_transfer_state(overlap); + + for (size_t i = 0; i < m_wm->m_transfers_in.size(); ++i) { + po6::threads::mutex::hold hold2(&m_wm->m_transfers_in[i]->mtx); + transfer_in_state *tis = m_wm->m_transfers_in[i].get(); + virtual_server_id v_us = m_wm->m_daemon->m_config.get_virtual(tis->xfer.rid, + m_wm->m_daemon->m_us); + if (m_wm->m_daemon->m_config.is_point_leader(v_us)) { + m_wm->give_me_more_state(tis); + } + } + m_wm->wake_one(); + } + } +} + +void +wan_manager :: background_thread :: kick() +{ + this->lock(); + m_need_kickstart = true; + this->wakeup(); + this->unlock(); +} + +bool +wan_manager :: maintain_link() +{ + if (m_teardown) { + return false; + } + + enter_critical_section_killable(); + bool exit_status = false; + + if (!m_poller_started) + { + m_poller.start(); + m_poller_started = true; + } + + while (!m_teardown) { + int64_t id = -1; + replicant_returncode status = REPLICANT_GARBAGE; + + if (!m_deferred.empty()) { + id = m_deferred.front().first; + status = m_deferred.front().second; + m_deferred.pop(); + } else { + id = m_coord->loop(1000, &status); + } + + if (id < 0 && + (status == REPLICANT_TIMEOUT || + status == REPLICANT_INTERRUPTED)) + { + reset_sleep(); + exit_status = false; + break; + } + else if (id < 0 && (status == REPLICANT_BACKOFF || + status == REPLICANT_NEED_BOOTSTRAP)) + { + e::error err = m_coord->error(); + LOG(ERROR) << 
"coordinator disconnected: backing off before retrying"; + LOG(ERROR) << "details: " << err.msg() << " @ " << err.loc(); + do_sleep(); + exit_status = false; + break; + } + else if (id < 0 && status == REPLICANT_CLUSTER_JUMP) + { + e::error err = m_coord->error(); + LOG(ERROR) << "cluster jump: " << err.msg() << " @ " << err.loc(); + do_sleep(); + exit_status = false; + break; + } + else if (id < 0) + { + e::error err = m_coord->error(); + LOG(ERROR) << "coordinator error: " << err.msg() << " @ " << err.loc(); + do_sleep(); + exit_status = false; + break; + } + + reset_sleep(); + + if (id == INT64_MAX) { + exit_status = m_coord->config()->exists(m_daemon->m_us); + break; + } else { + e::error err = m_coord->error(); + LOG(ERROR) << "coordinator error: " << err.msg() << " @" << err.loc(); + do_sleep(); + exit_status = false; + break; + } + } + + exit_critical_section_killable(); + return exit_status; +} + +void +wan_manager :: copy_config(configuration* config) +{ + enter_critical_section(); + *config = *m_coord->config(); + exit_critical_section(); +} + +configuration +wan_manager :: get_config() +{ + return m_config; +} + +void +wan_manager :: loop() +{ + sigset_t ss; + + LOG(INFO) << "network thread started on for wan manager."; + + if (sigfillset(&ss) < 0) + { + PLOG(ERROR) << "sigfillset"; + return; + } + + sigdelset(&ss, SIGPROF); + + if (pthread_sigmask(SIG_SETMASK, &ss, NULL) < 0) + { + PLOG(ERROR) << "could not block signals"; + return; + } + + e::garbage_collector::thread_state ts; + m_daemon->m_gc.register_thread(&ts); + + server_id from; + virtual_server_id vfrom; + virtual_server_id vto; + network_msgtype type; + std::auto_ptr msg; + e::unpacker up; + + while (recv(&from, &vfrom, &vto, &type, &msg, &up)) + { + assert(from != server_id()); + assert(vto != virtual_server_id()); + + switch (type) + { + case WAN_HS: + // handle_handshake(from, vfrom, vto, msg, up); + break; + case WAN_XFER: + recv_data(from, vfrom, vto, msg, up); + break; + case WAN_MORE: + 
// send_more_data(from, vfrom, vto, msg, up); + break; + case WAN_ACK: + // handle_ack(from, vfrom, vto, msg, up); + break; + default: + LOG(INFO) << "received " << type << " message which wan network thread does not process"; + break; + } + + m_daemon->m_gc.quiescent_state(&ts); + } + + m_daemon->m_gc.deregister_thread(&ts); + LOG(INFO) << "network thread shutting down"; +} + +std::vector +wan_manager :: config_space_overlap(configuration primary, configuration backup) +{ + std::vector pspaces = primary.get_spaces(); + std::vector bspaces = backup.get_spaces(); + std::vector overlap; + for (std::vector::iterator pspace = pspaces.begin(); + pspace != pspaces.end(); ++pspace) { + for (std::vector::iterator bspace = bspaces.begin(); + bspace != bspaces.end(); ++bspace) { + if (*pspace == *bspace) { + overlap.push_back(*pspace); + } + } + } + return overlap; // primary spaces +} + +void +wan_manager :: setup_transfer_state(std::vector overlap) +{ + int i, j, k; + for (i = 0; i < overlap.size(); i++) { + hyperdex::space currspace = overlap[i]; + hyperdex::subspace currsub = currspace.subspaces[0]; + for (k = 0; k < currsub.regions.size(); k++) { + hyperdex::region reg = currsub.regions[k]; + virtual_server_id vid = m_config.tail_of_region(reg.id); + const bool is_in = m_transfer_vids.find(vid) != m_transfer_vids.end(); + if (!is_in) { + // XXX make wan_xfer, smaller # fields + transfer xfer; + xfer.vdst = vid; + xfer.rid = reg.id; + xfer.id = transfer_id(m_xid++); + transfer_in_state *tis = new transfer_in_state(xfer); + m_transfer_vids.insert(xfer.vdst); + m_transfers_in.push_back(tis); + } + } + } +} + +void +wan_manager :: give_me_more_state(transfer_in_state* tis) +{ + if (!tis->handshake_complete) { + send_handshake_syn(tis->xfer); + } else { + send_ask_for_more(tis->xfer); + } +} + +void +wan_manager :: handle_handshake(const server_id& from, + const virtual_server_id&, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up) +{ + uint64_t xid; + 
uint64_t timestamp; + + if((up >> xid >> timestamp).error()) { + LOG(WARNING) << "unpack of WAN_HS failed; here's some hex: " << msg->hex(); + return; + } + + // LOG(INFO) << "xfer id = " << xid << " timestamp = " << timestamp; + // XXX make wan_xfer, smaller # fields + transfer xfer; + xfer.id = transfer_id(xid); + xfer.dst = from; + xfer.vsrc = vto; + + transfer_out_state *tos = new transfer_out_state(xfer); + po6::threads::mutex::hold hold(&tos->mtx); + bool wipe = false; + region_id curr_rid = m_daemon->m_config.get_region_id(vto); + tos->xfer.rid = curr_rid; + + std::auto_ptr iter; + iter.reset(m_daemon->m_data.replay_region_from_checkpoint(curr_rid, timestamp, &wipe)); + tos->wipe = wipe; + tos->iter = iter; + tos->window_sz = 1024; // XXX: magic number + m_transfers_out.push_back(tos); + transfer_more_state(tos); +} + +void +wan_manager :: transfer_more_state(transfer_out_state* tos) +{ + assert(tos->iter.get()); + + while (tos->window.size() < tos->window_sz && tos->iter->valid()) + { + e::intrusive_ptr op(new pending()); + op->seq_no = tos->next_seq_no; + ++tos->next_seq_no; + op->kref.assign(reinterpret_cast(tos->iter->key().data()), tos->iter->key().size()); + op->key = e::slice(op->kref); + + if (tos->iter->has_value()) + { + op->has_value = true; + + if (tos->iter->unpack_value(&op->value, &op->version, &op->vref) != datalayer::SUCCESS) + { + LOG(ERROR) << "error doing state transfer"; + break; + } + } + else + { + op->has_value = false; + op->version = 0; + } + + // tos->window.push_back(op); XXX: do we need to store ops at all? 
+ send_object(tos->xfer, op.get()); + tos->iter->next(); + } +} + +void +wan_manager :: recv_data(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up) +{ + uint8_t flags; + uint64_t xid; + uint64_t rid; + uint64_t seq_no; + uint64_t version; + e::slice key; + std::vector value; + + if ((up >> flags >> xid >> rid >> seq_no >> version >> key >> value).error()) + { + LOG(WARNING) << "unpack of WAN_XFER failed; here's some hex: " << msg->hex(); + return; + } + + bool has_value = flags & 1; + // LOG(INFO) << "flags = " << flags << " xid= " << xid << " rid= " << rid << " seq_no= " << seq_no << " version= " << version; + wan_xfer(transfer_id(xid), seq_no, has_value, version, msg, key, value, region_id(rid)); +} + +void +wan_manager :: wan_xfer(const transfer_id& xid, + uint64_t seq_no, + bool has_value, + uint64_t version, + std::auto_ptr msg, + const e::slice& key, + const std::vector& value, + const region_id rid) +{ + transfer_in_state* tis = get_tis(xid); + + if (!tis) + { + LOG(INFO) << "dropping WAN_XFER for " << xid << " which we don't know about"; + return; + } + + po6::threads::mutex::hold hold(&tis->mtx); + + tis->handshake_complete = true; + // if (tis->xfer.vsrc != from || tis->xfer.id != xid) + // { + // LOG(INFO) << "dropping XFER_OP that came from the wrong host"; + // return; + // } + + if (seq_no < tis->upper_bound_acked) + { + LOG(INFO) << "send ack"; + // return send_ack(tis->xfer, seq_no); + } + + std::list >::iterator where_to_put_it; + + for (where_to_put_it = tis->queued.begin(); + where_to_put_it != tis->queued.end(); ++where_to_put_it) + { + if ((*where_to_put_it)->seq_no == seq_no) + { + // silently drop it + return; + } + + if ((*where_to_put_it)->seq_no > seq_no) + { + break; + } + } + + e::intrusive_ptr op(new pending()); + op->seq_no = seq_no; + op->has_value = has_value; + op->version = version; + op->key = key; + op->value = value; + op->msg = msg; + 
tis->queued.insert(where_to_put_it, op); + + make_keychanges(tis); + // give_me_more_state(tis); +} + +void +wan_manager :: make_keychanges(transfer_in_state* tis) +{ + for (std::list >::iterator it = tis->queued.begin(); + it != tis->queued.end(); + ++it) { + + std::auto_ptr msg; + e::intrusive_ptr op = *it; + size_t header_sz = HYPERDEX_CLIENT_HEADER_SIZE_REQ + + pack_size(op->key); + + std::vector checks; // don't do anything with these + std::vector funcs; + const schema& sc(*m_config.get_schema(tis->xfer.rid)); + + for (size_t i = 0; i < op->value.size(); i++) { + attribute attr = sc.attrs[i+1]; // recall that key is first attr in schema + + funcall o; + o.attr = sc.lookup_attr(attr.name); + o.name = FUNC_SET; + o.arg1 = op->value[i]; + o.arg1_datatype = attr.type; + funcs.push_back(o); + } + + std::stable_sort(funcs.begin(), funcs.end()); + size_t sz = header_sz + + sizeof(uint8_t) + + pack_size(checks) + + pack_size(funcs); + msg.reset(e::buffer::create(sz)); + uint8_t flags = 0 | 0 | 128 | 0; // XXX magic + msg->pack_at(header_sz) << flags << checks << funcs; + msg->pack_at(HYPERDEX_CLIENT_HEADER_SIZE_REQ) << op->key; + + std::auto_ptr kc(new key_change()); + kc->key = op->key; + kc->erase = false; + kc->fail_if_not_found = false; + kc->fail_if_found = false; + kc->checks = checks; + kc->funcs = funcs; + uint64_t nonce; // pseudo-random memory + + std::vector pt_leaders; + m_daemon->m_config.point_leaders(m_daemon->m_us, &pt_leaders); + + for (int i = 0; i < pt_leaders.size(); ++i) { + region_id ri = pt_leaders[i]; + virtual_server_id pt_lead = m_daemon->m_config.point_leader(ri, kc->key); + if (pt_lead != virtual_server_id()) { + m_daemon->m_repl.wan_atomic(m_daemon->m_us, pt_lead, nonce, op->version, kc, msg); + break; + } + } + } +} + +void +wan_manager :: send_more_data(const server_id& from, + const virtual_server_id&, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up) +{ + uint64_t xid; + + if((up >> xid).error()) { + LOG(WARNING) 
<< "unpack of WAN_MORE failed; here's some hex: " << msg->hex(); + return; + } + // LOG(INFO) << "xfer id = " << xid; + transfer_out_state *tos = get_tos(transfer_id(xid)); + + if (!tos) { + LOG(INFO) << "dropping WAN_MORE for " << xid << " which we don't know about"; + return; + } + po6::threads::mutex::hold hold(&tos->mtx); + + transfer_more_state(tos); +} + +void +wan_manager :: handle_ack(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up) +{ +} + +void +wan_manager :: run() +{ + if (m_is_backup) { + while (!m_daemon->m_coord.should_exit() && !m_teardown) { + if (!maintain_link()) { + LOG(INFO) << "WAN MANAGER COULD NOT MAINTAIN LINK"; + if (m_teardown) { + break; + } + } + + const configuration& old_pconfig(m_config); + configuration new_pconfig; + copy_config(&new_pconfig); + if (old_pconfig.version() < new_pconfig.version()) { + LOG(INFO) << "moving to cross configuration version=" << new_pconfig.version() + << " on this cluster."; + pause(); + reconfigure(&new_pconfig); + m_has_config = true; + if (!m_manual_teardown) { + unpause(); + } + } + } + } +} + +void +wan_manager :: handle_disruption() +{ + m_busybee_running = false; +} + +bool +wan_manager :: recv(server_id* from, + virtual_server_id* vfrom, + virtual_server_id* vto, + network_msgtype* msg_type, + std::auto_ptr* msg, + e::unpacker* up) +{ + while (true) + { + uint64_t id; + busybee_returncode rc = m_busybee->recv(&id, msg); + + switch (rc) + { + case BUSYBEE_SUCCESS: + break; + case BUSYBEE_SHUTDOWN: + LOG(INFO) << "busybee shutdown, exiting..."; + handle_disruption(); + return false; + case BUSYBEE_DISRUPTED: + LOG(INFO) << "busybee distrupted, exiting..."; + handle_disruption(); + return false; + break; + case BUSYBEE_INTERRUPTED: + LOG(INFO) << "busybee interrupted, exiting..."; + handle_disruption(); + return false; + break; + case BUSYBEE_POLLFAILED: + LOG(INFO) << "receive pollfailed"; + case BUSYBEE_ADDFDFAIL: + 
LOG(INFO) << "receive addfdfail"; + case BUSYBEE_TIMEOUT: + LOG(INFO) << "receive timeout"; + case BUSYBEE_EXTERNAL: + LOG(INFO) << "receive external"; + default: + LOG(ERROR) << "busybee unexpectedly returned " << rc; + continue; + } + + uint8_t mt; + uint8_t flags; + uint64_t version; + uint64_t vidf; + uint64_t vidt; + *up = (*msg)->unpack_from(BUSYBEE_HEADER_SIZE); + *up = *up >> mt >> flags >> version >> vidt; + *msg_type = static_cast(mt); + *from = server_id(id); + *vto = virtual_server_id(vidt); + + // if ((flags & 0x1)) + // { + // LOG(INFO) << "unpacked vidf"; + // *up = *up >> vidf; + // *vfrom = virtual_server_id(vidf); + // } + // else + // { + // *vfrom = virtual_server_id(); + // } + + if (up->error()) + { + LOG(WARNING) << "dropping message that has a malformed header; here's some hex: " << (*msg)->hex(); + continue; + } + + bool from_valid = true; + bool to_valid = m_daemon->m_us == m_daemon->m_config.get_server_id(*vto) || + *vto == virtual_server_id(UINT64_MAX); + + // If this is a virtual-virtual message, validate with other's config + if ((flags & 0x1)) + { + from_valid = *from == m_config.get_server_id(virtual_server_id(vidf)); + } + + // No matter what, wait for the config the sender saw + if (version > m_config.version()) + { + LOG(INFO) << "cross version = " << version; + LOG(INFO) << "our cross version = " << m_config.version(); + // early_message em(version, id, *msg); + // m_early_messages.push(em); + LOG(INFO) << "dropping early message in recv for now"; + continue; + } + + if ((flags & 0x2) && version < m_config.version()) + { + continue; + } + + if (from_valid && to_valid) + { +#ifdef HD_LOG_ALL_MESSAGES + LOG(INFO) << "RECV " << *from << "/" << *vfrom << "->" << *vto << " " << *msg_type << " " << (*msg)->hex(); +#endif + return true; + } + + // Shove the message back at the client so it fails with a reconfigure. 
+ if (!(flags & 0x1)) + { + LOG(INFO) << "CONFIGMISMATCH"; + LOG(INFO) << "from_valid = " << from_valid << " to_valid = " << to_valid; + // mt = static_cast(CONFIGMISMATCH); + // (*msg)->pack_at(BUSYBEE_HEADER_SIZE) << mt; + // m_busybee->send(id, *msg); + } + } +} + +bool +wan_manager :: send(const virtual_server_id& from, + const server_id& to, + network_msgtype msg_type, + std::auto_ptr msg) +{ + assert(msg->size() >= HYPERDEX_HEADER_SIZE_VV); + + if (m_daemon->m_us != m_daemon->m_config.get_server_id(from)) + { + return false; + } + + uint8_t mt = static_cast(msg_type); + uint8_t flags = 1; + virtual_server_id vto(UINT64_MAX); + msg->pack_at(BUSYBEE_HEADER_SIZE) << mt << flags << m_daemon->m_config.version() << vto.get() << from.get(); + + if (to == server_id()) + { + return false; + } + +#ifdef HD_LOG_ALL_MESSAGES + LOG(INFO) << "SEND " << from << "->" << to << " " << msg_type << " " << msg->hex(); +#endif + + if (to == m_daemon->m_us) + { + m_busybee->deliver(to.get(), msg); + } + else + { + busybee_returncode rc = m_busybee->send(to.get(), msg); + + switch (rc) + { + case BUSYBEE_SUCCESS: + break; + case BUSYBEE_DISRUPTED: + handle_disruption(); + return false; + case BUSYBEE_SHUTDOWN: + case BUSYBEE_POLLFAILED: + case BUSYBEE_ADDFDFAIL: + case BUSYBEE_TIMEOUT: + case BUSYBEE_EXTERNAL: + case BUSYBEE_INTERRUPTED: + default: + LOG(ERROR) << "BusyBee unexpectedly returned " << rc; + return false; + } + } + + return true; +} + +bool +wan_manager :: send(const virtual_server_id& vto, + network_msgtype msg_type, + std::auto_ptr msg) +{ + assert(msg->size() >= HYPERDEX_HEADER_SIZE_SV); + + uint8_t mt = static_cast(msg_type); + uint8_t flags = 0; + msg->pack_at(BUSYBEE_HEADER_SIZE) << mt << flags << m_config.version() << vto.get(); + server_id to = m_config.get_server_id(vto); + + if (to == server_id()) + { + return false; + } + +#ifdef HD_LOG_ALL_MESSAGES + LOG(INFO) << "SEND ->" << vto << " " << msg_type << " " << msg->hex(); +#endif + + if (to == m_daemon->m_us) 
+ { + m_busybee->deliver(to.get(), msg); + } + else + { + busybee_returncode rc = m_busybee->send(to.get(), msg); + + switch (rc) + { + case BUSYBEE_SUCCESS: + break; + case BUSYBEE_DISRUPTED: + handle_disruption(); + return false; + case BUSYBEE_SHUTDOWN: + case BUSYBEE_POLLFAILED: + case BUSYBEE_ADDFDFAIL: + case BUSYBEE_TIMEOUT: + case BUSYBEE_EXTERNAL: + case BUSYBEE_INTERRUPTED: + default: + LOG(ERROR) << "BusyBee unexpectedly returned " << rc; + return false; + } + } + + return true; +} + +void +wan_manager :: background_maintenance() +{ + sigset_t ss; + + if (sigfillset(&ss) < 0) + { + PLOG(ERROR) << "sigfillset"; + return; + } + + if (pthread_sigmask(SIG_BLOCK, &ss, NULL) < 0) + { + PLOG(ERROR) << "could not block signals"; + return; + } + + while (true) + { + enter_critical_section_background(); + + if (m_teardown) + { + break; + } + + replicant_returncode status = REPLICANT_GARBAGE; + int64_t id = m_coord->loop(1000, &status); + + if (status != REPLICANT_TIMEOUT && status != REPLICANT_INTERRUPTED) + { + m_deferred.push(std::make_pair(id, status)); + } + + exit_critical_section(); + } +} + +void +wan_manager :: do_sleep() +{ + uint64_t sleep = m_sleep; + timespec ts; + + while (sleep > 0) + { + ts.tv_sec = 0; + ts.tv_nsec = std::min(static_cast(10 * 1000ULL * 1000ULL), sleep); + sigset_t empty_signals; + sigset_t old_signals; + sigemptyset(&empty_signals); // should never fail + pthread_sigmask(SIG_SETMASK, &empty_signals, &old_signals); // should never fail + nanosleep(&ts, NULL); // nothing to gain by checking output + pthread_sigmask(SIG_SETMASK, &old_signals, NULL); // should never fail + sleep -= ts.tv_nsec; + } + + m_sleep = std::min(static_cast(1000ULL * 1000ULL * 1000ULL), m_sleep * 2); +} + +void +wan_manager :: reset_sleep() +{ + uint64_t start_sleep = 1000ULL * 1000ULL; + + if (m_sleep != start_sleep) + { + m_sleep = start_sleep; + LOG(INFO) << "connection to coordinator reestablished"; + } +} + +void +wan_manager :: enter_critical_section() +{ 
+ po6::threads::mutex::hold hold(&m_mtx); /* presumably a scoped lock on m_mtx, which the section bookkeeping below relies on */ + + while (m_locked) /* wait until the current holder releases the section */ + { + if (m_kill) + { + pthread_kill(m_to_kill, SIGUSR1); /* the holder entered through a killable or background variant: signal it with SIGUSR1, presumably to interrupt a blocking call */ + } + + ++m_waiting; + m_cond.wait(); + --m_waiting; + } + + m_locked = true; + m_kill = false; /* a holder that entered this way is not signalled by later waiters */ +} + +void +wan_manager :: exit_critical_section() +{ /* Releases the section and, if any thread registered itself in m_waiting, wakes all of them. */ + po6::threads::mutex::hold hold(&m_mtx); + m_locked = false; + m_kill = false; + + if (m_waiting > 0) + { + m_cond.broadcast(); + } +} + +void +wan_manager :: enter_critical_section_killable() +{ /* Same acquisition loop as enter_critical_section(), but records the calling thread in m_to_kill and sets m_kill so that later waiters signal it. */ + po6::threads::mutex::hold hold(&m_mtx); + + while (m_locked) + { + if (m_kill) + { + pthread_kill(m_to_kill, SIGUSR1); + } + + ++m_waiting; + m_cond.wait(); + --m_waiting; + } + + m_locked = true; + m_kill = true; + m_to_kill = pthread_self(); +} + +void +wan_manager :: exit_critical_section_killable() +{ /* Body is identical to exit_critical_section(). */ + po6::threads::mutex::hold hold(&m_mtx); + m_locked = false; + m_kill = false; + + if (m_waiting > 0) + { + m_cond.broadcast(); + } +} + +void +wan_manager :: enter_critical_section_background() +{ /* Acquires the section only when it is free and no other thread is waiting for it, then marks the calling thread as killable; it never signals the current holder itself. */ + po6::threads::mutex::hold hold(&m_mtx); + + while (m_locked || m_waiting > 0) /* yield to every registered waiter before taking the section */ + { + ++m_waiting; /* counting itself guarantees the next exit_critical_section*() broadcasts */ + m_cond.wait(); + --m_waiting; + } + + m_locked = true; + m_kill = true; + m_to_kill = pthread_self(); +} diff --git a/daemon/wan_manager.h b/daemon/wan_manager.h new file mode 100644 index 00000000..f8928464 --- /dev/null +++ b/daemon/wan_manager.h @@ -0,0 +1,228 @@ +// Copyright (c) 2015, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution.
+// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef hyperdex_daemon_wan_manager_h_ +#define hyperdex_daemon_wan_manager_h_ + +// C +#include + +// STL +#include +#include +#include +#include + +// po6 +#include +#include +#include +#include + +// BusyBee +#include +#include + +// e +#include +#include + +// HyperDex +#include "namespace.h" +#include "common/configuration.h" +#include "daemon/background_thread.h" +#include "common/coordinator_link.h" +#include "common/ids.h" +#include "common/mapper.h" +#include "common/network_msgtype.h" +#include "daemon/reconfigure_returncode.h" + +#define HYPERDEX_HEADER_SIZE_VC (BUSYBEE_HEADER_SIZE \ + + sizeof(uint8_t) /*message type*/ \ + + sizeof(uint64_t) /*virt from*/) +#define HYPERDEX_HEADER_SIZE_SV (BUSYBEE_HEADER_SIZE \ + + sizeof(uint8_t) /*message type*/ \ + + sizeof(uint8_t) /*flags*/ \ + + sizeof(uint64_t) /*config version*/ \ + + sizeof(uint64_t) /*virt to*/) +#define HYPERDEX_HEADER_SIZE_VV (BUSYBEE_HEADER_SIZE \ + + sizeof(uint8_t) /*message type*/ \ + + sizeof(uint8_t) 
/*flags*/ \ + + sizeof(uint64_t) /*config version*/ \ + + sizeof(uint64_t) /*virt to*/ \ + + sizeof(uint64_t) /*virt from*/) +#define HYPERDEX_HEADER_SIZE_VS HYPERDEX_HEADER_SIZE_VV + +BEGIN_HYPERDEX_NAMESPACE +class daemon; + +class wan_manager /* Declares, in three groups, a coordinator link with its critical-section helpers, the public entry points of the state pull, and the private messaging and transfer helpers. NOTE(review): grouping taken from the section comments below; confirm details against wan_manager.cc. */ +{ + public: + wan_manager(daemon* d); + ~wan_manager() throw (); + + // primary coordinator link + public: + void set_coordinator_address(const char* host, uint16_t port); + void set_is_backup(bool isbackup); + bool maintain_link(); + void copy_config(configuration* config); + configuration get_config(); + + private: + void background_maintenance(); + void do_sleep(); + void reset_sleep(); + void enter_critical_section(); + void exit_critical_section(); + void enter_critical_section_killable(); + void enter_critical_section_background(); + void exit_critical_section_killable(); + + // state pull from primary + public: + void kick(); + void wake_one(); + bool setup(const char* host, int64_t port); + void teardown(); + void set_teardown(); + void pause(); + void unpause(); + void reconfigure(configuration *config); + void rm_all_spaces(); + void handle_handshake(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up); + void recv_data(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up); + void send_more_data(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up); + void handle_ack(const server_id& from, + const virtual_server_id& vfrom, + const virtual_server_id& vto, + std::auto_ptr msg, + e::unpacker up); + private: + class pending; + class transfer_in_state; + class transfer_out_state; + class background_thread; + + private: + void handle_disruption(); + void run(); + std::vector config_space_overlap(configuration primary, configuration backup); + void setup_transfer_state(std::vector overlap); + // msg 
loop and handling fns + void loop(); + void wan_xfer(const transfer_id& xid, + uint64_t seq_no, + bool has_value, + uint64_t version, + std::auto_ptr msg, + const e::slice& key, + const std::vector& value, + const region_id rid); + void make_keychanges(transfer_in_state* tis); + bool recv(server_id* from, + virtual_server_id* vfrom, + virtual_server_id* vto, + network_msgtype* msg_type, + std::auto_ptr* msg, + e::unpacker* up); + bool send(const virtual_server_id& to, /* addressed by virtual server; the implementation packs an SV header */ + network_msgtype msg_type, + std::auto_ptr msg); + bool send(const virtual_server_id& from, /* addressed by server id; the implementation packs a VV header */ + const server_id& to, + network_msgtype msg_type, + std::auto_ptr msg); + void transfer_more_state(transfer_out_state* tos); + void give_me_more_state(transfer_in_state* tis); + // get the appropriate state + transfer_in_state* get_tis(const transfer_id& xid); + transfer_out_state* get_tos(const transfer_id& xid); + // network message fns + void send_handshake_syn(const transfer& xfer); + void send_ask_for_more(const transfer& xfer); + void send_object(const transfer& xfer, pending* op); + void send_ack(const transfer& xfer, uint64_t seq_id); + + private: + wan_manager(const wan_manager&); /* private copy operations: presumably declared only, to forbid copying */ + wan_manager& operator = (const wan_manager&); + + private: + daemon* m_daemon; + po6::threads::thread m_poller; + std::auto_ptr m_coord; + po6::threads::mutex m_mtx; /* taken by every enter_ and exit_critical_section variant */ + po6::threads::cond m_cond; /* waited on while the section is held; broadcast on exit */ + bool m_is_backup; + bool m_poller_started; + bool m_teardown; /* checked by background_maintenance() to leave its loop */ + bool m_manual_teardown; + std::queue > m_deferred; /* (id, status) pairs queued by background_maintenance() */ + bool m_locked; /* true while some thread holds the critical section */ + bool m_kill; /* true when the holder asked to be signalled by waiters */ + pthread_t m_to_kill; /* thread that receives SIGUSR1 when m_kill is set */ + uint64_t m_waiting; /* number of threads currently registered as waiting on m_cond */ + uint64_t m_sleep; /* current do_sleep() back-off in nanoseconds */ + int64_t m_online_id; + bool m_shutdown_requested; + // background transfer + std::set m_transfer_vids; + uint64_t m_xid; + std::vector > m_transfers_in; + std::vector > m_transfers_out; + const std::auto_ptr m_background_thread; + // busybee + configuration m_config; + mapper m_busybee_mapper; + std::auto_ptr m_busybee; + po6::threads::thread m_link_thread; + std::vector > m_threads; + bool m_paused; + 
po6::threads::mutex m_protect_pause; + po6::threads::cond m_can_pause; + bool m_has_config; + bool m_busybee_running; + +}; + +END_HYPERDEX_NAMESPACE + +#endif /* end of include guard: hyperdex_daemon_wan_manager_h_ */ diff --git a/daemon/wan_manager_pending.cc b/daemon/wan_manager_pending.cc new file mode 100644 index 00000000..2bbfe959 --- /dev/null +++ b/daemon/wan_manager_pending.cc @@ -0,0 +1,51 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE.
+ +#define __STDC_LIMIT_MACROS + +// HyperDex +#include "daemon/wan_manager_pending.h" + +using hyperdex::wan_manager; + +wan_manager :: pending :: pending() + : seq_no(0) + , has_value(false) + , version(0) + , key() + , value() + , acked(false) + , msg() + , kref() + , vref() + , m_ref(0) +{ /* every field starts at zero, false or empty, and the reference count starts at 0 */ +} + +wan_manager :: pending :: ~pending() throw () +{ /* nothing to release explicitly: members clean up through their own destructors */ +} diff --git a/daemon/wan_manager_pending.h b/daemon/wan_manager_pending.h new file mode 100644 index 00000000..804ca10d --- /dev/null +++ b/daemon/wan_manager_pending.h @@ -0,0 +1,63 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef hyperdex_daemon_wan_manager_pending_h_ +#define hyperdex_daemon_wan_manager_pending_h_ + +// HyperDex +#include "daemon/datalayer.h" +#include "daemon/wan_manager.h" + +class hyperdex::wan_manager::pending /* Plain record of one operation (seq_no, has_value, version, key, value, acked, msg) carrying an intrusive reference count for e::intrusive_ptr. */ +{ + public: + pending(); + ~pending() throw (); + + public: + uint64_t seq_no; + bool has_value; + uint64_t version; + e::slice key; + std::vector value; + bool acked; + std::auto_ptr msg; + std::string kref; /* presumably backing storage for key; confirm against the code that fills it */ + datalayer::reference vref; /* presumably keeps the value data alive; confirm against the code that fills it */ + + private: + friend class e::intrusive_ptr; + + private: + void inc() { ++m_ref; } + void dec() { --m_ref; if (m_ref == 0) delete this; } /* NOTE(review): plain increment and decrement, whereas transfer_in_state and transfer_out_state use __sync builtins; confirm instances are never shared across threads */ + + private: + size_t m_ref; +}; + +#endif /* end of include guard: hyperdex_daemon_wan_manager_pending_h_ */ diff --git a/daemon/wan_manager_transfer_in_state.cc b/daemon/wan_manager_transfer_in_state.cc new file mode 100644 index 00000000..f949c112 --- /dev/null +++ b/daemon/wan_manager_transfer_in_state.cc @@ -0,0 +1,65 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. 
+ +#define __STDC_LIMIT_MACROS + +// Google Log +#include + +// HyperDex +#include "daemon/datalayer.h" +#include "daemon/datalayer_iterator.h" +#include "daemon/wan_manager_pending.h" +#include "daemon/wan_manager_transfer_in_state.h" + +using hyperdex::wan_manager; + +wan_manager :: transfer_in_state :: transfer_in_state(const transfer& _xfer) + : xfer(_xfer) + , mtx() + , upper_bound_acked(1) + , queued() + , handshake_complete(false) + , wipe(false) + , wiped(false) + , m_ref(0) +{ /* copies the transfer descriptor; starts with upper_bound_acked at 1, an empty queue, and the handshake and wipe flags cleared */ +} + +wan_manager :: transfer_in_state :: ~transfer_in_state() throw () +{ +} + +void +wan_manager :: transfer_in_state :: debug_dump() +{ /* Logs xfer, upper_bound_acked, wipe and wiped at INFO level while holding mtx. */ + po6::threads::mutex::hold hold(&mtx); + LOG(INFO) << " transfer=" << xfer; + LOG(INFO) << " upper_bound_acked=" << upper_bound_acked; + LOG(INFO) << " wipe=" << wipe; + LOG(INFO) << " wiped=" << wiped; +} diff --git a/daemon/wan_manager_transfer_in_state.h b/daemon/wan_manager_transfer_in_state.h new file mode 100644 index 00000000..bc3f7784 --- /dev/null +++ b/daemon/wan_manager_transfer_in_state.h @@ -0,0 +1,66 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission.
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef hyperdex_daemon_wan_manager_transfer_in_state_h_ +#define hyperdex_daemon_wan_manager_transfer_in_state_h_ + +// e +#include + +// HyperDex +#include "daemon/wan_manager.h" + +class hyperdex::wan_manager::transfer_in_state /* Receiving-side bookkeeping for one transfer: the descriptor, a mutex, the acknowledgement bound, a queue, and handshake and wipe flags. Reference-counted with atomic builtins for e::intrusive_ptr. NOTE(review): unlike transfer_out_state, no private copy operations are declared here; confirm copying is never intended. */ +{ + public: + transfer_in_state(const transfer& xfer); + ~transfer_in_state() throw (); + + public: + void debug_dump(); + + public: + transfer xfer; + po6::threads::mutex mtx; /* held by debug_dump(); presumably guards the fields below */ + uint64_t upper_bound_acked; /* initialised to 1 by the constructor */ + std::list > queued; + bool handshake_complete; + bool wipe; + bool wiped; + + private: + friend class e::intrusive_ptr; + + private: + void inc() { __sync_add_and_fetch(&m_ref, 1); } + void dec() { if (__sync_sub_and_fetch(&m_ref, 1) == 0) delete this; } /* the object deletes itself when the last reference is dropped */ + + private: + size_t m_ref; +}; + +#endif /* end of include guard: hyperdex_daemon_wan_manager_transfer_in_state_h_ */ diff --git a/daemon/wan_manager_transfer_out_state.cc b/daemon/wan_manager_transfer_out_state.cc new file mode 100644 index 00000000..69e3ca92 --- /dev/null +++ b/daemon/wan_manager_transfer_out_state.cc @@ -0,0 +1,64 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved.
+// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. 
+ +#define __STDC_LIMIT_MACROS + +// Google Log +#include + +// HyperDex +#include "daemon/datalayer_iterator.h" +#include "daemon/wan_manager_pending.h" +#include "daemon/wan_manager_transfer_out_state.h" + +using hyperdex::wan_manager; + +wan_manager :: transfer_out_state :: transfer_out_state(const transfer& _xfer) + : xfer(_xfer) + , mtx() + , next_seq_no(1) + , window() + , window_sz(1) + , iter() + , wipe(false) + , m_ref(0) +{ /* copies the transfer descriptor; sequence numbers start at 1, the window is empty with size 1, and no iterator is attached yet */ +} + +wan_manager :: transfer_out_state :: ~transfer_out_state() throw () +{ +} + +void +wan_manager :: transfer_out_state :: debug_dump() +{ /* Logs xfer, next_seq_no, window_sz and wipe at INFO level while holding mtx. */ + po6::threads::mutex::hold hold(&mtx); + LOG(INFO) << " transfer=" << xfer; + LOG(INFO) << " next_seq_no=" << next_seq_no; + LOG(INFO) << " window_sz=" << window_sz; + LOG(INFO) << " wipe=" << wipe; +} diff --git a/daemon/wan_manager_transfer_out_state.h b/daemon/wan_manager_transfer_out_state.h new file mode 100644 index 00000000..5c5f36d8 --- /dev/null +++ b/daemon/wan_manager_transfer_out_state.h @@ -0,0 +1,80 @@ +// Copyright (c) 2012, Cornell University +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission.
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef hyperdex_daemon_wan_manager_transfer_out_state_h_ +#define hyperdex_daemon_wan_manager_transfer_out_state_h_ + +// STL +#include + +// po6 +#include + +// e +#include + +// HyperDex +#include "daemon/datalayer.h" +#include "daemon/wan_manager.h" + +using hyperdex::wan_manager; /* NOTE(review): a using-declaration at header scope leaks wan_manager into every includer; the sibling headers spell out hyperdex::wan_manager:: instead */ + +class wan_manager::transfer_out_state /* Sending-side bookkeeping for one transfer: the descriptor, a mutex, the next sequence number, the in-flight window and its size, an iterator, and a wipe flag. Reference-counted with atomic builtins for e::intrusive_ptr. */ +{ + public: + transfer_out_state(const transfer& xfer); + ~transfer_out_state() throw (); + + public: + void debug_dump(); + + public: + transfer xfer; + po6::threads::mutex mtx; /* held by debug_dump(); presumably guards the fields below */ + uint64_t next_seq_no; /* initialised to 1 by the constructor */ + std::list > window; + size_t window_sz; /* initialised to 1 by the constructor */ + std::auto_ptr iter; + bool wipe; + + private: + friend class e::intrusive_ptr; + + private: + void inc() { __sync_add_and_fetch(&m_ref, 1); } + void dec() { if (__sync_sub_and_fetch(&m_ref, 1) == 0) delete this; } /* the object deletes itself when the last reference is dropped */ + + private: + size_t m_ref; + + private: + transfer_out_state(const transfer_out_state&); /* private copy operations: presumably declared only, to forbid copying */ + transfer_out_state& operator = (const transfer_out_state&); +}; + + +#endif /* end of include guard: hyperdex_daemon_wan_manager_transfer_out_state_h_ */ diff --git a/doc/wan-manager.tex b/doc/wan-manager.tex new file mode 100644 index 00000000..3584dc13 --- /dev/null +++ b/doc/wan-manager.tex @@ -0,0 +1,128
@@ +\section{Georeplication with HyperDex} +\label{sec:georepl} + +HyperDex has a new feature called ``georeplication'' in development that increases the +fault tolerance of a typical HyperDex deployment by protecting it against catastrophic +datacenter events, such as natural disasters or power outages. Suppose a user +has a HyperDex deployment running in Virginia that serves all of the +\code{get}, \code{put}, and other operations for the user's service. This cluster +is scalable and tolerates a few machines failing, +rebooting, or being replaced; however, if the whole datacenter were +disabled, the cluster would stop running. + +Using georeplication in HyperDex, the user could have both the +``primary'' Virginia cluster running and a secondary ``backup'' cluster +running in Arkansas that mirrors all the \code{put} operations +that are performed at the primary in Virginia. There is a +delay between a \code{put} being reflected in the primary +and the same \code{put} being reflected in the backup, but eventually all +\code{put}s to the primary will be reflected in the backup. + +\section{Starting the Georeplicator} +\label{sec:startgeo} + +To set up a primary cluster to be georeplicated, we first have to +have the primary cluster running. We start the coordinator +and one or more daemons with: + +\begin{consolecode} +hyperdex coordinator -f -l 127.0.0.1 -p 1982 +\end{consolecode} + +and + +\begin{consolecode} +hyperdex daemon -f --listen=127.0.0.1 --listen-port=2012 \ + --coordinator=127.0.0.1 --coordinator-port=1982 --data=/path/to/data +\end{consolecode} + +This will be our primary cluster.
We then have to start our backup cluster +in the same way: + +\begin{consolecode} +hyperdex coordinator -f -l 127.0.0.1 -p 1984 +\end{consolecode} + +and + +\begin{consolecode} +hyperdex daemon -f --listen=127.0.0.1 --listen-port=2014 \ + --coordinator=127.0.0.1 --coordinator-port=1984 --data=/path/to/data +\end{consolecode} + +This is our backup cluster. For real georeplication, we would want this cluster +to be geographically separate from the primary, but for the purposes of a demo +they can both be run on localhost (each daemon needs its own data directory). + +All HyperDex clusters started in this +manner are actually in primary mode, meaning that the code that pulls +\code{put}s from a primary and replicates them is not running yet. Hence +both of these clusters are primary clusters so far. Now, let's point our backup +cluster to the primary we want to georeplicate and have it pull the \code{put}s! +We do this by running the \code{hyperdex-set-backup-cluster} tool: + +\begin{consolecode} + hyperdex-set-backup-cluster -h 127.0.0.1 -p 1984 -r 127.0.0.1 -t 1982 +\end{consolecode} + +This tool contacts the backup cluster coordinator (port 1984) and tells +it to start pulling updates from the primary cluster (port 1982) and +replicating them. At this point we have the backup cluster set up to +replicate all changes made to the primary! + +\section{Datacenter Catastrophe} +\label{sec:catastrophe} + +As time moves along, the backup cluster we set up in the previous +section will get closer and closer to an exact replica of the primary. +Suppose now that a catastrophic event strikes the primary, +taking the entire cluster out. At this point, we can promote the backup cluster +to be the primary cluster (so that it no longer pulls updates from anyone else) by +running: + +\begin{consolecode} + hyperdex-set-primary-cluster -h 127.0.0.1 -p 1984 +\end{consolecode} + +which sets the cluster at port 1984 to be a primary cluster.
The user +running HyperDex would also have to redirect all of their HyperDex clients +to port 1984 for their \code{get}s, \code{put}s, etc.\ for the conversion +to be complete. + +\section{Changing Backup Affinity} +\label{sec:affinity} + +Suppose now we had another primary cluster running at port 1986 with: + +\begin{consolecode} +hyperdex coordinator -f -l 127.0.0.1 -p 1986 +\end{consolecode} + +and + +\begin{consolecode} +hyperdex daemon -f --listen=127.0.0.1 --listen-port=2016 \ + --coordinator=127.0.0.1 --coordinator-port=1986 --data=/path/to/data +\end{consolecode} + +along with our other primary and backup clusters. Also suppose we already +had the backup cluster (port 1984) pulling changes from port 1982, but +we now wanted it to back up the cluster at port 1986 instead for some reason +(higher-priority data, etc.). We could use the \code{hyperdex-set-backup-affinity} +tool to make the backup forget about the primary it was backing up +and back up another primary instead. To do this, we run: + +\begin{consolecode} + hyperdex-set-backup-affinity -h 127.0.0.1 -p 1984 -r 127.0.0.1 -t 1986 +\end{consolecode} + +This tool contacts the backup cluster coordinator at port 1984 and tells it +to start backing up the cluster at port 1986 instead of the cluster +it is currently backing up. + +\section{Georeplication Conclusion} +\label{sec:geoconc} + +The tools given allow users to flexibly and reliably georeplicate HyperDex clusters +according to their needs, putting performant, reliable, scalable, and linearizable data +storage at their fingertips.
diff --git a/include/hyperdex/admin.h b/include/hyperdex/admin.h index 1b7a33fb..dc5de9fb 100644 --- a/include/hyperdex/admin.h +++ b/include/hyperdex/admin.h @@ -90,6 +90,23 @@ hyperdex_admin_read_only(struct hyperdex_admin* admin, int ro, enum hyperdex_admin_returncode* status); +int64_t +hyperdex_admin_set_primary_cluster(struct hyperdex_admin* admin, + int prim, + enum hyperdex_admin_returncode* status); + +int64_t +hyperdex_admin_set_backup_cluster(struct hyperdex_admin* admin, + const char* host, + const int64_t port, + enum hyperdex_admin_returncode* status); + +int64_t +hyperdex_admin_set_backup_affinity(struct hyperdex_admin* admin, + const char* host, + const int64_t port, + enum hyperdex_admin_returncode* status); + int64_t hyperdex_admin_wait_until_stable(struct hyperdex_admin* admin, enum hyperdex_admin_returncode* status); diff --git a/include/hyperdex/admin.hpp b/include/hyperdex/admin.hpp index adbe3977..1d8b8a87 100644 --- a/include/hyperdex/admin.hpp +++ b/include/hyperdex/admin.hpp @@ -59,6 +59,12 @@ class Admin { return hyperdex_admin_dump_config(m_adm, status, config); } int64_t read_only(int ro, enum hyperdex_admin_returncode* status) { return hyperdex_admin_read_only(m_adm, ro, status); } + int64_t set_primary_cluster(int prim, enum hyperdex_admin_returncode* status) + { return hyperdex_admin_set_primary_cluster(m_adm, prim, status); } + int64_t set_backup_cluster(const char* host, const int64_t port, enum hyperdex_admin_returncode* status) + { return hyperdex_admin_set_backup_cluster(m_adm, host, port, status); } + int64_t set_backup_affinity(const char* host, const int64_t port, enum hyperdex_admin_returncode* status) + { return hyperdex_admin_set_backup_affinity(m_adm, host, port, status); } int64_t wait_until_stable(enum hyperdex_admin_returncode* status) { return hyperdex_admin_wait_until_stable(m_adm, status); } int64_t fault_tolerance(const char* space, uint64_t ft, diff --git a/make_spaces.py b/make_spaces.py new file mode 100644 
index 00000000..8d354df7
--- /dev/null
+++ b/make_spaces.py
@@ -0,0 +1,21 @@
+import hyperdex.admin
+import hyperdex.client
+import time  # NOTE(review): not referenced anywhere in this script
+
+a = hyperdex.admin.Admin('127.0.0.1', 1982)  # admin handle for the cluster reachable at 127.0.0.1:1982
+# Create the `phonebook` space: key `username`, attributes first/last (default type) and int phone.
+a.add_space('''
+ space phonebook
+ key username
+ attributes first, last, int phone
+ subspace first, last, phone
+ create 8 partitions
+ tolerate 2 failures
+ ''')
+
+c = hyperdex.client.Client('127.0.0.1', 1982)  # data client pointed at the same host and port
+c.put('phonebook', 'jsmith1', {'first': 'John', 'last': 'Smith', 'phone': 2228675309})
+c.put('phonebook', 'jtk54', {'first': 'Jacob', 'last': 'Kiefer', 'phone': 5556079876})
+
+for i in range(66):  # 66 filler records keyed kek0 .. kek65, with phone set to the index
+    c.put('phonebook', 'kek' + str(i), {'first': 'Kook', 'last': 'Kookerson', 'phone': i})
diff --git a/tools/set-backup-affinity.cc b/tools/set-backup-affinity.cc
new file mode 100644
index 00000000..a62b9ad4
--- /dev/null
+++ b/tools/set-backup-affinity.cc
@@ -0,0 +1,100 @@
+// Copyright (c) 2015, Cornell University
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// * Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of HyperDex nor the names of its contributors may be
+// used to endorse or promote products derived from this software without
+// specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+
+// C
+#include <cstdlib>
+
+// HyperDex
+#include <hyperdex/admin.hpp>
+#include "tools/common.h"
+
+int
+main(int argc, const char* argv[]) // CLI entry point for hyperdex-set-backup-affinity; returns EXIT_SUCCESS or EXIT_FAILURE
+{
+    const char* host = "."; // primary host (-r/--primary-host); NOTE(review): the default is not rejected below
+    long port = 0; // primary port (-t/--primary-port); NOTE(review): the default is not rejected below
+    hyperdex::connect_opts conn; // host/port options of the cluster this tool connects to
+    e::argparser ap;
+    ap.autohelp();
+    ap.option_string("[OPTIONS] ");
+    ap.add("Connect to a cluster:", conn.parser());
+    ap.arg().name('r', "primary-host")
+            .description("connect to primary cluster at this ip")
+            .metavar("p_host").as_string(&host);
+    ap.arg().name('t', "primary-port")
+            .description("port on host to connect to")
+            .metavar("p_port").as_long(&port);
+
+    if (!ap.parse(argc, argv))
+    {
+        return EXIT_FAILURE;
+    }
+
+    if (!conn.validate())
+    {
+        std::cerr << "invalid host:port specification\n" << std::endl;
+        ap.usage();
+        return EXIT_FAILURE;
+    }
+
+    try
+    {
+        hyperdex::Admin h(conn.host(), conn.port());
+        hyperdex_admin_returncode rrc;
+        int64_t rid = h.set_backup_affinity(host, port, &rrc); // issue the request; a negative id is treated as "not started"
+
+        if (rid < 0)
+        {
+            std::cerr << "could not set backup affinity: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        hyperdex_admin_returncode lrc;
+        int64_t lid = h.loop(-1, &lrc); // wait for a completion; -1 is presumably "no timeout" -- confirm
+
+        if (lid < 0)
+        {
+            std::cerr << "could not set backup affinity: " << lrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        assert(rid == lid); // only one request was issued, so the completed id should be ours
+
+        if (rrc != HYPERDEX_ADMIN_SUCCESS) // rrc is read again after loop(); presumably updated on completion -- confirm
+        {
+            std::cerr << "could not set backup affinity: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (std::exception& e)
+    {
+        std::cerr << "error: " << e.what() << std::endl;
+        return EXIT_FAILURE;
+    }
+}
diff --git a/tools/set-backup-cluster.cc b/tools/set-backup-cluster.cc
new file mode 100644
index 00000000..99cdcd19
--- /dev/null
+++ b/tools/set-backup-cluster.cc
@@ -0,0 +1,100 @@
+// Copyright (c) 2015, Cornell University
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// * Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of HyperDex nor the names of its contributors may be
+// used to endorse or promote products derived from this software without
+// specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+
+// C
+#include <cstdlib>
+
+// HyperDex
+#include <hyperdex/admin.hpp>
+#include "tools/common.h"
+
+int
+main(int argc, const char* argv[]) // CLI entry point for hyperdex-set-backup-cluster; returns EXIT_SUCCESS or EXIT_FAILURE
+{
+    const char* host = "."; // primary host (-r/--primary-host); NOTE(review): the default is not rejected below
+    long port = 0; // primary port (-t/--primary-port); NOTE(review): the default is not rejected below
+    hyperdex::connect_opts conn; // host/port options of the cluster this tool connects to
+    e::argparser ap;
+    ap.autohelp();
+    ap.option_string("[OPTIONS] ");
+    ap.add("Connect to a cluster:", conn.parser());
+    ap.arg().name('r', "primary-host")
+            .description("connect to primary cluster at this ip")
+            .metavar("p_host").as_string(&host);
+    ap.arg().name('t', "primary-port")
+            .description("port on host to connect to")
+            .metavar("p_port").as_long(&port);
+
+    if (!ap.parse(argc, argv))
+    {
+        return EXIT_FAILURE;
+    }
+
+    if (!conn.validate())
+    {
+        std::cerr << "invalid host:port specification\n" << std::endl;
+        ap.usage();
+        return EXIT_FAILURE;
+    }
+
+    try
+    {
+        hyperdex::Admin h(conn.host(), conn.port());
+        hyperdex_admin_returncode rrc;
+        int64_t rid = h.set_backup_cluster(host, port, &rrc); // issue the request; a negative id is treated as "not started"
+
+        if (rid < 0)
+        {
+            std::cerr << "could not make cluster backup cluster: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        hyperdex_admin_returncode lrc;
+        int64_t lid = h.loop(-1, &lrc); // wait for a completion; -1 is presumably "no timeout" -- confirm
+
+        if (lid < 0)
+        {
+            std::cerr << "could not make cluster backup cluster: " << lrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        assert(rid == lid); // only one request was issued, so the completed id should be ours
+
+        if (rrc != HYPERDEX_ADMIN_SUCCESS) // rrc is read again after loop(); presumably updated on completion -- confirm
+        {
+            std::cerr << "could not make cluster backup cluster: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (std::exception& e)
+    {
+        std::cerr << "error: " << e.what() << std::endl;
+        return EXIT_FAILURE;
+    }
+}
diff --git a/tools/set-primary-cluster.cc b/tools/set-primary-cluster.cc
new file mode 100644
index 00000000..52d80ab0
--- /dev/null
+++ b/tools/set-primary-cluster.cc
@@ -0,0 +1,92 @@
+// Copyright (c) 2015, Cornell University
+// All rights reserved.
+// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of HyperDex nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. 
+
+// C
+#include <cstdlib>
+
+// HyperDex
+#include <hyperdex/admin.hpp>
+#include "tools/common.h"
+
+int
+main(int argc, const char* argv[]) // CLI entry point for hyperdex-set-primary-cluster; returns EXIT_SUCCESS or EXIT_FAILURE
+{
+    hyperdex::connect_opts conn; // host/port options of the cluster this tool connects to
+    e::argparser ap;
+    ap.autohelp();
+    ap.option_string("[OPTIONS] ");
+    ap.add("Connect to a cluster:", conn.parser());
+
+    if (!ap.parse(argc, argv))
+    {
+        return EXIT_FAILURE;
+    }
+
+    if (!conn.validate())
+    {
+        std::cerr << "invalid host:port specification\n" << std::endl;
+        ap.usage();
+        return EXIT_FAILURE;
+    }
+
+    try
+    {
+        hyperdex::Admin h(conn.host(), conn.port());
+        hyperdex_admin_returncode rrc;
+        int64_t rid = h.set_primary_cluster(true, &rrc); // `true` converts to the int `prim` argument (1); negative id is treated as "not started"
+
+        if (rid < 0)
+        {
+            std::cerr << "could not make cluster primary cluster: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        hyperdex_admin_returncode lrc;
+        int64_t lid = h.loop(-1, &lrc); // wait for a completion; -1 is presumably "no timeout" -- confirm
+
+        if (lid < 0)
+        {
+            std::cerr << "could not make cluster primary cluster: " << lrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        assert(rid == lid); // only one request was issued, so the completed id should be ours
+
+        if (rrc != HYPERDEX_ADMIN_SUCCESS) // rrc is read again after loop(); presumably updated on completion -- confirm
+        {
+            std::cerr << "could not make cluster primary cluster: " << rrc << std::endl;
+            return EXIT_FAILURE;
+        }
+
+        return EXIT_SUCCESS;
+    }
+    catch (std::exception& e)
+    {
+        std::cerr << "error: " << e.what() << std::endl;
+        return EXIT_FAILURE;
+    }
+}