diff --git a/items.c b/items.c index d7de9cd789..e245eaaeb0 100644 --- a/items.c +++ b/items.c @@ -46,6 +46,36 @@ typedef struct { rel_time_t evicted_time; } itemstats_t; +typedef struct { + void *c; /* original connection structure. still with source thread attached. */ + int sfd; /* client fd. */ + bipbuf_t *buf; /* output buffer */ + char *cbuf; /* current buffer */ +} crawler_client_t; + +typedef struct _crawler_module_t crawler_module_t; + +typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls); +typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args? +typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args? +typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls); +typedef void (*crawler_finalize_func)(crawler_module_t *cm); + +typedef struct { + crawler_init_func init; /* run before crawl starts */ + crawler_eval_func eval; /* runs on an item. */ + crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */ + crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */ + bool needs_lock; /* whether or not we need the LRU lock held when eval is called */ + bool needs_client; /* whether or not to grab onto the remote client */ +} crawler_module_reg_t; + +struct _crawler_module_t { + void *data; /* opaque data pointer */ + crawler_client_t c; + crawler_module_reg_t *mod; +}; + typedef struct { uint64_t histo[61]; uint64_t ttl_hourplus; @@ -57,24 +87,57 @@ typedef struct { bool run_complete; } crawlerstats_t; -typedef struct { - void *c; /* original connection structure. still with source thread attached. */ - int sfd; /* client fd. */ - bipbuf_t *buf; /* output buffer */ - char *cbuf; /* current buffer */ -} crawler_client_t; +struct crawler_expired_data { + pthread_mutex_t lock; + crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES]; + /* redundant with crawlerstats_t so we can get overall start/stop/done */ + rel_time_t start_time; + rel_time_t end_time; + bool crawl_complete; + bool is_external; /* whether this was an alloc local or remote to the module. */ +}; + +static int crawler_expired_init(crawler_module_t *cm, void *data); +static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls); +static void crawler_expired_finalize(crawler_module_t *cm); +static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); + +crawler_module_reg_t crawler_expired_mod = { + .init = crawler_expired_init, + .eval = crawler_expired_eval, + .doneclass = crawler_expired_doneclass, + .finalize = crawler_expired_finalize, + .needs_lock = true, + .needs_client = false +}; + +static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); + +crawler_module_reg_t crawler_metadump_mod = { + .init = NULL, + .eval = crawler_metadump_eval, + .doneclass = NULL, + .finalize = NULL, + .needs_lock = false, + .needs_client = true +}; + +crawler_module_reg_t *crawler_mod_regs[2] = { + &crawler_expired_mod, + &crawler_metadump_mod +}; + +crawler_module_t active_crawler_mod; static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; static crawler crawlers[LARGEST_ID]; -static crawler_client_t crawler_client; static itemstats_t itemstats[LARGEST_ID]; static unsigned int sizes[LARGEST_ID]; static uint64_t sizes_bytes[LARGEST_ID]; static unsigned int *stats_sizes_hist = NULL; static uint64_t stats_sizes_cas_min = 0; static int stats_sizes_buckets = 0; -static crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES]; static int crawler_count = 0; static volatile int do_run_lru_crawler_thread = 0; @@ -84,7 +147,6 @@ static int lru_maintainer_initialized = 0; static int lru_maintainer_check_clsid = 0; static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t lru_crawler_stats_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t stats_sizes_lock = PTHREAD_MUTEX_INITIALIZER; @@ -103,7 +165,8 @@ void item_stats_reset(void) { static int lru_pull_tail(const int orig_id, const int cur_lru, const uint64_t total_bytes, uint8_t flags); -static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type); +static int lru_crawler_start(uint8_t *ids, uint32_t remaining, const enum crawler_run_type type, + void *data, void *c, const int sfd); /* Get the next CAS id for a new item. */ /* TODO: refactor some atomics for this. */ @@ -1058,20 +1121,22 @@ static int lru_maintainer_juggle(const int slabs_clsid) { * The latter is to avoid newly started daemons from waiting too long before * retrying a crawl. */ -static void lru_maintainer_crawler_check(void) { +static void lru_maintainer_crawler_check(struct crawler_expired_data *cdata) { int i; - static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES]; + static rel_time_t next_crawls[MAX_NUMBER_OF_SLAB_CLASSES]; static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES]; + uint8_t todo[MAX_NUMBER_OF_SLAB_CLASSES]; + memset(todo, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES); + bool do_run = false; + if (!cdata->crawl_complete) { + return; + } + for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) { - crawlerstats_t *s = &crawlerstats[i]; + crawlerstats_t *s = &cdata->crawlerstats[i]; /* We've not successfully kicked off a crawl yet. */ - if (last_crawls[i] == 0) { - if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) { - last_crawls[i] = current_time; - } - } - pthread_mutex_lock(&lru_crawler_stats_lock); if (s->run_complete) { + pthread_mutex_lock(&cdata->lock); int x; /* Should we crawl again? */ uint64_t possible_reclaims = s->seen - s->noexp; @@ -1083,27 +1148,45 @@ static void lru_maintainer_crawler_check(void) { rel_time_t since_run = current_time - s->end_time; /* Don't bother if the payoff is too low. */ if (settings.verbose > 1) - fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n", - (unsigned long long)low_watermark, (unsigned long long)possible_reclaims, + fprintf(stderr, "maint crawler[%d]: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n", + i, (unsigned long long)low_watermark, (unsigned long long)possible_reclaims, (unsigned int)since_run); for (x = 0; x < 60; x++) { - if (since_run < (x * 60) + 60) - break; available_reclaims += s->histo[x]; + if (available_reclaims > low_watermark) { + if (next_crawl_wait[i] < (x * 60)) { + next_crawl_wait[i] += 60; + } else if (next_crawl_wait[i] >= 60) { + next_crawl_wait[i] -= 60; + } + break; + } } - if (available_reclaims > low_watermark) { - last_crawls[i] = 0; - if (next_crawl_wait[i] > 60) - next_crawl_wait[i] -= 60; - } else if (since_run > 5 && since_run > next_crawl_wait[i]) { - last_crawls[i] = 0; - if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT) - next_crawl_wait[i] += 60; + + if (available_reclaims == 0) { + next_crawl_wait[i] += 60; + } + + if (next_crawl_wait[i] > MAX_MAINTCRAWL_WAIT) { + next_crawl_wait[i] = MAX_MAINTCRAWL_WAIT; } + + next_crawls[i] = current_time + next_crawl_wait[i] + 5; if (settings.verbose > 1) - fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]); + fprintf(stderr, "maint crawler[%d]: next_crawl: %u, [%d] now: [%d]\n", + i, next_crawl_wait[i], next_crawls[i], current_time); + // Got our calculation, avoid running until next actual run. + s->run_complete = false; + pthread_mutex_unlock(&cdata->lock); } - pthread_mutex_unlock(&lru_crawler_stats_lock); + if (current_time > next_crawls[i]) { + todo[i] = 1; + do_run = true; + next_crawls[i] = current_time + 5; // minimum retry wait. + } + } + if (do_run) { + lru_crawler_start(todo, 0, CRAWLER_EXPIRED, cdata, NULL, 0); } } @@ -1116,6 +1199,10 @@ static void *lru_maintainer_thread(void *arg) { int i; useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP; rel_time_t last_crawler_check = 0; + struct crawler_expired_data cdata; + memset(&cdata, 0, sizeof(struct crawler_expired_data)); + pthread_mutex_init(&cdata.lock, NULL); + cdata.crawl_complete = true; // kick off the crawler. pthread_mutex_lock(&lru_maintainer_lock); if (settings.verbose > 2) @@ -1149,7 +1236,7 @@ static void *lru_maintainer_thread(void *arg) { } /* Once per second at most */ if (settings.lru_crawler && last_crawler_check != current_time) { - lru_maintainer_crawler_check(); + lru_maintainer_crawler_check(&cdata); last_crawler_check = current_time; } } @@ -1308,13 +1395,83 @@ static item *crawler_crawl_q(item *it) { return it->next; /* success */ } +#define LRU_CRAWLER_WRITEBUF 8192 + +static void lru_crawler_close_client(crawler_client_t *c) { + //fprintf(stderr, "CRAWLER: Closing client\n"); + sidethread_conn_close(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static void lru_crawler_release_client(crawler_client_t *c) { + //fprintf(stderr, "CRAWLER: Closing client\n"); + redispatch_conn(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static int crawler_expired_init(crawler_module_t *cm, void *data) { + struct crawler_expired_data *d; + if (data != NULL) { + d = data; + d->is_external = true; + cm->data = data; + } else { + // allocate data. + d = calloc(1, sizeof(struct crawler_expired_data)); + if (d == NULL) { + return -1; + } + // init lock. + pthread_mutex_init(&d->lock, NULL); + d->is_external = false; + d->start_time = current_time; + + cm->data = d; + } + pthread_mutex_lock(&d->lock); + memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES); + for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) { + d->crawlerstats[x].start_time = current_time; + d->crawlerstats[x].run_complete = false; + } + pthread_mutex_unlock(&d->lock); + return 0; +} + +static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) { + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + d->crawlerstats[CLEAR_LRU(slab_cls)].end_time = current_time; + d->crawlerstats[CLEAR_LRU(slab_cls)].run_complete = true; + pthread_mutex_unlock(&d->lock); +} + +static void crawler_expired_finalize(crawler_module_t *cm) { + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + d->end_time = current_time; + d->crawl_complete = true; + pthread_mutex_unlock(&d->lock); + + if (!d->is_external) { + free(d); + } +} + /* I pulled this out to make the main thread clearer, but it reaches into the * main thread's values too much. Should rethink again. */ -static void item_crawler_evaluate(item *search, uint32_t hv, int i) { +static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) { int slab_id = CLEAR_LRU(i); - crawlerstats_t *s = &crawlerstats[slab_id]; - itemstats[i].crawler_items_checked++; + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + crawlerstats_t *s = &d->crawlerstats[slab_id]; int is_flushed = item_is_flushed(search); if ((search->exptime != 0 && search->exptime < current_time) || is_flushed) { @@ -1350,9 +1507,10 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) { s->histo[bucket]++; } } + pthread_mutex_unlock(&d->lock); } -static void item_crawler_metadump(item *it, uint32_t hv, int i) { +static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) { //int slab_id = CLEAR_LRU(i); char keybuf[KEY_MAX_LENGTH * 3 + 1]; int is_flushed = item_is_flushed(it); @@ -1364,7 +1522,7 @@ static void item_crawler_metadump(item *it, uint32_t hv, int i) { } // TODO: uriencode directly into the buffer. uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1); - int total = snprintf(crawler_client.cbuf, 4096, + int total = snprintf(cm->c.cbuf, 4096, "key=%s exp=%ld la=%llu cas=%llu fetch=%s\n", keybuf, (it->exptime == 0) ? -1 : (long)it->exptime + process_started, @@ -1373,32 +1531,14 @@ static void item_crawler_metadump(item *it, uint32_t hv, int i) { (it->it_flags & ITEM_FETCHED) ? "yes" : "no"); refcount_decr(&it->refcount); // TODO: some way of tracking the errors. these are very unlikely though. - if (total >= 4096 || total <= 0) { + if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) { /* Failed to write, don't push it. */ return; } - bipbuf_push(crawler_client.buf, total); -} - -static void item_crawler_close_client(crawler_client_t *c) { - //fprintf(stderr, "CRAWLER: Closing client\n"); - sidethread_conn_close(c->c); - c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); - c->buf = NULL; + bipbuf_push(cm->c.buf, total); } -static void item_crawler_release_client(crawler_client_t *c) { - //fprintf(stderr, "CRAWLER: Closing client\n"); - redispatch_conn(c->c); - c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); - c->buf = NULL; -} - -static int item_crawler_metadump_poll(crawler_client_t *c) { +static int lru_crawler_poll(crawler_client_t *c) { unsigned char *data; unsigned int data_size = 0; struct pollfd to_poll[1]; @@ -1418,23 +1558,23 @@ static int item_crawler_metadump_poll(crawler_client_t *c) { char buf[1]; int res = read(c->sfd, buf, 1); if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { - item_crawler_close_client(c); + lru_crawler_close_client(c); return -1; } } if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { if (to_poll[0].revents & (POLLHUP|POLLERR)) { - item_crawler_close_client(c); + lru_crawler_close_client(c); return -1; } else if (to_poll[0].revents & POLLOUT) { int total = write(c->sfd, data, data_size); if (total == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { - item_crawler_close_client(c); + lru_crawler_close_client(c); return -1; } } else if (total == 0) { - item_crawler_close_client(c); + lru_crawler_close_client(c); return -1; } else { bipbuf_poll(c->buf, total); @@ -1448,13 +1588,13 @@ static int item_crawler_metadump_poll(crawler_client_t *c) { * for it to clear up or close. * Return NULL if closed. */ -static int item_crawler_metadump_getbuf(crawler_client_t *c) { +static int lru_crawler_client_getbuf(crawler_client_t *c) { void *buf = NULL; if (c->c == NULL) return -1; /* not enough space. */ - while ((buf = bipbuf_request(c->buf, 4096)) == NULL) { + while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) { // TODO: max loops before closing. - int ret = item_crawler_metadump_poll(c); + int ret = lru_crawler_poll(c); if (ret < 0) return ret; } @@ -1462,19 +1602,13 @@ static int item_crawler_metadump_getbuf(crawler_client_t *c) { return 0; } -static void item_crawler_class_done(int i) { +static void lru_crawler_class_done(int i) { crawlers[i].it_flags = 0; crawler_count--; crawler_unlink_q((item *)&crawlers[i]); pthread_mutex_unlock(&lru_locks[i]); - pthread_mutex_lock(&lru_crawler_stats_lock); - crawlerstats[CLEAR_LRU(i)].end_time = current_time; - crawlerstats[CLEAR_LRU(i)].run_complete = true; - pthread_mutex_unlock(&lru_crawler_stats_lock); - - if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) { - item_crawler_metadump_poll(&crawler_client); - } + if (active_crawler_mod.mod->doneclass != NULL) + active_crawler_mod.mod->doneclass(&active_crawler_mod, i); } static void *item_crawler_thread(void *arg) { @@ -1499,11 +1633,11 @@ static void *item_crawler_thread(void *arg) { } /* Get memory from bipbuf, if client has no space, flush. */ - // TODO: Will become a callback run here. - if (crawlers[i].type == CRAWLER_METADUMP) { - int ret = item_crawler_metadump_getbuf(&crawler_client); + if (active_crawler_mod.c.c != NULL) { + int ret = lru_crawler_client_getbuf(&active_crawler_mod.c); if (ret != 0) { - item_crawler_class_done(i); + // FIXME: Kill off the whole run. + lru_crawler_class_done(i); continue; } } @@ -1513,7 +1647,7 @@ static void *item_crawler_thread(void *arg) { (crawlers[i].remaining && --crawlers[i].remaining < 1)) { if (settings.verbose > 2) fprintf(stderr, "Nothing left to crawl for %d\n", i); - item_crawler_class_done(i); + lru_crawler_class_done(i); continue; } uint32_t hv = hash(ITEM_key(search), search->nkey); @@ -1533,20 +1667,21 @@ static void *item_crawler_thread(void *arg) { continue; } + itemstats[i].crawler_items_checked++; /* Frees the item or decrements the refcount. */ /* Interface for this could improve: do the free/decr here * instead? */ - if (crawlers[i].type == CRAWLER_METADUMP) + if (!active_crawler_mod.mod->needs_lock) { pthread_mutex_unlock(&lru_locks[i]); + } - pthread_mutex_lock(&lru_crawler_stats_lock); - crawlers[i].eval(search, hv, i); - pthread_mutex_unlock(&lru_crawler_stats_lock); + active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i); if (hold_lock) item_trylock_unlock(hold_lock); - if (crawlers[i].type != CRAWLER_METADUMP) + if (active_crawler_mod.mod->needs_lock) { pthread_mutex_unlock(&lru_locks[i]); + } if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { usleep(settings.lru_crawler_sleep); @@ -1555,11 +1690,14 @@ static void *item_crawler_thread(void *arg) { } } - // FIXME: Need to global-ize the type of the crawl, or we're calling - // finalize on an opaque here. - if (crawler_client.c != NULL) { - item_crawler_metadump_poll(&crawler_client); - item_crawler_release_client(&crawler_client); + if (active_crawler_mod.mod != NULL) { + if (active_crawler_mod.mod->finalize != NULL) + active_crawler_mod.mod->finalize(&active_crawler_mod); + if (active_crawler_mod.c.c != NULL) { + lru_crawler_poll(&active_crawler_mod.c); + lru_crawler_release_client(&active_crawler_mod.c); + } + active_crawler_mod.mod = NULL; } if (settings.verbose > 2) @@ -1626,7 +1764,7 @@ int start_item_crawler_thread(void) { /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole * LRU every time. */ -static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) { +static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { int i; uint32_t sid; uint32_t tocrawl[3]; @@ -1649,16 +1787,6 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_ru crawlers[sid].time = 0; crawlers[sid].remaining = remaining; crawlers[sid].slabs_clsid = sid; - crawlers[sid].type = type; - switch (type) { - case CRAWLER_METADUMP: - crawlers[sid].eval = item_crawler_metadump; - break; - case CRAWLER_EXPIRED: - default: - crawlers[sid].eval = item_crawler_evaluate; - break; - } crawler_link_q((item *)&crawlers[sid]); crawler_count++; starts++; @@ -1670,58 +1798,71 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_ru stats_state.lru_crawler_running = true; stats.lru_crawler_starts++; STATS_UNLOCK(); - pthread_mutex_lock(&lru_crawler_stats_lock); - memset(&crawlerstats[id], 0, sizeof(crawlerstats_t)); - crawlerstats[id].start_time = current_time; - pthread_mutex_unlock(&lru_crawler_stats_lock); } return starts; } -static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) { - int starts; - if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { - return 0; +static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) { + crawler_client_t *crawlc = &cm->c; + if (crawlc->c != NULL) { + return -1; } - starts = do_lru_crawler_start(id, remaining, type); - if (starts) { - pthread_cond_signal(&lru_crawler_cond); + crawlc->c = c; + crawlc->sfd = sfd; + + crawlc->buf = bipbuf_new(1024 * 128); + if (crawlc->buf == NULL) { + return -2; } - pthread_mutex_unlock(&lru_crawler_lock); - return starts; + return 0; } -/* FIXME: Temporary hack since we can't yet pass this information into - * lru_crawler_crawl.. which has the proper locks/etc. - * Multiple parallel commands could race, but isn't part of the testing. - */ -int lru_crawler_set_client(void *c, const int sfd) { - if (crawler_client.c != NULL) { +static int lru_crawler_start(uint8_t *ids, uint32_t remaining, + const enum crawler_run_type type, void *data, + void *c, const int sfd) { + int starts = 0; + if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { return -1; } - crawler_client.c = c; - crawler_client.sfd = sfd; - crawler_client.buf = bipbuf_new(1024 * 128); - if (crawler_client.buf == NULL) { - return -2; + /* Configure the module */ + assert(crawler_mod_regs[type] != NULL); + active_crawler_mod.mod = crawler_mod_regs[type]; + if (active_crawler_mod.mod->init != NULL) { + active_crawler_mod.mod->init(&active_crawler_mod, data); } - return 0; + if (active_crawler_mod.mod->needs_client) { + if (c == NULL || sfd == 0) { + pthread_mutex_unlock(&lru_crawler_lock); + return -2; + } + if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) { + pthread_mutex_unlock(&lru_crawler_lock); + return -2; + } + } + + for (int sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { + if (ids[sid]) { + starts += do_lru_crawler_start(sid, remaining); + } + } + if (starts) { + pthread_cond_signal(&lru_crawler_cond); + } + pthread_mutex_unlock(&lru_crawler_lock); + return starts; } -/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to - * parse the string. LRU maintainer code is generating a string to set up a - * sid. +/* * Also only clear the crawlerstats once per sid. */ -enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) { +enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type, + void *c, const int sfd) { char *b = NULL; uint32_t sid = 0; int starts = 0; uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES]; - if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { - return CRAWLER_RUNNING; - } /* FIXME: I added this while debugging. Don't think it's needed? */ memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES); @@ -1743,16 +1884,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_t } } - for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { - if (tocrawl[sid]) - starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type); - } - if (starts) { - pthread_cond_signal(&lru_crawler_cond); - pthread_mutex_unlock(&lru_crawler_lock); + starts = lru_crawler_start(tocrawl, settings.lru_crawler_tocrawl, + type, NULL, c, sfd); + if (starts == -1) { + return CRAWLER_RUNNING; + } else if (starts == -2) { + return CRAWLER_ERROR; /* FIXME: not very helpful. */ + } else if (starts) { return CRAWLER_OK; } else { - pthread_mutex_unlock(&lru_crawler_lock); return CRAWLER_NOTSTARTED; } } @@ -1768,12 +1908,14 @@ void lru_crawler_resume(void) { int init_lru_crawler(void) { if (lru_crawler_initialized == 0) { - memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES); if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) { fprintf(stderr, "Can't initialize lru crawler condition\n"); return -1; } pthread_mutex_init(&lru_crawler_lock, NULL); + active_crawler_mod.c.c = NULL; + active_crawler_mod.mod = NULL; + active_crawler_mod.data = NULL; lru_crawler_initialized = 1; } return 0; diff --git a/items.h b/items.h index 848f8c015a..d9d99e3699 100644 --- a/items.h +++ b/items.h @@ -35,7 +35,7 @@ void item_stats_reset(void); extern pthread_mutex_t lru_locks[POWER_LARGEST]; enum crawler_result_type { - CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED + CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED, CRAWLER_ERROR }; int start_lru_maintainer_thread(void); @@ -47,7 +47,6 @@ void lru_maintainer_resume(void); int start_item_crawler_thread(void); int stop_item_crawler_thread(void); int init_lru_crawler(void); -enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type); -int lru_crawler_set_client(void *c, const int sfd); /* FIXME: Temporary. */ +enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type, void *c, const int sfd); void lru_crawler_pause(void); void lru_crawler_resume(void); diff --git a/memcached.c b/memcached.c index c090343281..bc3636c5a5 100644 --- a/memcached.c +++ b/memcached.c @@ -3976,7 +3976,7 @@ static void process_command(conn *c, char *command) { return; } - rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED); + rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED, NULL, 0); switch(rv) { case CRAWLER_OK: out_string(c, "OK"); @@ -3990,6 +3990,9 @@ static void process_command(conn *c, char *command) { case CRAWLER_NOTSTARTED: out_string(c, "NOTSTARTED no items to crawl"); break; + case CRAWLER_ERROR: + out_string(c, "ERROR an unknown error happened"); + break; } return; } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0) { @@ -3998,9 +4001,8 @@ static void process_command(conn *c, char *command) { return; } - // FIXME: check response code. - lru_crawler_set_client(c, c->sfd); - int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP); + int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP, + c, c->sfd); switch(rv) { case CRAWLER_OK: out_string(c, "OK"); @@ -4017,6 +4019,9 @@ static void process_command(conn *c, char *command) { case CRAWLER_NOTSTARTED: out_string(c, "NOTSTARTED no items to crawl"); break; + case CRAWLER_ERROR: + out_string(c, "ERROR an unknown error happened"); + break; } return; } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) { diff --git a/memcached.h b/memcached.h index dd1fd6c413..359c122020 100644 --- a/memcached.h +++ b/memcached.h @@ -419,8 +419,6 @@ typedef struct _stritem { /* then data with terminating \r\n (no terminating null; it's binary!) */ } item; -typedef void (*crawler_eval_func)(item *it, uint32_t hv, int slab_cls); - // TODO: If we eventually want user loaded modules, we can't use an enum :( enum crawler_run_type { CRAWLER_EXPIRED=0, CRAWLER_METADUMP @@ -439,8 +437,6 @@ typedef struct { uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ uint32_t remaining; /* Max keys to crawl per slab per invocation */ - enum crawler_run_type type; /* which module to use during run */ - crawler_eval_func eval; /* The function to run with the locked item */ } crawler; /* Header when an item is actually a chunk of another item. */