Skip to content

Commit

Permalink
fix format issues and add env DAOS_SHM_SIZE
Browse files Browse the repository at this point in the history
Features: shm

Required-githooks: true
Skipped-githooks: codespell

Signed-off-by: Lei Huang <[email protected]>
  • Loading branch information
wiliamhuang committed Dec 21, 2024
1 parent fe7d780 commit c1acb9d
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 233 deletions.
144 changes: 83 additions & 61 deletions src/gurt/shm_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,54 @@ static int pid;
static __thread int idx_small = -1;

/* the address of shared memory region */
struct d_shm_hdr *d_shm_head;
struct d_shm_hdr *d_shm_head;

/* the attribute set for mutex located inside shared memory */
pthread_mutexattr_t d_shm_mutex_attr;
pthread_mutexattr_t d_shm_mutex_attr;

/* the number of times to try calling shm_open() */
#define RETRY (5)
#define RETRY (5)

/**
* pid of the process who creates shared memory region. shared memory is NOT unmapped when this
* process exits to keep shm always available. shared memory is unmapped when other processes
* exit.
*/
static int pid_shm_creator;
static int pid_shm_creator;

int
shm_init(void)
{
int i;
int shm_ht_fd;
int shmopen_perm = 0600;
void *shm_addr;
int rc;
char daos_shm_name_buf[64];
int i;
int shm_ht_fd;
int shmopen_perm = 0600;
void *shm_addr;
int rc;
char daos_shm_name_buf[64];
uint64_t shm_size;
uint64_t shm_pool_size;

if (pid == 0)
pid = getpid();
/* shared memory already initlized in current process */
if (d_shm_head)
if (d_shm_head) {
while (d_shm_head->magic != DSM_MAGIC)
usleep(1);
/* shared memory already initlized in current process */
return 0;
}

rc = d_getenv_uint64_t("DAOS_SHM_SIZE", &shm_size);
if (rc != -DER_NONEXIST) {
/* set parameter from env */
shm_pool_size = shm_size / N_SHM_POOL;
if (shm_pool_size % 4096)
/* make shm_pool_size 4K aligned */
shm_pool_size += (4096 - (shm_pool_size % 4096));
shm_size = shm_pool_size * N_SHM_POOL + sizeof(struct d_shm_hdr);
} else {
shm_pool_size = SHM_POOL_SIZE;
shm_size = SHM_SIZE_REQ;
}

rc = pthread_mutexattr_init(&d_shm_mutex_attr);
D_ASSERT(rc == 0);
Expand All @@ -71,15 +89,17 @@ shm_init(void)

/* the shared memory only accessible for individual user for now */
sprintf(daos_shm_name_buf, "%s_%d", daos_shm_name, getuid());
shm_ht_fd = shm_open(daos_shm_name_buf, O_RDWR, 0600);
open_rw:
shm_ht_fd = shm_open(daos_shm_name_buf, O_RDWR, shmopen_perm);
/* failed to open */
if(shm_ht_fd == -1) {
if (shm_ht_fd == -1) {
if (errno == ENOENT) {
goto create_shm;
} else {
DS_ERROR(errno, "unexpected error shm_open()");
for (i = 0; i < RETRY; i++) {
usleep(5);
/* take a short nap to wait for the creation of shm */
usleep(10);
shm_ht_fd = shm_open(daos_shm_name_buf, O_RDWR, shmopen_perm);
if (shm_ht_fd >= 0)
break;
Expand All @@ -93,78 +113,84 @@ shm_init(void)
}

/* map existing shared memory */
shm_addr = mmap(FIXED_SHM_ADDR, SHM_SIZE_REQ, PROT_READ | PROT_WRITE, MAP_SHARED |
MAP_FIXED, shm_ht_fd, 0);
shm_addr = mmap(FIXED_SHM_ADDR, shm_size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED, shm_ht_fd, 0);
if (shm_addr != FIXED_SHM_ADDR) {
DS_ERROR(errno, "mmap failed to map at desired address");
goto err;
}
d_shm_head = (struct d_shm_hdr *)shm_addr;
/* wait until the shared memory initlization finished */
while (d_shm_head->magic != DSM_MAGIC)
/* wait until the shared memory initialization finished */
while (((struct d_shm_hdr *)shm_addr)->magic != DSM_MAGIC)
usleep(1);
if (d_shm_head->size != SHM_SIZE_REQ) {
if (((struct d_shm_hdr *)shm_addr)->size != shm_size) {
/* EBADRQC - Invalid request code */
errno = EBADRQC;
DS_ERROR(errno, "unexpected shared memory size. Multiple versions of daos?");
DS_ERROR(errno, "unexpected shared memory size. Multiple versions of daos or env?");
goto err_unmap;
}
atomic_fetch_add_relaxed(&(d_shm_head->ref_count), 1);
atomic_fetch_add_relaxed(&(((struct d_shm_hdr *)shm_addr)->ref_count), 1);
d_shm_head = (struct d_shm_hdr *)shm_addr;
close(shm_ht_fd);
return 0;

create_shm:
shm_ht_fd = shm_open(daos_shm_name_buf, O_RDWR | O_CREAT | O_EXCL, shmopen_perm);
/* failed to create */
if(shm_ht_fd == -1) {
if (shm_ht_fd == -1) {
if (errno == EEXIST)
goto open_rw;
DS_ERROR(errno, "shm_open() failed to create shared memory");
return errno;
}

if (ftruncate(shm_ht_fd, SHM_SIZE_REQ) != 0) {
/* set the size of shared memory region */
if (ftruncate(shm_ht_fd, shm_size) != 0) {
DS_ERROR(errno, "ftruncate() failed for shm_ht_fd");
goto err;
}
/* map the shared memory at a fixed address for now. We will remove this limit later. */
shm_addr = mmap(FIXED_SHM_ADDR, SHM_SIZE_REQ, PROT_READ | PROT_WRITE,
shm_addr = mmap(FIXED_SHM_ADDR, shm_size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED, shm_ht_fd, 0);
if (shm_addr != FIXED_SHM_ADDR) {
DS_ERROR(errno, "mmap() failed to map at desired address");
goto err;
}
d_shm_head = (struct d_shm_hdr *)shm_addr;
d_shm_head = (struct d_shm_hdr *)shm_addr;
d_shm_head->magic = 0;
/* initialize memory allocators */
for (i = 0; i < N_SHM_POOL; i++) {
d_shm_head->tlsf[i] = tlsf_create_with_pool(shm_addr + sizeof(struct d_shm_hdr) +
(i * SHM_POOL_SIZE), SHM_POOL_SIZE);
d_shm_head->tlsf[i] = tlsf_create_with_pool(
shm_addr + sizeof(struct d_shm_hdr) + (i * shm_pool_size), shm_pool_size);
}

if(shm_mutex_init(&(d_shm_head->g_lock)) != 0) {
if (shm_mutex_init(&(d_shm_head->g_lock)) != 0) {
DS_ERROR(errno, "shm_mutex_init() failed");
goto err_unmap;
}
for (i = 0; i < N_SHM_POOL; i++) {
if(shm_mutex_init(&(d_shm_head->mem_lock[i])) != 0) {
if (shm_mutex_init(&(d_shm_head->mem_lock[i])) != 0) {
DS_ERROR(errno, "shm_mutex_init() failed");
goto err_unmap;
}
}
if(shm_mutex_init(&(d_shm_head->ht_lock)) != 0) {
if (shm_mutex_init(&(d_shm_head->ht_lock)) != 0) {
DS_ERROR(errno, "shm_mutex_init() failed");
goto err_unmap;
}

pid_shm_creator = pid;
pid_shm_creator = pid;
d_shm_head->off_ht_head = INVALID_OFFSET;

atomic_store_relaxed(&(d_shm_head->ref_count), 1);
d_shm_head->size = SHM_SIZE_REQ;
d_shm_head->magic = DSM_MAGIC;
/* initilization is finished now. */
atomic_store_relaxed(&(d_shm_head->large_mem_count), 0);
d_shm_head->size = shm_size;
d_shm_head->shm_pool_size = shm_pool_size;
d_shm_head->magic = DSM_MAGIC;
/* initialization is finished now. */
return 0;

err_unmap:
d_shm_head = NULL;
munmap(shm_addr, SHM_SIZE_REQ);
munmap(shm_addr, shm_size);

err:
close(shm_ht_fd);
Expand All @@ -181,16 +207,16 @@ shm_alloc_comm(size_t align, size_t size)
uint64_t oldref;

if (idx_small < 0) {
tid = syscall(SYS_gettid);
tid = syscall(SYS_gettid);
hash = d_hash_string_u32((const char *)&tid, sizeof(int));
/* choose a memory allocator based on tid */
idx_small = hash % N_SHM_POOL;
}
idx_allocator = idx_small;
if (size >= LARGE_MEM) {
oldref = atomic_fetch_add_relaxed(&(d_shm_head->large_mem_count), 1);
/* pick the allocator for large memery request with round-robin */
idx_allocator = oldref % N_SHM_POOL;
/* pick the allocator for large memory request with round-robin */
idx_allocator = oldref % N_SHM_POOL;
}
shm_mutex_lock(&(d_shm_head->mem_lock[idx_allocator]), NULL);
if (align == 0)
Expand Down Expand Up @@ -221,14 +247,14 @@ shm_free(void *ptr)

/* compare with the lower bound address of shared memory pool */
if (ptr < d_shm_head->tlsf[0]) {
DS_WARN(EINVAL, "Out of range memory pointer for shm_free()\n");
DS_ERROR(EINVAL, "Out of range memory pointer for shm_free()\n");

Check warning on line 250 in src/gurt/shm_alloc.c

View workflow job for this annotation

GitHub Actions / Logging macro checking

check-return, Line contains too many newlines
return;
}

idx_allocator = ((uint64_t)ptr - (uint64_t)d_shm_head->tlsf[0]) / SHM_POOL_SIZE;
idx_allocator = ((uint64_t)ptr - (uint64_t)d_shm_head->tlsf[0]) / d_shm_head->shm_pool_size;
/* compare with the upper bound address of shared memory pool */
if (idx_allocator >= N_SHM_POOL) {
DS_WARN(EINVAL, "Out of range memory pointer for shm_free()\n");
DS_ERROR(EINVAL, "Out of range memory pointer for shm_free()\n");

Check warning on line 257 in src/gurt/shm_alloc.c

View workflow job for this annotation

GitHub Actions / Logging macro checking

check-return, Line contains too many newlines
return;
}

Expand All @@ -246,29 +272,25 @@ shm_destroy(void)
unlink(daos_shm_file_name);
}

void
shm_dec_ref(void)
{
D_ASSERT(d_shm_head != NULL);
atomic_fetch_add_relaxed(&(d_shm_head->ref_count), -1);
if (pid != pid_shm_creator)
munmap(d_shm_head, SHM_SIZE_REQ);
d_shm_head = NULL;
}

void
shm_inc_ref(void)
{
D_ASSERT(d_shm_head != NULL);
atomic_fetch_add_relaxed(&(d_shm_head->ref_count), 1);
}

bool
shm_inited(void)
{
if (d_shm_head == NULL)
return false;
if (d_shm_head->magic != DSM_MAGIC)
return false;

return true;
}

void
shm_fini(void)
{
if (!shm_inited())
return;

atomic_fetch_add_relaxed(&(d_shm_head->ref_count), -1);
if (pid != pid_shm_creator)
munmap(d_shm_head, d_shm_head->size);
d_shm_head = NULL;
}
Loading

0 comments on commit c1acb9d

Please sign in to comment.