Skip to content

Commit

Permalink
Use a file locking mechanism to ensure no concurrent job runs
Browse files Browse the repository at this point in the history
- Watching the resource or using kube api is prone to race
  condition, as their is a delay between the request to create
  a job, and its run, plus, the logic is not easy to centralize
  if we consider job already completed or not yet started, as
  we delete the old jobs upon completion
- Using alock mechanism is similar to what we already have for
  quotas, plus, it ensures no race condition and tests being
  able to restart as soon as possible.

Issue: ZENKO-4941
  • Loading branch information
williamlardier committed Dec 3, 2024
1 parent 6a8731c commit 633b42e
Showing 1 changed file with 39 additions and 41 deletions.
80 changes: 39 additions & 41 deletions tests/ctst/steps/utils/kubernetes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as path from 'path';
import lockFile from 'proper-lockfile';
import { KubernetesHelper, Utils } from 'cli-testing';
import Zenko from 'world/Zenko';
import {
Expand Down Expand Up @@ -71,37 +73,26 @@ export function createKubeCustomObjectClient(world: Zenko): CustomObjectsApi {
return KubernetesHelper.customObject;
}

// Do not check job result, only wait till it completes
export async function waitForExistingJobCompletion(world: Zenko, jobName: string) {
const watchClient = createKubeWatchClient(world);
try {
await new Promise<void>(resolve => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if ((watchObj.object?.metadata?.name as string)?.startsWith?.(jobName)) {
if (watchObj.object?.status?.succeeded || watchObj.object?.status?.failed) {
resolve();
}
}
}, () => resolve());
});
} catch (err: unknown) {
world.logger.error('error waiting for job completion', {
jobName,
err,
});
}
}

export async function createJobAndWaitForCompletion(world: Zenko, jobName: string, customMetadata?: string) {
export async function createJobAndWaitForCompletion(
world: Zenko,
jobName: string,
customMetadata?: string
) {
const batchClient = createKubeBatchClient(world);
const watchClient = createKubeWatchClient(world);
await waitForExistingJobCompletion(world, jobName);

const lockFilePath = path.join('/tmp', `${jobName}.lock`);
let releaseLock: (() => Promise<void>) | false = false;

try {
// Acquire lock on the file
releaseLock = await lockFile.lock(lockFilePath, { stale: 600000 });
world.logger.debug(`Acquired lock for job: ${jobName}`);

// Read the cron job and prepare the job spec
const cronJob = await batchClient.readNamespacedCronJob(jobName, 'default');
const cronJobSpec = cronJob.body.spec?.jobTemplate.spec;

const job = new V1Job();
const metadata = new V1ObjectMeta();
job.apiVersion = 'batch/v1';
Expand All @@ -112,50 +103,57 @@ export async function createJobAndWaitForCompletion(world: Zenko, jobName: strin
'cronjob.kubernetes.io/instantiate': 'ctst',
};
if (customMetadata) {
metadata.annotations = {
custom: customMetadata,
};
metadata.annotations.custom = customMetadata;
}
job.metadata = metadata;

// Create the job
const response = await batchClient.createNamespacedJob('default', job);
world.logger.debug('job created', {
job: response.body.metadata,
});
world.logger.debug('Job created', { job: response.body.metadata });

const expectedJobName = response.body.metadata?.name;

// Watch for job completion
await new Promise<void>((resolve, reject) => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if (job.metadata?.name && expectedJobName &&
(watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName)) {
if (
expectedJobName &&
(watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName)
) {
if (watchObj.object?.status?.succeeded) {
world.logger.debug('job succeeded', {
job: job.metadata,
});
world.logger.debug('Job succeeded', { job: job.metadata });
resolve();
} else if (watchObj.object?.status?.failed) {
world.logger.debug('job failed', {
world.logger.debug('Job failed', {
job: job.metadata,
object: watchObj.object,
});
reject(new Error('job failed'));
reject(new Error('Job failed'));
}
}
}, reject);
},
reject
);
});
} catch (err: unknown) {
world.logger.error('error creating job', {
world.logger.error('Error creating or waiting for job completion', {
jobName,
err,
});
throw err;
} finally {
// Ensure the lock is released
if (releaseLock) {
await releaseLock();
world.logger.debug(`Released lock for job: ${jobName}`);
}
}
}


export async function waitForZenkoToStabilize(
world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') {
// ZKOP pulls the overlay configuration from Pensieve every 5 seconds
Expand Down

0 comments on commit 633b42e

Please sign in to comment.