Skip to content

Commit

Permalink
Sync and cleanup rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Lezek123 committed Nov 25, 2024
1 parent 53fbddf commit 29a12fb
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 299 deletions.
71 changes: 40 additions & 31 deletions storage-node/src/services/archive/ArchiveService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
OBJECTS_TRACKING_FILENAME,
} from './tracking'
import { QueryNodeApi } from '../queryNode/api'
import { getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { DataObjectDetailsLoader, getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { getDownloadTasks } from '../sync/synchronizer'
import sleep from 'sleep-promise'
import { Logger } from 'winston'
Expand Down Expand Up @@ -369,40 +369,49 @@ export class ArchiveService {
public async performSync(): Promise<void> {
const model = await getStorageObligationsFromRuntime(this.queryNodeApi)

const assignedObjects = model.dataObjects
const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id))
added.sort((a, b) => parseInt(b.id) - parseInt(a.id))
const assignedObjectsIds = await model.createAssignedObjectsIdsLoader(true).getAll()
const unsyncedIds = assignedObjectsIds
.filter((id) => !this.objectTrackingService.isTracked(id))
.map((id) => parseInt(id))
.sort((a, b) => a - b)

this.logger.info(`Sync - new objects: ${added.length}`)
this.logger.info(`Sync - new objects: ${unsyncedIds.length}`)

// Add new download tasks while the upload dir size limit allows
while (added.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = added.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
// Sync objects in batches of 10_000
for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) {
const objectsBatchLoader = new DataObjectDetailsLoader(this.queryNodeApi, {
by: 'ids',
ids: unsyncedIdsBatch.map((id) => id.toString()),
})
const objectsBatch = await objectsBatchLoader.getAll()
// Add new download tasks while the upload dir size limit allows
while (objectsBatch.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = objectsBatch.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
)
objectsBatch.push(object)
await sleep(60_000)
break
}
const [downloadTask] = await getDownloadTasks(
model,
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
added.push(object)
await sleep(60_000)
break
await this.addDownloadTask(downloadTask, object.size)
}
const [downloadTask] = await getDownloadTasks(
model,
[],
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
await this.addDownloadTask(downloadTask, object.size)
}
}
}
Expand Down
129 changes: 86 additions & 43 deletions storage-node/src/services/queryNode/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ import fetch from 'cross-fetch'
import stringify from 'fast-safe-stringify'
import logger from '../logger'
import {
DataObjectByBagIdsDetailsFragment,
DataObjectDetailsFragment,
DataObjectIdsByBagId,
DataObjectIdsByBagIdQuery,
DataObjectIdsByBagIdQueryVariables,
DataObjectsByBagsConnection,
DataObjectsByBagsConnectionQuery,
DataObjectsByBagsConnectionQueryVariables,
DataObjectsByIdsConnection,
DataObjectsByIdsConnectionQuery,
DataObjectsByIdsConnectionQueryVariables,
DataObjectsWithBagDetailsByIds,
DataObjectsWithBagDetailsByIdsQuery,
DataObjectsWithBagDetailsByIdsQueryVariables,
DataObjectWithBagDetailsFragment,
GetAllStorageBagDetails,
GetAllStorageBagDetailsQuery,
GetAllStorageBagDetailsQueryVariables,
GetDataObjects,
GetDataObjectsByBagIds,
GetDataObjectsByBagIdsQuery,
GetDataObjectsByBagIdsQueryVariables,
GetDataObjectsDeletedEvents,
GetDataObjectsDeletedEventsQuery,
GetDataObjectsDeletedEventsQueryVariables,
GetDataObjectsQuery,
GetDataObjectsQueryVariables,
GetSquidVersion,
GetSquidVersionQuery,
GetSquidVersionQueryVariables,
Expand All @@ -41,7 +47,7 @@ import {
StorageBucketDetailsFragment,
StorageBucketIdsFragment,
} from './generated/queries'
import { Maybe, StorageBagWhereInput } from './generated/schema'
import { Maybe } from './generated/schema'

/**
* Defines query paging limits.
Expand All @@ -53,7 +59,7 @@ type PaginationQueryVariables = {
lastCursor?: Maybe<string>
}

type PaginationQueryResult<T = unknown> = {
export type PaginationQueryResult<T = unknown> = {
edges: { node: T }[]
pageInfo: {
hasNextPage: boolean
Expand Down Expand Up @@ -249,50 +255,87 @@ export class QueryNodeApi {
}

/**
* Returns data objects info by pages for the given bags.
* Gets a page of data objects belonging to specified bags.
*
* @param bagIds - query filter: bag IDs
*/
public async getDataObjectsByBagIds(bagIds: string[]): Promise<Array<DataObjectByBagIdsDetailsFragment>> {
const allBagIds = [...bagIds] // Copy to avoid modifying the original array
let fullResult: DataObjectByBagIdsDetailsFragment[] = []
while (allBagIds.length) {
const bagIdsBatch = allBagIds.splice(0, 1000)
const input: StorageBagWhereInput = { id_in: bagIdsBatch }
fullResult = [
...fullResult,
...(await this.multipleEntitiesQuery<GetDataObjectsByBagIdsQuery, GetDataObjectsByBagIdsQueryVariables>(
GetDataObjectsByBagIds,
{ bagIds: input },
'storageDataObjects'
)),
]
}
public async getDataObjectsByBagsPage<IncludeDetails extends boolean>(
bagIds: string[],
limit: number,
after: string | undefined,
includeDetails: IncludeDetails,
isAccepted?: boolean
): Promise<
IncludeDetails extends true
? PaginationQueryResult<DataObjectDetailsFragment> | null
: PaginationQueryResult<{ id: string }> | null
> {
return this.uniqueEntityQuery<DataObjectsByBagsConnectionQuery, DataObjectsByBagsConnectionQueryVariables>(
DataObjectsByBagsConnection,
{
bagIds: [...bagIds],
isAccepted,
limit,
after,
includeDetails: includeDetails,
},
'storageDataObjectsConnection'
)
}

return fullResult
/**
* Gets a page of data objects by the given list of dataObject IDs.
*
* @param ids - query filter: data object ids
*/
public async getDataObjectsByIdsPage<IncludeDetails extends boolean>(
ids: string[],
limit: number,
after: string | undefined,
includeDetails: IncludeDetails,
isAccepted?: boolean
): Promise<
IncludeDetails extends true
? PaginationQueryResult<DataObjectDetailsFragment> | null
: PaginationQueryResult<{ id: string }> | null
> {
return this.uniqueEntityQuery<DataObjectsByIdsConnectionQuery, DataObjectsByIdsConnectionQueryVariables>(
DataObjectsByIdsConnection,
{
ids: [...ids],
isAccepted,
limit,
after,
includeDetails: includeDetails,
},
'storageDataObjectsConnection'
)
}

/**
* Returns data objects info by pages for the given dataObject IDs.
* Returns a list of data objects by ids, with their corresponding bag details
*
* @param dataObjectIds - query filter: dataObject IDs
* @param ids - query filter: data object ids
*/
public async getDataObjectDetails(dataObjectIds: string[]): Promise<Array<DataObjectDetailsFragment>> {
const allDataObjectIds = [...dataObjectIds] // Copy to avoid modifying the original array
let fullResult: DataObjectDetailsFragment[] = []
while (allDataObjectIds.length) {
const dataObjectIdsBatch = allDataObjectIds.splice(0, 1000)
fullResult = [
...fullResult,
...(await this.multipleEntitiesQuery<GetDataObjectsQuery, GetDataObjectsQueryVariables>(
GetDataObjects,
{ dataObjectIds: dataObjectIdsBatch },
'storageDataObjects'
)),
]
}
public async getDataObjectsWithBagDetails(ids: string[]): Promise<DataObjectWithBagDetailsFragment[]> {
return this.multipleEntitiesQuery<
DataObjectsWithBagDetailsByIdsQuery,
DataObjectsWithBagDetailsByIdsQueryVariables
>(DataObjectsWithBagDetailsByIds, { ids: [...ids] }, 'storageDataObjects')
}

return fullResult
/**
* Returns a list of data object ids that belong to a given bag.
*
* @param bagId - query filter: bag ID
*/
public async getDataObjectIdsByBagId(bagId: string): Promise<string[]> {
const result = await this.multipleEntitiesQuery<DataObjectIdsByBagIdQuery, DataObjectIdsByBagIdQueryVariables>(
DataObjectIdsByBagId,
{ bagId },
'storageDataObjects'
)
return result.map((o) => o.id)
}

/**
Expand Down
76 changes: 65 additions & 11 deletions storage-node/src/services/queryNode/queries/queries.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ query getAllStorageBagDetails {
}
}

fragment DataObjectByBagIdsDetails on StorageDataObject {
query dataObjectIdsByBagId($bagId: String) {
storageDataObjects(where: { storageBag: { id_eq: $bagId } }) {
id
}
}

fragment DataObjectDetails on StorageDataObject {
id
size
ipfsHash
Expand All @@ -68,13 +74,7 @@ fragment DataObjectByBagIdsDetails on StorageDataObject {
}
}

query getDataObjectsByBagIds($bagIds: StorageBagWhereInput) {
storageDataObjects(where: { storageBag: $bagIds, isAccepted_eq: true }) {
...DataObjectByBagIdsDetails
}
}

fragment DataObjectDetails on StorageDataObject {
fragment DataObjectWithBagDetails on StorageDataObject {
id
isAccepted
ipfsHash
Expand All @@ -83,9 +83,63 @@ fragment DataObjectDetails on StorageDataObject {
}
}

query getDataObjects($dataObjectIds: [String!]) {
storageDataObjects(where: { id_in: $dataObjectIds }) {
...DataObjectDetails
query dataObjectsByBagsConnection(
$bagIds: [String!]
$limit: Int
$after: String
$includeDetails: Boolean!
$isAccepted: Boolean
) {
storageDataObjectsConnection(
where: { storageBag: { id_in: $bagIds }, isAccepted_eq: $isAccepted }
first: $limit
after: $after
orderBy: id_ASC
) {
edges {
node {
id
...DataObjectDetails @include(if: $includeDetails)
}
}
pageInfo {
startCursor
endCursor
hasNextPage
}
}
}

query dataObjectsByIdsConnection(
$ids: [String!]
$limit: Int
$after: String
$includeDetails: Boolean!
$isAccepted: Boolean
) {
storageDataObjectsConnection(
where: { id_in: $ids, isAccepted_eq: $isAccepted }
first: $limit
after: $after
orderBy: id_ASC
) {
edges {
node {
id
...DataObjectDetails @include(if: $includeDetails)
}
}
pageInfo {
startCursor
endCursor
hasNextPage
}
}
}

query dataObjectsWithBagDetailsByIds($ids: [String!]) {
storageDataObjects(where: { id_in: $ids }) {
...DataObjectWithBagDetails
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage-node/src/services/sync/acceptPendingObjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class AcceptPendingObjectsService {
}

private async processPendingObjects(pendingIds: string[]): Promise<PendingObjectDetails> {
const pendingDataObjects = await this.qnApi.getDataObjectDetails(pendingIds)
const pendingDataObjects = await this.qnApi.getDataObjectsWithBagDetails(pendingIds)

// objects not found in the query node
const maybeDeletedObjectIds = pendingIds.filter(
Expand Down
Loading

0 comments on commit 29a12fb

Please sign in to comment.