Archive script: Sync all objects, ignore bucket assignments
Lezek123 committed Nov 6, 2024 · 1 parent 7b7bf26 · commit 5238b65
Showing 6 changed files with 31 additions and 80 deletions.
2 changes: 1 addition & 1 deletion storage-node/CHANGELOG.md
@@ -1,6 +1,6 @@
### 4.3.0

- Adds `archive` mode / command, which allows downloading, compressing and uploading assigned data objects to an external S3 bucket that can be used as a backup.
- Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup.

### 4.2.0

75 changes: 6 additions & 69 deletions storage-node/src/commands/archive.ts
@@ -1,13 +1,10 @@
import { flags } from '@oclif/command'
import { ApiPromise } from '@polkadot/api'
import _ from 'lodash'
import path from 'path'
import { v4 as uuidv4 } from 'uuid'
import ApiCommandBase from '../command-base/ApiCommandBase'
import { customFlags } from '../command-base/CustomFlags'
import logger, { DatePatternByFrequency, Frequency, initNewLogger } from '../services/logger'
import { QueryNodeApi } from '../services/queryNode/api'
import { constructBucketToAddressMapping } from '../services/sync/storageObligations'
import { verifyWorkerId } from '../services/runtime/queries'
import { ArchiveService } from '../services/archive/ArchiveService'
import ExitCodes from './../command-base/ExitCodes'
@@ -38,13 +35,6 @@ export default class Archive extends ApiCommandBase {
description: 'Storage provider worker ID',
env: 'WORKER_ID',
}),
buckets: customFlags.integerArr({
char: 'b',
description:
'Comma separated list of bucket IDs to sync. Buckets that are not assigned to worker are ignored.\n' +
'If not specified all buckets belonging to the worker will be synced.',
default: process.env.BUCKETS ? _.uniq(process.env.BUCKETS.split(',').map((b) => parseInt(b))) : [],
}),
uploadQueueDir: flags.string({
description:
'Directory to store fully downloaded data objects before compressing them and uploading to S3 (absolute path).',
@@ -230,61 +220,6 @@ Supported values: warn, error, debug, info. Default:debug`,
...ApiCommandBase.flags,
}

async getSyncableBuckets(api: ApiPromise, qnApi: QueryNodeApi): Promise<string[]> {
const { flags } = this.parse(Archive)
const workerId = flags.worker

if (!(await verifyWorkerId(api, workerId))) {
logger.error(`workerId ${workerId} does not exist in the storage working group`)
this.exit(ExitCodes.InvalidWorkerId)
}

if (!flags.buckets.length) {
logger.info(`No buckets provided. Will use all bucket belonging to worker ${workerId}.`)
}

const selectedBucketsAndAccounts = await constructBucketToAddressMapping(api, qnApi, workerId, flags.buckets)
const selectedBuckets = selectedBucketsAndAccounts.map(([bucketId]) => bucketId)
const selectedVsProvidedDiff = _.difference(
flags.buckets.map((id) => id.toString()),
selectedBuckets
)

if (selectedVsProvidedDiff.length) {
logger.warn(
`Buckets: ${JSON.stringify(
selectedVsProvidedDiff
)} do not belong to worker with ID=${workerId} and will NOT be synced!`
)
}

let syncableBuckets = selectedBuckets
if (process.env.DISABLE_BUCKET_AUTH === 'true') {
logger.warn('Bucket authentication is disabled! This is not recommended for production use!')
} else {
const keystoreAddresses = this.getUnlockedAccounts()
const bucketsWithKeysInKeyring = selectedBucketsAndAccounts.filter(([bucketId, address]) => {
if (!keystoreAddresses.includes(address)) {
this.warn(`Missing transactor key for bucket ${bucketId}. It will NOT be synced!`)
return false
}
return true
})

syncableBuckets = bucketsWithKeysInKeyring.map(([bucketId]) => bucketId)
}

if (!syncableBuckets.length) {
this.error('No buckets to serve. Exiting...')
}

if (flags.buckets.length && syncableBuckets.length !== flags.buckets.length) {
logger.warn(`Only ${syncableBuckets.length} out of ${flags.buckets.length} provided buckets will be synced!`)
}

return syncableBuckets
}

initLogger(): void {
const { flags } = this.parse(Archive)
if (!_.isEmpty(flags.elasticSearchEndpoint) || !_.isEmpty(flags.logFilePath)) {
@@ -345,9 +280,12 @@ Supported values: warn, error, debug, info. Default:debug`,
defaultStorageClass: flags.awsStorageClass,
})

// Get buckets to sync
const syncableBuckets = await this.getSyncableBuckets(api, qnApi)
logger.info(`Buckets to sync: [${syncableBuckets}]`)
// Verify workerId
const workerId = flags.worker
if (!(await verifyWorkerId(api, workerId))) {
logger.error(`workerId ${workerId} does not exist in the storage working group`)
this.exit(ExitCodes.InvalidWorkerId)
}

// Check and normalize input directories
const { tmpDownloadDir, uploadQueueDir } = await this.checkAndNormalizeDirs({
@@ -364,7 +302,6 @@ Supported values: warn, error, debug, info. Default:debug`,
// Build and run archive service
const X_HOST_ID = uuidv4()
const archiveService = new ArchiveService({
buckets: syncableBuckets.map((id) => id.toString()),
archiveTrackfileBackupFreqMinutes: flags.archiveTrackfileBackupFreqMinutes,
localCountTriggerThreshold: flags.localCountTriggerThreshold,
localSizeTriggerThreshold: flags.localSizeTriggerThresholdMB * 1_000_000,
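Taken together, the archive.ts hunks above collapse startup to a plain worker-ID check: bucket resolution, ownership diffing, and transactor-key filtering all disappear. A condensed sketch of the resulting flow, based only on the hunks shown here (API and query-node setup, directory checks, and most ArchiveService params omitted):

```typescript
// Sketch of Archive.run() after this commit (not verbatim source).
const { flags } = this.parse(Archive)

// Only the worker ID is still validated; no bucket selection happens anymore.
const workerId = flags.worker
if (!(await verifyWorkerId(api, workerId))) {
  logger.error(`workerId ${workerId} does not exist in the storage working group`)
  this.exit(ExitCodes.InvalidWorkerId)
}

// ArchiveService no longer receives a `buckets` list: it tracks and backs up
// every data object the query node knows about.
const archiveService = new ArchiveService({
  archiveTrackfileBackupFreqMinutes: flags.archiveTrackfileBackupFreqMinutes,
  localCountTriggerThreshold: flags.localCountTriggerThreshold,
  localSizeTriggerThreshold: flags.localSizeTriggerThresholdMB * 1_000_000,
  // ...remaining params exactly as in the diff above
})
```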
9 changes: 2 additions & 7 deletions storage-node/src/services/archive/ArchiveService.ts
@@ -110,8 +110,6 @@ class DataObjectsQueue {
}

type ArchiveServiceParams = {
// Supported buckets
buckets: string[]
// Upload trigger Thresholds
localCountTriggerThreshold: number | undefined
localSizeTriggerThreshold: number
@@ -142,8 +140,6 @@ type ArchiveServiceParams = {

export class ArchiveService {
private logger: Logger
// Buckets
private buckets: string[]
// Thresholds
private localCountTriggerThreshold: number | undefined
private localSizeTriggerThreshold: number
@@ -185,7 +181,6 @@ export class ArchiveService {

constructor(params: ArchiveServiceParams) {
// From params:
this.buckets = params.buckets
this.localCountTriggerThreshold = params.localCountTriggerThreshold
this.localSizeTriggerThreshold = params.localSizeTriggerThreshold
this.localAgeTriggerThresholdMinutes = params.localAgeTriggerThresholdMinutes
@@ -372,7 +367,7 @@ export class ArchiveService {
* @throws Error If there's an issue w/ file access or the query node
*/
public async performSync(): Promise<void> {
const model = await getStorageObligationsFromRuntime(this.queryNodeApi, this.buckets)
const model = await getStorageObligationsFromRuntime(this.queryNodeApi)

const assignedObjects = model.dataObjects
const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id))
@@ -400,7 +395,7 @@
}
const [downloadTask] = await getDownloadTasks(
model,
this.buckets,
[],
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
12 changes: 12 additions & 0 deletions storage-node/src/services/queryNode/api.ts
@@ -6,6 +6,9 @@ import logger from '../logger'
import {
DataObjectByBagIdsDetailsFragment,
DataObjectDetailsFragment,
GetAllStorageBagDetails,
GetAllStorageBagDetailsQuery,
GetAllStorageBagDetailsQueryVariables,
GetDataObjects,
GetDataObjectsByBagIds,
GetDataObjectsByBagIdsQuery,
@@ -236,6 +239,15 @@ export class QueryNodeApi {
return result
}

public async getAllStorageBagsDetails(): Promise<Array<StorageBagDetailsFragment>> {
const result = await this.multipleEntitiesQuery<
GetAllStorageBagDetailsQuery,
GetAllStorageBagDetailsQueryVariables
>(GetAllStorageBagDetails, {}, 'storageBags')

return result
}

/**
* Returns data objects info by pages for the given bags.
*
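The new query-node method takes no arguments, unlike the existing bucket-scoped bag query. A minimal usage sketch (hypothetical caller; assumes a constructed `qnApi: QueryNodeApi` as in archive.ts):

```typescript
// Fetch every storage bag known to the query node, with no bucket scoping.
const allBags = await qnApi.getAllStorageBagsDetails()

// Bag IDs can then feed the existing object-listing helpers
// (see storageObligations.ts below).
const bagIds = allBags.map((bag) => bag.id)
logger.debug(`Query node returned ${allBags.length} storage bags`)
```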
6 changes: 6 additions & 0 deletions storage-node/src/services/queryNode/queries/queries.graphql
@@ -53,6 +53,12 @@ query getStorageBagDetails($bucketIds: [String!]) {
}
}

query getAllStorageBagDetails {
storageBags {
...StorageBagDetails
}
}

fragment DataObjectByBagIdsDetails on StorageDataObject {
id
size
7 changes: 4 additions & 3 deletions storage-node/src/services/sync/storageObligations.ts
@@ -95,16 +95,17 @@ export type DataObject = {
* runtime (Query Node).
*
* @param queryNodeUrl - Query Node URL
* @param workerId - worker ID
* @param bucketIds - bucket IDs. If undefined, we treat all existing bags as obligations.
* @returns promise for the DataObligations
*/
export async function getStorageObligationsFromRuntime(
qnApi: QueryNodeApi,
bucketIds: string[]
bucketIds?: string[]
): Promise<DataObligations> {
const allBuckets = await getAllBuckets(qnApi)

const assignedBags = await getAllAssignedBags(qnApi, bucketIds)
const assignedBags =
bucketIds === undefined ? await qnApi.getAllStorageBagsDetails() : await getAllAssignedBags(qnApi, bucketIds)

const bagIds = assignedBags.map((bag) => bag.id)
const assignedDataObjects = await getAllAssignedDataObjects(qnApi, bagIds)
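Because `bucketIds` is now optional, the bucket-scoped sync path keeps its call sites unchanged while the archive service simply drops the argument. A sketch of the two call modes, assuming a `qnApi: QueryNodeApi` in scope (the bucket IDs are illustrative):

```typescript
// 1. Bucket-scoped obligations: the pre-existing behaviour.
const scoped = await getStorageObligationsFromRuntime(qnApi, ['1', '2'])

// 2. Unscoped obligations over all bags: what ArchiveService.performSync() now uses.
const everything = await getStorageObligationsFromRuntime(qnApi)
```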
