Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add blob protocol to infra #353

Merged
merged 4 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion billing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"@sentry/serverless": "^7.74.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/capabilities": "^13.3.0",
"@web3-storage/capabilities": "^13.3.1",
"big.js": "^6.2.1",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
Expand Down
568 changes: 219 additions & 349 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
"@ipld/dag-ucan": "^3.0.1",
"@tsconfig/node16": "^1.0.3",
"@types/git-rev-sync": "^2.0.0",
"@ucanto/client": "^9.0.0",
"@ucanto/core": "^9.0.1",
"@ucanto/interface": "^9.0.0",
"@ucanto/principal": "^9.0.0",
"@ucanto/transport": "^9.0.0",
"@ucanto/validator": "^9.0.1",
"@ucanto/client": "^9.0.1",
"@ucanto/core": "^10.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/transport": "^9.1.1",
"@ucanto/validator": "^9.0.2",
"@web-std/blob": "^3.0.4",
"@web-std/fetch": "^4.1.0",
"@web3-storage/data-segment": "5.0.0",
Expand Down
5 changes: 4 additions & 1 deletion stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export function UploadApiStack({ stack, app }) {

// Get references to constructs created in other stacks
const { carparkBucket } = use(CarparkStack)
const { storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack)
const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack)
const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack)
Expand All @@ -41,6 +41,7 @@ export function UploadApiStack({ stack, app }) {
defaults: {
function: {
permissions: [
allocationTable,
storeTable,
uploadTable,
customerTable,
Expand All @@ -66,6 +67,7 @@ export function UploadApiStack({ stack, app }) {
environment: {
DID: process.env.UPLOAD_API_DID ?? '',
AGGREGATOR_DID,
ALLOCATION_TABLE_NAME: allocationTable.tableName,
STORE_TABLE_NAME: storeTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
UPLOAD_TABLE_NAME: uploadTable.tableName,
Expand All @@ -92,6 +94,7 @@ export function UploadApiStack({ stack, app }) {
COMMIT: git.commmit,
STAGE: stack.stage,
ACCESS_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
UPLOAD_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
POSTMARK_TOKEN: process.env.POSTMARK_TOKEN ?? '',
PROVIDERS: process.env.PROVIDERS ?? '',
R2_ACCESS_KEY_ID: process.env.R2_ACCESS_KEY_ID ?? '',
Expand Down
8 changes: 8 additions & 0 deletions stacks/upload-db-stack.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Table, Bucket, Config } from 'sst/constructs'

import {
allocationTableProps,
storeTableProps,
uploadTableProps,
consumerTableProps,
Expand Down Expand Up @@ -31,6 +32,12 @@ export function UploadDbStack({ stack, app }) {
// TODO: we should look into creating a trust layer for content claims
const contentClaimsPrivateKey = new Config.Secret(stack, 'CONTENT_CLAIMS_PRIVATE_KEY')

/**
* The allocation table tracks allocated multihashes per space.
* Used by the blob/* service capabilities.
*/
const allocationTable = new Table(stack, 'allocation', allocationTableProps)

/**
* This table takes a stored CAR and makes an entry in the store table
* Used by the store/* service capabilities.
Expand Down Expand Up @@ -99,6 +106,7 @@ export function UploadDbStack({ stack, app }) {
const spaceMetricsTable = new Table(stack, 'space-metrics', spaceMetricsTableProps)

return {
allocationTable,
storeTable,
uploadTable,
pieceTable,
Expand Down
45 changes: 42 additions & 3 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import { createUcantoServer } from '../service.js'
import { Config } from 'sst/node/config'
import { CAR, Legacy, Codec } from '@ucanto/transport'
import { Email } from '../email.js'
import { createTasksStorage } from '../stores/tasks.js'
import { createReceiptsStorage } from '../stores/receipts.js'
import { createAllocationsStorage } from '../stores/allocations.js'
import { createBlobsStorage, composeblobStoragesWithOrderedHas } from '../stores/blobs.js'
import { useProvisionStore } from '../stores/provisions.js'
import { useSubscriptionsStore } from '../stores/subscriptions.js'
import { createDelegationsTable } from '../tables/delegations.js'
Expand All @@ -39,6 +43,7 @@ import { createSpaceDiffStore } from '@web3-storage/w3infra-billing/tables/space
import { createSpaceSnapshotStore } from '@web3-storage/w3infra-billing/tables/space-snapshot.js'
import { useUsageStore } from '../stores/usage.js'
import { createStripeBillingProvider } from '../billing.js'
import { createTasksScheduler } from '../scheduler.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -99,6 +104,7 @@ export async function ucanInvocationRouter(request) {
storeTableName,
storeBucketName,
uploadTableName,
allocationTableName,
consumerTableName,
customerTableName,
subscriptionTableName,
Expand All @@ -122,6 +128,7 @@ export async function ucanInvocationRouter(request) {
aggregatorDid,
dealTrackerDid,
dealTrackerUrl,
uploadServiceURL,
pieceOfferQueueUrl,
filecoinSubmitQueueUrl,
requirePaymentPlan,
Expand All @@ -144,6 +151,22 @@ export async function ucanInvocationRouter(request) {
const { PRIVATE_KEY, STRIPE_SECRET_KEY } = Config
const serviceSigner = getServiceSigner({ did: UPLOAD_API_DID, privateKey: PRIVATE_KEY })

const tasksStorage = createTasksStorage(AWS_REGION, invocationBucketName, workflowBucketName)
const receiptsStorage = createReceiptsStorage(AWS_REGION, taskBucketName, invocationBucketName, workflowBucketName)
const allocationsStorage = createAllocationsStorage(AWS_REGION, allocationTableName, {
endpoint: dbEndpoint,
})
const blobsStorage = composeblobStoragesWithOrderedHas(
createBlobsStorage(R2_REGION, carparkBucketName, {
endpoint: carparkBucketEndpoint,
credentials: {
accessKeyId: carparkBucketAccessKeyId,
secretAccessKey: carparkBucketSecretAccessKey,
},
}),
createBlobsStorage(AWS_REGION, storeBucketName),
)

const invocationBucket = createInvocationStore(
AWS_REGION,
invocationBucketName
Expand Down Expand Up @@ -172,16 +195,29 @@ export async function ucanInvocationRouter(request) {
const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName })
const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore })

const connection = getServiceConnection({
const dealTrackerConnection = getServiceConnection({
did: dealTrackerDid,
url: dealTrackerUrl
})
const selfConnection = getServiceConnection({
did: serviceSigner.did(),
url: uploadServiceURL
})
const tasksScheduler = createTasksScheduler(() => selfConnection)

const server = createUcantoServer(serviceSigner, {
codec,
allocationsStorage,
blobsStorage,
tasksStorage,
receiptsStorage,
tasksScheduler,
getServiceConnection: () => selfConnection,
// TODO: to be deprecated with `store/*` protocol
storeTable: createStoreTable(AWS_REGION, storeTableName, {
endpoint: dbEndpoint,
}),
// TODO: to be deprecated with `store/*` protocol
carStoreBucket: composeCarStoresWithOrderedHas(
createCarStore(AWS_REGION, storeBucketName),
createCarStore(R2_REGION, carparkBucketName, {
Expand All @@ -192,6 +228,7 @@ export async function ucanInvocationRouter(request) {
},
}),
),
// TODO: to be deprecated with `store/*` protocol
dudewhereBucket: createDudewhereStore(R2_REGION, R2_DUDEWHERE_BUCKET_NAME, {
endpoint: R2_ENDPOINT,
credentials: {
Expand All @@ -218,10 +255,10 @@ export async function ucanInvocationRouter(request) {
pieceOfferQueue: createPieceOfferQueueClient({ region: AWS_REGION }, { queueUrl: pieceOfferQueueUrl }),
filecoinSubmitQueue: createFilecoinSubmitQueueClient({ region: AWS_REGION }, { queueUrl: filecoinSubmitQueueUrl }),
dealTrackerService: {
connection,
connection: dealTrackerConnection,
invocationConfig: {
issuer: serviceSigner,
audience: connection.id,
audience: dealTrackerConnection.id,
with: serviceSigner.did()
}
},
Expand Down Expand Up @@ -316,6 +353,7 @@ function getLambdaEnv () {
storeTableName: mustGetEnv('STORE_TABLE_NAME'),
storeBucketName: mustGetEnv('STORE_BUCKET_NAME'),
uploadTableName: mustGetEnv('UPLOAD_TABLE_NAME'),
allocationTableName: mustGetEnv('ALLOCATION_TABLE_NAME'),
consumerTableName: mustGetEnv('CONSUMER_TABLE_NAME'),
customerTableName: mustGetEnv('CUSTOMER_TABLE_NAME'),
subscriptionTableName: mustGetEnv('SUBSCRIPTION_TABLE_NAME'),
Expand All @@ -339,6 +377,7 @@ function getLambdaEnv () {
postmarkToken: mustGetEnv('POSTMARK_TOKEN'),
providers: mustGetEnv('PROVIDERS'),
accessServiceURL: mustGetEnv('ACCESS_SERVICE_URL'),
uploadServiceURL: mustGetEnv('UPLOAD_SERVICE_URL'),
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
requirePaymentPlan: (process.env.REQUIRE_PAYMENT_PLAN === 'true'),
dealTrackerDid: mustGetEnv('DEAL_TRACKER_DID'),
Expand Down
9 changes: 5 additions & 4 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
"@ucanto/transport": "^9.1.1",
"@ucanto/validator": "^9.0.2",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^18.3.0",
"@web3-storage/capabilities": "^13.3.0",
"@web3-storage/access": "^18.3.1",
"@web3-storage/capabilities": "^13.3.1",
"@web3-storage/did-mailto": "^2.1.0",
"@web3-storage/upload-api": "^9.0.1",
"@web3-storage/upload-api": "^9.1.5",
"multiformats": "^13.1.0",
"nanoid": "^5.0.2",
"preact": "^10.14.1",
Expand Down Expand Up @@ -65,7 +65,8 @@
"eslintConfig": {
"rules": {
"unicorn/consistent-destructuring": "off",
"unicorn/prefer-array-flat-map": "off"
"unicorn/prefer-array-flat-map": "off",
"unicorn/no-useless-undefined": "off"
}
}
}
46 changes: 46 additions & 0 deletions upload-api/scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* @typedef {import('@web3-storage/upload-api/types').TasksScheduler} TasksSchedulerInterface
* @typedef {import('@web3-storage/upload-api/types').Service} Service
* @typedef {import('@ucanto/interface').ConnectionView<Service>} Connection
* @typedef {import('@ucanto/interface').ServiceInvocation} ServiceInvocation
* @typedef {import('@ucanto/interface').Failure} Failure
* @typedef {import('@ucanto/interface').Unit} Unit
* @typedef {import('@ucanto/interface').Result<Unit, Failure>} Result
*/

/**
* @param {() => Connection} getServiceConnection
*/
export const createTasksScheduler = (getServiceConnection) => new TasksScheduler(getServiceConnection)

/**
* @implements {TasksSchedulerInterface}
*/
export class TasksScheduler {
/**
*
* @param {() => Connection} getServiceConnection
*/
constructor (getServiceConnection) {
this.getServiceConnection = getServiceConnection
}

/**
* @param {ServiceInvocation} invocation
* @returns {Promise<Result>}
*/
async schedule(invocation) {
const connection = this.getServiceConnection()
// This performs a HTTP Request to the Service URL.
// upload-api service URL stores received invocations and produced
// receipts on the server.
const [res] = await connection.execute(invocation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this write a receipt into receipt storage ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not needed. Sadly this looks then a bit confusing and looking forward to have some kind of receipt storage part of ucanto/scheduler later on. Here this does a HTTP request executing the invocation, which goes through the ucan-invocation-router and is hooked up with the receipt storage: https://github.com/web3-storage/w3infra/blob/main/upload-api/functions/ucan-invocation-router.js#L280

I will add a comment to explain this


if (res.out.error) {
return res.out
}
return {
ok: {},
}
}
}
Loading
Loading