Skip to content

Commit

Permalink
Merge branch 'improvement/ZENKO-4941' into tmp/octopus/w/2.7/improvement/ZENKO-4941
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Dec 3, 2024
2 parents c950724 + 74734d3 commit befd4da
Showing 1 changed file with 123 additions and 55 deletions.
178 changes: 123 additions & 55 deletions tests/ctst/steps/utils/kubernetes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import fs from 'fs';
import * as path from 'path';
import lockFile from 'proper-lockfile';
import { V1ConfigMap } from '@kubernetes/client-node';

import { KubernetesHelper, Utils } from 'cli-testing';
import Zenko from 'world/Zenko';
import {
Expand Down Expand Up @@ -81,87 +80,156 @@ export async function createJobAndWaitForCompletion(
) {
const batchClient = createKubeBatchClient(world);
const watchClient = createKubeWatchClient(world);

const lockFilePath = path.join('/tmp', `${jobName}.lock`);
let releaseLock: (() => Promise<void>) | false = false;

if (!fs.existsSync(lockFilePath)) {
fs.writeFileSync(lockFilePath, 'job');
}

// Wait 1s to make the lock stale
await Utils.sleep(1000);
const coreClient = createKubeCoreClient(world);

try {
// Acquire lock on the file with 0.5s staleness and 1200 retries
releaseLock = await lockFile.lock(lockFilePath, { stale: 500, retries: 1200 });
world.logger.debug(`Acquired lock for job: ${jobName}`);
// First, try to create a sentinel ConfigMap that will act as our lock
const lockName = `lock-${jobName}`;
const lockConfigMap = new V1ConfigMap();
lockConfigMap.apiVersion = 'v1';
lockConfigMap.kind = 'ConfigMap';
const metadata = new V1ObjectMeta();
metadata.name = lockName;
metadata.namespace = 'default';
lockConfigMap.metadata = metadata;

let lockAcquired = false;
try {
await coreClient.createNamespacedConfigMap('default', lockConfigMap);
lockAcquired = true;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (err: any) {
if (err?.body?.reason === 'AlreadyExists') {
// Someone else has the lock, wait for their job to finish
world.logger.debug('Lock already exists, waiting for existing job');

// Find and wait for the active job
const existingJobs = await batchClient.listNamespacedJob(
'default',
undefined,
undefined,
undefined,
undefined,
`job-base-name=${jobName}`
);

const activeJobs = existingJobs.body.items.filter(job =>
!job.status?.succeeded && !job.status?.failed
);

if (activeJobs.length > 0) {
await new Promise<void>((resolve, reject) => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if (
activeJobs[0].metadata?.name &&
watchObj.object?.metadata?.name === activeJobs[0].metadata.name
) {
if (watchObj.object?.status?.succeeded) {
resolve();
} else if (watchObj.object?.status?.failed) {
reject(new Error('Existing job failed'));
}
}
},
reject
);
});
}

// Try to acquire lock again
await coreClient.createNamespacedConfigMap('default', lockConfigMap);
lockAcquired = true;
} else {
throw err;
}
}

// Read the cron job and prepare the job spec
if (!lockAcquired) {
throw new Error('Failed to acquire lock');
}

// Create the new job with the ConfigMap as its owner
const cronJob = await batchClient.readNamespacedCronJob(jobName, 'default');
const cronJobSpec = cronJob.body.spec?.jobTemplate.spec;

const job = new V1Job();
const metadata = new V1ObjectMeta();
job.apiVersion = 'batch/v1';
job.kind = 'Job';
job.spec = cronJobSpec;
metadata.name = `${jobName}-${Utils.randomString().toLowerCase()}`;
metadata.annotations = {

const jobMetadata = new V1ObjectMeta();
jobMetadata.name = `${jobName}-${Utils.randomString()}`;
jobMetadata.labels = {
'job-base-name': jobName,
};
jobMetadata.annotations = {
'cronjob.kubernetes.io/instantiate': 'ctst',
};
if (customMetadata) {
metadata.annotations.custom = customMetadata;
jobMetadata.annotations.custom = customMetadata;
}
job.metadata = metadata;

// Set the ConfigMap as the owner of the job
jobMetadata.ownerReferences = [{
apiVersion: 'v1',
kind: 'ConfigMap',
name: lockName,
uid: (await coreClient.readNamespacedConfigMap(lockName, 'default')).body.metadata!.uid!,
blockOwnerDeletion: true,
controller: true
}];

job.metadata = jobMetadata;

// Create the job
const response = await batchClient.createNamespacedJob('default', job);
world.logger.debug('Job created', { job: response.body.metadata });

const expectedJobName = response.body.metadata?.name;

// Watch for job completion
await new Promise<void>((resolve, reject) => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if (
expectedJobName &&
(watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName)
) {
if (watchObj.object?.status?.succeeded) {
world.logger.debug('Job succeeded', { job: job.metadata });
resolve();
} else if (watchObj.object?.status?.failed) {
world.logger.debug('Job failed', {
job: job.metadata,
object: watchObj.object,
});
reject(new Error('Job failed'));
try {
// Watch for job completion
await new Promise<void>((resolve, reject) => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if (watchObj.object?.metadata?.name === response.body.metadata?.name) {
if (watchObj.object?.status?.succeeded) {
world.logger.debug('Job succeeded', { job: job.metadata });
resolve();
} else if (watchObj.object?.status?.failed) {
world.logger.debug('Job failed', {
job: job.metadata,
object: watchObj.object,
});
reject(new Error('Job failed'));
}
}
}
},
reject
);
});
},
reject
);
});
} finally {
// Always clean up the lock
if (lockAcquired) {
try {
await coreClient.deleteNamespacedConfigMap(lockName, 'default');
} catch (err) {
world.logger.warn('Failed to delete lock ConfigMap', { err });
}
}
}
} catch (err: unknown) {
world.logger.error('Error creating or waiting for job completion', {
jobName,
err,
});
throw err;
} finally {
// Ensure the lock is released
if (releaseLock) {
await releaseLock();
world.logger.debug(`Released lock for job: ${jobName}`);
}
}
}


export async function waitForZenkoToStabilize(
world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') {
// ZKOP pulls the overlay configuration from Pensieve every 5 seconds
Expand Down

0 comments on commit befd4da

Please sign in to comment.