Skip to content

Commit

Permalink
NXP-32313: Remove nested BAF for evaluating event based rules
Browse files Browse the repository at this point in the history
  • Loading branch information
guirenard committed Mar 6, 2024
1 parent d1d49c8 commit abc1b48
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.nuxeo.retention;

import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.query.sql.NXQL;
import org.nuxeo.retention.adapters.RetentionRule;

/**
Expand Down Expand Up @@ -93,7 +94,7 @@ public class RetentionConstants {
*/
public static final String EVENT_INPUT_REGEX = "^[a-zA-Z0-9-_.\\s]+$";

public static final String ACTIVE_EVENT_BASED_RETENTION_RULES_QUERY = "SELECT * FROM Document" //
public static final String ACTIVE_EVENT_BASED_RETENTION_RULES_QUERY = "SELECT " + NXQL.ECM_UUID + " FROM Document" //
+ " WHERE ecm:mixinType = '" + RETENTION_RULE_FACET + "'" //
+ " AND ecm:isTrashed = 0" //
+ " AND ecm:isVersion = 0" //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;
import static org.nuxeo.retention.actions.ProcessRetentionEventAction.ACTION_EVENT_ID_PARAM;
import static org.nuxeo.retention.actions.ProcessRetentionEventAction.ACTION_EVENT_INPUT_PARAM;

import java.io.Serializable;
import java.util.Arrays;
Expand Down Expand Up @@ -56,6 +54,10 @@ public class EvalInputEventBasedRuleAction implements StreamProcessorTopology {

public static final String ACTION_FULL_NAME = "retention/" + ACTION_NAME;

public static final String ACTION_EVENT_INPUT_PARAM = "eventInput";

public static final String ACTION_EVENT_ID_PARAM = "eventId";

@Override
public Topology getTopology(Map<String, String> options) {
return Topology.builder()
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,39 @@
package org.nuxeo.retention.listeners;

import static org.nuxeo.ecm.core.api.security.SecurityConstants.SYSTEM_USERNAME;
import static org.nuxeo.retention.actions.ProcessRetentionEventAction.ACTION_EVENT_ID_PARAM;
import static org.nuxeo.retention.actions.ProcessRetentionEventAction.ACTION_EVENT_INPUT_PARAM;
import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
import static org.nuxeo.retention.RetentionConstants.ACTIVE_EVENT_BASED_RETENTION_RULES_QUERY;
import static org.nuxeo.retention.RetentionConstants.RECORD_RULE_IDS_PROP;
import static org.nuxeo.retention.RetentionConstants.RULE_RECORD_DOCUMENT_QUERY;
import static org.nuxeo.retention.RetentionConstants.STARTING_POINT_EVENT_PROP;
import static org.nuxeo.retention.actions.EvalInputEventBasedRuleAction.ACTION_EVENT_ID_PARAM;
import static org.nuxeo.retention.actions.EvalInputEventBasedRuleAction.ACTION_EVENT_INPUT_PARAM;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.query.sql.NXQL;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.retention.RetentionConstants;
import org.nuxeo.retention.actions.ProcessRetentionEventAction;
import org.nuxeo.retention.actions.EvalInputEventBasedRuleAction;
import org.nuxeo.retention.event.RetentionEventContext;
import org.nuxeo.runtime.api.Framework;

/**
* Listener processing events with a {@link org.nuxeo.retention.event.RetentionEventContext}). The listener schedules a
* {@link org.nuxeo.retention.actions.ProcessRetentionEventAction} on a query retrieving all the retention rules
* targeting the listened event.
* {@link org.nuxeo.retention.actions.EvalInputEventBasedRuleAction} on a query retrieving all the records attached to
* all the retention rules targeting the listened event.
*
* @since 11.1
*/
Expand All @@ -56,21 +68,31 @@ public void handleEvent(Event event) {
String eventInput = ((RetentionEventContext) evtCtx).getInput();
BulkService bulkService = Framework.getService(BulkService.class);
RepositoryService repositoryService = Framework.getService(RepositoryService.class);
StringBuilder query = new StringBuilder(RetentionConstants.ACTIVE_EVENT_BASED_RETENTION_RULES_QUERY);
query.append(" AND ") //
// Only with event name
.append(RetentionConstants.STARTING_POINT_EVENT_PROP)
.append(" = ")
.append(NXQL.escapeString(eventName));
for (String repositoryName : repositoryService.getRepositoryNames()) {
BulkCommand command = new BulkCommand.Builder(ProcessRetentionEventAction.ACTION_NAME, query.toString(),
SYSTEM_USERNAME).param(ACTION_EVENT_ID_PARAM, eventName)
.param(ACTION_EVENT_INPUT_PARAM, eventInput)
.repository(repositoryName)
.build();
StringBuilder query = new StringBuilder(RULE_RECORD_DOCUMENT_QUERY);
var rulesIds = getEventBasedRuleIdsForEvent(eventName, repositoryName);
query.append(" AND ") //
.append(RECORD_RULE_IDS_PROP) //
.append(String.format(" IN ('%s')", rulesIds.stream().collect(Collectors.joining("', '"))));
BulkCommand command = new BulkCommand.Builder(EvalInputEventBasedRuleAction.ACTION_NAME,
query.toString(), SYSTEM_USERNAME).param(ACTION_EVENT_ID_PARAM, eventName)
.param(ACTION_EVENT_INPUT_PARAM, eventInput)
.repository(repositoryName)
.build();
bulkService.submit(command);
}
}
}

protected List<String> getEventBasedRuleIdsForEvent(String eventName, String repository) {
StringBuilder query = new StringBuilder(ACTIVE_EVENT_BASED_RETENTION_RULES_QUERY);
query.append(" AND ") //
.append(STARTING_POINT_EVENT_PROP)
.append(" = ")
.append(NXQL.escapeString(eventName));
CoreSession session = CoreInstance.getCoreSession(repository);
PartialList<Map<String, Serializable>> results = session.queryProjection(query.toString(), 0, 0);
return results.stream().map(m -> (String) m.get(ECM_UUID)).collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
<action name="unholdDocumentsAction" inputStream="retention/unholdDocumentsAction" bucketSize="100" batchSize="20" />
<action name="attachRetentionRule" inputStream="retention/attachRetentionRule" bucketSize="100" batchSize="20" />
<action name="evalInputEventBasedRule" inputStream="retention/evalInputEventBasedRule" bucketSize="100" batchSize="20" />
<action name="processRetentionEvent" inputStream="retention/processRetentionEvent" bucketSize="100" batchSize="20" />
</extension>

<extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
Expand All @@ -30,11 +29,6 @@
defaultPartitions="${nuxeo.bulk.action.evalInputEventBasedRule.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
<streamProcessor name="processRetentionEvent" class="org.nuxeo.retention.actions.ProcessRetentionEventAction"
defaultConcurrency="${nuxeo.bulk.action.processRetentionEvent.defaultConcurrency:=2}"
defaultPartitions="${nuxeo.bulk.action.processRetentionEvent.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
</extension>

</component>
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.nuxeo.retention.RetentionConstants.RECORD_MANAGER_GROUP_NAME;

import java.time.Duration;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;

import javax.inject.Inject;

import org.junit.Test;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.event.DocumentEventTypes;
Expand Down Expand Up @@ -263,19 +267,21 @@ public void testManualEventBasedRuleWithExpressionOnCustomEvent() throws Interru
assertTrue(session.isUnderRetentionOrLegalHold(file.getRef()));
assertTrue(record.isRetentionIndeterminate());

// Trigger event with unexpected input
service.fireRetentionEvent(retentionEventId, triggeringEventValue, false, session);
// Trigger event with unexpected input from non admin user with proper role
CoreSession userSession = CoreInstance.getCoreSession(session.getRepositoryName(), "user");
userSession.getPrincipal().setGroups(Collections.singletonList(RECORD_MANAGER_GROUP_NAME));
service.fireRetentionEvent(retentionEventId, triggeringEventValue, false, userSession);
awaitRetentionExpiration(500);
// Check record is still under indeterminate retention
file = session.getDocument(file.getRef());
record = file.getAdapter(Record.class);
assertTrue(session.isUnderRetentionOrLegalHold(file.getRef()));
assertTrue(record.isRetentionIndeterminate());

// Trigger event with expected input
// Trigger event with expected input from non admin user with proper role
file.setPropertyValue("dc:title", triggeringEventValue);
file = session.saveDocument(file);
service.fireRetentionEvent(retentionEventId, triggeringEventValue, false, session);
service.fireRetentionEvent(retentionEventId, triggeringEventValue, false, userSession);
awaitRetentionExpiration(500);
// Check record is no longer under indeterminate retention
file = session.getDocument(file.getRef());
Expand Down

0 comments on commit abc1b48

Please sign in to comment.