From a45d24d16904086dd40836b5083a6e8b83f8ab5f Mon Sep 17 00:00:00 2001 From: dormando Date: Wed, 10 Aug 2016 02:25:36 -0700 Subject: [PATCH] pull LRU crawler out into its own file. ~600 lines gone from items.c makes it a lot more manageable. this change is almost purely moving code around and renaming functions. very little logic has changed. --- Makefile.am | 3 +- crawler.c | 630 +++++++++++++++++++++++++++++++++++++++++++++++++++ crawler.h | 39 ++++ items.c | 642 ++-------------------------------------------------- items.h | 24 +- memcached.c | 1 - memcached.h | 4 + 7 files changed, 701 insertions(+), 642 deletions(-) create mode 100644 crawler.c create mode 100644 crawler.h diff --git a/Makefile.am b/Makefile.am index 281e9afd89..23523015de 100644 --- a/Makefile.am +++ b/Makefile.am @@ -20,7 +20,8 @@ memcached_SOURCES = memcached.c memcached.h \ util.c util.h \ trace.h cache.h sasl_defs.h \ bipbuffer.c bipbuffer.h \ - logger.c logger.h + logger.c logger.h \ + crawler.c crawler.h if BUILD_CACHE memcached_SOURCES += cache.c diff --git a/crawler.c b/crawler.c new file mode 100644 index 0000000000..246c70c72b --- /dev/null +++ b/crawler.c @@ -0,0 +1,630 @@ +/* Copyright 2016 Netflix. + * + * Use and distribution licensed under the BSD license. See + * the LICENSE file for full text. + */ + +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#include "memcached.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define LARGEST_ID POWER_LARGEST + +typedef struct { + void *c; /* original connection structure. still with source thread attached. */ + int sfd; /* client fd. */ + bipbuf_t *buf; /* output buffer */ + char *cbuf; /* current buffer */ +} crawler_client_t; + +typedef struct _crawler_module_t crawler_module_t; + +typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls); +typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args? +typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args? +typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls); +typedef void (*crawler_finalize_func)(crawler_module_t *cm); + +typedef struct { + crawler_init_func init; /* run before crawl starts */ + crawler_eval_func eval; /* runs on an item. */ + crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */ + crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. 
*/ + bool needs_lock; /* whether or not we need the LRU lock held when eval is called */ + bool needs_client; /* whether or not to grab onto the remote client */ +} crawler_module_reg_t; + +struct _crawler_module_t { + void *data; /* opaque data pointer */ + crawler_client_t c; + crawler_module_reg_t *mod; +}; + +static int crawler_expired_init(crawler_module_t *cm, void *data); +static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls); +static void crawler_expired_finalize(crawler_module_t *cm); +static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); + +crawler_module_reg_t crawler_expired_mod = { + .init = crawler_expired_init, + .eval = crawler_expired_eval, + .doneclass = crawler_expired_doneclass, + .finalize = crawler_expired_finalize, + .needs_lock = true, + .needs_client = false +}; + +static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); + +crawler_module_reg_t crawler_metadump_mod = { + .init = NULL, + .eval = crawler_metadump_eval, + .doneclass = NULL, + .finalize = NULL, + .needs_lock = false, + .needs_client = true +}; + +crawler_module_reg_t *crawler_mod_regs[2] = { + &crawler_expired_mod, + &crawler_metadump_mod +}; + +crawler_module_t active_crawler_mod; + +static crawler crawlers[LARGEST_ID]; + +static int crawler_count = 0; +static volatile int do_run_lru_crawler_thread = 0; +static int lru_crawler_initialized = 0; +static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER; + +/* Will crawl all slab classes a minimum of once per hour */ +#define MAX_MAINTCRAWL_WAIT 60 * 60 + +/*** LRU CRAWLER THREAD ***/ + +#define LRU_CRAWLER_WRITEBUF 8192 + +static void lru_crawler_close_client(crawler_client_t *c) { + //fprintf(stderr, "CRAWLER: Closing client\n"); + sidethread_conn_close(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static void lru_crawler_release_client(crawler_client_t *c) { + //fprintf(stderr, "CRAWLER: Closing client\n"); + redispatch_conn(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static int crawler_expired_init(crawler_module_t *cm, void *data) { + struct crawler_expired_data *d; + if (data != NULL) { + d = data; + d->is_external = true; + cm->data = data; + } else { + // allocate data. + d = calloc(1, sizeof(struct crawler_expired_data)); + if (d == NULL) { + return -1; + } + // init lock. 
+ pthread_mutex_init(&d->lock, NULL); + d->is_external = false; + d->start_time = current_time; + + cm->data = d; + } + pthread_mutex_lock(&d->lock); + memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES); + for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) { + d->crawlerstats[x].start_time = current_time; + d->crawlerstats[x].run_complete = false; + } + pthread_mutex_unlock(&d->lock); + return 0; +} + +static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) { + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + d->crawlerstats[CLEAR_LRU(slab_cls)].end_time = current_time; + d->crawlerstats[CLEAR_LRU(slab_cls)].run_complete = true; + pthread_mutex_unlock(&d->lock); +} + +static void crawler_expired_finalize(crawler_module_t *cm) { + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + d->end_time = current_time; + d->crawl_complete = true; + pthread_mutex_unlock(&d->lock); + + if (!d->is_external) { + free(d); + } +} + +/* I pulled this out to make the main thread clearer, but it reaches into the + * main thread's values too much. Should rethink again. + */ +static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) { + int slab_id = CLEAR_LRU(i); + struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; + pthread_mutex_lock(&d->lock); + crawlerstats_t *s = &d->crawlerstats[slab_id]; + int is_flushed = item_is_flushed(search); + if ((search->exptime != 0 && search->exptime < current_time) + || is_flushed) { + crawlers[i].reclaimed++; + s->reclaimed++; + + if (settings.verbose > 1) { + int ii; + char *key = ITEM_key(search); + fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ", + search->it_flags, search->slabs_clsid); + for (ii = 0; ii < search->nkey; ++ii) { + fprintf(stderr, "%c", key[ii]); + } + fprintf(stderr, "\n"); + } + if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) { + crawlers[i].unfetched++; + } + do_item_unlink_nolock(search, hv); + do_item_remove(search); + assert(search->slabs_clsid == 0); + } else { + s->seen++; + refcount_decr(&search->refcount); + if (search->exptime == 0) { + s->noexp++; + } else if (search->exptime - current_time > 3599) { + s->ttl_hourplus++; + } else { + rel_time_t ttl_remain = search->exptime - current_time; + int bucket = ttl_remain / 60; + s->histo[bucket]++; + } + } + pthread_mutex_unlock(&d->lock); +} + +static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) { + //int slab_id = CLEAR_LRU(i); + char keybuf[KEY_MAX_LENGTH * 3 + 1]; + int is_flushed = item_is_flushed(it); + /* Ignore expired content. */ + if ((it->exptime != 0 && it->exptime < current_time) + || is_flushed) { + refcount_decr(&it->refcount); + return; + } + // TODO: uriencode directly into the buffer. + uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1); + int total = snprintf(cm->c.cbuf, 4096, + "key=%s exp=%ld la=%llu cas=%llu fetch=%s\n", + keybuf, + (it->exptime == 0) ? -1 : (long)it->exptime + process_started, + (unsigned long long)it->time + process_started, + (unsigned long long)ITEM_get_cas(it), + (it->it_flags & ITEM_FETCHED) ? "yes" : "no"); + refcount_decr(&it->refcount); + // TODO: some way of tracking the errors. these are very unlikely though. + if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) { + /* Failed to write, don't push it. 
*/ + return; + } + bipbuf_push(cm->c.buf, total); +} + +static int lru_crawler_poll(crawler_client_t *c) { + unsigned char *data; + unsigned int data_size = 0; + struct pollfd to_poll[1]; + to_poll[0].fd = c->sfd; + to_poll[0].events = POLLOUT; + + int ret = poll(to_poll, 1, 1000); + + if (ret < 0) { + // fatal. + return -1; + } + + if (ret == 0) return 0; + + if (to_poll[0].revents & POLLIN) { + char buf[1]; + int res = read(c->sfd, buf, 1); + if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { + lru_crawler_close_client(c); + return -1; + } + } + if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { + if (to_poll[0].revents & (POLLHUP|POLLERR)) { + lru_crawler_close_client(c); + return -1; + } else if (to_poll[0].revents & POLLOUT) { + int total = write(c->sfd, data, data_size); + if (total == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + lru_crawler_close_client(c); + return -1; + } + } else if (total == 0) { + lru_crawler_close_client(c); + return -1; + } else { + bipbuf_poll(c->buf, total); + } + } + } + return 0; +} + +/* Grab some space to work with, if none exists, run the poll() loop and wait + * for it to clear up or close. + * Return NULL if closed. + */ +static int lru_crawler_client_getbuf(crawler_client_t *c) { + void *buf = NULL; + if (c->c == NULL) return -1; + /* not enough space. */ + while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) { + // TODO: max loops before closing. + int ret = lru_crawler_poll(c); + if (ret < 0) return ret; + } + + c->cbuf = buf; + return 0; +} + +static void lru_crawler_class_done(int i) { + crawlers[i].it_flags = 0; + crawler_count--; + do_item_unlinktail_q((item *)&crawlers[i]); + do_item_stats_add_crawl(i, crawlers[i].reclaimed, + crawlers[i].unfetched, crawlers[i].checked); + pthread_mutex_unlock(&lru_locks[i]); + if (active_crawler_mod.mod->doneclass != NULL) + active_crawler_mod.mod->doneclass(&active_crawler_mod, i); +} + +static void *item_crawler_thread(void *arg) { + int i; + int crawls_persleep = settings.crawls_persleep; + + pthread_mutex_lock(&lru_crawler_lock); + pthread_cond_signal(&lru_crawler_cond); + settings.lru_crawler = true; + if (settings.verbose > 2) + fprintf(stderr, "Starting LRU crawler background thread\n"); + while (do_run_lru_crawler_thread) { + pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock); + + while (crawler_count) { + item *search = NULL; + void *hold_lock = NULL; + + for (i = POWER_SMALLEST; i < LARGEST_ID; i++) { + if (crawlers[i].it_flags != 1) { + continue; + } + + /* Get memory from bipbuf, if client has no space, flush. */ + if (active_crawler_mod.c.c != NULL) { + int ret = lru_crawler_client_getbuf(&active_crawler_mod.c); + if (ret != 0) { + lru_crawler_class_done(i); + continue; + } + } + pthread_mutex_lock(&lru_locks[i]); + search = do_item_crawl_q((item *)&crawlers[i]); + if (search == NULL || + (crawlers[i].remaining && --crawlers[i].remaining < 1)) { + if (settings.verbose > 2) + fprintf(stderr, "Nothing left to crawl for %d\n", i); + lru_crawler_class_done(i); + continue; + } + uint32_t hv = hash(ITEM_key(search), search->nkey); + /* Attempt to hash item lock the "search" item. 
If locked, no + * other callers can incr the refcount + */ + if ((hold_lock = item_trylock(hv)) == NULL) { + pthread_mutex_unlock(&lru_locks[i]); + continue; + } + /* Now see if the item is refcount locked */ + if (refcount_incr(&search->refcount) != 2) { + refcount_decr(&search->refcount); + if (hold_lock) + item_trylock_unlock(hold_lock); + pthread_mutex_unlock(&lru_locks[i]); + continue; + } + + crawlers[i].checked++; + /* Frees the item or decrements the refcount. */ + /* Interface for this could improve: do the free/decr here + * instead? */ + if (!active_crawler_mod.mod->needs_lock) { + pthread_mutex_unlock(&lru_locks[i]); + } + + active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i); + + if (hold_lock) + item_trylock_unlock(hold_lock); + if (active_crawler_mod.mod->needs_lock) { + pthread_mutex_unlock(&lru_locks[i]); + } + + if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { + usleep(settings.lru_crawler_sleep); + crawls_persleep = settings.crawls_persleep; + } + } + } + + if (active_crawler_mod.mod != NULL) { + if (active_crawler_mod.mod->finalize != NULL) + active_crawler_mod.mod->finalize(&active_crawler_mod); + if (active_crawler_mod.c.c != NULL) { + lru_crawler_poll(&active_crawler_mod.c); + lru_crawler_release_client(&active_crawler_mod.c); + } + active_crawler_mod.mod = NULL; + } + + if (settings.verbose > 2) + fprintf(stderr, "LRU crawler thread sleeping\n"); + STATS_LOCK(); + stats_state.lru_crawler_running = false; + STATS_UNLOCK(); + } + pthread_mutex_unlock(&lru_crawler_lock); + if (settings.verbose > 2) + fprintf(stderr, "LRU crawler thread stopping\n"); + + return NULL; +} + +static pthread_t item_crawler_tid; + +int stop_item_crawler_thread(void) { + int ret; + pthread_mutex_lock(&lru_crawler_lock); + do_run_lru_crawler_thread = 0; + pthread_cond_signal(&lru_crawler_cond); + pthread_mutex_unlock(&lru_crawler_lock); + if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) { + fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret)); + return -1; + } + settings.lru_crawler = false; + return 0; +} + +/* Lock dance to "block" until thread is waiting on its condition: + * caller locks mtx. caller spawns thread. + * thread blocks on mutex. + * caller waits on condition, releases lock. + * thread gets lock, sends signal. + * caller can't wait, as thread has lock. + * thread waits on condition, releases lock + * caller wakes on condition, gets lock. + * caller immediately releases lock. + * thread is now safely waiting on condition before the caller returns. + */ +int start_item_crawler_thread(void) { + int ret; + + if (settings.lru_crawler) + return -1; + pthread_mutex_lock(&lru_crawler_lock); + do_run_lru_crawler_thread = 1; + if ((ret = pthread_create(&item_crawler_tid, NULL, + item_crawler_thread, NULL)) != 0) { + fprintf(stderr, "Can't create LRU crawler thread: %s\n", + strerror(ret)); + pthread_mutex_unlock(&lru_crawler_lock); + return -1; + } + /* Avoid returning until the crawler has actually started */ + pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock); + pthread_mutex_unlock(&lru_crawler_lock); + + return 0; +} + +/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole + * LRU every time. 
+ */ +static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { + int i; + uint32_t sid; + uint32_t tocrawl[3]; + int starts = 0; + tocrawl[0] = id | HOT_LRU; + tocrawl[1] = id | WARM_LRU; + tocrawl[2] = id | COLD_LRU; + + for (i = 0; i < 3; i++) { + sid = tocrawl[i]; + pthread_mutex_lock(&lru_locks[sid]); + // TODO: Pretty sure this is a needless optimization. + //if (tails[sid] != NULL) { + if (settings.verbose > 2) + fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid); + crawlers[sid].nbytes = 0; + crawlers[sid].nkey = 0; + crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */ + crawlers[sid].next = 0; + crawlers[sid].prev = 0; + crawlers[sid].time = 0; + crawlers[sid].remaining = remaining; + crawlers[sid].slabs_clsid = sid; + crawlers[sid].reclaimed = 0; + crawlers[sid].unfetched = 0; + crawlers[sid].checked = 0; + do_item_linktail_q((item *)&crawlers[sid]); + crawler_count++; + starts++; + //} + pthread_mutex_unlock(&lru_locks[sid]); + } + if (starts) { + STATS_LOCK(); + stats_state.lru_crawler_running = true; + stats.lru_crawler_starts++; + STATS_UNLOCK(); + } + return starts; +} + +static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) { + crawler_client_t *crawlc = &cm->c; + if (crawlc->c != NULL) { + return -1; + } + crawlc->c = c; + crawlc->sfd = sfd; + + crawlc->buf = bipbuf_new(1024 * 128); + if (crawlc->buf == NULL) { + return -2; + } + return 0; +} + +int lru_crawler_start(uint8_t *ids, uint32_t remaining, + const enum crawler_run_type type, void *data, + void *c, const int sfd) { + int starts = 0; + if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { + return -1; + } + + /* Configure the module */ + assert(crawler_mod_regs[type] != NULL); + active_crawler_mod.mod = crawler_mod_regs[type]; + if (active_crawler_mod.mod->init != NULL) { + active_crawler_mod.mod->init(&active_crawler_mod, data); + } + if (active_crawler_mod.mod->needs_client) { + if (c == NULL || sfd == 0) { + pthread_mutex_unlock(&lru_crawler_lock); + return -2; + } + if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) { + pthread_mutex_unlock(&lru_crawler_lock); + return -2; + } + } + + for (int sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { + if (ids[sid]) { + starts += do_lru_crawler_start(sid, remaining); + } + } + if (starts) { + pthread_cond_signal(&lru_crawler_cond); + } + pthread_mutex_unlock(&lru_crawler_lock); + return starts; +} + +/* + * Also only clear the crawlerstats once per sid. + */ +enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type, + void *c, const int sfd) { + char *b = NULL; + uint32_t sid = 0; + int starts = 0; + uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES]; + + /* FIXME: I added this while debugging. Don't think it's needed? */ + memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES); + if (strcmp(slabs, "all") == 0) { + for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { + tocrawl[sid] = 1; + } + } else { + for (char *p = strtok_r(slabs, ",", &b); + p != NULL; + p = strtok_r(NULL, ",", &b)) { + + if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST + || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) { + pthread_mutex_unlock(&lru_crawler_lock); + return CRAWLER_BADCLASS; + } + tocrawl[sid] = 1; + } + } + + starts = lru_crawler_start(tocrawl, settings.lru_crawler_tocrawl, + type, NULL, c, sfd); + if (starts == -1) { + return CRAWLER_RUNNING; + } else if (starts == -2) { + return CRAWLER_ERROR; /* FIXME: not very helpful. 
*/ + } else if (starts) { + return CRAWLER_OK; + } else { + return CRAWLER_NOTSTARTED; + } +} + +/* If we hold this lock, crawler can't wake up or move */ +void lru_crawler_pause(void) { + pthread_mutex_lock(&lru_crawler_lock); +} + +void lru_crawler_resume(void) { + pthread_mutex_unlock(&lru_crawler_lock); +} + +int init_lru_crawler(void) { + if (lru_crawler_initialized == 0) { + if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) { + fprintf(stderr, "Can't initialize lru crawler condition\n"); + return -1; + } + pthread_mutex_init(&lru_crawler_lock, NULL); + active_crawler_mod.c.c = NULL; + active_crawler_mod.mod = NULL; + active_crawler_mod.data = NULL; + lru_crawler_initialized = 1; + } + return 0; +} diff --git a/crawler.h b/crawler.h new file mode 100644 index 0000000000..7c7c1d2277 --- /dev/null +++ b/crawler.h @@ -0,0 +1,39 @@ +#ifndef CRAWLER_H +#define CRAWLER_H + +typedef struct { + uint64_t histo[61]; + uint64_t ttl_hourplus; + uint64_t noexp; + uint64_t reclaimed; + uint64_t seen; + rel_time_t start_time; + rel_time_t end_time; + bool run_complete; +} crawlerstats_t; + +struct crawler_expired_data { + pthread_mutex_t lock; + crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES]; + /* redundant with crawlerstats_t so we can get overall start/stop/done */ + rel_time_t start_time; + rel_time_t end_time; + bool crawl_complete; + bool is_external; /* whether this was an alloc local or remote to the module. */ +}; + +enum crawler_result_type { + CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED, CRAWLER_ERROR +}; + +int start_item_crawler_thread(void); +int stop_item_crawler_thread(void); +int init_lru_crawler(void); +enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type, void *c, const int sfd); +int lru_crawler_start(uint8_t *ids, uint32_t remaining, + const enum crawler_run_type type, void *data, + void *c, const int sfd); +void lru_crawler_pause(void); +void lru_crawler_resume(void); + +#endif diff --git a/items.c b/items.c index e245eaaeb0..4399f2472c 100644 --- a/items.c +++ b/items.c @@ -19,14 +19,8 @@ static void item_link_q(item *it); static void item_unlink_q(item *it); -#define HOT_LRU 0 -#define WARM_LRU 64 -#define COLD_LRU 128 -#define NOEXP_LRU 192 static unsigned int lru_type_map[4] = {HOT_LRU, WARM_LRU, COLD_LRU, NOEXP_LRU}; -#define CLEAR_LRU(id) (id & ~(3<<6)) - #define LARGEST_ID POWER_LARGEST typedef struct { uint64_t evicted; @@ -46,92 +40,8 @@ typedef struct { rel_time_t evicted_time; } itemstats_t; -typedef struct { - void *c; /* original connection structure. still with source thread attached. */ - int sfd; /* client fd. */ - bipbuf_t *buf; /* output buffer */ - char *cbuf; /* current buffer */ -} crawler_client_t; - -typedef struct _crawler_module_t crawler_module_t; - -typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls); -typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args? -typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args? -typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls); -typedef void (*crawler_finalize_func)(crawler_module_t *cm); - -typedef struct { - crawler_init_func init; /* run before crawl starts */ - crawler_eval_func eval; /* runs on an item. */ - crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */ - crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. 
*/ - bool needs_lock; /* whether or not we need the LRU lock held when eval is called */ - bool needs_client; /* whether or not to grab onto the remote client */ -} crawler_module_reg_t; - -struct _crawler_module_t { - void *data; /* opaque data pointer */ - crawler_client_t c; - crawler_module_reg_t *mod; -}; - -typedef struct { - uint64_t histo[61]; - uint64_t ttl_hourplus; - uint64_t noexp; - uint64_t reclaimed; - uint64_t seen; - rel_time_t start_time; - rel_time_t end_time; - bool run_complete; -} crawlerstats_t; - -struct crawler_expired_data { - pthread_mutex_t lock; - crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES]; - /* redundant with crawlerstats_t so we can get overall start/stop/done */ - rel_time_t start_time; - rel_time_t end_time; - bool crawl_complete; - bool is_external; /* whether this was an alloc local or remote to the module. */ -}; - -static int crawler_expired_init(crawler_module_t *cm, void *data); -static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls); -static void crawler_expired_finalize(crawler_module_t *cm); -static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); - -crawler_module_reg_t crawler_expired_mod = { - .init = crawler_expired_init, - .eval = crawler_expired_eval, - .doneclass = crawler_expired_doneclass, - .finalize = crawler_expired_finalize, - .needs_lock = true, - .needs_client = false -}; - -static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i); - -crawler_module_reg_t crawler_metadump_mod = { - .init = NULL, - .eval = crawler_metadump_eval, - .doneclass = NULL, - .finalize = NULL, - .needs_lock = false, - .needs_client = true -}; - -crawler_module_reg_t *crawler_mod_regs[2] = { - &crawler_expired_mod, - &crawler_metadump_mod -}; - -crawler_module_t active_crawler_mod; - static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; -static crawler crawlers[LARGEST_ID]; static itemstats_t itemstats[LARGEST_ID]; static unsigned int sizes[LARGEST_ID]; static uint64_t sizes_bytes[LARGEST_ID]; @@ -139,14 +49,9 @@ static unsigned int *stats_sizes_hist = NULL; static uint64_t stats_sizes_cas_min = 0; static int stats_sizes_buckets = 0; -static int crawler_count = 0; -static volatile int do_run_lru_crawler_thread = 0; static volatile int do_run_lru_maintainer_thread = 0; -static int lru_crawler_initialized = 0; static int lru_maintainer_initialized = 0; static int lru_maintainer_check_clsid = 0; -static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t stats_sizes_lock = PTHREAD_MUTEX_INITIALIZER; @@ -160,13 +65,19 @@ void item_stats_reset(void) { } } +/* called with class lru lock held */ +void do_item_stats_add_crawl(const int i, const uint64_t reclaimed, + const uint64_t unfetched, const uint64_t checked) { + itemstats[i].crawler_reclaimed += reclaimed; + itemstats[i].expired_unfetched += unfetched; + itemstats[i].crawler_items_checked += checked; +} + #define LRU_PULL_EVICT 1 #define LRU_PULL_CRAWL_BLOCKS 2 static int lru_pull_tail(const int orig_id, const int cur_lru, const uint64_t total_bytes, uint8_t flags); -static int lru_crawler_start(uint8_t *ids, uint32_t remaining, const enum crawler_run_type type, - void *data, void *c, const int sfd); /* Get the next CAS id for a new item. 
*/ /* TODO: refactor some atomics for this. */ @@ -1295,16 +1206,15 @@ int init_lru_maintainer(void) { return 0; } -/*** ITEM CRAWLER THREAD ***/ - -static void crawler_link_q(item *it) { /* item is the new tail */ +/* Tail linkers and crawler for the LRU crawler. */ +void do_item_linktail_q(item *it) { /* item is the new tail */ item **head, **tail; assert(it->it_flags == 1); assert(it->nbytes == 0); head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; - assert(*tail != 0); + //assert(*tail != 0); assert(it != *tail); assert((*head && *tail) || (*head == 0 && *tail == 0)); it->prev = *tail; @@ -1318,7 +1228,7 @@ static void crawler_link_q(item *it) { /* item is the new tail */ return; } -static void crawler_unlink_q(item *it) { +void do_item_unlinktail_q(item *it) { item **head, **tail; head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; @@ -1341,7 +1251,7 @@ static void crawler_unlink_q(item *it) { /* This is too convoluted, but it's a difficult shuffle. Try to rewrite it * more clearly. */ -static item *crawler_crawl_q(item *it) { +item *do_item_crawl_q(item *it) { item **head, **tail; assert(it->it_flags == 1); assert(it->nbytes == 0); @@ -1394,529 +1304,3 @@ static item *crawler_crawl_q(item *it) { return it->next; /* success */ } - -#define LRU_CRAWLER_WRITEBUF 8192 - -static void lru_crawler_close_client(crawler_client_t *c) { - //fprintf(stderr, "CRAWLER: Closing client\n"); - sidethread_conn_close(c->c); - c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); - c->buf = NULL; -} - -static void lru_crawler_release_client(crawler_client_t *c) { - //fprintf(stderr, "CRAWLER: Closing client\n"); - redispatch_conn(c->c); - c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); - c->buf = NULL; -} - -static int crawler_expired_init(crawler_module_t *cm, void *data) { - struct crawler_expired_data *d; - if (data != NULL) { - d = data; - d->is_external = true; - cm->data = data; - } else { - // allocate data. - d = calloc(1, sizeof(struct crawler_expired_data)); - if (d == NULL) { - return -1; - } - // init lock. - pthread_mutex_init(&d->lock, NULL); - d->is_external = false; - d->start_time = current_time; - - cm->data = d; - } - pthread_mutex_lock(&d->lock); - memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES); - for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) { - d->crawlerstats[x].start_time = current_time; - d->crawlerstats[x].run_complete = false; - } - pthread_mutex_unlock(&d->lock); - return 0; -} - -static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) { - struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; - pthread_mutex_lock(&d->lock); - d->crawlerstats[CLEAR_LRU(slab_cls)].end_time = current_time; - d->crawlerstats[CLEAR_LRU(slab_cls)].run_complete = true; - pthread_mutex_unlock(&d->lock); -} - -static void crawler_expired_finalize(crawler_module_t *cm) { - struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; - pthread_mutex_lock(&d->lock); - d->end_time = current_time; - d->crawl_complete = true; - pthread_mutex_unlock(&d->lock); - - if (!d->is_external) { - free(d); - } -} - -/* I pulled this out to make the main thread clearer, but it reaches into the - * main thread's values too much. Should rethink again. 
- */ -static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) { - int slab_id = CLEAR_LRU(i); - struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; - pthread_mutex_lock(&d->lock); - crawlerstats_t *s = &d->crawlerstats[slab_id]; - int is_flushed = item_is_flushed(search); - if ((search->exptime != 0 && search->exptime < current_time) - || is_flushed) { - itemstats[i].crawler_reclaimed++; - s->reclaimed++; - - if (settings.verbose > 1) { - int ii; - char *key = ITEM_key(search); - fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ", - search->it_flags, search->slabs_clsid); - for (ii = 0; ii < search->nkey; ++ii) { - fprintf(stderr, "%c", key[ii]); - } - fprintf(stderr, "\n"); - } - if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) { - itemstats[i].expired_unfetched++; - } - do_item_unlink_nolock(search, hv); - do_item_remove(search); - assert(search->slabs_clsid == 0); - } else { - s->seen++; - refcount_decr(&search->refcount); - if (search->exptime == 0) { - s->noexp++; - } else if (search->exptime - current_time > 3599) { - s->ttl_hourplus++; - } else { - rel_time_t ttl_remain = search->exptime - current_time; - int bucket = ttl_remain / 60; - s->histo[bucket]++; - } - } - pthread_mutex_unlock(&d->lock); -} - -static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) { - //int slab_id = CLEAR_LRU(i); - char keybuf[KEY_MAX_LENGTH * 3 + 1]; - int is_flushed = item_is_flushed(it); - /* Ignore expired content. */ - if ((it->exptime != 0 && it->exptime < current_time) - || is_flushed) { - refcount_decr(&it->refcount); - return; - } - // TODO: uriencode directly into the buffer. - uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1); - int total = snprintf(cm->c.cbuf, 4096, - "key=%s exp=%ld la=%llu cas=%llu fetch=%s\n", - keybuf, - (it->exptime == 0) ? -1 : (long)it->exptime + process_started, - (unsigned long long)it->time + process_started, - (unsigned long long)ITEM_get_cas(it), - (it->it_flags & ITEM_FETCHED) ? "yes" : "no"); - refcount_decr(&it->refcount); - // TODO: some way of tracking the errors. these are very unlikely though. - if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) { - /* Failed to write, don't push it. */ - return; - } - bipbuf_push(cm->c.buf, total); -} - -static int lru_crawler_poll(crawler_client_t *c) { - unsigned char *data; - unsigned int data_size = 0; - struct pollfd to_poll[1]; - to_poll[0].fd = c->sfd; - to_poll[0].events = POLLOUT; - - int ret = poll(to_poll, 1, 1000); - - if (ret < 0) { - // fatal. - return -1; - } - - if (ret == 0) return 0; - - if (to_poll[0].revents & POLLIN) { - char buf[1]; - int res = read(c->sfd, buf, 1); - if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { - lru_crawler_close_client(c); - return -1; - } - } - if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { - if (to_poll[0].revents & (POLLHUP|POLLERR)) { - lru_crawler_close_client(c); - return -1; - } else if (to_poll[0].revents & POLLOUT) { - int total = write(c->sfd, data, data_size); - if (total == -1) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - lru_crawler_close_client(c); - return -1; - } - } else if (total == 0) { - lru_crawler_close_client(c); - return -1; - } else { - bipbuf_poll(c->buf, total); - } - } - } - return 0; -} - -/* Grab some space to work with, if none exists, run the poll() loop and wait - * for it to clear up or close. - * Return NULL if closed. 
- */ -static int lru_crawler_client_getbuf(crawler_client_t *c) { - void *buf = NULL; - if (c->c == NULL) return -1; - /* not enough space. */ - while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) { - // TODO: max loops before closing. - int ret = lru_crawler_poll(c); - if (ret < 0) return ret; - } - - c->cbuf = buf; - return 0; -} - -static void lru_crawler_class_done(int i) { - crawlers[i].it_flags = 0; - crawler_count--; - crawler_unlink_q((item *)&crawlers[i]); - pthread_mutex_unlock(&lru_locks[i]); - if (active_crawler_mod.mod->doneclass != NULL) - active_crawler_mod.mod->doneclass(&active_crawler_mod, i); -} - -static void *item_crawler_thread(void *arg) { - int i; - int crawls_persleep = settings.crawls_persleep; - - pthread_mutex_lock(&lru_crawler_lock); - pthread_cond_signal(&lru_crawler_cond); - settings.lru_crawler = true; - if (settings.verbose > 2) - fprintf(stderr, "Starting LRU crawler background thread\n"); - while (do_run_lru_crawler_thread) { - pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock); - - while (crawler_count) { - item *search = NULL; - void *hold_lock = NULL; - - for (i = POWER_SMALLEST; i < LARGEST_ID; i++) { - if (crawlers[i].it_flags != 1) { - continue; - } - - /* Get memory from bipbuf, if client has no space, flush. */ - if (active_crawler_mod.c.c != NULL) { - int ret = lru_crawler_client_getbuf(&active_crawler_mod.c); - if (ret != 0) { - // FIXME: Kill off the whole run. - lru_crawler_class_done(i); - continue; - } - } - pthread_mutex_lock(&lru_locks[i]); - search = crawler_crawl_q((item *)&crawlers[i]); - if (search == NULL || - (crawlers[i].remaining && --crawlers[i].remaining < 1)) { - if (settings.verbose > 2) - fprintf(stderr, "Nothing left to crawl for %d\n", i); - lru_crawler_class_done(i); - continue; - } - uint32_t hv = hash(ITEM_key(search), search->nkey); - /* Attempt to hash item lock the "search" item. If locked, no - * other callers can incr the refcount - */ - if ((hold_lock = item_trylock(hv)) == NULL) { - pthread_mutex_unlock(&lru_locks[i]); - continue; - } - /* Now see if the item is refcount locked */ - if (refcount_incr(&search->refcount) != 2) { - refcount_decr(&search->refcount); - if (hold_lock) - item_trylock_unlock(hold_lock); - pthread_mutex_unlock(&lru_locks[i]); - continue; - } - - itemstats[i].crawler_items_checked++; - /* Frees the item or decrements the refcount. */ - /* Interface for this could improve: do the free/decr here - * instead? 
*/ - if (!active_crawler_mod.mod->needs_lock) { - pthread_mutex_unlock(&lru_locks[i]); - } - - active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i); - - if (hold_lock) - item_trylock_unlock(hold_lock); - if (active_crawler_mod.mod->needs_lock) { - pthread_mutex_unlock(&lru_locks[i]); - } - - if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { - usleep(settings.lru_crawler_sleep); - crawls_persleep = settings.crawls_persleep; - } - } - } - - if (active_crawler_mod.mod != NULL) { - if (active_crawler_mod.mod->finalize != NULL) - active_crawler_mod.mod->finalize(&active_crawler_mod); - if (active_crawler_mod.c.c != NULL) { - lru_crawler_poll(&active_crawler_mod.c); - lru_crawler_release_client(&active_crawler_mod.c); - } - active_crawler_mod.mod = NULL; - } - - if (settings.verbose > 2) - fprintf(stderr, "LRU crawler thread sleeping\n"); - STATS_LOCK(); - stats_state.lru_crawler_running = false; - STATS_UNLOCK(); - } - pthread_mutex_unlock(&lru_crawler_lock); - if (settings.verbose > 2) - fprintf(stderr, "LRU crawler thread stopping\n"); - - return NULL; -} - -static pthread_t item_crawler_tid; - -int stop_item_crawler_thread(void) { - int ret; - pthread_mutex_lock(&lru_crawler_lock); - do_run_lru_crawler_thread = 0; - pthread_cond_signal(&lru_crawler_cond); - pthread_mutex_unlock(&lru_crawler_lock); - if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) { - fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret)); - return -1; - } - settings.lru_crawler = false; - return 0; -} - -/* Lock dance to "block" until thread is waiting on its condition: - * caller locks mtx. caller spawns thread. - * thread blocks on mutex. - * caller waits on condition, releases lock. - * thread gets lock, sends signal. - * caller can't wait, as thread has lock. - * thread waits on condition, releases lock - * caller wakes on condition, gets lock. - * caller immediately releases lock. - * thread is now safely waiting on condition before the caller returns. - */ -int start_item_crawler_thread(void) { - int ret; - - if (settings.lru_crawler) - return -1; - pthread_mutex_lock(&lru_crawler_lock); - do_run_lru_crawler_thread = 1; - if ((ret = pthread_create(&item_crawler_tid, NULL, - item_crawler_thread, NULL)) != 0) { - fprintf(stderr, "Can't create LRU crawler thread: %s\n", - strerror(ret)); - pthread_mutex_unlock(&lru_crawler_lock); - return -1; - } - /* Avoid returning until the crawler has actually started */ - pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock); - pthread_mutex_unlock(&lru_crawler_lock); - - return 0; -} - -/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole - * LRU every time. - */ -static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { - int i; - uint32_t sid; - uint32_t tocrawl[3]; - int starts = 0; - tocrawl[0] = id | HOT_LRU; - tocrawl[1] = id | WARM_LRU; - tocrawl[2] = id | COLD_LRU; - - for (i = 0; i < 3; i++) { - sid = tocrawl[i]; - pthread_mutex_lock(&lru_locks[sid]); - if (tails[sid] != NULL) { - if (settings.verbose > 2) - fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid); - crawlers[sid].nbytes = 0; - crawlers[sid].nkey = 0; - crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. 
*/ - crawlers[sid].next = 0; - crawlers[sid].prev = 0; - crawlers[sid].time = 0; - crawlers[sid].remaining = remaining; - crawlers[sid].slabs_clsid = sid; - crawler_link_q((item *)&crawlers[sid]); - crawler_count++; - starts++; - } - pthread_mutex_unlock(&lru_locks[sid]); - } - if (starts) { - STATS_LOCK(); - stats_state.lru_crawler_running = true; - stats.lru_crawler_starts++; - STATS_UNLOCK(); - } - return starts; -} - -static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) { - crawler_client_t *crawlc = &cm->c; - if (crawlc->c != NULL) { - return -1; - } - crawlc->c = c; - crawlc->sfd = sfd; - - crawlc->buf = bipbuf_new(1024 * 128); - if (crawlc->buf == NULL) { - return -2; - } - return 0; -} - -static int lru_crawler_start(uint8_t *ids, uint32_t remaining, - const enum crawler_run_type type, void *data, - void *c, const int sfd) { - int starts = 0; - if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { - return -1; - } - - /* Configure the module */ - assert(crawler_mod_regs[type] != NULL); - active_crawler_mod.mod = crawler_mod_regs[type]; - if (active_crawler_mod.mod->init != NULL) { - active_crawler_mod.mod->init(&active_crawler_mod, data); - } - if (active_crawler_mod.mod->needs_client) { - if (c == NULL || sfd == 0) { - pthread_mutex_unlock(&lru_crawler_lock); - return -2; - } - if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) { - pthread_mutex_unlock(&lru_crawler_lock); - return -2; - } - } - - for (int sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { - if (ids[sid]) { - starts += do_lru_crawler_start(sid, remaining); - } - } - if (starts) { - pthread_cond_signal(&lru_crawler_cond); - } - pthread_mutex_unlock(&lru_crawler_lock); - return starts; -} - -/* - * Also only clear the crawlerstats once per sid. - */ -enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type, - void *c, const int sfd) { - char *b = NULL; - uint32_t sid = 0; - int starts = 0; - uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES]; - - /* FIXME: I added this while debugging. Don't think it's needed? */ - memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES); - if (strcmp(slabs, "all") == 0) { - for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { - tocrawl[sid] = 1; - } - } else { - for (char *p = strtok_r(slabs, ",", &b); - p != NULL; - p = strtok_r(NULL, ",", &b)) { - - if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST - || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) { - pthread_mutex_unlock(&lru_crawler_lock); - return CRAWLER_BADCLASS; - } - tocrawl[sid] = 1; - } - } - - starts = lru_crawler_start(tocrawl, settings.lru_crawler_tocrawl, - type, NULL, c, sfd); - if (starts == -1) { - return CRAWLER_RUNNING; - } else if (starts == -2) { - return CRAWLER_ERROR; /* FIXME: not very helpful. 
*/ - } else if (starts) { - return CRAWLER_OK; - } else { - return CRAWLER_NOTSTARTED; - } -} - -/* If we hold this lock, crawler can't wake up or move */ -void lru_crawler_pause(void) { - pthread_mutex_lock(&lru_crawler_lock); -} - -void lru_crawler_resume(void) { - pthread_mutex_unlock(&lru_crawler_lock); -} - -int init_lru_crawler(void) { - if (lru_crawler_initialized == 0) { - if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) { - fprintf(stderr, "Can't initialize lru crawler condition\n"); - return -1; - } - pthread_mutex_init(&lru_crawler_lock, NULL); - active_crawler_mod.c.c = NULL; - active_crawler_mod.mod = NULL; - active_crawler_mod.data = NULL; - lru_crawler_initialized = 1; - } - return 0; -} diff --git a/items.h b/items.h index d9d99e3699..cc89a6113d 100644 --- a/items.h +++ b/items.h @@ -1,3 +1,10 @@ +#define HOT_LRU 0 +#define WARM_LRU 64 +#define COLD_LRU 128 +#define NOEXP_LRU 192 + +#define CLEAR_LRU(id) (id & ~(3<<6)) + /* See items.c */ uint64_t get_cas_id(void); @@ -16,9 +23,15 @@ int do_item_replace(item *it, item *new_it, const uint32_t hv); int item_is_flushed(item *it); +void do_item_linktail_q(item *it); +void do_item_unlinktail_q(item *it); +item *do_item_crawl_q(item *it); + /*@null@*/ char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes); void item_stats(ADD_STAT add_stats, void *c); +void do_item_stats_add_crawl(const int i, const uint64_t reclaimed, + const uint64_t unfetched, const uint64_t checked); void item_stats_totals(ADD_STAT add_stats, void *c); /*@null@*/ void item_stats_sizes(ADD_STAT add_stats, void *c); @@ -34,19 +47,8 @@ item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const void item_stats_reset(void); extern pthread_mutex_t lru_locks[POWER_LARGEST]; -enum crawler_result_type { - CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED, CRAWLER_ERROR -}; - int start_lru_maintainer_thread(void); int stop_lru_maintainer_thread(void); int init_lru_maintainer(void); void lru_maintainer_pause(void); void lru_maintainer_resume(void); - -int start_item_crawler_thread(void); -int stop_item_crawler_thread(void); -int init_lru_crawler(void); -enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type, void *c, const int sfd); -void lru_crawler_pause(void); -void lru_crawler_resume(void); diff --git a/memcached.c b/memcached.c index bc3636c5a5..8412d47050 100644 --- a/memcached.c +++ b/memcached.c @@ -4059,7 +4059,6 @@ static void process_command(conn *c, char *command) { } else { out_string(c, "ERROR failed to stop lru crawler thread"); } - } else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0)) { } else { out_string(c, "ERROR"); } diff --git a/memcached.h b/memcached.h index 359c122020..f7f4cd4a3e 100644 --- a/memcached.h +++ b/memcached.h @@ -437,6 +437,9 @@ typedef struct { uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ uint32_t remaining; /* Max keys to crawl per slab per invocation */ + uint64_t reclaimed; /* items reclaimed during this crawl. */ + uint64_t unfetched; /* items reclaiemd unfetched during this crawl. */ + uint64_t checked; /* items examined during this crawl. */ } crawler; /* Header when an item is actually a chunk of another item. */ @@ -605,6 +608,7 @@ extern int daemonize(int nochdir, int noclose); #include "slabs.h" #include "assoc.h" #include "items.h" +#include "crawler.h" #include "trace.h" #include "hash.h" #include "util.h"
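
Note (illustrative sketch, not part of the diff above): the patch's main interface is the crawler_module_reg_t callback table that crawler.c now owns. Below is a minimal sketch of what an additional module registration would look like against that interface. The names crawler_example_mod and crawler_example_eval are hypothetical; a real module would also need a slot in crawler_mod_regs[] and a matching enum crawler_run_type value so lru_crawler_start() can select it.

/* Hypothetical crawler module: inspects each crawled item without holding
 * the LRU lock and without a client connection attached. */
static void crawler_example_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    /* eval() runs once per crawled item. The crawler thread has already
     * incremented the item's refcount, so the module must drop it (or
     * unlink/remove the item) before returning. */
    refcount_decr(&it->refcount);
}

crawler_module_reg_t crawler_example_mod = {
    .init = NULL,           /* optional: runs before the crawl starts */
    .eval = crawler_example_eval,
    .doneclass = NULL,      /* optional: runs once per finished LRU sub-class */
    .finalize = NULL,       /* optional: runs once when all sub-crawlers are done */
    .needs_lock = false,    /* eval is called with the LRU lock already released */
    .needs_client = false   /* no remote client / output bipbuf required */
};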