feat: content claims fetcher #26

Draft · wants to merge 1 commit into main

Empty file added fetcher/content-claims/api.js
Empty file.
19 changes: 19 additions & 0 deletions fetcher/content-claims/api.ts
@@ -0,0 +1,19 @@
import { UnknownLink } from 'multiformats/link'
import { MultihashIndexItem } from 'cardex/multihash-index-sorted/api'
import { CARLink } from 'cardex/api'

export interface IndexEntry extends MultihashIndexItem {
origin: CARLink
}

export interface Index {
get (c: UnknownLink): Promise<IndexEntry|undefined>
}

export interface DataFetcherStreamOptions {
range?: { offset: number, length?: number }
}

export interface DataFetcher {
stream (k: string, options?: DataFetcherStreamOptions): Promise<ReadableStream<Uint8Array>>
}
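
For illustration, a minimal in-memory implementation of the Index interface might look like the sketch below. The MemoryIndex class and its set method are hypothetical and not part of this PR; they only show how the interface composes with the fetchers added later in this diff.

/**
 * A hypothetical in-memory Index, e.g. for tests - a sketch, not part of
 * this PR. Entries must carry the multihash, offset and origin CAR link
 * that the fetchers rely on.
 * @implements {import('./api.js').Index}
 */
export class MemoryIndex {
  /** @type {Map<string, import('./api.js').IndexEntry>} */
  #entries = new Map()

  /**
   * @param {import('multiformats/link').UnknownLink} cid
   * @param {import('./api.js').IndexEntry} entry
   */
  set (cid, entry) {
    this.#entries.set(cid.toString(), entry)
  }

  /** @param {import('multiformats/link').UnknownLink} cid */
  async get (cid) {
    return this.#entries.get(cid.toString())
  }
}
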
47 changes: 47 additions & 0 deletions fetcher/content-claims/block-batch.js
@@ -0,0 +1,47 @@
const MAX_BYTES_BETWEEN = 1024 * 1024 * 2
const MAX_BATCH_SIZE = 10

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {{ carCid: import('cardex/api').CARLink, blockCid: UnknownLink, offset: number }} BatchItem
* @typedef {{ add: (i: BatchItem) => void, remove: (cid: UnknownLink) => void, next: () => BatchItem[] }} BlockBatcher
*/

/**
 * Batcher for blocks in CARs. Batches are grouped by CAR CID, and blocks are
 * returned within each batch in the order they were inserted.
* @implements {BlockBatcher}
*/
export class OrderedCarBlockBatcher {
/** @type {BatchItem[]} */
#queue = []

/** @param {BatchItem} item */
add (item) {
this.#queue.push(item)
}

/** @param {UnknownLink} cid */
remove (cid) {
this.#queue = this.#queue.filter(item => item.blockCid.toString() !== cid.toString())
}

next () {
const queue = this.#queue
let prevItem = queue.shift()
if (!prevItem) return []
const batch = [prevItem]
while (true) {
const item = queue.at(0)
if (!item) break
if (item.carCid.toString() !== prevItem.carCid.toString() || item.offset - prevItem.offset >= MAX_BYTES_BETWEEN) {
break
}
batch.push(item)
queue.shift() // remove from the queue
if (batch.length >= MAX_BATCH_SIZE) break
prevItem = item
}
return batch
}
}
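
To make the batching behaviour concrete, here is a hypothetical usage sketch, not part of this PR. The cidOf helper is an assumption for the sketch (it mints deterministic CIDs with the identity hash); blocks from the same CAR whose offsets are within MAX_BYTES_BETWEEN of each other come back as one batch, in insertion order.

import { CID } from 'multiformats/cid'
import { identity } from 'multiformats/hashes/identity'
import * as raw from 'multiformats/codecs/raw'
import { OrderedCarBlockBatcher } from './block-batch.js'

// Mint a deterministic CID for the sketch (identity hash, so no await needed).
const cidOf = (/** @type {number} */ code, /** @type {string} */ s) =>
  CID.createV1(code, identity.digest(new TextEncoder().encode(s)))

const carCid = cidOf(0x0202, 'car-1') // CAR codec
const otherCarCid = cidOf(0x0202, 'car-2')

const batcher = new OrderedCarBlockBatcher()
batcher.add({ carCid, blockCid: cidOf(raw.code, 'a'), offset: 0 })
batcher.add({ carCid, blockCid: cidOf(raw.code, 'b'), offset: 512 }) // same CAR, nearby offset
batcher.add({ carCid: otherCarCid, blockCid: cidOf(raw.code, 'c'), offset: 0 }) // different CAR

console.log(batcher.next().length) // 2 - the first two blocks are read together
console.log(batcher.next().length) // 1 - the third block starts a new batch
console.log(batcher.next().length) // 0 - queue drained
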
182 changes: 182 additions & 0 deletions fetcher/content-claims/block-fetcher.js
@@ -0,0 +1,182 @@
import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
import { base58btc } from 'multiformats/bases/base58'
import defer from 'p-defer'
import { OrderedCarBlockBatcher } from './block-batch.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('dagula').Block} Block
* @typedef {import('@cloudflare/workers-types').R2Bucket} R2Bucket
*/

// 2MB (max safe libp2p block size) + typical block header length + some leeway
const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61

export class BlockFetcher {
/**
* @param {R2Bucket} dataBucket
   * @param {import('./api.js').Index} index
*/
constructor (dataBucket, index) {
this._dataBucket = dataBucket
this._idx = index
}

/** @param {UnknownLink} cid */
async get (cid) {
// console.log(`get ${cid}`)
const entry = await this._idx.get(cid)
if (!entry) return
const carPath = `${entry.origin}/${entry.origin}.car`
const range = { offset: entry.offset }
const res = await this._dataBucket.get(carPath, { range })
if (!res) return

const reader = res.body.getReader()
const bytesReader = asyncIterableReader((async function * () {
while (true) {
const { done, value } = await reader.read()
if (done) return
yield value
}
})())

const blockHeader = await readBlockHead(bytesReader)
const bytes = await bytesReader.exactly(blockHeader.blockLength)
reader.cancel()
return { cid, bytes }
}
}

export class BatchingBlockFetcher extends BlockFetcher {
/** @type {Map<string, Array<import('p-defer').DeferredPromise<Block|undefined>>>} */
#pendingBlocks = new Map()

  /** @type {import('./block-batch.js').BlockBatcher} */
#batcher = new OrderedCarBlockBatcher()

#scheduled = false

/** @type {Promise<void>|null} */
#processing = null

#scheduleBatchProcessing () {
if (this.#scheduled) return
this.#scheduled = true

const startProcessing = async () => {
this.#scheduled = false
const { promise, resolve } = defer()
this.#processing = promise
try {
await this.#processBatch()
} finally {
this.#processing = null
resolve()
}
}

// If already running, then start when finished
if (this.#processing) {
return this.#processing.then(startProcessing)
}

// If not running, then start on the next tick
setTimeout(startProcessing)
}

async #processBatch () {
    // console.log('processing batch')
const batcher = this.#batcher
this.#batcher = new OrderedCarBlockBatcher()
const pendingBlocks = this.#pendingBlocks
this.#pendingBlocks = new Map()

while (true) {
const batch = batcher.next()
if (!batch.length) break

batch.sort((a, b) => a.offset - b.offset)

const { carCid } = batch[0]
const carPath = `${carCid}/${carCid}.car`
const range = {
offset: batch[0].offset,
length: batch[batch.length - 1].offset - batch[0].offset + MAX_ENCODED_BLOCK_LENGTH
}

console.log(`fetching ${batch.length} blocks from ${carCid} (${range.length} bytes @ ${range.offset})`)
const res = await this._dataBucket.get(carPath, { range })
if (!res) {
// should not happen, but if it does, we need to resolve `undefined`
// for the blocks in this batch - they are not found.
for (const blocks of pendingBlocks.values()) {
blocks.forEach(b => b.resolve())
}
return
}

const reader = res.body.getReader()
const bytesReader = asyncIterableReader((async function * () {
while (true) {
const { done, value } = await reader.read()
if (done) return
yield value
}
})())

while (true) {
try {
const blockHeader = await readBlockHead(bytesReader)
const bytes = await bytesReader.exactly(blockHeader.blockLength)
bytesReader.seek(blockHeader.blockLength)

const key = mhToKey(blockHeader.cid.multihash.bytes)
const blocks = pendingBlocks.get(key)
if (blocks) {
// console.log(`got wanted block for ${blockHeader.cid}`)
const block = { cid: blockHeader.cid, bytes }
blocks.forEach(b => b.resolve(block))
pendingBlocks.delete(key)
// remove from batcher if queued to be read
batcher.remove(blockHeader.cid)
}
} catch {
break
}
}
// we should have read all the bytes from the reader by now but if the
// bytesReader throws for bad data _before_ the end then we need to
// cancel the reader - we don't need the rest.
reader.cancel()
}

// resolve `undefined` for any remaining blocks
for (const blocks of pendingBlocks.values()) {
blocks.forEach(b => b.resolve())
}
}

/** @param {UnknownLink} cid */
async get (cid) {
// console.log(`get ${cid}`)
const entry = await this._idx.get(cid)
if (!entry) return

this.#batcher.add({ carCid: entry.origin, blockCid: cid, offset: entry.offset })

if (!entry.multihash) throw new Error('missing entry multihash')
const key = mhToKey(entry.multihash.bytes)
let blocks = this.#pendingBlocks.get(key)
if (!blocks) {
blocks = []
this.#pendingBlocks.set(key, blocks)
}
const deferred = defer()
blocks.push(deferred)
this.#scheduleBatchProcessing()
return deferred.promise
}
}

const mhToKey = (/** @type {Uint8Array} */ mh) => base58btc.encode(mh)
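
A hypothetical usage sketch, not part of this PR: get() calls issued concurrently for blocks that sit close together in the same CAR are coalesced by the batcher into a single ranged read against the R2 bucket. The fetchBlocks helper and its parameters are assumptions for illustration.

import { BatchingBlockFetcher } from './block-fetcher.js'

/**
 * Fetch many blocks, letting the batcher coalesce reads per CAR.
 * @param {import('@cloudflare/workers-types').R2Bucket} bucket an R2 binding
 * @param {import('./api.js').Index} index any Index implementation
 * @param {import('multiformats').UnknownLink[]} cids
 */
export async function fetchBlocks (bucket, index, cids) {
  const fetcher = new BatchingBlockFetcher(bucket, index)
  // Issue the gets together so blocks in the same CAR share one ranged GET.
  const blocks = await Promise.all(cids.map(cid => fetcher.get(cid)))
  return blocks.filter(b => b != null)
}
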
19 changes: 19 additions & 0 deletions fetcher/content-claims/car.js
@@ -0,0 +1,19 @@
import { CarReader } from '@ipld/car'

export const code = 0x0202

/**
* @param {Uint8Array} bytes
* @returns {Promise<Array<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }>>}
*/
export async function decode (bytes) {
const reader = await CarReader.fromBytes(bytes)
const blocks = []
for await (const b of reader.blocks()) {
blocks.push({
cid: /** @type {import('multiformats').UnknownLink} */ (b.cid),
bytes: b.bytes
})
}
return blocks
}
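
A hypothetical usage sketch, not part of this PR: given the bytes of a block whose CID carries the CAR multicodec (0x0202), decode yields the blocks nested inside it. The listBlocks helper and carBytes parameter are illustrative names.

import * as CAR from './car.js'

/** @param {Uint8Array} carBytes CAR-encoded bytes obtained elsewhere */
export async function listBlocks (carBytes) {
  for (const { cid, bytes } of await CAR.decode(carBytes)) {
    console.log(`${cid} -> ${bytes.length} bytes`)
  }
}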