Skip to content

Commit

Permalink
Feat (Core): Migrate import contentlets action to job processor (#30432)
Browse files Browse the repository at this point in the history
This pull request introduces a new processor
`ImportContentletsProcessor`, which replicates the functionality we have
in the `ImportContentletsAction` but using the new jobs
infrastructureseveral, also enhancements and refactoring to the
`JobQueueManagerAPI` and related classes. The key changes include adding
new methods for retrieving job lists, integrating a retry policy
processor, and improving the real-time job monitoring system.

### Enhancements to Job Retrieval:

* Added new methods to `JobQueueManagerAPI` for retrieving lists of
active, completed, canceled, and failed jobs with pagination support.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java`)
[[1]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465R90-R101)
[[2]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465L100-R148)
* Implemented these methods in `JobQueueManagerAPIImpl` to interact with
the job queue and handle exceptions.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`)
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R303-R313)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L304-R364)

### Retry Policy Integration:

* Introduced `RetryPolicyProcessor` to process retry policies for job
processors and integrated it into `JobQueueManagerAPIImpl`.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`)
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R111)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R251-R256)
* Modified the constructor of `JobQueueManagerAPIImpl` to include the
`RetryPolicyProcessor` parameter.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`)
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L144-R151)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R160-R161)
[[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L166)

### Real-Time Job Monitoring:

* Enhanced `RealTimeJobMonitor` to support filtered watching through
predicates, allowing clients to receive specific job updates. Added
detailed documentation and examples for usage.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java`)
[[1]](diffhunk://#diff-ff9af41ece416b81fb0f12a7a8a9a8e5fe54ccbb278f4121bb0fce04b149cc36R4-R129)
[[2]](diffhunk://#diff-ff9af41ece416b81fb0f12a7a8a9a8e5fe54ccbb278f4121bb0fce04b149cc36L59-R155)
* Created `AbstractJobWatcher` to encapsulate a watcher and its filter
predicate, supporting the new monitoring functionality.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/events/AbstractJobWatcher.java`)

### Code Refactoring:

