diff --git a/tests/ctst/steps/utils/kubernetes.ts b/tests/ctst/steps/utils/kubernetes.ts index 6bce6126e..eae0f533f 100644 --- a/tests/ctst/steps/utils/kubernetes.ts +++ b/tests/ctst/steps/utils/kubernetes.ts @@ -1,6 +1,5 @@ -import fs from 'fs'; -import * as path from 'path'; -import lockFile from 'proper-lockfile'; +import { V1ConfigMap } from '@kubernetes/client-node'; + import { KubernetesHelper, Utils } from 'cli-testing'; import Zenko from 'world/Zenko'; import { @@ -81,87 +80,156 @@ export async function createJobAndWaitForCompletion( ) { const batchClient = createKubeBatchClient(world); const watchClient = createKubeWatchClient(world); - - const lockFilePath = path.join('/tmp', `${jobName}.lock`); - let releaseLock: (() => Promise) | false = false; - - if (!fs.existsSync(lockFilePath)) { - fs.writeFileSync(lockFilePath, 'job'); - } - - // Wait 1s to make the lock stale - await Utils.sleep(1000); + const coreClient = createKubeCoreClient(world); try { - // Acquire lock on the file with 0.5s staleness and 1200 retries - releaseLock = await lockFile.lock(lockFilePath, { stale: 500, retries: 1200 }); - world.logger.debug(`Acquired lock for job: ${jobName}`); + // First, try to create a sentinel ConfigMap that will act as our lock + const lockName = `lock-${jobName}`; + const lockConfigMap = new V1ConfigMap(); + lockConfigMap.apiVersion = 'v1'; + lockConfigMap.kind = 'ConfigMap'; + const metadata = new V1ObjectMeta(); + metadata.name = lockName; + metadata.namespace = 'default'; + lockConfigMap.metadata = metadata; + + let lockAcquired = false; + try { + await coreClient.createNamespacedConfigMap('default', lockConfigMap); + lockAcquired = true; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (err: any) { + if (err?.body?.reason === 'AlreadyExists') { + // Someone else has the lock, wait for their job to finish + world.logger.debug('Lock already exists, waiting for existing job'); + + // Find and wait for the active job + const existingJobs = await batchClient.listNamespacedJob( + 'default', + undefined, + undefined, + undefined, + undefined, + `job-base-name=${jobName}` + ); + + const activeJobs = existingJobs.body.items.filter(job => + !job.status?.succeeded && !job.status?.failed + ); + + if (activeJobs.length > 0) { + await new Promise((resolve, reject) => { + void watchClient.watch( + '/apis/batch/v1/namespaces/default/jobs', + {}, + (type: string, apiObj, watchObj) => { + if ( + activeJobs[0].metadata?.name && + watchObj.object?.metadata?.name === activeJobs[0].metadata.name + ) { + if (watchObj.object?.status?.succeeded) { + resolve(); + } else if (watchObj.object?.status?.failed) { + reject(new Error('Existing job failed')); + } + } + }, + reject + ); + }); + } + + // Try to acquire lock again + await coreClient.createNamespacedConfigMap('default', lockConfigMap); + lockAcquired = true; + } else { + throw err; + } + } - // Read the cron job and prepare the job spec + if (!lockAcquired) { + throw new Error('Failed to acquire lock'); + } + + // Create the new job with the ConfigMap as its owner const cronJob = await batchClient.readNamespacedCronJob(jobName, 'default'); const cronJobSpec = cronJob.body.spec?.jobTemplate.spec; const job = new V1Job(); - const metadata = new V1ObjectMeta(); job.apiVersion = 'batch/v1'; job.kind = 'Job'; job.spec = cronJobSpec; - metadata.name = `${jobName}-${Utils.randomString().toLowerCase()}`; - metadata.annotations = { + + const jobMetadata = new V1ObjectMeta(); + jobMetadata.name = `${jobName}-${Utils.randomString()}`; + jobMetadata.labels = { + 'job-base-name': jobName, + }; + jobMetadata.annotations = { 'cronjob.kubernetes.io/instantiate': 'ctst', }; if (customMetadata) { - metadata.annotations.custom = customMetadata; + jobMetadata.annotations.custom = customMetadata; } - job.metadata = metadata; + + // Set the ConfigMap as the owner of the job + jobMetadata.ownerReferences = [{ + apiVersion: 'v1', + kind: 'ConfigMap', + name: lockName, + uid: (await coreClient.readNamespacedConfigMap(lockName, 'default')).body.metadata!.uid!, + blockOwnerDeletion: true, + controller: true + }]; + + job.metadata = jobMetadata; // Create the job const response = await batchClient.createNamespacedJob('default', job); world.logger.debug('Job created', { job: response.body.metadata }); - const expectedJobName = response.body.metadata?.name; - - // Watch for job completion - await new Promise((resolve, reject) => { - void watchClient.watch( - '/apis/batch/v1/namespaces/default/jobs', - {}, - (type: string, apiObj, watchObj) => { - if ( - expectedJobName && - (watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName) - ) { - if (watchObj.object?.status?.succeeded) { - world.logger.debug('Job succeeded', { job: job.metadata }); - resolve(); - } else if (watchObj.object?.status?.failed) { - world.logger.debug('Job failed', { - job: job.metadata, - object: watchObj.object, - }); - reject(new Error('Job failed')); + try { + // Watch for job completion + await new Promise((resolve, reject) => { + void watchClient.watch( + '/apis/batch/v1/namespaces/default/jobs', + {}, + (type: string, apiObj, watchObj) => { + if (watchObj.object?.metadata?.name === response.body.metadata?.name) { + if (watchObj.object?.status?.succeeded) { + world.logger.debug('Job succeeded', { job: job.metadata }); + resolve(); + } else if (watchObj.object?.status?.failed) { + world.logger.debug('Job failed', { + job: job.metadata, + object: watchObj.object, + }); + reject(new Error('Job failed')); + } } - } - }, - reject - ); - }); + }, + reject + ); + }); + } finally { + // Always clean up the lock + if (lockAcquired) { + try { + await coreClient.deleteNamespacedConfigMap(lockName, 'default'); + } catch (err) { + world.logger.warn('Failed to delete lock ConfigMap', { err }); + } + } + } } catch (err: unknown) { world.logger.error('Error creating or waiting for job completion', { jobName, err, }); throw err; - } finally { - // Ensure the lock is released - if (releaseLock) { - await releaseLock(); - world.logger.debug(`Released lock for job: ${jobName}`); - } } } - export async function waitForZenkoToStabilize( world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') { // ZKOP pulls the overlay configuration from Pensieve every 5 seconds