Colossus archive script: Support for faster compression / no compression + bug fixes
Lezek123 committed Oct 29, 2024
1 parent abcf781 commit 2b45e12
Showing 6 changed files with 328 additions and 134 deletions.
3 changes: 3 additions & 0 deletions colossus.Dockerfile
@@ -30,7 +30,10 @@ RUN yarn workspace storage-node build
RUN yarn cache clean

FROM node:18 as final

WORKDIR /joystream
+# 7zip and zstd are required by the archive script
+RUN apt-get update && apt-get install -y p7zip-full zstd
COPY --from=builder /joystream /joystream
RUN yarn --frozen-lockfile --production

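The two packages installed above, p7zip-full and zstd, provide the external binaries the archive script shells out to. As a rough sketch of that interaction (the project's actual compression code is not shown in this diff; the flags are zstd's own, while the function name and numeric level are illustrative assumptions):

```ts
import { spawn } from 'child_process'

// Sketch: compress a file with the system `zstd` binary.
// `-T<n>` sets worker threads, `-<level>` the numeric compression level, `-o` the output path.
function zstdCompress(srcPath: string, dstPath: string, threads: number, level: number): Promise<void> {
  return new Promise((resolve, reject) => {
    const proc = spawn('zstd', [`-${level}`, `-T${threads}`, '-o', dstPath, srcPath])
    proc.on('error', reject)
    proc.on('close', (code) => (code === 0 ? resolve() : reject(new Error(`zstd exited with code ${code}`))))
  })
}
```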
35 changes: 33 additions & 2 deletions storage-node/src/commands/archive.ts
@@ -15,6 +15,7 @@ import { IConnectionHandler } from '../services/s3/IConnectionHandler'
import { AwsConnectionHandler } from '../services/s3/AwsConnectionHandler'
import { createDirectory } from '../services/helpers/filesystem'
import { promises as fsp } from 'fs'
+import { CompressionAlgorithm, CompressionLevel, getCompressionService } from '../services/archive/compression'
import { StorageClass } from '@aws-sdk/client-s3'

/**
@@ -87,11 +88,34 @@ export default class Archive extends ApiCommandBase {
}),
archiveTrackfileBackupFreqMinutes: flags.integer({
description:
-'Specifies how frequently the archive tracking file (containing information about .7z files content)' +
+'Specifies how frequently the archive tracking file (containing information about archive files content)' +
" should be uploaded to S3 (in case it's changed).",
env: 'ARCHIVE_TRACKFILE_BACKUP_FREQ_MINUTES',
default: 60,
}),
+compressionAlgorithm: flags.enum<CompressionAlgorithm>({
+  required: true,
+  description: 'Compression algorithm to use for archive files',
+  options: ['7zip', 'zstd', 'none'],
+  default: 'zstd',
+  env: 'COMPRESSION_ALGORITHM',
+}),
+compressionLevel: flags.enum<CompressionLevel>({
+  required: true,
+  description: 'Compression level to use for archive files (lower is faster, but provides lower storage savings)',
+  env: 'COMPRESSION_LEVEL',
+  default: 'medium',
+  options: ['low', 'medium', 'high'],
+}),
+compressionThreads: flags.integer({
+  required: true,
+  description:
+    'Number of threads to use for compression. ' +
+    'Note that {uploadWorkersNumber} upload tasks may be running at once ' +
+    'and each of them can spawn a separate compression task which uses {compressionThreads} threads!',
+  env: 'COMPRESSION_THREADS',
+  default: 1,
+}),
uploadWorkersNumber: flags.integer({
required: false,
description: 'Upload workers number (max async operations in progress).',
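The compressionThreads warning above deserves a worked example, since the two flags multiply (the numbers below are hypothetical):

```ts
// With --uploadWorkersNumber=4 and --compressionThreads=2:
const uploadWorkersNumber = 4
const compressionThreads = 2
// Each in-flight upload task may spawn its own compression task,
// so the worst-case number of busy compression threads is:
const worstCase = uploadWorkersNumber * compressionThreads // = 8
```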
@@ -248,7 +272,7 @@ Supported values: warn, error, debug, info. Default:debug`,
this.error('No buckets to serve. Exiting...')
}

-if (syncableBuckets.length !== flags.buckets.length) {
+if (flags.buckets.length && syncableBuckets.length !== flags.buckets.length) {
logger.warn(`Only ${syncableBuckets.length} out of ${flags.buckets.length} provided buckets will be synced!`)
}

@@ -325,6 +349,12 @@ Supported values: warn, error, debug, info. Default:debug`,
uploadQueueDir: flags.uploadQueueDir,
})

+const compressionService = getCompressionService(
+  flags.compressionAlgorithm,
+  flags.compressionThreads,
+  flags.compressionLevel
+)
+
// Build and run archive service
const X_HOST_ID = uuidv4()
const archiveService = new ArchiveService({
@@ -339,6 +369,7 @@ Supported values: warn, error, debug, info. Default:debug`,
tmpDownloadDir,
s3ConnectionHandler,
queryNodeApi: qnApi,
+compressionService,
uploadWorkersNum: flags.uploadWorkersNumber,
hostId: X_HOST_ID,
syncWorkersNum: flags.syncWorkersNumber,
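For orientation, here is a minimal sketch of the surface that `services/archive/compression` (not included in this diff) appears to expose, inferred from the import, the flag option types and the `getCompressionService(...)`/`getExt()` call sites; everything beyond those names is an assumption:

```ts
type CompressionAlgorithm = '7zip' | 'zstd' | 'none'
type CompressionLevel = 'low' | 'medium' | 'high'

interface CompressionService {
  // Extension of produced archives (used below to classify files in the upload queue dir)
  getExt(): string
  // Assumed method: compress a set of data object files into a single archive
  compressFiles(inputPaths: string[], archivePath: string): Promise<void>
}

declare function getCompressionService(
  algorithm: CompressionAlgorithm,
  threads: number,
  level: CompressionLevel
): CompressionService
```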
75 changes: 29 additions & 46 deletions storage-node/src/services/archive/ArchiveService.ts
@@ -1,7 +1,7 @@
import { promises as fsp } from 'fs'
import path from 'path'
import logger from '../logger'
-import { CompressFilesTask, UploadArchiveFileTask } from './tasks'
+import { CompressAndUploadTask, UploadArchiveFileTask } from './tasks'
import { WorkingStack, TaskProcessorSpawner } from '../processing/workingProcess'
import { downloadEvents, DownloadFileTask } from '../sync/tasks'
import _ from 'lodash'
@@ -18,6 +18,7 @@ import { getDownloadTasks } from '../sync/synchronizer'
import sleep from 'sleep-promise'
import { Logger } from 'winston'
import { StorageClass } from '@aws-sdk/client-s3'
+import { CompressionService } from './compression'

type DataObjectData = {
id: string
@@ -123,6 +124,8 @@ type ArchiveServiceParams = {
// API's
s3ConnectionHandler: IConnectionHandler<StorageClass>
queryNodeApi: QueryNodeApi
+// Compression service
+compressionService: CompressionService
// Upload tasks config
uploadWorkersNum: number
// Sync tasks config
@@ -151,6 +154,8 @@ export class ArchiveService {
// API's and services
private queryNodeApi: QueryNodeApi
private s3ConnectionHandler: IConnectionHandler<StorageClass>
+// Compression service
+private compressionService: CompressionService
// Tracking services
private objectTrackingService: ObjectTrackingService
private archivesTrackingService: ArchivesTrackingService
@@ -183,6 +188,7 @@
this.uploadQueueDir = params.uploadQueueDir
this.tmpDownloadDir = params.tmpDownloadDir
this.s3ConnectionHandler = params.s3ConnectionHandler
+this.compressionService = params.compressionService
this.queryNodeApi = params.queryNodeApi
this.uploadWorkersNum = params.uploadWorkersNum
this.hostId = params.hostId
@@ -400,9 +406,12 @@
const uploadDirContents = await fsp.readdir(this.uploadQueueDir, { withFileTypes: true })
for (const item of uploadDirContents) {
if (item.isFile()) {
-const [name, ext1, ext2] = item.name.split('.')
+const splitParts = item.name.split('.')
+const name = splitParts[0]
+const isTmp = splitParts[1] === 'tmp'
+const ext = splitParts.slice(isTmp ? 2 : 1).join('.')
// 1. If file name is an int and has no ext: We assume it's a fully downloaded data object
-if (parseInt(name).toString() === name && !ext1) {
+if (parseInt(name).toString() === name && !isTmp && !ext) {
const dataObjectId = name
// 1.1. If the object is not in dataObjectsQueue: remove
if (!this.dataObjectsQueue.has(dataObjectId)) {
@@ -419,8 +428,8 @@
await this.tryRemovingLocalDataObject(dataObjectId)
}
}
-// 2. If file is .7z: We assume it's a valid archive with data objects
-else if (ext1 === '7z') {
+// 2. If file is an archive and has no `.tmp` ext: We assume it's a valid archive with data objects
+else if (!isTmp && ext === this.compressionService.getExt()) {
if (!this.archivesTrackingService.isTracked(item.name)) {
// 2.1. If not tracked by archiveTrackingService - try to re-upload:
this.logger.warn(`Found unuploaded archive: ${item.name}. Scheduling for re-upload...`)
@@ -430,17 +439,18 @@
item.name,
this.uploadQueueDir,
this.archivesTrackingService,
-this.s3ConnectionHandler
+this.s3ConnectionHandler,
+this.compressionService
),
])
// 2.2. If it's already tracked by archiveTrackingService (already uploaded): remove
} else {
this.logger.warn(`Found already uploaded archive: ${item.name}. Removing...`)
await this.tryRemovingLocalFile(path.join(this.uploadQueueDir, item.name))
}
-// 3. If file is .tmp.7z: remove
-} else if (ext1 === 'tmp' && ext2 === '7z') {
-this.logger.warn(`Found broken archive: ${item.name}. Removing...`)
+// 3. If file is temporary: remove
+} else if (isTmp) {
+this.logger.warn(`Found temporary file: ${item.name}. Removing...`)
await this.tryRemovingLocalFile(path.join(this.uploadQueueDir, item.name))
} else if (item.name !== ARCHIVES_TRACKING_FILENAME && item.name !== OBJECTS_TRACKING_FILENAME) {
this.logger.warn(`Found unrecognized file: ${item.name}`)
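To make the three-way classification above concrete, a small re-implementation of the parsing with worked examples (the `tar.zst` extension is illustrative; the real one comes from `compressionService.getExt()`):

```ts
// Mirrors the `splitParts` logic above:
function classify(fileName: string) {
  const splitParts = fileName.split('.')
  const name = splitParts[0]
  const isTmp = splitParts[1] === 'tmp'
  const ext = splitParts.slice(isTmp ? 2 : 1).join('.')
  return { name, isTmp, ext }
}

classify('123')             // { name: '123', isTmp: false, ext: '' }        -> case 1: downloaded data object
classify('123.tar.zst')     // { name: '123', isTmp: false, ext: 'tar.zst' } -> case 2: archive (re-uploaded if untracked)
classify('123.tmp.tar.zst') // { name: '123', isTmp: true,  ext: 'tar.zst' } -> case 3: temporary file, removed
```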
@@ -583,8 +593,7 @@
}

/**
-* Compresses batches of data objects into 7zip archives and
-* schedules the uploads to S3.
+* Compresses batches of data objects into archives and schedules the uploads to S3.
*/
public async prepareAndUploadBatches(dataObjectBatches: DataObjectData[][]): Promise<void> {
if (!dataObjectBatches.length) {
@@ -594,46 +603,20 @@

this.preparingForUpload = true

-this.logger.info(`Preparing ${dataObjectBatches.length} batches for upload...`)
-const compressionTasks: CompressFilesTask[] = []
+this.logger.info(`Preparing ${dataObjectBatches.length} object batches for upload...`)
+const uploadTasks: CompressAndUploadTask[] = []
for (const batch of dataObjectBatches) {
-const compressionTask = new CompressFilesTask(
+const uploadTask = new CompressAndUploadTask(
this.uploadQueueDir,
-batch.map((o) => o.id)
+batch.map((o) => o.id),
+this.archivesTrackingService,
+this.s3ConnectionHandler,
+this.compressionService
)
-compressionTasks.push(compressionTask)
+uploadTasks.push(uploadTask)
}

-// We run compression tasks one by one, because they spawn 7zip, which uses all available threads
-// by default, ie. we probably won't benefit from running multiple 7zip tasks in parallel.
-this.logger.info(`Creating ${compressionTasks.length} archive file(s)...`)
-const archiveFiles = []
-for (const compressionTask of compressionTasks) {
-this.logger.debug(compressionTask.description())
-try {
-await compressionTask.execute()
-archiveFiles.push(compressionTask.getArchiveFilePath())
-} catch (e) {
-this.logger.error(`Data objects compression task failed: ${e.toString()}`)
-}
-}
-
-// After collecting the archive files we add them to upload queue
-const uploadFileTasks = archiveFiles.map(
-(filePath) =>
-new UploadArchiveFileTask(
-filePath,
-path.basename(filePath),
-this.uploadQueueDir,
-this.archivesTrackingService,
-this.s3ConnectionHandler
-)
-)
-
-if (uploadFileTasks.length) {
-this.logger.info(`Scheduling ${uploadFileTasks.length} uploads to S3...`)
-await this.uploadWorkingStack.add(uploadFileTasks)
-}
+await this.uploadWorkingStack.add(uploadTasks)

this.preparingForUpload = false
}
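The net effect of the hunk above: the old two-phase flow (compress every batch serially, then queue the uploads) is replaced by one fused task per batch, so compression now runs inside the upload working stack, its parallelism bounded by `uploadWorkersNumber` and each task's thread use bounded by `compressionThreads`. A hypothetical shape of `CompressAndUploadTask`, mirroring the constructor call above (`tasks.ts` is not part of this diff; import paths and method bodies are assumptions):

```ts
import { StorageClass } from '@aws-sdk/client-s3'
import { IConnectionHandler } from '../s3/IConnectionHandler' // path assumed
import { ArchivesTrackingService } from './tracking'          // path assumed
import { CompressionService } from './compression'

class CompressAndUploadTask {
  constructor(
    private uploadQueueDir: string,
    private dataObjectIds: string[],
    private archivesTrackingService: ArchivesTrackingService,
    private s3ConnectionHandler: IConnectionHandler<StorageClass>,
    private compressionService: CompressionService
  ) {}

  description(): string {
    return `Compressing and uploading batch of ${this.dataObjectIds.length} objects`
  }

  async execute(): Promise<void> {
    // Assumed flow: compress the batch into a single archive (compressionService),
    // upload it (s3ConnectionHandler), then record it (archivesTrackingService).
  }
}
```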
55 changes: 0 additions & 55 deletions storage-node/src/services/archive/SevenZipService.ts

This file was deleted.

