From 3c9f101c1823f21b4dcefb18c7e933ef2ec83bf1 Mon Sep 17 00:00:00 2001 From: Joshua Parkin Date: Fri, 15 Sep 2023 12:51:54 +0100 Subject: [PATCH] replace our implementation of a mutex with async-mutex --- .../p-diff-sync/linksAdapter.ts | 93 +++---------------- 1 file changed, 13 insertions(+), 80 deletions(-) diff --git a/bootstrap-languages/p-diff-sync/linksAdapter.ts b/bootstrap-languages/p-diff-sync/linksAdapter.ts index 1d0741113..f4c3b88c6 100644 --- a/bootstrap-languages/p-diff-sync/linksAdapter.ts +++ b/bootstrap-languages/p-diff-sync/linksAdapter.ts @@ -1,6 +1,7 @@ import { LinkSyncAdapter, PerspectiveDiffObserver, HolochainLanguageDelegate, LanguageContext, PerspectiveDiff, - LinkExpression, DID, Perspective, PerspectiveState } from "https://esm.sh/@perspect3vism/ad4m@0.5.0";; -import type { SyncStateChangeObserver } from "https://esm.sh/@perspect3vism/ad4m@0.5.0";; + LinkExpression, DID, Perspective, PerspectiveState } from "https://esm.sh/@perspect3vism/ad4m@0.5.0"; +import type { SyncStateChangeObserver } from "https://esm.sh/@perspect3vism/ad4m@0.5.0"; +import { Mutex, withTimeout } from "https://esm.sh/async-mutex@0.4.0"; import { DNA_NICK, ZOME_NAME } from "./build/dna.js"; class PeerInfo { @@ -15,7 +16,7 @@ export class LinkAdapter implements LinkSyncAdapter { linkCallback?: PerspectiveDiffObserver syncStateChangeCallback?: SyncStateChangeObserver peers: Map = new Map(); - generalMutex: Mutex = new Mutex(); + generalMutex: Mutex = withTimeout(new Mutex(), 10000, new Error('new fancy error'));; me: DID gossipLogCount: number = 0; myCurrentRevision: Buffer | null = null; @@ -46,17 +47,10 @@ export class LinkAdapter implements LinkSyncAdapter { } async sync(): Promise { + //console.log("PerspectiveDiffSync.sync(); Getting lock"); + const release = await this.generalMutex.acquire(); + //console.log("PerspectiveDiffSync.sync(); Got lock"); try { - //console.log("PerspectiveDiffSync.sync(); Getting lock"); - - const success = await this.generalMutex.lock(); - if (!success) { - console.log("Failed to get lock due to timeout"); - return new PerspectiveDiff() - } - - //console.log("PerspectiveDiffSync.sync(); Got lock"); - //@ts-ignore let current_revision = await this.hcDna.call(DNA_NICK, ZOME_NAME, "sync", null); if (current_revision && Buffer.isBuffer(current_revision)) { @@ -65,7 +59,7 @@ export class LinkAdapter implements LinkSyncAdapter { } catch (e) { console.error("PerspectiveDiffSync.sync(); got error", e); } finally { - this.generalMutex.unlock(); + release(); } await this.gossip(); return new PerspectiveDiff() @@ -75,18 +69,8 @@ export class LinkAdapter implements LinkSyncAdapter { this.gossipLogCount += 1; let lostPeers: DID[] = []; + const release = await this.generalMutex.acquire(); try { - //console.log("PerspectiveDiffSync.gossip(); Getting peers lock"); - // Trying to lock with a timeout - const success = await this.generalMutex.lock(); - - if (!success) { - console.log("Failed to get lock due to timeout"); - return; - } - - //console.log("PerspectiveDiffSync.gossip(); Got lock"); - this.peers.forEach( (peerInfo, peer) => { if (peerInfo.lastSeen.getTime() + 10000 < new Date().getTime()) { lostPeers.push(peer); @@ -138,8 +122,6 @@ export class LinkAdapter implements LinkSyncAdapter { } else { await callback(PerspectiveState.Synced); }; - } else if (differentRevisions == 0) { - await callback(PerspectiveState.Synced); } } @@ -193,7 +175,7 @@ export class LinkAdapter implements LinkSyncAdapter { } catch (e) { console.error("PerspectiveDiffSync.gossip(); got error", e); } finally { - this.generalMutex.unlock(); + release(); } } @@ -204,15 +186,9 @@ export class LinkAdapter implements LinkSyncAdapter { } async commit(diff: PerspectiveDiff): Promise { + //console.log("PerspectiveDiffSync.commit(); Getting lock"); + const release = await this.generalMutex.acquire(); try { - //console.log("PerspectiveDiffSync.commit(); Getting lock"); - const success = await this.generalMutex.lock(); - - if (!success) { - console.log("Failed to get lock due to timeout"); - return ""; - } - //console.log("PerspectiveDiffSync.commit(); Got lock"); let prep_diff = { additions: diff.additions.map((diff) => prepareLinkExpression(diff)), @@ -226,7 +202,7 @@ export class LinkAdapter implements LinkSyncAdapter { } catch (e) { console.error("PerspectiveDiffSync.commit(); got error", e); } finally { - this.generalMutex.unlock(); + release(); } } @@ -308,46 +284,3 @@ function prepareLinkExpression(link: LinkExpression): object { } return data; } - - -class Mutex { - private locked = false; - private waitingResolvers: ((success: boolean) => void)[] = []; - - async lock(timeout = 10000): Promise { // default timeout of 10 seconds - const promise = new Promise((resolve) => { - if (this.locked) { - console.log("Was not able to get lock on mutex adding to waitingResolvers"); - const timer = setTimeout(() => { - const index = this.waitingResolvers.indexOf(resolve); - if (index > -1) { - this.waitingResolvers.splice(index, 1); - resolve(false); // Timeout occurred - } - }, timeout); - - this.waitingResolvers.push((success: boolean) => { - clearTimeout(timer); - resolve(success); - }); - } else { - resolve(true); - } - }); - - const success = await promise; - if (success) this.locked = true; - return success; - } - - unlock(): void { - if (!this.locked) return; - if (this.waitingResolvers.length > 0) { - console.log("Called unlock and got some waitingResolvers to finish"); - const resolve = this.waitingResolvers.shift(); - if (resolve) resolve(true); // Successfully acquired lock - } else { - this.locked = false; - } - } -} \ No newline at end of file