diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index d7e0311f1240..86f5b24ca77d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -61,9 +61,10 @@ public interface JobQueueManagerAPI { /** * Retrieves the job processors for all registered queues. + * * @return A map of queue names to job processors */ - Map> getQueueNames(); + Map> getQueueNames(); /** * Creates a new job in the specified queue. @@ -86,6 +87,18 @@ String createJob(String queueName, Map parameters) */ Job getJob(String jobId) throws DotDataException; + /** + * Retrieves a list of active jobs for a specific queue. + * + * @param queueName The name of the queue + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of active jobs and pagination information. + * @throws JobQueueDataException if there's an error fetching the jobs + */ + JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) + throws JobQueueDataException; + /** * Retrieves a list of jobs. * @@ -97,21 +110,42 @@ String createJob(String queueName, Map parameters) JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException; /** - * Retrieves a list of active jobs for a specific queue. - * @param queueName The name of the queue - * @param page The page number - * @param pageSize The number of jobs per page + * Retrieves a list of active jobs, meaning jobs that are currently being processed. + * + * @param page The page number + * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. * @throws JobQueueDataException if there's an error fetching the jobs */ - JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException; /** - * Retrieves a list of completed jobs for a specific queue within a date range. + * Retrieves a list of completed jobs + * * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of completed jobs and pagination information. - * @throws JobQueueDataException + * @throws JobQueueDataException if there's an error fetching the jobs + */ + JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + + /** + * Retrieves a list of canceled jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of canceled jobs and pagination information. + * @throws JobQueueDataException if there's an error fetching the jobs + */ + JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException; + + /** + * Retrieves a list of failed jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of failed jobs and pagination information. + * @throws JobQueueDataException if there's an error fetching the jobs */ JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException; @@ -141,6 +175,7 @@ String createJob(String queueName, Map parameters) /** * Retrieves the retry strategy for a specific queue. + * * @param jobId The ID of the job * @return The processor instance, or an empty optional if not found */ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index a06bb61be3ff..ffb777cf8791 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -15,6 +15,7 @@ import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; +import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; @@ -22,6 +23,7 @@ import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.Cancellable; import com.dotcms.jobs.business.processor.DefaultProgressTracker; +import com.dotcms.jobs.business.processor.DefaultRetryStrategy; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; @@ -107,6 +109,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private ExecutorService executorService; private final Map retryStrategies; private final RetryStrategy defaultRetryStrategy; + private final RetryPolicyProcessor retryPolicyProcessor; private final ScheduledExecutorService pollJobUpdatesScheduler; private LocalDateTime lastPollJobUpdateTime = LocalDateTime.now(); @@ -142,10 +145,11 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, JobQueueConfig jobQueueConfig, CircuitBreaker circuitBreaker, - RetryStrategy defaultRetryStrategy, + @DefaultRetryStrategy RetryStrategy defaultRetryStrategy, RealTimeJobMonitor realTimeJobMonitor, EventProducer eventProducer, - JobProcessorFactory jobProcessorFactory) { + JobProcessorFactory jobProcessorFactory, + RetryPolicyProcessor retryPolicyProcessor) { this.jobQueue = jobQueue; this.threadPoolSize = jobQueueConfig.getThreadPoolSize(); @@ -154,6 +158,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, this.retryStrategies = new ConcurrentHashMap<>(); this.defaultRetryStrategy = defaultRetryStrategy; this.circuitBreaker = circuitBreaker; + this.jobProcessorFactory = jobProcessorFactory; + this.retryPolicyProcessor = retryPolicyProcessor; this.pollJobUpdatesScheduler = Executors.newSingleThreadScheduledExecutor(); pollJobUpdatesScheduler.scheduleAtFixedRate( @@ -164,7 +170,6 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, // Events this.realTimeJobMonitor = realTimeJobMonitor; this.eventProducer = eventProducer; - this.jobProcessorFactory = jobProcessorFactory; } @Override @@ -244,6 +249,12 @@ public void registerProcessor(final String queueName, final Class watcher(); + + /** + * Returns a predicate that can be used to filter jobs based on custom criteria. + * + * @return a Predicate object to filter Job instances + */ + Predicate filter(); + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index c55466f1f62a..29c9eab2e0b7 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -1,38 +1,121 @@ package com.dotcms.jobs.business.api.events; import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobState; +import com.dotmarketing.util.Logger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Predicate; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; /** - * Manages real-time monitoring of jobs in the system. This class handles registration of job - * watchers, updates watchers on job changes, and processes various job-related events. + * Manages real-time monitoring of jobs in the system. This class provides functionality to register + * watchers for specific jobs and receive notifications about job state changes and progress updates. + * + *

Thread safety is ensured through a combination of {@link ConcurrentHashMap} for storing watchers + * and synchronized {@link List}s for managing multiple watchers per job. This allows concurrent + * registration and notification of watchers without compromising data consistency.

+ * + *

The monitor supports filtered watching through predicates, allowing clients to receive only + * the updates they're interested in. Common predicates are provided through the inner + * {@link Predicates} class.

+ * + *

Usage Examples:

+ * + *

Watch all job updates:

+ *
{@code
+ * monitor.registerWatcher(jobId, job -> System.out.println("Job updated: " + job.id()));
+ * }
+ * + *

Watch only completed jobs:

+ *
{@code
+ * monitor.registerWatcher(jobId,
+ *     job -> handleCompletion(job),
+ *     Predicates.isCompleted()
+ * );
+ * }
+ * + *

Watch progress changes with threshold:

+ *
{@code
+ * monitor.registerWatcher(jobId,
+ *     job -> updateProgress(job),
+ *     Predicates.progressChanged(0.1f) // Updates every 10% progress
+ * );
+ * }
+ * + *

Combine multiple conditions:

+ *
{@code
+ * monitor.registerWatcher(jobId,
+ *     job -> handleUpdate(job),
+ *     Predicates.hasState(JobState.RUNNING)
+ *         .and(Predicates.progressChanged(0.05f))
+ * );
+ * }
+ * + * @see JobWatcher + * @see Predicates */ @ApplicationScoped public class RealTimeJobMonitor { - private final Map>> jobWatchers = new ConcurrentHashMap<>(); + private final Map> jobWatchers = new ConcurrentHashMap<>(); /** - * Registers a watcher for a specific job. + * Registers a watcher for a specific job with optional filtering of updates. The watcher will + * be notified of job updates that match the provided filter predicate. If no filter is provided + * (null), the watcher receives all updates for the job. * - * @param jobId The ID of the job to watch. - * @param watcher The consumer to be notified of job updates. + *

Multiple watchers can be registered for the same job, and each watcher can have + * its own filter predicate. Watchers are automatically removed when a job reaches a final state + * (completed, cancelled, or removed).

+ * + * @param jobId The ID of the job to watch + * @param watcher The consumer to be notified of job updates + * @param filter Optional predicate to filter job updates (null means receive all updates) + * @throws IllegalArgumentException if jobId or watcher is null + * @see Predicates for common filter predicates + */ + public void registerWatcher(String jobId, Consumer watcher, Predicate filter) { + jobWatchers.compute(jobId, (key, existingWatchers) -> { + List watchers = Objects.requireNonNullElseGet( + existingWatchers, + () -> Collections.synchronizedList(new ArrayList<>()) + ); + + final var jobWatcher = JobWatcher.builder() + .watcher(watcher) + .filter(filter != null ? filter : job -> true).build(); + + watchers.add(jobWatcher); + return watchers; + }); + } + + /** + * Registers a watcher for a specific job that receives all updates. + * This is a convenience method equivalent to calling {@code registerWatcher(jobId, watcher, null)}. + * + * @param jobId The ID of the job to watch + * @param watcher The consumer to be notified of job updates + * @throws IllegalArgumentException if jobId or watcher is null */ public void registerWatcher(String jobId, Consumer watcher) { - jobWatchers.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).add(watcher); + registerWatcher(jobId, watcher, null); } /** * Retrieves the set of job IDs currently being watched. + * The returned set is a snapshot and may not reflect concurrent modifications. * - * @return A set of job IDs. + * @return An unmodifiable set of job IDs with active watchers */ public Set getWatchedJobIds() { return jobWatchers.keySet(); @@ -40,8 +123,10 @@ public Set getWatchedJobIds() { /** * Updates watchers for a list of jobs. + * Each job's watchers are notified according to their filter predicates. * - * @param updatedJobs List of jobs that have been updated. + * @param updatedJobs List of jobs that have been updated + * @throws IllegalArgumentException if updatedJobs is null */ public void updateWatchers(List updatedJobs) { for (Job job : updatedJobs) { @@ -56,9 +141,18 @@ public void updateWatchers(List updatedJobs) { */ private void updateWatchers(Job job) { - List> watchers = jobWatchers.get(job.id()); + List watchers = jobWatchers.get(job.id()); if (watchers != null) { - watchers.forEach(watcher -> watcher.accept(job)); + watchers.forEach(jobWatcher -> { + try { + if (jobWatcher.filter().test(job)) { + jobWatcher.watcher().accept(job); + } + } catch (Exception e) { + Logger.error(this, "Error notifying job watcher for job " + job.id(), e); + watchers.remove(jobWatcher); + } + }); } } @@ -136,4 +230,85 @@ public void onJobProgressUpdated(@Observes JobProgressUpdatedEvent event) { updateWatchers(event.getJob()); } + /** + * Common predicates for filtering job updates. These predicates can be used individually or + * combined using {@link Predicate#and(Predicate)} and {@link Predicate#or(Predicate)} to create + * more complex filtering conditions. + */ + public static class Predicates { + + private Predicates() { + // Prevent instantiation + } + + /** + * Creates a predicate that matches jobs with any of the specified states. + * + * @param states One or more job states to match + * @return A predicate that returns true if the job's state matches any of the specified + * states + * @throws IllegalArgumentException if states is null or empty + */ + public static Predicate hasState(JobState... states) { + return job -> Arrays.asList(states).contains(job.state()); + } + + /** + * Creates a predicate that matches jobs whose progress has changed by at least the + * specified threshold since the last notification. + * + * @param threshold The minimum progress change (0.0 to 1.0) required to match + * @return A predicate that tracks and matches significant progress changes + * @throws IllegalArgumentException if threshold is not between 0.0 and 1.0 + */ + public static Predicate progressChanged(float threshold) { + return new Predicate<>() { + private float lastProgress = 0; + + @Override + public boolean test(Job job) { + float currentProgress = job.progress(); + if (Math.abs(currentProgress - lastProgress) >= threshold) { + lastProgress = currentProgress; + return true; + } + return false; + } + }; + } + + /** + * Creates a predicate that matches failed jobs with error details. The predicate only + * matches if the job is in FAILED state and has error details available. + * + * @return A predicate for matching failed jobs + */ + public static Predicate hasFailed() { + return job -> job.state() == JobState.FAILED + && job.result().isPresent() + && job.result().get().errorDetail().isPresent(); + } + + /** + * Creates a predicate that matches completed jobs. The predicate matches any job in the + * COMPLETED state. + * + * @return A predicate for matching completed jobs + */ + public static Predicate isCompleted() { + return job -> job.state() == JobState.COMPLETED; + } + + /** + * Creates a predicate that matches canceled jobs. The predicate matches any job in the + * CANCELED state. + * + * @return A predicate for matching canceled jobs + */ + public static Predicate isCanceled() { + return job -> job.state() == JobState.CANCELED; + } + + } + } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java index 8b16953d763f..d754d6e9890d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java @@ -16,7 +16,7 @@ public class ExponentialBackoffRetryStrategy implements RetryStrategy { private final long maxDelay; private final double backoffFactor; private final int maxRetries; - private final Set> retryableExceptions; + private final Set> nonRetryableExceptions; private final SecureRandom random = new SecureRandom(); /** @@ -36,14 +36,14 @@ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double * Constructs an ExponentialBackoffRetryStrategy with the specified parameters and retryable * exceptions. * - * @param initialDelay The initial delay between retries in milliseconds. - * @param maxDelay The maximum delay between retries in milliseconds. - * @param backoffFactor The factor by which the delay increases with each retry. - * @param maxRetries The maximum number of retry attempts allowed. - * @param retryableExceptions A set of exception classes that are considered retryable. + * @param initialDelay The initial delay between retries in milliseconds. + * @param maxDelay The maximum delay between retries in milliseconds. + * @param backoffFactor The factor by which the delay increases with each retry. + * @param maxRetries The maximum number of retry attempts allowed. + * @param nonRetryableExceptions A set of exception classes that are considered non retryable. */ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double backoffFactor, - int maxRetries, Set> retryableExceptions) { + int maxRetries, Set> nonRetryableExceptions) { if (initialDelay <= 0 || maxDelay <= 0 || backoffFactor <= 1) { throw new IllegalArgumentException("Invalid retry strategy parameters"); @@ -53,7 +53,7 @@ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double this.maxDelay = maxDelay; this.backoffFactor = backoffFactor; this.maxRetries = maxRetries; - this.retryableExceptions = new HashSet<>(retryableExceptions); + this.nonRetryableExceptions = new HashSet<>(nonRetryableExceptions); } /** @@ -65,7 +65,7 @@ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double */ @Override public boolean shouldRetry(final Job job, final Class exceptionClass) { - return job.retryCount() < maxRetries && isRetryableException(exceptionClass); + return job.retryCount() < maxRetries && !isNonRetryableException(exceptionClass); } /** @@ -93,25 +93,22 @@ public int maxRetries() { } @Override - public boolean isRetryableException(final Class exceptionClass) { + public boolean isNonRetryableException(final Class exceptionClass) { if (exceptionClass == null) { return false; } - if (retryableExceptions.isEmpty()) { - return true; // If no specific exceptions are set, all are retryable - } - return retryableExceptions.stream() + return nonRetryableExceptions.stream() .anyMatch(clazz -> clazz.isAssignableFrom(exceptionClass)); } @Override - public void addRetryableException(final Class exceptionClass) { - retryableExceptions.add(exceptionClass); + public void addNonRetryableException(final Class exceptionClass) { + nonRetryableExceptions.add(exceptionClass); } @Override - public Set> getRetryableExceptions() { - return Set.copyOf(retryableExceptions); + public Set> getNonRetryableExceptions() { + return Set.copyOf(nonRetryableExceptions); } } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java index ef4d9148ffb0..b656c8f5b79b 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java @@ -17,4 +17,15 @@ public class JobProcessingException extends RuntimeException { public JobProcessingException(String jobId, String reason, Throwable cause) { super("Error processing job " + jobId + ". Reason: " + reason, cause); } + + /** + * Constructs a new JobProcessingException with the specified job ID, reason, and cause. + * + * @param jobId The ID of the job that encountered an error during processing + * @param reason A description of why the error occurred + */ + public JobProcessingException(String jobId, String reason) { + super("Error processing job " + jobId + ". Reason: " + reason); + } + } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java new file mode 100644 index 000000000000..00dca669a109 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java @@ -0,0 +1,31 @@ +package com.dotcms.jobs.business.error; + +/** + * Exception thrown when a job fails validation before or during processing. This exception provides + * information about which job failed validation, the reason for the validation failure, and the + * underlying cause (if available). + */ +public class JobValidationException extends RuntimeException { + + /** + * Constructs a new JobValidationException with the specified job ID, reason, and cause. + * + * @param jobId The ID of the job that failed validation + * @param reason A description of why the validation failed + * @param cause The underlying cause of the validation failure (can be null) + */ + public JobValidationException(String jobId, String reason, Throwable cause) { + super("Error processing job " + jobId + ". Reason: " + reason, cause); + } + + /** + * Constructs a new JobValidationException with the specified job ID and reason. + * + * @param jobId The ID of the job that failed validation + * @param reason A description of why the validation failed + */ + public JobValidationException(String jobId, String reason) { + super("Error processing job " + jobId + ". Reason: " + reason); + } + +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/NoRetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/NoRetryStrategy.java new file mode 100644 index 000000000000..72b51c445e2a --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/NoRetryStrategy.java @@ -0,0 +1,46 @@ +package com.dotcms.jobs.business.error; + +import com.dotcms.jobs.business.job.Job; +import java.util.Collections; +import java.util.Set; +import javax.enterprise.context.ApplicationScoped; + +/** + * Implements a no-retry strategy for job processors that should never retry failed jobs. This + * strategy always returns false for shouldRetry and maintains an empty set of non-retryable + * exceptions since retries are never attempted. + */ +@ApplicationScoped +public class NoRetryStrategy implements RetryStrategy { + + @Override + public boolean shouldRetry(Job job, Class exceptionClass) { + return false; // Never retry + } + + @Override + public long nextRetryDelay(Job job) { + return 0; // Not used since retries never occur + } + + @Override + public int maxRetries() { + return 0; + } + + @Override + public boolean isNonRetryableException(Class exceptionClass) { + return true; // All exceptions are considered non-retryable + } + + @Override + public void addNonRetryableException(Class exceptionClass) { + // No-op since all exceptions are already non-retryable + } + + @Override + public Set> getNonRetryableExceptions() { + return Collections.emptySet(); // No need to track specific exceptions + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryPolicyProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryPolicyProcessor.java new file mode 100644 index 000000000000..1775684c8f33 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryPolicyProcessor.java @@ -0,0 +1,93 @@ +package com.dotcms.jobs.business.error; + +import com.dotcms.jobs.business.processor.ExponentialBackoffRetryPolicy; +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotcms.jobs.business.processor.NoRetryPolicy; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +/** + * Processes retry policies for job processors. This class is responsible for interpreting retry + * policy annotations on job processor classes and creating appropriate RetryStrategy instances. + */ +@ApplicationScoped +public class RetryPolicyProcessor { + + private NoRetryStrategy noRetryStrategy; + + /** + * Default constructor required for CDI proxy creation. + */ + public RetryPolicyProcessor() { + // Default constructor for CDI + } + + @Inject + public RetryPolicyProcessor(NoRetryStrategy noRetryStrategy) { + this.noRetryStrategy = noRetryStrategy; + } + + /** + * Processes the retry policy for a given job processor class. + *

+ * Currently supports ExponentialBackoffRetryPolicy and NoRetryPolicy. + * + * @param processorClass The class of the job processor to process. + * @return A RetryStrategy based on the annotation present on the processor class, or null if no + * supported annotation is found. + * + * @see ExponentialBackoffRetryPolicy + * @see NoRetryPolicy + */ + public RetryStrategy processRetryPolicy(Class processorClass) { + + // Check for NoRetryPolicy + if (processorClass.isAnnotationPresent(NoRetryPolicy.class)) { + return noRetryStrategy; + } + + // Check for ExponentialBackoffRetryPolicy + if (processorClass.isAnnotationPresent(ExponentialBackoffRetryPolicy.class)) { + return processExponentialBackoffPolicy( + processorClass.getAnnotation(ExponentialBackoffRetryPolicy.class) + ); + } + + return null; + } + + /** + * Processes an ExponentialBackoffRetryPolicy annotation and creates an + * ExponentialBackoffRetryStrategy based on its parameters. + * + * @param policy The ExponentialBackoffRetryPolicy annotation to process + * @return An ExponentialBackoffRetryStrategy configured based on the annotation + */ + private RetryStrategy processExponentialBackoffPolicy(ExponentialBackoffRetryPolicy policy) { + + final long initialDelay = policy.initialDelay() != -1 ? policy.initialDelay() + : RetryStrategyProducer.DEFAULT_RETRY_STRATEGY_INITIAL_DELAY; + final long maxDelay = policy.maxDelay() != -1 ? policy.maxDelay() : + RetryStrategyProducer.DEFAULT_RETRY_STRATEGY_MAX_DELAY; + final double backoffFactor = policy.backoffFactor() != -1 ? policy.backoffFactor() + : RetryStrategyProducer.DEFAULT_RETRY_STRATEGY_BACK0FF_FACTOR; + final int maxRetries = policy.maxRetries() != -1 ? policy.maxRetries() + : RetryStrategyProducer.DEFAULT_RETRY_STRATEGY_MAX_RETRIES; + + Set> nonRetryableExceptions = new HashSet<>( + Arrays.asList(policy.nonRetryableExceptions()) + ); + + return new ExponentialBackoffRetryStrategy( + initialDelay, + maxDelay, + backoffFactor, + maxRetries, + nonRetryableExceptions + ); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java index 23f54600de02..256172478224 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java @@ -35,25 +35,25 @@ public interface RetryStrategy { int maxRetries(); /** - * Determines whether a given exception is retryable according to this strategy. + * Determines whether a given exception is not retryable according to this strategy. * * @param exceptionClass The class of the exception to check. - * @return true if the exception is retryable, false otherwise. + * @return true if the exception is not retryable, false otherwise. */ - boolean isRetryableException(Class exceptionClass); + boolean isNonRetryableException(Class exceptionClass); /** - * Adds an exception class to the set of retryable exceptions. + * Adds an exception class to the set of non retryable exceptions. * - * @param exceptionClass The exception class to be considered retryable. + * @param exceptionClass The exception class to be considered non retryable. */ - void addRetryableException(Class exceptionClass); + void addNonRetryableException(Class exceptionClass); /** - * Returns an unmodifiable set of the currently registered retryable exceptions. + * Returns an unmodifiable set of the currently registered non retryable exceptions. * - * @return An unmodifiable set of retryable exception classes. + * @return An unmodifiable set of non retryable exception classes. */ - Set> getRetryableExceptions(); + Set> getNonRetryableExceptions(); } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategyProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategyProducer.java index 2b9452d66713..f91b9a6755c9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategyProducer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategyProducer.java @@ -1,5 +1,6 @@ package com.dotcms.jobs.business.error; +import com.dotcms.jobs.business.processor.DefaultRetryStrategy; import com.dotmarketing.util.Config; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Produces; @@ -38,6 +39,7 @@ public class RetryStrategyProducer { * @return An ExponentialBackoffRetryStrategy instance configured with the default values. */ @Produces + @DefaultRetryStrategy public RetryStrategy produceDefaultRetryStrategy() { return new ExponentialBackoffRetryStrategy( DEFAULT_RETRY_STRATEGY_INITIAL_DELAY, diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/DefaultRetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/DefaultRetryStrategy.java new file mode 100644 index 000000000000..82802b95c6b9 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/DefaultRetryStrategy.java @@ -0,0 +1,23 @@ +package com.dotcms.jobs.business.processor; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import javax.inject.Qualifier; + +/** + * Qualifier annotation to identify the default retry strategy implementation. + */ +@Qualifier +@Retention(RUNTIME) +@Target({TYPE, METHOD, FIELD, PARAMETER}) +@Documented +public @interface DefaultRetryStrategy { + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/ExponentialBackoffRetryPolicy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/ExponentialBackoffRetryPolicy.java new file mode 100644 index 000000000000..4e31497fda55 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/ExponentialBackoffRetryPolicy.java @@ -0,0 +1,66 @@ +package com.dotcms.jobs.business.processor; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to specify an exponential backoff retry policy for job processors. This annotation + *

+ * should be applied at the class level to define the retry behavior for the entire job processor. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface ExponentialBackoffRetryPolicy { + + /** + * Specifies the maximum number of retry attempts. + *

+ * If set to -1, the value will be taken from the configuration property + * 'DEFAULT_RETRY_STRATEGY_MAX_RETRIES'. + * + * @return the maximum number of retries, or -1 to use the config value + */ + int maxRetries() default -1; + + /** + * Specifies the initial delay between retry attempts in milliseconds. + *

+ * If set to -1, the value will be taken from the configuration property + * 'DEFAULT_RETRY_STRATEGY_INITIAL_DELAY'. + * + * @return the initial delay in milliseconds, or -1 to use the config value + */ + long initialDelay() default -1; + + /** + * Specifies the maximum delay between retry attempts in milliseconds. + *

+ * If set to -1, the value will be taken from the configuration property + * 'DEFAULT_RETRY_STRATEGY_MAX_DELAY'. + * + * @return the maximum delay in milliseconds, or -1 to use the config value + */ + long maxDelay() default -1; + + /** + * Specifies the factor by which the delay increases with each retry attempt. + *

+ * If set to -1, the value will be taken from the configuration property + * 'DEFAULT_RETRY_STRATEGY_BACK0FF_FACTOR'. + * + * @return the backoff factor, or -1 to use the config value + */ + double backoffFactor() default -1; + + /** + * Specifies the exception types that should not be retried. If an empty array is provided, all + * exceptions will be considered retryable. + * + * @return an array of Throwable classes representing non-retryable exceptions + */ + Class[] nonRetryableExceptions() default {}; +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/NoRetryPolicy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/NoRetryPolicy.java new file mode 100644 index 000000000000..527805fba76f --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/NoRetryPolicy.java @@ -0,0 +1,28 @@ +package com.dotcms.jobs.business.processor; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to explicitly specify that a job processor should not retry failed jobs. + * This provides a more semantic way to indicate no-retry behavior compared to setting + * maxRetries=0 in ExponentialBackoffRetryPolicy. + * + *

Usage example:

+ *
+ * {@literal @}NoRetryPolicy
+ * {@literal @}Queue("myQueue")
+ * public class MyJobProcessor implements JobProcessor {
+ *     // Implementation
+ * }
+ * 
+ */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface NoRetryPolicy { + // No elements needed - presence of annotation is sufficient +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java similarity index 69% rename from dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailJob.java rename to dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java index a70a4a4103b6..ae34091b2bbd 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java @@ -6,13 +6,14 @@ import com.dotmarketing.exception.DotRuntimeException; import java.util.Map; -@Queue("fail") -public class FailJob implements JobProcessor { +@Queue("failSuccess") +public class FailSuccessJob implements JobProcessor { @Override public void process(Job job) { - - throw new DotRuntimeException( "Failed job !"); + if (job.parameters().containsKey("fail")) { + throw new DotRuntimeException("Failed job !"); + } } @Override diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java new file mode 100644 index 000000000000..5c586ee62a59 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -0,0 +1,728 @@ +package com.dotcms.jobs.business.processor.impl; + +import com.dotcms.contenttype.model.type.ContentType; +import com.dotcms.jobs.business.error.JobCancellationException; +import com.dotcms.jobs.business.error.JobProcessingException; +import com.dotcms.jobs.business.error.JobValidationException; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.processor.Cancellable; +import com.dotcms.jobs.business.processor.ExponentialBackoffRetryPolicy; +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotcms.jobs.business.processor.NoRetryPolicy; +import com.dotcms.jobs.business.processor.Queue; +import com.dotcms.jobs.business.util.JobUtil; +import com.dotcms.repackage.com.csvreader.CsvReader; +import com.dotcms.rest.api.v1.temp.DotTempFile; +import com.dotmarketing.beans.Host; +import com.dotmarketing.business.APILocator; +import com.dotmarketing.db.HibernateUtil; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.exception.DotHibernateException; +import com.dotmarketing.exception.DotSecurityException; +import com.dotmarketing.portlets.contentlet.action.ImportAuditUtil; +import com.dotmarketing.util.AdminLogger; +import com.dotmarketing.util.FileUtil; +import com.dotmarketing.util.ImportUtil; +import com.dotmarketing.util.Logger; +import com.google.common.hash.Hashing; +import com.liferay.portal.model.User; +import com.liferay.portal.util.Constants; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; + +/** + * Processor implementation for handling content import operations in dotCMS. This class provides + * functionality to import content from CSV files, with support for both preview and publish + * operations, as well as multilingual content handling. + * + *

The processor implements both {@link JobProcessor} and {@link Cancellable} interfaces to + * provide job processing and cancellation capabilities. It's annotated with {@link Queue} to + * specify the queue name and {@link ExponentialBackoffRetryPolicy} to define retry behavior.

+ * + *

Key features:

+ *
    + *
  • Support for both preview and publish operations
  • + *
  • Multilingual content import capabilities
  • + *
  • Progress tracking during import
  • + *
  • Cancellation support
  • + *
  • Validation of import parameters and content
  • + *
+ * + * @see JobProcessor + * @see Cancellable + * @see Queue + * @see ExponentialBackoffRetryPolicy + */ +@Queue("importContentlets") +@NoRetryPolicy +public class ImportContentletsProcessor implements JobProcessor, Cancellable { + + private static final String PARAMETER_LANGUAGE = "language"; + private static final String PARAMETER_FIELDS = "fields"; + private static final String PARAMETER_USER_ID = "userId"; + private static final String PARAMETER_SITE_IDENTIFIER = "siteIdentifier"; + private static final String PARAMETER_SITE_NAME = "siteName"; + private static final String PARAMETER_CONTENT_TYPE = "contentType"; + private static final String PARAMETER_WORKFLOW_ACTION_ID = "workflowActionId"; + private static final String PARAMETER_CMD = Constants.CMD; + private static final String CMD_PREVIEW = com.dotmarketing.util.Constants.PREVIEW; + private static final String CMD_PUBLISH = com.dotmarketing.util.Constants.PUBLISH; + + private static final String LANGUAGE_CODE_HEADER = "languageCode"; + private static final String COUNTRY_CODE_HEADER = "countryCode"; + + /** + * Flag to track cancellation requests for the current import operation. + */ + private final AtomicBoolean cancellationRequested = new AtomicBoolean(false); + + /** + * Storage for metadata about the import operation results. + */ + private Map resultMetadata = new HashMap<>(); + + /** + * Processes a content import job. This method serves as the main entry point for the import + * operation and handles both preview and publish modes. + * + *

The method performs the following steps:

+ *
    + *
  1. Validates the input parameters and retrieves the necessary user information
  2. + *
  3. Retrieves and validates the import file
  4. + *
  5. Sets up progress tracking
  6. + *
  7. Executes either preview or publish operation based on the command
  8. + *
  9. Ensures proper progress updates throughout the process
  10. + *
+ * + * @param job The job containing import parameters and configuration + * @throws JobProcessingException if any error occurs during processing + */ + @Override + public void process(final Job job) throws JobProcessingException { + + final String command = getCommand(job); + + final User user; + try { + user = getUser(job); + } catch (Exception e) { + Logger.error(this, "Error retrieving user", e); + throw new JobProcessingException(job.id(), "Error retrieving user", e); + } + + Logger.info(this, String.format("Processing import contentlets job [%s], " + + "with command [%s] and user [%s]", job.id(), command, user.getUserId())); + + // Retrieving the import file + Optional tempFile = JobUtil.retrieveTempFile(job); + if (tempFile.isEmpty()) { + Logger.error(this.getClass(), "Unable to retrieve the import file. Quitting the job."); + throw new JobValidationException(job.id(), "Unable to retrieve the import file."); + } + + // Validate the job has the required data + validate(job); + + final var language = getLanguage(job); + final var fileToImport = tempFile.get().file; + final long totalLines = totalLines(job, fileToImport); + final Charset charset = language == -1 ? + Charset.defaultCharset() : FileUtil.detectEncodeType(fileToImport); + + // Create a progress callback function + final var progressTracker = job.progressTracker().orElseThrow( + () -> new JobProcessingException(job.id(), "Progress tracker not found") + ); + final LongConsumer progressCallback = processedLines -> { + float progressPercentage = (float) processedLines / totalLines; + // This ensures the progress is between 0.0 and 1.0 + progressTracker.updateProgress(Math.min(1.0f, Math.max(0.0f, progressPercentage))); + }; + + if (CMD_PREVIEW.equals(command)) { + handlePreview(job, language, fileToImport, charset, user, progressCallback); + } else if (CMD_PUBLISH.equals(command)) { + handlePublish(job, language, fileToImport, charset, user, progressCallback); + } + + if (!cancellationRequested.get()) { + // Ensure the progress is at 100% when the job is done + progressTracker.updateProgress(1.0f); + } + } + + /** + * Handles cancellation requests for the import operation. When called, it marks the operation + * for cancellation. + * + * @param job The job to be cancelled + * @throws JobCancellationException if any error occurs during cancellation + */ + @Override + public void cancel(Job job) throws JobCancellationException { + + Logger.info(this.getClass(), "Job cancellation requested: " + job.id()); + cancellationRequested.set(true); + + final var importId = jobIdToLong(job.id()); + ImportAuditUtil.cancelledImports.put(importId, Calendar.getInstance().getTime()); + } + + /** + * Retrieves metadata about the import operation results. + * + * @param job The job whose metadata is being requested + * @return A map containing result metadata, or an empty map if no metadata is available + */ + @Override + public Map getResultMetadata(Job job) { + + if (resultMetadata.isEmpty()) { + return Collections.emptyMap(); + } + + return resultMetadata; + } + + /** + * Handles the preview phase of content import. This method analyzes the CSV file and provides + * information about potential issues without actually importing the content. + * + * @param job The import job configuration + * @param language The target language for import + * @param fileToImport The CSV file to be imported + * @param charset The character encoding of the import file + * @param user The user performing the import + * @param progressCallback Callback for tracking import progress + */ + private void handlePreview(final Job job, long language, final File fileToImport, + final Charset charset, final User user, final LongConsumer progressCallback) { + + try { + try (Reader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(fileToImport), charset))) { + + CsvReader csvReader = createCsvReader(reader); + CsvHeaderInfo headerInfo = processHeadersBasedOnLanguage(job, language, csvReader); + + final var previewResult = generatePreview(job, user, + headerInfo.headers, csvReader, headerInfo.languageCodeColumn, + headerInfo.countryCodeColumn, progressCallback); + resultMetadata = new HashMap<>(previewResult); + } + } catch (Exception e) { + + try { + HibernateUtil.rollbackTransaction(); + } catch (DotHibernateException he) { + Logger.error(this, he.getMessage(), he); + } + + final var errorMessage = "An error occurred when analyzing the CSV file."; + Logger.error(this, errorMessage, e); + throw new JobProcessingException(job.id(), errorMessage, e); + } + } + + /** + * Handles the publish phase of content import. This method performs the actual content import + * operation, creating or updating content based on the CSV file. + * + * @param job The import job configuration + * @param language The target language for import + * @param fileToImport The CSV file to be imported + * @param charset The character encoding of the import file + * @param user The user performing the import + * @param progressCallback Callback for tracking import progress + */ + private void handlePublish(final Job job, long language, final File fileToImport, + final Charset charset, final User user, final LongConsumer progressCallback) { + + AdminLogger.log( + ImportContentletsProcessor.class, "process", + "Importing Contentlets", user + ); + + try { + try (Reader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(fileToImport), charset))) { + + CsvReader csvReader = createCsvReader(reader); + CsvHeaderInfo headerInfo = readPublishHeaders(language, csvReader); + + final var importResults = processFile(job, user, headerInfo.headers, csvReader, + headerInfo.languageCodeColumn, headerInfo.countryCodeColumn, + progressCallback); + resultMetadata = new HashMap<>(importResults); + } + } catch (Exception e) { + + try { + HibernateUtil.rollbackTransaction(); + } catch (DotHibernateException he) { + Logger.error(this, he.getMessage(), he); + } + + final var errorMessage = "An error occurred when importing the CSV file."; + Logger.error(this, errorMessage, e); + throw new JobProcessingException(job.id(), errorMessage, e); + } finally { + final var importId = jobIdToLong(job.id()); + ImportAuditUtil.cancelledImports.remove(importId); + } + } + + /** + * Reads and analyzes the content of the CSV import file to determine potential errors, + * inconsistencies or warnings, and provide the user with useful information regarding the + * contents of the file. + * + * @param job - The {@link Job} being processed. + * @param user - The {@link User} performing this action. + * @param csvHeaders - The headers that make up the CSV file. + * @param csvReader - The actual data contained in the CSV file. + * @param languageCodeHeaderColumn - The column name containing the language code. + * @param countryCodeHeaderColumn - The column name containing the country code. + * @param progressCallback - The callback function to update the progress of the job. + * @throws DotDataException An error occurred when analyzing the CSV file. + */ + private Map> generatePreview(final Job job, final User user, + final String[] csvHeaders, final CsvReader csvReader, + final int languageCodeHeaderColumn, int countryCodeHeaderColumn, + final LongConsumer progressCallback) throws DotDataException { + + final var currentSiteId = getSiteIdentifier(job); + final var currentSiteName = getSiteName(job); + final var contentType = getContentType(job); + final var fields = getFields(job); + final var language = getLanguage(job); + final var workflowActionId = getWorkflowActionId(job); + final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); + + Logger.info(this, "-------- Starting Content Import Preview -------- "); + Logger.info(this, String.format("-> Content Type ID: %s", contentType)); + + return ImportUtil.importFile(0L, currentSiteId, contentType, fields, true, + (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, + countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); + } + + /** + * Executes the content import process after the review process has been run and displayed to + * the user. + * + * @param job - The {@link Job} being processed. + * @param user - The {@link User} performing this action. + * @param csvHeaders - The headers that make up the CSV file. + * @param csvReader - The actual data contained in the CSV file. + * @param languageCodeHeaderColumn - The column name containing the language code. + * @param countryCodeHeaderColumn - The column name containing the country code. + * @param progressCallback - The callback function to update the progress of the job. + * @return The status of the content import performed by dotCMS. This provides information + * regarding inconsistencies, errors, warnings and/or precautions to the user. + * @throws DotDataException An error occurred when importing the CSV file. + */ + private Map> processFile(final Job job, final User user, + final String[] csvHeaders, final CsvReader csvReader, + final int languageCodeHeaderColumn, final int countryCodeHeaderColumn, + final LongConsumer progressCallback) throws DotDataException { + + final var currentSiteId = getSiteIdentifier(job); + final var currentSiteName = getSiteName(job); + final var contentType = getContentType(job); + final var fields = getFields(job); + final var language = getLanguage(job); + final var workflowActionId = getWorkflowActionId(job); + final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); + final var importId = jobIdToLong(job.id()); + + Logger.info(this, "-------- Starting Content Import Process -------- "); + Logger.info(this, String.format("-> Content Type ID: %s", contentType)); + + return ImportUtil.importFile(importId, currentSiteId, contentType, fields, false, + (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, + countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); + } + + /** + * Retrieve the command from the job parameters + * + * @param job input job + * @return the command from the job parameters, if not found, return the default value "preview" + */ + private String getCommand(final Job job) { + + if (!job.parameters().containsKey(PARAMETER_CMD)) { + return CMD_PREVIEW; + } + + return (String) job.parameters().get(PARAMETER_CMD); + } + + /** + * Retrieve the user from the job parameters + * + * @param job input job + * @return the user from the job parameters + * @throws DotDataException if an error occurs during the user retrieval + * @throws DotSecurityException if we don't have the necessary permissions to retrieve the user + */ + private User getUser(final Job job) throws DotDataException, DotSecurityException { + final var userId = (String) job.parameters().get(PARAMETER_USER_ID); + return APILocator.getUserAPI().loadUserById(userId); + } + + /** + * Retrieves the site identifier from the job parameters. + * + * @param job The job containing the parameters + * @return The site identifier string, or null if not present in parameters + */ + private String getSiteIdentifier(final Job job) { + return (String) job.parameters().get(PARAMETER_SITE_IDENTIFIER); + } + + /** + * Retrieves the site name from the job parameters. + * + * @param job The job containing the parameters + * @return The site name string, or null if not present in parameters + */ + private String getSiteName(final Job job) { + return (String) job.parameters().get(PARAMETER_SITE_NAME); + } + + /** + * Retrieves the content type from the job parameters. + * + * @param job The job containing the parameters + * @return The content type string, or null if not present in parameters + */ + private String getContentType(final Job job) { + return (String) job.parameters().get(PARAMETER_CONTENT_TYPE); + } + + /** + * Retrieves the workflow action ID from the job parameters. + * + * @param job The job containing the parameters + * @return The workflow action ID string, or null if not present in parameters + */ + private String getWorkflowActionId(final Job job) { + return (String) job.parameters().get(PARAMETER_WORKFLOW_ACTION_ID); + } + + /** + * Retrieves the language setting from the job parameters. Handles both string and long + * parameter types. + * + * @param job The job containing the parameters + * @return The language ID as a long, or -1 if not specified + */ + private long getLanguage(final Job job) { + + if (!job.parameters().containsKey(PARAMETER_LANGUAGE) + || job.parameters().get(PARAMETER_LANGUAGE) == null) { + return -1; + } + + final Object language = job.parameters().get(PARAMETER_LANGUAGE); + + if (language instanceof String) { + return Long.parseLong((String) language); + } + + return (long) language; + } + + /** + * Retrieves the fields array from the job parameters. + * + * @param job The job containing the parameters + * @return An array of field strings, or an empty array if no fields are specified + */ + public String[] getFields(final Job job) { + + if (!job.parameters().containsKey(PARAMETER_FIELDS) + || job.parameters().get(PARAMETER_FIELDS) == null) { + return new String[0]; + } + + final var fields = job.parameters().get(PARAMETER_FIELDS); + if (fields instanceof List) { + return ((List) fields).toArray(new String[0]); + } + + return (String[]) fields; + } + + /** + * Validates the job parameters and content type. Performs security checks to prevent + * unauthorized host imports. + * + * @param job The job to validate + * @throws JobValidationException if validation fails + * @throws JobProcessingException if an error occurs during content type validation + */ + private void validate(final Job job) { + + if (getContentType(job) != null && getContentType(job).isEmpty()) { + Logger.error(this.getClass(), "A Content Type is required"); + throw new JobValidationException(job.id(), "A Content Type is required"); + } else if (getWorkflowActionId(job) != null && getWorkflowActionId(job).isEmpty()) { + Logger.error(this.getClass(), "Workflow action type is required"); + throw new JobValidationException(job.id(), "Workflow action type is required"); + } + + // Security measure to prevent invalid attempts to import a host. + try { + final ContentType hostContentType = APILocator.getContentTypeAPI( + APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME + ); + final boolean isHost = (hostContentType.id().equals(getContentType(job))); + if (isHost) { + Logger.error(this, "Invalid attempt to import a host."); + throw new JobValidationException(job.id(), "Invalid attempt to import a host."); + } + } catch (DotSecurityException | DotDataException e) { + throw new JobProcessingException(job.id(), "Error validating content type", e); + } + } + + /** + * Utility method to convert a job ID to a long value for internal processing. Uses FarmHash for + * efficient hash generation and distribution. + * + * @param jobId The string job identifier + * @return A long value representing the job ID + */ + public static long jobIdToLong(final String jobId) { + + // Use FarmHash for good distribution and speed + long hashValue = Hashing.farmHashFingerprint64() + .hashString(jobId, StandardCharsets.UTF_8).asLong(); + + // Ensure the value is positive (in the upper half of the bigint range) + return Math.abs(hashValue); + } + + /** + * Count the number of lines in the file + * + * @param dotTempFile temporary file + * @return the number of lines in the file + */ + private Long totalLines(final Job job, final File dotTempFile) { + + long totalCount; + try (BufferedReader reader = new BufferedReader(new FileReader(dotTempFile))) { + totalCount = reader.lines().count(); + if (totalCount == 0) { + Logger.info(this.getClass(), + "No lines in CSV import file: " + dotTempFile.getName()); + } + } catch (Exception e) { + Logger.error(this.getClass(), + "Error calculating total lines in CSV import file: " + e.getMessage()); + throw new JobProcessingException(job.id(), + "Error calculating total lines in CSV import file", e); + } + + return totalCount; + } + + /** + * Reads and processes headers for publishing operation. + * + * @param language The target language for import + * @param csvreader The CSV reader containing the file data + * @return CsvHeaderInfo containing processed header information + * @throws IOException if an error occurs reading the CSV file + */ + private CsvHeaderInfo readPublishHeaders(long language, CsvReader csvreader) + throws IOException { + if (language == -1 && csvreader.readHeaders()) { + return findLanguageColumnsInHeaders(csvreader.getHeaders()); + } + return new CsvHeaderInfo(null, -1, -1); + } + + /** + * Locates language-related columns in CSV headers. + * + * @param headers Array of CSV header strings + * @return CsvHeaderInfo containing the positions of language and country code columns + */ + private CsvHeaderInfo findLanguageColumnsInHeaders(String[] headers) { + + int languageCodeColumn = -1; + int countryCodeColumn = -1; + + for (int column = 0; column < headers.length; ++column) { + if (headers[column].equals(LANGUAGE_CODE_HEADER)) { + languageCodeColumn = column; + } + if (headers[column].equals(COUNTRY_CODE_HEADER)) { + countryCodeColumn = column; + } + if (languageCodeColumn != -1 && countryCodeColumn != -1) { + break; + } + } + + return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); + } + + /** + * Creates a CSV reader with appropriate configuration for import operations. + * + * @param reader The source reader for CSV content + * @return A configured CsvReader instance + */ + private CsvReader createCsvReader(final Reader reader) { + CsvReader csvreader = new CsvReader(reader); + csvreader.setSafetySwitch(false); + return csvreader; + } + + /** + * Processes CSV headers based on the specified language configuration. + * + * @param job The current import job + * @param language The target language for import + * @param csvReader The CSV reader to process headers from + * @return CsvHeaderInfo containing processed header information + * @throws IOException if an error occurs reading the CSV file + */ + private CsvHeaderInfo processHeadersBasedOnLanguage(final Job job, final long language, + final CsvReader csvReader) throws IOException { + if (language != -1) { + validateLanguage(job, language); + return new CsvHeaderInfo(null, -1, -1); + } + + return processMultilingualHeaders(job, csvReader); + } + + /** + * Validates the language configuration for import operations. + * + * @param job The current import job + * @param language The language identifier to validate + */ + private void validateLanguage(Job job, long language) { + if (language == 0) { + final var errorMessage = "Please select a valid Language."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } + } + + /** + * Processes headers for multilingual content imports. + * + * @param job The current import job + * @param csvReader The CSV reader to process headers from + * @return CsvHeaderInfo containing processed multilingual header information + * @throws IOException if an error occurs reading the CSV file + */ + private CsvHeaderInfo processMultilingualHeaders(final Job job, final CsvReader csvReader) + throws IOException { + + if (getFields(job).length == 0) { + final var errorMessage = + "A key identifying the different Language versions of the same " + + "content must be defined when importing multilingual files."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } + + if (!csvReader.readHeaders()) { + final var errorMessage = "An error occurred when attempting to read the CSV file headers."; + Logger.error(this, errorMessage); + throw new JobProcessingException(job.id(), errorMessage); + } + + String[] headers = csvReader.getHeaders(); + return findLanguageColumns(job, headers); + } + + /** + * Locates language-related columns in CSV headers. + * + * @param headers Array of CSV header strings + * @return CsvHeaderInfo containing the positions of language and country code columns + */ + private CsvHeaderInfo findLanguageColumns(Job job, String[] headers) + throws JobProcessingException { + + int languageCodeColumn = -1; + int countryCodeColumn = -1; + + for (int column = 0; column < headers.length; ++column) { + if (headers[column].equals(LANGUAGE_CODE_HEADER)) { + languageCodeColumn = column; + } + if (headers[column].equals(COUNTRY_CODE_HEADER)) { + countryCodeColumn = column; + } + if (languageCodeColumn != -1 && countryCodeColumn != -1) { + break; + } + } + + validateLanguageColumns(job, languageCodeColumn, countryCodeColumn); + return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); + } + + /** + * Performs validation of language columns for multilingual imports. + * + * @param job The current import job + * @param languageCodeColumn The index of the language code column + * @param countryCodeColumn The index of the country code column + * @throws JobValidationException if the required language columns are not found + */ + private void validateLanguageColumns(Job job, int languageCodeColumn, int countryCodeColumn) + throws JobProcessingException { + if (languageCodeColumn == -1 || countryCodeColumn == -1) { + final var errorMessage = "languageCode and countryCode fields are mandatory in the CSV " + + "file when importing multilingual content."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } + } + + /** + * Container class for CSV header information, particularly for handling language-related + * columns in multilingual imports. + */ + private static class CsvHeaderInfo { + + final String[] headers; + final int languageCodeColumn; + final int countryCodeColumn; + + CsvHeaderInfo(String[] headers, int languageCodeColumn, int countryCodeColumn) { + this.headers = headers; + this.languageCodeColumn = languageCodeColumn; + this.countryCodeColumn = countryCodeColumn; + } + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java index a0f4025f5f66..36245a0c4ba6 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java @@ -6,15 +6,13 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.processor.Queue; +import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.rest.api.v1.temp.DotTempFile; -import com.dotcms.rest.api.v1.temp.TempFileAPI; -import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import io.vavr.control.Try; import java.io.BufferedReader; import java.io.FileReader; -import java.util.List; import java.util.Map; import java.util.Optional; @@ -36,7 +34,7 @@ public void process(Job job) { Logger.info(this.getClass(), "Processing job: " + job.id()); Map params = job.parameters(); - Optional tempFile = tempFile(params); + Optional tempFile = JobUtil.retrieveTempFile(job); if (tempFile.isEmpty()) { Logger.error(this.getClass(), "Unable to retrieve the temporary file. Quitting the job."); throw new DotRuntimeException("Unable to retrieve the temporary file."); @@ -181,36 +179,6 @@ Optional linesParam(Map params) { return Optional.of(nLines); } - /** - * Retrieve the temporary file from the parameters - * - * @param params input parameters - * @return the temporary file - */ - Optional tempFile(Map params) { - // Extract parameters - String tempFileId = (String) params.get("tempFileId"); - - final Object requestFingerPrintRaw = params.get("requestFingerPrint"); - if (!(requestFingerPrintRaw instanceof String)) { - Logger.error(this.getClass(), - "Parameter 'requestFingerPrint' is required and must be a string."); - return Optional.empty(); - } - final String requestFingerPrint = (String) requestFingerPrintRaw; - - // Retrieve the temporary file - final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); - final Optional tempFile = tempFileAPI.getTempFile(List.of(requestFingerPrint), - tempFileId); - if (tempFile.isEmpty()) { - Logger.error(this.getClass(), "Temporary file not found: " + tempFileId); - return Optional.empty(); - } - - return tempFile; - } - /** * Provide metadata for the job result. * @param job The job for which to provide metadata. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index 45cdebfabcf9..af9d434bfcb7 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -76,6 +76,36 @@ JobPaginatedResult getCompletedJobs(String queueName, LocalDateTime startDate, */ JobPaginatedResult getJobs(int page, int pageSize) throws JobQueueDataException; + /** + * Retrieves a list of active jobs, meaning jobs that are currently being processed. + * + * @param page The page number (for pagination). + * @param pageSize The number of items per page. + * @return A result object containing the list of active jobs and pagination information. + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException; + + /** + * Retrieves a list of completed jobs. + * + * @param page The page number (for pagination). + * @param pageSize The number of items per page. + * @return A result object containing the list of completed jobs and pagination information. + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + + /** + * Retrieves a list of canceled jobs. + * + * @param page The page number (for pagination). + * @param pageSize The number of items per page. + * @return A result object containing the list of canceled jobs and pagination information. + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException; + /** * Retrieves a list of failed jobs. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index 229b7c68111f..dfc179106563 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -53,7 +53,6 @@ * @see Job * @see JobState */ - public class PostgresJobQueue implements JobQueue { private static final String CREATE_JOB_QUEUE_QUERY = "INSERT INTO job_queue " @@ -76,7 +75,7 @@ public class PostgresJobQueue implements JobQueue { + "ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) " + "RETURNING *"; - private static final String GET_ACTIVE_JOBS_QUERY = + private static final String GET_ACTIVE_JOBS_QUERY_FOR_QUEUE = "WITH total AS (SELECT COUNT(*) AS total_count " + " FROM job WHERE queue_name = ? AND state IN (?, ?) " + "), " + @@ -86,7 +85,7 @@ public class PostgresJobQueue implements JobQueue { ") " + "SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true"; - private static final String GET_COMPLETED_JOBS_QUERY = + private static final String GET_COMPLETED_JOBS_QUERY_FOR_QUEUE = "WITH total AS (SELECT COUNT(*) AS total_count " + " FROM job WHERE queue_name = ? AND state = ? AND completed_at BETWEEN ? AND ? " + "), " + @@ -96,15 +95,15 @@ public class PostgresJobQueue implements JobQueue { ") " + "SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true"; - private static final String GET_FAILED_JOBS_QUERY = + private static final String GET_JOBS_QUERY_BY_STATE = "WITH total AS (" + " SELECT COUNT(*) AS total_count FROM job " + - " WHERE state = ? " + + " WHERE state IN $??$ " + "), " + "paginated_data AS (" + " SELECT * " + - " FROM job WHERE state = ? " + - " ORDER BY updated_at DESC " + + " FROM job WHERE state IN $??$ " + + " ORDER BY $ORDER_BY$ DESC " + " LIMIT ? OFFSET ? " + ") " + "SELECT p.*, t.total_count " + @@ -153,6 +152,9 @@ public class PostgresJobQueue implements JobQueue { private static final String COLUMN_TOTAL_COUNT = "total_count"; + private static final String REPLACE_TOKEN_PARAMETERS = "$??$"; + private static final String REPLACE_TOKEN_ORDER_BY = "$ORDER_BY$"; + /** * Jackson mapper configuration and lazy initialized instance. */ @@ -247,7 +249,7 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page, try { DotConnect dc = new DotConnect(); - dc.setSQL(GET_ACTIVE_JOBS_QUERY); + dc.setSQL(GET_ACTIVE_JOBS_QUERY_FOR_QUEUE); dc.addParam(queueName); dc.addParam(JobState.PENDING.name()); dc.addParam(JobState.RUNNING.name()); @@ -259,8 +261,10 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page, return jobPaginatedResult(page, pageSize, dc); } catch (DotDataException e) { - Logger.error(this, "Database error while fetching active jobs", e); - throw new JobQueueDataException("Database error while fetching active jobs", e); + Logger.error(this, + "Database error while fetching active jobs by queue", e); + throw new JobQueueDataException( + "Database error while fetching active jobs by queue", e); } } @@ -272,7 +276,7 @@ public JobPaginatedResult getCompletedJobs(final String queueName, try { DotConnect dc = new DotConnect(); - dc.setSQL(GET_COMPLETED_JOBS_QUERY); + dc.setSQL(GET_COMPLETED_JOBS_QUERY_FOR_QUEUE); dc.addParam(queueName); dc.addParam(JobState.COMPLETED.name()); dc.addParam(Timestamp.valueOf(startDate)); @@ -286,8 +290,10 @@ public JobPaginatedResult getCompletedJobs(final String queueName, return jobPaginatedResult(page, pageSize, dc); } catch (DotDataException e) { - Logger.error(this, "Database error while fetching completed jobs", e); - throw new JobQueueDataException("Database error while fetching completed jobs", e); + Logger.error(this, + "Database error while fetching completed jobs by queue", e); + throw new JobQueueDataException( + "Database error while fetching completed jobs by queue", e); } } @@ -308,13 +314,92 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) } } + @Override + public JobPaginatedResult getActiveJobs(final int page, final int pageSize) + throws JobQueueDataException { + + try { + + var query = GET_JOBS_QUERY_BY_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(?, ?)") + .replace(REPLACE_TOKEN_ORDER_BY, "created_at"); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + dc.addParam(JobState.PENDING.name()); + dc.addParam(JobState.RUNNING.name()); + dc.addParam(JobState.PENDING.name()); // Repeated for paginated_data CTE + dc.addParam(JobState.RUNNING.name()); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching active jobs", e); + throw new JobQueueDataException("Database error while fetching active jobs", e); + } + } + + @Override + public JobPaginatedResult getCompletedJobs(final int page, final int pageSize) + throws JobQueueDataException { + + try { + + var query = GET_JOBS_QUERY_BY_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(?)") + .replace(REPLACE_TOKEN_ORDER_BY, "completed_at"); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + dc.addParam(JobState.COMPLETED.name()); + dc.addParam(JobState.COMPLETED.name()); // Repeated for paginated_data CTE + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching completed jobs", e); + throw new JobQueueDataException("Database error while fetching completed jobs", e); + } + } + + @Override + public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) + throws JobQueueDataException { + + try { + + var query = GET_JOBS_QUERY_BY_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(?)") + .replace(REPLACE_TOKEN_ORDER_BY, "completed_at"); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + dc.addParam(JobState.CANCELED.name()); + dc.addParam(JobState.CANCELED.name()); // Repeated for paginated_data CTE + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching cancelled jobs", e); + throw new JobQueueDataException("Database error while fetching cancelled jobs", e); + } + } + @Override public JobPaginatedResult getFailedJobs(final int page, final int pageSize) throws JobQueueDataException { try { + + var query = GET_JOBS_QUERY_BY_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(?)") + .replace(REPLACE_TOKEN_ORDER_BY, "updated_at"); + DotConnect dc = new DotConnect(); - dc.setSQL(GET_FAILED_JOBS_QUERY); + dc.setSQL(query); dc.addParam(JobState.FAILED.name()); dc.addParam(JobState.FAILED.name()); // Repeated for paginated_data CTE dc.addParam(pageSize); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java new file mode 100644 index 000000000000..6a6ec3650e4e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java @@ -0,0 +1,105 @@ +package com.dotcms.jobs.business.util; + +import com.dotcms.api.web.HttpServletRequestThreadLocal; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.mock.request.FakeHttpRequest; +import com.dotcms.mock.request.MockHeaderRequest; +import com.dotcms.mock.request.MockSessionRequest; +import com.dotcms.rest.api.v1.temp.DotTempFile; +import com.dotcms.rest.api.v1.temp.TempFileAPI; +import com.dotmarketing.business.APILocator; +import com.dotmarketing.util.Logger; +import com.dotmarketing.util.UtilMethods; +import com.dotmarketing.util.WebKeys; +import com.liferay.portal.model.User; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; + +/** + * Utility class for job-related operations. + */ +public class JobUtil { + + private JobUtil() { + throw new IllegalStateException("Utility class"); + } + + /** + * Retrieves the temporary file associated with the given job. + * + * @param job The job containing the temporary file information in its parameters. + * @return An Optional containing the DotTempFile if found, or an empty Optional if not found or + * if an error occurs. + */ + public static Optional retrieveTempFile(final Job job) { + + if (job == null) { + Logger.error(JobUtil.class, "Job cannot be null"); + return Optional.empty(); + } + + Map params = job.parameters(); + if (params == null) { + Logger.error(JobUtil.class, "Job parameters cannot be null"); + return Optional.empty(); + } + + // Extract parameters + String tempFileId = (String) params.get("tempFileId"); + if (tempFileId == null) { + Logger.error(JobUtil.class, "Parameter 'tempFileId' is required"); + return Optional.empty(); + } + + final Object requestFingerPrintRaw = params.get("requestFingerPrint"); + if (!(requestFingerPrintRaw instanceof String)) { + Logger.error(JobUtil.class, + "Parameter 'requestFingerPrint' is required and must be a string."); + return Optional.empty(); + } + final String requestFingerPrint = (String) requestFingerPrintRaw; + + // Retrieve the temporary file + final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); + final Optional tempFile = tempFileAPI.getTempFile( + List.of(requestFingerPrint), tempFileId + ); + if (tempFile.isEmpty()) { + Logger.error(JobUtil.class, "Temporary file not found: " + tempFileId); + } + + return tempFile; + } + + /** + * Utility method to create or retrieve an HttpServletRequest when needed from job processors. + * Uses thread-local request if available, otherwise creates a mock request with the specified + * user and site information. + * + * @param user The user performing the import + * @param siteName The name of the site for the import + * @return An HttpServletRequest instance configured for the import operation + */ + public static HttpServletRequest generateMockRequest(final User user, final String siteName) { + + if (null != HttpServletRequestThreadLocal.INSTANCE.getRequest()) { + return HttpServletRequestThreadLocal.INSTANCE.getRequest(); + } + + final HttpServletRequest requestProxy = new MockSessionRequest( + new MockHeaderRequest( + new FakeHttpRequest(siteName, "/").request(), + "referer", + "https://" + siteName + "/fakeRefer") + .request()); + requestProxy.setAttribute(WebKeys.CMS_USER, user); + requestProxy.getSession().setAttribute(WebKeys.CMS_USER, user); + requestProxy.setAttribute(com.liferay.portal.util.WebKeys.USER_ID, + UtilMethods.extractUserIdOrNull(user)); + + return requestProxy; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java index 16737ceae606..f95c7e606373 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java @@ -25,7 +25,7 @@ public class JobParams { private String jsonParams; @FormDataParam("params") - private Map params; + private Map params; public InputStream getFileInputStream() { return fileInputStream; @@ -47,17 +47,14 @@ public void setJsonParams(String jsonParams) { this.jsonParams = jsonParams; } - public Map getParams() throws JsonProcessingException { - if (null == params) { - if (null == jsonParams){ - throw new IllegalArgumentException("Job Params must be passed as a json object in the params field."); - } + public Map getParams() throws JsonProcessingException { + if (null == params && (null != jsonParams && !jsonParams.isEmpty())) { params = new ObjectMapper().readValue(jsonParams, Map.class); } return params; } - public void setParams(Map params) { + public void setParams(Map params) { this.params = params; } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index cf990ad0fac9..1902207ce186 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -12,11 +12,13 @@ import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; +import com.dotmarketing.business.web.WebAPILocator; import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; +import com.liferay.portal.model.User; import java.io.InputStream; import java.lang.reflect.Constructor; import java.time.format.DateTimeFormatter; @@ -114,24 +116,81 @@ public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI, JobProcessorScanner */ @VisibleForTesting void registerProcessor(final String queueName, final Class processor){ - jobQueueManagerAPI.registerProcessor(queueName.toLowerCase(), processor); + jobQueueManagerAPI.registerProcessor(queueName, processor); } /** - * creates a job - * @param queueName - * @param form - * @return jobId - * @throws JsonProcessingException - * @throws DotDataException + * Creates a job + * + * @param queueName The name of the queue + * @param form The request form with the job parameters + * @param user The user requesting to create the job + * @param request The request object + * @return jobId The ID of the created job + * @throws JsonProcessingException If there is an error processing the request form parameters + * @throws DotDataException If there is an error creating the job */ - String createJob(String queueName, JobParams form, HttpServletRequest request) - throws JsonProcessingException, DotDataException { + String createJob(final String queueName, final JobParams form, final User user, + final HttpServletRequest request) throws JsonProcessingException, DotDataException { + + final HashMap in; + + final var parameters = form.getParams(); + if (parameters != null) { + in = new HashMap<>(parameters); + } else { + in = new HashMap<>(); + } - final HashMap in = new HashMap<>(form.getParams()); handleUploadIfPresent(form, in, request); + return createJob(queueName, in, user, request); + } + + /** + * Creates a job with the given parameters + * + * @param queueName The name of the queue + * @param parameters The job parameters + * @param user The user requesting to create the job + * @param request The request object + * @return jobId The ID of the created job + * @throws DotDataException If there is an error creating the job + */ + String createJob(final String queueName, final Map parameters, final User user, + final HttpServletRequest request) throws DotDataException { + + final HashMap in; + if (parameters != null) { + in = new HashMap<>(parameters); + } else { + in = new HashMap<>(); + } + return createJob(queueName, in, user, request); + } + + /** + * Creates a job with the given parameters + * + * @param queueName The name of the queue + * @param parameters The job parameters + * @param user The user requesting to create the job + * @param request The request object + * @return jobId The ID of the created job + * @throws DotDataException If there is an error creating the job + */ + private String createJob(final String queueName, final HashMap parameters, + final User user, final HttpServletRequest request) throws DotDataException { + + // Get the current host and include it in the job params + final var currentHost = WebAPILocator.getHostWebAPI().getCurrentHostNoThrow(request); + parameters.put("siteName", currentHost.getHostname()); + parameters.put("siteIdentifier", currentHost.getIdentifier()); + + // Including the user id in the job params + parameters.put("userId", user.getUserId()); + try { - return jobQueueManagerAPI.createJob(queueName.toLowerCase(), Map.copyOf(in)); + return jobQueueManagerAPI.createJob(queueName, Map.copyOf(parameters)); } catch (JobProcessorNotFoundException e) { Logger.error(this.getClass(), "Error creating job", e); throw new DoesNotExistException(e.getMessage()); @@ -179,25 +238,41 @@ void watchJob(String jobId, Consumer watcher) { * @return JobPaginatedResult * @throws DotDataException if there's an error fetching the jobs */ + JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { + try { + return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize); + } catch (JobQueueDataException e) { + Logger.error(this.getClass(), "Error fetching active jobs", e); + } + return JobPaginatedResult.builder().build(); + } + + /** + * Retrieves a list of jobs. + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of jobs and pagination information. + */ JobPaginatedResult getJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getJobs(page, pageSize); - } catch (DotDataException e){ + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching jobs", e); } return JobPaginatedResult.builder().build(); } /** - * Retrieves a list of jobs. - * @param page The page number + * Retrieves a list of active jobs, meaning jobs that are currently being processed. + * + * @param page The page number * @param pageSize The number of jobs per page - * @return JobPaginatedResult - * @throws DotDataException if there's an error fetching the jobs + * @return A result object containing the list of active jobs and pagination information. */ - JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { + JobPaginatedResult getActiveJobs(int page, int pageSize) { try { - return jobQueueManagerAPI.getActiveJobs(queueName.toLowerCase(), page, pageSize); + return jobQueueManagerAPI.getActiveJobs(page, pageSize); } catch (JobQueueDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } @@ -205,11 +280,43 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { } /** - * Retrieves a list of completed jobs for a specific queue within a date range. + * Retrieves a list of completed jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of completed jobs and pagination information. + */ + JobPaginatedResult getCompletedJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getCompletedJobs(page, pageSize); + } catch (JobQueueDataException e) { + Logger.error(this.getClass(), "Error fetching completed jobs", e); + } + return JobPaginatedResult.builder().build(); + } + + /** + * Retrieves a list of canceled jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of canceled jobs and pagination information. + */ + JobPaginatedResult getCanceledJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getCanceledJobs(page, pageSize); + } catch (JobQueueDataException e) { + Logger.error(this.getClass(), "Error fetching canceled jobs", e); + } + return JobPaginatedResult.builder().build(); + } + + /** + * Retrieves a list of failed jobs + * * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of completed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs */ JobPaginatedResult getFailedJobs(int page, int pageSize) { try { @@ -217,7 +324,7 @@ JobPaginatedResult getFailedJobs(int page, int pageSize) { } catch (JobQueueDataException e) { Logger.error(this.getClass(), "Error fetching failed jobs", e); } - return JobPaginatedResult.builder().build(); + return JobPaginatedResult.builder().build(); } /** diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index e3816a015529..c298d24d50a6 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -26,7 +26,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import org.glassfish.jersey.media.sse.EventOutput; import org.glassfish.jersey.media.sse.OutboundEvent; import org.glassfish.jersey.media.sse.SseFeature; @@ -53,118 +52,108 @@ public JobQueueResource(WebResource webResource, JobQueueHelper helper) { @Path("/{queueName}") @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.APPLICATION_JSON) - public Response createJob( + public ResponseEntityView createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { - new WebResource.InitBuilder(webResource) + final var initDataObject = new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final String jobId = helper.createJob(queueName, form, request); - return Response.ok(new ResponseEntityView<>(jobId)).build(); + final String jobId = helper.createJob(queueName, form, initDataObject.getUser(), request); + return new ResponseEntityView<>(jobId); + } + + @POST + @Path("/{queueName}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView createJob( + @Context HttpServletRequest request, + @PathParam("queueName") String queueName, + Map parameters) throws DotDataException { + final var initDataObject = new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final String jobId = helper.createJob( + queueName, parameters, initDataObject.getUser(), request); + return new ResponseEntityView<>(jobId); } @GET @Path("/queues") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView> getQueues(@Context HttpServletRequest request) { - new WebResource.InitBuilder(webResource) + new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - return new ResponseEntityView<>(helper.getQueueNames()); + return new ResponseEntityView<>(helper.getQueueNames()); } @GET @Path("/{jobId}/status") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView getJobStatus(@Context HttpServletRequest request, @PathParam("jobId") String jobId) + public ResponseEntityView getJobStatus(@Context HttpServletRequest request, + @PathParam("jobId") String jobId) throws DotDataException { - new WebResource.InitBuilder(webResource) + new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - Job job = helper.getJob(jobId); - return new ResponseEntityView<>(job); + Job job = helper.getJob(jobId); + return new ResponseEntityView<>(job); } @POST @Path("/{jobId}/cancel") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.WILDCARD) - public ResponseEntityView cancelJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) - throws DotDataException { - new WebResource.InitBuilder(webResource) - .requiredBackendUser(true) - .requiredFrontendUser(false) - .requestAndResponse(request, null) - .rejectWhenNoUser(true) - .init(); - helper.cancelJob(jobId); - return new ResponseEntityView<>("Job cancelled successfully"); - } - - @GET - @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView listJobs(@Context HttpServletRequest request, - @QueryParam("page") @DefaultValue("1") int page, - @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + public ResponseEntityView cancelJob(@Context HttpServletRequest request, + @PathParam("jobId") String jobId) throws DotDataException { + new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final JobPaginatedResult result = helper.getJobs(page, pageSize); - return new ResponseEntityView<>(result); + helper.cancelJob(jobId); + return new ResponseEntityView<>("Cancellation request successfully sent to job " + jobId); } @GET @Path("/{queueName}/active") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView activeJobs(@Context HttpServletRequest request, @PathParam("queueName") String queueName, - @QueryParam("page") @DefaultValue("1") int page, - @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) - .requiredBackendUser(true) - .requiredFrontendUser(false) - .requestAndResponse(request, null) - .rejectWhenNoUser(true) - .init(); - final JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); - return new ResponseEntityView<>(result); - } - - @GET - @Path("/failed") - @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView failedJobs(@Context HttpServletRequest request, + public ResponseEntityView activeJobs(@Context HttpServletRequest request, + @PathParam("queueName") String queueName, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final JobPaginatedResult result = helper.getFailedJobs(page, pageSize); - return new ResponseEntityView<>(result); + final JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); + return new ResponseEntityView<>(result); } - @GET @Path("/{jobId}/monitor") @Produces(SseFeature.SERVER_SENT_EVENTS) - public EventOutput monitorJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { + public EventOutput monitorJob(@Context HttpServletRequest request, + @PathParam("jobId") String jobId) { new WebResource.InitBuilder(webResource) .requiredBackendUser(true) @@ -194,7 +183,7 @@ public EventOutput monitorJob(@Context HttpServletRequest request, @PathParam("j } catch (IOException e) { Logger.error(this, "Error closing SSE connection", e); } - } else { + } else { // Callback for watching job updates and sending them to the client helper.watchJob(job.id(), watched -> { if (!eventOutput.isClosed()) { @@ -214,4 +203,84 @@ public EventOutput monitorJob(@Context HttpServletRequest request, @PathParam("j } return eventOutput; } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView listJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + + @GET + @Path("/active") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView activeJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getActiveJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + + @GET + @Path("/completed") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView completedJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getCompletedJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + + @GET + @Path("/canceled") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView canceledJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getCanceledJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + + @GET + @Path("/failed") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView failedJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getFailedJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotmarketing/util/ImportUtil.java b/dotCMS/src/main/java/com/dotmarketing/util/ImportUtil.java index cd3ea121d7c7..9f9c41203d89 100644 --- a/dotCMS/src/main/java/com/dotmarketing/util/ImportUtil.java +++ b/dotCMS/src/main/java/com/dotmarketing/util/ImportUtil.java @@ -53,20 +53,27 @@ import com.liferay.portal.language.LanguageUtil; import com.liferay.portal.model.User; import com.liferay.util.StringPool; - import java.io.File; -import java.io.IOException; import java.io.Reader; import java.net.URL; import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; - import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -109,6 +116,78 @@ public class ImportUtil { private static final SimpleDateFormat DATE_FIELD_FORMAT = new SimpleDateFormat("yyyyMMdd"); + /** + * Imports the data contained in a CSV file into dotCMS. The data can be + * either new or an update for existing content. The {@code preview} + * parameter determines the behavior of this method: + *
    + *
  • {@code preview == true}: This is the ideal approach. The data + * contained in the CSV file is previously analyzed and evaluated + * BEFORE actually committing any changes to existing contentlets or + * adding new ones. This way, users can perform the appropriate corrections + * (if needed) before submitting the new contents.
  • + *
  • {@code preview == false}: Setting the parameter this way will make + * the system try to import the contents right away. The method will also + * return a summary with the status of the operation.
  • + *
+ * + * @param importId + * - The ID of this data import. + * @param currentSiteId + * - The ID of the Site where the content will be added/updated. + * @param contentTypeInode + * - The Inode of the Content Type that the content is associated + * to. + * @param keyfields + * - The Inodes of the fields used to associated existing dotCMS + * contentlets with the information in this file. Can be empty. + * @param preview + * - Set to {@code true} if an analysis and evaluation of the + * imported data will be generated before actually + * importing the data. Otherwise, set to {@code false}. + * @param isMultilingual + * - If set to {@code true}, the CSV file will import contents in + * more than one language. Otherwise, set to {@code false}. + * @param user + * - The {@link User} performing this action. + * @param language + * - The language ID for the contents. If the ID equals -1, the + * columns for language code and country code will be used to + * infer the language ID. + * @param csvHeaders + * - The headers for each column in the CSV file. + * @param csvreader + * - The actual data contained in the CSV file. + * @param languageCodeHeaderColumn + * - The column name containing the language code. + * @param countryCodeHeaderColumn + * - The column name containing the country code. + * @param reader + * - The character streams reader. + * @param wfActionId + * - The workflow Action Id to execute on the import + * @param request + * - The request object. + * @return The resulting analysis performed on the CSV file. This provides + * information regarding inconsistencies, errors, warnings and/or + * precautions to the user. + * @throws DotRuntimeException + * An error occurred when analyzing the CSV file. + * @throws DotDataException + * An error occurred when analyzing the CSV file. + */ + public static HashMap> importFile( + Long importId, String currentSiteId, String contentTypeInode, String[] keyfields, + boolean preview, boolean isMultilingual, User user, long language, + String[] csvHeaders, CsvReader csvreader, int languageCodeHeaderColumn, + int countryCodeHeaderColumn, Reader reader, String wfActionId, + final HttpServletRequest request) throws DotRuntimeException, DotDataException { + + return importFile(importId, currentSiteId, contentTypeInode, keyfields, preview, + isMultilingual, user, language, csvHeaders, csvreader, languageCodeHeaderColumn, + countryCodeHeaderColumn, wfActionId, request, null); + } + /** * Imports the data contained in a CSV file into dotCMS. The data can be * either new or an update for existing content. The {@code preview} @@ -155,10 +234,12 @@ public class ImportUtil { * - The column name containing the language code. * @param countryCodeHeaderColumn * - The column name containing the country code. - * @param reader - * - The character streams reader. * @param wfActionId * - The workflow Action Id to execute on the import + * @param request + * - The request object. + * @param progressCallback + * - A callback function to report progress. * @return The resulting analysis performed on the CSV file. This provides * information regarding inconsistencies, errors, warnings and/or * precautions to the user. @@ -167,7 +248,11 @@ public class ImportUtil { * @throws DotDataException * An error occurred when analyzing the CSV file. */ - public static HashMap> importFile(Long importId, String currentSiteId, String contentTypeInode, String[] keyfields, boolean preview, boolean isMultilingual, User user, long language, String[] csvHeaders, CsvReader csvreader, int languageCodeHeaderColumn, int countryCodeHeaderColumn, Reader reader, String wfActionId, final HttpServletRequest request) + public static HashMap> importFile(Long importId, String currentSiteId, + String contentTypeInode, String[] keyfields, boolean preview, boolean isMultilingual, + User user, long language, String[] csvHeaders, CsvReader csvreader, + int languageCodeHeaderColumn, int countryCodeHeaderColumn, String wfActionId, + final HttpServletRequest request, final LongConsumer progressCallback) throws DotRuntimeException, DotDataException { HashMap> results = new HashMap<>(); @@ -293,6 +378,11 @@ public static HashMap> importFile(Long importId, String cur errors++; Logger.warn(ImportUtil.class, "Error line: " + lines + " (" + csvreader.getRawRecord() + "). Line Ignored."); + } finally { + // Progress callback + if (progressCallback != null) { + progressCallback.accept(lines); + } } } @@ -331,14 +421,6 @@ public static HashMap> importFile(Long importId, String cur } catch (final Exception e) { Logger.error(ImportContentletsAction.class, String.format("An error occurred when parsing CSV file in " + "line #%s: %s", lineNumber, e.getMessage()), e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - // Reader could not be closed. Continue - } - } } final String action = preview ? "Content preview" : "Content import"; String statusMsg = String.format("%s has finished, %d lines were read correctly.", action, lines); diff --git a/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java b/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java index a0003e96d56d..4055eca82377 100644 --- a/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java +++ b/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java @@ -2,6 +2,7 @@ import com.dotcms.jobs.business.api.JobQueueManagerAPICDITest; import com.dotcms.jobs.business.api.JobQueueManagerAPIIntegrationTest; +import com.dotcms.jobs.business.processor.impl.ImportContentletsProcessorIntegrationTest; import com.dotcms.jobs.business.queue.PostgresJobQueueIntegrationTest; import com.dotcms.rest.api.v1.job.JobQueueHelperIntegrationTest; import org.junit.platform.suite.api.SelectClasses; @@ -12,7 +13,8 @@ JobQueueManagerAPICDITest.class, PostgresJobQueueIntegrationTest.class, JobQueueManagerAPIIntegrationTest.class, - JobQueueHelperIntegrationTest.class + JobQueueHelperIntegrationTest.class, + ImportContentletsProcessorIntegrationTest.class }) public class Junit5Suite1 { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 1ea8589cc6c4..8525440886d3 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -28,6 +28,7 @@ import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessingException; +import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; @@ -144,6 +145,8 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) private EventProducer eventProducer; + private RetryPolicyProcessor retryPolicyProcessor; + /** * Factory to create mock JobProcessor instances for testing. * This is how we instruct the JobQueueManagerAPI to use our mock processors. @@ -173,10 +176,11 @@ public void setUp() { mockRetryStrategy = mock(RetryStrategy.class); mockCircuitBreaker = mock(CircuitBreaker.class); eventProducer = mock(EventProducer.class); + retryPolicyProcessor = mock(RetryPolicyProcessor.class); jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, mockCircuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - 1, 10 + retryPolicyProcessor, 1, 10 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -995,7 +999,7 @@ public void test_CircuitBreaker_Opens() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - 1, 1000 + retryPolicyProcessor, 1, 1000 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1080,7 +1084,7 @@ public void test_CircuitBreaker_Closes() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - 1, 1000 + retryPolicyProcessor, 1, 1000 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1143,7 +1147,7 @@ public void test_CircuitBreaker_Reset() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - 1, 1000 + retryPolicyProcessor, 1, 1000 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1327,13 +1331,15 @@ private JobQueueManagerAPI newJobQueueManagerAPI(JobQueue jobQueue, RetryStrategy retryStrategy, EventProducer eventProducer, JobProcessorFactory jobProcessorFactory, + RetryPolicyProcessor retryPolicyProcessor, int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) { final var realTimeJobMonitor = new RealTimeJobMonitor(); return new JobQueueManagerAPIImpl( jobQueue, new JobQueueConfig(threadPoolSize, pollJobUpdatesIntervalMilliseconds), - circuitBreaker, retryStrategy, realTimeJobMonitor, eventProducer, jobProcessorFactory + circuitBreaker, retryStrategy, realTimeJobMonitor, eventProducer, + jobProcessorFactory, retryPolicyProcessor ); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java new file mode 100644 index 000000000000..e2889dc18f46 --- /dev/null +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java @@ -0,0 +1,282 @@ +package com.dotcms.jobs.business.processor.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.dotcms.contenttype.model.type.ContentType; +import com.dotcms.datagen.TestDataUtils; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.processor.DefaultProgressTracker; +import com.dotcms.jobs.business.util.JobUtil; +import com.dotcms.rest.api.v1.temp.DotTempFile; +import com.dotcms.rest.api.v1.temp.TempFileAPI; +import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.beans.Host; +import com.dotmarketing.business.APILocator; +import com.dotmarketing.exception.DotSecurityException; +import com.dotmarketing.portlets.contentlet.model.Contentlet; +import com.liferay.portal.model.User; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.servlet.http.HttpServletRequest; +import org.jboss.weld.junit5.EnableWeld; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for the {@link ImportContentletsProcessor} class. These tests verify the + * functionality of content import operations in a real database environment. The tests cover both + * preview and publish modes, testing the complete workflow of content import including content type + * creation, CSV file processing, and content verification. + * + *

The test suite creates temporary content types and files for testing, + * and includes cleanup operations to maintain database integrity. + */ +@EnableWeld +public class ImportContentletsProcessorIntegrationTest extends com.dotcms.Junit5WeldBaseTest { + + private static Host defaultSite; + private static User systemUser; + private static HttpServletRequest request; + + /** + * Sets up the test environment before all tests are run. This method: + *

    + *
  • Initializes the dotCMS test environment
  • + *
  • Retrieves the system user for test operations
  • + *
  • Sets up the default site
  • + *
  • Creates a mock HTTP request for the import process
  • + *
+ * + * @throws Exception if there's an error during setup + */ + @BeforeAll + static void setUp() throws Exception { + + // Initialize the test environment + IntegrationTestInitService.getInstance().init(); + + // Get system user + systemUser = APILocator.getUserAPI().getSystemUser(); + + // Get the default site + defaultSite = APILocator.getHostAPI().findDefaultHost(systemUser, false); + + // Create a mock request + request = JobUtil.generateMockRequest(systemUser, defaultSite.getHostname()); + } + + /** + * Tests the preview mode of the content import process. This test: + *
    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in preview mode
  • + *
  • Verifies the preview results and metadata
  • + *
  • Verifies there is no content creation in the database
  • + *
+ * + *

The test ensures that preview mode properly validates the content + * without actually creating it in the system. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Preview metadata should not be null"); + assertNotNull(metadata.get("errors"), "Preview metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Preview metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Preview metadata errors should be empty"); + + // Verify no content was created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(0, importedContent.size(), "Imported content should have no items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Tests the publish mode of the content import process. This test: + *

    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in publish mode
  • + *
  • Verifies the actual content creation in the database
  • + *
+ * + *

The test confirms that content is properly created in the system + * and matches the data provided in the CSV file. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_publish() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "publish", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Publish metadata should not be null"); + assertNotNull(metadata.get("errors"), "Publish metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Publish metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Publish metadata errors should be empty"); + + // Verify the content was actually created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(2, importedContent.size(), "Imported content should have 2 items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Creates a test content type for import operations. The content type is designed to support + * rich text content and is suitable for testing import functionality. + * + * @return A newly created {@link ContentType} instance + */ + private ContentType createTestContentType() { + return TestDataUtils.getRichTextLikeContentType(); + } + + /** + * Creates a test job for the import process. + * + * @param csvFile The CSV file containing the content to be imported + * @param cmd The command to execute ('preview' or 'publish') + * @param contentTypeId The ID of the content type for the imported content + * @param workflowActionId The ID of the workflow action to be applied + * @return A configured {@link Job} instance ready for processing + * @throws IOException if there's an error reading the CSV file + * @throws DotSecurityException if there's a security violation during job creation + */ + private Job createTestJob(final File csvFile, final String cmd, final String contentTypeId, + final String workflowActionId) throws IOException, DotSecurityException { + + final Map jobParameters = new HashMap<>(); + + // Setup basic job parameters + jobParameters.put("cmd", cmd); + jobParameters.put("userId", systemUser.getUserId()); + jobParameters.put("siteName", defaultSite.getHostname()); + jobParameters.put("siteIdentifier", defaultSite.getIdentifier()); + jobParameters.put("contentType", contentTypeId); + jobParameters.put("workflowActionId", workflowActionId); + jobParameters.put("language", "1"); + + final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); + try (final var fileInputStream = new FileInputStream(csvFile)) { + + final DotTempFile tempFile = tempFileAPI.createTempFile( + csvFile.getName(), request, fileInputStream + ); + + jobParameters.put("tempFileId", tempFile.id); + jobParameters.put("requestFingerPrint", tempFileAPI.getRequestFingerprint(request)); + } + + return Job.builder() + .id("test-job-id") + .queueName("Test Job") + .state(JobState.RUNNING) + .parameters(jobParameters) + .progressTracker(new DefaultProgressTracker()) + .build(); + } + + /** + * Creates a test CSV file with sample content. The file includes a header row and two content + * rows with title and body fields. + * + * @return A temporary {@link File} containing the CSV data + * @throws IOException if there's an error creating or writing to the file + */ + private File createTestCsvFile() throws IOException { + + // Create a CSV file that matches your content type structure + StringBuilder csv = new StringBuilder(); + csv.append("title,body\n"); + csv.append("Test Title 1,Test Body 1\n"); + csv.append("Test Title 2,Test Body 2\n"); + + File csvFile = File.createTempFile("test", ".csv"); + Files.write(csvFile.toPath(), csv.toString().getBytes()); + + return csvFile; + } + + /** + * Retrieves the list of content that was imported during the test. + * + * @param contentTypeId The ID of the content type to search for + * @return A list of {@link Contentlet} objects that were imported + * @throws Exception if there's an error retrieving the content + */ + private List findImportedContent(final String contentTypeId) throws Exception { + return APILocator.getContentletAPI().findByStructure( + contentTypeId, systemUser, false, -1, 0 + ); + } + +} \ No newline at end of file diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java index 890354fd0285..62d180e3088f 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java @@ -80,12 +80,12 @@ void test_createJob_and_getJob() throws JobQueueException { } /** - * Method to test: getActiveJobs in PostgresJobQueue + * Method to test: getActiveJobs in PostgresJobQueue for queue * Given Scenario: Multiple active jobs are created * ExpectedResult: All active jobs are retrieved correctly */ @Test - void test_getActiveJobs() throws JobQueueException { + void test_getActiveJobsForQueue() throws JobQueueException { String queueName = "testQueue"; for (int i = 0; i < 5; i++) { @@ -98,12 +98,29 @@ void test_getActiveJobs() throws JobQueueException { } /** - * Method to test: getCompletedJobs in PostgresJobQueue + * Method to test: getActiveJobs in PostgresJobQueue Given Scenario: Multiple active jobs are + * created ExpectedResult: All active jobs are retrieved correctly + */ + @Test + void test_getActiveJobs() throws JobQueueException { + + String queueName = "testQueue"; + for (int i = 0; i < 5; i++) { + jobQueue.createJob(queueName, new HashMap<>()); + } + + JobPaginatedResult result = jobQueue.getActiveJobs(1, 10); + assertEquals(5, result.jobs().size()); + assertEquals(5, result.total()); + } + + /** + * Method to test: getCompletedJobs in PostgresJobQueue for queue * Given Scenario: Multiple jobs are created and completed * ExpectedResult: All completed jobs within the given time range are retrieved */ @Test - void testGetCompletedJobs() throws JobQueueException { + void testGetCompletedJobsForQueue() throws JobQueueException { String queueName = "testQueue"; LocalDateTime startDate = LocalDateTime.now().minusDays(1); @@ -123,6 +140,54 @@ void testGetCompletedJobs() throws JobQueueException { result.jobs().forEach(job -> assertEquals(JobState.COMPLETED, job.state())); } + /** + * Method to test: getCompletedJobs in PostgresJobQueue + * Given Scenario: Multiple jobs are created and completed + * ExpectedResult: All completed jobs are retrieved + */ + @Test + void testGetCompletedJobs() throws JobQueueException { + + String queueName = "testQueue"; + + // Create and complete some jobs + for (int i = 0; i < 3; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job completedJob = job.markAsCompleted(null); + jobQueue.updateJobStatus(completedJob); + } + + JobPaginatedResult result = jobQueue.getCompletedJobs(1, 10); + assertEquals(3, result.jobs().size()); + assertEquals(3, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.COMPLETED, job.state())); + } + + /** + * Method to test: getCanceledJobs in PostgresJobQueue + * Given Scenario: Multiple jobs are created and canceled + * ExpectedResult: All canceled jobs are retrieved + */ + @Test + void testGetCanceledJobs() throws JobQueueException { + + String queueName = "testQueue"; + + // Create and complete some jobs + for (int i = 0; i < 3; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job completedJob = job.markAsCanceled(null); + jobQueue.updateJobStatus(completedJob); + } + + JobPaginatedResult result = jobQueue.getCanceledJobs(1, 10); + assertEquals(3, result.jobs().size()); + assertEquals(3, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.CANCELED, job.state())); + } + /** * Method to test: getFailedJobs in PostgresJobQueue * Given Scenario: Multiple jobs are created and set to failed state diff --git a/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java index 06d6a5856165..144f22a8822b 100644 --- a/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java @@ -36,19 +36,71 @@ public class JobQueueHelperIntegrationTest extends com.dotcms.Junit5WeldBaseTes @Inject JobQueueHelper jobQueueHelper; + /** + * Test with no parameters in the JobParams creating a job + * Given scenario: create a job with no parameters and valid queue name + * Expected result: the job is created + * + * @throws DotDataException if there's an error creating the job + */ @Test - void testEmptyParams(){ - assertThrows(IllegalArgumentException.class, () -> { - jobQueueHelper.createJob("any", new JobParams(), mock(HttpServletRequest.class)); - }); + void testEmptyParams() throws DotDataException, JsonProcessingException { + + jobQueueHelper.registerProcessor("demoQueue", DemoJobProcessor.class); + + final var jobParams = new JobParams(); + final var user = mock(User.class); + final var request = mock(HttpServletRequest.class); + + when(user.getUserId()).thenReturn("dotcms.org.1"); + + final String jobId = jobQueueHelper.createJob( + "demoQueue", jobParams, user, request + ); + + Assertions.assertNotNull(jobId); + final Job job = jobQueueHelper.getJob(jobId); + Assertions.assertNotNull(job); + Assertions.assertEquals(jobId, job.id()); + } + + /** + * Test with null parameters creating a job + * Given scenario: create a job with null parameters and valid queue name + * Expected result: the job is created + * + * @throws DotDataException if there's an error creating the job + */ + @Test + void testCreateJobWithNoParameters() throws DotDataException { + + jobQueueHelper.registerProcessor("demoQueue", DemoJobProcessor.class); + + final var user = mock(User.class); + when(user.getUserId()).thenReturn("dotcms.org.1"); + + final String jobId = jobQueueHelper.createJob( + "demoQueue", (Map) null, user, mock(HttpServletRequest.class) + ); + + Assertions.assertNotNull(jobId); + final Job job = jobQueueHelper.getJob(jobId); + Assertions.assertNotNull(job); + Assertions.assertEquals(jobId, job.id()); } @Test void testWithValidParamsButInvalidQueueName(){ final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); + + final var user = mock(User.class); + when(user.getUserId()).thenReturn("dotcms.org.1"); + assertThrows(DoesNotExistException.class, () -> { - jobQueueHelper.createJob("nonExisting", jobParams, mock(HttpServletRequest.class)); + jobQueueHelper.createJob( + "nonExisting", jobParams, user, mock(HttpServletRequest.class) + ); }); } @@ -83,14 +135,18 @@ void testWithValidParamsAndQueueName() throws DotDataException, JsonProcessingEx final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); - final String jobId = jobQueueHelper.createJob("demoQueue", jobParams, - mock(HttpServletRequest.class)); + final var user = mock(User.class); + when(user.getUserId()).thenReturn("dotcms.org.1"); + + final String jobId = jobQueueHelper.createJob( + "demoQueue", jobParams, user, mock(HttpServletRequest.class) + ); Assertions.assertNotNull(jobId); final Job job = jobQueueHelper.getJob(jobId); Assertions.assertNotNull(job); Assertions.assertEquals(jobId, job.id()); - Assertions.assertTrue(jobQueueHelper.getQueueNames().contains("demoQueue".toLowerCase())); + Assertions.assertTrue(jobQueueHelper.getQueueNames().contains("demoQueue")); } /** @@ -104,8 +160,13 @@ void testIsWatchable() throws DotDataException, JsonProcessingException { jobQueueHelper.registerProcessor("testQueue", DemoJobProcessor.class); final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); - final String jobId = jobQueueHelper.createJob("testQueue", jobParams, - mock(HttpServletRequest.class)); + + final var user = mock(User.class); + when(user.getUserId()).thenReturn("dotcms.org.1"); + + final String jobId = jobQueueHelper.createJob( + "testQueue", jobParams, user, mock(HttpServletRequest.class) + ); Assertions.assertNotNull(jobId); final Job job = jobQueueHelper.getJob(jobId); assertFalse(jobQueueHelper.isNotWatchable(job)); @@ -123,8 +184,13 @@ void testGetStatusInfo() throws DotDataException, JsonProcessingException { jobQueueHelper.registerProcessor("testQueue", DemoJobProcessor.class); final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); - final String jobId = jobQueueHelper.createJob("testQueue", jobParams, - mock(HttpServletRequest.class)); + + final var user = mock(User.class); + when(user.getUserId()).thenReturn("dotcms.org.1"); + + final String jobId = jobQueueHelper.createJob( + "testQueue", jobParams, user, mock(HttpServletRequest.class) + ); Assertions.assertNotNull(jobId); final Job job = jobQueueHelper.getJob(jobId); final Map info = jobQueueHelper.getJobStatusInfo(job); diff --git a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json index f7aa1934dadc..cbc8bc3971d6 100644 --- a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json +++ b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json @@ -1,11 +1,10 @@ { "info": { - "_postman_id": "a12c5acf-e63e-4357-9642-07ca2795b509", + "_postman_id": "cc2de2d8-aecf-4063-a97c-089965ba573d", "name": "JobQueueResource API Tests", "description": "Postman collection for testing the JobQueueResource API endpoints.", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", - "_exporter_id": "10041132", - "_collection_link": "https://speeding-firefly-555540.postman.co/workspace/blank~a8ffdb2b-2b56-46fa-ae3e-f4b3b0f8204a/collection/10041132-a12c5acf-e63e-4357-9642-07ca2795b509?action=share&source=collection_link&creator=10041132" + "_exporter_id": "31066048" }, "item": [ { @@ -79,7 +78,7 @@ "});", "", "pm.test(\"Response has a demo queue in it\", function () {", - " pm.expect(jsonData.entity).to.include.members(['demo', 'fail']);", + " pm.expect(jsonData.entity).to.include.members(['demo', 'failSuccess']);", "});" ], "type": "text/javascript", @@ -107,63 +106,7 @@ "response": [] }, { - "name": "Create Job No Params Expect Bad Request", - "event": [ - { - "listen": "test", - "script": { - "exec": [ - "pm.test(\"Status code is 400\", function () {", - " pm.response.to.have.status(400);", - "});", - "" - ], - "type": "text/javascript", - "packages": {} - } - } - ], - "request": { - "method": "POST", - "header": [ - { - "key": "Content-Type", - "value": "multipart/form-data" - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "file", - "type": "file", - "src": [] - }, - { - "key": "params", - "value": "", - "type": "text" - } - ] - }, - "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{queueName}}", - "host": [ - "{{baseUrl}}" - ], - "path": [ - "api", - "v1", - "jobs", - "{{queueName}}" - ] - }, - "description": "Creates a new job in the specified queue." - }, - "response": [] - }, - { - "name": "Create Job Expect Success", + "name": "Create Multipart Job Expect Success", "event": [ { "listen": "test", @@ -176,9 +119,7 @@ "var jsonData = pm.response.json();", "pm.expect(jsonData.entity).to.be.a('String');", "// Save jobId to environment variable", - "pm.collectionVariables.set(\"jobId\", jsonData.entity);", - "let jId = pm.collectionVariables.get(\"jobId\");", - "console.log(jId);" + "pm.collectionVariables.set(\"jobId\", jsonData.entity);" ], "type": "text/javascript", "packages": {} @@ -220,7 +161,7 @@ "{{queueName}}" ] }, - "description": "Creates a new job in the specified queue." + "description": "Creates a new job with a multipart content type in the specified queue." }, "response": [] }, @@ -376,123 +317,7 @@ "response": [] }, { - "name": "Cancel Job", - "event": [ - { - "listen": "test", - "script": { - "exec": [ - "pm.test(\"Status code is 200\", function () {", - " pm.response.to.have.status(200);", - "});", - "", - "// Check if cancellation message is returned", - "var jsonData = pm.response.json();", - "pm.test(\"Job cancelled successfully\", function () {", - " pm.expect(jsonData.entity).to.equal('Job cancelled successfully');", - "});", - "", - "var jobId = pm.collectionVariables.get(\"jobId\");", - "console.log(\" At the time this request was sent \" + jobId);", - "pm.collectionVariables.set(\"cancelledJobId\",jobId);" - ], - "type": "text/javascript", - "packages": {} - } - }, - { - "listen": "prerequest", - "script": { - "exec": [ - "" - ], - "type": "text/javascript", - "packages": {} - } - } - ], - "request": { - "method": "POST", - "header": [], - "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", - "host": [ - "{{baseUrl}}" - ], - "path": [ - "api", - "v1", - "jobs", - "{{jobId}}", - "cancel" - ] - }, - "description": "Cancels a specific job." - }, - "response": [] - }, - { - "name": "Create Second Job Expect Success", - "event": [ - { - "listen": "test", - "script": { - "exec": [ - "pm.test(\"Status code is 200\", function () {", - " pm.response.to.have.status(200);", - "});", - "", - "var jsonData = pm.response.json();", - "pm.expect(jsonData.entity).to.be.a('String');", - "// Save jobId to environment variable", - "pm.collectionVariables.set(\"jobId\", jsonData.entity);" - ], - "type": "text/javascript", - "packages": {} - } - } - ], - "request": { - "method": "POST", - "header": [ - { - "key": "Content-Type", - "value": "multipart/form-data" - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "file", - "type": "file", - "src": "resources/JobQueue/odyssey.txt" - }, - { - "key": "params", - "value": "{\n \"nLines\":\"1\"\n}", - "type": "text" - } - ] - }, - "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{queueName}}", - "host": [ - "{{baseUrl}}" - ], - "path": [ - "api", - "v1", - "jobs", - "{{queueName}}" - ] - }, - "description": "Creates a new job in the specified queue." - }, - "response": [] - }, - { - "name": "Active Jobs", + "name": "Get all active Jobs", "event": [ { "listen": "test", @@ -520,6 +345,11 @@ " pm.expect(entity).to.have.property(\"jobs\").that.is.an(\"array\").with.lengthOf(entity.total);", "});", "", + "// Check 'jobs' array length", + "pm.test(\"'jobs' is an array with the correct length\", function () {", + " pm.expect(entity).to.have.property(\"jobs\").that.is.an(\"array\").with.lengthOf(1);", + "});", + "", "// Iterate over each job in the 'jobs' array", "entity.jobs.forEach((job, index) => {", " pm.test(`Job ${index + 1}: 'completedAt' is null or a valid date`, function () {", @@ -638,7 +468,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{queueName}}/active?page={{page}}&pageSize={{pageSize}}", + "raw": "{{baseUrl}}/api/v1/jobs/active?page={{page}}&pageSize={{pageSize}}", "host": [ "{{baseUrl}}" ], @@ -646,7 +476,6 @@ "api", "v1", "jobs", - "{{queueName}}", "active" ], "query": [ @@ -662,98 +491,563 @@ } ] }, - "description": "Lists active jobs for a specific queue with pagination." + "description": "Lists active jobs with pagination." }, "response": [] }, { - "name": "Create Failing Job", + "name": "Active Jobs by queue", "event": [ { "listen": "test", "script": { "exec": [ - "pm.test(\"Status code is 200\", function () {", + "", + "// Store the response in a variable", + "let response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", " pm.response.to.have.status(200);", "});", "", - "var jsonData = pm.response.json();", - "pm.expect(jsonData.entity).to.be.a('String');", - "// Save jobId to environment variable", - "pm.environment.set(\"failingJobId\", jsonData.entity);", - "" - ], - "type": "text/javascript", - "packages": {} - } - } - ], - "request": { - "method": "POST", - "header": [ - { - "key": "Content-Type", - "value": "multipart/form-data" - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "file", - "type": "file", - "src": [], - "disabled": true - }, - { - "key": "params", - "value": "{\n\n}", - "type": "text" - } - ] - }, - "url": { - "raw": "{{baseUrl}}/api/v1/jobs/fail", - "host": [ - "{{baseUrl}}" - ], - "path": [ - "api", - "v1", - "jobs", - "fail" - ] - }, - "description": "Creates a new job in the specified queue (Create Failing Job)" - }, - "response": [] - }, - { - "name": "Monitor Non Existing Job", - "event": [ - { - "listen": "test", - "script": { - "exec": [ - "pm.test(\"Status code is 200\", function () {", - " pm.response.to.have.status(200);", + "// Check if the 'entity' object exists", + "pm.test(\"'entity' object exists\", function () {", + " pm.expect(response).to.have.property(\"entity\");", "});", "", - "pm.test(\"Response contains job-not-found event and 404 data\", function () {", - " const responseText = pm.response.text();", - " pm.expect(responseText).to.include(\"event: job-not-found\");", - " pm.expect(responseText).to.include(\"data: 404\");", + "// Validate the fields within `entity`", + "let entity = response.entity;", + "", + "// Check that 'jobs' is an array and validate its length", + "pm.test(\"'jobs' is an array with the correct length\", function () {", + " pm.expect(entity).to.have.property(\"jobs\").that.is.an(\"array\").with.lengthOf(entity.total);", "});", - "" - ], - "type": "text/javascript", - "packages": {} - } - } - ], - "request": { - "method": "GET", - "header": [ + "", + "// Iterate over each job in the 'jobs' array", + "entity.jobs.forEach((job, index) => {", + " pm.test(`Job ${index + 1}: 'completedAt' is null or a valid date`, function () {", + " pm.expect(job.completedAt).to.satisfy(function(val) {", + " return val === null || new Date(val).toString() !== \"Invalid Date\";", + " });", + " });", + "", + " pm.test(`Job ${index + 1}: 'createdAt' is a valid date string`, function () {", + " pm.expect(job.createdAt).to.be.a(\"string\");", + " pm.expect(new Date(job.createdAt)).to.not.equal(\"Invalid Date\");", + " });", + "", + " pm.test(`Job ${index + 1}: 'executionNode' is a valid UUID`, function () {", + " pm.expect(job.executionNode).to.match(/^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/);", + " });", + "", + " pm.test(`Job ${index + 1}: 'id' is a valid UUID`, function () {", + " pm.expect(job.id).to.match(/^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/);", + " });", + "", + " // Validate the `parameters` object", + " let parameters = job.parameters;", + "", + " pm.test(`Job ${index + 1}: 'parameters' contains expected keys with valid values`, function () {", + " pm.expect(parameters).to.have.property(\"nLines\").that.is.a(\"string\");", + " pm.expect(parameters).to.have.property(\"requestFingerPrint\").that.is.a(\"string\");", + " pm.expect(parameters.requestFingerPrint).to.have.lengthOf(44); // Typical length for SHA-256 in Base64", + " pm.expect(parameters).to.have.property(\"tempFileId\").that.is.a(\"string\");", + " });", + "", + " pm.test(`Job ${index + 1}: 'progress' is a number between 0 and 1`, function () {", + " pm.expect(job.progress).to.be.a(\"number\").within(0, 1);", + " });", + "", + " pm.test(`Job ${index + 1}: 'queueName' is a non-empty string`, function () {", + " pm.expect(job.queueName).to.be.a(\"string\").that.is.not.empty;", + " });", + "", + " pm.test(`Job ${index + 1}: 'result' is null or an object`, function () {", + " pm.expect(job.result === null || typeof job.result === \"object\").to.be.true;", + " });", + "", + " pm.test(`Job ${index + 1}: 'retryCount' is a non-negative integer`, function () {", + " pm.expect(job.retryCount).to.be.a(\"number\").that.is.at.least(0);", + " });", + "", + " pm.test(`Job ${index + 1}: 'startedAt' is null or a valid date`, function () {", + " pm.expect(job.startedAt).to.satisfy(function(val) {", + " return val === null || new Date(val).toString() !== \"Invalid Date\";", + " });", + " });", + "", + " pm.test(`Job ${index + 1}: 'state' is a non-empty string`, function () {", + " pm.expect(job.state).to.be.a(\"string\").that.is.not.empty;", + " });", + "", + " pm.test(`Job ${index + 1}: 'updatedAt' is a valid date string`, function () {", + " pm.expect(job.updatedAt).to.be.a(\"string\");", + " pm.expect(new Date(job.updatedAt)).to.not.equal(\"Invalid Date\");", + " });", + "});", + "", + "//Look for the last created job ", + "let jobsArray = entity.jobs;", + "", + "var jobId = pm.collectionVariables.get(\"jobId\");", + "pm.test(\"jobId is present in the response\", function () {", + " var jobFound = jobsArray.some(function(job) {", + " return job.id === jobId;", + " });", + " pm.expect(jobFound).to.be.true;", + "});", + "", + "// Validate pagination fields within `entity`", + "pm.test(\"'page' is a positive integer\", function () {", + " pm.expect(entity.page).to.be.a(\"number\").that.is.at.least(1);", + "});", + "", + "pm.test(\"'pageSize' is a positive integer\", function () {", + " pm.expect(entity.pageSize).to.be.a(\"number\").that.is.at.least(1);", + "});", + "", + "pm.test(\"'total' matches the length of 'jobs' array\", function () {", + " pm.expect(entity.total).to.equal(entity.jobs.length);", + "});", + "", + "// Validate other top-level objects in the response", + "pm.test(\"'errors' is an empty array\", function () {", + " pm.expect(response.errors).to.be.an(\"array\").that.is.empty;", + "});", + "", + "pm.test(\"'i18nMessagesMap' is an empty object\", function () {", + " pm.expect(response.i18nMessagesMap).to.be.an(\"object\").that.is.empty;", + "});", + "", + "pm.test(\"'messages' is an empty array\", function () {", + " pm.expect(response.messages).to.be.an(\"array\").that.is.empty;", + "});", + "", + "pm.test(\"'pagination' is null\", function () {", + " pm.expect(response.pagination).to.be.null;", + "});", + "", + "pm.test(\"'permissions' is an empty array\", function () {", + " pm.expect(response.permissions).to.be.an(\"array\").that.is.empty;", + "});", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/{{queueName}}/active?page={{page}}&pageSize={{pageSize}}", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "{{queueName}}", + "active" + ], + "query": [ + { + "key": "page", + "value": "{{page}}", + "description": "Page number" + }, + { + "key": "pageSize", + "value": "{{pageSize}}", + "description": "Number of items per page" + } + ] + }, + "description": "Lists active jobs for a specific queue with pagination." + }, + "response": [] + }, + { + "name": "Waiting Job to start execution", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is not \"PENDING\"", + "if (response.entity.state !== \"PENDING\") {", + "", + " console.log(`Job transitioned to ${response.entity.state}`);", + " pm.test(`Job transitioned out of PENDING state to ${response.entity.state}`, function() {", + " pm.expect(response.entity.state).to.not.equal(\"PENDING\");", + " });", + "", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to start execution\");", + " console.log(`Job still in PENDING state, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in PENDING state after ${retryCount} attempts`);", + " });", + "}", + "", + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", + "}", + "", + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/status", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "{{jobId}}", + "status" + ] + }, + "description": "Retrieves the status of a specific job." + }, + "response": [] + }, + { + "name": "Cancel Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Check if cancellation message is returned", + "var jsonData = pm.response.json();", + "pm.test(\"Job cancelled successfully\", function () {", + " pm.expect(jsonData.entity).to.include('Cancellation request successfully sent to job');", + "});", + "", + "var jobId = pm.collectionVariables.get(\"jobId\");", + "console.log(\" At the time this request was sent \" + jobId);", + "pm.collectionVariables.set(\"cancelledJobId\",jobId);" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "POST", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "{{jobId}}", + "cancel" + ] + }, + "description": "Cancels a specific job." + }, + "response": [] + }, + { + "name": "Create Failing Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "var jsonData = pm.response.json();", + "pm.expect(jsonData.entity).to.be.a('String');", + "// Save jobId to environment variable", + "pm.environment.set(\"failingJobId\", jsonData.entity);", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"fail\": true\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "failSuccess" + ] + }, + "description": "Creates a new job in the specified queue (Create Failing Job)" + }, + "response": [] + }, + { + "name": "Create Success Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "var jsonData = pm.response.json();", + "pm.expect(jsonData.entity).to.be.a('String');", + "// Save jobId to environment variable", + "pm.environment.set(\"successJobId\", jsonData.entity);", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "{}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "failSuccess" + ] + }, + "description": "Creates a new job in the specified queue (Create a job that will finish sucessfully)" + }, + "response": [] + }, + { + "name": "Waiting Job to complete", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is \"COMPLETED\"", + "if (response.entity.state === \"COMPLETED\") {", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to complete\");", + " console.log(`Job still processing, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in processing state after ${retryCount} attempts`);", + " });", + "}", + "", + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", + "}", + "", + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/{{successJobId}}/status", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "{{successJobId}}", + "status" + ] + }, + "description": "Retrieves the status of a specific job." + }, + "response": [] + }, + { + "name": "Monitor Non Existing Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response contains job-not-found event and 404 data\", function () {", + " const responseText = pm.response.text();", + " pm.expect(responseText).to.include(\"event: job-not-found\");", + " pm.expect(responseText).to.include(\"data: 404\");", + "});", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [ { "key": "Accept", "value": "text/event-stream" @@ -906,7 +1200,160 @@ "response": [] }, { - "name": "Failed Jobs", + "name": "Get all canceled Jobs", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "// Get the expected job ID from collection variables", + "const jobId = pm.collectionVariables.get('cancelledJobId');", + "", + "// Parse the response JSON", + "const response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains only one job", + "pm.test(\"Jobs array should contain only one job\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(1);", + "});", + "", + "// Validate that the job ID in the response matches the expected job ID", + "pm.test(\"Job ID should match expected job ID\", function () {", + " pm.expect(response.entity.jobs[0].id).to.eql(jobId);", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/canceled?page={{page}}&pageSize={{pageSize}}", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "canceled" + ], + "query": [ + { + "key": "page", + "value": "{{page}}" + }, + { + "key": "pageSize", + "value": "{{pageSize}}" + } + ] + }, + "description": "Lists canceled jobs with pagination." + }, + "response": [] + }, + { + "name": "Get all completed Jobs", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "// Parse the response JSON", + "const response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains only one job", + "pm.test(\"Jobs array should contain only one job\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(1);", + "});", + "", + "// Validate that the job ID in the response matches the expected job ID", + "pm.test(\"Job ID should match expected job ID\", function () {", + " pm.expect(response.entity.jobs[0].id).to.eql(pm.environment.get('successJobId'));", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/completed?page={{page}}&pageSize={{pageSize}}", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "completed" + ], + "query": [ + { + "key": "page", + "value": "{{page}}" + }, + { + "key": "pageSize", + "value": "{{pageSize}}" + } + ] + }, + "description": "Lists completed jobs with pagination." + }, + "response": [] + }, + { + "name": "Get all failed Jobs", "event": [ { "listen": "test",