Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RWLockMap #11

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,19 @@ const release = await lock.lock();
// do something
release();
```

`RWLockMap` will dynamically create and clean up locks keyed by a string.

```js
import {RWLockMap} from '@rocicorp/lock';

const lockMap = new RWLockMap();
const release1 = await lockMap.read("node1");
const release2 = await lockMap.write("node2");
const v3 = await lockMap.withRead("node3", async () => {
return 3;
});
const v4 = await lockMap.withWrite("node4", async () => {
return 4;
});
```
81 changes: 80 additions & 1 deletion src/lock.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {expect} from 'chai';
import {SinonFakeTimers, useFakeTimers} from 'sinon';
import {RWLock} from './lock.js';
import {RWLock, RWLockMap} from './lock.js';

/**
* Creates a promise that resolves after [[ms]] milliseconds. Note that if you
Expand Down Expand Up @@ -50,7 +50,9 @@ test('Multiple reads', async () => {
release();
})();

expect(lock.locked).to.equal(true);
const [v1, v2, v3] = await Promise.all([r1, r2, r3]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -83,9 +85,11 @@ test('Multiple reads with sleep', async () => {
release();
})();

expect(lock.locked).to.equal(true);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([r1, r2, r3]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -122,9 +126,11 @@ test('Multiple write', async () => {
release();
})();

expect(lock.locked).to.equal(true);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([w1, w2, w3]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -161,9 +167,11 @@ test('Write then read', async () => {
release();
})();

expect(lock.locked).to.equal(true);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([w1, r2, r3]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -208,9 +216,11 @@ test('Reads then writes', async () => {
return 4;
})();

expect(lock.locked).to.equal(true);
await clock.runAllAsync();

const [v1, v2, v3, v4] = await Promise.all([r1, r2, w3, w4]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -257,9 +267,11 @@ test('Reads then writes (withRead)', async () => {
return 4;
});

expect(lock.locked).to.equal(true);
await clock.runAllAsync();

const [v1, v2, v3, v4] = await Promise.all([r1, r2, w3, w4]);
expect(lock.locked).to.equal(false);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand All @@ -276,3 +288,70 @@ test('Reads then writes (withRead)', async () => {
'w4b',
]);
});

test('RWLockMap will cleanup locks.', async () => {
const locks = new RWLockMap();

function runTest(key: string) {
// Same a test("Reads then writes (withRead)")
const log: string[] = [];
const r1: Promise<number> = locks.withRead(key, async () => {
log.push('r1a');
await sleep(8);
log.push('r1b');
return 1;
});
const r2: Promise<number> = locks.withRead(key, async () => {
log.push('r2a');
await sleep(6);
log.push('r2b');
return 2;
});
const w3: Promise<void> = locks.withWrite(key, async () => {
log.push('w3a');
await sleep(4);
log.push('w3b');
});
const w4: Promise<number> = locks.withWrite(key, async () => {
log.push('w4a');
await sleep(2);
log.push('w4b');
return 4;
});

const result = Promise.all([r1, r2, w3, w4]);
return {log, result};
}

async function assertTest(args: {log: string[]; result: Promise<any[]>}) {
const [v1, v2, v3, v4] = await args.result;
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
expect(v4).to.equal(4);

expect(args.log).to.deep.equal([
'r1a',
'r2a',
'r2b',
'r1b',
'w3a',
'w3b',
'w4a',
'w4b',
]);
}

const test1 = runTest('key1');
const test2 = runTest('key2');

expect(locks['_locks'].get('key1')?.locked).to.equal(true);
expect(locks['_locks'].get('key2')?.locked).to.equal(true);
await clock.runAllAsync();

await assertTest(test1);
await assertTest(test2);

expect(locks['_locks'].get('key1')).to.equal(undefined);
expect(locks['_locks'].get('key2')).to.equal(undefined);
});
77 changes: 71 additions & 6 deletions src/lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,35 @@ export class Lock {
const {promise, resolve} = resolver();
this._lockP = promise;
await previous;
return resolve;
return () => {
if (this._lockP === promise) this._lockP = null;
resolve();
};
}

withLock<R>(f: () => R | Promise<R>): Promise<R> {
return run(this.lock(), f);
}

get locked() {
return this._lockP !== null;
}
}

export class RWLock {
private _lock = new Lock();
private _writeP: Promise<void> | null = null;
private _readP: Promise<void>[] = [];
private _readP: Set<Promise<void>> = new Set();

read(): Promise<() => void> {
return this._lock.withLock(async () => {
await this._writeP;
const {promise, resolve} = resolver();
this._readP.push(promise);
return resolve;
this._readP.add(promise);
return () => {
this._readP.delete(promise);
resolve();
};
});
}

Expand All @@ -40,14 +50,21 @@ export class RWLock {
await Promise.all(this._readP);
const {promise, resolve} = resolver();
this._writeP = promise;
this._readP = [];
return resolve;
this._readP.clear();
return () => {
if (this._writeP === promise) this._writeP = null;
resolve();
};
});
}

withWrite<R>(f: () => R | Promise<R>): Promise<R> {
return run(this.write(), f);
}

get locked() {
return this._lock.locked || this._writeP !== null || this._readP.size > 0;
}
}

async function run<R>(
Expand All @@ -61,3 +78,51 @@ async function run<R>(
release();
}
}

export class RWLockMap {
private _locks = new Map<string, RWLock>();

private getLock(key: string) {
const lock = this._locks.get(key);
if (lock) return lock;
const newLock = new RWLock();
this._locks.set(key, newLock);
return newLock;
}

private deleteLock(key: string) {
this._locks.delete(key);
}

async read(key: string): Promise<() => void> {
const lock = this.getLock(key);
const release = await lock.read();
return () => {
release();
if (!lock.locked) this.deleteLock(key);
};
}

async withRead<R>(key: string, f: () => R | Promise<R>): Promise<R> {
const lock = this.getLock(key);
const result = await lock.withRead(f);
if (!lock.locked) this.deleteLock(key);
return result;
}

async write(key: string): Promise<() => void> {
const lock = this.getLock(key);
const release = await lock.write();
return () => {
release();
if (!lock.locked) this.deleteLock(key);
};
}

async withWrite<R>(key: string, f: () => R | Promise<R>): Promise<R> {
const lock = this.getLock(key);
const result = await lock.withWrite(f);
if (!lock.locked) this.deleteLock(key);
return result;
}
}