Skip to content

Commit

Permalink
MigrationModule: new option --target-deficit-mode
Browse files Browse the repository at this point in the history
* Control behavior of migrations when the number of replicas requested
  exceeds the number of pools available to satisfy that request.

* Permitted values are "wait" (wait until pools become available), or
  "limit" (job will end when all available pools are hosting a replica
  if the request cannot be met).
  • Loading branch information
greenc-FNAL committed Nov 12, 2024
1 parent 404dc6d commit 4a164aa
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private synchronized void createTask(PoolManagerPoolInformation targetInfo, Stri
false, // compute checksum on update; should not happen
false, // force copy even if pool is not readable
true, // maintain atime
1); // only one copy per task
1, // only one copy per task
true); // wait for new targets if necessary

createTask(taskParameters, source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ public Task handleMakeOneCopy(FileAttributes attributes) {
false, // compute checksum on update; should not happen
false, // force copy even if pool not readable
true, // maintain atime
1);
1, // only one copy per task
true); // wait for new targets if necessary

Task task = new Task(taskParameters, completionHandler, source, pnfsId,
ReplicaState.CACHED, ONLINE_STICKY_RECORD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Job(MigrationContext context, JobDefinition definition) {
context.getExecutor(), definition.selectionStrategy,
definition.poolList, definition.isEager, definition.isMetaOnly,
definition.computeChecksumOnUpdate, definition.forceSourceMode,
definition.maintainAtime, definition.replicas);
definition.maintainAtime, definition.replicas, definition.waitForTargets);

_pinPrefix = context.getPinManagerStub().getDestinationPath().getDestinationAddress()
.getCellName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public class JobDefinition {
*/
public final int replicas;

/**
* Whether to wait for targets to become available to satisfy number of replicas.
*/
public final boolean waitForTargets;


public JobDefinition(Predicate<CacheEntry> filter,
CacheEntryMode sourceMode,
CacheEntryMode targetMode,
Expand All @@ -120,7 +126,8 @@ public JobDefinition(Predicate<CacheEntry> filter,
boolean maintainAtime,
Expression pauseWhen,
Expression stopWhen,
boolean forceSourceMode) {
boolean forceSourceMode,
boolean waitForTargets) {
this.filter = filter;
this.sourceMode = sourceMode;
this.targetMode = targetMode;
Expand All @@ -139,5 +146,6 @@ public JobDefinition(Predicate<CacheEntry> filter,
this.pauseWhen = pauseWhen;
this.stopWhen = stopWhen;
this.forceSourceMode = forceSourceMode;
this.waitForTargets = waitForTargets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,16 @@ public class MigrationCopyCommand implements Callable<String> {
usage = "Enables the transfer of files from a disabled pool.")
boolean forceSourceMode;

@Option(name = "target-deficit-mode", metaVar = "tdmode",
values = { "wait", "limit" },
category = "Transfer options",
usage = "Behaviour when requested replicas exceeds available targets:\n" +
"wait:\n" +
" wait for new targets to become available.\n" +
"limit:\n" +
" limit the number of replicas to that of the currently-available targets.\n")
String targetDeficitMode = "wait";

@Argument(metaVar = "target",
required = false,
usage = "Required unless -target=pgroup is supplied, in which case we" +
Expand Down Expand Up @@ -838,7 +848,8 @@ public String call() throws IllegalArgumentException {
maintainAtime,
createLifetimePredicate(pauseWhen),
createLifetimePredicate(stopWhen),
forceSourceMode);
forceSourceMode,
targetDeficitMode.equals("wait"));

if (definition.targetMode.state == CacheEntryMode.State.DELETE
|| definition.targetMode.state == CacheEntryMode.State.REMOVABLE) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.dcache.pool.migration;

import static com.google.common.base.Preconditions.checkState;
import static java.lang.Long.min;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -238,7 +239,8 @@ synchronized boolean hasMoreLocations() {
* job.
*/
synchronized boolean needsMoreReplicas() {
return _replicas.size() < _parameters.replicas;
return _replicas.size() < (_parameters.waitForTargets ? _parameters.replicas
: min(_parameters.replicas, _parameters.poolList.getPools().stream().count()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,16 @@ public class TaskParameters {
*/
public final int replicas;

/**
* Whether to wait for targets to become available to satisfy number of replicas.
*/
public final boolean waitForTargets;

public TaskParameters(CellStub pool, CellStub pnfs, CellStub pinManager,
ScheduledExecutorService executor,
PoolSelectionStrategy selectionStrategy, RefreshablePoolList poolList, boolean isEager,
boolean isMetaOnly, boolean computeChecksumOnUpdate, boolean forceSourceMode,
boolean maintainAtime, int replicas) {
boolean maintainAtime, int replicas, boolean waitForTargets) {
this.pool = pool;
this.pnfs = pnfs;
this.pinManager = pinManager;
Expand All @@ -105,5 +110,6 @@ public TaskParameters(CellStub pool, CellStub pnfs, CellStub pinManager,
this.forceSourceMode = forceSourceMode;
this.maintainAtime = maintainAtime;
this.replicas = replicas;
this.waitForTargets = waitForTargets;
}
}

0 comments on commit 4a164aa

Please sign in to comment.