Skip to content

Commit

Permalink
fix: ci
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed May 16, 2024
1 parent 534d302 commit 2b904f0
Show file tree
Hide file tree
Showing 10 changed files with 3,640 additions and 5,365 deletions.
8,592 changes: 3,358 additions & 5,234 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"remove": "sst remove",
"console": "sst console",
"lint": "tsc && eslint '**/*.js'",
"clean": "rm -rf dist node_modules package-lock.json ./*/{.cache,dist,node_modules}",
"clean": "rm -rf dist node_modules ./*/{.cache,dist,node_modules}",
"test": "npm test -w billing -w upload-api -w carpark -w replicator -w satnav -w roundabout -w filecoin",
"test-integration": "ava --verbose --serial --timeout=660s test/*.test.js",
"fetch-metrics-for-space": "npm run fetch-metrics-for-space -w tools",
Expand Down
1 change: 1 addition & 0 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export function UploadApiStack({ stack, app }) {
pieceOfferQueue,
filecoinSubmitQueue,
multihashesQueue,
blocksCarPositionTable,
],
environment: {
DID: process.env.UPLOAD_API_DID ?? '',
Expand Down
119 changes: 109 additions & 10 deletions test/blob.test.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { testBlob as test } from './helpers/context.js'

import pWaitFor from 'p-wait-for'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import { base58btc } from 'multiformats/bases/base58'
import * as Link from 'multiformats/link'
import { equals } from 'multiformats/bytes'
import { code as RAW_CODE } from 'multiformats/codecs/raw'
import { HeadObjectCommand } from '@aws-sdk/client-s3'
import { Assert } from '@web3-storage/content-claims/capability'
import { ShardingStream, UnixFS } from '@web3-storage/upload-client'
import { useReceiptsStorage } from '../upload-api/stores/receipts.js'
import { codec as carCodec } from '@ucanto/transport/car'

import { METRICS_NAMES, SPACE_METRICS_NAMES } from '../upload-api/constants.js'

import * as Blob from './helpers/blob-client.js'
import {
Expand All @@ -16,18 +21,26 @@ import {
getAwsBucketClient,
getCloudflareBucketClient,
getCarparkBucketInfo,
getRoundaboutEndpoint
getRoundaboutEndpoint,
getDynamoDb,
} from './helpers/deployment.js'
import { randomFile } from './helpers/random.js'
import { createMailSlurpInbox, setupNewClient, getServiceProps } from './helpers/up-client.js'
import { getMetrics, getSpaceMetrics } from './helpers/metrics.js'

test.before(t => {
t.context = {
apiEndpoint: getApiEndpoint(),
roundaboutEndpoint: getRoundaboutEndpoint(),
metricsDynamo: getDynamoDb('admin-metrics'),
spaceMetricsDynamo: getDynamoDb('space-metrics'),
}
})

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
*/

// Integration test for all flow from uploading a blob, to all the reads pipelines to work.
test('blob integration flow with receipts validation', async t => {
const stage = getStage()
Expand All @@ -39,20 +52,73 @@ test('blob integration flow with receipts validation', async t => {
throw new Error('Testing space DID must be set')
}

// Get space metrics before blob/add
const spaceBeforeBlobAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.BLOB_ADD_TOTAL)
const spaceBeforeBlobAddSizeMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.BLOB_ADD_SIZE_TOTAL)

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
const beforeBlobAddTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.BLOB_ADD_TOTAL)
const beforeBlobAddSizeTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.BLOB_ADD_SIZE_TOTAL)

// Prepare data
const file = await randomFile(100)
const data = new Uint8Array(await file.arrayBuffer())

// Encode file as Unixfs and perform store/add
const blocksReadableStream = UnixFS.createFileEncoderStream(file)
/** @type {import('@web3-storage/upload-client/types').CARLink[]} */
const shards = []
/** @type {Uint8Array[]} */
const shardBytes = []
/** @type {import('@web3-storage/upload-client/types').AnyLink | undefined} */
let root

/** @type {import('multiformats/hashes/digest').Digest<18, number> | undefined} */
let multihash
/** @type {{ put: any, accept: { task: any }} | undefined} */
let next
await blocksReadableStream
.pipeThrough(new ShardingStream())
.pipeThrough(
new TransformStream({
async transform(car, controller) {
const bytes = new Uint8Array(await car.arrayBuffer())

// Add blob using custom client to be able to access receipts
const res = await Blob.add(serviceProps.conf, bytes, { connection: serviceProps.connection })
t.truthy(res)
t.truthy(res.multihash)
multihash = res.multihash
next = res.next

const cid = Link.create(carCodec.code, res.multihash)
const { version, roots, size, slices } = car
controller.enqueue({ version, roots, size, cid, slices, bytes })
}
})
)
.pipeTo(
new WritableStream({
write(meta) {
root = root || meta.roots[0]
shards.push(meta.cid)
shardBytes.push(meta.bytes)
},
})
)

if (root === undefined) throw new Error('missing root CID')
if (multihash === undefined) throw new Error('missing multihash')
t.is(shards.length, 1)

// Add blob using custom client to be able to access receipts
const res = await Blob.add(serviceProps.conf, data, { connection: serviceProps.connection })
t.truthy(res)
console.log('root', root)

// Get bucket clients
const s3Client = getAwsBucketClient()
const r2Client = getCloudflareBucketClient()

// Check blob exists in R2, but not S3
const encodedMultihash = base58btc.encode(res.multihash.bytes)
const encodedMultihash = base58btc.encode(multihash.bytes)
const r2Request = await r2Client.send(
new HeadObjectCommand({
// Env var
Expand All @@ -61,6 +127,7 @@ test('blob integration flow with receipts validation', async t => {
})
)
t.is(r2Request.$metadata.httpStatusCode, 200)
const carSize = r2Request.ContentLength
try {
await s3Client.send(
new HeadObjectCommand({
Expand All @@ -74,11 +141,11 @@ test('blob integration flow with receipts validation', async t => {

// Check receipts were written
const receiptsStorage = useReceiptsStorage(s3Client, `task-store-${stage}-0`, `invocation-store-${stage}-0`, `workflow-store-${stage}-0`)
const getPutTaskReceipt = await receiptsStorage.get(res.next.put.task.link())
const getPutTaskReceipt = await receiptsStorage.get(next?.put.task.link())
t.truthy(getPutTaskReceipt.ok?.out.ok)
t.deepEqual(getPutTaskReceipt.ok?.out.ok, {})

const getAcceptTaskReceipt = await receiptsStorage.get(res.next.accept.task.link())
const getAcceptTaskReceipt = await receiptsStorage.get(next?.accept.task.link())
t.truthy(getAcceptTaskReceipt.ok?.out.ok)
t.truthy(getAcceptTaskReceipt.ok?.out.ok.site)

Expand All @@ -91,14 +158,16 @@ test('blob integration flow with receipts validation', async t => {
t.truthy(acceptForks?.find(f => f.capabilities[0].can === Assert.location.can))

// Read from Roundabout and check bytes can be read by raw CID
const rawCid = Link.create(RAW_CODE, res.multihash)
const rawCid = Link.create(RAW_CODE, multihash)
const roundaboutResponse = await fetch(
`${t.context.roundaboutEndpoint}/${rawCid.toString()}`
)
t.is(roundaboutResponse.status, 200)

const fetchedBytes = new Uint8Array(await roundaboutResponse.arrayBuffer())
t.truthy(equals(data, fetchedBytes))
t.truthy(equals(shardBytes[0], fetchedBytes))

// ----

// TODO: Read from w3link (staging?)
// fetch `https://${rootCid}.ipfs.w3s.link
Expand All @@ -111,4 +180,34 @@ test('blob integration flow with receipts validation', async t => {
// dns4/elastic.ipfs??

// use https://github.com/ipfs/helia to connect to hoverboard peer and read som bytes

// Validate metrics
// Check metrics were updated
console.log('check metrics')
if (beforeBlobAddSizeTotal && spaceBeforeBlobAddSizeMetrics) {
await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterBlobAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.BLOB_ADD_TOTAL)
const afterBlobAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.BLOB_ADD_SIZE_TOTAL)
const spaceAfterBlobAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.BLOB_ADD_TOTAL)
const spaceAfterBlobAddSizeMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.BLOB_ADD_SIZE_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return (
afterBlobAddTotal?.value >= beforeBlobAddTotal?.value + 1 &&
afterBlobAddSizeTotal?.value >= beforeBlobAddSizeTotal.value + carSize &&
spaceAfterBlobAddMetrics?.value >= spaceBeforeBlobAddMetrics?.value + 1 &&
spaceAfterBlobAddSizeMetrics?.value >= spaceBeforeBlobAddSizeMetrics?.value + carSize
)
}

return (
afterBlobAddTotal?.value === beforeBlobAddTotal?.value + 1 &&
afterBlobAddSizeTotal?.value === beforeBlobAddSizeTotal.value + carSize &&
spaceAfterBlobAddMetrics?.value === spaceBeforeBlobAddMetrics?.value + 1 &&
spaceAfterBlobAddSizeMetrics?.value === spaceBeforeBlobAddSizeMetrics?.value + carSize
)
})
}
})
20 changes: 10 additions & 10 deletions test/filecoin.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import pRetry from 'p-retry'
import * as CAR from '@ucanto/transport/car'
import { Storefront } from '@web3-storage/filecoin-client'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import * as Link from 'multiformats/link'
import * as raw from 'multiformats/codecs/raw'
import { base58btc } from 'multiformats/bases/base58'

import { useReceiptStore } from '../filecoin/store/receipt.js'

Expand Down Expand Up @@ -61,23 +64,20 @@ test('w3filecoin integration flow', async t => {
// Upload new file
await client.uploadFile(file, {
onShardStored: (meta) => {
const content = Link.create(raw.code, meta.cid.multihash)

uploadFiles.push({
content: meta.cid,
content,
piece: meta.piece
})
console.log(`shard file written with {${meta.cid}, ${meta.piece}}`)
console.log(`shard file written with {${meta.cid}, ${content}, ${meta.piece}}`)
},
})
t.is(uploadFiles.length, 1)
return uploadFiles[0]
}
))

// Shortcut if computing piece cid is disabled
if (process.env.DISABLE_PIECE_CID_COMPUTE === 'true') {
return
}

// Check filecoin pieces computed after leaving queue
// Bucket event given client is not invoking this
await Promise.all(uploads.map(async (upload) => {
Expand All @@ -95,20 +95,20 @@ test('w3filecoin integration flow', async t => {
// Check roundabout can redirect from pieceCid to signed bucket url for car
const roundaboutEndpoint = await getRoundaboutEndpoint()
const roundaboutUrl = new URL(testUpload.piece.toString(), roundaboutEndpoint)
console.log('checking roundabout for one piece', roundaboutUrl.toString())
await pWaitFor(async () => {
try {
const res = await fetch(roundaboutUrl, {
method: 'HEAD',
redirect: 'manual'
})
return res.status === 302 && res.headers.get('location')?.includes(testUpload.content.toString())
const encodedMultihash = base58btc.encode(testUpload.content.multihash.bytes)
return res.status === 302 && res.headers.get('location')?.includes(encodedMultihash)
} catch {}
return false
}, {
interval: 100,
})
console.log('roundabout redirected from piece to car', roundaboutUrl.toString())
console.log('roundabout redirected from piece to blob', roundaboutUrl.toString())

// Invoke `filecoin/offer`
console.log(`invoke 'filecoin/offer' for piece ${testUpload.piece.toString()} (${testUpload.content})`)
Expand Down
6 changes: 6 additions & 0 deletions test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ dotenv.config({
* @typedef {object} BlobContext
* @property {string} apiEndpoint
* @property {string} roundaboutEndpoint
* @property {Dynamo} metricsDynamo
* @property {Dynamo} spaceMetricsDynamo
*
* @typedef {object} StoreContext
* @property {string} apiEndpoint
* @property {Dynamo} rateLimitsDynamo
*
* @typedef {object} MetricsContext
* @property {Dynamo} metricsDynamo
* @property {Dynamo} spaceMetricsDynamo
*
* @typedef {import("ava").TestFn<Awaited<Context>>} TestContextFn
* @typedef {import("ava").TestFn<Awaited<RoundaboutContext>>} TestRoundaboutContextFn
Expand Down
28 changes: 28 additions & 0 deletions test/helpers/metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { getTableItem, getAllTableRows } from './table.js'

/**
* @param {import("ava").ExecutionContext<import("./context.js").MetricsContext>} t
*/
export async function getMetrics (t) {
const metrics = await getAllTableRows(
t.context.metricsDynamo.client,
t.context.metricsDynamo.tableName
)

return metrics
}

/**
* @param {import("ava").ExecutionContext<import("./context.js").MetricsContext>} t
* @param {`did:${string}:${string}`} spaceDid
* @param {string} name
*/
export async function getSpaceMetrics (t, spaceDid, name) {
const item = await getTableItem(
t.context.spaceMetricsDynamo.client,
t.context.spaceMetricsDynamo.tableName,
{ space: spaceDid, name }
)

return item
}
Loading

0 comments on commit 2b904f0

Please sign in to comment.