Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

membase shard compatible mutilate #3

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Bug fixed and issue where internal connections inside a single Connec…
…tion

don't wait for each other to connect before changing read_state to IDLE.
syang0 committed Mar 21, 2013
commit a13ccd843ede1bf81d8c7bc0efb9b043f56e6a1c
104 changes: 44 additions & 60 deletions Connection.cc
Original file line number Diff line number Diff line change
@@ -122,6 +122,10 @@ void Connection::reset() {
stats = ConnectionStats(stats.sampling);
}

void Connection::start_sasl() {
for(single_connection& conn : conns)
issue_sasl(conn.bev);
}

void Connection::issue_sasl(struct bufferevent *bev) {
read_state = WAITING_FOR_OP;
@@ -142,24 +146,16 @@ void Connection::issue_sasl(struct bufferevent *bev) {
}

void Connection::issue_get(const char* key, double now) {
struct bufferevent *bev = NULL;
std::queue<Operation>* op_queue;
single_connection* conn = &(conns.front());
uint16_t keylen = strlen(key);
uint16_t vbucket_id = 0;
Operation op;
int l;

//TODO(syang0) Test, comment this out and you should see misses.
if (vb) {
vbucket_id = vbucket_get_vbucket_by_key(vb, key, keylen);
int serverIndex = vbucket_get_master(vb, vbucket_id);
single_connection& conn = conns[serverIndex];
op_queue = &conn.op_queue;
bev = conn.bev;
} else {
single_connection& conn = conns.front();
op_queue = &(conn.op_queue);
bev = conn.bev;
conn = &(conns[serverIndex]);
}


@@ -183,7 +179,7 @@ void Connection::issue_get(const char* key, double now) {
op.type = Operation::GET;
op.key = string(key);

op_queue->push(op);
conn->op_queue.push(op);

if (read_state == IDLE)
read_state = WAITING_FOR_OP;
@@ -195,20 +191,19 @@ void Connection::issue_get(const char* key, double now) {
0x00, 0x00, htons(vbucket_id),
htonl(keylen) };

bufferevent_write(bev, &h, 24); // size does not include extras
bufferevent_write(bev, key, keylen);
bufferevent_write(conn->bev, &h, 24); // size does not include extras
bufferevent_write(conn->bev, key, keylen);
l = 24 + keylen;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev), "get %s\r\n", key);
l = evbuffer_add_printf(bufferevent_get_output(conn->bev), "get %s\r\n", key);
}

if (read_state != LOADING) stats.tx_bytes += l;
}

