Skip to content

Commit

Permalink
fix: Incorrect ref count (#775)
Browse files Browse the repository at this point in the history
This is a back port of ae89eca
  • Loading branch information
arv authored Jan 11, 2022
1 parent 39ad0d4 commit a21c3d0
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 42 deletions.
190 changes: 189 additions & 1 deletion src/dag/write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type * as kv from '../kv/mod';
import {Read} from './read';
import {Hash, hashOf, initHasher} from '../hash';
import type {Value} from '../kv/store';
import {Store} from './store';

setup(async () => {
await initHasher();
Expand Down Expand Up @@ -132,7 +133,8 @@ test('ref count invalid', async () => {
const w = new Write(kvw);
let err;
try {
await w.getRefCount(h);
await w.setHead('fakehead', h);
await w.commit();
} catch (e) {
err = e;
}
Expand Down Expand Up @@ -241,3 +243,189 @@ test('roundtrip', async () => {
await t('', [0], []);
await t('', {a: true}, []);
});

test('that changeRefCount does not write stale value with a dimamond pattern', async () => {
const kvStore = new MemStore();
const dagStore = new Store(kvStore);

// If we have a diamond structure we update the refcount for C twice.
//
// R
// / \
// A B
// \ /
// C

const c = await dagStore.withWrite(async dagWrite => {
const c = Chunk.new('c', []);
const a = Chunk.new('a', [c.hash]);
const b = Chunk.new('b', [c.hash]);
const r = Chunk.new('r', [a.hash, b.hash]);
await Promise.all([
dagWrite.setHead('test', r.hash),
dagWrite.putChunk(a),
dagWrite.putChunk(b),
dagWrite.putChunk(c),
dagWrite.putChunk(r),
]);
await dagWrite.commit();

return c;
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.get(chunkRefCountKey(c.hash))).to.equal(2);
});

await dagStore.withWrite(async dagWrite => {
const e = Chunk.new('e', []);
await Promise.all([dagWrite.setHead('test', e.hash), dagWrite.putChunk(e)]);
await dagWrite.commit();
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.has(chunkRefCountKey(c.hash))).to.equal(false);
});
});

test('that changeRefCount does not write stale value with a diamond pattern and a child', async () => {
const kvStore = new MemStore();
const dagStore = new Store(kvStore);

// If we have a diamond structure we update the refcount for C twice.
//
// R
// / \
// A B
// \ /
// C
// |
// D

const [a, d] = await dagStore.withWrite(async dagWrite => {
const d = Chunk.new('d', []);
const c = Chunk.new('c', [d.hash]);
const a = Chunk.new('a', [c.hash]);
const b = Chunk.new('b', [c.hash]);
const r = Chunk.new('r', [a.hash, b.hash]);
await Promise.all([
dagWrite.setHead('test', r.hash),
dagWrite.putChunk(a),
dagWrite.putChunk(b),
dagWrite.putChunk(c),
dagWrite.putChunk(r),
]);
await dagWrite.commit();

return [a, d];
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.get(chunkRefCountKey(d.hash))).to.equal(1);
});

// A
// \
// C
// |
// D
await dagStore.withWrite(async dagWrite => {
await dagWrite.setHead('test', a.hash);
await dagWrite.commit();
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.get(chunkRefCountKey(a.hash))).to.equal(1);
expect(await kvRead.get(chunkRefCountKey(d.hash))).to.equal(1);
});
});

test('that we changeRefCount does not write stale value with a 3 incoming refs', async () => {
const kvStore = new MemStore();
const dagStore = new Store(kvStore);

// If we have a diamond structure we update the refcount for D three times.
//
// R
// / | \
// A B C
// \ | /
// D

const d = await dagStore.withWrite(async dagWrite => {
const d = Chunk.new('d', []);
const a = Chunk.new('a', [d.hash]);
const b = Chunk.new('b', [d.hash]);
const c = Chunk.new('c', [d.hash]);
const r = Chunk.new('r', [a.hash, b.hash, c.hash]);
await Promise.all([
dagWrite.setHead('test', r.hash),
dagWrite.putChunk(a),
dagWrite.putChunk(b),
dagWrite.putChunk(c),
dagWrite.putChunk(d),
dagWrite.putChunk(r),
]);
await dagWrite.commit();

return d;
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.get(chunkRefCountKey(d.hash))).to.equal(3);
});

await dagStore.withWrite(async dagWrite => {
const e = Chunk.new('e', []);
await Promise.all([dagWrite.setHead('test', e.hash), dagWrite.putChunk(e)]);
await dagWrite.commit();
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.has(chunkRefCountKey(d.hash))).to.equal(false);
});
});

