Skip to content

Commit

Permalink
adapt integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed May 6, 2024
1 parent 6016360 commit 71d7d81
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
10 changes: 5 additions & 5 deletions src/broker/operandi_broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,23 @@ def __init__(
def run_broker(self):
# A list of queues for which a worker process should be created
queues = [RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS]
tunel_port = self.worker_starting_port
tunnel_port = self.worker_starting_port
try:
for queue_name in queues:
self.log.info(f"Creating a worker processes to consume from queue: {queue_name}")
self.create_worker_process(
queue_name=queue_name, status_checker=False,
tunnel_port_executor=tunel_port, tunnel_port_transfer=tunel_port+1
tunnel_port_executor=tunnel_port, tunnel_port_transfer=tunnel_port+1
)
tunel_port += 2
tunnel_port += 2

self.log.info(
f"Creating a status checker worker processes to consume from queue: {RABBITMQ_QUEUE_JOB_STATUSES}")
self.create_worker_process(
queue_name=RABBITMQ_QUEUE_JOB_STATUSES, status_checker=True,
tunnel_port_executor=tunel_port, tunnel_port_transfer=tunel_port+1
tunnel_port_executor=tunnel_port, tunnel_port_transfer=tunnel_port+1
)
tunel_port += 2
tunnel_port += 2
except Exception as error:
self.log.error(f"Error while creating worker processes: {error}")

Expand Down
8 changes: 4 additions & 4 deletions src/broker/operandi_broker/job_status_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def __init__(self, db_url, rabbitmq_url, queue_name, tunnel_port_executor, tunne
self.current_message_job_id = None
self.has_consumed_message = False

self.tunel_port_executor = tunnel_port_executor
self.tunel_port_transfer = tunnel_port_transfer
self.tunnel_port_executor = tunnel_port_executor
self.tunnel_port_transfer = tunnel_port_transfer

def run(self):
try:
Expand All @@ -55,9 +55,9 @@ def run(self):
signal.signal(signal.SIGTERM, self.signal_handler)

sync_db_initiate_database(self.db_url)
self.hpc_executor = HPCExecutor(tunel_host='localhost', tunel_port=self.tunel_port_executor)
self.hpc_executor = HPCExecutor(tunel_host='localhost', tunel_port=self.tunnel_port_executor)
self.log.info("HPC executor connection successful.")
self.hpc_io_transfer = HPCTransfer(tunel_host='localhost', tunel_port=self.tunel_port_transfer)
self.hpc_io_transfer = HPCTransfer(tunel_host='localhost', tunel_port=self.tunnel_port_transfer)
self.log.info("HPC transfer connection successful.")

self.rmq_consumer = get_connection_consumer(rabbitmq_url=self.rmq_url)
Expand Down
8 changes: 4 additions & 4 deletions src/broker/operandi_broker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def __init__(self, db_url, rabbitmq_url, queue_name, tunnel_port_executor, tunne
self.current_message_job_id = None
self.has_consumed_message = False

self.tunel_port_executor = tunnel_port_executor
self.tunel_port_transfer = tunnel_port_transfer
self.tunnel_port_executor = tunnel_port_executor
self.tunnel_port_transfer = tunnel_port_transfer

def run(self):
try:
Expand All @@ -57,9 +57,9 @@ def run(self):
signal.signal(signal.SIGTERM, self.signal_handler)

sync_db_initiate_database(self.db_url)
self.hpc_executor = HPCExecutor(tunel_host='localhost', tunel_port=self.tunel_port_executor)
self.hpc_executor = HPCExecutor(tunel_host='localhost', tunel_port=self.tunnel_port_executor)
self.log.info("HPC executor connection successful.")
self.hpc_io_transfer = HPCTransfer(tunel_host='localhost', tunel_port=self.tunel_port_transfer)
self.hpc_io_transfer = HPCTransfer(tunel_host='localhost', tunel_port=self.tunnel_port_transfer)
self.log.info("HPC transfer connection successful.")

self.rmq_consumer = get_connection_consumer(rabbitmq_url=self.rmq_url)
Expand Down
12 changes: 10 additions & 2 deletions tests/integration_tests/test_full_cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ def test_full_cycle(auth_harvester, operandi, service_broker, bytes_default_work
response = operandi.get("/")
assert response.json()["message"] == "The home page of the OPERANDI Server"

worker_starting_port = 44000
# Create a background worker for the harvester queue
service_broker.create_worker_process(queue_name=RABBITMQ_QUEUE_HARVESTER, status_checker=False)
service_broker.create_worker_process(
queue_name=RABBITMQ_QUEUE_HARVESTER, status_checker=False,
tunnel_port_executor=worker_starting_port, tunnel_port_transfer=worker_starting_port+1
)
worker_starting_port += 2
# Create a background worker for the job statuses queue
service_broker.create_worker_process(queue_name=RABBITMQ_QUEUE_JOB_STATUSES, status_checker=True)
service_broker.create_worker_process(
queue_name=RABBITMQ_QUEUE_JOB_STATUSES, status_checker=True,
tunnel_port_executor=worker_starting_port, tunnel_port_transfer=worker_starting_port+1
)

# Post a workflow script
response = operandi.post(
Expand Down

0 comments on commit 71d7d81

Please sign in to comment.