diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java index 5e33b78ef86..dfe5b0e9ecc 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java @@ -85,6 +85,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -141,6 +142,11 @@ public final class FileQoSStatusHandler implements CellInfoProvider, private final AtomicLong handledExpired = new AtomicLong(0L); + /** + * This is just a concurrent implementation of a set, which is how it is used here. + */ + private final Set modifyRequests = new ConcurrentSkipListSet<>(); + private QoSRequirementsListener requirementsListener; private QoSVerificationListener verificationListener; private CellStub qosTransitionTopic; @@ -228,43 +234,59 @@ public void handleClearCacheLocation(PnfsId pnfsId, String pool) { public MessageReply handleQoSModification( QoSRequirementsModifiedMessage message) { counters.increment(QOS_MODIFIED.name()); + modifyRequests.add(message.getRequirements().getPnfsId()); MessageReply reply = new MessageReply<>(); qosModifyExecutor.submit(() -> { final FileQoSRequirements requirements = message.getRequirements(); final Subject subject = message.getSubject(); PnfsId pnfsId = requirements.getPnfsId(); Exception exception = null; - try { - LOGGER.debug("handleQoSModification calling fileQoSRequirementsModified for {}.", - pnfsId); - requirementsListener.fileQoSRequirementsModified(requirements, subject); - LOGGER.debug("handleQoSModification calling fileQoSStatusChanged for {}, {}.", - pnfsId, QOS_MODIFIED); - policyStateExecutor.submit(() -> { - FileQoSUpdate update = new FileQoSUpdate(pnfsId, null, QOS_MODIFIED); - update.setSubject(subject); - try { - fileQoSStatusChanged(update); - } catch (QoSException e) { - String error = String.format("could not complete fileQoSStatusChanged " - + "for %s: %s, cause %s.", update, e.getMessage(), - Throwables.getRootCause(e)); - handleActionCompleted(pnfsId, VOID, error); - } - }); + if (!modifyRequests.contains(pnfsId)) { + LOGGER.debug("handleQoSModification, for {} was cancelled, " + + "skipping.", pnfsId); message.setSucceeded(); - } catch (CacheException e) { - exception = e; - message.setFailed(e.getRc(), e); - } catch (QoSException e) { - exception = e; - message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e); - } catch (NoRouteToCellException e) { - exception = e; - message.setFailed(CacheException.SERVICE_UNAVAILABLE, e); - } catch (InterruptedException e) { - message.setFailed(CacheException.TIMEOUT, e); - exception = e; + } else { + try { + LOGGER.debug( + "handleQoSModification calling fileQoSRequirementsModified for {}.", + pnfsId); + requirementsListener.fileQoSRequirementsModified(requirements, subject); + LOGGER.debug("handleQoSModification calling fileQoSStatusChanged for {}, {}.", + pnfsId, QOS_MODIFIED); + policyStateExecutor.submit(() -> { + if (!modifyRequests.remove(pnfsId)) { + LOGGER.debug("handleQoSModification, for {} was cancelled, " + + "skipping.", pnfsId); + message.setSucceeded(); + } else { + FileQoSUpdate update = new FileQoSUpdate(pnfsId, null, QOS_MODIFIED); + update.setSubject(subject); + try { + fileQoSStatusChanged(update); + message.setSucceeded(); + } catch (QoSException e) { + String error = String.format( + "could not complete fileQoSStatusChanged " + + "for %s: %s, cause %s.", update, e.getMessage(), + Throwables.getRootCause(e)); + message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e); + handleActionCompleted(pnfsId, VOID, error); + } + } + }); + } catch (CacheException e) { + exception = e; + message.setFailed(e.getRc(), e); + } catch (QoSException e) { + exception = e; + message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e); + } catch (NoRouteToCellException e) { + exception = e; + message.setFailed(CacheException.SERVICE_UNAVAILABLE, e); + } catch (InterruptedException e) { + message.setFailed(CacheException.TIMEOUT, e); + exception = e; + } } if (exception != null) { @@ -272,6 +294,7 @@ public MessageReply handleQoSModification( requirements.getPnfsId(), exception.getMessage()); handleActionCompleted(pnfsId, VOID, exception.toString()); } + reply.reply(message); }); return reply; @@ -279,16 +302,19 @@ public MessageReply handleQoSModification( public void handleQoSModificationCancelled(PnfsId pnfsId, Subject subject) { counters.increment(QOS_MODIFIED_CANCELED.name()); - qosModifyExecutor.execute(() -> { - try { - LOGGER.debug( - "handleQoSModificationCancelled notifying verification listener to cancel {}.", - pnfsId); - verificationListener.fileQoSVerificationCancelled(pnfsId, subject); - } catch (QoSException e) { - LOGGER.error("Failed to handle QoS requirements for {}: {}.", pnfsId, e.toString()); - } - }); + if (!modifyRequests.remove(pnfsId)) { + qosModifyExecutor.execute(() -> { + try { + LOGGER.debug( + "handleQoSModificationCancelled notifying verification listener to cancel {}.", + pnfsId); + verificationListener.fileQoSVerificationCancelled(pnfsId, subject); + } catch (QoSException e) { + LOGGER.error("Failed to handle QoS requirements for {}: {}.", pnfsId, + e.toString()); + } + }); + } } public void handleQoSPolicyInfoRequest(FileQoSPolicyInfoMessage message, MessageReply reply) { @@ -458,17 +484,15 @@ private void fileQoSStatusChanged(FileQoSUpdate update) throws QoSException { return; } + FileAttributes attributes = requirements.getAttributes(); switch (messageType) { case ADD_CACHE_LOCATION: + /* + * provide for lazy update inside namespace + */ + updateQosOnNamespace(pnfsId, attributes); case QOS_MODIFIED: - FileAttributes attributes = requirements.getAttributes(); try { - if (messageType == ADD_CACHE_LOCATION) { - /* - * provide for lazy update inside namespace - */ - updateQosOnNamespace(pnfsId, attributes); - } LOGGER.debug( "fileQoSStatusChanged calling updateQoSTransition for {}, {}.", pnfsId, messageType); @@ -481,7 +505,6 @@ private void fileQoSStatusChanged(FileQoSUpdate update) throws QoSException { } break; case CLEAR_CACHE_LOCATION: - attributes = requirements.getAttributes(); if (attributes.isUndefined(FileAttribute.LOCATIONS) || attributes.getLocations().isEmpty()) { // empty location here could mean file deletion engineDao.delete(pnfsId); diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationManager.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationManager.java index f8fcc31b515..55be7612c13 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationManager.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationManager.java @@ -477,6 +477,7 @@ public void cancel(PnfsId pnfsId, boolean remove) { VerifyOperationFilter filter = new VerifyOperationFilter(); filter.setPnfsIds(pnfsId); cancel(new VerifyOperationCancelFilter(filter, remove)); + } public void cancelFileOpForPool(String pool, boolean onlyParent) { diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java index 09609a5e1d3..9acff38883b 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java @@ -73,6 +73,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import org.dcache.alarms.AlarmMarkerFactory; @@ -168,6 +169,12 @@ private static void sendOutOfSyncAlarm() { } } + /** + * Tracks individual modify requests. + * This is just a concurrent implementation of a set, which is how it is used here. + */ + private final Set modifyRequests = new ConcurrentSkipListSet<>(); + /** * Tracks scan requests and cancellations. */ @@ -268,7 +275,9 @@ public void handleFileOperationsCancelledForPool(String pool) { */ public void handleFileOperationCancelled(PnfsId pnfsId) { counters.incrementReceived(QoSVerifierCounters.VRF_CNCL_MESSAGE); - updateExecutor.submit(() -> manager.cancel(pnfsId, true)); + if (!modifyRequests.remove(pnfsId)) { + updateExecutor.submit(() -> manager.cancel(pnfsId, true)); + } } /** @@ -299,6 +308,9 @@ public void handleQoSActionCompleted(PnfsId pnfsId, VerifyOperationState opState */ public void handleUpdate(FileQoSUpdate data) { LOGGER.debug("handleUpdate, update to be registered: {}", data); + if (!modifyRequests.remove(data.getPnfsId())) { + LOGGER.debug("handleUpdate, update has been cancelled: {}", data); + } if (!manager.createOrUpdateOperation(data)) { LOGGER.debug("handleUpdate, operation already registered for: {}", data.getPnfsId()); handleVerificationNop(data, false); @@ -441,6 +453,8 @@ public void handleVerification(PnfsId pnfsId) { public void handleVerificationRequest(QoSVerificationRequest request) { counters.incrementReceived(QoSVerifierCounters.VRF_REQ_MESSAGE); LOGGER.debug("handleVerificationRequest for {}.", request.getUpdate()); + PnfsId pnfsId = request.getUpdate().getPnfsId(); + modifyRequests.add(pnfsId); updateExecutor.submit(() -> handleUpdate(request.getUpdate())); }