test('that we changeRefCount does not write stale value with a 3 incoming refs bypassing one level', async () => {
const kvStore = new MemStore();
const dagStore = new Store(kvStore);

// If we have a diamond structure we update the refcount for D three times.
//
// R
// / | \
// A B |
// \ | /
// D

const d = await dagStore.withWrite(async dagWrite => {
const d = Chunk.new('d', []);
const a = Chunk.new('a', [d.hash]);
const b = Chunk.new('b', [d.hash]);
const r = Chunk.new('r', [a.hash, b.hash, d.hash]);
await Promise.all([
dagWrite.setHead('test', r.hash),
dagWrite.putChunk(a),
dagWrite.putChunk(b),
dagWrite.putChunk(d),
dagWrite.putChunk(r),
]);
await dagWrite.commit();

return d;
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.get(chunkRefCountKey(d.hash))).to.equal(3);
});

await dagStore.withWrite(async dagWrite => {
const e = Chunk.new('e', []);
await Promise.all([dagWrite.setHead('test', e.hash), dagWrite.putChunk(e)]);
await dagWrite.commit();
});

await kvStore.withRead(async kvRead => {
expect(await kvRead.has(chunkRefCountKey(d.hash))).to.equal(false);
});
});
120 changes: 80 additions & 40 deletions src/dag/write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class Write extends Read {
private readonly _kvw: kv.Write;
private readonly _newChunks = new Set<Hash>();
private readonly _changedHeads = new Map<string, HeadChange>();
private readonly _refCountLoadingPromises = new Map<Hash, Promise<void>>();

constructor(kvw: kv.Write) {
super(kvw);
Expand Down Expand Up @@ -73,73 +74,85 @@ export class Write extends Read {
}

async commit(): Promise<void> {
await this.collectGarbage();
await this._collectGarbage();
await this._kvw.commit();
}

async collectGarbage(): Promise<void> {
private async _collectGarbage(): Promise<void> {
// We increment all the ref counts before we do all the decrements. This
// is so that we do not remove an item that goes from 1 -> 0 -> 1
const newHeads: (Hash | undefined)[] = [];
const oldHeads: (Hash | undefined)[] = [];
const newHeads: Hash[] = [];
const oldHeads: Hash[] = [];
for (const changedHead of this._changedHeads.values()) {
oldHeads.push(changedHead.old);
newHeads.push(changedHead.new);
changedHead.old && oldHeads.push(changedHead.old);
changedHead.new && newHeads.push(changedHead.new);
}

const refCountCache: Map<Hash, number> = new Map();

for (const n of newHeads) {
if (n !== undefined) {
await this.changeRefCount(n, 1);
}
await this._changeRefCount(n, 1, refCountCache);
}

for (const o of oldHeads) {
if (o !== undefined) {
await this.changeRefCount(o, -1);
}
await this._changeRefCount(o, -1, refCountCache);
}

await this._applyGatheredRefCountChanges(refCountCache);

// Now we go through the mutated chunks to see if any of them are still orphaned.
const ps = [];
for (const hash of this._newChunks) {
const count = await this.getRefCount(hash);
await this._ensureRefCountLoaded(hash, refCountCache);
const count = refCountCache.get(hash);
if (count === 0) {
ps.push(this.removeAllRelatedKeys(hash, false));
ps.push(this._removeAllRelatedKeys(hash));
}
}
await Promise.all(ps);
}

async changeRefCount(hash: Hash, delta: number): Promise<void> {
const oldCount = await this.getRefCount(hash);
const newCount = oldCount + delta;

if ((oldCount === 0 && delta === 1) || (oldCount === 1 && delta === -1)) {
private async _changeRefCount(
hash: Hash,
delta: number,
refCountCache: Map<Hash, number>,
): Promise<void> {
// First make sure that we have the ref count in the cache. This is async
// because it might need to load the ref count from the store.
//
// Once we have loaded the ref count all the updates to it are sync to
// prevent race conditions.
await this._ensureRefCountLoaded(hash, refCountCache);

if (updateRefCount(hash, delta, refCountCache)) {
const meta = await this._kvw.get(chunkMetaKey(hash));
if (meta !== undefined) {
assertMeta(meta);
const ps = meta.map(ref => this.changeRefCount(ref, delta));
const ps = meta.map(ref =>
this._changeRefCount(ref, delta, refCountCache),
);
await Promise.all(ps);
}
}

if (newCount === 0) {
await this.removeAllRelatedKeys(hash, true);
} else {
await this.setRefCount(hash, newCount);
}
}

async setRefCount(hash: Hash, count: number): Promise<void> {
const refCountKey = chunkRefCountKey(hash);
if (count === 0) {
await this._kvw.del(refCountKey);
} else {
await this._kvw.put(refCountKey, count);
private _ensureRefCountLoaded(
hash: Hash,
refCountCache: Map<Hash, number>,
): Promise<void> {
// Only get the ref count once.
let p = this._refCountLoadingPromises.get(hash);
if (p === undefined) {
p = (async () => {
const value = await this._getRefCount(hash);
refCountCache.set(hash, value);
})();
this._refCountLoadingPromises.set(hash, p);
}
return p;
}

async getRefCount(hash: Hash): Promise<number> {
private async _getRefCount(hash: Hash): Promise<number> {
const value = await this._kvw.get(chunkRefCountKey(hash));
if (value === undefined) {
return 0;
Expand All @@ -153,26 +166,53 @@ export class Write extends Read {
return value;
}

async removeAllRelatedKeys(
hash: Hash,
updateMutatedChunks: boolean,
): Promise<void> {
private async _removeAllRelatedKeys(hash: Hash): Promise<void> {
await Promise.all([
this._kvw.del(chunkDataKey(hash)),
this._kvw.del(chunkMetaKey(hash)),
this._kvw.del(chunkRefCountKey(hash)),
]);

if (updateMutatedChunks) {
this._newChunks.delete(hash);
}
this._newChunks.delete(hash);
}

private async _applyGatheredRefCountChanges(
refCountCache: Map<Hash, number>,
): Promise<void> {
const ps: Promise<void>[] = [];
refCountCache.forEach((count, hash) => {
if (count === 0) {
ps.push(this._removeAllRelatedKeys(hash));
} else {
const refCountKey = chunkRefCountKey(hash);
ps.push(this._kvw.put(refCountKey, count));
}
});
await Promise.all(ps);
}

close(): void {
this._kvw.release();
}
}

/**
* Updates the ref count in the refCountCache.
*
* Returns true if the the node changed from reachable to unreachable and vice
* versa.
*/
function updateRefCount(
hash: Hash,
delta: number,
refCountCache: Map<Hash, number>,
): boolean {
const oldCount = refCountCache.get(hash);
assertNumber(oldCount);
refCountCache.set(hash, oldCount + delta);
return (oldCount === 0 && delta === 1) || (oldCount === 1 && delta === -1);
}

export function toLittleEndian(count: number): Uint8Array {
if (count < 0 || count > 0xffff) {
throw new Error('Ref count out of range');
Expand Down
Loading

0 comments on commit a21c3d0

Please sign in to comment.