Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup communication threads in shutdown #19

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions libfs/lib/rdma/agent.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <sys/syscall.h>
#include <pthread.h>
#include <stdatomic.h>
#include <sys/socket.h>
#include "agent.h"

app_conn_cb_fn app_conn_event;
Expand Down Expand Up @@ -58,8 +59,23 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions,
rdma_initialized = 1;
}

void shutdown_rdma_agent()
void shutdown_rdma_agent(int *socket_fds, int len)
{
int i;
struct conn_context* ctx;
rdma_initialized = 0;
void *ret;

for(i=0; i<len; i++) {
ctx = get_channel_ctx(socket_fds[i]);
if(!ctx) {
printf("Could not get ctx, was probably killed earlier\n");
continue;
}
shutdown(ctx->realfd, SHUT_RDWR);
pthread_join(ctx->cq_poller_thread, &ret);
}

#if 0
void *ret;
int sockfd = -1;
Expand Down Expand Up @@ -112,7 +128,7 @@ static void* rdma_server_loop(void *port)

//request connection to another RDMA agent (non-blocking)
//returns socket descriptor if successful, otherwise -1
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop)
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int*shutdown)
{
int sockfd = -1;

Expand Down Expand Up @@ -141,9 +157,13 @@ int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, i

sockfd = shmem_chan_add(atoi(port), -1, app_type, pid, polling_loop);
struct conn_context *ctx = s_conn_ctx[sockfd];
struct client_server_thread_args* args = malloc(sizeof(struct client_server_thread_args));
int *arg = malloc(sizeof(int));
*arg = sockfd;
if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, arg) != 0)

args->arg1 = arg;
args->arg2 = shutdown;
if(pthread_create(&ctx->cq_poller_thread, NULL, local_client_thread, args) != 0)
mp_die("Failed to create client_thread");
printf("[Local-Client] Creating connection (pid:%u, app_type:%d, status:pending) to %s:%s on sockfd %d\n",
pid, app_type, ip, port, sockfd);
Expand Down
4 changes: 2 additions & 2 deletions libfs/lib/rdma/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ void init_rdma_agent(char *listen_port, struct mr_context *regions,
app_disc_cb_fn app_disconnect,
app_recv_cb_fn app_receive);

void shutdown_rdma_agent();
void shutdown_rdma_agent(int *sockets, int len);

int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop);
int add_connection(char* ip, char *port, int app_type, pid_t pid, int ch_type, int polling_loop, int* shutdown);

static void on_pre_conn(struct rdma_cm_id *id);
static void on_connection(struct rdma_cm_id *id);
Expand Down
19 changes: 14 additions & 5 deletions libfs/lib/rdma/shmem_ch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void shmem_chan_disconnect(int sockfd)
free(ctx);
}

void shmem_poll_loop(int sockfd)
void shmem_poll_loop(int sockfd, int* shutdown)
{
struct conn_context *ctx = get_channel_ctx(sockfd);
volatile struct message *recv_msg = NULL;
Expand All @@ -92,6 +92,10 @@ void shmem_poll_loop(int sockfd)
printf("start shmem_poll_loop for sockfd %d\n", ctx->sockfd);
while(ctx->poll_enable) {
recv_msg = shmem_recv(ctx);
if(*shutdown == 1) {
shmem_chan_disconnect(ctx->sockfd);
return;
}

if(recv_msg) {
recv_msg->meta.app.sockfd = ctx->sockfd;
Expand Down Expand Up @@ -152,17 +156,19 @@ void shmem_poll_loop(int sockfd)

}

void * local_client_thread(void *arg)
void * local_client_thread(void *args)
{
printf("In thread\n");

struct client_server_thread_args *cs_args = (struct client_server_thread_args*)args;

char send_path[32];
char recv_path[32];
char shm_msg[128];
char init_msg[128];
int client_socket;
struct sockaddr_in serv_addr;
int sockfd = *((int *)arg);
int sockfd = *(cs_args->arg1);

struct conn_context *ctx = s_conn_ctx[sockfd];

Expand Down Expand Up @@ -235,7 +241,7 @@ void * local_client_thread(void *arg)
}
}
#endif
shmem_poll_loop(sockfd);
shmem_poll_loop(sockfd, cs_args->arg2);
//shmem_chan_clear(sockfd);

printf("Exit client_thread \n");
Expand Down Expand Up @@ -336,10 +342,13 @@ void * local_server_thread(void *arg)
//pthread_mutex_unlock(&lock);
close(ctx->realfd);
#endif
shmem_poll_loop(sockfd);
int *shutdown = (int *)malloc(sizeof(int));
*shutdown = 0;
shmem_poll_loop(sockfd, shutdown);
//shmem_chan_clear(sockfd);

printf("Exit server_thread \n");
free(shutdown);

pthread_exit(NULL);
}
Expand Down
7 changes: 6 additions & 1 deletion libfs/lib/rdma/shmem_ch.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ void shmem_chan_setup(int sockfd, volatile void *send_buf, volatile void *recv_b
void shmem_chan_disconnect(int sockfd);
//void shmem_chan_clear(int sockfd);

void shmem_poll_loop(int sockfd);
void shmem_poll_loop(int sockfd, int *shutdown);

struct client_server_thread_args {
int *arg1;
int *arg2;
};

void * local_client_thread(void *arg);
void * local_server_thread(void *arg);
Expand Down
10 changes: 6 additions & 4 deletions libfs/src/distributed/rpc_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ char g_self_ip[INET_ADDRSTRLEN];
int g_self_id = -1;
int g_kernfs_id = -1;
int rpc_shutdown = 0;
int socket_fds[g_n_nodes * 3];

//struct timeval start_time, end_time;

//uint32_t msg_seq[g_n_nodes*sock_count] = {0};

int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signal_cb_fn signal_callback)
{
rpc_shutdown = 0;
assert(RPC_MSG_BYTES > MAX_REMOTE_PATH); //ensure that we can signal remote read requests (including file path)

int chan_type = -1;
Expand Down Expand Up @@ -138,9 +140,9 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa
if(do_connect) {
printf("Connecting to KernFS instance %d [ip: %s]\n", i, g_kernfs_peers[i]->ip);

add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll);
add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll);
add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll);
socket_fds[(i*3) + 0] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_IO, pid, chan_type, always_poll, &rpc_shutdown);
socket_fds[(i*3) + 1] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_BG, pid, chan_type, always_poll, &rpc_shutdown);
socket_fds[(i*3) + 2] = add_connection((char*)g_kernfs_peers[i]->ip, listen_port, SOCK_LS, pid, chan_type, always_poll, &rpc_shutdown);

}
}
Expand Down Expand Up @@ -190,7 +192,7 @@ int init_rpc(struct mr_context *regions, int n_regions, char *listen_port, signa
int shutdown_rpc()
{
rpc_shutdown = 1;
shutdown_rdma_agent();
shutdown_rdma_agent(socket_fds, 3 * g_n_nodes);
return 0;
}

Expand Down