From 5da0a8d82c3034bf715e9138e80e4be9e7b7cd2c Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Tue, 14 Nov 2023 14:11:03 -0600 Subject: [PATCH] dcache-bulk: allow reset to skip terminated targets Motivation: In case of a network outage it may be necessary to re-establish connections between Bulk and other services. Because Bulk makes use of asynchronous waits on `ListenableFuture`s, it is currently not possible to do this except by resubmitting the targets. This can be done using `restart` which reloads all requests and resets all non-terminal targets to `CREATED`. Instead of restarting the service, one could also use the `reset` command selectively. This command, however, will reinitialize the entire request regardless of prior target state. Modification: Add an option to behave like the reload on restart, where terminated targets are left untouched. Result: More efficient recovery without full restart of the cell. Target: master Patch: https://rb.dcache.org/r/14171 Refers-to: RT 10527 Lost network connectivity to DB servers Requires-notes: yes Acked-by: Dmitry --- .../services/bulk/BulkServiceCommands.java | 6 ++++- .../services/bulk/store/BulkRequestStore.java | 3 ++- .../jdbc/request/JdbcBulkRequestStore.java | 23 +++++++++++++------ 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java index 15e05239ba3..7455533884e 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java @@ -1099,6 +1099,10 @@ public String call() { "Sets status back to QUEUED, zeros out status counts and removes all targets.") class RequestReset extends FilteredRequest { + @Option(name = "skipTerminated", + usage = "Do not reset the targets which completed.") + boolean skipTerminated = false; + @Override public String call() throws Exception { configureFilters(); @@ -1107,7 +1111,7 @@ public String call() throws Exception { for (String id : uids) { executor.submit(()-> { try { - requestStore.reset(id); + requestStore.reset(id, skipTerminated); } catch (BulkStorageException e) { LOGGER.error("could not reset {}: {}.", id, e.toString()); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java index 67dffee0697..cf9454ad104 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java @@ -284,9 +284,10 @@ List next(Optional sortedBy, Optional reverse, lon * Reset the request to QUEUED state. * * @param uid unique id for request. + * @param skipTerminated if true, do not delete terminated targets * @throws BulkStorageException */ - void reset(String uid) throws BulkStorageException; + void reset(String uid, boolean skipTerminated) throws BulkStorageException; /** * Retry all requests that have FAILED targets. diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java index c76921c20c7..3e3d497e204 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java @@ -524,7 +524,7 @@ public List next(Optional sortedBy, Optional rever } @Override - public void reset(String uid) throws BulkStorageException { + public void reset(String uid, boolean skipTerminated) throws BulkStorageException { /** * Start from scratch: * - delete ROOT @@ -538,11 +538,20 @@ public void reset(String uid) throws BulkStorageException { */ LOGGER.trace("reset {}.", uid); requestTargetDao.delete(requestTargetDao.where().pids(PID.ROOT.ordinal()).ruids(uid)); - requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid)); - requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid), - requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null)); - requestDao.update(requestDao.where().uids(uid), - requestDao.set().status(QUEUED)); + if (skipTerminated) { + requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid) + .state(NON_TERMINAL)); + requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid) + .state(NON_TERMINAL), + requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null)); + } else { + requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid)); + requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid), + requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null)); + } + + requestDao.update(requestDao.where().uids(uid), requestDao.set().status(QUEUED)); + try { requestCache.get(uid).ifPresent(r -> { BulkRequestStatusInfo status = r.getStatusInfo(); @@ -561,7 +570,7 @@ public int retryFailed() throws BulkStorageException { AtomicInteger count = new AtomicInteger(0); List uids = requestTargetDao.getRequestsOfFailed(); for (String uid: uids) { - reset(uid); + reset(uid, false); count.incrementAndGet(); } return count.get();