Skip to content

Commit

Permalink
Cleanup communication threads in shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
OmSaran committed Nov 18, 2021
1 parent 6f4aec8 commit 59faaba
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 15 deletions.
27 changes: 24 additions & 3 deletions libfs/lib/rdma/agent.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <sys/syscall.h>
#include <pthread.h>
#include <stdatomic.h>
#include <sys/socket.h>
#include "agent.h"

app_conn_cb_fn app_conn_event;
Expand Down Expand Up @@ -58,8 +59,24 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions,
rdma_initialized = 1;
}

void shutdown_rdma_agent()
void shutdown_rdma_agent(int *socket_fds, int len)
{
printf("In %s; len = %d\n", __func__, len);
int i;
struct conn_context* ctx;
rdma_initialized = 0;
void *ret;

for(i=0; i<len; i++) {
ctx = get_channel_ctx(socket_fds[i]);
if(!ctx) {
printf("Could not get ctx, was probably killed earlier\n");
continue;
}
shutdown(ctx->realfd, SHUT_RDWR);
pthread_join(ctx->cq_poller_thread, &ret);
}

#if 0
void *ret;
int sockfd = -1;
Expand Down Expand Up @@ -112,7 +129,7 @@ static void* rdma_server_loop(void *port)

//request connection to another RDMA agent (non-blocking)
//returns socket descriptor if successful, otherwise -1
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop)
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int*shutdown)
{
int sockfd = -1;

Expand Down Expand Up @@ -141,9 +158,13 @@ int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, i

sockfd = shmem_chan_add(atoi(port), -1, app_type, pid, polling_loop);
struct conn_context *ctx = s_conn_ctx[sockfd];
struct client_server_thread_args* args = malloc(sizeof(struct client_server_thread_args));
int *arg = malloc(sizeof(int));
*arg = sockfd;
if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, arg) != 0)

args->arg1 = arg;
args->arg2 = shutdown;
if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, args) != 0)
mp_die("Failed to create client_thread");
printf("[Local-Client] Creating connection (pid:%u, app_type:%d, status:pending) to %s:%s on sockfd %d\n",
pid, app_type, ip, port, sockfd);
Expand Down
4 changes: 2 additions & 2 deletions libfs/lib/rdma/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions,
app_disc_cb_fn app_disconnect,
app_recv_cb_fn app_receive);

void shutdown_rdma_agent();
void shutdown_rdma_agent(int *sockets, int len);

int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop);
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int* shutdown);

static void on_pre_conn(struct rdma_cm_id *id);
static void on_connection(struct rdma_cm_id *id);
Expand Down
19 changes: 14 additions & 5 deletions libfs/lib/rdma/shmem_ch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void shmem_chan_disconnect(int sockfd)
free(ctx);
}

void shmem_poll_loop(int sockfd)
void shmem_poll_loop(int sockfd, int* shutdown)
{
struct conn_context *ctx = get_channel_ctx(sockfd);
volatile struct message *recv_msg = NULL;
Expand All @@ -92,6 +92,10 @@ void shmem_poll_loop(int sockfd)
printf("start shmem_poll_loop for sockfd %d\n", ctx->sockfd);
while(ctx->poll_enable) {
recv_msg = shmem_recv(ctx);
if(*shutdown == 1) {
shmem_chan_disconnect(ctx->sockfd);
return;
}

if(recv_msg) {
recv_msg->meta.app.sockfd = ctx->sockfd;
Expand Down Expand Up @@ -152,17 +156,19 @@ void shmem_poll_loop(int sockfd)

}

void * local_client_thread(void *arg)
void * local_client_thread(void *args)
{
printf("In thread\n");

struct client_server_thread_args *cs_args = (struct client_server_thread_args*)args;

char send_path[32];
char recv_path[32];
char shm_msg[128];
char init_msg[128];
int client_socket;
struct sockaddr_in serv_addr;
int sockfd = *((int *)arg);
int sockfd = *(cs_args->arg1);

struct conn_context *ctx = s_conn_ctx[sockfd];

Expand Down Expand Up @@ -235,7 +241,7 @@ void * local_client_thread(void *arg)
}
}
#endif
shmem_poll_loop(sockfd);
shmem_poll_loop(sockfd, cs_args->arg2);
//shmem_chan_clear(sockfd);

printf("Exit client_thread \n");
Expand Down Expand Up @@ -336,10 +342,13 @@ void * local_server_thread(void *arg)
//pthread_mutex_unlock(&lock);
close(ctx->realfd);
#endif
shmem_poll_loop(sockfd);
int *shutdown = (int *)malloc(sizeof(int));
*shutdown = 0;
shmem_poll_loop(sockfd, shutdown);
//shmem_chan_clear(sockfd);

printf("Exit server_thread \n");
free(shutdown);

pthread_exit(NULL);
}
Expand Down
7 changes: 6 additions & 1 deletion libfs/lib/rdma/shmem_ch.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ void shmem_chan_setup(int sockfd, volatile void *send_buf, volatile void *recv_b
void shmem_chan_disconnect(int sockfd);
//void shmem_chan_clear(int sockfd);

void shmem_poll_loop(int sockfd);
void shmem_poll_loop(int sockfd, int *shutdown);

struct client_server_thread_args {
int *arg1;
int *arg2;
};

void * local_client_thread(void *arg);
void * local_server_thread(void *arg);
Expand Down
10 changes: 6 additions & 4 deletions libfs/src/distributed/rpc_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ char g_self_ip[INET_ADDRSTRLEN];
int g_self_id = -1;
int g_kernfs_id = -1;
int rpc_shutdown = 0;
int socket_fds[g_n_nodes * 3];

//struct timeval start_time, end_time;

//uint32_t msg_seq[g_n_nodes*sock_count] = {0};

int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signal_cb_fn signal_callback)
{
rpc_shutdown = 0;
assert(RPC_MSG_BYTES > MAX_REMOTE_PATH); //ensure that we can signal remote read requests (including file path)

int chan_type = -1;
Expand Down Expand Up @@ -138,9 +140,9 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa
if(do_connect) {
printf("Connecting to KernFS instance %d [ip: %s]\n", i, g_kernfs_peers[i]->ip);

add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll);
add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll);
add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll);
socket_fds[(i*3) + 0] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll, &rpc_shutdown);
socket_fds[(i*3) + 1] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll, &rpc_shutdown);
socket_fds[(i*3) + 2] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll, &rpc_shutdown);

}
}
Expand Down Expand Up @@ -190,7 +192,7 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa
int shutdown_rpc()
{
rpc_shutdown = 1;
shutdown_rdma_agent();
shutdown_rdma_agent(socket_fds, 3 * g_n_nodes);
return 0;
}

Expand Down

0 comments on commit 59faaba

Please sign in to comment.