diff --git a/WalletKitCore/WalletKitCoreTests/test/bitcoin/testBwm.c b/WalletKitCore/WalletKitCoreTests/test/bitcoin/testBwm.c index 23eb55043..b83e81e43 100644 --- a/WalletKitCore/WalletKitCoreTests/test/bitcoin/testBwm.c +++ b/WalletKitCore/WalletKitCoreTests/test/bitcoin/testBwm.c @@ -365,6 +365,7 @@ static void * _testBRWalletManagerConnectThread (void *context) { BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context; while (!state->kill) { + nanosleep (&(struct timespec){0, 50000000}, NULL); BRWalletManagerConnect (state->manager); } return NULL; @@ -374,6 +375,7 @@ static void * _testBRWalletManagerDisconnectThread (void *context) { BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context; while (!state->kill) { + nanosleep (&(struct timespec){0, 50000000}, NULL); BRWalletManagerDisconnect (state->manager); } return NULL; @@ -383,15 +385,27 @@ static void * _testBRWalletManagerScanThread (void *context) { BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context; while (!state->kill) { + nanosleep (&(struct timespec){0, 50000000}, NULL); BRWalletManagerScan (state->manager); } return NULL; } +static void * +_testBRWalletManagerNetworkThread (void *context) { + BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context; + while (!state->kill) { + nanosleep (&(struct timespec){0, 50000000}, NULL); + BRWalletManagerSetNetworkReachable (state->manager, rand() % 2); + } + return NULL; +} + static void * _testBRWalletManagerSwapThread (void *context) { BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context; while (!state->kill) { + nanosleep (&(struct timespec){0, 50000000}, NULL); switch (BRWalletManagerGetMode (state->manager)) { case CRYPTO_SYNC_MODE_API_ONLY: BRWalletManagerSetMode (state->manager, CRYPTO_SYNC_MODE_P2P_ONLY); @@ -943,11 +957,7 @@ BRRunTestWalletManagerSyncForMode (const char *testName, } printf("Testing BRWalletManager threading...\n"); - if (mode == CRYPTO_SYNC_MODE_P2P_ONLY) { - // TODO(fix): There is a thread-related issue in BRPeerManager/BRPeer where we have a use after free; re-enable once that is fixed - fprintf(stderr, "***WARNING*** %s:%d: BRWalletManager threading test is disabled for CRYPTO_SYNC_MODE_P2P_ONLY\n", testName, __LINE__); - - } else { + { // Test setup BRRunTestWalletManagerSyncState state = {0}; BRRunTestWalletManagerSyncTestSetup (&state, blockHeight, 1); @@ -956,22 +966,24 @@ BRRunTestWalletManagerSyncForMode (const char *testName, BRWalletManagerStart (manager); BRRunTestWalletManagerSyncThreadState threadState = {0, manager}; - pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL; + pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL, networkThread = (pthread_t) NULL; success = (0 == pthread_create (&connectThread, NULL, _testBRWalletManagerConnectThread, (void*) &threadState) && 0 == pthread_create (&disconnectThread, NULL, _testBRWalletManagerDisconnectThread, (void*) &threadState) && - 0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState)); + 0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState) && + 0 == pthread_create (&networkThread, NULL, _testBRWalletManagerNetworkThread, (void*) &threadState)); if (!success) { fprintf(stderr, "***FAILED*** %s:%d: pthread_creates failed\n", testName, __LINE__); return success; } - sleep (5); + sleep (10); threadState.kill = 1; success = (0 == pthread_join (connectThread, NULL) && 0 == pthread_join (disconnectThread, NULL) && - 0 == pthread_join (scanThread, NULL)); + 0 == pthread_join (scanThread, NULL) && + 0 == pthread_join (networkThread, NULL)); if (!success) { fprintf(stderr, "***FAILED*** %s:%d: pthread_joins failed\n", testName, __LINE__); return success; @@ -1139,11 +1151,7 @@ BRRunTestWalletManagerSyncAllModes (const char *testName, } printf("Testing BRWalletManager mode swap threading...\n"); - if (primaryMode == CRYPTO_SYNC_MODE_P2P_ONLY || secondaryMode == CRYPTO_SYNC_MODE_P2P_ONLY) { - // TODO(fix): There is a thread-related issue in BRPeerManager/BRPeer where we have a use after free; re-enable once that is fixed - fprintf(stderr, "***WARNING*** %s:%d: BRWalletManager mode swap threading test is disabled\n", testName, __LINE__); - - } else { + { // Test setup BRRunTestWalletManagerSyncState state = {0}; BRRunTestWalletManagerSyncTestSetup (&state, blockHeight, 1); @@ -1152,23 +1160,26 @@ BRRunTestWalletManagerSyncAllModes (const char *testName, BRWalletManagerStart (manager); BRRunTestWalletManagerSyncThreadState threadState = {0, manager}; - pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL, swapThread = (pthread_t) NULL; + pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL; + pthread_t networkThread = (pthread_t) NULL, swapThread = (pthread_t) NULL; success = (0 == pthread_create (&connectThread, NULL, _testBRWalletManagerConnectThread, (void*) &threadState) && 0 == pthread_create (&disconnectThread, NULL, _testBRWalletManagerDisconnectThread, (void*) &threadState) && 0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState) && + 0 == pthread_create (&networkThread, NULL, _testBRWalletManagerNetworkThread, (void*) &threadState) && 0 == pthread_create (&swapThread, NULL, _testBRWalletManagerSwapThread, (void*) &threadState)); if (!success) { fprintf(stderr, "***FAILED*** %s:%d: pthread_creates failed\n", testName, __LINE__); return success; } - sleep (5); + sleep (10); threadState.kill = 1; success = (0 == pthread_join (connectThread, NULL) && 0 == pthread_join (disconnectThread, NULL) && 0 == pthread_join (scanThread, NULL) && + 0 == pthread_join (networkThread, NULL) && 0 == pthread_join (swapThread, NULL)); if (!success) { fprintf(stderr, "***FAILED*** %s:%d: pthread_joins failed\n", testName, __LINE__); diff --git a/WalletKitCore/src/bitcoin/BRPeer.c b/WalletKitCore/src/bitcoin/BRPeer.c index 85a129e97..9d7b22e52 100644 --- a/WalletKitCore/src/bitcoin/BRPeer.c +++ b/WalletKitCore/src/bitcoin/BRPeer.c @@ -100,7 +100,6 @@ typedef struct { uint32_t magicNumber; char host[INET6_ADDRSTRLEN]; BRPeerStatus status; - int waitingForNetwork; volatile int needsFilterUpdate; uint64_t nonce, feePerKb; char *useragent; @@ -986,7 +985,7 @@ static double _peerGetMempoolTime (BRPeerContext *ctx) { } -static void *_peerThreadRoutine(void *arg) +static void *_peerThreadConnectRoutine(void *arg) { BRPeer *peer = arg; BRPeerContext *ctx = arg; @@ -1115,6 +1114,22 @@ static void *_peerThreadRoutine(void *arg) return NULL; // detached threads don't need to return a value } +static void *_peerThreadDisconnectRoutine(void *arg) { + BRPeer *peer = arg; + BRPeerContext *ctx = arg; + + pthread_cleanup_push(ctx->threadCleanup, ctx->info); + + peer_log(peer, "waiting-disconnected"); + + assert (0 == array_count(ctx->pongCallback)); + assert (NULL == ctx->mempoolCallback); + + if (ctx->disconnected) ctx->disconnected(ctx->info, 0); + pthread_cleanup_pop(1); + return NULL; // detached threads don't need to return a value +} + static void _dummyThreadCleanup(void *info) { } @@ -1227,35 +1242,31 @@ void BRPeerConnect(BRPeer *peer) pthread_attr_t attr; pthread_mutex_lock(&ctx->lock); - if (ctx->status == BRPeerStatusDisconnected || ctx->waitingForNetwork) { - ctx->status = BRPeerStatusConnecting; - + if (ctx->status == BRPeerStatusDisconnected || ctx->status == BRPeerStatusWaiting) { if (ctx->networkIsReachable && ! ctx->networkIsReachable(ctx->info)) { // delay until network is reachable - if (! ctx->waitingForNetwork) peer_log(peer, "waiting for network reachability"); - ctx->waitingForNetwork = 1; + if (ctx->status != BRPeerStatusWaiting) peer_log(peer, "waiting for network reachability"); + ctx->status = BRPeerStatusWaiting; } else { peer_log(peer, "connecting"); - ctx->waitingForNetwork = 0; + ctx->status = BRPeerStatusConnecting; gettimeofday(&tv, NULL); // No race - set before the thread starts. ctx->disconnectTime = tv.tv_sec + (double)tv.tv_usec/1000000 + CONNECT_TIMEOUT; if (pthread_attr_init(&attr) != 0) { - // error = ENOMEM; - peer_log(peer, "error creating thread"); + peer_log(peer, "error creating connect thread"); ctx->status = BRPeerStatusDisconnected; - //if (ctx->disconnected) ctx->disconnected(ctx->info, error); + assert (0); } else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 || pthread_attr_setstacksize(&attr, PTHREAD_STACK_SIZE) != 0 || - pthread_create(&ctx->thread, &attr, _peerThreadRoutine, peer) != 0) { - // error = EAGAIN; - peer_log(peer, "error creating thread"); + pthread_create(&ctx->thread, &attr, _peerThreadConnectRoutine, peer) != 0) { + peer_log(peer, "error creating connect thread"); pthread_attr_destroy(&attr); ctx->status = BRPeerStatusDisconnected; - //if (ctx->disconnected) ctx->disconnected(ctx->info, error); + assert (0); } } } @@ -1267,6 +1278,7 @@ void BRPeerDisconnect(BRPeer *peer) { BRPeerContext *ctx = (BRPeerContext *)peer; int socket = -1; + pthread_attr_t attr; if (_peerCheckAndGetSocket(ctx, &socket)) { pthread_mutex_lock(&ctx->lock); @@ -1274,8 +1286,28 @@ void BRPeerDisconnect(BRPeer *peer) pthread_mutex_unlock(&ctx->lock); if (shutdown(socket, SHUT_RDWR) < 0) peer_log(peer, "%s", strerror(errno)); - close(socket); } + + pthread_mutex_lock(&ctx->lock); + if (ctx->status == BRPeerStatusWaiting) { + peer_log(peer, "disconnecting"); + ctx->status = BRPeerStatusDisconnected; + + if (pthread_attr_init(&attr) != 0) { + peer_log(peer, "error creating disconnect thread"); + assert (0); + } + else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 || + pthread_attr_setstacksize(&attr, PTHREAD_STACK_SIZE) != 0 || + pthread_create(&ctx->thread, &attr, _peerThreadDisconnectRoutine, peer) != 0) { + peer_log(peer, "error creating disconnect thread"); + pthread_attr_destroy(&attr); + assert (0); + } + + ctx->status = BRPeerStatusDisconnected; + } + pthread_mutex_unlock(&ctx->lock); } // call this to (re)schedule a disconnect in the given number of seconds, or < 0 to cancel (useful for sync timeout) diff --git a/WalletKitCore/src/bitcoin/BRPeer.h b/WalletKitCore/src/bitcoin/BRPeer.h index 0603d7f24..da5d2d26b 100644 --- a/WalletKitCore/src/bitcoin/BRPeer.h +++ b/WalletKitCore/src/bitcoin/BRPeer.h @@ -93,7 +93,8 @@ extern "C" { typedef enum { BRPeerStatusDisconnected = 0, BRPeerStatusConnecting, - BRPeerStatusConnected + BRPeerStatusConnected, + BRPeerStatusWaiting } BRPeerStatus; typedef struct { diff --git a/WalletKitCore/src/bitcoin/BRPeerManager.c b/WalletKitCore/src/bitcoin/BRPeerManager.c index d744e5b61..a0c66d50f 100644 --- a/WalletKitCore/src/bitcoin/BRPeerManager.c +++ b/WalletKitCore/src/bitcoin/BRPeerManager.c @@ -839,7 +839,7 @@ static void _peerDisconnected(void *info, int error) if (manager->connectFailureCount > MAX_CONNECT_FAILURES) manager->connectFailureCount = MAX_CONNECT_FAILURES; } - if (! manager->isConnected && manager->connectFailureCount == MAX_CONNECT_FAILURES) { + if (! manager->isConnected && manager->connectFailureCount >= MAX_CONNECT_FAILURES) { _BRPeerManagerSyncStopped(manager); // clear out stored peers so we get a fresh list from DNS on next connect attempt @@ -1498,6 +1498,7 @@ static void _peerThreadCleanup(void *info) free(info); pthread_mutex_lock(&manager->lock); + assert (0 != manager->peerThreadCount); manager->peerThreadCount--; pthread_mutex_unlock(&manager->lock); if (manager->threadCleanup) manager->threadCleanup(manager->info); @@ -1665,7 +1666,7 @@ void BRPeerManagerConnect(BRPeerManager *manager) for (size_t i = array_count(manager->connectedPeers); i > 0; i--) { BRPeer *p = manager->connectedPeers[i - 1]; - if (BRPeerConnectStatus(p) == BRPeerStatusConnecting) BRPeerConnect(p); + if (BRPeerConnectStatus(p) == BRPeerStatusWaiting) BRPeerConnect(p); } if (array_count(manager->connectedPeers) < manager->maxConnectCount) { @@ -1707,13 +1708,6 @@ void BRPeerManagerConnect(BRPeerManager *manager) _peerSetFeePerKb, _peerRequestedTx, _peerNetworkIsReachable, _peerThreadCleanup); BRPeerSetEarliestKeyTime(info->peer, manager->earliestKeyTime); BRPeerConnect(info->peer); - - if (BRPeerConnectStatus(info->peer) == BRPeerStatusDisconnected) { - pthread_mutex_unlock(&manager->lock); - _peerDisconnected(info, ENOTCONN); - pthread_mutex_lock(&manager->lock); - manager->peerThreadCount--; - } } } @@ -1745,7 +1739,7 @@ void BRPeerManagerDisconnect(BRPeerManager *manager) p = manager->connectedPeers[i - 1]; manager->connectFailureCount = MAX_CONNECT_FAILURES; // prevent futher automatic reconnect attempts BRPeerDisconnect(p); - if (BRPeerConnectStatus(p) == BRPeerStatusConnecting) manager->peerThreadCount--; // waiting for network + while (BRPeerConnectStatus(p) == BRPeerStatusConnecting) BRPeerDisconnect(p); } peerThreadCount = manager->peerThreadCount;