diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java index 226b511ec57..fa88a53a16b 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java @@ -93,7 +93,8 @@ public enum TargetType { public static final Set MINIMALLY_REQUIRED_ATTRIBUTES = Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE, - FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.RETENTION_POLICY)); + FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY, + FileAttribute.RETENTION_POLICY)); private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinActivity.java index d602bcba35d..b548d2ff553 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinActivity.java @@ -74,6 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.net.URI; import java.net.URISyntaxException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.dcache.pinmanager.PinManagerPinMessage; import org.dcache.services.bulk.BulkServiceException; @@ -118,6 +119,12 @@ public ListenableFuture perform(String rid, long tid, FsPath target, = new PinManagerPinMessage(attributes, getProtocolInfo(), id, lifetimeInMillis); message.setSubject(subject); + + Optional> skipOption = skipIfOnline(attributes, message); + if (skipOption.isPresent()) { + return skipOption.get(); + } + return pinManager.send(message, Long.MAX_VALUE); } catch (URISyntaxException | CacheException e) { return Futures.immediateFailedFuture(e); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java index 6a80e0607ed..e9a17a0b845 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java @@ -61,18 +61,23 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; import static diskCacheV111.util.CacheException.INVALID_ARGS; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import diskCacheV111.util.AccessLatency; import diskCacheV111.util.CacheException; import diskCacheV111.util.FsPath; import diskCacheV111.util.NamespaceHandlerAware; import diskCacheV111.util.PnfsHandler; import diskCacheV111.util.PnfsId; import diskCacheV111.vehicles.Message; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import org.dcache.cells.CellStub; import org.dcache.pinmanager.PinManagerAware; +import org.dcache.pinmanager.PinManagerPinMessage; import org.dcache.pinmanager.PinManagerUnpinMessage; import org.dcache.services.bulk.activity.BulkActivity; import org.dcache.services.bulk.util.BulkRequestTarget; @@ -105,6 +110,9 @@ protected void handleCompletion(BulkRequestTarget target, ListenableFuture> skipIfOnline(FileAttributes attributes, + PinManagerPinMessage message) { + ListenableFuture future = null; + if (attributes.getAccessLatency() == AccessLatency.ONLINE) { + message.setReply(); + message.setLifetime(-1L); + future = Futures.immediateFuture(message); + } + + return Optional.ofNullable(future); + } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/StageActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/StageActivity.java index 9afa3387351..5802d66e3ae 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/StageActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/StageActivity.java @@ -78,6 +78,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.dcache.pinmanager.PinManagerPinMessage; import org.dcache.services.bulk.BulkServiceException; @@ -128,6 +129,12 @@ public ListenableFuture perform(String rid, long tid, FsPath target, = new PinManagerPinMessage(attributes, getProtocolInfo(), id, getLifetimeInMillis(target)); message.setSubject(subject); + + Optional> skipOption = skipIfOnline(attributes, message); + if (skipOption.isPresent()) { + return skipOption.get(); + } + return pinManager.send(message, Long.MAX_VALUE); } catch (URISyntaxException | CacheException e) { return Futures.immediateFailedFuture(e);