Feat/additional datasets #152

Merged
9 commits merged on Jan 29, 2025
5 changes: 5 additions & 0 deletions .changeset/modern-items-applaud.md
@@ -0,0 +1,5 @@
---
"@deltadao/nautilus": minor
---

Adds functionality to compute on multiple datasets within the same compute job
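A minimal usage sketch of the new option (the call shape mirrors the integration test added below; the signer setup and DIDs are placeholders, not values from this PR):

```ts
import { Nautilus } from '@deltadao/nautilus'
import type { Signer } from 'ethers'

// Sketch only: `signer` must be a configured ethers Signer for the
// target network; the DIDs are placeholders for published assets.
declare const signer: Signer

const nautilus = await Nautilus.create(signer)

const startedJob = await nautilus.compute({
  dataset: { did: 'did:op:primary-dataset' },
  algorithm: { did: 'did:op:algorithm' },
  // New: additional datasets ordered into the same compute job
  additionalDatasets: [{ did: 'did:op:second-dataset' }]
})
```

Each additional dataset goes through the same orderability check, pricing, and ordering steps as the primary dataset, as the changes below show.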
148 changes: 113 additions & 35 deletions src/compute/index.ts
@@ -12,6 +12,7 @@ import {
import type { Signer } from 'ethers'
import type {
AssetWithAccessDetails,
AssetWithAccessDetailsAndPrice,
ComputeConfig,
ComputeResultConfig,
ComputeStatusConfig,
@@ -81,8 +82,7 @@ export async function compute(computeConfig: ComputeConfig) {
const dataset = assets.find((asset) => asset.id === datasetDid)
const algo = assets.find((asset) => asset.id === algorithmDid)

// TODO add functionality for additional datasets
const _additionalDatasets = additionalDatasetsConfig
const additionalDatasets = additionalDatasetsConfig
? assets.filter((asset) =>
additionalDatasetsConfig
.map((dataset) => dataset.did)
@@ -92,23 +92,29 @@

// 2. Check if the asset is orderable
// TODO: consider doing this first, before loading all other assets
const computeService = getServiceByName(dataset, 'compute')
const allowed = await isOrderable(
// TODO consider isAllowed or similar for boolean
dataset,
computeService.id,
{
documentId: algo.id,
serviceId: algo.services[0].id
},
algo
)
LoggerInstance.debug('[compute] Is dataset orderable?', allowed)
if (!allowed)
const isDatasetOrderable = await isComputeAssetOrderable(dataset, algo)
LoggerInstance.debug('[compute] Is dataset orderable?', isDatasetOrderable)

if (!isDatasetOrderable)
throw new Error(
'Dataset is not orderable in combination with given algorithm.'
)

for (const dataset of additionalDatasets) {
const isAdditionalDatasetOrderable = await isComputeAssetOrderable(
dataset,
algo
)
LoggerInstance.debug(
'[compute] Is additional dataset orderable?',
isAdditionalDatasetOrderable
)
if (!isAdditionalDatasetOrderable)
throw new Error(
'Additional dataset is not orderable in combination with given algorithm.'
)
}

// 3. Initialize the provider
const computeEnv = await getComputeEnviroment(dataset)

@@ -117,7 +123,8 @@
dataset,
algo,
signerAddress,
computeEnv
computeEnv,
additionalDatasets
)

// 4. Get prices and fees for the assets
@@ -135,11 +142,19 @@
if (!algorithmWithPrice?.orderPriceAndFees)
throw new Error('Error setting algorithm price and fees!')

// TODO remove? never used. maybe missing feature to check if datatoken already in wallet?
const _algoDatatokenBalance = await getDatatokenBalance(
signer,
algo.services[0].datatokenAddress
)
const additionalDatasetsWithPrice: AssetWithAccessDetailsAndPrice[] = []
for (const additionalDataset of additionalDatasets) {
const additionalDatasetWithPrice = await getAssetWithPrice(
additionalDataset,
signer,
chainConfig,
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
additionalDataset
).providerFee
)
additionalDatasetsWithPrice.push(additionalDatasetWithPrice)
}

// TODO ==== Extract asset ordering start ====
const algorithmOrderTx = await handleComputeOrder(
@@ -153,23 +168,47 @@
)
if (!algorithmOrderTx) throw new Error('Failed to order algorithm.')

// TODO remove? never used. maybe missing feature to check if datatoken already in wallet?
const _datasetDatatokenBalance = await getDatatokenBalance(
signer,
algo.services[0].datatokenAddress
)

const datasetOrderTx = await handleComputeOrder(
signer,
dataset,
datasetWithPrice?.orderPriceAndFees,
signerAddress,
providerInitializeResults.datasets[0],
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
dataset
),
chainConfig,
computeEnv.consumerAddress
)
if (!datasetOrderTx) throw new Error('Failed to order dataset.')

const additionalDatasetOrderTxs: {
documentId: string
orderTx: string
}[] = []
for (const additionalDatasetWithPrice of additionalDatasetsWithPrice) {
const orderTx = await handleComputeOrder(
signer,
additionalDatasetWithPrice,
additionalDatasetWithPrice?.orderPriceAndFees,
signerAddress,
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
additionalDatasetWithPrice
),
chainConfig,
computeEnv.consumerAddress
)
if (!orderTx)
throw new Error(
`Failed to order additional dataset with id ${additionalDatasetWithPrice.id}.`
)
additionalDatasetOrderTxs.push({
documentId: additionalDatasetWithPrice.id,
orderTx
})
}

// ==== Extract asset ordering end ====

// TODO ==== Extract compute job execution start ====
@@ -186,6 +225,19 @@
publishOutput: true // TODO should be configurable
}

const additionalComputeAssets: ComputeAsset[] = []
for (const additionalDataset of additionalDatasets) {
const additionalComputeAsset: ComputeAsset = {
documentId: additionalDataset.id,
serviceId: additionalDataset.services[0].id,
transferTxId: additionalDatasetOrderTxs.find(
(order) => order.documentId === additionalDataset.id
).orderTx,
...additionalDataset
}
additionalComputeAssets.push(additionalComputeAsset)
}

const response = await startComputeJob(
dataset.services[0].serviceEndpoint,
computeAsset,
@@ -197,7 +249,8 @@
},
signer,
computeEnv,
output
output,
additionalComputeAssets
)

// ==== Extract compute job execution end ====
@@ -209,6 +262,24 @@
}
}

async function isComputeAssetOrderable(
asset: AssetWithAccessDetails,
algorithm: AssetWithAccessDetails
) {
const computeService = getServiceByName(asset, 'compute')
const isAllowed = await isOrderable(
asset,
computeService.id,
{
documentId: algorithm.id,
serviceId: algorithm.services[0].id
},
algorithm
)
LoggerInstance.debug('[compute] Is dataset orderable?', isAllowed)
return isAllowed
}

async function getComputeAssetPrices(
algo: AssetWithAccessDetails,
dataset: AssetWithAccessDetails,
@@ -218,13 +289,11 @@ async function getComputeAssetPrices(
) {
LoggerInstance.debug('Initializing provider for compute')

// get fees for dataset from providerInitializeResults.datasets array
const datatokenAddresses = dataset.datatokens.map((dt) => dt.address)

const datasetInitializeResult = providerInitializeResults.datasets.find(
(initializeResult) =>
datatokenAddresses.includes(initializeResult.datatoken)
const datasetInitializeResult = getProviderInitResultsForDataset(
providerInitializeResults.datasets,
dataset
)

const datasetWithPrice = await getAssetWithPrice(
dataset,
signer,
@@ -415,3 +484,12 @@ export async function stopCompute(stopComputeConfig: StopComputeConfig) {

return await stopComputeJob(providerUri, did, jobId, signer)
}

function getProviderInitResultsForDataset(
providerInitResultDatasets: ProviderComputeInitializeResults['datasets'],
dataset: AssetWithAccessDetails
): ProviderComputeInitialize {
return providerInitResultDatasets.find((initResult) =>
dataset.datatokens.map((dt) => dt.address).includes(initResult.datatoken)
)
}
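One design note on the helper above: the old code read `providerInitializeResults.datasets[0]`, which silently assumed a single dataset. With several datasets initialized in one provider call, results are instead matched to assets by datatoken address. A simplified sketch of that matching (illustrative stand-in shapes, not the real ocean.js types):

```ts
// Simplified stand-ins for ProviderComputeInitialize and the asset type
const initResults = [{ datatoken: '0xaaa' }, { datatoken: '0xbbb' }]
const asset = { datatokens: [{ address: '0xbbb' }] }

// Same lookup as getProviderInitResultsForDataset: find the init result
// whose datatoken appears among the asset's datatoken addresses
const match = initResults.find((result) =>
  asset.datatokens.map((dt) => dt.address).includes(result.datatoken)
)

console.log(match) // { datatoken: '0xbbb' }
```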
19 changes: 15 additions & 4 deletions src/utils/provider.ts
@@ -96,13 +96,23 @@ export async function initializeProviderForCompute(
dataset: AssetWithAccessDetails,
algorithm: AssetWithAccessDetails,
accountId: string,
computeEnv: ComputeEnvironment = null
computeEnv: ComputeEnvironment = null,
additionalDatasets?: AssetWithAccessDetails[]
): Promise<ProviderComputeInitializeResults> {
const computeAsset: ComputeAsset = {
documentId: dataset.id,
serviceId: dataset.services[0].id,
transferTxId: dataset.accessDetails.validOrderTx
}

// additionalDatasets is an optional parameter, so guard against
// undefined before mapping
const additionalComputeAssets: ComputeAsset[] =
additionalDatasets?.map((dataset) => ({
documentId: dataset.id,
serviceId: dataset.services[0].id,
transferTxId: dataset.accessDetails.validOrderTx
})) ?? []

const computeAlgo: ComputeAlgorithm = {
documentId: algorithm.id,
serviceId: algorithm.services[0].id,
@@ -117,7 +127,7 @@

try {
return await ProviderInstance.initializeCompute(
[computeAsset],
[computeAsset, ...additionalComputeAssets],
computeAlgo,
computeEnv?.id,
validUntil,
@@ -138,7 +148,8 @@ export async function startComputeJob(
algorithm: ComputeAlgorithm,
signer: Signer,
computeEnv: ComputeEnvironment,
output: ComputeOutput
output: ComputeOutput,
additionalDatasets?: ComputeAsset[]
): Promise<ComputeJob | ComputeJob[]> {
try {
return await ProviderInstance.computeStart(
@@ -148,7 +159,7 @@
dataset,
algorithm,
null,
null,
additionalDatasets,
output
)
} catch (error) {
67 changes: 67 additions & 0 deletions test/integration/compute-flow.test.ts
@@ -34,6 +34,7 @@ describe('Compute Flow Integration', async function () {
let nautilusAlgoPublisher: Nautilus

let computeDatasetDid: string
let additionalComputeDatasetDid: string
let computeAlgorithmDid: string
let computeJobId: string

@@ -182,6 +183,72 @@
// TODO: either increase timeout or introduce different solution to wait for ctd to finish
// takes longer on live test networks (e.g. Mumbai ~4 minutes)
.timeout(120000)

it('publishes a second compute dataset asset', async () => {
// serviceEndpoint to use for the test asset
const { providerUri } = nautilusDatasetPublisher.getOceanConfig()

// Create the "compute" service
const serviceBuilder = new ServiceBuilder({
serviceType: ServiceTypes.COMPUTE,
fileType: FileTypes.URL
})
const service = serviceBuilder
.setServiceEndpoint(providerUri)
.setTimeout(datasetService.timeout)
.addFile(datasetService.files[0])
.setPricing(await getPricing(datasetPublisherSigner, 'free'))
.build()

// configure the asset
const assetBuilder = new AssetBuilder()
const asset = assetBuilder
.setAuthor('testAuthor')
.setDescription('A compute dataset publishing test')
.setLicense('MIT')
.setName('Test Publish Compute Dataset')
.setOwner(datasetPublisherAddress)
.setType('dataset')
.setNftData(nftParams)
.addService(service)
.build()

// publish
const result = await nautilusDatasetPublisher.publish(asset)

expect(result).to.have.property('ddo').to.have.property('id')

additionalComputeDatasetDid = result.ddo.id
})

it('starts a compute job with multiple datasets', async () => {
// wait until DDOs are found in metadata cache
const aquarius = new Aquarius(
nautilusDatasetPublisher.getOceanConfig().metadataCacheUri
)
console.log(
`Waiting for aquarius at ${aquarius.aquariusURL} to cache algo and datasets...`
)
await aquarius.waitForIndexer(computeAlgorithmDid)
await aquarius.waitForIndexer(computeDatasetDid)
await aquarius.waitForIndexer(additionalComputeDatasetDid)

const startedJob = await nautilusDatasetPublisher.compute({
dataset: {
did: computeDatasetDid
},
algorithm: {
did: computeAlgorithmDid
},
additionalDatasets: [{ did: additionalComputeDatasetDid }]
})

const computeJob = Array.isArray(startedJob) ? startedJob[0] : startedJob

expect(computeJob).to.have.property('jobId')

computeJobId = computeJob.jobId
}).timeout(120000)
})

async function getStatusHelper(nautilus: Nautilus, jobId: string) {