Skip to content

Commit

Permalink
add support for stats (#5)
Browse files Browse the repository at this point in the history
* add support for stats

* no need for todo as pr is up

* docs

* only one io tick per delete/insert

* clean it up a bit

* add to gen script for fixtures

* fix left over error

* cleanup naming and now use a batch for more speeeeed

* remove unneeded line in codegen

* regen

* stats type is derived
  • Loading branch information
mafintosh authored Sep 14, 2024
1 parent 593298d commit c12abd6
Show file tree
Hide file tree
Showing 16 changed files with 691 additions and 29 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ Alias for `await find(...).one()`

Get a document from a collection

#### `{ count } = await db.stats(collectionOrIndex)`

Get stats, about a collection or index with stats enabled.

#### `await db.insert(collection, doc)`

Insert a document into a collection. NOTE: you have to flush the db later for this to be persisted.
Expand Down
18 changes: 10 additions & 8 deletions builder/codegen.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ module.exports = function generateCode (hyperdb) {
for (let i = 0; i < hyperdb.orderedTypes.length; i++) {
const type = hyperdb.orderedTypes[i]
if (type.isCollection) {
const id = `collection${collections.length}`
const id = `collection${type.id}`
collections.push({ id, type })
str += generateCollectionDefinition(id, type)
} else if (type.isIndex) {
const id = `index${indexes.length}`
indexes.push({ id, type })
str += generateIndexDefinition(id, type)
str += generateIndexDefinition(id, type, `collection${type.collection.id}`)
}
str += '\n'
}
Expand All @@ -93,9 +93,8 @@ module.exports = function generateCode (hyperdb) {
str += 'const Indexes = [...IndexMap.values()]\n'

str += 'for (const index of IndexMap.values()) {\n'
str += ' const collection = CollectionMap.get(index._collectionName)\n'
str += ' const collection = index.collection\n'
str += ' collection.indexes.push(index)\n'
str += ' index.collection = collection\n'
str += ' index.offset = collection.indexes.length - 1\n'
str += '}\n'
str += '\n'
Expand Down Expand Up @@ -169,7 +168,7 @@ function generateCollectionDefinition (id, collection) {
str += `// ${s(collection.fqn)} reconstruction function\n`
str += `function ${id}_reconstruct (version, keyBuf, valueBuf) {\n`
str += ' // TODO: This should be fully code generated\n'
str += ' const key = collection0_key.decode(keyBuf)\n'
str += ` const key = ${id}_key.decode(keyBuf)\n`
str += ` const value = c.decode(resolveStruct(${s(collection.valueEncoding)}, version), valueBuf)\n`
str += ' return {\n'
for (let i = 0; i < collection.key.length; i++) {
Expand All @@ -184,6 +183,8 @@ function generateCollectionDefinition (id, collection) {
str += `// ${s(collection.fqn)}\n`
str += `const ${id} = {\n`
str += ` name: ${s(collection.fqn)},\n`
str += ` id: ${collection.id},\n`
str += ` stats: ${collection.stats},\n`
str += ` encodeKey: ${generateEncodeCollectionKey(id, collection)},\n`
str += ` encodeKeyRange: ${generateEncodeKeyRange(id, collection)},\n`
str += ` encodeValue: ${generateEncodeCollectionValue(collection)},\n`
Expand All @@ -193,18 +194,19 @@ function generateCollectionDefinition (id, collection) {
return str
}

function generateIndexDefinition (id, index) {
function generateIndexDefinition (id, index, collectionId) {
let str = generateCommonPrefix(id, index)
str += `// ${s(index.fqn)}\n`
str += `const ${id} = {\n`
str += ` _collectionName: ${s(index.description.collection)},\n`
str += ` name: ${s(index.fqn)},\n`
str += ` id: ${index.id},\n`
str += ` stats: ${index.stats},\n`
str += ` encodeKeys: ${generateEncodeIndexKeys(id, index)},\n`
str += ` encodeKeyRange: ${generateEncodeKeyRange(id, index)},\n`
str += ` encodeValue: (doc) => ${id}.collection.encodeKey(doc),\n`
str += ' reconstruct: (keyBuf, valueBuf) => valueBuf,\n'
str += ' offset: 0,\n'
str += ' collection: null\n'
str += ` collection: ${collectionId}\n`
str += '}\n'
return str
}
Expand Down
33 changes: 32 additions & 1 deletion builder/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DBType {
this.fqn = getFQN(this.namespace, this.description.name)
this.key = description.key
this.fullKey = []
this.stats = !!description.stats

this.isMapped = false
this.isIndex = false
Expand Down Expand Up @@ -56,7 +57,8 @@ class DBType {
name: this.description.name,
unsafe: this.description.unsafe,
namespace: this.namespace,
id: this.id
id: this.id,
stats: this.stats
}
}
}
Expand Down Expand Up @@ -229,6 +231,7 @@ class Builder {
this.typesById = new Map()
this.orderedTypes = []

this.registeredStats = false
this.currentOffset = this.offset

this.initializing = true
Expand Down Expand Up @@ -256,12 +259,39 @@ class Builder {
return { id: this.currentOffset++, prefix: null }
}

_registerStats () {
if (this.registeredStats) return
this.registeredStats = true

this.schema.register({
namespace: null,
name: 'stats',
derived: true,
fields: [{
name: 'id',
type: 'uint',
required: true
}, {
name: 'count',
type: 'uint',
required: true
}]
})
this.registerCollection({
name: 'stats',
schema: 'stats',
key: ['id']
}, null)
}

registerCollection (description, namespace) {
const collection = new Collection(this, namespace, description)
if (this.typesByName.has(collection.fqn)) return

this.orderedTypes.push(collection)
this.typesByName.set(collection.fqn, collection)

if (collection.stats) this._registerStats()
}

registerIndex (description, namespace) {
Expand Down Expand Up @@ -322,5 +352,6 @@ class Builder {
module.exports = Builder

function getFQN (namespace, name) {
if (namespace === null) return name
return '@' + namespace + '/' + name
}
127 changes: 114 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ const b4a = require('b4a')
// engines
const RocksEngine = require('./lib/engine/rocks')

const STATS = 'stats'

class Updates {
constructor (clock, entries) {
constructor (clock, entries, stats) {
this.refs = 1
this.mutating = 0
this.clock = clock
this.map = new Map(entries)
this.stats = new Map(stats)
}

get size () {
Expand All @@ -28,13 +31,24 @@ class Updates {
detach () {
const entries = new Array(this.map.size)

let i = 0
for (const [key, u] of this.map) {
entries[i++] = [key, { key: u.key, value: u.value, indexes: u.indexes.slice(0) }]
if (entries.length > 0) {
let i = 0
for (const [key, u] of this.map) {
entries[i++] = [key, { key: u.key, value: u.value, indexes: u.indexes.slice(0) }]
}
}

const stats = new Array(this.stats.size)

if (stats.length > 0) {
let i = 0
for (const [col, st] of this.stats) {
stats[i++] = [col, st]
}
}

this.refs--
return new Updates(this.clock, entries)
return new Updates(this.clock, entries, stats)
}

get (key) {
Expand All @@ -44,11 +58,26 @@ class Updates {

flush (clock) {
this.clock = clock
this.stats.clear()
this.map.clear()
}

update (key, value) {
const u = { key, value, indexes: [] }
prestats (collectionOrIndex, engine) {
const st = this.stats.get(collectionOrIndex)
if (st) return st

const state = {
key: null,
value: null,
promise: null
}

this.stats.set(collectionOrIndex, state)
return state
}

update (collection, key, value) {
const u = { created: false, collection, key, value, indexes: [] }
this.map.set(b4a.toString(key, 'hex'), u)
return u
}
Expand All @@ -61,6 +90,22 @@ class Updates {
return this.map.values()
}

indexStatsOverlay (index) {
throw new Error('Index stats are not currently implemented, open an issue')
}

collectionStatsOverlay (collection) {
const info = { count: 0 }

for (const u of this.map.values()) {
if (u.collection !== collection) continue
if (u.value === null) info.count--
else if (u.created) info.count++
}

return info
}

overlay (range, index, reverse) {
const overlay = []

Expand Down Expand Up @@ -98,7 +143,7 @@ class HyperDB {
constructor (engine, definition, {
version = definition.version,
snapshot = engine.snapshot(),
updates = new Updates(engine.clock, []),
updates = new Updates(engine.clock, [], []),
rootInstance = null,
writable = true
} = {}) {
Expand Down Expand Up @@ -239,7 +284,7 @@ class HyperDB {
}

function map (entries) {
return engine.getRange(snap, entries)
return engine.getIndirectRange(snap, entries)
}
}

Expand All @@ -254,12 +299,52 @@ class HyperDB {
if (collection === null) return null

const key = collection.encodeKey(doc)

const u = this.updates.get(key)
const value = u !== null ? u.value : await this.engine.get(this.engineSnapshot, key)

return value === null ? null : collection.reconstruct(this.version, key, value)
}

async stats (indexName) {
const collection = this.definition.resolveCollection(indexName)
const index = collection === null ? this.definition.resolveIndex(indexName) : null

if (collection === null && index === null) throw new Error('Unknown index: ' + indexName)

const target = collection || index
const st = await this.get(STATS, { id: target.id })

const overlay = index ? this.updates.indexStatsOverlay(index) : this.updates.collectionStatsOverlay(collection)

if (!st) return overlay

st.count += overlay.count
return st
}

async _getPrev (key, collection) {
const st = collection.stats === true ? this.updates.prestats(collection) : null

if (st !== null && !st.promise && !st.value) {
const statsCollection = this.definition.resolveCollection(STATS)

st.key = statsCollection.encodeKey({ id: collection.id })
st.promise = this.engine.getBatch(this.engineSnapshot, [key, st.key])

const [value, stats] = await st.promise

st.value = stats === null ? statsCollection.encodeValue(this.version, { count: 0 }) : stats
st.promise = null

return value
}

const value = await this.engine.get(this.engineSnapshot, key)
if (st !== null && st.promise !== null) await st.promise
return value
}

async delete (collectionName, doc) {
maybeClosed(this)

Expand All @@ -273,7 +358,7 @@ class HyperDB {
let prevValue = null
this.updates.mutating++
try {
prevValue = await this.engine.get(this.engineSnapshot, key)
prevValue = await this._getPrev(key, collection)
} finally {
this.updates.mutating--
}
Expand All @@ -285,7 +370,7 @@ class HyperDB {

const prevDoc = collection.reconstruct(this.version, key, prevValue)

const u = this.updates.update(key, null)
const u = this.updates.update(collection, key, null)

for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
Expand All @@ -312,7 +397,7 @@ class HyperDB {
let prevValue = null
this.updates.mutating++
try {
prevValue = await this.engine.get(this.engineSnapshot, key)
prevValue = await this._getPrev(key, collection)
} finally {
this.updates.mutating--
}
Expand All @@ -321,7 +406,9 @@ class HyperDB {

const prevDoc = prevValue === null ? null : collection.reconstruct(this.version, key, prevValue)

const u = this.updates.update(key, value)
const u = this.updates.update(collection, key, value)

u.created = prevValue === null

for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
Expand Down Expand Up @@ -355,6 +442,18 @@ class HyperDB {
}
}

_applyStats () {
const statsCollection = this.definition.resolveCollection(STATS)
for (const [collection, { key, value }] of this.updates.stats) {
const stats = statsCollection.reconstruct(this.version, key, value)
const overlay = this.updates.collectionStatsOverlay(collection)
stats.count += overlay.count
const updatedValue = statsCollection.encodeValue(this.version, stats)
if (b4a.equals(value, updatedValue)) continue
this.updates.update(statsCollection, key, updatedValue)
}
}

async flush () {
maybeClosed(this)

Expand All @@ -364,6 +463,8 @@ class HyperDB {
if (this.updates.clock !== this.engine.clock) throw new Error('Database has changed, refusing to commit')
if (this.updates.refs > 1) this.updates = this.updates.detach()

if (this.updates.stats.size) this._applyStats()

await this.engine.commit(this.updates)

this.reload()
Expand Down
Loading

0 comments on commit c12abd6

Please sign in to comment.