Skip to content

Commit

Permalink
feat: getPath with carScope (#8)
Browse files Browse the repository at this point in the history
add getPath method as a generator that returns blocks for the targeted
dag and all blocks traversed while resolving a cid+path string

supports carScope to specify what blocks to return for the resolved dag
- `'all'`: return the entire dag starting at path. (default)
- `'block'`: return the block identified by the path.
- `'file'`: Mimic gateway semantics: Return All blocks for a multi-block
file or just enough blocks to enumerate a dir/map but not the dir
contents.

see: storacha/freeway#33
see: storacha/freeway#34
see: ipfs/specs#402

TODO:
- [x] find out how to identify the boundaries of a unixfs hamt 

...unixfs-exporter seems to define it as "not having an empty or null
Link.Name after the first 2 chars are stripped, which seems loose...
what happens if the actual dir listing has 2 char long link names? see:
https://github.com/ipfs/js-ipfs-unixfs/blob/e853049bd63d6773442e1540ae49b6a443ca8672/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts#L20-L42

License: MIT

---------

Signed-off-by: Oli Evans <[email protected]>
Co-authored-by: Alan Shaw <[email protected]>
  • Loading branch information
olizilla and Alan Shaw authored May 1, 2023
1 parent e927919 commit a613b45
Show file tree
Hide file tree
Showing 9 changed files with 9,507 additions and 5,415 deletions.
7 changes: 4 additions & 3 deletions bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ cli.command('get <cid>')
.describe('Fetch a DAG from the peer. Outputs a CAR file.')
.option('-p, --peer', 'Address of peer to fetch data from.')
.option('-t, --timeout', 'Timeout in milliseconds.', TIMEOUT)
.action(async (cid, { peer, timeout }) => {
cid = CID.parse(cid)
.action(async (cidPath, { peer, timeout }) => {
const [cidStr] = cidPath.replace(/^\/ipfs\//, '').split('/')
const cid = CID.parse(cidStr)
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await fromNetwork(libp2p, { peer, hashers })
Expand All @@ -73,7 +74,7 @@ cli.command('get <cid>')
let error
;(async () => {
try {
for await (const block of dagula.get(cid, { signal: controller.signal })) {
for await (const block of dagula.getPath(cidPath, { signal: controller.signal })) {
controller.reset()
await writer.put(block)
}
Expand Down
14 changes: 14 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ export interface Network {
handle: (protocol: string | string[], handler: StreamHandler) => Promise<void>
}

export type CarScope = 'all'|'file'|'block'

export interface CarScopeOptions {
carScope?: CarScope
}

export interface IDagula {
/**
* Get a complete DAG.
*/
get (cid: CID|string, options?: AbortOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path
*/
getPath (cidPath: string, options?: AbortOptions & CarScopeOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
Expand All @@ -55,6 +65,10 @@ export declare class Dagula implements IDagula {
* Get a complete DAG.
*/
get (cid: CID|string, options?: AbortOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path
*/
getPath (cidPath: string, options?: AbortOptions & CarScopeOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
Expand Down
133 changes: 119 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import debug from 'debug'
import { CID } from 'multiformats/cid'
import * as dagPB from '@ipld/dag-pb'
import * as Block from 'multiformats/block'
import { exporter, walkPath } from 'ipfs-unixfs-exporter'
import { transform } from 'streaming-iterables'
Expand Down Expand Up @@ -29,20 +30,23 @@ export class Dagula {
}

/**
* @param {import('multiformats').CID|string} cid
* @param {{ signal?: AbortSignal }} [options]
* @param {CID[]|CID|string} cid
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {(block: import('multiformats').BlockView) => CID[]} [options.search]
*/
async * get (cid, options = {}) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('getting DAG %s', cid)
let cids = Array.isArray(cid) ? cid : [cid]
const search = options.search || breadthFirstSearch()

/** @type {AbortController[]} */
let aborters = []
const { signal } = options
signal?.addEventListener('abort', () => aborters.forEach(a => a.abort(signal.reason)))

let cids = [cid]
while (true) {
while (cids.length > 0) {
log('fetching %d CIDs', cids.length)
const fetchBlocks = transform(cids.length, async cid => {
if (signal) {
Expand All @@ -54,7 +58,7 @@ export class Dagula {
}
return this.getBlock(cid)
})
const nextCids = []
let nextCids = []
for await (const { cid, bytes } of fetchBlocks(cids)) {
const decoder = this.#decoders[cid.code]
if (!decoder) {
Expand All @@ -72,16 +76,87 @@ export class Dagula {
// createUnsafe here.
const block = await Block.create({ bytes, cid, codec: decoder, hasher })
yield block
for (const [, cid] of block.links()) {
nextCids.push(cid)
}
nextCids = nextCids.concat(search(block))
}
if (!nextCids.length) break
log('%d CIDs in links', nextCids.length)
cids = nextCids
}
}

/**
* Yield all blocks traversed to resolve the ipfs path.
* Then use carScope to determine the set of blocks of the targeted dag to yield.
* Yield all blocks by default.
* Use carScope: 'block' to yield the termimal block.
* Use carScope: 'file' to yield all the blocks of a unixfs file, or enough blocks to list a directory.
*
* @param {string} cidPath
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {'all'|'file'|'block'} [options.carScope] control how many layers of the dag are returned
* 'all': return the entire dag starting at path. (default)
* 'block': return the block identified by the path.
* 'file': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents.
* Where path points to a single block file, all three selectors would return the same thing.
* where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt.
*/
async * getPath (cidPath, options = {}) {
const carScope = options.carScope ?? 'all'

/**
* The resolved dag root at the terminus of the cidPath
* @type {import('ipfs-unixfs-exporter').UnixFSEntry}
*/
let base

/**
* Cache of blocks required to resove the cidPath
* @type {import('./index').Block[]}
*/
let traversed = []

/**
* Adapter for unixfs-exporter to track the blocks it loads as it resolves the path.
* `walkPath` emits a single unixfs entry for multiblock structures, but we need the individual blocks.
* TODO: port logic to @web3-storage/ipfs-path to make this less ugly.
*/
const blockstore = {
/**
* @param {CID} cid
* @param {{ signal?: AbortSignal }} [options]
*/
get: async (cid, options) => {
const block = await this.getBlock(cid, options)
traversed.push(block)
return block.bytes
}
}
for await (const item of walkPath(cidPath, blockstore, { signal: options.signal })) {
base = item
yield * traversed
traversed = []
}

if (carScope === 'all' || (carScope === 'file' && base.type !== 'directory')) {
const links = base.node.Links?.map(l => l.Hash) || []
// fetch the entire dag rooted at the end of the provided path
if (links.length) {
yield * this.get(links, { signal: options.signal })
}
}
// non-files, like directories, and IPLD Maps only return blocks necessary for their enumeration
if (carScope === 'file' && base.type === 'directory') {
// the single block for the root has already been yielded.
// For a hamt we must fetch all the blocks of the (current) hamt.
if (base.unixfs.type === 'hamt-sharded-directory') {
const hamtLinks = base.node.Links?.filter(l => l.Name.length === 2).map(l => l.Hash) || []
if (hamtLinks.length) {
yield * this.get(hamtLinks, { search: hamtSearch, signal: options.signal })
}
}
}
}

/**
* @param {import('multiformats').CID|string} cid
* @param {{ signal?: AbortSignal }} [options]
Expand Down Expand Up @@ -117,11 +192,11 @@ export class Dagula {
}

/**
* @param {string|import('multiformats').CID} path
* @param {string} cidPath
* @param {{ signal?: AbortSignal }} [options]
*/
async * walkUnixfsPath (path, options = {}) {
log('walking unixfs %s', path)
async * walkUnixfsPath (cidPath, options = {}) {
log('walking unixfs %s', cidPath)
const blockstore = {
/**
* @param {CID} cid
Expand All @@ -132,8 +207,38 @@ export class Dagula {
return block.bytes
}
}
yield * walkPath(cidPath, blockstore, { signal: options.signal })
}
}

// @ts-ignore exporter requires Blockstore but only uses `get`
yield * walkPath(path, blockstore, { signal: options.signal })
/**
* Create a search function that given a decoded Block
* will return an array of CIDs to fetch next.
*
* @param {([name, cid]: [string, Link]) => boolean} linkFilter
*/
export function breadthFirstSearch (linkFilter = () => true) {
/**
* @param {import('multiformats').BlockView} block
*/
return function (block) {
const nextCids = []
if (block.cid.code === dagPB.code) {
for (const { Name, Hash } of block.value.Links) {
if (linkFilter([Name, Hash])) {
nextCids.push(Hash)
}
}
} else {
// links() paths dagPb in the ipld style so name is e.g `Links/0/Hash`, and not what we want here.
for (const link of block.links()) {
if (linkFilter(link)) {
nextCids.push(link[1])
}
}
}
return nextCids
}
}

export const hamtSearch = breadthFirstSearch(([name]) => name.length === 2)
Loading

0 comments on commit a613b45

Please sign in to comment.