Feat/additional datasets #152

Merged: 9 commits, Jan 29, 2025
6 changes: 6 additions & 0 deletions .changeset/modern-items-applaud.md
@@ -0,0 +1,6 @@
---
"@deltadao/nautilus": minor
---

Adds functionality to compute on multiple datasets within one compute job
- Currently only datasets that are encrypted by the same provider are supported
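
A minimal usage sketch (placeholder DIDs, mirroring the compute guide example):

```ts
const computeJob = await nautilus.compute({
  dataset: { did: 'did:op:123abc...' },
  algorithm: { did: 'did:op:123abc...' },
  additionalDatasets: [{ did: 'did:op:additionalDid1...' }]
})
```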
40 changes: 40 additions & 0 deletions docs/pages/docs/guides/compute.mdx
@@ -1,3 +1,5 @@
import { Callout } from 'vocs/components'

# Compute to Data [Generate value on data that stays private]

The `Nautilus` instance we created in the setup step provides access to a `compute()` function that we can use to start new compute jobs.
@@ -53,6 +55,44 @@ const computeJob = await nautilus.compute({
const { jobId } = computeJob // [!code focus]
```

### Computation with multiple datasets

A compute job can be run on multiple datasets. To do so, provide an array of `additionalDatasets` (see [ComputeAsset](/docs/api/ComputeAsset)) to the `compute()` function, as shown below.

<Callout type='warning'>
Note that the additional datasets used in a compute job currently need to be encrypted by the same provider as the base dataset.
</Callout>

```ts twoslash
import { Nautilus } from '@deltadao/nautilus'
import { ComputeJob } from '@oceanprotocol/lib'
import { Wallet } from 'ethers'
const signer = new Wallet('')
const nautilus = await Nautilus.create(signer)
// ---cut---
const dataset = {
did: 'did:op:123abc...'
}
const algorithm = {
did: 'did:op:123abc...'
}

const additionalDatasets = [ // [!code focus]
{ // [!code focus]
did: 'did:op:additionalDid1...' // [!code focus]
}, // [!code focus]
{ // [!code focus]
did: 'did:op:additionalDid2...' // [!code focus]
} // [!code focus]
] // [!code focus]

const computeJob = await nautilus.compute({
dataset,
algorithm,
additionalDatasets // [!code focus]
}) as ComputeJob
```

## Get a compute job status

Now that you have a reference to any job you started, it is straightforward to monitor its status (see [`getComputeStatus`](/docs/api/nautilus/getComputeStatus)).
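
Below is a minimal sketch; the `(providerUri, jobId)` argument order and the placeholder values are assumptions based on the linked API reference.

```ts twoslash
import { Nautilus } from '@deltadao/nautilus'
import { Wallet } from 'ethers'
const signer = new Wallet('')
const nautilus = await Nautilus.create(signer)
const providerUri = 'https://provider.example.com' // assumed placeholder
const jobId = '0x...' // the jobId returned by compute()
// ---cut---
// Assumed argument order; see the getComputeStatus API reference
const computeJobStatus = await nautilus.getComputeStatus(
  providerUri, // the provider the job was started on
  jobId
)
```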
148 changes: 113 additions & 35 deletions src/compute/index.ts
@@ -12,6 +12,7 @@ import {
import type { Signer } from 'ethers'
import type {
AssetWithAccessDetails,
AssetWithAccessDetailsAndPrice,
ComputeConfig,
ComputeResultConfig,
ComputeStatusConfig,
@@ -81,8 +82,7 @@ export async function compute(computeConfig: ComputeConfig) {
const dataset = assets.find((asset) => asset.id === datasetDid)
const algo = assets.find((asset) => asset.id === algorithmDid)

// TODO add functionality for additional datasets
const _additionalDatasets = additionalDatasetsConfig
const additionalDatasets = additionalDatasetsConfig
? assets.filter((asset) =>
additionalDatasetsConfig
.map((dataset) => dataset.did)
@@ -92,23 +92,29 @@ export async function compute(computeConfig: ComputeConfig) {

// 2. Check if the asset is orderable
// TODO: consider to do this first before loading all other assets
const computeService = getServiceByName(dataset, 'compute')
const allowed = await isOrderable(
// TODO consider isAllowed or similar for boolean
dataset,
computeService.id,
{
documentId: algo.id,
serviceId: algo.services[0].id
},
algo
)
LoggerInstance.debug('[compute] Is dataset orderable?', allowed)
if (!allowed)
const isDatasetOrderable = await isComputeAssetOrderable(dataset, algo)
LoggerInstance.debug('[compute] Is dataset orderable?', isDatasetOrderable)

if (!isDatasetOrderable)
throw new Error(
'Dataset is not orderable in combination with given algorithm.'
)

for (const dataset of additionalDatasets) {
const isAdditionalDatasetOrderable = await isComputeAssetOrderable(
dataset,
algo
)
LoggerInstance.debug(
'[compute] Is additional dataset orderable?',
isAdditionalDatasetOrderable
)
if (!isAdditionalDatasetOrderable)
throw new Error(
'Additional dataset is not orderable in combination with given algorithm.'
)
}

// 3. Initialize the provider
const computeEnv = await getComputeEnviroment(dataset)

@@ -117,7 +123,8 @@
dataset,
algo,
signerAddress,
computeEnv
computeEnv,
additionalDatasets
)

// 4. Get prices and fees for the assets
@@ -135,11 +142,19 @@
if (!algorithmWithPrice?.orderPriceAndFees)
throw new Error('Error setting algorithm price and fees!')

// TODO remove? never used. maybe missing feature to check if datatoken already in wallet?
const _algoDatatokenBalance = await getDatatokenBalance(
signer,
algo.services[0].datatokenAddress
)
const additionalDatasetsWithPrice: AssetWithAccessDetailsAndPrice[] = []
for (const additionalDataset of additionalDatasets) {
const additionalDatasetWithPrice = await getAssetWithPrice(
additionalDataset,
signer,
chainConfig,
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
additionalDataset
).providerFee
)
additionalDatasetsWithPrice.push(additionalDatasetWithPrice)
}

// TODO ==== Extract asset ordering start ====
const algorithmOrderTx = await handleComputeOrder(
@@ -153,23 +168,47 @@
)
if (!algorithmOrderTx) throw new Error('Failed to order algorithm.')

// TODO remove? never used. maybe missing feature to check if datatoken already in wallet?
const _datasetDatatokenBalance = await getDatatokenBalance(
signer,
algo.services[0].datatokenAddress
)

const datasetOrderTx = await handleComputeOrder(
signer,
dataset,
datasetWithPrice?.orderPriceAndFees,
signerAddress,
providerInitializeResults.datasets[0],
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
dataset
),
chainConfig,
computeEnv.consumerAddress
)
if (!datasetOrderTx) throw new Error('Failed to order dataset.')

const additionalDatasetOrderTxs: {
documentId: string
orderTx: string
}[] = []
for (const additionalDatasetWithPrice of additionalDatasetsWithPrice) {
const orderTx = await handleComputeOrder(
signer,
additionalDatasetWithPrice,
additionalDatasetWithPrice?.orderPriceAndFees,
signerAddress,
getProviderInitResultsForDataset(
providerInitializeResults.datasets,
additionalDatasetWithPrice
),
chainConfig,
computeEnv.consumerAddress
)
if (!orderTx)
throw new Error(
`Failed to order additional dataset with id ${additionalDatasetWithPrice.id}.`
)
additionalDatasetOrderTxs.push({
documentId: additionalDatasetWithPrice.id,
orderTx
})
}

// ==== Extract asset ordering end ====

// TODO ==== Extract compute job execution start ====
@@ -186,6 +225,19 @@
publishOutput: true // TODO should be configurable
}

const additionalComputeAssets: ComputeAsset[] = []
for (const additionalDataset of additionalDatasets) {
const additionalComputeAsset: ComputeAsset = {
documentId: additionalDataset.id,
serviceId: additionalDataset.services[0].id,
transferTxId: additionalDatasetOrderTxs.find(
(order) => order.documentId === additionalDataset.id
).orderTx,
...additionalDataset
}
additionalComputeAssets.push(additionalComputeAsset)
}

const response = await startComputeJob(
dataset.services[0].serviceEndpoint,
computeAsset,
Expand All @@ -197,7 +249,8 @@ export async function compute(computeConfig: ComputeConfig) {
},
signer,
computeEnv,
output
output,
additionalComputeAssets
)

// ==== Extract compute job execution end ====
@@ -209,6 +262,24 @@
}
}

async function isComputeAssetOrderable(
asset: AssetWithAccessDetails,
algorithm: AssetWithAccessDetails
) {
const computeService = getServiceByName(asset, 'compute')
const isAllowed = await isOrderable(
asset,
computeService.id,
{
documentId: algorithm.id,
serviceId: algorithm.services[0].id
},
algorithm
)
LoggerInstance.debug('[compute] Is asset orderable?', isAllowed)
return isAllowed
}

async function getComputeAssetPrices(
algo: AssetWithAccessDetails,
dataset: AssetWithAccessDetails,
@@ -218,13 +289,11 @@ async function getComputeAssetPrices(
) {
LoggerInstance.debug('Initializing provider for compute')

// get fees for dataset from providerInitializeResults.datasets array
const datatokenAddresses = dataset.datatokens.map((dt) => dt.address)

const datasetInitializeResult = providerInitializeResults.datasets.find(
(initializeResult) =>
datatokenAddresses.includes(initializeResult.datatoken)
const datasetInitializeResult = getProviderInitResultsForDataset(
providerInitializeResults.datasets,
dataset
)

const datasetWithPrice = await getAssetWithPrice(
dataset,
signer,
Expand Down Expand Up @@ -415,3 +484,12 @@ export async function stopCompute(stopComputeConfig: StopComputeConfig) {

return await stopComputeJob(providerUri, did, jobId, signer)
}

function getProviderInitResultsForDataset(
providerInitResultDatasets: ProviderComputeInitializeResults['datasets'],
dataset: AssetWithAccessDetails
): ProviderComputeInitialize {
return providerInitResultDatasets.find((initResult) =>
dataset.datatokens.map((dt) => dt.address).includes(initResult.datatoken)
)
}
19 changes: 15 additions & 4 deletions src/utils/provider.ts
@@ -96,13 +96,23 @@ export async function initializeProviderForCompute(
dataset: AssetWithAccessDetails,
algorithm: AssetWithAccessDetails,
accountId: string,
computeEnv: ComputeEnvironment = null
computeEnv: ComputeEnvironment = null,
additionalDatasets?: AssetWithAccessDetails[]
): Promise<ProviderComputeInitializeResults> {
const computeAsset: ComputeAsset = {
documentId: dataset.id,
serviceId: dataset.services[0].id,
transferTxId: dataset.accessDetails.validOrderTx
}

// additionalDatasets is optional, so fall back to an empty array before mapping
const additionalComputeAssets: ComputeAsset[] = (additionalDatasets ?? []).map(
(dataset) => ({
documentId: dataset.id,
serviceId: dataset.services[0].id,
transferTxId: dataset.accessDetails.validOrderTx
})
)

const computeAlgo: ComputeAlgorithm = {
documentId: algorithm.id,
serviceId: algorithm.services[0].id,
Expand All @@ -117,7 +127,7 @@ export async function initializeProviderForCompute(

try {
return await ProviderInstance.initializeCompute(
[computeAsset],
[computeAsset, ...additionalComputeAssets],
computeAlgo,
computeEnv?.id,
validUntil,
@@ -138,7 +148,8 @@ export async function startComputeJob(
algorithm: ComputeAlgorithm,
signer: Signer,
computeEnv: ComputeEnvironment,
output: ComputeOutput
output: ComputeOutput,
additionalDatasets?: ComputeAsset[]
): Promise<ComputeJob | ComputeJob[]> {
try {
return await ProviderInstance.computeStart(
@@ -148,7 +159,7 @@
dataset,
algorithm,
null,
null,
additionalDatasets,
output
)
} catch (error) {