Skip to content

Commit

Permalink
catchup should be mutexed and fix related bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Sep 13, 2023
1 parent 5c28a3a commit a962e30
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions lib/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ module.exports = class HypercoreBatch extends EventEmitter {
async update (opts) {
if (this.opened === false) await this.ready()
await this.session.update(opts)
await this._catchup()
}

setUserData (key, value, opts) {
Expand Down Expand Up @@ -275,7 +274,7 @@ module.exports = class HypercoreBatch extends EventEmitter {
const offset = this.core.tree.length - this._sessionLength
for (let i = 0; i < offset; i++) this._byteLength -= this._appends[i].byteLength

await this.core.insertValuesUnsafe(this._appends.slice(0, offset), this._sessionLength, this._sessionByteLength, b.nodes)
await this.core.insertValuesUnsafe(this._appends.slice(0, offset), this._sessionLength, this._sessionByteLength, b.nodes.slice(0))

this._sessionLength = this.session.length
this._sessionByteLength = this.session.byteLength
Expand All @@ -287,11 +286,12 @@ module.exports = class HypercoreBatch extends EventEmitter {

async _flush (length, keyPair, signature) { // TODO: make this safe to interact with a parallel truncate...
if (this._appends.length === 0) return true
if (!(await this._catchup())) return false

await this.core._mutex.lock()

try {
if (!(await this._catchup())) return false

const flushingLength = Math.min(length - this._sessionLength, this._appends.length)
if (flushingLength <= 0) return true

Expand Down
8 changes: 4 additions & 4 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,18 +439,18 @@ module.exports = class Core {
const bitfield = {
drop: false,
start: offset,
end: offset + values.length
length: values.length
}

await this.blocks.putBatch(offset, values, byteOffset)
await this.oplog.append({
await this.oplog.append([{
userData: null,
treeNodes,
treeUpgrade: null,
bitfield
}, false)
}], false)

this.bitfield.setRange(bitfield.start, values.length, true)
this.bitfield.setRange(bitfield.start, bitfield.length, true)
for (const node of treeNodes) this.tree.unflushed.set(node.index, node)

const status = updateContig(this.header, bitfield, this.bitfield)
Expand Down

0 comments on commit a962e30

Please sign in to comment.