Skip to content

Commit

Permalink
Checking connection's idle and dead status in connection pool module.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengshuxin committed Aug 15, 2024
1 parent 0835d76 commit 1bc8785
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 15 deletions.
9 changes: 9 additions & 0 deletions lib_acl_cpp/include/acl_cpp/connpool/connect_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ class ACL_CPP_API connect_client : public noncopyable {
*/
virtual bool open() = 0;

/**
* 虚函数,子类可实现本方法,用来表明当前连接是否正常,以方便连接池对象在检测
* 连接存活性时自动关闭断开的连接
* @return {bool}
*/
virtual bool alive() {
return true;
}

/**
* 获得连接池对象引用,在 connect_pool 内部创建
* 连接对象会调用 set_pool 设置连接池对象句柄
Expand Down
13 changes: 12 additions & 1 deletion lib_acl_cpp/include/acl_cpp/connpool/connect_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,20 @@ class ACL_CPP_API connect_manager : public noncopyable {
* 检测连接池中的空闲连接,将过期的连接释放掉
* @param step {size_t} 每次检测连接池的个数
* @param left {size_t*} 非空时,将存储所有剩余连接个数总和
* @param min {size_t} 希望每个连接池保持的最小连接数
* @param kick_dead {book} 是否需要自动删除连接池中的异常连接
* @return {size_t} 被释放的空闲连接数
*/
size_t check_idle(size_t step, size_t* left = NULL);
size_t check_idle(size_t step, size_t* left = NULL,
size_t min = 0, bool kick_dead = false);

/**
* 检测连接池中的异常连接并关闭
* @param step {size_t} 每次检测连接池的个数
* @param left {size_t*} 非空时,将存储所有剩余连接个数总和
* @return {size_t} 被释放的连接数
*/
size_t check_dead(size_t step, size_t* left = NULL);

/**
* 获得连接池集合中连接池对象的个数
Expand Down
62 changes: 61 additions & 1 deletion lib_acl_cpp/include/acl_cpp/connpool/connect_monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class ACL_CPP_API connect_monitor : public thread {
/**
* 构造函数
* @param manager {connect_manager&}
* @param check_server {bool} 是否检查服务端服务可用性并将不用的服务地址置入黑名单中
*/
connect_monitor(connect_manager& manager);
connect_monitor(connect_manager& manager, bool check_server = false);

virtual ~connect_monitor();

Expand Down Expand Up @@ -49,6 +50,58 @@ class ACL_CPP_API connect_monitor : public thread {
*/
connect_monitor& set_conn_timeout(int n);

/**
* 是否检查服务端服务可用性并将不用的服务地址置入黑名单中
* @return {bool}
*/
bool is_check_server() const {
return check_server_;
}

/**
* 设置是否在连接检测器启动后自动关闭过期的空闲连接
* @param on {bool} 是否自动关闭过期的空闲连接
* @param kick_dead {bool} 是否检查所有连接的存活状态并关闭异常连接,当该参数
* 为 true 时,connect_client 的子类必须重载 alive() 虚方法,返回连接是否存活
* @param conns_min {size_t} > 0 表示尽量维持每个连接池中的最小活跃连接数
* @param step {bool} 每次检测连接池个数
* @return {connect_monitor&}
*/
connect_monitor& set_check_idle(bool on, bool kick_dead = false,
size_t conns_min = 0, size_t step = 1);

/**
* 是否自动检测并关闭过期空闲连接
* @return {bool}
*/
bool check_idle_on() const {
return check_idle_on_;
}

/**
* 是否需要检查异常连接并关闭
* @return {bool}
*/
bool is_kick_dead() const {
return kick_dead_;
}

/**
* 获得每个连接池希望保持的最小连接数
* @return {size_t}
*/
size_t get_conns_mininal() const {
return conns_min_;
}

/**
* 当 check_idle_on() 返回 true 时,返回每次检测的连接数量限制
* @return {size_t}
*/
size_t get_check_idle_step() const {
return check_idle_step_;
}

/**
* 停止检测线程
* @param graceful {bool} 是否文明地关闭检测过程,如果为 true
Expand Down Expand Up @@ -139,8 +192,15 @@ class ACL_CPP_API connect_monitor : public thread {
bool stop_graceful_;
aio_handle handle_; // 后台检测线程的非阻塞句柄
connect_manager& manager_; // 连接池集合管理对象
bool check_server_; // 是否检查服务端可用性
int check_inter_; // 检测连接池状态的时间间隔(秒)
int conn_timeout_; // 连接服务器的超时时间
bool check_idle_on_; // 是否检测并关闭过期空闲连接
bool kick_dead_; // 是否删除异常连接
size_t conns_min_; // 希望每个连接池的最小连接数
size_t check_idle_step_; // 每次检测连接数限制
bool check_dead_on_; // 是否检测并关闭断开的连接
size_t check_dead_step_; // 每次检测异常连接的数量限制
rpc_service* rpc_service_; // 异步 RPC 通信服务句柄
};

Expand Down
17 changes: 16 additions & 1 deletion lib_acl_cpp/include/acl_cpp/connpool/connect_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,24 @@ class ACL_CPP_API connect_pool : public noncopyable {
* 检查连接池中空闲的连接,将过期的连接释放掉
* @param ttl {time_t} 空闲时间间隔超过此值的连接将被释放
* @param exclusive {bool} 内部是否需要加锁
* @param kick_dead {bool} 是否自动检测死连接并关闭之
* @return {int} 被释放的空闲连接个数
*/
int check_idle(time_t ttl, bool exclusive = true);
int check_idle(time_t ttl, bool exclusive = true, bool kick_dead = false);

/**
* 检测连接状态,并关闭断开连接
* @param exclusive {bool} 内部是否需要加锁
* @return {size_t} 被关闭的连接个数
*/
size_t check_dead(bool exclusive = true);

/**
* 保持最小活跃连接数
* @param min {size_t} 该值 > 0 表示希望连接池中最小的活跃连接数
* @return {size_t} 返回实际的连接数
*/
size_t keep_minimal(size_t min);

/**
* 设置连接池的存活状态
Expand Down
6 changes: 4 additions & 2 deletions lib_acl_cpp/src/connpool/check_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ class check_rpc : public rpc_request {

protected:
// 子线程处理函数
virtual void rpc_run();
// @override
void rpc_run();

// 主线程处理过程,收到子线程任务完成的消息
virtual void rpc_onover();
// @override
void rpc_onover();

private:
connect_monitor& monitor_;
Expand Down
12 changes: 12 additions & 0 deletions lib_acl_cpp/src/connpool/check_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ void check_timer::timer_callback(unsigned int id)

connect_manager& manager = monitor_.get_manager();

// 自动检测并关闭过期空闲长连接
if (monitor_.check_idle_on()) {
manager.check_idle(monitor_.get_check_idle_step(), NULL,
monitor_.get_conns_mininal(), monitor_.is_kick_dead());
}

if (!monitor_.is_check_server()) {
return;
}

// 开始收集并检测服务端服务可用性,并将不可用服务地址加入黑名单中

// 先提取所有服务器地址,因为要操作的对象处于多线程环境中,
// 所以需要进行互斥锁保护
manager.lock();
Expand Down
49 changes: 44 additions & 5 deletions lib_acl_cpp/src/connpool/connect_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,47 @@ connect_pool* connect_manager::get(const char* addr,

//////////////////////////////////////////////////////////////////////////

size_t connect_manager::check_idle(size_t step, size_t* left /* = NULL */)
size_t connect_manager::check_idle(size_t step, size_t* left /* NULL */,
size_t min /* 0 */, bool kick_dead /* false */)
{
if (step == 0) {
step = 1;
size_t nleft = 0, nfreed = 0, pools_size, check_max, check_pos;
unsigned long id = get_id();

lock_guard guard(lock_);

conns_pools& pools = get_pools_by_id(id);
pools_size = pools.pools.size();
if (pools_size == 0) {
return 0;
}

if (step == 0 || step > pools_size) {
step = pools_size;
}

check_pos = pools.check_next++ % pools_size;
check_max = check_pos + step;

while (check_pos < pools_size && check_pos < check_max) {
connect_pool* pool = pools.pools[check_pos++ % pools_size];
int ret = pool->check_idle(idle_ttl_, true, kick_dead);
if (ret > 0) {
nfreed += ret;
}
if (min > 0) {
pool->keep_minimal(min);
}
nleft += pool->get_count();
}

if (left) {
*left = nleft;
}
return nfreed;
}

size_t connect_manager::check_dead(size_t step, size_t* left /* NULL */)
{
size_t nleft = 0, nfreed = 0, pools_size, check_max, check_pos;
unsigned long id = get_id();

Expand All @@ -398,12 +433,16 @@ size_t connect_manager::check_idle(size_t step, size_t* left /* = NULL */)
return 0;
}

if (step == 0 || step > pools_size) {
step = pools_size;
}

check_pos = pools.check_next++ % pools_size;
check_max = check_pos + step;

while (check_pos < pools_size && check_pos < check_max) {
connect_pool* pool = pools.pools[check_pos++];
int ret = pool->check_idle(idle_ttl_);
connect_pool* pool = pools.pools[check_pos++ % pools_size];
int ret = pool->check_dead();
if (ret > 0) {
nfreed += ret;
}
Expand Down
18 changes: 17 additions & 1 deletion lib_acl_cpp/src/connpool/connect_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@

namespace acl {

connect_monitor::connect_monitor(connect_manager& manager)
connect_monitor::connect_monitor(connect_manager& manager, bool check_server /* false */)
: stop_(false)
, stop_graceful_(true)
, handle_(ENGINE_KERNEL)
, manager_(manager)
, check_server_(check_server)
, check_inter_(1)
, conn_timeout_(10)
, check_idle_on_(false)
, kick_dead_(false)
, conns_min_(0)
, check_idle_step_(128)
, check_dead_on_(false)
, check_dead_step_(128)
, rpc_service_(NULL)
{
}
Expand Down Expand Up @@ -55,6 +62,15 @@ connect_monitor& connect_monitor::set_conn_timeout(int n)
return *this;
}

connect_monitor& connect_monitor::set_check_idle(bool on,
bool kick_dead, size_t conns_min, size_t step) {
check_idle_on_ = on;
kick_dead_ = kick_dead;
conns_min_ = conns_min;
check_idle_step_ = step;
return *this;
}

void connect_monitor::stop(bool graceful)
{
stop_ = true;
Expand Down
Loading

0 comments on commit 1bc8785

Please sign in to comment.