Skip to content

Commit

Permalink
Add feature to allow writes only via the local loopback interface - t…
Browse files Browse the repository at this point in the history
…his allows memcached to be used as an in memory cache in front of a data server that only ever gets data from its local server but is network readable
  • Loading branch information
blsemo committed Oct 31, 2016
1 parent 5d2cc64 commit db96695
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 18 deletions.
124 changes: 116 additions & 8 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ static void settings_init(void) {
settings.crawls_persleep = 1000;
settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
settings.logger_buf_size = LOGGER_BUF_SIZE;
settings.local_write_only = false;
settings.anonymous_reads = false;
}

/*
Expand Down Expand Up @@ -433,6 +435,8 @@ void conn_close_idle(conn *c) {
if (settings.idle_timeout > 0 &&
(current_time - c->last_cmd_time) > settings.idle_timeout) {
if (c->state != conn_new_cmd && c->state != conn_read) {
settings.local_write_only = false;
settings.anonymous_reads = false;
if (settings.verbose > 1)
fprintf(stderr,
"fd %d wants to timeout, but isn't in read state", c->sfd);
Expand Down Expand Up @@ -464,7 +468,8 @@ void conn_worker_readd(conn *c) {
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
struct event_base *base,
bool local) {
conn *c;

assert(sfd >= 0 && sfd < max_fds);
Expand Down Expand Up @@ -522,6 +527,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,

c->transport = transport;
c->protocol = settings.binding_protocol;
c->local = local;

/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
Expand Down Expand Up @@ -1971,6 +1977,39 @@ static void process_bin_complete_sasl_auth(conn *c) {
}
}

static bool local_write_check(conn *c)
{
assert(settings.local_write_only);
bool rv = c->local;
if (!c->local){
switch (c->cmd){
/* SASL fall-through */
case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
case PROTOCOL_BINARY_CMD_SASL_AUTH:
case PROTOCOL_BINARY_CMD_SASL_STEP:
case PROTOCOL_BINARY_CMD_VERSION:
/* anonymous reads */
case PROTOCOL_BINARY_CMD_GET:
case PROTOCOL_BINARY_CMD_GETQ:
case PROTOCOL_BINARY_CMD_GETK:
case PROTOCOL_BINARY_CMD_GETKQ:
case PROTOCOL_BINARY_CMD_STAT:
case PROTOCOL_BINARY_CMD_NOOP:
case PROTOCOL_BINARY_CMD_QUIT:
case PROTOCOL_BINARY_CMD_QUITQ:
rv = true;
break;
}
}

if (settings.verbose > 1){
fprintf(stderr, "%d: local_write_check() in cmd 0x%02x is %s\n",
c->sfd, c->cmd, rv ? "true" : "false");
}
return rv;

}

static bool authenticated(conn *c) {
assert(settings.sasl);
bool rv = false;
Expand Down Expand Up @@ -2006,6 +2045,13 @@ static void dispatch_bin_command(conn *c) {
c->write_and_go = conn_closing;
return;
}

if (settings.local_write_only && !local_write_check(c)){
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0);
c->write_and_go = conn_closing;
return;
}


MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
c->noreply = true;
Expand Down Expand Up @@ -3793,8 +3839,28 @@ static void process_memlimit_command(conn *c, token_t *tokens, const size_t ntok
}
}

