diff --git a/index.js b/index.js index da168f01..3e9e24e5 100644 --- a/index.js +++ b/index.js @@ -20,6 +20,7 @@ const { ASSERTION, BAD_ARGUMENT, SESSION_CLOSED, + SESSION_MOVED, SESSION_NOT_WRITABLE, SNAPSHOT_NOT_AVAILABLE, DECODING_ERROR @@ -248,8 +249,11 @@ class Hypercore extends EventEmitter { if (!this.keyPair) this.keyPair = parent.keyPair this.writable = this._isWritable() - if (parent.state) { - this.state = this.draft ? parent.state.memoryOverlay() : this.snapshotted ? parent.state.snapshot() : parent.state.ref() + const s = parent.state + + if (s) { + const shouldSnapshot = this.snapshotted && !s.isSnapshot() + this.state = this.draft ? s.memoryOverlay() : shouldSnapshot ? s.snapshot() : s.ref() } if (this.snapshotted && this.core && !this._snapshot) this._updateSnapshot() @@ -329,7 +333,7 @@ class Hypercore extends EventEmitter { } if (opts.parent) { - if (opts.parent.state === null) await opts.parent.ready() + if (opts.parent._stateIndex === -1) await opts.parent.ready() this._setupSession(opts.parent) } @@ -564,6 +568,28 @@ class Hypercore extends EventEmitter { return p } + transferSession (core) { + // todo: validate we can move + + if (this.weak === false) { + this.core.activeSessions-- + core.activeSessions++ + } + + if (this._monitorIndex >= 0) { + this.core.removeMonitor(this) + core.addMonitor(this) + } + + const old = this.core + + this.core = core + + old.replicator.clearRequests(this.activeRequests, SESSION_MOVED()) + + this.emit('migrate', this.key) + } + createTreeBatch () { return this.state.tree.batch() } @@ -609,7 +635,12 @@ class Hypercore extends EventEmitter { const activeRequests = (opts && opts.activeRequests) || this.activeRequests const req = this.core.replicator.addUpgrade(activeRequests) - upgraded = await req.promise + try { + upgraded = await req.promise + } catch (err) { + if (isSessionMoved(err)) return this.update(opts) + throw err + } } if (!upgraded) return false @@ -636,7 +667,12 @@ class Hypercore extends EventEmitter { const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout if (timeout) req.context.setTimeout(req, timeout) - return req.promise + try { + return await req.promise + } catch (err) { + if (isSessionMoved(err)) return this.seek(bytes, opts) + throw err + } } async has (start, end = start + 1) { @@ -758,9 +794,16 @@ class Hypercore extends EventEmitter { const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout if (timeout) req.context.setTimeout(req, timeout) - const replicatedBlock = await req.promise - if (this._snapshot !== null) checkSnapshot(this, index) + let replicatedBlock = null + + try { + replicatedBlock = await req.promise + } catch (err) { + if (isSessionMoved(err)) return this._get(index, opts) + throw err + } + if (this._snapshot !== null) checkSnapshot(this, index) return maybeUnslab(replicatedBlock) } @@ -790,19 +833,7 @@ class Hypercore extends EventEmitter { } download (range) { - const req = this._download(range) - - // do not crash in the background... - req.catch(safetyCatch) - - return new Download(req) - } - - async _download (range) { - if (this.opened === false) await this.opening - - const activeRequests = (range && range.activeRequests) || this.activeRequests - return this.core.replicator.addRange(activeRequests, range) + return new Download(this, range) } // TODO: get rid of this / deprecate it? @@ -841,9 +872,9 @@ class Hypercore extends EventEmitter { const defaultKeyPair = this.state.name === null ? this.keyPair : null const { keyPair = defaultKeyPair, signature = null } = opts - const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey)) + const writable = !!this.draft || !isDefault || !!signature || !!(keyPair && keyPair.secretKey) - if (isDefault && writable === false) throw SESSION_NOT_WRITABLE() + if (this._readonly || writable === false) throw SESSION_NOT_WRITABLE() blocks = Array.isArray(blocks) ? blocks : [blocks] @@ -1031,3 +1062,7 @@ function maybeAddMonitor (name) { this.core.addMonitor(this) } } + +function isSessionMoved (err) { + return err.code === 'SESSION_MOVED' +} diff --git a/lib/core.js b/lib/core.js index 67def666..08a0a030 100644 --- a/lib/core.js +++ b/lib/core.js @@ -268,8 +268,6 @@ module.exports = class Core { this.verifier = verifier this.state = new SessionState(this, storage, this.blocks, tree, null) - this.state.ref() - if (this.key === null) this.key = this.header.key if (this.discoveryKey === null) this.discoveryKey = crypto.discoveryKey(this.key) if (this.id === null) this.id = z32.encode(this.key) @@ -402,7 +400,7 @@ module.exports = class Core { signature = this.verifier.sign(state.tree.batch(), keyPair) } - const tree = await this.state._overwrite(state, length, treeLength, signature) + const tree = await this.state._overwrite(state, length, treeLength, signature, false) // gc blocks from source if (treeLength < length) { diff --git a/lib/download.js b/lib/download.js index 8aed76fd..6c88505f 100644 --- a/lib/download.js +++ b/lib/download.js @@ -1,10 +1,39 @@ module.exports = class Download { - constructor (req) { - this.req = req + constructor (session, range) { + this.session = session + this.range = range + this.request = null + this.opened = false + this.opening = this._open() + this.opening.catch(noop) + } + + ready () { + return this.opening + } + + async _open () { + if (this.session.opened === false) await this.session.opening + this._download() + this.opened = true } async done () { - return (await this.req).promise + await this.ready() + + try { + return await this.request.promise + } catch (err) { + if (isSessionMoved(err)) return this._download() + throw err + } + } + + _download () { + const activeRequests = (this.range && this.range.activeRequests) || this.session.activeRequests + this.request = this.session.core.replicator.addRange(activeRequests, this.range) + this.request.promise.catch(noop) + return this.request.promise } /** @@ -15,8 +44,17 @@ module.exports = class Download { } destroy () { - this.req.then(req => req.context && req.context.detach(req), noop) + this._destroyBackground().catch(noop) + } + + async _destroyBackground () { + if (this.opened === false) await this.ready() + if (this.request.context) this.request.context.detach(this.request) } } function noop () {} + +function isSessionMoved (err) { + return err.code === 'SESSION_MOVED' +} diff --git a/lib/merkle-tree.js b/lib/merkle-tree.js index a591339f..9728133a 100644 --- a/lib/merkle-tree.js +++ b/lib/merkle-tree.js @@ -516,14 +516,6 @@ module.exports = class MerkleTree { return new MerkleTreeBatch(this) } - snapshot () { - return { - length: this.length, - byteLength: this.byteLength, - fork: this.fork - } - } - async restoreBatch (length) { const batch = new MerkleTreeBatch(this) if (length === this.length) return batch diff --git a/lib/session-state.js b/lib/session-state.js index da92dbc3..35a56c79 100644 --- a/lib/session-state.js +++ b/lib/session-state.js @@ -31,6 +31,8 @@ module.exports = class SessionState { this._onflush = null this._flushing = null this._activeBatch = null + + this.ref() } isSnapshot () { @@ -90,19 +92,7 @@ module.exports = class SessionState { this.core, this.storage.snapshot(), this.blocks, - this.tree.snapshot(), - this.name - ) - - return s - } - - snapshotFrom (snap) { - const s = new SessionState( - this.core, - snap.storage, - this.blocks, - snap.tree, + this.tree.clone(), this.name ) @@ -304,8 +294,6 @@ module.exports = class SessionState { await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.treeLength, false) } - if (dependency) storage.setDataDependency(dependency) - return { dependency, tree, treeUpdate } } @@ -413,7 +401,7 @@ module.exports = class SessionState { } } - async _overwrite (source, length, treeLength, signature) { + async _overwrite (source, length, treeLength, signature, isDependent) { const blockPromises = [] const treePromises = [] const rootPromises = [] @@ -500,8 +488,12 @@ module.exports = class SessionState { this.tree.setRoots(roots, signature) } + const dependency = isDependent ? updateDependency(this, length) : null + await this.flushWriteBatch(writer) + if (dependency) this.storage.updateDependencies(dependency.length) + return tree } @@ -513,7 +505,7 @@ module.exports = class SessionState { try { const origLength = this.tree.length - const tree = await this._overwrite(state, length, treeLength, null) + const tree = await this._overwrite(state, length, treeLength, null, state === this.core.state) const bitfield = { start: treeLength, length: tree.length - treeLength, drop: false } @@ -545,6 +537,32 @@ module.exports = class SessionState { return head } + async moveTo (core) { + const state = core.state + + if (state.storage && (await state.storage.openBatch(this.name)) !== null) { + throw STORAGE_CONFLICT('Batch has already been created') + } + + const head = this.core.sessionStates.pop() + if (head !== this) this.core.sessionStates[(head.index = this.index)] = head + + this.core = core + this.index = this.core.sessionStates.push(this) - 1 + + if (!this.isSnapshot()) { + const treeInfo = await state._getTreeHeadAt(this.tree.length) + const prologue = state.tree.prologue + + // todo: validate treeInfo + + this.storage = await state.storage.registerBatch(this.name, treeInfo) + this.tree = await MerkleTree.open(this.storage, treeInfo.length, { prologue }) + } + + for (const s of this.sessions) s.transferSession(this.core) + } + async createSession (name, length, overwrite, draft) { let storage = null let treeInfo = null diff --git a/test/move-to.js b/test/move-to.js new file mode 100644 index 00000000..2a96d411 --- /dev/null +++ b/test/move-to.js @@ -0,0 +1,106 @@ +const test = require('brittle') +const b4a = require('b4a') +const crypto = require('hypercore-crypto') +const { create } = require('./helpers') + +test('move - basic', async function (t) { + t.plan(9) + + const core = await create(t) + + const sess = core.session({ name: 'session' }) + + await sess.append('1') + await sess.append('2') + await sess.append('3') + + await core.core.commit(sess.state) + + t.is(core.length, 3) + t.is(sess.length, 3) + + const keyPair = crypto.keyPair() + + const manifest = { + prologue: { + length: core.length, + hash: core.state.tree.hash() + }, + signers: [{ + publicKey: keyPair.publicKey + }] + } + + const core2 = await create(t, { manifest, keyPair }) + await core2.core.copyPrologue(core.state) + + t.is(core2.length, 3) + + sess.once('migrate', key => { t.alike(key, core2.key) }) + + await sess.state.moveTo(core2.core) + await sess.append('4') + + await core2.core.commit(sess.state) + + t.alike(await sess.get(0), b4a.from('1')) + t.alike(await sess.get(1), b4a.from('2')) + t.alike(await sess.get(2), b4a.from('3')) + t.alike(await sess.get(3), b4a.from('4')) + + t.alike(await core2.get(3), b4a.from('4')) + + await core.close() + await core2.close() + await sess.close() +}) + +test('move - snapshots', async function (t) { + const core = await create(t) + + await core.append('hello') + await core.append('world') + await core.append('again') + + const sess = core.session({ name: 'snapshot' }) + + const snap = sess.snapshot() + await snap.ready() + + await sess.close() + await core.truncate(1) + + await core.append('break') + + t.is(snap.length, 3) + t.is(core.length, 2) + + const keyPair = crypto.keyPair() + + const manifest = { + prologue: { + length: core.length, + hash: core.state.tree.hash() + }, + signers: [{ + publicKey: keyPair.publicKey + }] + } + + const core2 = await create(t, { manifest, keyPair }) + await core2.core.copyPrologue(core.state) + + t.is(core2.length, 2) + + await snap.state.moveTo(core2.core) + + t.is(snap.length, 3) + + t.alike(await snap.get(0), b4a.from('hello')) + t.alike(await snap.get(1), b4a.from('world')) + t.alike(await snap.get(2), b4a.from('again')) + + await snap.close() + await core.close() + await core2.close() +}) diff --git a/test/snapshots.js b/test/snapshots.js index 05b239e8..c02e0243 100644 --- a/test/snapshots.js +++ b/test/snapshots.js @@ -158,7 +158,11 @@ test('snapshot over named batch persists after truncate', async function (t) { await core.append('block #2.0') const session = core.session({ name: 'session' }) + const snapshot = session.snapshot({ valueEncoding: 'utf-8' }) + await snapshot.ready() + + await session.close() t.is(snapshot.length, 3)