Skip to content

Commit

Permalink
rocksdb: SessionState.moveTo API (#609)
Browse files Browse the repository at this point in the history
* add initial moveTo API

* snapshot makes full tree clone

* add test for snapshots

* parent state may be swapped during ready

* session has method for transferring

* only check compat if block is not local

* add snapshot move to checks

* retrying requests on session move

* transfer all replicating io when moving sessions

* drafts are writable

* session overwrite should emit events

* add option for overwrite to update dependency

* do not create snapshot over existing snapshot

* ref session state in constructor

* fix non writable named sessions

* emit migrate event on moveTo

---------

Co-authored-by: Mathias Buus <[email protected]>
  • Loading branch information
chm-diederichs and mafintosh authored Dec 13, 2024
1 parent 236e584 commit 8905a0e
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 54 deletions.
79 changes: 57 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
ASSERTION,
BAD_ARGUMENT,
SESSION_CLOSED,
SESSION_MOVED,
SESSION_NOT_WRITABLE,
SNAPSHOT_NOT_AVAILABLE,
DECODING_ERROR
Expand Down Expand Up @@ -248,8 +249,11 @@ class Hypercore extends EventEmitter {
if (!this.keyPair) this.keyPair = parent.keyPair
this.writable = this._isWritable()

if (parent.state) {
this.state = this.draft ? parent.state.memoryOverlay() : this.snapshotted ? parent.state.snapshot() : parent.state.ref()
const s = parent.state

if (s) {
const shouldSnapshot = this.snapshotted && !s.isSnapshot()
this.state = this.draft ? s.memoryOverlay() : shouldSnapshot ? s.snapshot() : s.ref()
}

if (this.snapshotted && this.core && !this._snapshot) this._updateSnapshot()
Expand Down Expand Up @@ -329,7 +333,7 @@ class Hypercore extends EventEmitter {
}

if (opts.parent) {
if (opts.parent.state === null) await opts.parent.ready()
if (opts.parent._stateIndex === -1) await opts.parent.ready()
this._setupSession(opts.parent)
}

Expand Down Expand Up @@ -564,6 +568,28 @@ class Hypercore extends EventEmitter {
return p
}

transferSession (core) {
// todo: validate we can move

if (this.weak === false) {
this.core.activeSessions--
core.activeSessions++
}

if (this._monitorIndex >= 0) {
this.core.removeMonitor(this)
core.addMonitor(this)
}

const old = this.core

this.core = core

old.replicator.clearRequests(this.activeRequests, SESSION_MOVED())

this.emit('migrate', this.key)
}

createTreeBatch () {
return this.state.tree.batch()
}
Expand Down Expand Up @@ -609,7 +635,12 @@ class Hypercore extends EventEmitter {
const activeRequests = (opts && opts.activeRequests) || this.activeRequests
const req = this.core.replicator.addUpgrade(activeRequests)

upgraded = await req.promise
try {
upgraded = await req.promise
} catch (err) {
if (isSessionMoved(err)) return this.update(opts)
throw err
}
}

if (!upgraded) return false
Expand All @@ -636,7 +667,12 @@ class Hypercore extends EventEmitter {
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

return req.promise
try {
return await req.promise
} catch (err) {
if (isSessionMoved(err)) return this.seek(bytes, opts)
throw err
}
}

async has (start, end = start + 1) {
Expand Down Expand Up @@ -758,9 +794,16 @@ class Hypercore extends EventEmitter {
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

const replicatedBlock = await req.promise
if (this._snapshot !== null) checkSnapshot(this, index)
let replicatedBlock = null

try {
replicatedBlock = await req.promise
} catch (err) {
if (isSessionMoved(err)) return this._get(index, opts)
throw err
}

if (this._snapshot !== null) checkSnapshot(this, index)
return maybeUnslab(replicatedBlock)
}

Expand Down Expand Up @@ -790,19 +833,7 @@ class Hypercore extends EventEmitter {
}

download (range) {
const req = this._download(range)

// do not crash in the background...
req.catch(safetyCatch)

return new Download(req)
}

async _download (range) {
if (this.opened === false) await this.opening

const activeRequests = (range && range.activeRequests) || this.activeRequests
return this.core.replicator.addRange(activeRequests, range)
return new Download(this, range)
}

// TODO: get rid of this / deprecate it?
Expand Down Expand Up @@ -841,9 +872,9 @@ class Hypercore extends EventEmitter {
const defaultKeyPair = this.state.name === null ? this.keyPair : null

const { keyPair = defaultKeyPair, signature = null } = opts
const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey))
const writable = !!this.draft || !isDefault || !!signature || !!(keyPair && keyPair.secretKey)

if (isDefault && writable === false) throw SESSION_NOT_WRITABLE()
if (this._readonly || writable === false) throw SESSION_NOT_WRITABLE()

blocks = Array.isArray(blocks) ? blocks : [blocks]

Expand Down Expand Up @@ -1031,3 +1062,7 @@ function maybeAddMonitor (name) {
this.core.addMonitor(this)
}
}

function isSessionMoved (err) {
return err.code === 'SESSION_MOVED'
}
4 changes: 1 addition & 3 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ module.exports = class Core {
this.verifier = verifier
this.state = new SessionState(this, storage, this.blocks, tree, null)

this.state.ref()

if (this.key === null) this.key = this.header.key
if (this.discoveryKey === null) this.discoveryKey = crypto.discoveryKey(this.key)
if (this.id === null) this.id = z32.encode(this.key)
Expand Down Expand Up @@ -402,7 +400,7 @@ module.exports = class Core {
signature = this.verifier.sign(state.tree.batch(), keyPair)
}

const tree = await this.state._overwrite(state, length, treeLength, signature)
const tree = await this.state._overwrite(state, length, treeLength, signature, false)

// gc blocks from source
if (treeLength < length) {
Expand Down
46 changes: 42 additions & 4 deletions lib/download.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
module.exports = class Download {
constructor (req) {
this.req = req
constructor (session, range) {
this.session = session
this.range = range
this.request = null
this.opened = false
this.opening = this._open()
this.opening.catch(noop)
}

ready () {
return this.opening
}

async _open () {
if (this.session.opened === false) await this.session.opening
this._download()
this.opened = true
}

async done () {
return (await this.req).promise
await this.ready()

try {
return await this.request.promise
} catch (err) {
if (isSessionMoved(err)) return this._download()
throw err
}
}

_download () {
const activeRequests = (this.range && this.range.activeRequests) || this.session.activeRequests
this.request = this.session.core.replicator.addRange(activeRequests, this.range)
this.request.promise.catch(noop)
return this.request.promise
}

/**
Expand All @@ -15,8 +44,17 @@ module.exports = class Download {
}

destroy () {
this.req.then(req => req.context && req.context.detach(req), noop)
this._destroyBackground().catch(noop)
}

async _destroyBackground () {
if (this.opened === false) await this.ready()
if (this.request.context) this.request.context.detach(this.request)
}
}

function noop () {}

function isSessionMoved (err) {
return err.code === 'SESSION_MOVED'
}
8 changes: 0 additions & 8 deletions lib/merkle-tree.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,14 +516,6 @@ module.exports = class MerkleTree {
return new MerkleTreeBatch(this)
}

snapshot () {
return {
length: this.length,
byteLength: this.byteLength,
fork: this.fork
}
}

async restoreBatch (length) {
const batch = new MerkleTreeBatch(this)
if (length === this.length) return batch
Expand Down
52 changes: 35 additions & 17 deletions lib/session-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module.exports = class SessionState {
this._onflush = null
this._flushing = null
this._activeBatch = null

this.ref()
}

isSnapshot () {
Expand Down Expand Up @@ -90,19 +92,7 @@ module.exports = class SessionState {
this.core,
this.storage.snapshot(),
this.blocks,
this.tree.snapshot(),
this.name
)

return s
}

snapshotFrom (snap) {
const s = new SessionState(
this.core,
snap.storage,
this.blocks,
snap.tree,
this.tree.clone(),
this.name
)

Expand Down Expand Up @@ -304,8 +294,6 @@ module.exports = class SessionState {
await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.treeLength, false)
}

if (dependency) storage.setDataDependency(dependency)

return { dependency, tree, treeUpdate }
}

Expand Down Expand Up @@ -413,7 +401,7 @@ module.exports = class SessionState {
}
}

async _overwrite (source, length, treeLength, signature) {
async _overwrite (source, length, treeLength, signature, isDependent) {
const blockPromises = []
const treePromises = []
const rootPromises = []
Expand Down Expand Up @@ -500,8 +488,12 @@ module.exports = class SessionState {
this.tree.setRoots(roots, signature)
}

const dependency = isDependent ? updateDependency(this, length) : null

await this.flushWriteBatch(writer)

if (dependency) this.storage.updateDependencies(dependency.length)

return tree
}

Expand All @@ -513,7 +505,7 @@ module.exports = class SessionState {
try {
const origLength = this.tree.length

const tree = await this._overwrite(state, length, treeLength, null)
const tree = await this._overwrite(state, length, treeLength, null, state === this.core.state)

const bitfield = { start: treeLength, length: tree.length - treeLength, drop: false }

Expand Down Expand Up @@ -545,6 +537,32 @@ module.exports = class SessionState {
return head
}

async moveTo (core) {
const state = core.state

if (state.storage && (await state.storage.openBatch(this.name)) !== null) {
throw STORAGE_CONFLICT('Batch has already been created')
}

const head = this.core.sessionStates.pop()
if (head !== this) this.core.sessionStates[(head.index = this.index)] = head

this.core = core
this.index = this.core.sessionStates.push(this) - 1

if (!this.isSnapshot()) {
const treeInfo = await state._getTreeHeadAt(this.tree.length)
const prologue = state.tree.prologue

// todo: validate treeInfo

this.storage = await state.storage.registerBatch(this.name, treeInfo)
this.tree = await MerkleTree.open(this.storage, treeInfo.length, { prologue })
}

for (const s of this.sessions) s.transferSession(this.core)
}

async createSession (name, length, overwrite, draft) {
let storage = null
let treeInfo = null
Expand Down
Loading

0 comments on commit 8905a0e

Please sign in to comment.