Skip to content

Commit

Permalink
bulk: handle absolute/relative paths in uniform fashion
Browse files Browse the repository at this point in the history
Motivation:
-----------
Recent change(s) that massaged user input target paths and
stored absolute paths on bulk backend lead to ambiguity between
user provided and dcache resolved paths and also resulted in inability
to use full paths (i.e. only relative paths are supported). At
Fermilab we need to use both - relative and absolute paths

Modification:
-------------
Revert all recent changes that appended prefix to user
supplied paths, stored the result and then stripped the
prefix so that only "original" paths are exposed to the user.
Instead, like before, store user supplied paths but carry
over request prefix which is computed from user root and
door root. When calling PnfsManager using paths the full
paths of the targets are reassembled using the prefix

Result:
------
Restored ability to use absolute paths when using REST API.

Issue: #7693
Patch: https://rb.dcache.org/r/14355/
Target: trunk
Request: 10.2, 10.1, 10.0, 9.2
Require-book: no
Require-notes: yes
Acked-by: Lea Morschel, Tigran Mkrtchyan
  • Loading branch information
DmitryLitvintsev authored and khys95 committed Dec 12, 2024
1 parent d177399 commit 6794baa
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected BulkActivity(String name, TargetType targetType) {
retryPolicy = DEFAULT_RETRY_POLICY;
}

public void cancel(BulkRequestTarget target) {
public void cancel(String prefix, BulkRequestTarget target) {
target.cancel();
}

Expand Down Expand Up @@ -173,11 +173,12 @@ public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
*
* @param rid of the request.
* @param tid of the target.
* @param prefix target prefix
* @param path of the target on which to perform the activity.
* @return future result of the activity.
* @throws BulkServiceException
*/
public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, FileAttributes attributes)
public abstract ListenableFuture<R> perform(String rid, long tid, String prefix, FsPath path, FileAttributes attributes)
throws BulkServiceException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ public DeleteActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, FsPath path,
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(path.toString());
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(absolutePath.toString());
msg.setSubject(subject);
if (attributes != null && attributes.getFileType() == FileType.DIR && skipDirs) {
msg.setSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected LogTargetActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath path,
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
long now = System.currentTimeMillis();
BulkRequestTarget t = BulkRequestTargetBuilder.builder(null).activity(this.getName()).id(tid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,27 @@ public PinActivity(String name, TargetType targetType) {
super(name, targetType);
}

public void cancel(BulkRequestTarget target) {
super.cancel(target);
public void cancel(String prefix, BulkRequestTarget target) {
super.cancel(prefix, target);
try {
pinManager.send(unpinMessage(id, target));
pinManager.send(unpinMessage(id, prefix, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel pinning.", e));
}
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
if (id == null) {
id = rid;
}

try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}

checkPinnable(attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static diskCacheV111.util.CacheException.INVALID_ARGS;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;

import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -130,11 +131,12 @@ protected FileAttributes getAttributes(FsPath path) throws CacheException {
return pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
}

protected PinManagerUnpinMessage unpinMessage(String id, BulkRequestTarget target)
protected PinManagerUnpinMessage unpinMessage(String id, String prefix, BulkRequestTarget target)
throws CacheException {
PnfsId pnfsId = target.getPnfsId();
if (pnfsId == null) {
pnfsId = getAttributes(target.getPath()).getPnfsId();
FsPath absolutePath = computeFsPath(prefix, target.getPath().toString());
pnfsId = getAttributes(absolutePath).getPnfsId();
}
return unpinMessage(id, pnfsId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.ReleaseActivityProvider.REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -77,11 +78,12 @@ public ReleaseActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,34 @@ public StageActivity(String name, TargetType targetType) {
super(name, targetType);
}

public void cancel(BulkRequestTarget target) {
super.cancel(target);
public void cancel(String prefix, BulkRequestTarget target) {
super.cancel(prefix, target);
try {
pinManager.send(unpinMessage(id, target));
pinManager.send(unpinMessage(id, prefix, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel staging.", e));
+ "order to cancel staging.", e));
}
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
id = rid;

try {
/*
* refetch the attributes because RP is not stored in the bulk database.
*/
attributes = getAttributes(target);

FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);

checkStageable(attributes);

PinManagerPinMessage message
= new PinManagerPinMessage(attributes, getProtocolInfo(), id,
getLifetimeInMillis(target));
getLifetimeInMillis(path));
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.UnpinActivityProvider.UNPIN_REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -78,11 +79,12 @@ public UnpinActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,45 @@ public UpdateQoSActivity(String name, TargetType targetType) {
}

@Override
public synchronized void cancel(BulkRequestTarget target) {
public synchronized void cancel(String prefix, BulkRequestTarget target) {
RemoteQoSRequirementsClient client = new RemoteQoSRequirementsClient();
client.setRequirementsService(qosEngine);
PnfsId pnfsId = target.getAttributes().getPnfsId();
PnfsId pnfsId = null;
if (target.getAttributes() == null) {
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix,
target.getPath().toString());
try {
pnfsId = pnfsHandler.getFileAttributes(absolutePath.toString(),
MINIMALLY_REQUIRED_ATTRIBUTES).getPnfsId();
} catch (CacheException e) {
LOGGER.error("fileQoSRequirementsModifiedCancelled failed: failed to fetch attributes for {} {}.",
target.getPath().toString(),
e.getMessage());
}
} else {
pnfsId = target.getAttributes().getPnfsId();
}
try {
client.fileQoSRequirementsModifiedCancelled(pnfsId, subject);
} catch (QoSException e) {
LOGGER.error("fileQoSRequirementsModifiedCancelled failed: {}, {}.", pnfsId,
e.getMessage());
}
responseReceiver.cancel(pnfsId.toString());
super.cancel(target);
super.cancel(prefix, target);
}

@Override
public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long tid,
FsPath path, FileAttributes attributes) throws BulkServiceException {
String prefix, FsPath path, FileAttributes attributes) throws BulkServiceException {
if (targetQos == null && qosPolicy == null) {
return Futures.immediateFailedFuture(new IllegalArgumentException("no target qos or policy given."));
}

if (attributes == null) {
try {
attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = pnfsHandler.getFileAttributes(absolutePath.toString(), MINIMALLY_REQUIRED_ATTRIBUTES);
} catch (CacheException e) {
throw new BulkServiceException("failed to retrieve file attributes", e);
}
Expand Down Expand Up @@ -232,6 +247,3 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
this.pnfsHandler = pnfsHandler;
}
}



Loading

0 comments on commit 6794baa

Please sign in to comment.