diff --git a/crypto/threads_pthread.c b/crypto/threads_pthread.c
index 4e8df06207bb1..116603652a15b 100644
--- a/crypto/threads_pthread.c
+++ b/crypto/threads_pthread.c
@@ -270,25 +270,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
 # define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 32 in threads_win.c */
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This is the core of an rcu lock. It tracks the readers and writers for the
  * current quiescence point for a given lock. Users is the 64 bit value that
@@ -388,29 +369,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
          */
         qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
 
-        /*
-         * Notes of use of __ATOMIC_RELEASE
-         * This counter is only read by the write side of the lock, and so we
-         * specify __ATOMIC_RELEASE here to ensure that the write side of the
-         * lock see this during the spin loop read of users, as it waits for the
-         * reader count to approach zero
-         */
-        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_ACQUIRE);
 
         /* if the idx hasn't changed, we're good, else try again */
         if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
-                                    __ATOMIC_ACQUIRE))
+                                    __ATOMIC_RELAXED))
             break;
 
-        /*
-         * Notes on use of __ATOMIC_RELEASE
-         * As with the add above, we want to ensure that this decrement is
-         * seen by the write side of the lock as soon as it happens to prevent
-         * undue spinning waiting for write side completion
-         */
-        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_RELAXED);
     }
 
     return &lock->qp_group[qp_idx];
@@ -477,14 +445,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     for (i = 0; i < MAX_QPS; i++) {
         if (data->thread_qps[i].lock == lock) {
             /*
-             * As with read side acquisition, we use __ATOMIC_RELEASE here
-             * to ensure that the decrement is published immediately
-             * to any write side waiters
+             * we have to use __ATOMIC_RELEASE here
+             * to ensure that all preceding read instructions complete
+             * before the decrement is visible to ossl_synchronize_rcu
              */
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
-                                       VAL_READER, __ATOMIC_RELEASE);
+                                       (uint64_t)1, __ATOMIC_RELEASE);
                 OPENSSL_assert(ret != UINT64_MAX);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
@@ -503,9 +471,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
 
     pthread_mutex_lock(&lock->alloc_lock);
@@ -528,29 +495,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
     lock->current_alloc_idx =
         (lock->current_alloc_idx + 1) % lock->group_count;
 
-    /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
-                     __ATOMIC_RELEASE);
-    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
-                    __ATOMIC_RELEASE);
-
-    /*
-     * Update the reader index to be the prior qp.
-     * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
-     * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
-     * of this value to be seen on the read side immediately after it happens
-     */
     ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
-                   __ATOMIC_RELEASE);
+                   __ATOMIC_RELAXED);
 
     /* wake up any waiters */
     pthread_cond_signal(&lock->alloc_signal);
@@ -594,6 +543,7 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     pthread_mutex_lock(&lock->write_lock);
@@ -601,25 +551,26 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
     lock->cb_items = NULL;
     pthread_mutex_unlock(&lock->write_lock);
 
-    qp = update_qp(lock);
+    qp = update_qp(lock, &curr_id);
+
+    /* retire in order */
+    pthread_mutex_lock(&lock->prior_lock);
+    while (lock->next_to_retire != curr_id)
+        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+    lock->next_to_retire++;
+    pthread_cond_broadcast(&lock->prior_signal);
+    pthread_mutex_unlock(&lock->prior_lock);
 
     /*
      * wait for the reader count to reach zero
      * Note the use of __ATOMIC_ACQUIRE here to ensure that any
-     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+     * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
      * is visible prior to our read
+     * however this is likely just necessary to silence a tsan warning
      */
     do {
         count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
-    } while (READER_COUNT(count) != 0);
-
-    /* retire in order */
-    pthread_mutex_lock(&lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
-        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
-    lock->next_to_retire++;
-    pthread_cond_broadcast(&lock->prior_signal);
-    pthread_mutex_unlock(&lock->prior_lock);
+    } while (count != (uint64_t)0);
 
     retire_qp(lock, qp);
 
diff --git a/crypto/threads_win.c b/crypto/threads_win.c
index 03a22fd2c92a6..258b7a59c7746 100644
--- a/crypto/threads_win.c
+++ b/crypto/threads_win.c
@@ -43,25 +43,6 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-31 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 16 in threads_pthread.c */
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                          READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((int64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
 /*
  * This defines a quescent point (qp)
  * This is the barrier beyond which a writer
@@ -229,13 +210,13 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
     for (;;) {
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
                                lock->rw_lock);
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
                             lock->rw_lock);
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
                                lock->rw_lock);
         if (qp_idx == tmp)
             break;
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
                             lock->rw_lock);
     }
 
@@ -313,7 +294,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
-                                    -VAL_READER, (uint64_t *)&ret,
+                                    (uint64_t)-1, (uint64_t *)&ret,
                                     lock->rw_lock);
                 OPENSSL_assert(ret >= 0);
                 data->thread_qps[i].qp = NULL;
@@ -328,12 +309,10 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
 */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
     uint32_t tmp;
-    uint64_t tmp64;
 
     ossl_crypto_mutex_lock(lock->alloc_lock);
     /*
@@ -355,20 +334,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
-                      lock->rw_lock);
-    CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
-                        lock->rw_lock);
-
     /* update the reader index to be the prior qp */
     tmp = lock->current_alloc_idx;
     InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
@@ -393,28 +361,29 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /* before we do anything else, lets grab the cb list */
     cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
                                           NULL);
 
-    qp = update_qp(lock);
-
-    /* wait for the reader count to reach zero */
-    do {
-        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     ossl_crypto_mutex_lock(lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
        ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
     lock->next_to_retire++;
     ossl_crypto_condvar_broadcast(lock->prior_signal);
     ossl_crypto_mutex_unlock(lock->prior_lock);
 
+    /* wait for the reader count to reach zero */
+    do {
+        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
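
For reviewers following the logic, below is a minimal standalone sketch of the scheme after this change (C11 atomics plus pthreads). The names (`rcu_lock`, `read_lock`, `synchronize`, `GROUP_COUNT`) are illustrative simplifications, not the OpenSSL API, and qp reuse throttling (the alloc_signal dance), nested read-side tracking, and callback handling are all omitted. It shows the two key points of the patch: `users` is now a plain reader count, and the writer learns its retirement generation from `update_qp()` via the new out-parameter instead of unpacking it from the spin-loop read of `users`.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

#define GROUP_COUNT 2

struct rcu_qp {
    _Atomic uint64_t users;            /* now a plain reader count */
};

struct rcu_lock {
    struct rcu_qp qp_group[GROUP_COUNT];
    _Atomic uint32_t reader_idx;       /* qp that new readers enter */
    uint32_t current_alloc_idx;        /* protected by alloc_lock */
    uint32_t id_ctr;                   /* generation ctr, under alloc_lock */
    uint32_t next_to_retire;           /* protected by prior_lock */
    pthread_mutex_t alloc_lock;
    pthread_mutex_t prior_lock;
    pthread_cond_t prior_signal;
};

/* Read side: pin the current qp; the release decrement on exit pairs
 * with the writer's acquire spin in synchronize() below */
static struct rcu_qp *read_lock(struct rcu_lock *lock)
{
    for (;;) {
        uint32_t idx = atomic_load_explicit(&lock->reader_idx,
                                            memory_order_acquire);
        atomic_fetch_add_explicit(&lock->qp_group[idx].users, 1,
                                  memory_order_acquire);
        /* if the idx hasn't changed we're pinned, else back out and retry */
        if (idx == atomic_load_explicit(&lock->reader_idx,
                                        memory_order_relaxed))
            return &lock->qp_group[idx];
        atomic_fetch_sub_explicit(&lock->qp_group[idx].users, 1,
                                  memory_order_relaxed);
    }
}

static void read_unlock(struct rcu_qp *qp)
{
    atomic_fetch_sub_explicit(&qp->users, 1, memory_order_release);
}

/* Write side: swap in a fresh qp and hand back this writer's generation
 * id, which no longer lives packed into the top bits of ->users */
static struct rcu_qp *update_qp(struct rcu_lock *lock, uint32_t *curr_id)
{
    struct rcu_qp *prior;

    pthread_mutex_lock(&lock->alloc_lock);
    prior = &lock->qp_group[lock->current_alloc_idx];
    lock->current_alloc_idx =
        (lock->current_alloc_idx + 1) % GROUP_COUNT;
    *curr_id = lock->id_ctr++;
    /* relaxed is fine; ordering comes from the retire protocol below */
    atomic_store_explicit(&lock->reader_idx, lock->current_alloc_idx,
                          memory_order_relaxed);
    pthread_mutex_unlock(&lock->alloc_lock);
    return prior;
}

static void synchronize(struct rcu_lock *lock)
{
    uint32_t curr_id;
    struct rcu_qp *qp = update_qp(lock, &curr_id);

    /* retire strictly in generation order, using the id from update_qp */
    pthread_mutex_lock(&lock->prior_lock);
    while (lock->next_to_retire != curr_id)
        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
    lock->next_to_retire++;
    pthread_cond_broadcast(&lock->prior_signal);
    pthread_mutex_unlock(&lock->prior_lock);

    /* then spin until every reader has drained out of the prior qp */
    while (atomic_load_explicit(&qp->users, memory_order_acquire) != 0)
        ;
}
```

This also illustrates why the retire-in-order block moves above the drain loop in both files: previously the generation id rode along in the top 32 bits of `users`, so a writer only learned `ID_VAL(count)` from the spin-loop load and had to drain first; with the id handed out by `update_qp()` it is known up front, and serializing writers on `next_to_retire` before spinning means each writer waits only on the qp it actually retired.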