static void process_command(conn *c, char *command) {
static bool local_write_ascii_check(conn *c, char *cmdToken){
assert(settings.local_write_only);
bool rv = c->local;
if (!c->local){
if (strcmp(cmdToken, "get" ) == 0
|| strcmp(cmdToken, "gets" ) == 0
|| strcmp(cmdToken, "bget" ) == 0
|| strcmp(cmdToken, "quit" ) == 0
|| strcmp(cmdToken, "stats" ) == 0
|| strcmp(cmdToken, "version" ) == 0 ){
rv = true;
}
}

if (settings.verbose > 1){
fprintf(stderr, "%d: local_write_ascii_check() in cmd '%s' is %s\n",
c-> sfd, cmdToken, rv ? "true" : "false");
}
return rv;
}

static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
size_t ntokens;
int comm;
Expand All @@ -3820,6 +3886,13 @@ static void process_command(conn *c, char *command) {
}

ntokens = tokenize_command(command, tokens, MAX_TOKENS);

if (settings.local_write_only
&& !local_write_ascii_check(c, tokens[COMMAND_TOKEN].value)){
out_string(c, "AUTH_ERROR Remote write not allowed");
return;
}

if (ntokens >= 3 &&
((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
(strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
Expand Down Expand Up @@ -4577,7 +4650,7 @@ static void drive_machine(conn *c) {
STATS_UNLOCK();
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, c->transport);
DATA_BUFFER_SIZE, c->transport, c->local);
}

stop = true;
Expand Down Expand Up @@ -4930,6 +5003,20 @@ static void maximize_sndbuf(const int sfd) {
fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
}

const char * LOCALHOST_IPV4 = "127.0.0.1";
const char * LOCALHOST_IPV6 = "::1";

static bool is_local_host(const char *iface)
{
if (iface == NULL)
{
return false;
}
return strcmp(iface, "localhost") == 0
|| strcmp(iface, LOCALHOST_IPV4) == 0
|| strcmp(iface, LOCALHOST_IPV6) == 0;
}

/**
* Create a socket and bind it to a specific port number
* @param interface the interface to bind to
Expand Down Expand Up @@ -4968,7 +5055,7 @@ static int server_socket(const char *interface,
perror("getaddrinfo()");
return 1;
}

for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
Expand Down Expand Up @@ -5065,12 +5152,13 @@ static int server_socket(const char *interface,
int per_thread_fd = c ? dup(sfd) : sfd;
dispatch_conn_new(per_thread_fd, conn_read,
EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
UDP_READ_BUFFER_SIZE, transport, false);
}
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
transport, main_base,
is_local_host(interface)))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
Expand All @@ -5085,6 +5173,7 @@ static int server_socket(const char *interface,
return success == 0;
}


static int server_sockets(int port, enum network_transport transport,
FILE *portnumber_file) {
if (settings.inter == NULL) {
Expand Down Expand Up @@ -5217,7 +5306,7 @@ static int server_socket_unix(const char *path, int access_mask) {
}
if (!(listen_conn = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
local_transport, main_base))) {
local_transport, main_base,false))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -5669,7 +5758,9 @@ int main (int argc, char **argv) {
SLAB_SIZES,
SLAB_CHUNK_MAX,
TRACK_SIZES,
MODERN
MODERN,
LOCAL_WRITE_ONLY,
ANONYMOUS_READ
};
char *const subopts_tokens[] = {
[MAXCONNS_FAST] = "maxconns_fast",
Expand All @@ -5692,6 +5783,8 @@ int main (int argc, char **argv) {
[SLAB_CHUNK_MAX] = "slab_chunk_max",
[TRACK_SIZES] = "track_sizes",
[MODERN] = "modern",
[LOCAL_WRITE_ONLY] = "local_write_only",
[ANONYMOUS_READ] = "anonymous_read",
NULL
};

Expand Down Expand Up @@ -6121,6 +6214,18 @@ int main (int argc, char **argv) {
start_lru_crawler = true;
start_lru_maintainer = true;
break;
case LOCAL_WRITE_ONLY:
settings.local_write_only = true;
break;
case ANONYMOUS_READ:
#ifndef ENABLE_SASL
fprintf(stderr,
"This server is not built with SASL support.\n");
exit(EX_USAGE);
#endif
settings.anonymous_reads = true;
break;

default:
printf("Illegal suboption \"%s\"\n", subopts_value);
return 1;
Expand All @@ -6134,6 +6239,9 @@ int main (int argc, char **argv) {
return 1;
}
}
if(settings.verbose > 0 && settings.local_write_only){
fprintf(stderr, "Only allowing writing via local loop back");
}

if (settings.slab_chunk_size_max > settings.item_size_max) {
fprintf(stderr, "slab_chunk_max (bytes: %d) cannot be larger than -I (item_size_max %d)\n",
Expand Down
7 changes: 5 additions & 2 deletions memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ struct settings {
int idle_timeout; /* Number of seconds to let connections idle */
unsigned int logger_watcher_buf_size; /* size of logger's per-watcher buffer */
unsigned int logger_buf_size; /* size of per-thread logger buffer */
bool local_write_only;
bool anonymous_reads;
};

extern struct stats stats;
Expand Down Expand Up @@ -488,6 +490,7 @@ struct conn {
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */
bool local; /** connection via local loopback **/

char *wbuf;
char *wcurr;
Expand Down Expand Up @@ -597,7 +600,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
const int64_t delta, char *buf,
uint64_t *cas, const uint32_t hv);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, bool local);
void conn_worker_readd(conn *c);
extern int daemonize(int nochdir, int noclose);

Expand All @@ -622,7 +625,7 @@ extern int daemonize(int nochdir, int noclose);

void memcached_thread_init(int nthreads);
void redispatch_conn(conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, bool local);
void sidethread_conn_close(conn *c);

/* Lock wrappers for cache functions that are called from main loop. */
Expand Down
Loading

0 comments on commit db96695

Please sign in to comment.