Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow clearing an uncommited batch #476

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,8 @@ module.exports = class Hypercore extends EventEmitter {
return true
}

batch ({ checkout = -1, autoClose = true, session = true, restore = false } = {}) {
return new Batch(session ? this.session() : this, checkout, autoClose, restore)
batch ({ checkout = -1, autoClose = true, session = true, restore = false, clear = false } = {}) {
return new Batch(session ? this.session() : this, checkout, autoClose, restore, clear)
}

async seek (bytes, opts) {
Expand Down
18 changes: 12 additions & 6 deletions lib/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const c = require('compact-encoding')
const b4a = require('b4a')

module.exports = class HypercoreBatch extends EventEmitter {
constructor (session, checkoutLength, autoClose, restore) {
constructor (session, checkoutLength, autoClose, restore, clear) {
super()

this.session = session
Expand All @@ -25,8 +25,10 @@ module.exports = class HypercoreBatch extends EventEmitter {
this._sessionByteLength = 0
this._sessionBatch = null
this._flushing = null
this._clear = clear

this.opening = this.ready().catch(noop)
this.opening = this._open()
this.opening.catch(noop)
}

get id () {
Expand Down Expand Up @@ -69,23 +71,27 @@ module.exports = class HypercoreBatch extends EventEmitter {
return this.session.manifest
}

async ready () {
ready () {
return this.opening
}

async _open () {
await this.session.ready()
if (this.opened) return

if (this._clear) this._checkoutLength = this.core.tree.length

if (this._checkoutLength !== -1) {
const batch = await this.session.core.tree.restoreBatch(this._checkoutLength)
batch.treeLength = this._checkoutLength
if (this.opened) return
this._sessionLength = batch.length
this._sessionByteLength = batch.byteLength
this._sessionBatch = batch
if (this._clear) await this.core.clearBatch()
} else {
const last = this.restore ? this.session.core.bitfield.findFirst(false, this.session.length) : 0

if (last > this.session.length) {
const batch = await this.session.core.tree.restoreBatch(last)
if (this.opened) return
this._sessionLength = batch.length
this._sessionByteLength = batch.byteLength - this.session.padding * batch.length
this._sessionBatch = batch
Expand Down
34 changes: 34 additions & 0 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,40 @@ module.exports = class Core {
}
}

async clearBatch () {
await this._mutex.lock()

try {
const len = this.bitfield.findFirst(false, this.tree.length)
if (len <= this.tree.length) return

const batch = await this.tree.truncate(this.tree.length, this.tree.fork)

batch.signature = this.tree.signature // same sig

const entry = {
userData: null,
treeNodes: batch.nodes,
treeUpgrade: batch,
bitfield: {
drop: true,
start: batch.ancestors,
length: len - batch.ancestors
}
}

await this.oplog.append([entry], false)

this.bitfield.setRange(batch.ancestors, len - batch.ancestors, false)
batch.commit()

// TODO: (see below todo)
await this._flushOplog()
} finally {
this._mutex.unlock()
}
}

async clear (start, end, cleared) {
await this._mutex.lock()

Expand Down
35 changes: 35 additions & 0 deletions test/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -608,3 +608,38 @@ test('persistent batch', async function (t) {
sub.alike(downloaded.sort(), [6, 7], 'got non pending blocks')
})
})

test('clear', async function (t) {
const core = await create()

await core.append('hello')

const clone = await create(core.key)

const b = clone.batch()

await b.append('hello')
await b.flush()
await b.close()

const [s1, s2] = replicate(core, clone, t)

await eventFlush()
t.ok(!!(await clone.get(0)), 'got block 0 proof')

s1.destroy()
s2.destroy()

const b1 = clone.batch()
await b1.ready()
await b1.append('foo')
await b1.flush()
await b1.close()

t.is(clone.length, 1, 'clone length is still 1')

const b2 = clone.batch({ clear: true })
await b2.ready()

t.is(b2.length, 1, 'reset the batch')
})