Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RWLockMap #10

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
Provides `Lock` and `RWLock` (read write lock) synchronization primitives for
protecting in-memory state across multiple tasks and/or microtasks.

This is a wholesale fork of [`@rocicorp/lock`](https://github.com/rocicorp/lock) but adds `RWLockMap` functionality.

# Installation

```
npm install @rocicorp/lock
npm install @ccorcos/lock
```

# Usage

`Lock` is a mutex that can be used to synchronize access to a shared resource.

```ts
import {Lock} from '@rocicorp/lock';
import {Lock} from '@ccorcos/lock';

const lock = new Lock();

Expand All @@ -35,7 +37,7 @@ void f(2);
`RWLock` is a read write lock. There can be mutlipe readers at the same time but only one writer at the same time.

```js
import {RWLock} from '@rocicorp/lock';
import {RWLock} from '@ccorcos/lock';

const rwLock = new RWLock();

Expand Down Expand Up @@ -73,3 +75,19 @@ const release = await lock.lock();
// do something
release();
```

`RWLockMap` will dynamically create and clean up locks keyed by a string.

```js
import {RWLockMap} from '@ccorcos/lock';

const lockMap = new RWLockMap();
const release1 = await lockMap.read("node1");
const release2 = await lockMap.write("node2");
const v3 = await lockMap.withRead("node3", async () => {
return 3;
});
const v4 = await lockMap.withWrite("node4", async () => {
return 4;
});
```
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "@rocicorp/lock",
"name": "@ccorcos/lock",
"description": "Implements Lock and RWLock synchronization primitives.",
"version": "1.0.3",
"repository": "github:rocicorp/lock",
"repository": "github:ccorcos/lock",
"license": "Apache-2.0",
"engines": {
"node": "^12.20.0 || ^14.13.1 || >=16.0.0"
Expand Down
82 changes: 81 additions & 1 deletion src/lock.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {expect} from 'chai';
import {SinonFakeTimers, useFakeTimers} from 'sinon';
import {RWLock} from './lock.js';
import {RWLock, RWLockMap} from './lock.js';

/**
* Creates a promise that resolves after [[ms]] milliseconds. Note that if you
Expand Down Expand Up @@ -50,7 +50,9 @@ test('Multiple reads', async () => {
release();
})();

expect(lock.unlocked).to.equal(false);
const [v1, v2, v3] = await Promise.all([r1, r2, r3]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -83,9 +85,11 @@ test('Multiple reads with sleep', async () => {
release();
})();

expect(lock.unlocked).to.equal(false);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([r1, r2, r3]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -122,9 +126,11 @@ test('Multiple write', async () => {
release();
})();

expect(lock.unlocked).to.equal(false);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([w1, w2, w3]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -161,9 +167,11 @@ test('Write then read', async () => {
release();
})();

expect(lock.unlocked).to.equal(false);
await clock.runAllAsync();

const [v1, v2, v3] = await Promise.all([w1, r2, r3]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -208,9 +216,11 @@ test('Reads then writes', async () => {
return 4;
})();

expect(lock.unlocked).to.equal(false);
await clock.runAllAsync();

const [v1, v2, v3, v4] = await Promise.all([r1, r2, w3, w4]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand Down Expand Up @@ -257,9 +267,11 @@ test('Reads then writes (withRead)', async () => {
return 4;
});

expect(lock.unlocked).to.equal(false);
await clock.runAllAsync();

const [v1, v2, v3, v4] = await Promise.all([r1, r2, w3, w4]);
expect(lock.unlocked).to.equal(true);
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(undefined);
Expand All @@ -276,3 +288,71 @@ test('Reads then writes (withRead)', async () => {
'w4b',
]);
});

test('RWLockMap will cleanup locks.', async () => {
const locks = new RWLockMap();

function runTest(key: string) {
// Same a test("Reads then writes (withRead)")
const log: string[] = [];
const r1: Promise<number> = locks.withRead(key, async () => {
log.push('r1a');
await sleep(8);
log.push('r1b');
return 1;
});
const r2: Promise<number> = locks.withRead(key, async () => {
log.push('r2a');
await sleep(6);
log.push('r2b');
return 2;
});
const w3: Promise<number> = locks.withWrite(key, async () => {
log.push('w3a');
await sleep(4);
log.push('w3b');
return -1;
});
const w4: Promise<number> = locks.withWrite(key, async () => {
log.push('w4a');
await sleep(2);
log.push('w4b');
return 4;
});

const result = Promise.all([r1, r2, w3, w4]);
return {log, result};
}

async function assertTest(args: {log: string[]; result: Promise<number[]>}) {
const [v1, v2, v3, v4] = await args.result;
expect(v1).to.equal(1);
expect(v2).to.equal(2);
expect(v3).to.equal(-1);
expect(v4).to.equal(4);

expect(args.log).to.deep.equal([
'r1a',
'r2a',
'r2b',
'r1b',
'w3a',
'w3b',
'w4a',
'w4b',
]);
}

const test1 = runTest('key1');
const test2 = runTest('key2');

expect(locks['_locks'].get('key1')?.unlocked).to.equal(false);
expect(locks['_locks'].get('key2')?.unlocked).to.equal(false);
await clock.runAllAsync();

await assertTest(test1);
await assertTest(test2);

expect(locks['_locks'].get('key1')).to.equal(undefined);
expect(locks['_locks'].get('key2')).to.equal(undefined);
});
79 changes: 73 additions & 6 deletions src/lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,35 @@ export class Lock {
const {promise, resolve} = resolver();
this._lockP = promise;
await previous;
return resolve;
return () => {
if (this._lockP === promise) this._lockP = null;
resolve();
};
}

withLock<R>(f: () => R | Promise<R>): Promise<R> {
return run(this.lock(), f);
}

get unlocked() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to call this locked or isLocked.

return this._lockP === null;
}
}

export class RWLock {
private _lock = new Lock();
private _writeP: Promise<void> | null = null;
private _readP: Promise<void>[] = [];
private _readP: Set<Promise<void>> = new Set();

read(): Promise<() => void> {
return this._lock.withLock(async () => {
await this._writeP;
const {promise, resolve} = resolver();
this._readP.push(promise);
return resolve;
this._readP.add(promise);
return () => {
this._readP.delete(promise);
resolve();
};
});
}

Expand All @@ -40,14 +50,23 @@ export class RWLock {
await Promise.all(this._readP);
const {promise, resolve} = resolver();
this._writeP = promise;
this._readP = [];
return resolve;
this._readP.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a Set is needed here. You can also do length = 0 on arrays.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Set will make this._readP.delete(promise); more efficient.

return () => {
if (this._writeP === promise) this._writeP = null;
resolve();
};
});
}

withWrite<R>(f: () => R | Promise<R>): Promise<R> {
return run(this.write(), f);
}

get unlocked() {
return (
this._lock.unlocked && this._writeP === null && this._readP.size === 0
);
}
}

async function run<R>(
Expand All @@ -61,3 +80,51 @@ async function run<R>(
release();
}
}

export class RWLockMap {
private _locks = new Map<string, RWLock>();

private getLock(key: string) {
const lock = this._locks.get(key);
if (lock) return lock;
const newLock = new RWLock();
this._locks.set(key, newLock);
return newLock;
}

private deleteLock(key: string) {
this._locks.delete(key);
}

async read(key: string): Promise<() => void> {
const lock = this.getLock(key);
const release = await lock.read();
return () => {
release();
if (lock.unlocked) this.deleteLock(key);
};
}

async withRead<R>(key: string, f: () => R | Promise<R>): Promise<R> {
const lock = this.getLock(key);
const result = await lock.withRead(f);
if (lock.unlocked) this.deleteLock(key);
return result;
}

async write(key: string): Promise<() => void> {
const lock = this.getLock(key);
const release = await lock.write();
return () => {
release();
if (lock.unlocked) this.deleteLock(key);
};
}

async withWrite<R>(key: string, f: () => R | Promise<R>): Promise<R> {
const lock = this.getLock(key);
const result = await lock.withWrite(f);
if (lock.unlocked) this.deleteLock(key);
return result;
}
}