Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

membase shard compatible mutilate #3

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 180 additions & 199 deletions Connection.cc

Large diffs are not rendered by default.

45 changes: 29 additions & 16 deletions Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Generator.h"
#include "Operation.h"
#include "util.h"
#include "vbucket.h"

using namespace std;

Expand All @@ -25,25 +26,33 @@ void timer_cb(evutil_socket_t fd, short what, void *ptr);

class Connection {
public:
struct Host {
string hostname;
string port;
};

Connection(struct event_base* _base, struct evdns_base* _evdns,
string _hostname, string _port, options_t options,
bool sampling = true);
options_t options, vector<Connection::Host> hosts,
VBUCKET_CONFIG_HANDLE vb, bool sampling = true);

~Connection();

string hostname;
string port;
struct single_connection {
single_connection(Host host, struct bufferevent *bev)
: host(host), connected(false), bev(bev) {};
Host host;
bool connected;
std::queue<Operation> op_queue;
struct bufferevent *bev;
};

double start_time; // Time when this connection began operations.

enum read_state_enum {
INIT_READ,
LOADING,
IDLE,
WAITING_FOR_SASL,
WAITING_FOR_GET,
WAITING_FOR_GET_DATA,
WAITING_FOR_END,
WAITING_FOR_SET,
WAITING_FOR_OP,
MAX_READ_STATE,
};

Expand All @@ -64,31 +73,31 @@ class Connection {
void issue_set(const char* key, const char* value, int length,
double now = 0.0);
void issue_something(double now = 0.0);
void pop_op();
bool check_exit_condition(double now = 0.0);
void drive_write_machine(double now = 0.0);
bool isIdle();

void start_loading();

void reset();
void issue_sasl();
void start_sasl();

void event_callback(short events);
void read_callback();
void event_callback(struct bufferevent *bev, short events);
void read_callback(struct bufferevent *bev);
void write_callback();
void timer_callback();
bool consume_binary_response(evbuffer *input);
bool consume_ascii_response(evbuffer *input, Operation *op);

void set_priority(int pri);

options_t options;

std::queue<Operation> op_queue;

private:
struct event_base *base;
struct evdns_base *evdns;
struct bufferevent *bev;
vector<single_connection> conns;
VBUCKET_CONFIG_HANDLE vb;

struct event *timer; // Used to control inter-transmission time.
double lambda, next_time; // Inter-transmission time parameters.
Expand All @@ -102,4 +111,8 @@ class Connection {
Generator *keysize;
KeyGenerator *keygen;
Generator *iagen;

int op_queues_size();
single_connection& find_conn(struct bufferevent *bev);
void issue_sasl(struct bufferevent *bev);
};
10 changes: 7 additions & 3 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ env['HAVE_POSIX_BARRIER'] = True

env.Append(CPPPATH = ['/usr/local/include', '/opt/local/include'])
env.Append(LIBPATH = ['/opt/local/lib'])
env.Append(CCFLAGS = '-std=c++0x -D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS')
env.Append(CXXFLAGS = '-std=c++0x')
env.Append(CCFLAGS = '-D_GNU_SOURCE') # -D__STDC_FORMAT_MACROS')
if sys.platform == 'darwin':
env['CC'] = 'clang'
env['CXX'] = 'clang++'
Expand All @@ -29,6 +30,9 @@ if not conf.CheckLibWithHeader("event", "event2/event.h", "C++"):
if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"):
print "pthread required"
Exit(1)
if not conf.CheckLib("hashkit"):
print "hashkit needed by libvbucket"
Exit(1)
conf.CheckLib("rt", "clock_gettime", language="C++")
conf.CheckLibWithHeader("zmq", "zmq.hpp", "C++")
conf.CheckFunc('clock_gettime')
Expand All @@ -37,14 +41,14 @@ if not conf.CheckFunc('pthread_barrier_init'):

env = conf.Finish()

env.Append(CFLAGS = ' -O3 -Wall -g')
env.Append(CFLAGS = ' -O3 -Wall -g -std=c99')
#env.Append(CPPFLAGS = ' -D_GNU_SOURCE -D__STDC_FORMAT_MACROS')
#env.Append(CPPFLAGS = ' -DUSE_ADAPTIVE_SAMPLER')

env.Command(['cmdline.cc', 'cmdline.h'], 'cmdline.ggo', 'gengetopt < $SOURCE')

src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc
Connection.cc Generator.cc""")
Connection.cc Generator.cc vbucket.c cJSON.c""")

if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER:
src += ['barrier.cc']
Expand Down
Loading