Skip to content

Commit

Permalink
allow redispatching sidethread conn to worker
Browse files Browse the repository at this point in the history
also fixes a bug where metadump was closing the client connection after a
single slab class.

not ported to the logger yet.
  • Loading branch information
dormando committed Aug 20, 2016
1 parent 5a93d69 commit 31e7d8f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
18 changes: 17 additions & 1 deletion items.c
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,15 @@ static void item_crawler_close_client(crawler_client_t *c) {
c->buf = NULL;
}

static void item_crawler_release_client(crawler_client_t *c) {
//fprintf(stderr, "CRAWLER: Closing client\n");
redispatch_conn(c->c);
c->c = NULL;
c->cbuf = NULL;
bipbuf_free(c->buf);
c->buf = NULL;
}

static int item_crawler_metadump_poll(crawler_client_t *c) {
unsigned char *data;
unsigned int data_size = 0;
Expand Down Expand Up @@ -1465,7 +1474,6 @@ static void item_crawler_class_done(int i) {

if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) {
item_crawler_metadump_poll(&crawler_client);
item_crawler_close_client(&crawler_client);
}
}

Expand Down Expand Up @@ -1546,6 +1554,14 @@ static void *item_crawler_thread(void *arg) {
}
}
}

// FIXME: Need to global-ize the type of the crawl, or we're calling
// finalize on an opaque here.
if (crawler_client.c != NULL) {
item_crawler_metadump_poll(&crawler_client);
item_crawler_release_client(&crawler_client);
}

if (settings.verbose > 2)
fprintf(stderr, "LRU crawler thread sleeping\n");
STATS_LOCK();
Expand Down
12 changes: 12 additions & 0 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,18 @@ void conn_close_idle(conn *c) {
}
}

/* bring conn back from a sidethread. could have had its event base moved. */
void conn_worker_readd(conn *c) {
c->ev_flags = EV_READ | EV_PERSIST;
event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
event_base_set(c->thread->base, &c->event);
c->state = conn_new_cmd;

if (event_add(&c->event, 0) == -1) {
perror("event_add");
}
}

conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
Expand Down
3 changes: 2 additions & 1 deletion memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base);
void conn_worker_readd(conn *c);
extern int daemonize(int nochdir, int noclose);

#define mutex_lock(x) pthread_mutex_lock(x)
Expand All @@ -620,7 +621,7 @@ extern int daemonize(int nochdir, int noclose);
*/

void memcached_thread_init(int nthreads);
int dispatch_event_add(int thread, conn *c);
void redispatch_conn(conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport);
void sidethread_conn_close(conn *c);

Expand Down
31 changes: 19 additions & 12 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct conn_queue_item {
int event_flags;
int read_buffer_size;
enum network_transport transport;
conn *c;
CQ_ITEM *next;
};

Expand Down Expand Up @@ -422,6 +423,14 @@ static void thread_libevent_process(int fd, short which, void *arg) {
cqi_free(item);
}
break;
case 'r':
item = cq_pop(me->new_conn_queue);

if (NULL != item) {
conn_worker_readd(item->c);
cqi_free(item);
}
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
Expand Down Expand Up @@ -481,30 +490,28 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
/*
* Re-dispatches a connection back to the original thread. Can be called from
* any side thread borrowing a connection.
* TODO: Look into this. too complicated?
*/
#ifdef BOGUS_DEFINE
void redispatch_conn(conn *c) {
CQ_ITEM *item = cqi_new();
char buf[1];
if (item == NULL) {
/* Can't cleanly redispatch connection. close it forcefully. */
/* FIXME: is conn_cleanup() necessary?
* if conn was handed off to a side thread it should be clean.
* could also put it into a "clean_me" state?
*/
c->state = conn_closed;
close(c->sfd);
return;
}
LIBEVENT_THREAD *thread = c->thread;
item->sfd = sfd;
/* pass in the state somehow?
item->init_state = conn_closing; */
item->event_flags = c->event_flags;
item->conn = c;
item->sfd = c->sfd;
item->init_state = conn_new_cmd;
item->c = c;

cq_push(thread->new_conn_queue, item);

buf[0] = 'r';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
#endif

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
Expand Down

0 comments on commit 31e7d8f

Please sign in to comment.