Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb: SessionState.moveTo API #609

Merged
merged 27 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1a2d9db
add initial moveTo API
chm-diederichs Nov 29, 2024
a15fd47
move moveTo to session method
chm-diederichs Nov 29, 2024
276e516
snapshot makes full tree clone
chm-diederichs Nov 29, 2024
d84c255
add test for snapshots
chm-diederichs Nov 29, 2024
5174927
parent state may be swapped during ready
chm-diederichs Nov 29, 2024
34521c3
no need for snapshotFrom
chm-diederichs Nov 29, 2024
651a7ca
session has method for transferring
chm-diederichs Nov 29, 2024
2a29b81
only check compat if block is not local
chm-diederichs Nov 29, 2024
3d96fa7
add snapshot move to checks
chm-diederichs Nov 29, 2024
fc4f6ca
revert change
chm-diederichs Nov 29, 2024
03719ee
add explicit flag for when session is open
chm-diederichs Dec 2, 2024
2a4df14
wip retrying requests on session move
mafintosh Dec 3, 2024
0d7f23f
transfer all replicating io when moving sessions
mafintosh Dec 3, 2024
3956f33
missing await
mafintosh Dec 3, 2024
3a86b61
fix missing ready in test
mafintosh Dec 3, 2024
c957b5e
use state index instead of flag
chm-diederichs Dec 4, 2024
14968f0
missing close session
chm-diederichs Dec 4, 2024
f19c966
inherit more from parent (#612)
mafintosh Dec 4, 2024
be54b69
rocksdb: minor tweaks (#613)
chm-diederichs Dec 5, 2024
993b106
copyPrologue may not have all nodes
chm-diederichs Dec 12, 2024
b1896c7
add option for overwrite to update dependency
chm-diederichs Dec 12, 2024
0ca6ecb
do not create snapshot over existing snapshot
chm-diederichs Dec 12, 2024
fef32b3
ref session state in constructor
chm-diederichs Dec 12, 2024
277a48d
fix non writable named sessions
chm-diederichs Dec 13, 2024
b841039
emit migrate event on moveTo
chm-diederichs Dec 13, 2024
eb34ea5
Merge branch 'rocksdb' into rocksdb-move-to
chm-diederichs Dec 13, 2024
f27fea9
copyPrologue should always have node
chm-diederichs Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
ASSERTION,
BAD_ARGUMENT,
SESSION_CLOSED,
SESSION_MOVED,
SESSION_NOT_WRITABLE,
SNAPSHOT_NOT_AVAILABLE,
DECODING_ERROR
Expand Down Expand Up @@ -248,8 +249,11 @@ class Hypercore extends EventEmitter {
if (!this.keyPair) this.keyPair = parent.keyPair
this.writable = this._isWritable()

if (parent.state) {
this.state = this.draft ? parent.state.memoryOverlay() : this.snapshotted ? parent.state.snapshot() : parent.state.ref()
const s = parent.state

if (s) {
const shouldSnapshot = this.snapshotted && !s.isSnapshot()
this.state = this.draft ? s.memoryOverlay() : shouldSnapshot ? s.snapshot() : s.ref()
}

if (this.snapshotted && this.core && !this._snapshot) this._updateSnapshot()
Expand Down Expand Up @@ -329,7 +333,7 @@ class Hypercore extends EventEmitter {
}

if (opts.parent) {
if (opts.parent.state === null) await opts.parent.ready()
if (opts.parent._stateIndex === -1) await opts.parent.ready()
this._setupSession(opts.parent)
}

Expand Down Expand Up @@ -564,6 +568,28 @@ class Hypercore extends EventEmitter {
return p
}

transferSession (core) {
// todo: validate we can move

if (this.weak === false) {
this.core.activeSessions--
core.activeSessions++
}

if (this._monitorIndex >= 0) {
this.core.removeMonitor(this)
core.addMonitor(this)
}

const old = this.core

this.core = core

old.replicator.clearRequests(this.activeRequests, SESSION_MOVED())

this.emit('migrate', this.key)
}

createTreeBatch () {
return this.state.tree.batch()
}
Expand Down Expand Up @@ -609,7 +635,12 @@ class Hypercore extends EventEmitter {
const activeRequests = (opts && opts.activeRequests) || this.activeRequests
const req = this.core.replicator.addUpgrade(activeRequests)

upgraded = await req.promise
try {
upgraded = await req.promise
} catch (err) {
if (isSessionMoved(err)) return this.update(opts)
throw err
}
}

if (!upgraded) return false
Expand All @@ -636,7 +667,12 @@ class Hypercore extends EventEmitter {
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

return req.promise
try {
return await req.promise
} catch (err) {
if (isSessionMoved(err)) return this.seek(bytes, opts)
throw err
}
}

async has (start, end = start + 1) {
Expand Down Expand Up @@ -758,9 +794,16 @@ class Hypercore extends EventEmitter {
const timeout = opts && opts.timeout !== undefined ? opts.timeout : this.timeout
if (timeout) req.context.setTimeout(req, timeout)

const replicatedBlock = await req.promise
if (this._snapshot !== null) checkSnapshot(this, index)
let replicatedBlock = null

try {
replicatedBlock = await req.promise
} catch (err) {
if (isSessionMoved(err)) return this._get(index, opts)
throw err
}

if (this._snapshot !== null) checkSnapshot(this, index)
return maybeUnslab(replicatedBlock)
}

Expand Down Expand Up @@ -790,19 +833,7 @@ class Hypercore extends EventEmitter {
}

download (range) {
const req = this._download(range)

// do not crash in the background...
req.catch(safetyCatch)

return new Download(req)
}

async _download (range) {
if (this.opened === false) await this.opening

const activeRequests = (range && range.activeRequests) || this.activeRequests
return this.core.replicator.addRange(activeRequests, range)
return new Download(this, range)
}

// TODO: get rid of this / deprecate it?
Expand Down Expand Up @@ -841,9 +872,9 @@ class Hypercore extends EventEmitter {
const defaultKeyPair = this.state.name === null ? this.keyPair : null

const { keyPair = defaultKeyPair, signature = null } = opts
const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey))
const writable = !!this.draft || !isDefault || !!signature || !!(keyPair && keyPair.secretKey)

if (isDefault && writable === false) throw SESSION_NOT_WRITABLE()
if (this._readonly || writable === false) throw SESSION_NOT_WRITABLE()

blocks = Array.isArray(blocks) ? blocks : [blocks]

Expand Down Expand Up @@ -1031,3 +1062,7 @@ function maybeAddMonitor (name) {
this.core.addMonitor(this)
}
}

function isSessionMoved (err) {
return err.code === 'SESSION_MOVED'
}
4 changes: 1 addition & 3 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ module.exports = class Core {
this.verifier = verifier
this.state = new SessionState(this, storage, this.blocks, tree, null)

this.state.ref()

if (this.key === null) this.key = this.header.key
if (this.discoveryKey === null) this.discoveryKey = crypto.discoveryKey(this.key)
if (this.id === null) this.id = z32.encode(this.key)
Expand Down Expand Up @@ -402,7 +400,7 @@ module.exports = class Core {
signature = this.verifier.sign(state.tree.batch(), keyPair)
}

const tree = await this.state._overwrite(state, length, treeLength, signature)
const tree = await this.state._overwrite(state, length, treeLength, signature, false)

// gc blocks from source
if (treeLength < length) {
Expand Down
46 changes: 42 additions & 4 deletions lib/download.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
module.exports = class Download {
constructor (req) {
this.req = req
constructor (session, range) {
this.session = session
this.range = range
this.request = null
this.opened = false
this.opening = this._open()
this.opening.catch(noop)
}

ready () {
return this.opening
}

async _open () {
if (this.session.opened === false) await this.session.opening
this._download()
this.opened = true
}

async done () {
return (await this.req).promise
await this.ready()

try {
return await this.request.promise
} catch (err) {
if (isSessionMoved(err)) return this._download()
throw err
}
}

_download () {
const activeRequests = (this.range && this.range.activeRequests) || this.session.activeRequests
this.request = this.session.core.replicator.addRange(activeRequests, this.range)
this.request.promise.catch(noop)
return this.request.promise
}

/**
Expand All @@ -15,8 +44,17 @@ module.exports = class Download {
}

destroy () {
this.req.then(req => req.context && req.context.detach(req), noop)
this._destroyBackground().catch(noop)
}

async _destroyBackground () {
if (this.opened === false) await this.ready()
if (this.request.context) this.request.context.detach(this.request)
}
}

function noop () {}

function isSessionMoved (err) {
return err.code === 'SESSION_MOVED'
}
8 changes: 0 additions & 8 deletions lib/merkle-tree.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,14 +516,6 @@ module.exports = class MerkleTree {
return new MerkleTreeBatch(this)
}

snapshot () {
return {
length: this.length,
byteLength: this.byteLength,
fork: this.fork
}
}

async restoreBatch (length) {
const batch = new MerkleTreeBatch(this)
if (length === this.length) return batch
Expand Down
52 changes: 35 additions & 17 deletions lib/session-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module.exports = class SessionState {
this._onflush = null
this._flushing = null
this._activeBatch = null

this.ref()
}

isSnapshot () {
Expand Down Expand Up @@ -90,19 +92,7 @@ module.exports = class SessionState {
this.core,
this.storage.snapshot(),
this.blocks,
this.tree.snapshot(),
this.name
)

return s
}

snapshotFrom (snap) {
const s = new SessionState(
this.core,
snap.storage,
this.blocks,
snap.tree,
this.tree.clone(),
this.name
)

Expand Down Expand Up @@ -304,8 +294,6 @@ module.exports = class SessionState {
await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.treeLength, false)
}

if (dependency) storage.setDataDependency(dependency)

return { dependency, tree, treeUpdate }
}

Expand Down Expand Up @@ -413,7 +401,7 @@ module.exports = class SessionState {
}
}

async _overwrite (source, length, treeLength, signature) {
async _overwrite (source, length, treeLength, signature, isDependent) {
const blockPromises = []
const treePromises = []
const rootPromises = []
Expand Down Expand Up @@ -500,8 +488,12 @@ module.exports = class SessionState {
this.tree.setRoots(roots, signature)
}

const dependency = isDependent ? updateDependency(this, length) : null

await this.flushWriteBatch(writer)

if (dependency) this.storage.updateDependencies(dependency.length)

return tree
}

Expand All @@ -513,7 +505,7 @@ module.exports = class SessionState {
try {
const origLength = this.tree.length

const tree = await this._overwrite(state, length, treeLength, null)
const tree = await this._overwrite(state, length, treeLength, null, state === this.core.state)

const bitfield = { start: treeLength, length: tree.length - treeLength, drop: false }

Expand Down Expand Up @@ -545,6 +537,32 @@ module.exports = class SessionState {
return head
}

async moveTo (core) {
const state = core.state

if (state.storage && (await state.storage.openBatch(this.name)) !== null) {
throw STORAGE_CONFLICT('Batch has already been created')
}

const head = this.core.sessionStates.pop()
if (head !== this) this.core.sessionStates[(head.index = this.index)] = head

this.core = core
this.index = this.core.sessionStates.push(this) - 1

if (!this.isSnapshot()) {
const treeInfo = await state._getTreeHeadAt(this.tree.length)
const prologue = state.tree.prologue

// todo: validate treeInfo

this.storage = await state.storage.registerBatch(this.name, treeInfo)
this.tree = await MerkleTree.open(this.storage, treeInfo.length, { prologue })
}

for (const s of this.sessions) s.transferSession(this.core)
}

async createSession (name, length, overwrite, draft) {
let storage = null
let treeInfo = null
Expand Down
Loading
Loading