* Simplified method signatures and improved exception handling in
`JobQueueManagerAPIImpl`.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`)
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L685-R736)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R860-R882)
* Added missing Javadoc comments and improved existing ones for better
clarity and documentation.
(`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java`)
[[1]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465R64)
[[2]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465R178)

These changes collectively improve the robustness, maintainability, and
functionality of the job management and monitoring systems in the
`dotCMS` project.
  • Loading branch information
jgambarios authored and spbolton committed Nov 11, 2024
1 parent 6216748 commit 73623f2
Show file tree
Hide file tree
Showing 30 changed files with 3,134 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public interface JobQueueManagerAPI {

/**
* Retrieves the job processors for all registered queues.
*
* @return A map of queue names to job processors
*/
Map<String,Class<? extends JobProcessor>> getQueueNames();
Map<String, Class<? extends JobProcessor>> getQueueNames();

/**
* Creates a new job in the specified queue.
Expand All @@ -86,6 +87,18 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
Job getJob(String jobId) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
*
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException;

/**
* Retrieves a list of jobs.
*
Expand All @@ -97,21 +110,42 @@ String createJob(String queueName, Map<String, Object> parameters)
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* Retrieves a list of active jobs, meaning jobs that are currently being processed.
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;
JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* Retrieves a list of completed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of canceled jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of canceled jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of failed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of failed jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException;

Expand Down Expand Up @@ -141,6 +175,7 @@ String createJob(String queueName, Map<String, Object> parameters)

/**
* Retrieves the retry strategy for a specific queue.
*
* @param jobId The ID of the job
* @return The processor instance, or an empty optional if not found
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryPolicyProcessor;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.DefaultRetryStrategy;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -107,6 +109,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
private ExecutorService executorService;
private final Map<String, RetryStrategy> retryStrategies;
private final RetryStrategy defaultRetryStrategy;
private final RetryPolicyProcessor retryPolicyProcessor;

private final ScheduledExecutorService pollJobUpdatesScheduler;
private LocalDateTime lastPollJobUpdateTime = LocalDateTime.now();
Expand Down Expand Up @@ -142,10 +145,11 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
JobQueueConfig jobQueueConfig,
CircuitBreaker circuitBreaker,
RetryStrategy defaultRetryStrategy,
@DefaultRetryStrategy RetryStrategy defaultRetryStrategy,
RealTimeJobMonitor realTimeJobMonitor,
EventProducer eventProducer,
JobProcessorFactory jobProcessorFactory) {
JobProcessorFactory jobProcessorFactory,
RetryPolicyProcessor retryPolicyProcessor) {

this.jobQueue = jobQueue;
this.threadPoolSize = jobQueueConfig.getThreadPoolSize();
Expand All @@ -154,6 +158,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
this.retryStrategies = new ConcurrentHashMap<>();
this.defaultRetryStrategy = defaultRetryStrategy;
this.circuitBreaker = circuitBreaker;
this.jobProcessorFactory = jobProcessorFactory;
this.retryPolicyProcessor = retryPolicyProcessor;

this.pollJobUpdatesScheduler = Executors.newSingleThreadScheduledExecutor();
pollJobUpdatesScheduler.scheduleAtFixedRate(
Expand All @@ -164,7 +170,6 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
// Events
this.realTimeJobMonitor = realTimeJobMonitor;
this.eventProducer = eventProducer;
this.jobProcessorFactory = jobProcessorFactory;
}

@Override
Expand Down Expand Up @@ -244,6 +249,12 @@ public void registerProcessor(final String queueName, final Class<? extends JobP
jobProcessor.getName(), queueName));
}
processors.put(queueName, processor);

// Process the retry policy for the processor
RetryStrategy retryStrategy = retryPolicyProcessor.processRetryPolicy(processor);
if (retryStrategy != null) {
setRetryStrategy(queueName, retryStrategy);
}
}

@Override
Expand Down Expand Up @@ -290,6 +301,17 @@ public Job getJob(final String jobId) throws DotDataException {
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getJobs(final int page, final int pageSize) throws DotDataException {
Expand All @@ -302,23 +324,45 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
public JobPaginatedResult getActiveJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
return jobQueue.getActiveJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCompletedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCompletedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching completed jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCanceledJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCanceledJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching canceled jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getFailedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
throw new JobQueueDataException("Error fetching failed jobs", e);
}
}

Expand Down Expand Up @@ -683,15 +727,14 @@ private void processJob(final Job job) throws DotDataException {
"Error processing job " + runningJob.id() + ": " + e.getMessage(), e
);
handleJobFailure(
runningJob, processor, e, e.getMessage(), "Job execution"
runningJob, processor, e, "Job execution"
);
}
} else {

Logger.error(this, "No processor found for queue: " + job.queueName());
handleJobFailure(job, null, new JobProcessorNotFoundException(job.queueName()),
"No processor found for queue", "Processor selection"
);
"Processor selection");
}
}

Expand Down Expand Up @@ -815,6 +858,29 @@ private void handleJobCancellation(final Job job, final JobProcessor processor)
);
}

/**
* Handles the failure of a job
*
* @param job The job that failed.
* @param processor The processor that handled the job.
* @param exception The exception that caused the failure.
* @param processingStage The stage of processing where the failure occurred.
*/
@WrapInTransaction
private void handleJobFailure(final Job job, final JobProcessor processor,
final Exception exception, final String processingStage) throws DotDataException {

var jobFailureException = exception;
if (exception.getCause() != null) {
jobFailureException = (Exception) exception.getCause();
}

handleJobFailure(
job, processor, jobFailureException, jobFailureException.getMessage(),
processingStage
);
}

/**
* Handles the failure of a job
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dotcms.jobs.business.api.events;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.immutables.value.Value;

/**
* Class to hold a watcher and its filter predicate.
*/
@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*")
@Value.Immutable
@JsonSerialize(as = JobWatcher.class)
@JsonDeserialize(as = JobWatcher.class)
public interface AbstractJobWatcher {

/**
* Returns a Consumer that performs an operation on a Job instance.
*
* @return a Consumer of Job that defines what to do with a Job instance.
*/
Consumer<com.dotcms.jobs.business.job.Job> watcher();

/**
* Returns a predicate that can be used to filter jobs based on custom criteria.
*
* @return a Predicate object to filter Job instances
*/
Predicate<com.dotcms.jobs.business.job.Job> filter();

}
Loading

0 comments on commit 73623f2

Please sign in to comment.