Rework and simplify RCU code
Use __ATOMIC_RELAXED where possible.
Don't store additional values in the users field.

Reviewed-by: Neil Horman <[email protected]>
Reviewed-by: Tomas Mraz <[email protected]>
(Merged from openssl#26690)
bernd-edlinger authored and t8m committed Feb 13, 2025
1 parent 65787e2 commit 5949918
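
For orientation, a small, compilable illustration (not part of the commit) of what the simplification amounts to: previously the 64-bit users word packed a reader count and a qp generation ID, picked apart with the READER_COUNT()/ID_VAL() macros removed below; after this change users is a plain reader count and the ID is handed back separately through the new curr_id out-parameter of update_qp(). Macro values follow the removed pthread-side definitions; everything else here is hypothetical.

/* sketch.c - illustrative only; not part of the OpenSSL source. */
#include <stdint.h>
#include <stdio.h>

/* Old layout (removed by this commit): readers in the low 16 bits,
 * qp ID in bits 32-63. */
#define ID_SHIFT        32
#define READER_MASK     (((uint64_t)1 << 16) - 1)
#define ID_MASK         (((uint64_t)1 << 32) - 1)
#define READER_COUNT(x) ((uint32_t)((x) & READER_MASK))
#define ID_VAL(x)       ((uint32_t)(((x) >> ID_SHIFT) & ID_MASK))

int main(void)
{
    uint64_t old_users = ((uint64_t)7 << ID_SHIFT) | 3; /* ID 7, 3 readers */
    uint64_t new_users = 3;   /* after the commit: users is just a count   */
    uint32_t curr_id   = 7;   /* now reported by update_qp() via *curr_id  */

    printf("old: readers=%u id=%u\n",
           READER_COUNT(old_users), ID_VAL(old_users));
    printf("new: readers=%llu id=%u\n",
           (unsigned long long)new_users, curr_id);
    return 0;
}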
Showing 2 changed files with 38 additions and 118 deletions.
99 changes: 25 additions & 74 deletions crypto/threads_pthread.c
@@ -270,25 +270,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
# define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
# endif

/*
* users is broken up into 2 parts
* bits 0-15 current readers
* bit 32-63 ID
*/
# define READER_SHIFT 0
# define ID_SHIFT 32
/* TODO: READER_SIZE 32 in threads_win.c */
# define READER_SIZE 16
# define ID_SIZE 32

# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
READER_MASK))
# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
# define VAL_READER ((uint64_t)1 << READER_SHIFT)
# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)

/*
* This is the core of an rcu lock. It tracks the readers and writers for the
* current quiescence point for a given lock. Users is the 64 bit value that
@@ -388,29 +369,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
*/
qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);

/*
* Notes of use of __ATOMIC_RELEASE
* This counter is only read by the write side of the lock, and so we
* specify __ATOMIC_RELEASE here to ensure that the write side of the
* lock see this during the spin loop read of users, as it waits for the
* reader count to approach zero
*/
ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
__ATOMIC_RELEASE);
ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
__ATOMIC_ACQUIRE);

/* if the idx hasn't changed, we're good, else try again */
if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
__ATOMIC_ACQUIRE))
__ATOMIC_RELAXED))
break;

/*
* Notes on use of __ATOMIC_RELEASE
* As with the add above, we want to ensure that this decrement is
* seen by the write side of the lock as soon as it happens to prevent
* undue spinning waiting for write side completion
*/
ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
__ATOMIC_RELEASE);
ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
__ATOMIC_RELAXED);
}

return &lock->qp_group[qp_idx];
@@ -477,14 +445,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].lock == lock) {
/*
* As with read side acquisition, we use __ATOMIC_RELEASE here
* to ensure that the decrement is published immediately
* to any write side waiters
* we have to use __ATOMIC_RELEASE here
* to ensure that all preceding read instructions complete
* before the decrement is visible to ossl_synchronize_rcu
*/
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
VAL_READER, __ATOMIC_RELEASE);
(uint64_t)1, __ATOMIC_RELEASE);
OPENSSL_assert(ret != UINT64_MAX);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
@@ -503,9 +471,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
* Write side allocation routine to get the current qp
* and replace it with a new one
*/
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
uint64_t new_id;
uint32_t current_idx;

pthread_mutex_lock(&lock->alloc_lock);
@@ -528,29 +495,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
lock->current_alloc_idx =
(lock->current_alloc_idx + 1) % lock->group_count;

/* get and insert a new id */
new_id = VAL_ID(lock->id_ctr);
*curr_id = lock->id_ctr;
lock->id_ctr++;

/*
* Even though we are under a write side lock here
* We need to use atomic instructions to ensure that the results
* of this update are published to the read side prior to updating the
* reader idx below
*/
ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
__ATOMIC_RELEASE);
ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
__ATOMIC_RELEASE);

/*
* Update the reader index to be the prior qp.
* Note the use of __ATOMIC_RELEASE here is based on the corresponding use
* of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
* of this value to be seen on the read side immediately after it happens
*/
ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
__ATOMIC_RELEASE);
__ATOMIC_RELAXED);

/* wake up any waiters */
pthread_cond_signal(&lock->alloc_signal);
@@ -594,32 +543,34 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
{
struct rcu_qp *qp;
uint64_t count;
uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;

pthread_mutex_lock(&lock->write_lock);
cb_items = lock->cb_items;
lock->cb_items = NULL;
pthread_mutex_unlock(&lock->write_lock);

qp = update_qp(lock);
qp = update_qp(lock, &curr_id);

/* retire in order */
pthread_mutex_lock(&lock->prior_lock);
while (lock->next_to_retire != curr_id)
pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
lock->next_to_retire++;
pthread_cond_broadcast(&lock->prior_signal);
pthread_mutex_unlock(&lock->prior_lock);

/*
* wait for the reader count to reach zero
* Note the use of __ATOMIC_ACQUIRE here to ensure that any
* prior __ATOMIC_RELEASE write operation in get_hold_current_qp
* prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
* is visible prior to our read
* however this is likely just necessary to silence a tsan warning
*/
do {
count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
} while (READER_COUNT(count) != 0);

/* retire in order */
pthread_mutex_lock(&lock->prior_lock);
while (lock->next_to_retire != ID_VAL(count))
pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
lock->next_to_retire++;
pthread_cond_broadcast(&lock->prior_signal);
pthread_mutex_unlock(&lock->prior_lock);
} while (count != (uint64_t)0);

retire_qp(lock, qp);

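The last threads_pthread.c hunk reorders ossl_synchronize_rcu(): the writer now retires in order using the curr_id returned by update_qp(), and only afterwards spins until the old qp's plain reader count drains to zero. A heavily simplified, compilable model of that flow follows; struct and function names are invented for the sketch, and the real update_qp() also manages qp allocation under its allocation lock.

/* sync_sketch.c - illustrative model of the new write-side ordering */
#include <pthread.h>
#include <stdint.h>

struct sketch_qp {
    uint64_t users;                         /* plain reader count */
};

struct sketch_lock {
    pthread_mutex_t prior_lock;
    pthread_cond_t prior_signal;
    uint32_t id_ctr;
    uint32_t next_to_retire;
    /* qp allocation bookkeeping elided */
};

/* stand-in for update_qp(): swap in a fresh qp and report this grace
 * period's ID through *curr_id, matching the shape of the new API */
static struct sketch_qp *sketch_update_qp(struct sketch_lock *lock,
                                          struct sketch_qp *prev_qp,
                                          uint32_t *curr_id)
{
    *curr_id = lock->id_ctr++;
    return prev_qp;                         /* qp the readers must drain */
}

static void sketch_synchronize(struct sketch_lock *lock,
                               struct sketch_qp *prev_qp)
{
    uint32_t curr_id;
    uint64_t count;
    struct sketch_qp *qp = sketch_update_qp(lock, prev_qp, &curr_id);

    /* retire in order, before waiting for readers (the new ordering) */
    pthread_mutex_lock(&lock->prior_lock);
    while (lock->next_to_retire != curr_id)
        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
    lock->next_to_retire++;
    pthread_cond_broadcast(&lock->prior_signal);
    pthread_mutex_unlock(&lock->prior_lock);

    /* then spin until the old qp's reader count reaches zero; the
     * ACQUIRE load pairs with the RELEASE decrement in read_unlock */
    do {
        count = __atomic_load_n(&qp->users, __ATOMIC_ACQUIRE);
    } while (count != 0);
}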
57 changes: 13 additions & 44 deletions crypto/threads_win.c
@@ -43,25 +43,6 @@ typedef struct {
} CRYPTO_win_rwlock;
# endif

/*
* users is broken up into 2 parts
* bits 0-31 current readers
* bit 32-63 ID
*/
# define READER_SHIFT 0
# define ID_SHIFT 32
/* TODO: READER_SIZE 16 in threads_pthread.c */
# define READER_SIZE 32
# define ID_SIZE 32

# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
READER_MASK))
# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
# define VAL_READER ((int64_t)1 << READER_SHIFT)
# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)

/*
* This defines a quescent point (qp)
* This is the barrier beyond which a writer
@@ -229,13 +210,13 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
for (;;) {
CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
lock->rw_lock);
CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
lock->rw_lock);
CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
lock->rw_lock);
if (qp_idx == tmp)
break;
CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
lock->rw_lock);
}

@@ -313,7 +294,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
-VAL_READER, (uint64_t *)&ret,
(uint64_t)-1, (uint64_t *)&ret,
lock->rw_lock);
OPENSSL_assert(ret >= 0);
data->thread_qps[i].qp = NULL;
@@ -328,12 +309,10 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
* Write side allocation routine to get the current qp
* and replace it with a new one
*/
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
uint64_t new_id;
uint32_t current_idx;
uint32_t tmp;
uint64_t tmp64;

ossl_crypto_mutex_lock(lock->alloc_lock);
/*
@@ -355,20 +334,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
(lock->current_alloc_idx + 1) % lock->group_count;

/* get and insert a new id */
new_id = VAL_ID(lock->id_ctr);
*curr_id = lock->id_ctr;
lock->id_ctr++;

/*
* Even though we are under a write side lock here
* We need to use atomic instructions to ensure that the results
* of this update are published to the read side prior to updating the
* reader idx below
*/
CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
lock->rw_lock);
CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
lock->rw_lock);

/* update the reader index to be the prior qp */
tmp = lock->current_alloc_idx;
InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
@@ -393,28 +361,29 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
{
struct rcu_qp *qp;
uint64_t count;
uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;

/* before we do anything else, lets grab the cb list */
cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
NULL);

qp = update_qp(lock);

/* wait for the reader count to reach zero */
do {
CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
} while (READER_COUNT(count) != 0);
qp = update_qp(lock, &curr_id);

/* retire in order */
ossl_crypto_mutex_lock(lock->prior_lock);
while (lock->next_to_retire != ID_VAL(count))
while (lock->next_to_retire != curr_id)
ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);

lock->next_to_retire++;
ossl_crypto_condvar_broadcast(lock->prior_signal);
ossl_crypto_mutex_unlock(lock->prior_lock);

/* wait for the reader count to reach zero */
do {
CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
} while (count != (uint64_t)0);

retire_qp(lock, qp);

/* handle any callbacks that we have */
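Taken together with the pthread hunks, the read-side pairing after this commit can be modelled roughly as below: the increment that pins the current qp is ACQUIRE, the re-check of reader_idx and the rollback decrement are RELAXED, and only the final decrement in read_unlock is RELEASE so the reader's critical section is ordered before the writer's ACQUIRE load in ossl_synchronize_rcu(). This is a sketch with invented names following the pthread variant; the Windows variant goes through the CRYPTO_atomic_* wrappers instead.

/* read_sketch.c - illustrative model of the read-side orderings */
#include <stdint.h>

struct sketch_qp {
    uint64_t users;                 /* plain reader count */
};

struct sketch_lock {
    uint32_t reader_idx;
    struct sketch_qp qp_group[2];
};

static struct sketch_qp *sketch_read_lock(struct sketch_lock *lock)
{
    uint32_t qp_idx;

    for (;;) {
        qp_idx = __atomic_load_n(&lock->reader_idx, __ATOMIC_ACQUIRE);
        __atomic_add_fetch(&lock->qp_group[qp_idx].users, 1,
                           __ATOMIC_ACQUIRE);

        /* if the index hasn't moved we pinned the right qp */
        if (qp_idx == __atomic_load_n(&lock->reader_idx, __ATOMIC_RELAXED))
            break;

        /* lost the race: roll back and retry; RELAXED suffices here */
        __atomic_sub_fetch(&lock->qp_group[qp_idx].users, 1,
                           __ATOMIC_RELAXED);
    }
    return &lock->qp_group[qp_idx];
}

static void sketch_read_unlock(struct sketch_qp *qp)
{
    /* RELEASE: reads done under the lock happen-before the writer's
     * ACQUIRE load of users while it waits for the count to drain */
    __atomic_sub_fetch(&qp->users, 1, __ATOMIC_RELEASE);
}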
