Skip to content

Commit

Permalink
rely on PQ.astorages.read_lan_analysis (failback to read_lan) as loca…
Browse files Browse the repository at this point in the history
…l inputddms for analy jobs at unified PQ
  • Loading branch information
anisyonk committed Dec 25, 2024
1 parent 9e76a9f commit d2ce1c7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
9 changes: 6 additions & 3 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,22 @@ def prepare_sources(self, files: list, activities: Any = None) -> None:
"""
return None

def prepare_inputddms(self, files: list):
def prepare_inputddms(self, files: list, activities: list = None):
"""
Prepare input DDMs.
Populates filespec.inputddms for each entry from `files` list.
:param files: list of `FileSpec` objects
:param activities: ordered list of activities to resolve `astorages` (optional)
"""
astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {}
storages = astorages.get('read_lan', [])
activities = activities or ['read_lan']

storages = next((astorages.get(a) for a in activities if astorages.get(a)), None) or []

#activity = activities[0]
#if not storages: ## ignore empty astorages
# activity = activities[0]
# raise PilotException("Failed to resolve input sources: no associated storages defined for activity=%s (%s)"
# % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

Expand Down
18 changes: 12 additions & 6 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,27 +219,32 @@ def create_trace_report(job: JobData, label: str = 'stage-in') -> Any:
return trace_report


def get_stagein_client(job: JobData, args: object, label: str = 'stage-in') -> (Any, str):
def get_stagein_client(job: JobData, args: object, label: str = 'stage-in') -> (Any, str, list):
"""
Return the proper stage-in client.
:param job: job object (JobData)
:param args: pilot args object (object)
:param label: 'stage-in' (str)
:return: stage-in client (StageInClient).
:return: stage-in client (StageInClient), copytool_activities, storage_activities
"""
# create the trace report
trace_report = create_trace_report(job, label=label)

if job.is_eventservicemerge:
client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report)
activity = 'es_events_read'
storage_activities = ['read_lan']
else:
client = StageInClient(job.infosys, logger=logger, trace_report=trace_report,
ipv=args.internet_protocol_version, workdir=job.workdir)
activity = 'pr'
activity = 'read_lan'

return client, activity
is_unified = job.infosys.queuedata.type == 'unified' if job.infosys else None
is_analysis = job.is_analysis()
storage_activities = ['read_lan_analysis', 'read_lan'] if is_unified and is_analysis else ['read_lan']

return client, activity, storage_activities


def _stage_in(args: object, job: JobData) -> bool:
Expand Down Expand Up @@ -284,8 +289,8 @@ def _stage_in(args: object, job: JobData) -> bool:
try:
logger.info('stage-in will not be done in a container')

client, activity = get_stagein_client(job, args, label)
logger.info(f'activity={activity}')
client, activity, storage_activities = get_stagein_client(job, args, label)
logger.info(f'activity={activity}, storage_activities={storage_activities}')
use_pcache = job.infosys.queuedata.use_pcache
logger.debug(f'use_pcache={use_pcache}')
# get the proper input file destination (normally job.workdir unless stager workflow)
Expand All @@ -305,6 +310,7 @@ def _stage_in(args: object, job: JobData) -> bool:
'args': args}
logger.debug(f'kwargs={kwargs}')
client.prepare_sources(job.indata)
client.prepare_inputddms(job.indata, storage_activities)
logger.info('prepared sources - will now transfer files')
client.transfer(job.indata, activity=activity, **kwargs)
except PilotException as error:
Expand Down

0 comments on commit d2ce1c7

Please sign in to comment.