Skip to content

Commit

Permalink
replace our implementation of a mutex with async-mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
jdeepee committed Sep 15, 2023
1 parent 13b019f commit 3c9f101
Showing 1 changed file with 13 additions and 80 deletions.
93 changes: 13 additions & 80 deletions bootstrap-languages/p-diff-sync/linksAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LinkSyncAdapter, PerspectiveDiffObserver, HolochainLanguageDelegate, LanguageContext, PerspectiveDiff,
LinkExpression, DID, Perspective, PerspectiveState } from "https://esm.sh/@perspect3vism/[email protected]";;
import type { SyncStateChangeObserver } from "https://esm.sh/@perspect3vism/[email protected]";;
LinkExpression, DID, Perspective, PerspectiveState } from "https://esm.sh/@perspect3vism/[email protected]";
import type { SyncStateChangeObserver } from "https://esm.sh/@perspect3vism/[email protected]";
import { Mutex, withTimeout } from "https://esm.sh/[email protected]";
import { DNA_NICK, ZOME_NAME } from "./build/dna.js";

class PeerInfo {
Expand All @@ -15,7 +16,7 @@ export class LinkAdapter implements LinkSyncAdapter {
linkCallback?: PerspectiveDiffObserver
syncStateChangeCallback?: SyncStateChangeObserver
peers: Map<DID, PeerInfo> = new Map();
generalMutex: Mutex = new Mutex();
generalMutex: Mutex = withTimeout(new Mutex(), 10000, new Error('new fancy error'));;
me: DID
gossipLogCount: number = 0;
myCurrentRevision: Buffer | null = null;
Expand Down Expand Up @@ -46,17 +47,10 @@ export class LinkAdapter implements LinkSyncAdapter {
}

async sync(): Promise<PerspectiveDiff> {
//console.log("PerspectiveDiffSync.sync(); Getting lock");
const release = await this.generalMutex.acquire();
//console.log("PerspectiveDiffSync.sync(); Got lock");
try {
//console.log("PerspectiveDiffSync.sync(); Getting lock");

const success = await this.generalMutex.lock();
if (!success) {
console.log("Failed to get lock due to timeout");
return new PerspectiveDiff()
}

//console.log("PerspectiveDiffSync.sync(); Got lock");

//@ts-ignore
let current_revision = await this.hcDna.call(DNA_NICK, ZOME_NAME, "sync", null);
if (current_revision && Buffer.isBuffer(current_revision)) {
Expand All @@ -65,7 +59,7 @@ export class LinkAdapter implements LinkSyncAdapter {
} catch (e) {
console.error("PerspectiveDiffSync.sync(); got error", e);
} finally {
this.generalMutex.unlock();
release();
}
await this.gossip();
return new PerspectiveDiff()
Expand All @@ -75,18 +69,8 @@ export class LinkAdapter implements LinkSyncAdapter {
this.gossipLogCount += 1;
let lostPeers: DID[] = [];

const release = await this.generalMutex.acquire();
try {
//console.log("PerspectiveDiffSync.gossip(); Getting peers lock");
// Trying to lock with a timeout
const success = await this.generalMutex.lock();

if (!success) {
console.log("Failed to get lock due to timeout");
return;
}

//console.log("PerspectiveDiffSync.gossip(); Got lock");

this.peers.forEach( (peerInfo, peer) => {
if (peerInfo.lastSeen.getTime() + 10000 < new Date().getTime()) {
lostPeers.push(peer);
Expand Down Expand Up @@ -138,8 +122,6 @@ export class LinkAdapter implements LinkSyncAdapter {
} else {
await callback(PerspectiveState.Synced);
};
} else if (differentRevisions == 0) {
await callback(PerspectiveState.Synced);
}
}

Expand Down Expand Up @@ -193,7 +175,7 @@ export class LinkAdapter implements LinkSyncAdapter {
} catch (e) {
console.error("PerspectiveDiffSync.gossip(); got error", e);
} finally {
this.generalMutex.unlock();
release();
}
}

Expand All @@ -204,15 +186,9 @@ export class LinkAdapter implements LinkSyncAdapter {
}

async commit(diff: PerspectiveDiff): Promise<string> {
//console.log("PerspectiveDiffSync.commit(); Getting lock");
const release = await this.generalMutex.acquire();
try {
//console.log("PerspectiveDiffSync.commit(); Getting lock");
const success = await this.generalMutex.lock();

if (!success) {
console.log("Failed to get lock due to timeout");
return "";
}

//console.log("PerspectiveDiffSync.commit(); Got lock");
let prep_diff = {
additions: diff.additions.map((diff) => prepareLinkExpression(diff)),
Expand All @@ -226,7 +202,7 @@ export class LinkAdapter implements LinkSyncAdapter {
} catch (e) {
console.error("PerspectiveDiffSync.commit(); got error", e);
} finally {
this.generalMutex.unlock();
release();
}
}

Expand Down Expand Up @@ -308,46 +284,3 @@ function prepareLinkExpression(link: LinkExpression): object {
}
return data;
}


class Mutex {
private locked = false;
private waitingResolvers: ((success: boolean) => void)[] = [];

async lock(timeout = 10000): Promise<boolean> { // default timeout of 10 seconds
const promise = new Promise<boolean>((resolve) => {
if (this.locked) {
console.log("Was not able to get lock on mutex adding to waitingResolvers");
const timer = setTimeout(() => {
const index = this.waitingResolvers.indexOf(resolve);
if (index > -1) {
this.waitingResolvers.splice(index, 1);
resolve(false); // Timeout occurred
}
}, timeout);

this.waitingResolvers.push((success: boolean) => {
clearTimeout(timer);
resolve(success);
});
} else {
resolve(true);
}
});

const success = await promise;
if (success) this.locked = true;
return success;
}

unlock(): void {
if (!this.locked) return;
if (this.waitingResolvers.length > 0) {
console.log("Called unlock and got some waitingResolvers to finish");
const resolve = this.waitingResolvers.shift();
if (resolve) resolve(true); // Successfully acquired lock
} else {
this.locked = false;
}
}
}

0 comments on commit 3c9f101

Please sign in to comment.