Skip to content

Commit

Permalink
fix: prevent duplicate trustless-gateway reqs (#503)
Browse files Browse the repository at this point in the history
Fixes ipfs/service-worker-gateway#104

This PR fixes issues brought up in service-worker-gateway where sub-resources end up causing multiple requests to a trustless gateway for the root CID.
  • Loading branch information
SgtPooki authored Apr 20, 2024
1 parent 97fb1a7 commit 338885f
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 19 deletions.
11 changes: 11 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ const options = {
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})
server.all('/ipfs/bafkqabtimvwgy3yk', async (req, res) => {
// delay the response
await new Promise((resolve) => setTimeout(resolve, 500))

res.writeHead(200, {
'content-type': 'application/octet-stream',
'content-length': 5
})
// "hello"
res.end(Uint8Array.from([104, 101, 108, 108, 111]))
})

await server.listen()
const { port } = server.server.address()
Expand Down
1 change: 1 addition & 0 deletions packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-id-factory": "^4.0.7",
"@multiformats/uri-to-multiaddr": "^8.0.0",
"@types/polka": "^0.5.7",
"@types/sinon": "^17.0.3",
"aegir": "^42.2.5",
"cors": "^2.8.5",
Expand Down
81 changes: 63 additions & 18 deletions packages/block-brokers/src/trustless-gateway/trustless-gateway.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { base64 } from 'multiformats/bases/base64'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface TrustlessGatewayStats {
attempts: number
errors: number
invalidBlocks: number
successes: number
pendingResponses?: number
}

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
Expand Down Expand Up @@ -37,13 +46,34 @@ export class TrustlessGateway {
*/
#successes = 0

/**
* A map of pending responses for this gateway. This is used to ensure that
* only one request per CID is made to a given gateway at a time, and that we
* don't make multiple in-flight requests for the same CID to the same gateway.
*/
#pendingResponses = new Map<string, Promise<Uint8Array>>()

private readonly log: Logger

constructor (url: URL | string, logger: ComponentLogger) {
this.url = url instanceof URL ? url : new URL(url)
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
}

/**
* This function returns a unique string for the multihash.bytes of the CID.
*
* Some useful resources for why this is needed can be found using the links below:
*
* - https://github.com/ipfs/helia/pull/503#discussion_r1572451331
* - https://github.com/ipfs/kubo/issues/6815
* - https://www.notion.so/pl-strflt/Handling-ambiguity-around-CIDs-9d5e14f6516f438980b01ef188efe15d#d9d45cd1ed8b4d349b96285de4aed5ab
*/
#uniqueBlockId (cid: CID): string {
const multihashBytes = cid.multihash.bytes
return base64.encode(multihashBytes)
}

/**
* Fetch a raw block from `this.url` following the specification defined at
* https://specs.ipfs.tech/http-gateways/trustless-gateway/
Expand All @@ -60,26 +90,29 @@ export class TrustlessGateway {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`)
}

const blockId = this.#uniqueBlockId(cid)
try {
this.#attempts++
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})

this.log('GET %s %d', gwUrl, res.status)

if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
let pendingResponse: Promise<Uint8Array> | undefined = this.#pendingResponses.get(blockId)
if (pendingResponse == null) {
this.#attempts++
pendingResponse = fetch(gwUrl.toString(), {
signal,
headers: {
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
}).then(async (res) => {
this.log('GET %s %d', gwUrl, res.status)
if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
}
this.#successes++
return new Uint8Array(await res.arrayBuffer())
})
this.#pendingResponses.set(blockId, pendingResponse)
}
this.#successes++
return new Uint8Array(await res.arrayBuffer())
return await pendingResponse
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
Expand All @@ -88,6 +121,8 @@ export class TrustlessGateway {
}
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid}`)
} finally {
this.#pendingResponses.delete(blockId)
}
}

Expand Down Expand Up @@ -130,4 +165,14 @@ export class TrustlessGateway {
incrementInvalidBlocks (): void {
this.#invalidBlocks++
}

getStats (): TrustlessGatewayStats {
return {
attempts: this.#attempts,
errors: this.#errors,
invalidBlocks: this.#invalidBlocks,
successes: this.#successes,
pendingResponses: this.#pendingResponses.size
}
}
}
31 changes: 30 additions & 1 deletion packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
Expand Down Expand Up @@ -190,4 +190,33 @@ describe('trustless-gateway-block-broker', () => {

await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block)
})

it('does not trigger new network requests if the same cid request is in-flight', async function () {
// from .aegir.js polka server
const cid = CID.parse('bafkqabtimvwgy3yk')
if (process.env.TRUSTLESS_GATEWAY == null) {
return this.skip()
}
const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, defaultLogger())

// Call getRawBlock multiple times with the same CID
const promises = Array.from({ length: 10 }, async () => trustlessGateway.getRawBlock(cid))

// Wait for both promises to resolve
const [block1, ...blocks] = await Promise.all(promises)

// Assert that all calls to getRawBlock returned the same block
for (const block of blocks) {
expect(block).to.deep.equal(block1)
}

expect(trustlessGateway.getStats()).to.deep.equal({
// attempt is only incremented when a new request is made
attempts: 1,
errors: 0,
invalidBlocks: 0,
successes: 1,
pendingResponses: 0 // the queue is empty
})
})
})

0 comments on commit 338885f

Please sign in to comment.