diff --git a/src/raythena/actors/esworker.py b/src/raythena/actors/esworker.py index 7bc129f..1785b73 100644 --- a/src/raythena/actors/esworker.py +++ b/src/raythena/actors/esworker.py @@ -83,7 +83,7 @@ class ESWorker(object): } def __init__(self, actor_id: str, config: Config, - session_log_dir: str, job: PandaJob = None, event_ranges: Sequence[EventRange] = None) -> None: + session_log_dir: str, actor_no: int, actor_count: int, job: PandaJob = None, event_ranges: Sequence[EventRange] = None) -> None: """ Initialize attributes, instantiate a payload and setup the workdir @@ -95,6 +95,8 @@ def __init__(self, actor_id: str, config: Config, event_ranges: optional pre-assigned event ranges to process """ self.id = actor_id + self.actor_no = actor_no + self.actor_count = actor_count self.config = config self._logger = make_logger(self.config, self.id) self.session_log_dir = session_log_dir @@ -191,6 +193,12 @@ def modify_job(self, job: PandaJob) -> PandaJob: if "--multiprocess" not in cmd: cmd = f"--multiprocess=True {cmd}" + job_number = int(job["attemptNr"]) * self.actor_count + self.actor_no + if "--jobNumber=" in cmd: + cmd = re.sub(r"--jobNumber=[0-9]+", f"--jobNumber={job_number}", cmd) + else: + cmd = f"{cmd} --jobNumber={job_number} " + job["jobPars"] = cmd return job diff --git a/src/raythena/drivers/esdriver.py b/src/raythena/drivers/esdriver.py index 868ab62..c105744 100644 --- a/src/raythena/drivers/esdriver.py +++ b/src/raythena/drivers/esdriver.py @@ -196,7 +196,9 @@ def create_actors(self) -> None: kwargs = { 'actor_id': actor_id, 'config': self.config_remote, - 'session_log_dir': self.session_log_dir + 'session_log_dir': self.session_log_dir, + 'actor_no': i, + 'actor_count': self.n_actors, } job = self.bookKeeper.assign_job_to_actor(actor_id) if job: