From 2eb445171d3b0273d27fb841d96af9f6935834d7 Mon Sep 17 00:00:00 2001 From: yzprofile Date: Fri, 1 Apr 2016 16:32:38 +0800 Subject: [PATCH] feature: support dynamic add peer to compatible with ngx_http_dyups_module --- ngx_http_upstream_check_module.c | 824 ++++++++++++++++++++++++++----- ngx_http_upstream_check_module.h | 6 +- 2 files changed, 693 insertions(+), 137 deletions(-) diff --git a/ngx_http_upstream_check_module.c b/ngx_http_upstream_check_module.c index b628436..0187f68 100644 --- a/ngx_http_upstream_check_module.c +++ b/ngx_http_upstream_check_module.c @@ -1,13 +1,12 @@ /* - * Copyright (C) 2010-2014 Weibin Yao (yaoweibin@gmail.com) - * Copyright (C) 2010-2014 Alibaba Group Holding Limited + * Copyright (C) 2010-2016 Weibin Yao (yaoweibin@gmail.com) + * Copyright (C) 2010-2016 Alibaba Group Holding Limited */ #include #include "ngx_http_upstream_check_module.h" - typedef struct ngx_http_upstream_check_peer_s ngx_http_upstream_check_peer_t; typedef struct ngx_http_upstream_check_srv_conf_s ngx_http_upstream_check_srv_conf_t; @@ -15,6 +14,10 @@ typedef struct ngx_http_upstream_check_srv_conf_s #pragma pack(push, 1) +#if (NGX_DYUPS) +#include +#endif + typedef struct { u_char major; u_char minor; @@ -52,6 +55,7 @@ typedef struct { u_char type; } ngx_ajp_raw_packet_t; + #pragma pack() @@ -69,11 +73,7 @@ typedef struct { typedef struct { ngx_shmtx_t mutex; -#if (nginx_version >= 1002000) ngx_shmtx_sh_t lock; -#else - ngx_atomic_t lock; -#endif ngx_pid_t owner; @@ -85,9 +85,14 @@ typedef struct { ngx_uint_t busyness; ngx_uint_t access_count; + ngx_uint_t checksum; + struct sockaddr *sockaddr; socklen_t socklen; + ngx_int_t ref; + ngx_uint_t delete; + ngx_atomic_t down; u_char padding[64]; @@ -98,6 +103,7 @@ typedef struct { ngx_uint_t generation; ngx_uint_t checksum; ngx_uint_t number; + ngx_uint_t max_number; /* ngx_http_upstream_check_status_peer_t */ ngx_http_upstream_check_peer_shm_t peers[1]; @@ -139,6 +145,8 @@ struct ngx_http_upstream_check_peer_s { ngx_http_upstream_check_peer_shm_t *shm; ngx_http_upstream_check_srv_conf_t *conf; + + unsigned delete; }; @@ -146,6 +154,7 @@ typedef struct { ngx_str_t check_shm_name; ngx_uint_t checksum; ngx_array_t peers; + ngx_slab_pool_t *shpool; ngx_http_upstream_check_peers_shm_t *peers_shm; } ngx_http_upstream_check_peers_t; @@ -239,6 +248,7 @@ struct ngx_http_upstream_check_srv_conf_s { ngx_array_t *fastcgi_params; ngx_uint_t default_down; + ngx_uint_t unique; }; @@ -333,7 +343,33 @@ static ngx_http_fastcgi_request_start_t ngx_http_fastcgi_request_start = { }; +#define upstream_check_index_invalid(check_ctx, index) \ + (check_ctx == NULL \ + || index >= check_ctx->peers_shm->number \ + || index >= check_ctx->peers_shm->max_number) + + +#define PEER_NORMAL 0x00 +#define PEER_DELETING 0x01 +#define PEER_DELETED 0x02 + +#if (NGX_DYUPS) +static ngx_dyups_del_upstream_filter_pt ngx_dyups_del_upstream_next_filter; +static ngx_int_t ngx_dyups_del_upstream_check_filter( + ngx_http_upstream_main_conf_t *umcf, ngx_http_upstream_srv_conf_t *uscf); +#endif + + +static ngx_uint_t ngx_http_upstream_check_add_dynamic_peer_shm( + ngx_pool_t *pool, ngx_http_upstream_check_srv_conf_t *ucscf, + ngx_addr_t *peer_addr); +static void ngx_http_upstream_check_clear_dynamic_peer_shm( + ngx_http_upstream_check_peer_shm_t *peer_shm); + static ngx_int_t ngx_http_upstream_check_add_timers(ngx_cycle_t *cycle); +static ngx_int_t ngx_http_upstream_check_add_timer( + ngx_http_upstream_check_peer_t *peer, ngx_check_conf_t *check_conf, + ngx_msec_t timer, ngx_log_t *log); static ngx_int_t ngx_http_upstream_check_peek_one_byte(ngx_connection_t *c); @@ -403,6 +439,8 @@ static void ngx_http_upstream_check_finish_handler(ngx_event_t *event); static ngx_int_t ngx_http_upstream_check_need_exit(); static void ngx_http_upstream_check_clear_all_events(); +static void ngx_http_upstream_check_clear_peer( + ngx_http_upstream_check_peer_t *peer); static ngx_int_t ngx_http_upstream_check_status_handler( ngx_http_request_t *r); @@ -454,11 +492,16 @@ static char *ngx_http_upstream_check_init_main_conf(ngx_conf_t *cf, static void *ngx_http_upstream_check_create_srv_conf(ngx_conf_t *cf); static char *ngx_http_upstream_check_init_srv_conf(ngx_conf_t *cf, void *conf); +static ngx_uint_t ngx_http_upstream_check_unique_peer( + ngx_http_upstream_check_peers_t *peers, ngx_addr_t *peer_addr, + ngx_http_upstream_check_srv_conf_t *peer_conf); + static void *ngx_http_upstream_check_create_loc_conf(ngx_conf_t *cf); static char * ngx_http_upstream_check_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); -#define SHM_NAME_LEN 256 +#define MAX_DYNAMIC_PEER 4096 +#define SHM_NAME_LEN 256 static char *ngx_http_upstream_check_init_shm(ngx_conf_t *cf, void *conf); @@ -467,8 +510,8 @@ static ngx_int_t ngx_http_upstream_check_get_shm_name(ngx_str_t *shm_name, static ngx_shm_zone_t *ngx_shared_memory_find(ngx_cycle_t *cycle, ngx_str_t *name, void *tag); static ngx_http_upstream_check_peer_shm_t * -ngx_http_upstream_check_find_shm_peer(ngx_http_upstream_check_peers_shm_t *peers_shm, - ngx_addr_t *addr); +ngx_http_upstream_check_find_shm_peer( + ngx_http_upstream_check_peers_shm_t *peers_shm, ngx_addr_t *addr); static ngx_int_t ngx_http_upstream_check_init_shm_peer( ngx_http_upstream_check_peer_shm_t *peer_shm, @@ -762,6 +805,7 @@ ngx_uint_t ngx_http_upstream_check_add_peer(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us, ngx_addr_t *peer_addr) { + ngx_uint_t index; ngx_http_upstream_check_peer_t *peer; ngx_http_upstream_check_peers_t *peers; ngx_http_upstream_check_srv_conf_t *ucscf; @@ -777,10 +821,25 @@ ngx_http_upstream_check_add_peer(ngx_conf_t *cf, return NGX_ERROR; } + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, cf->log, 0, + "http upstream check add upstream process: %ui", + ngx_process); + + if (ngx_process == NGX_PROCESS_WORKER) { + return ngx_http_upstream_check_add_dynamic_peer(cf->pool, us, peer_addr); + } + ucmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_check_module); peers = ucmcf->peers; + if (ucscf->unique) { + index = ngx_http_upstream_check_unique_peer(peers, peer_addr, ucscf); + if (index != (ngx_uint_t) NGX_ERROR) { + return index; + } + } + peer = ngx_array_push(&peers->peers); if (peer == NULL) { return NGX_ERROR; @@ -865,11 +924,7 @@ ngx_http_upstream_check_addr_change_port(ngx_pool_t *pool, ngx_addr_t *dst, return NGX_ERROR; } -#if (nginx_version >= 1005012) len = ngx_sock_ntop(dst->sockaddr, dst->socklen, p, len, 1); -#else - len = ngx_sock_ntop(dst->sockaddr, p, len, 1); -#endif dst->name.len = len; dst->name.data = p; @@ -878,18 +933,387 @@ ngx_http_upstream_check_addr_change_port(ngx_pool_t *pool, ngx_addr_t *dst, } +ngx_uint_t +ngx_http_upstream_check_add_dynamic_peer(ngx_pool_t *pool, + ngx_http_upstream_srv_conf_t *us, ngx_addr_t *peer_addr) +{ + void *elts; + ngx_uint_t i, index; + ngx_http_upstream_check_peer_t *peer, *p, *np; + ngx_http_upstream_check_peers_t *peers; + ngx_http_upstream_check_srv_conf_t *ucscf; + ngx_http_upstream_check_main_conf_t *ucmcf; + ngx_http_upstream_check_peer_shm_t *peer_shm; + ngx_http_upstream_check_peers_shm_t *peers_shm; + + if (check_peers_ctx == NULL || us->srv_conf == NULL) { + return NGX_ERROR; + } + + ucscf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_check_module); + + if(ucscf->check_interval == 0) { + return NGX_ERROR; + } + + index = ngx_http_upstream_check_add_dynamic_peer_shm(pool, + ucscf, peer_addr); + if (index == (ngx_uint_t) NGX_ERROR) { + return index; + } + + peers_shm = check_peers_ctx->peers_shm; + peer_shm = peers_shm->peers; + + ucmcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, + ngx_http_upstream_check_module); + peers = ucmcf->peers; + peer = NULL; + + p = peers->peers.elts; + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pool->log, 0, + "http upstream check add dynamic upstream: %p, n: %ui", + p, peers->peers.nelts); + + for (i = 0; i < peers->peers.nelts; i++) { + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, pool->log, 0, + "http upstream check add [%ui], index=%ui, delete:%ud", + i, p[i].index, p[i].delete); + + if (p[i].delete) { + p[i].delete = 0; + peer = &p[i]; + break; + } + } + + if (peer == NULL) { + + elts = peers->peers.elts; + + peer = ngx_array_push(&peers->peers); + if (peer == NULL) { + return NGX_ERROR; + } + + if (elts != peers->peers.elts) { + + ngx_log_error(NGX_LOG_INFO, pool->log, 0, + "http upstream check add peer realloc memory"); + + /* reset all upstream peers' timers */ + p = elts; + np = peers->peers.elts; + + for (i = 0; i < peers->peers.nelts - 1; i++) { + + if (p[i].delete) { + continue; + } + ngx_log_error(NGX_LOG_INFO, pool->log, 0, + "http upstream %V old peer: %p, new peer: %p," + "old timer: %p, new timer: %p", + np[i].upstream_name, + np[i].check_ev.data, &np[i], + &p[i].check_ev, &np[i].check_ev); + + ngx_http_upstream_check_clear_peer(&p[i]); + + ngx_memzero(&np[i].pc, sizeof(ngx_peer_connection_t)); + np[i].check_data = NULL; + np[i].pool = NULL; + + ngx_http_upstream_check_add_timer(&np[i], + np[i].conf->check_type_conf, + 0, pool->log); + } + } + } + + ngx_memzero(peer, sizeof(ngx_http_upstream_check_peer_t)); + + peer->conf = ucscf; + peer->index = index; + peer->upstream_name = &us->host; + peer->peer_addr = peer_addr; + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, pool->log, 0, + "http upstream check add dynamic upstream: %V, " + "peer: %V, index: %ui", + &us->host, &peer_addr->name, index); + + if (ucscf->port) { + peer->check_peer_addr = ngx_pcalloc(pool, sizeof(ngx_addr_t)); + if (peer->check_peer_addr == NULL) { + return NGX_ERROR; + } + + if (ngx_http_upstream_check_addr_change_port(pool, + peer->check_peer_addr, peer_addr, ucscf->port) + != NGX_OK) { + + return NGX_ERROR; + } + + } else { + peer->check_peer_addr = peer->peer_addr; + } + + peer->shm = &peer_shm[index]; + + ngx_http_upstream_check_add_timer(peer, ucscf->check_type_conf, + 0, pool->log); + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, pool->log, 0, + "http upstream check add peer: %p, index: %ui, shm->ref: %i", + peer, peer->index, peer->shm->ref); + + peers->checksum += + ngx_murmur_hash2(peer_addr->name.data, peer_addr->name.len); + + return peer->index; +} + + +void +ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name, + ngx_addr_t *peer_addr) +{ + ngx_uint_t i; + ngx_http_upstream_check_peer_t *peer, *chosen; + ngx_http_upstream_check_peers_t *peers; + + chosen = NULL; + peers = check_peers_ctx; + peer = peers->peers.elts; + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, + "http upstream check delete dynamic upstream: %p, n: %ui", + peer, peers->peers.nelts); + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, + "http upstream check delete dynamic upstream: %V, " + "peer: %V", name, &peer_addr->name); + + for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, + "http upstream check delete [%ui], index=%ui, addr:%V", + i, peer[i].index, &peer[i].peer_addr->name); + + if (peer[i].upstream_name->len != name->len + || ngx_strncmp(peer[i].upstream_name->data, + name->data, name->len) != 0) { + continue; + } + + if (peer[i].peer_addr->socklen != peer_addr->socklen + || ngx_memcmp(peer[i].peer_addr->sockaddr, peer_addr->sockaddr, + peer_addr->socklen) != 0) { + continue; + } + + chosen = &peer[i]; + break; + } + + if (chosen == NULL) { + return; + } + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, + "http upstream check delete peer: %p, index: %ui, " + "shm->ref: %i", + chosen, chosen->index, chosen->shm->ref); + + ngx_shmtx_lock(&chosen->shm->mutex); + + if (chosen->shm->owner == ngx_pid) { + chosen->shm->owner = NGX_INVALID_PID; + } + + chosen->shm->ref--; + if (chosen->shm->ref <= 0 && chosen->shm->delete != PEER_DELETED) { + ngx_http_upstream_check_clear_dynamic_peer_shm(chosen->shm); + chosen->shm->delete = PEER_DELETED; + } + ngx_shmtx_unlock(&chosen->shm->mutex); + + ngx_http_upstream_check_clear_peer(chosen); +} + + +static ngx_uint_t +ngx_http_upstream_check_add_dynamic_peer_shm(ngx_pool_t *pool, + ngx_http_upstream_check_srv_conf_t *ucscf, ngx_addr_t *peer_addr) +{ + ngx_int_t rc; + ngx_uint_t i, index; + ngx_slab_pool_t *shpool; + ngx_http_upstream_check_peer_shm_t *peer_shm; + ngx_http_upstream_check_peers_shm_t *peers_shm; + + if (check_peers_ctx == NULL) { + return NGX_ERROR; + } + + shpool = check_peers_ctx->shpool; + peers_shm = check_peers_ctx->peers_shm; + peer_shm = peers_shm->peers; + index = NGX_ERROR; + + ngx_shmtx_lock(&shpool->mutex); + + for (i = 0; i < peers_shm->number; i++) { + + /* TODO: lock the peer mutex */ + if (peer_shm[i].delete == PEER_DELETED) { + continue; + } + + /* TODO: check the peer configure */ + /* Merge the duplicate peer */ + /* check the peer configure by check_type and check_send */ + if (peer_addr->socklen == peer_shm[i].socklen + && ngx_memcmp(peer_addr->sockaddr, peer_shm[i].sockaddr, + peer_addr->socklen) == 0 + && peer_shm[i].checksum + == ngx_murmur_hash2(ucscf->send.data, ucscf->send.len)) + { + ngx_shmtx_unlock(&shpool->mutex); + return i; + } + } + + for (i = 0; i < peers_shm->number; i++) { + + if (peer_shm[i].delete == PEER_DELETED) { + peer_shm[i].delete = PEER_NORMAL; + index = i; + break; + } + } + + if (index == (ngx_uint_t) NGX_ERROR) { + if (peers_shm->number >= peers_shm->max_number) { + goto fail; + } + + index = peers_shm->number++; + } + + ngx_memzero(&peer_shm[index], sizeof(ngx_http_upstream_check_peer_shm_t)); + + peer_shm[index].socklen = peer_addr->socklen; + peer_shm[index].sockaddr = ngx_slab_alloc_locked(shpool, + peer_shm->socklen); + if (peer_shm[index].sockaddr == NULL) { + goto fail; + } + + ngx_memcpy(peer_shm[index].sockaddr, peer_addr->sockaddr, + peer_addr->socklen); + + rc = ngx_http_upstream_check_init_shm_peer(&peer_shm[index], NULL, + ucscf->default_down, pool, + &peer_addr->name); + if (rc != NGX_OK) { + goto fail; + } + + /* Set tag to peer_shm */ + peer_shm[index].checksum = ngx_murmur_hash2(ucscf->send.data, ucscf->send.len); + + ngx_shmtx_unlock(&shpool->mutex); + return index; + +fail: + + ngx_shmtx_unlock(&shpool->mutex); + return NGX_ERROR; +} + + +static void +ngx_http_upstream_check_clear_dynamic_peer_shm( + ngx_http_upstream_check_peer_shm_t *peer_shm) +{ + if (check_peers_ctx == NULL) { + return; + } + + ngx_slab_free_locked(check_peers_ctx->shpool, peer_shm->sockaddr); +} + + + +static ngx_uint_t +ngx_http_upstream_check_unique_peer(ngx_http_upstream_check_peers_t *peers, + ngx_addr_t *peer_addr, ngx_http_upstream_check_srv_conf_t *peer_conf) +{ + ngx_uint_t i; + ngx_http_upstream_check_peer_t *peer; + ngx_http_upstream_check_srv_conf_t *opeer_conf; + + peer = peers->peers.elts; + for (i = 0; i < peers->peers.nelts; i++) { + + if (peer[i].delete) { + continue; + } + + if (peer[i].peer_addr->socklen != peer_addr->socklen) { + continue; + } + + if (ngx_memcmp(peer[i].peer_addr->sockaddr, + peer_addr->sockaddr, peer_addr->socklen) != 0) { + continue; + } + + opeer_conf = peer[i].conf; + + if (opeer_conf->check_type_conf != peer_conf->check_type_conf) { + continue; + } + + if (opeer_conf->send.len != peer_conf->send.len) { + continue; + } + + if (ngx_strncmp(opeer_conf->send.data, + peer_conf->send.data, peer_conf->send.len) != 0) { + continue; + } + + if (opeer_conf->code.status_alive != peer_conf->code.status_alive) { + continue; + } + + return i; + } + + return NGX_ERROR; +} + + ngx_uint_t ngx_http_upstream_check_peer_down(ngx_uint_t index) { - ngx_http_upstream_check_peer_t *peer; + ngx_http_upstream_check_peer_shm_t *peer_shm; - if (check_peers_ctx == NULL || index >= check_peers_ctx->peers.nelts) { + if (upstream_check_index_invalid(check_peers_ctx, index)) { return 0; } - peer = check_peers_ctx->peers.elts; + peer_shm = check_peers_ctx->peers_shm->peers; - return (peer[index].shm->down); + return (peer_shm[index].down); } @@ -899,7 +1323,7 @@ ngx_http_upstream_check_get_peer(ngx_uint_t index) { ngx_http_upstream_check_peer_t *peer; - if (check_peers_ctx == NULL || index >= check_peers_ctx->peers.nelts) { + if (upstream_check_index_invalid(check_peers_ctx, index)) { return; } @@ -919,7 +1343,7 @@ ngx_http_upstream_check_free_peer(ngx_uint_t index) { ngx_http_upstream_check_peer_t *peer; - if (check_peers_ctx == NULL || index >= check_peers_ctx->peers.nelts) { + if (upstream_check_index_invalid(check_peers_ctx, index)) { return; } @@ -940,7 +1364,6 @@ ngx_http_upstream_check_add_timers(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_msec_t t, delay; - ngx_check_conf_t *cf; ngx_http_upstream_check_peer_t *peer; ngx_http_upstream_check_peers_t *peers; ngx_http_upstream_check_srv_conf_t *ucscf; @@ -969,35 +1392,8 @@ ngx_http_upstream_check_add_timers(ngx_cycle_t *cycle) peer_shm = peers_shm->peers; for (i = 0; i < peers->peers.nelts; i++) { - peer[i].shm = &peer_shm[i]; - - peer[i].check_ev.handler = ngx_http_upstream_check_begin_handler; - peer[i].check_ev.log = cycle->log; - peer[i].check_ev.data = &peer[i]; - peer[i].check_ev.timer_set = 0; - - peer[i].check_timeout_ev.handler = - ngx_http_upstream_check_timeout_handler; - peer[i].check_timeout_ev.log = cycle->log; - peer[i].check_timeout_ev.data = &peer[i]; - peer[i].check_timeout_ev.timer_set = 0; ucscf = peer[i].conf; - cf = ucscf->check_type_conf; - - if (cf->need_pool) { - peer[i].pool = ngx_create_pool(ngx_pagesize, cycle->log); - if (peer[i].pool == NULL) { - return NGX_ERROR; - } - } - - peer[i].send_handler = cf->send_handler; - peer[i].recv_handler = cf->recv_handler; - - peer[i].init = cf->init; - peer[i].parse = cf->parse; - peer[i].reinit = cf->reinit; /* * We add a random start time here, since we don't want to trigger @@ -1006,9 +1402,50 @@ ngx_http_upstream_check_add_timers(ngx_cycle_t *cycle) delay = ucscf->check_interval > 1000 ? ucscf->check_interval : 1000; t = ngx_random() % delay; - ngx_add_timer(&peer[i].check_ev, t); + peer[i].shm = &peer_shm[i]; + + ngx_http_upstream_check_add_timer(&peer[i], ucscf->check_type_conf, t, cycle->log); + + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_check_add_timer(ngx_http_upstream_check_peer_t *peer, + ngx_check_conf_t *check_conf, ngx_msec_t timer, ngx_log_t *log) +{ + peer->check_ev.handler = ngx_http_upstream_check_begin_handler; + peer->check_ev.log = log; + peer->check_ev.data = peer; + peer->check_ev.timer_set = 0; + + peer->check_timeout_ev.handler = + ngx_http_upstream_check_timeout_handler; + peer->check_timeout_ev.log = log; + peer->check_timeout_ev.data = peer; + peer->check_timeout_ev.timer_set = 0; + + if (check_conf->need_pool) { + peer->pool = ngx_create_pool(ngx_pagesize, log); + if (peer->pool == NULL) { + return NGX_ERROR; + } } + peer->send_handler = check_conf->send_handler; + peer->recv_handler = check_conf->recv_handler; + + peer->init = check_conf->init; + peer->parse = check_conf->parse; + peer->reinit = check_conf->reinit; + + ngx_add_timer(&peer->check_ev, timer); + + /* TODO: lock */ + peer->shm->ref++; + return NGX_OK; } @@ -1018,7 +1455,6 @@ ngx_http_upstream_check_begin_handler(ngx_event_t *event) { ngx_msec_t interval; ngx_http_upstream_check_peer_t *peer; - ngx_http_upstream_check_peers_t *peers; ngx_http_upstream_check_srv_conf_t *ucscf; ngx_http_upstream_check_peers_shm_t *peers_shm; @@ -1026,25 +1462,19 @@ ngx_http_upstream_check_begin_handler(ngx_event_t *event) return; } - peers = check_peers_ctx; - if (peers == NULL) { - return; - } - - peers_shm = peers->peers_shm; - if (peers_shm == NULL) { + if (check_peers_ctx == NULL) { return; } + peers_shm = check_peers_ctx->peers_shm; peer = event->data; ucscf = peer->conf; ngx_add_timer(event, ucscf->check_interval / 2); /* This process is processing this peer now. */ - if ((peer->shm->owner == ngx_pid || - (peer->pc.connection != NULL) || - peer->check_timeout_ev.timer_set)) { + if (peer->shm->owner == ngx_pid || + peer->check_timeout_ev.timer_set) { return; } @@ -1064,8 +1494,8 @@ ngx_http_upstream_check_begin_handler(ngx_event_t *event) } if ((interval >= ucscf->check_interval) - && (peer->shm->owner == NGX_INVALID_PID)) - { + && (peer->shm->owner == NGX_INVALID_PID)) { + peer->shm->owner = ngx_pid; } else if (interval >= (ucscf->check_interval << 4)) { @@ -1074,8 +1504,8 @@ ngx_http_upstream_check_begin_handler(ngx_event_t *event) * If the check peer has been untouched for 2^4 times of * the check interval, activate the current timer. * Sometimes, the checking process may disappear - * in some circumstances, and the clean event will never - * be triggered. + * under some abnormal circumstances, and the clean event + * will never be triggered. */ peer->shm->owner = ngx_pid; peer->shm->access_time = ngx_current_msec; @@ -1319,6 +1749,10 @@ ngx_http_upstream_check_send_handler(ngx_event_t *event) ctx = peer->check_data; + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http check send total: %z", + ctx->send.last - ctx->send.pos); + while (ctx->send.pos < ctx->send.last) { size = c->send(c, ctx->send.pos, ctx->send.last - ctx->send.pos); @@ -1334,9 +1768,9 @@ ngx_http_upstream_check_send_handler(ngx_event_t *event) } #endif - if (size > 0) { + if (size >= 0) { ctx->send.pos += size; - } else if (size == 0 || size == NGX_AGAIN) { + } else if (size == NGX_AGAIN) { return; } else { c->error = 1; @@ -1387,7 +1821,7 @@ ngx_http_upstream_check_recv_handler(ngx_event_t *event) ctx = peer->check_data; if (ctx->recv.start == NULL) { - /* 1/2 of the page_size, is it enough? */ + /* half of the page_size, is it enough? */ ctx->recv.start = ngx_palloc(c->pool, ngx_pagesize / 2); if (ctx->recv.start == NULL) { goto check_recv_fail; @@ -1451,12 +1885,6 @@ ngx_http_upstream_check_recv_handler(ngx_event_t *event) case NGX_AGAIN: /* The peer has closed its half side of the connection. */ - if (size == 0) { - ngx_http_upstream_check_status_update(peer, 0); - c->error = 1; - break; - } - return; case NGX_ERROR: @@ -1481,6 +1909,7 @@ ngx_http_upstream_check_recv_handler(ngx_event_t *event) return; check_recv_fail: + ngx_http_upstream_check_status_update(peer, 0); ngx_http_upstream_check_clean_event(peer); } @@ -2508,6 +2937,14 @@ ngx_http_upstream_check_status_update(ngx_http_upstream_check_peer_t *peer, ucscf = peer->conf; + ngx_shmtx_lock(&peer->shm->mutex); + + if (peer->shm->delete == PEER_DELETED) { + + ngx_shmtx_unlock(&peer->shm->mutex); + return; + } + if (result) { peer->shm->rise_count++; peer->shm->fall_count = 0; @@ -2529,6 +2966,8 @@ ngx_http_upstream_check_status_update(ngx_http_upstream_check_peer_t *peer, } peer->shm->access_time = ngx_current_msec; + + ngx_shmtx_unlock(&peer->shm->mutex); } @@ -2619,7 +3058,6 @@ static void ngx_http_upstream_check_clear_all_events() { ngx_uint_t i; - ngx_connection_t *c; ngx_http_upstream_check_peer_t *peer; ngx_http_upstream_check_peers_t *peers; @@ -2638,26 +3076,45 @@ ngx_http_upstream_check_clear_all_events() peer = peers->peers.elts; for (i = 0; i < peers->peers.nelts; i++) { - - if (peer[i].check_ev.timer_set) { - ngx_del_timer(&peer[i].check_ev); + if (peer[i].delete) { + continue; } - if (peer[i].check_timeout_ev.timer_set) { - ngx_del_timer(&peer[i].check_timeout_ev); - } + ngx_http_upstream_check_clear_peer(&peer[i]); + } +} - c = peer[i].pc.connection; - if (c) { - ngx_close_connection(c); - peer[i].pc.connection = NULL; - } - if (peer[i].pool != NULL) { - ngx_destroy_pool(peer[i].pool); - peer[i].pool = NULL; - } +static void +ngx_http_upstream_check_clear_peer(ngx_http_upstream_check_peer_t *peer) +{ + if (peer != peer->check_ev.data) { + ngx_log_error(NGX_LOG_CRIT, ngx_cycle->log, 0, + "different peer: %p, data: %p, timer: %p", + peer, peer->check_ev.data, &peer->check_ev); + } + + if (peer->pc.connection) { + ngx_close_connection(peer->pc.connection); + peer->pc.connection = NULL; + } + + if (peer->check_ev.timer_set) { + ngx_del_timer(&peer->check_ev); + } + + if (peer->check_timeout_ev.timer_set) { + ngx_del_timer(&peer->check_timeout_ev); + } + + if (peer->pool != NULL) { + ngx_destroy_pool(peer->pool); + peer->pool = NULL; } + + ngx_memzero(peer, sizeof(ngx_http_upstream_check_peer_t)); + + peer->delete = 1; } @@ -2695,7 +3152,9 @@ ngx_http_upstream_check_status_handler(ngx_http_request_t *r) ctx->format = uclcf->format; } + r->headers_out.content_type_len = ctx->format->content_type.len; r->headers_out.content_type = ctx->format->content_type; + r->headers_out.content_type_lowcase = NULL; if (r->method == NGX_HTTP_HEAD) { r->headers_out.status = NGX_HTTP_OK; @@ -2809,7 +3268,7 @@ ngx_http_upstream_check_status_command_status( } else if (value->len == (sizeof("up") - 1) && ngx_strncasecmp(value->data, (u_char *) "up", value->len) - == 0) { + == 0) { ctx->flag |= NGX_CHECK_STATUS_UP; @@ -2832,8 +3291,13 @@ ngx_http_upstream_check_status_html_format(ngx_buf_t *b, count = 0; + /* TODO: two locks */ for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + if (flag & NGX_CHECK_STATUS_DOWN) { if (!peer[i].shm->down) { @@ -2876,6 +3340,10 @@ ngx_http_upstream_check_status_html_format(ngx_buf_t *b, for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + if (flag & NGX_CHECK_STATUS_DOWN) { if (!peer[i].shm->down) { @@ -2928,6 +3396,10 @@ ngx_http_upstream_check_status_csv_format(ngx_buf_t *b, peer = peers->peers.elts; for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + if (flag & NGX_CHECK_STATUS_DOWN) { if (!peer[i].shm->down) { @@ -2968,6 +3440,10 @@ ngx_http_upstream_check_status_json_format(ngx_buf_t *b, for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + if (flag & NGX_CHECK_STATUS_DOWN) { if (!peer[i].shm->down) { @@ -2995,6 +3471,10 @@ ngx_http_upstream_check_status_json_format(ngx_buf_t *b, last = peers->peers.nelts - 1; for (i = 0; i < peers->peers.nelts; i++) { + if (peer[i].delete) { + continue; + } + if (flag & NGX_CHECK_STATUS_DOWN) { if (!peer[i].shm->down) { @@ -3067,8 +3547,9 @@ static char * ngx_http_upstream_check(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_str_t *value, s; - ngx_uint_t i, port, rise, fall, default_down; + ngx_uint_t i, port, rise, fall, default_down, unique; ngx_msec_t interval, timeout; + ngx_check_conf_t *check; ngx_http_upstream_check_srv_conf_t *ucscf; /* default values */ @@ -3078,6 +3559,7 @@ ngx_http_upstream_check(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) interval = 30000; timeout = 1000; default_down = 1; + unique = 0; value = cf->args->elts; @@ -3181,6 +3663,25 @@ ngx_http_upstream_check(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) continue; } + if (ngx_strncmp(value[i].data, "unique=", 7) == 0) { + s.len = value[i].len - 7; + s.data = value[i].data + 7; + + if (ngx_strcasecmp(s.data, (u_char *) "true") == 0) { + unique = 1; + } else if (ngx_strcasecmp(s.data, (u_char *) "false") == 0) { + unique = 0; + } else { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid value \"%s\", " + "it must be \"true\" or \"false\"", + value[i].data); + return NGX_CONF_ERROR; + } + + continue; + } + goto invalid_check_parameter; } @@ -3190,12 +3691,24 @@ ngx_http_upstream_check(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ucscf->fall_count = fall; ucscf->rise_count = rise; ucscf->default_down = default_down; + ucscf->unique = unique; if (ucscf->check_type_conf == NGX_CONF_UNSET_PTR) { ngx_str_set(&s, "tcp"); ucscf->check_type_conf = ngx_http_get_check_type_conf(&s); } + check = ucscf->check_type_conf; + + if (ucscf->send.len == 0) { + ucscf->send.data = check->default_send.data; + ucscf->send.len = check->default_send.len; + } + + if (ucscf->code.status_alive == 0) { + ucscf->code.status_alive = check->default_status_alive; + } + return NGX_CONF_OK; invalid_check_parameter: @@ -3243,6 +3756,23 @@ ngx_http_upstream_check_http_send(ngx_conf_t *cf, ngx_command_t *cmd, ucscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_check_module); + if (ucscf->check_type_conf == NGX_CONF_UNSET_PTR) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid check_http_send should set [check] first"); + return NGX_CONF_ERROR; + } + + if (value[1].len + && (ucscf->check_type_conf->name.len != 4 + || ngx_strncmp(ucscf->check_type_conf->name.data, + "http", 4) != 0)) + { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid check_http_send for type \"%V\"", + &ucscf->check_type_conf->name); + return NGX_CONF_ERROR; + } + ucscf->send = value[1]; return NGX_CONF_OK; @@ -3292,7 +3822,7 @@ ngx_http_upstream_check_http_expect_alive(ngx_conf_t *cf, ngx_command_t *cmd, ucscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_check_module); - bit = ucscf->code.status_alive; + bit = 0; for (i = 1; i < cf->args->nelts; i++) { for (m = 0; mask[m].name.len != 0; m++) { @@ -3425,11 +3955,21 @@ ngx_http_upstream_check_create_main_conf(ngx_conf_t *cf) ucmcf->peers->checksum = 0; - if (ngx_array_init(&ucmcf->peers->peers, cf->pool, 16, +#if (NGX_DEBUG) + + if (ngx_array_init(&ucmcf->peers->peers, cf->pool, 1, + sizeof(ngx_http_upstream_check_peer_t)) != NGX_OK) + { + return NULL; + } + +#else + if (ngx_array_init(&ucmcf->peers->peers, cf->pool, 1024, sizeof(ngx_http_upstream_check_peer_t)) != NGX_OK) { return NULL; } +#endif return ucmcf; } @@ -3571,6 +4111,11 @@ ngx_http_upstream_check_init_main_conf(ngx_conf_t *cf, void *conf) } } +#if (NGX_DYUPS) + ngx_dyups_del_upstream_next_filter = ngx_dyups_del_upstream_top_filter; + ngx_dyups_del_upstream_top_filter = ngx_dyups_del_upstream_check_filter; +#endif + return ngx_http_upstream_check_init_shm(cf, conf); } @@ -3594,7 +4139,7 @@ ngx_http_upstream_check_create_srv_conf(ngx_conf_t *cf) ucscf->fall_count = NGX_CONF_UNSET_UINT; ucscf->rise_count = NGX_CONF_UNSET_UINT; ucscf->check_timeout = NGX_CONF_UNSET_MSEC; - ucscf->check_keepalive_requests = NGX_CONF_UNSET_UINT; + ucscf->check_keepalive_requests = 1; ucscf->check_type_conf = NGX_CONF_UNSET_PTR; return ucscf; @@ -3722,36 +4267,30 @@ ngx_http_upstream_check_init_shm(ngx_conf_t *cf, void *conf) ngx_shm_zone_t *shm_zone; ngx_http_upstream_check_main_conf_t *ucmcf = conf; - if (ucmcf->peers->peers.nelts > 0) { - - ngx_http_upstream_check_shm_generation++; + ngx_http_upstream_check_shm_generation++; - shm_name = &ucmcf->peers->check_shm_name; + shm_name = &ucmcf->peers->check_shm_name; - ngx_http_upstream_check_get_shm_name(shm_name, cf->pool, - ngx_http_upstream_check_shm_generation); + ngx_http_upstream_check_get_shm_name(shm_name, cf->pool, + ngx_http_upstream_check_shm_generation); - /* The default check shared memory size is 1M */ - shm_size = 1 * 1024 * 1024; + /* The default check shared memory size is 1M */ + shm_size = 1 * 1024 * 1024; - shm_size = shm_size < ucmcf->check_shm_size ? - ucmcf->check_shm_size : shm_size; + shm_size = shm_size < ucmcf->check_shm_size ? + ucmcf->check_shm_size : shm_size; - shm_zone = ngx_shared_memory_add(cf, shm_name, shm_size, - &ngx_http_upstream_check_module); + shm_zone = ngx_shared_memory_add(cf, shm_name, shm_size, + &ngx_http_upstream_check_module); - ngx_log_debug2(NGX_LOG_DEBUG_HTTP, cf->log, 0, - "http upstream check, upsteam:%V, shm_zone size:%ui", - shm_name, shm_size); + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, cf->log, 0, + "http upstream check, upsteam:%V, shm_zone size:%ui", + shm_name, shm_size); - shm_zone->data = cf->pool; - check_peers_ctx = ucmcf->peers; + shm_zone->data = cf->pool; + check_peers_ctx = ucmcf->peers; - shm_zone->init = ngx_http_upstream_check_init_shm_zone; - } - else { - check_peers_ctx = NULL; - } + shm_zone->init = ngx_http_upstream_check_init_shm_zone; return NGX_CONF_OK; } @@ -3795,7 +4334,7 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) opeers_shm = NULL; peers_shm = NULL; - ngx_str_null(&oshm_name); + ngx_str_set(&oshm_name, ""); same = 0; peers = check_peers_ctx; @@ -3804,9 +4343,6 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) } number = peers->peers.nelts; - if (number == 0) { - return NGX_OK; - } pool = shm_zone->data; if (pool == NULL) { @@ -3849,7 +4385,7 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) } size = sizeof(*peers_shm) + - (number - 1) * sizeof(ngx_http_upstream_check_peer_shm_t); + (number - 1 + MAX_DYNAMIC_PEER) * sizeof(ngx_http_upstream_check_peer_shm_t); peers_shm = ngx_slab_alloc(shpool, size); @@ -3863,6 +4399,7 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) peers_shm->generation = ngx_http_upstream_check_shm_generation; peers_shm->checksum = peers->checksum; peers_shm->number = number; + peers_shm->max_number = number + MAX_DYNAMIC_PEER; peer = peers->peers.elts; @@ -3870,13 +4407,6 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) peer_shm = &peers_shm->peers[i]; - /* - * This function may be triggered before the old stale - * work process exits. The owner may stick to the old - * pid. - */ - peer_shm->owner = NGX_INVALID_PID; - if (same) { continue; } @@ -3893,7 +4423,7 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) if (opeers_shm) { opeer_shm = ngx_http_upstream_check_find_shm_peer(opeers_shm, - peer[i].peer_addr); + peer[i].peer_addr); if (opeer_shm) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, shm_zone->shm.log, 0, "http upstream check, inherit opeer: %V ", @@ -3918,6 +4448,7 @@ ngx_http_upstream_check_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) } } + peers->shpool = shpool; peers->peers_shm = peers_shm; shm_zone->data = peers_shm; @@ -3956,7 +4487,8 @@ ngx_shared_memory_find(ngx_cycle_t *cycle, ngx_str_t *name, void *tag) continue; } - if (ngx_strncmp(name->data, shm_zone[i].shm.name.data, name->len) != 0) + if (ngx_strncmp(name->data, shm_zone[i].shm.name.data, name->len) + != 0) { continue; } @@ -3987,7 +4519,8 @@ ngx_http_upstream_check_find_shm_peer(ngx_http_upstream_check_peers_shm_t *p, continue; } - if (ngx_memcmp(addr->sockaddr, peer_shm->sockaddr, addr->socklen) == 0) { + if (ngx_memcmp(addr->sockaddr, peer_shm->sockaddr, + addr->socklen) == 0) { return peer_shm; } } @@ -4013,7 +4546,7 @@ ngx_http_upstream_check_init_shm_peer(ngx_http_upstream_check_peer_shm_t *psh, psh->down = opsh->down; - } else { + } else{ psh->access_time = 0; psh->access_count = 0; @@ -4024,6 +4557,8 @@ ngx_http_upstream_check_init_shm_peer(ngx_http_upstream_check_peer_shm_t *psh, psh->down = init_down; } + psh->owner = NGX_INVALID_PID; + #if (NGX_HAVE_ATOMIC_OPS) file = NULL; @@ -4039,11 +4574,7 @@ ngx_http_upstream_check_init_shm_peer(ngx_http_upstream_check_peer_shm_t *psh, #endif -#if (nginx_version >= 1002000) if (ngx_shmtx_create(&psh->mutex, &psh->lock, file) != NGX_OK) { -#else - if (ngx_shmtx_create(&psh->mutex, (void *) &psh->lock, file) != NGX_OK) { -#endif return NGX_ERROR; } @@ -4056,3 +4587,24 @@ ngx_http_upstream_check_init_process(ngx_cycle_t *cycle) { return ngx_http_upstream_check_add_timers(cycle); } + + +#if (NGX_DYUPS) +static ngx_int_t +ngx_dyups_del_upstream_check_filter( + ngx_http_upstream_main_conf_t *umcf, ngx_http_upstream_srv_conf_t *uscf) +{ + ngx_uint_t i; + ngx_http_upstream_server_t *us; + + us = uscf->servers->elts; + for (i = 0; i < uscf->servers->nelts; i++) { + if (us[i].addrs) { + ngx_http_upstream_check_delete_dynamic_peer(&uscf->host, + us[i].addrs); + } + } + + return ngx_dyups_del_upstream_next_filter(umcf, uscf); +} +#endif diff --git a/ngx_http_upstream_check_module.h b/ngx_http_upstream_check_module.h index b08b396..66e51ba 100644 --- a/ngx_http_upstream_check_module.h +++ b/ngx_http_upstream_check_module.h @@ -14,6 +14,10 @@ ngx_uint_t ngx_http_upstream_check_peer_down(ngx_uint_t index); void ngx_http_upstream_check_get_peer(ngx_uint_t index); void ngx_http_upstream_check_free_peer(ngx_uint_t index); +ngx_uint_t ngx_http_upstream_check_add_dynamic_peer(ngx_pool_t *pool, + ngx_http_upstream_srv_conf_t *us, ngx_addr_t *peer); +void ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name, + ngx_addr_t *peer_addr); -#endif //_NGX_HTTP_UPSTREAM_CHECK_MODELE_H_INCLUDED_ +#endif //_NGX_HTTP_UPSTREAM_CHECK_MODELE_H_INCLUDED_