Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk: handle absolute/relative paths in uniform fashion #7721

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected BulkActivity(String name, TargetType targetType) {
retryPolicy = DEFAULT_RETRY_POLICY;
}

public void cancel(BulkRequestTarget target) {
public void cancel(String prefix, BulkRequestTarget target) {
target.cancel();
}

Expand Down Expand Up @@ -173,11 +173,12 @@ public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
*
* @param rid of the request.
* @param tid of the target.
* @param prefix target prefix
* @param path of the target on which to perform the activity.
* @return future result of the activity.
* @throws BulkServiceException
*/
public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, FileAttributes attributes)
public abstract ListenableFuture<R> perform(String rid, long tid, String prefix, FsPath path, FileAttributes attributes)
throws BulkServiceException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ public DeleteActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, FsPath path,
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(path.toString());
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(absolutePath.toString());
msg.setSubject(subject);
if (attributes != null && attributes.getFileType() == FileType.DIR && skipDirs) {
msg.setSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected LogTargetActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath path,
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
long now = System.currentTimeMillis();
BulkRequestTarget t = BulkRequestTargetBuilder.builder(null).activity(this.getName()).id(tid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,27 @@ public PinActivity(String name, TargetType targetType) {
super(name, targetType);
}

public void cancel(BulkRequestTarget target) {
super.cancel(target);
public void cancel(String prefix, BulkRequestTarget target) {
super.cancel(prefix, target);
try {
pinManager.send(unpinMessage(id, target));
pinManager.send(unpinMessage(id, prefix, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel pinning.", e));
}
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
if (id == null) {
id = rid;
}

try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}

checkPinnable(attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static diskCacheV111.util.CacheException.INVALID_ARGS;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;

import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -130,11 +131,12 @@ protected FileAttributes getAttributes(FsPath path) throws CacheException {
return pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
}

protected PinManagerUnpinMessage unpinMessage(String id, BulkRequestTarget target)
protected PinManagerUnpinMessage unpinMessage(String id, String prefix, BulkRequestTarget target)
throws CacheException {
PnfsId pnfsId = target.getPnfsId();
if (pnfsId == null) {
pnfsId = getAttributes(target.getPath()).getPnfsId();
FsPath absolutePath = computeFsPath(prefix, target.getPath().toString());
pnfsId = getAttributes(absolutePath).getPnfsId();
}
return unpinMessage(id, pnfsId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.ReleaseActivityProvider.REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -77,11 +78,12 @@ public ReleaseActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,34 +102,36 @@ public StageActivity(String name, TargetType targetType) {
super(name, targetType);
}

public void cancel(BulkRequestTarget target) {
super.cancel(target);
public void cancel(String prefix, BulkRequestTarget target) {
super.cancel(prefix, target);
try {
pinManager.send(unpinMessage(id, target));
pinManager.send(unpinMessage(id, prefix, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel staging.", e));
+ "order to cancel staging.", e));
}
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
id = rid;

try {
/*
* refetch the attributes because RP is not stored in the bulk database.
*/
attributes = getAttributes(target);

FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);

checkStageable(attributes);

PinManagerPinMessage message
= new PinManagerPinMessage(attributes, getProtocolInfo(),
pnfsHandler.getRestriction(),
id,
getLifetimeInMillis(target));
getLifetimeInMillis(path));
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.UnpinActivityProvider.UNPIN_REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -78,11 +79,12 @@ public UnpinActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,45 @@ public UpdateQoSActivity(String name, TargetType targetType) {
}

@Override
public synchronized void cancel(BulkRequestTarget target) {
public synchronized void cancel(String prefix, BulkRequestTarget target) {
RemoteQoSRequirementsClient client = new RemoteQoSRequirementsClient();
client.setRequirementsService(qosEngine);
PnfsId pnfsId = target.getAttributes().getPnfsId();
PnfsId pnfsId = null;
if (target.getAttributes() == null) {
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix,
target.getPath().toString());
try {
pnfsId = pnfsHandler.getFileAttributes(absolutePath.toString(),
MINIMALLY_REQUIRED_ATTRIBUTES).getPnfsId();
} catch (CacheException e) {
LOGGER.error("fileQoSRequirementsModifiedCancelled failed: failed to fetch attributes for {} {}.",
target.getPath().toString(),
e.getMessage());
}
} else {
pnfsId = target.getAttributes().getPnfsId();
}
try {
client.fileQoSRequirementsModifiedCancelled(pnfsId, subject);
} catch (QoSException e) {
LOGGER.error("fileQoSRequirementsModifiedCancelled failed: {}, {}.", pnfsId,
e.getMessage());
}
responseReceiver.cancel(pnfsId.toString());
super.cancel(target);
super.cancel(prefix, target);
}

@Override
public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long tid,
FsPath path, FileAttributes attributes) throws BulkServiceException {
String prefix, FsPath path, FileAttributes attributes) throws BulkServiceException {
if (targetQos == null && qosPolicy == null) {
return Futures.immediateFailedFuture(new IllegalArgumentException("no target qos or policy given."));
}

if (attributes == null) {
try {
attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = pnfsHandler.getFileAttributes(absolutePath.toString(), MINIMALLY_REQUIRED_ATTRIBUTES);
} catch (CacheException e) {
throw new BulkServiceException("failed to retrieve file attributes", e);
}
Expand Down Expand Up @@ -232,6 +247,3 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
this.pnfsHandler = pnfsHandler;
}
}



Loading
Loading