void Connection::issue_set(const char* key, const char* value, int length,
double now) {
struct bufferevent *bev = NULL;
std::queue<Operation>* op_queue;
single_connection* conn = &(conns.front());
uint16_t keylen = strlen(key);
uint16_t vbucket_id = 0;
Operation op;
@@ -217,13 +212,7 @@ void Connection::issue_set(const char* key, const char* value, int length,
if (vb) {
vbucket_id = vbucket_get_vbucket_by_key(vb, key, keylen);
int serverIndex = vbucket_get_master(vb, vbucket_id);
single_connection& conn = conns[serverIndex];
op_queue = &conn.op_queue;
bev = conn.bev;
} else {
single_connection& conn = conns.front();
op_queue = &(conn.op_queue);
bev = conn.bev;
conn = &(conns[serverIndex]);
}

#if HAVE_CLOCK_GETTIME
@@ -234,7 +223,7 @@ void Connection::issue_set(const char* key, const char* value, int length,
#endif

op.type = Operation::SET;
op_queue->push(op);
conn->op_queue.push(op);

if (read_state == IDLE)
read_state = WAITING_FOR_OP;
@@ -245,15 +234,15 @@ void Connection::issue_set(const char* key, const char* value, int length,
0x08, 0x00, htons(vbucket_id),
htonl(keylen + 8 + length)};

bufferevent_write(bev, &h, 32); // With extras
bufferevent_write(bev, key, keylen);
bufferevent_write(bev, value, length);
bufferevent_write(conn->bev, &h, 32); // With extras
bufferevent_write(conn->bev, key, keylen);
bufferevent_write(conn->bev, value, length);
l = 24 + h.body_len;
} else {
l = evbuffer_add_printf(bufferevent_get_output(bev),
l = evbuffer_add_printf(bufferevent_get_output(conn->bev),
"set %s 0 0 %d\r\n", key, length);
bufferevent_write(bev, value, length);
bufferevent_write(bev, "\r\n", 2);
bufferevent_write(conn->bev, value, length);
bufferevent_write(conn->bev, "\r\n", 2);
l += length + 2;
}

@@ -277,27 +266,6 @@ void Connection::issue_something(double now) {
}
}

void Connection::pop_op(std::queue<Operation>& op_queue) {
assert(op_queue.size() > 0);

op_queue.pop();

if (read_state == LOADING) return;
read_state = IDLE;

// Advance the read state machine.
if (op_queue.size() > 0) {
Operation& op = op_queue.front();
switch (op.type) {
case Operation::GET:
case Operation::SET:
read_state = WAITING_FOR_OP;
break;
default: DIE("Not implemented.");
}
}
}

bool Connection::check_exit_condition(double now) {
if (read_state == INIT_READ) return false;
if (now == 0.0) now = get_time();
@@ -387,7 +355,8 @@ void Connection::event_callback(struct bufferevent *bev, short events) {
// event_base_gettimeofday_cached(base, &now_tv);

if (events & BEV_EVENT_CONNECTED) {
single_connection conn = find_conn(bev);
single_connection& conn = find_conn(bev);
conn.connected = true;
D("Connected to %s:%s.", conn.hostname.c_str(), conn.port.c_str());
int fd = bufferevent_getfd(bev);
if (fd < 0) DIE("bufferevent_getfd");
@@ -399,10 +368,15 @@ void Connection::event_callback(struct bufferevent *bev, short events) {
DIE("setsockopt()");
}

if (options.sasl)
issue_sasl(bev);
else
bool allConnsConnected = true;
for (single_connection& c : conns) {
if (!c.connected)
allConnsConnected = false;
}

if (allConnsConnected)
read_state = IDLE; // This is the most important part!

} else if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err) DIE("DNS error: %s", evutil_gai_strerror(err));
@@ -435,13 +409,16 @@ void Connection::read_callback(struct bufferevent *bev) {
if (op_queue.size() > 0) op = &op_queue.front();

switch (read_state) {
case INIT_READ: DIE("event from uninitialized connection");
case IDLE: return; // We munched all the data we expected?

// Note: for binary, the whole get suite (GET, GET_DATA, END) is collapsed
// into one state
case WAITING_FOR_OP:
assert(op_queue.size() > 0);
if (op_queue.empty()) {
// Maybe we're waiting on other connections?
assert(op_queues_size() > 0);
return;
}

finished = (options.binary) ? consume_binary_response(input) :
consume_ascii_response(input, op);
@@ -464,12 +441,19 @@ void Connection::read_callback(struct bufferevent *bev) {
if (op->type == Operation::SET)
stats.log_get(*op);

pop_op(op_queue);
op_queue.pop();
if (op_queues_size() == 0)
read_state = IDLE;

drive_write_machine(now);
break;

case LOADING:
assert(op_queue.size() > 0);
if (op_queue.empty()) {
// Maybe we're waiting on other connections?
assert(op_queues_size() > 0);
return;
}

if (options.binary) {
if (!consume_binary_response(input)) return;
@@ -480,7 +464,7 @@ void Connection::read_callback(struct bufferevent *bev) {
}

loader_completed++;
pop_op(op_queue);
op_queue.pop();

if (loader_completed == options.records) {
D("Finished loading.");
14 changes: 7 additions & 7 deletions Connection.h
Original file line number Diff line number Diff line change
@@ -38,9 +38,10 @@ class Connection {

struct single_connection {
single_connection(string hostname, string port, struct bufferevent *bev)
: hostname(hostname), port(port), bev(bev) {};
: hostname(hostname), port(port), connected(false), bev(bev) {};
string hostname;
string port;
bool connected;
std::queue<Operation> op_queue;
struct bufferevent *bev;
};
@@ -72,15 +73,14 @@ class Connection {
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
void issue_something(double now = 0.0);
void pop_op(std::queue<Operation>& op_queue);
bool check_exit_condition(double now = 0.0);
void drive_write_machine(double now = 0.0);
bool isIdle();

void start_loading();

void reset();
void issue_sasl(struct bufferevent *bev);
void start_sasl();

void event_callback(struct bufferevent *bev, short events);
void read_callback(struct bufferevent *bev);
@@ -96,10 +96,8 @@ class Connection {
private:
struct event_base *base;
struct evdns_base *evdns;

vector<single_connection> conns;
int op_queues_size();
single_connection& find_conn(struct bufferevent *bev);
VBUCKET_CONFIG_HANDLE vb;

struct event *timer; // Used to control inter-transmission time.
double lambda, next_time; // Inter-transmission time parameters.
@@ -114,5 +112,7 @@ class Connection {
KeyGenerator *keygen;
Generator *iagen;

VBUCKET_CONFIG_HANDLE vb;
int op_queues_size();
single_connection& find_conn(struct bufferevent *bev);
void issue_sasl(struct bufferevent *bev);
};
20 changes: 19 additions & 1 deletion mutilate.cc
Original file line number Diff line number Diff line change
@@ -638,7 +638,6 @@ void do_mutilate(const vector<string>& servers, options_t& options,
vector<Connection*> connections;
vector<Connection*> server_lead;

//TODO(syang0) check that if membaseConfig is given, no server can be given too
if (args.membaseConfig_given) {
assert(vb);
for (int c = 0; c < options.connections; c++) {
@@ -679,6 +678,25 @@ void do_mutilate(const vector<string>& servers, options_t& options,
else break;
}

if (options.sasl) {
for(auto s: connections) s->start_sasl();

// Wait for all Connections to become IDLE.
while (1) {
// FIXME: If all connections become ready before event_base_loop
// is called, this will deadlock.
event_base_loop(base, EVLOOP_ONCE);

bool restart = false;
for (Connection *conn: connections)
if (!conn->isIdle())
restart = true;

if (restart) continue;
else break;
}
}

// Load database on lead connection for each server.
if (!options.noload) {
V("Loading database.");