diff --git a/pilot/api/data.py b/pilot/api/data.py index 7f734a66..aedaca24 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -196,19 +196,22 @@ def prepare_sources(self, files: list, activities: Any = None) -> None: """ return None - def prepare_inputddms(self, files: list): + def prepare_inputddms(self, files: list, activities: list = None): """ Prepare input DDMs. Populates filespec.inputddms for each entry from `files` list. :param files: list of `FileSpec` objects + :param activities: ordered list of activities to resolve `astorages` (optional) """ astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {} - storages = astorages.get('read_lan', []) + activities = activities or ['read_lan'] + + storages = next((astorages.get(a) for a in activities if astorages.get(a)), None) or [] - #activity = activities[0] #if not storages: ## ignore empty astorages + # activity = activities[0] # raise PilotException("Failed to resolve input sources: no associated storages defined for activity=%s (%s)" # % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED') diff --git a/pilot/control/data.py b/pilot/control/data.py index 641cc31b..01c30c10 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -219,14 +219,14 @@ def create_trace_report(job: JobData, label: str = 'stage-in') -> Any: return trace_report -def get_stagein_client(job: JobData, args: object, label: str = 'stage-in') -> (Any, str): +def get_stagein_client(job: JobData, args: object, label: str = 'stage-in') -> (Any, str, list): """ Return the proper stage-in client. :param job: job object (JobData) :param args: pilot args object (object) :param label: 'stage-in' (str) - :return: stage-in client (StageInClient). 
+ :return: stage-in client (StageInClient), copytool activity (str), ordered storage activities (list). """ # create the trace report trace_report = create_trace_report(job, label=label) @@ -234,12 +234,17 @@ def get_stagein_client(job: JobData, args: object, label: str = 'stage-in') -> ( if job.is_eventservicemerge: client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report) activity = 'es_events_read' + storage_activities = ['read_lan'] else: client = StageInClient(job.infosys, logger=logger, trace_report=trace_report, ipv=args.internet_protocol_version, workdir=job.workdir) - activity = 'pr' + activity = 'read_lan' - return client, activity + is_unified = job.infosys.queuedata.type == 'unified' if job.infosys else None + is_analysis = job.is_analysis() + storage_activities = ['read_lan_analysis', 'read_lan'] if is_unified and is_analysis else ['read_lan'] + + return client, activity, storage_activities def _stage_in(args: object, job: JobData) -> bool: @@ -284,8 +289,8 @@ def _stage_in(args: object, job: JobData) -> bool: try: logger.info('stage-in will not be done in a container') - client, activity = get_stagein_client(job, args, label) - logger.info(f'activity={activity}') + client, activity, storage_activities = get_stagein_client(job, args, label) + logger.info(f'activity={activity}, storage_activities={storage_activities}') use_pcache = job.infosys.queuedata.use_pcache logger.debug(f'use_pcache={use_pcache}') # get the proper input file destination (normally job.workdir unless stager workflow) @@ -305,6 +310,7 @@ def _stage_in(args: object, job: JobData) -> bool: 'args': args} logger.debug(f'kwargs={kwargs}') client.prepare_sources(job.indata) + client.prepare_inputddms(job.indata, storage_activities) logger.info('prepared sources - will now transfer files') client.transfer(job.indata, activity=activity, **kwargs) except PilotException as error: