diff --git a/items.c b/items.c index a14973f3f8..d7de9cd789 100644 --- a/items.c +++ b/items.c @@ -1389,6 +1389,15 @@ static void item_crawler_close_client(crawler_client_t *c) { c->buf = NULL; } +static void item_crawler_release_client(crawler_client_t *c) { + //fprintf(stderr, "CRAWLER: Closing client\n"); + redispatch_conn(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + static int item_crawler_metadump_poll(crawler_client_t *c) { unsigned char *data; unsigned int data_size = 0; @@ -1465,7 +1474,6 @@ static void item_crawler_class_done(int i) { if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) { item_crawler_metadump_poll(&crawler_client); - item_crawler_close_client(&crawler_client); } } @@ -1546,6 +1554,14 @@ static void *item_crawler_thread(void *arg) { } } } + + // FIXME: Need to global-ize the type of the crawl, or we're calling + // finalize on an opaque here. + if (crawler_client.c != NULL) { + item_crawler_metadump_poll(&crawler_client); + item_crawler_release_client(&crawler_client); + } + if (settings.verbose > 2) fprintf(stderr, "LRU crawler thread sleeping\n"); STATS_LOCK(); diff --git a/memcached.c b/memcached.c index 2cf849cc89..c090343281 100644 --- a/memcached.c +++ b/memcached.c @@ -449,6 +449,18 @@ void conn_close_idle(conn *c) { } } +/* bring conn back from a sidethread. could have had its event base moved. */ +void conn_worker_readd(conn *c) { + c->ev_flags = EV_READ | EV_PERSIST; + event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c); + event_base_set(c->thread->base, &c->event); + c->state = conn_new_cmd; + + if (event_add(&c->event, 0) == -1) { + perror("event_add"); + } +} + conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, diff --git a/memcached.h b/memcached.h index a98d60f58d..dd1fd6c413 100644 --- a/memcached.h +++ b/memcached.h @@ -599,6 +599,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base); +void conn_worker_readd(conn *c); extern int daemonize(int nochdir, int noclose); #define mutex_lock(x) pthread_mutex_lock(x) @@ -620,7 +621,7 @@ extern int daemonize(int nochdir, int noclose); */ void memcached_thread_init(int nthreads); -int dispatch_event_add(int thread, conn *c); +void redispatch_conn(conn *c); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport); void sidethread_conn_close(conn *c); diff --git a/thread.c b/thread.c index 757dd8f460..b6231806fc 100644 --- a/thread.c +++ b/thread.c @@ -24,6 +24,7 @@ struct conn_queue_item { int event_flags; int read_buffer_size; enum network_transport transport; + conn *c; CQ_ITEM *next; }; @@ -422,6 +423,14 @@ static void thread_libevent_process(int fd, short which, void *arg) { cqi_free(item); } break; + case 'r': + item = cq_pop(me->new_conn_queue); + + if (NULL != item) { + conn_worker_readd(item->c); + cqi_free(item); + } + break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); @@ -481,30 +490,28 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, /* * Re-dispatches a connection back to the original thread. Can be called from * any side thread borrowing a connection. - * TODO: Look into this. too complicated? */ -#ifdef BOGUS_DEFINE void redispatch_conn(conn *c) { CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { /* Can't cleanly redispatch connection. close it forcefully. */ - /* FIXME: is conn_cleanup() necessary? - * if conn was handed off to a side thread it should be clean. - * could also put it into a "clean_me" state? - */ c->state = conn_closed; close(c->sfd); return; } LIBEVENT_THREAD *thread = c->thread; - item->sfd = sfd; - /* pass in the state somehow? - item->init_state = conn_closing; */ - item->event_flags = c->event_flags; - item->conn = c; + item->sfd = c->sfd; + item->init_state = conn_new_cmd; + item->c = c; + + cq_push(thread->new_conn_queue, item); + + buf[0] = 'r'; + if (write(thread->notify_send_fd, buf, 1) != 1) { + perror("Writing to thread notify pipe"); + } } -#endif /* This misses the allow_new_conns flag :( */ void sidethread_conn_close(conn *c) {