diff --git a/libfs/lib/rdma/agent.c b/libfs/lib/rdma/agent.c index 39c00ce..809387a 100644 --- a/libfs/lib/rdma/agent.c +++ b/libfs/lib/rdma/agent.c @@ -1,6 +1,7 @@ #include #include #include +#include #include "agent.h" app_conn_cb_fn app_conn_event; @@ -58,8 +59,23 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions, rdma_initialized = 1; } -void shutdown_rdma_agent() +void shutdown_rdma_agent(int *socket_fds, int len) { + int i; + struct conn_context* ctx; + rdma_initialized = 0; + void *ret; + + for(i=0; irealfd, SHUT_RDWR); + pthread_join(ctx->cq_poller_thread, &ret); + } + #if 0 void *ret; int sockfd = -1; @@ -112,7 +128,7 @@ static void* rdma_server_loop(void *port) //request connection to another RDMA agent (non-blocking) //returns socket descriptor if successful, otherwise -1 -int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop) +int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int*shutdown) { int sockfd = -1; @@ -141,9 +157,13 @@ int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, i sockfd = shmem_chan_add(atoi(port), -1, app_type, pid, polling_loop); struct conn_context *ctx = s_conn_ctx[sockfd]; + struct client_server_thread_args* args = malloc(sizeof(struct client_server_thread_args)); int *arg = malloc(sizeof(int)); *arg = sockfd; - if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, arg) != 0) + + args->arg1 = arg; + args->arg2 = shutdown; + if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, args) != 0) mp_die("Failed to create client_thread"); printf("[Local-Client] Creating connection (pid:%u, app_type:%d, status:pending) to %s:%s on sockfd %d\n", pid, app_type, ip, port, sockfd); diff --git a/libfs/lib/rdma/agent.h b/libfs/lib/rdma/agent.h index 226eea8..342619c 100644 --- a/libfs/lib/rdma/agent.h +++ b/libfs/lib/rdma/agent.h @@ -29,9 +29,9 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions, app_disc_cb_fn app_disconnect, app_recv_cb_fn app_receive); -void shutdown_rdma_agent(); +void shutdown_rdma_agent(int *sockets, int len); -int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop); +int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int* shutdown); static void on_pre_conn(struct rdma_cm_id *id); static void on_connection(struct rdma_cm_id *id); diff --git a/libfs/lib/rdma/shmem_ch.c b/libfs/lib/rdma/shmem_ch.c index 76a4b82..800527f 100644 --- a/libfs/lib/rdma/shmem_ch.c +++ b/libfs/lib/rdma/shmem_ch.c @@ -75,7 +75,7 @@ void shmem_chan_disconnect(int sockfd) free(ctx); } -void shmem_poll_loop(int sockfd) +void shmem_poll_loop(int sockfd, int* shutdown) { struct conn_context *ctx = get_channel_ctx(sockfd); volatile struct message *recv_msg = NULL; @@ -92,6 +92,10 @@ void shmem_poll_loop(int sockfd) printf("start shmem_poll_loop for sockfd %d\n", ctx->sockfd); while(ctx->poll_enable) { recv_msg = shmem_recv(ctx); + if(*shutdown == 1) { + shmem_chan_disconnect(ctx->sockfd); + return; + } if(recv_msg) { recv_msg->meta.app.sockfd = ctx->sockfd; @@ -152,17 +156,19 @@ void shmem_poll_loop(int sockfd) } -void * local_client_thread(void *arg) +void * local_client_thread(void *args) { printf("In thread\n"); + struct client_server_thread_args *cs_args = (struct client_server_thread_args*)args; + char send_path[32]; char recv_path[32]; char shm_msg[128]; char init_msg[128]; int client_socket; struct sockaddr_in serv_addr; - int sockfd = *((int *)arg); + int sockfd = *(cs_args->arg1); struct conn_context *ctx = s_conn_ctx[sockfd]; @@ -235,7 +241,7 @@ void * local_client_thread(void *arg) } } #endif - shmem_poll_loop(sockfd); + shmem_poll_loop(sockfd, cs_args->arg2); //shmem_chan_clear(sockfd); printf("Exit client_thread \n"); @@ -336,10 +342,13 @@ void * local_server_thread(void *arg) //pthread_mutex_unlock(&lock); close(ctx->realfd); #endif - shmem_poll_loop(sockfd); + int *shutdown = (int *)malloc(sizeof(int)); + *shutdown = 0; + shmem_poll_loop(sockfd, shutdown); //shmem_chan_clear(sockfd); printf("Exit server_thread \n"); + free(shutdown); pthread_exit(NULL); } diff --git a/libfs/lib/rdma/shmem_ch.h b/libfs/lib/rdma/shmem_ch.h index 87076cc..192798e 100644 --- a/libfs/lib/rdma/shmem_ch.h +++ b/libfs/lib/rdma/shmem_ch.h @@ -29,7 +29,12 @@ void shmem_chan_setup(int sockfd, volatile void *send_buf, volatile void *recv_b void shmem_chan_disconnect(int sockfd); //void shmem_chan_clear(int sockfd); -void shmem_poll_loop(int sockfd); +void shmem_poll_loop(int sockfd, int *shutdown); + +struct client_server_thread_args { + int *arg1; + int *arg2; +}; void * local_client_thread(void *arg); void * local_server_thread(void *arg); diff --git a/libfs/src/distributed/rpc_interface.c b/libfs/src/distributed/rpc_interface.c index 8cecda2..1b075ca 100644 --- a/libfs/src/distributed/rpc_interface.c +++ b/libfs/src/distributed/rpc_interface.c @@ -20,6 +20,7 @@ char g_self_ip[INET_ADDRSTRLEN]; int g_self_id = -1; int g_kernfs_id = -1; int rpc_shutdown = 0; +int socket_fds[g_n_nodes * 3]; //struct timeval start_time, end_time; @@ -27,6 +28,7 @@ int rpc_shutdown = 0; int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signal_cb_fn signal_callback) { + rpc_shutdown = 0; assert(RPC_MSG_BYTES > MAX_REMOTE_PATH); //ensure that we can signal remote read requests (including file path) int chan_type = -1; @@ -138,9 +140,9 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa if(do_connect) { printf("Connecting to KernFS instance %d [ip: %s]\n", i, g_kernfs_peers[i]->ip); - add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll); - add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll); - add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll); + socket_fds[(i*3) + 0] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll, &rpc_shutdown); + socket_fds[(i*3) + 1] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll, &rpc_shutdown); + socket_fds[(i*3) + 2] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll, &rpc_shutdown); } } @@ -190,7 +192,7 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa int shutdown_rpc() { rpc_shutdown = 1; - shutdown_rdma_agent(); + shutdown_rdma_agent(socket_fds, 3 * g_n_nodes); return 0; }