Skip to content

Commit

Permalink
dcache-bulk: allow reset to skip terminated targets
Browse files Browse the repository at this point in the history
Motivation:

In case of a network outage it may be necessary
to re-establish connections between Bulk and
other services.  Because Bulk makes use of
asynchronous waits on `ListenableFuture`s,
it is currently not possible to do this
except by resubmitting the targets.

This can be done using `restart`, which
reloads all requests and resets all
non-terminal targets to `CREATED`.

Instead of restarting the service,
one could also use the `reset` command
selectively.  This command, however,
will reinitialize the entire request
regardless of prior target state.

Modification:

Add a `skipTerminated` option to the
`reset` command that makes it behave
like the reload on restart, where
terminated targets are left untouched.

Result:

More efficient recovery without
a full restart of the cell.

Target:  master
Patch: https://rb.dcache.org/r/14171
Refers-to: RT 10527 Lost network connectivity to DB servers
Requires-notes: yes
Acked-by: Dmitry
  • Loading branch information
alrossi committed Nov 14, 2023
1 parent 2c55595 commit 5da0a8d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,10 @@ public String call() {
"Sets status back to QUEUED, zeros out status counts and removes all targets.")
class RequestReset extends FilteredRequest {

@Option(name = "skipTerminated",
usage = "Do not reset the targets which completed.")
boolean skipTerminated = false;

@Override
public String call() throws Exception {
configureFilters();
Expand All @@ -1107,7 +1111,7 @@ public String call() throws Exception {
for (String id : uids) {
executor.submit(()-> {
try {
requestStore.reset(id);
requestStore.reset(id, skipTerminated);
} catch (BulkStorageException e) {
LOGGER.error("could not reset {}: {}.", id, e.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> reverse, lon
* Reset the request to QUEUED state.
*
* @param uid unique id for request.
* @param skipTerminated if true, do not delete terminated targets
* @throws BulkStorageException
*/
void reset(String uid) throws BulkStorageException;
void reset(String uid, boolean skipTerminated) throws BulkStorageException;

/**
* Retry all requests that have FAILED targets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> rever
}

@Override
public void reset(String uid) throws BulkStorageException {
public void reset(String uid, boolean skipTerminated) throws BulkStorageException {
/**
* Start from scratch:
* - delete ROOT
Expand All @@ -538,11 +538,20 @@ public void reset(String uid) throws BulkStorageException {
*/
LOGGER.trace("reset {}.", uid);
requestTargetDao.delete(requestTargetDao.where().pids(PID.ROOT.ordinal()).ruids(uid));
requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid));
requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid),
requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null));
requestDao.update(requestDao.where().uids(uid),
requestDao.set().status(QUEUED));
if (skipTerminated) {
requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid)
.state(NON_TERMINAL));
requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid)
.state(NON_TERMINAL),
requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null));
} else {
requestTargetDao.delete(requestTargetDao.where().pids(DISCOVERED.ordinal()).ruids(uid));
requestTargetDao.update(requestTargetDao.where().pids(INITIAL.ordinal()).ruids(uid),
requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null));
}

requestDao.update(requestDao.where().uids(uid), requestDao.set().status(QUEUED));

try {
requestCache.get(uid).ifPresent(r -> {
BulkRequestStatusInfo status = r.getStatusInfo();
Expand All @@ -561,7 +570,7 @@ public int retryFailed() throws BulkStorageException {
AtomicInteger count = new AtomicInteger(0);
List<String> uids = requestTargetDao.getRequestsOfFailed();
for (String uid: uids) {
reset(uid);
reset(uid, false);
count.incrementAndGet();
}
return count.get();
Expand Down

0 comments on commit 5da0a8d

Please sign in to comment.