Skip to content

Commit

Permalink
dcache-bulk: only set request status to QUEUED when permissions and t…
Browse files Browse the repository at this point in the history
…argets are all inserted

Motivation:

Sometimes when bulk requests are interrupted
(service goes down) the insert of the initial
request may not have completed.   When reloaded
or retried, a failure having to do with
missing Subject or missing targets may occur.

Modification:

Add a new state, INCOMPLETE.  Do not set the
state of the request to QUEUED until
permissions and targets have been added
to their respective tables.

On reset or reload, query again only for QUEUED.
Leave the incomplete requests as INCOMPLETE,
but add the INCOMPLETE state to the archiver
logic so that they are eventually cleared.

Result:

No runtime execution errors for corrupted
requests.

Target: master
Request: 9.2
Patch: https://rb.dcache.org/r/14113/
Requires-notes: yes
Acked-by: Tigran
  • Loading branch information
alrossi committed Sep 29, 2023
1 parent 8479f44 commit a29351a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static org.dcache.services.bulk.BulkRequestStatus.CANCELLED;
import static org.dcache.services.bulk.BulkRequestStatus.COMPLETED;
import static org.dcache.services.bulk.BulkRequestStatus.INCOMPLETE;

import dmg.cells.nucleus.CellInfoProvider;
import java.io.PrintWriter;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void run() {
long threshhold = System.currentTimeMillis() - archiverWindowUnit.toMillis(archiverWindow);

List<String> expiredUids = requestDao.getUids(
requestDao.where().modifiedBefore(threshhold).status(COMPLETED, CANCELLED),
requestDao.where().modifiedBefore(threshhold).status(INCOMPLETE, COMPLETED, CANCELLED),
Integer.MAX_VALUE);

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ public JdbcBulkRequestCriterion id(Long id) {
return this;
}

public JdbcBulkRequestCriterion unique(Long id) {
if (id != null) {
addClause("bulk_request.id = ?", id);
}
return this;
}

public JdbcBulkRequestCriterion pnfsId(String pnfsid) {
if (pnfsid != null) {
addClause("request_target.pnfsid = ?", pnfsid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public JdbcBulkRequestUpdate updateFrom(BulkRequest request, String user)
.clearOnSuccess(request.isClearOnSuccess()).clearOnFailure(request.isClearOnFailure())
.depth(request.getExpandDirectories())
.targetPrefix(request.getTargetPrefix()).urlPrefix(request.getUrlPrefix()).user(user)
.status(BulkRequestStatus.QUEUED).arrivedAt(System.currentTimeMillis());
.status(BulkRequestStatus.INCOMPLETE).arrivedAt(System.currentTimeMillis());
}

public JdbcBulkRequestCriterion where() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,8 @@ public void store(Subject subject, Restriction restriction, BulkRequest request)
try {
/*
* Insertion order: request, permissions, must be maintained.
* Initial insertion status is INCOMPLETE. This is immediately changed to QUEUED
* after insertion is complete.
*/
requestDao.insert(
requestDao.updateFrom(request, BulkRequestStore.uidGidKey(subject)))
Expand All @@ -623,6 +625,8 @@ public void store(Subject subject, Restriction restriction, BulkRequest request)
requestDao.insertArguments(request);

requestTargetDao.insertInitialTargets(request);

requestDao.update(requestDao.where().unique(request.getId()), requestDao.set().status(QUEUED));
} catch (BulkStorageException e) {
throw new BulkStorageException("store failed for " + request.getUid(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk;

public enum BulkRequestStatus {
INCOMPLETE("Request has been created but insertion is not yet complete."),
QUEUED("Request has been submitted to the service and awaits processing."),
STARTED("Request has been set to active and has begun processing."),
COMPLETED("All targets of the request have reached terminal state."),
Expand Down

0 comments on commit a29351a

Please sign in to comment.