Skip to content

Commit

Permalink
Move worker management (#180)
Browse files Browse the repository at this point in the history
* Moved worker management code from main to worker manager.

* Added worker properties class.
  • Loading branch information
AshishA26 authored Jul 11, 2024
1 parent 18ffc37 commit 55e622a
Show file tree
Hide file tree
Showing 3 changed files with 441 additions and 157 deletions.
182 changes: 107 additions & 75 deletions documentation/main_multiprocess_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
COUNTUP_TO_ADD_RANDOM_QUEUE_MAX_SIZE = 5
ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE = 5

# Play with these numbers to see process bottlenecks
COUNTUP_WORKER_COUNT = 2
ADD_RANDOM_WORKER_COUNT = 2
CONCATENATOR_WORKER_COUNT = 2


# main() is required for early return
def main() -> int:
Expand Down Expand Up @@ -69,83 +74,111 @@ def main() -> int:
ADD_RANDOM_TO_CONCATENATOR_QUEUE_MAX_SIZE,
)

# Prepare processes
# Data path: countup_worker to add_random_worker to concatenator_workers
# Play with these numbers to see process bottlenecks
countup_workers = [
mp.Process(
target=countup_worker.countup_worker,
args=(
3,
100,
countup_to_add_random_queue,
controller,
),
),
mp.Process(
target=countup_worker.countup_worker,
args=(
2,
200,
countup_to_add_random_queue,
controller,
),
),
]
countup_manager = worker_manager.WorkerManager(countup_workers)

add_random_workers = [
mp.Process(
target=add_random_worker.add_random_worker,
args=(
252,
10,
5,
countup_to_add_random_queue,
add_random_to_concatenator_queue,
controller,
),
),
mp.Process(
target=add_random_worker.add_random_worker,
args=(
350,
4,
1,
countup_to_add_random_queue,
add_random_to_concatenator_queue,
controller,
),
# Worker properties
result, countup_worker_properties = worker_manager.WorkerProperties.create(
count=COUNTUP_WORKER_COUNT,
target=countup_worker.countup_worker,
work_arguments=(
3,
100,
),
]
add_random_manager = worker_manager.WorkerManager(add_random_workers)

concatenator_workers = [
mp.Process(
target=concatenator_worker.concatenator_worker,
args=(
"Hello ",
" world!",
add_random_to_concatenator_queue,
controller,
),
input_queues=[],
output_queues=[countup_to_add_random_queue],
controller=controller,
local_logger=main_logger,
)
if not result:
print("Failed to create arguments for Countup")
return -1

# Get Pylance to stop complaining
assert countup_worker_properties is not None

result, add_random_worker_properties = worker_manager.WorkerProperties.create(
count=ADD_RANDOM_WORKER_COUNT,
target=add_random_worker.add_random_worker,
work_arguments=(
252,
10,
5,
),
mp.Process(
target=concatenator_worker.concatenator_worker,
args=(
"Example ",
" code!",
add_random_to_concatenator_queue,
controller,
),
input_queues=[countup_to_add_random_queue],
output_queues=[add_random_to_concatenator_queue],
controller=controller,
local_logger=main_logger,
)
if not result:
print("Failed to create arguments for Add Random")
return -1

# Get Pylance to stop complaining
assert add_random_worker_properties is not None

result, concatenator_worker_properties = worker_manager.WorkerProperties.create(
count=CONCATENATOR_WORKER_COUNT,
target=concatenator_worker.concatenator_worker,
work_arguments=(
"Hello ",
" world!",
),
]
concatenator_manager = worker_manager.WorkerManager(concatenator_workers)
input_queues=[add_random_to_concatenator_queue],
output_queues=[],
controller=controller,
local_logger=main_logger,
)
if not result:
print("Failed to create arguments for Concatenator")
return -1

# Get Pylance to stop complaining
assert concatenator_worker_properties is not None

# Prepare processes
# Data path: countup_worker to add_random_worker to concatenator_workers
worker_managers = []

result, countup_manager = worker_manager.WorkerManager.create(
worker_properties=countup_worker_properties,
local_logger=main_logger,
)
if not result:
print("Failed to create manager for Countup")
return -1

# Get Pylance to stop complaining
assert countup_manager is not None

worker_managers.append(countup_manager)

result, add_random_manager = worker_manager.WorkerManager.create(
worker_properties=add_random_worker_properties,
local_logger=main_logger,
)
if not result:
print("Failed to create manager for Add Random")
return -1

# Get Pylance to stop complaining
assert add_random_manager is not None

worker_managers.append(add_random_manager)

result, concatenator_manager = worker_manager.WorkerManager.create(
worker_properties=concatenator_worker_properties,
local_logger=main_logger,
)
if not result:
print("Failed to create manager for Concatenator")
return -1

# Get Pylance to stop complaining
assert concatenator_manager is not None

worker_managers.append(concatenator_manager)

# Start worker processes
countup_manager.start_workers()
add_random_manager.start_workers()
concatenator_manager.start_workers()
for manager in worker_managers:
manager.start_workers()

frame = inspect.currentframe()
main_logger.info("Started", frame)
Expand Down Expand Up @@ -178,9 +211,8 @@ def main() -> int:
main_logger.info("Queues cleared", frame)

# Clean up worker processes
countup_manager.join_workers()
add_random_manager.join_workers()
concatenator_manager.join_workers()
for manager in worker_managers:
manager.join_workers()

frame = inspect.currentframe()
main_logger.info("Stopped", frame)
Expand Down
Loading

0 comments on commit 55e622a

Please sign in to comment.