diff --git a/README.md b/README.md index 479a79f..6599f97 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Microservice that generates subtitles for [TUM-Live](https://live.rbg.tum.de). ```bash $ grpcurl -plaintext localhost:50055 list live.voice.v1.SubtitleGenerator -voice.SubtitleGenerator.Generate +live.voice.v1.SubtitleGenerator.Generate ``` ```bash @@ -88,7 +88,8 @@ VOSK_MODEL_DIR=/data VOSK_DWNLD_URLS=https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip,https://alphacephei.com/vosk/models/vosk-model-small-de-0.15.zip VOSK_MODELS=model-fr:fr,model-en:en WHISPER_MODEL=medium -MAX_WORKERS=10 +MAX_THREADS=10 +CNT_WORKERS=3 ```

@@ -115,7 +116,8 @@ vosk: lang: 'de' whisper: model: 'tiny' -max_workers: 10 +max_threads: 12 +cnt_workers: 3 ```

diff --git a/config.yml b/config.yml index c2d3a00..f48db88 100644 --- a/config.yml +++ b/config.yml @@ -16,4 +16,5 @@ vosk: lang: 'de' whisper: model: 'tiny' -max_workers: 12 \ No newline at end of file +max_threads: 12 +cnt_workers: 3 \ No newline at end of file diff --git a/subtitles/client.py b/subtitles/client.py new file mode 100644 index 0000000..b37fab1 --- /dev/null +++ b/subtitles/client.py @@ -0,0 +1,17 @@ +"""Implements gRPC Client facade""" + +import logging +import grpc +import subtitles_pb2_grpc, subtitles_pb2 +from grpc._channel import _InactiveRpcError + + +def receive(receiver: str, req: subtitles_pb2.ReceiveRequest): + with grpc.insecure_channel(receiver) as channel: + stub = subtitles_pb2_grpc.SubtitleReceiverStub(channel) + try: + stub.Receive(req) + except _InactiveRpcError as grpc_err: + logging.error(grpc_err.details()) + except Exception as err: + logging.error(err) diff --git a/subtitles/properties.py b/subtitles/properties.py index f1c9eef..403fdc4 100644 --- a/subtitles/properties.py +++ b/subtitles/properties.py @@ -3,6 +3,20 @@ import os.path import yaml +DEFAULT_PROPERTIES = { + 'api': {'port': 50055}, + 'receiver': {'host': 'localhost', 'port': '50053'}, + 'transcriber': 'whisper', + 'vosk': { + 'model_dir': '/tmp', + 'download_urls': [], + 'models': [] + }, + 'whisper': {'model': 'tiny'}, + 'max_threads': None, + 'cnt_workers': 1, +} + class PropertyError(Exception): pass @@ -77,9 +91,13 @@ def get(self) -> dict: properties['whisper']['model'] = os.getenv('WHISPER_MODEL', properties['whisper']['model']) - max_workers = os.getenv('MAX_WORKERS', properties['max_workers']) - if max_workers: - properties['max_workers'] = int(max_workers) + max_threads = os.getenv('MAX_THREADS', properties['max_threads']) + if max_threads: + properties['max_threads'] = int(max_threads) + + cnt_workers = os.getenv('CNT_WORKERS', properties['cnt_workers']) + if cnt_workers: + properties['cnt_workers'] = int(cnt_workers) return properties diff --git a/subtitles/subtitles.py b/subtitles/subtitles.py index abe0f5c..3aa48ec 100644 --- a/subtitles/subtitles.py +++ b/subtitles/subtitles.py @@ -3,37 +3,31 @@ import os from signal import signal, SIGTERM, SIGINT, SIGQUIT, strsignal from concurrent.futures import ThreadPoolExecutor -from properties import YAMLPropertiesFile, EnvProperties, PropertyError +from properties import YAMLPropertiesFile, EnvProperties, PropertyError, DEFAULT_PROPERTIES from grpc_reflection.v1alpha import reflection -from grpc._channel import _InactiveRpcError from google.protobuf import empty_pb2 from model_loader import download_models, ModelLoadError import grpc import subtitles_pb2 import subtitles_pb2_grpc +from taskqueue import TaskQueue from vosk_transcriber import VoskTranscriber from whisper_transcriber import WhisperTranscriber from transcriber import Transcriber +from tasks import GenerationTask +from worker import Worker class SubtitleServerService(subtitles_pb2_grpc.SubtitleGeneratorServicer): """grpc service for subtitles""" - def __init__(self, transcriber: Transcriber, receiver: str, executor: ThreadPoolExecutor) -> None: - """Initialize service. - - Args: - transcriber: The transcriber used for subtitle generation. - receiver: The address of the receiver service. - executor: Threadpool for jobs. - """ - self.__transcriber = transcriber - self.__receiver = receiver - self.__executor = executor + def __init__(self, queue: TaskQueue) -> None: + """Initialize service""" + self.__queue = queue def Generate(self, req: subtitles_pb2.GenerateRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: - """ Handler function for an incoming Generate request. + """Handler function for an incoming Generate request. Args: req: An object holding the grpc message data. @@ -50,71 +44,50 @@ def Generate(self, req: subtitles_pb2.GenerateRequest, logging.debug(f'checking if {source} exists') if not os.path.isfile(source): context.abort(grpc.StatusCode.NOT_FOUND, f'can not find source file: {source}') - return empty_pb2.Empty() - - logging.debug('starting thread to generate subtitles') - self.__executor.submit(self.__generate, self.__transcriber, source, stream_id, language) - return empty_pb2.Empty() + return - def __generate(self, transcriber: Transcriber, source: str, stream_id: str, language: str) -> None: - subtitles, language = transcriber.generate(source, language) + logging.debug('enqueue request') + self.__queue.put(GenerationTask(source, language, stream_id)) - logging.info(f'trying to connect to receiver @ {self.__receiver}') - with grpc.insecure_channel(self.__receiver) as channel: - stub = subtitles_pb2_grpc.SubtitleReceiverStub(channel) - request = subtitles_pb2.ReceiveRequest( - stream_id=stream_id, - subtitles=subtitles, - language=language) - - try: - stub.Receive(request) - logging.info('subtitle-request sent') - except _InactiveRpcError as grpc_err: - logging.error(grpc_err.details()) - except Exception as err: - logging.error(err) + return empty_pb2.Empty() -def serve(transcriber: Transcriber, - receiver: str, +def serve(executor: ThreadPoolExecutor, + q: TaskQueue, port: int, - max_workers: int, debug: bool = False) -> None: """Starts the grpc server. Args: - transcriber: The transcriber used. - receiver: The network address of the receiver. + executor: The pool of threads + q: Queue of tasks port: The port on which the voice service listens. - max_workers: The maximum number of threads that can be used to execute the given calls. debug: Whether the server should be started in debug mode or not. """ + server = grpc.server(executor) + subtitles_pb2_grpc.add_SubtitleGeneratorServicer_to_server( + servicer=SubtitleServerService(q), + server=server) - with ThreadPoolExecutor(max_workers) as executor: - server = grpc.server(executor) - subtitles_pb2_grpc.add_SubtitleGeneratorServicer_to_server( - servicer=SubtitleServerService(transcriber, receiver, executor), - server=server) + if debug: + activate_reflection(server) - if debug: - activate_reflection(server) + logging.info(f'listening at :{port}') + server.add_insecure_port(f'[::]:{port}') + server.start() - logging.info(f'listening at :{port}') - server.add_insecure_port(f'[::]:{port}') - server.start() + def handle_shutdown(signum, *_): + logging.info(f'received "{strsignal(signum)}" signal') + all_requests_done = server.stop(16) + all_requests_done.wait(16) + q.stop() + executor.shutdown(wait=True) + logging.info('shut down gracefully') - def handle_shutdown(signum, *_): - logging.info(f'received "{strsignal(signum)}" signal') - all_requests_done = server.stop(30) - all_requests_done.wait(30) - executor.shutdown(wait=True) - logging.info('shut down gracefully') - - signal(SIGTERM, handle_shutdown) - signal(SIGINT, handle_shutdown) - signal(SIGQUIT, handle_shutdown) - server.wait_for_termination() + signal(SIGTERM, handle_shutdown) + signal(SIGINT, handle_shutdown) + signal(SIGQUIT, handle_shutdown) + server.wait_for_termination() def get_transcriber(properties: dict, debug: bool) -> Transcriber: @@ -141,18 +114,7 @@ def main(): debug = os.getenv('DEBUG', '') != "" logging.basicConfig(level=(logging.INFO, logging.DEBUG)[debug]) - properties = { - 'api': {'port': 50055}, - 'receiver': {'host': 'localhost', 'port': '50053'}, - 'transcriber': 'whisper', - 'vosk': { - 'model_dir': '/tmp', - 'download_urls': [], - 'models': [] - }, - 'whisper': {'model': 'tiny'}, - 'max_workers': None, - } + properties = DEFAULT_PROPERTIES try: config_file = os.getenv("CONFIG_FILE") @@ -177,10 +139,15 @@ def main(): transcriber = get_transcriber(properties, debug) receiver = f'{properties["receiver"]["host"]}:{properties["receiver"]["port"]}' port = properties['api']['port'] - max_workers = properties['max_workers'] + max_threads = properties['max_threads'] + cnt_workers = properties['cnt_workers'] logging.debug(properties) - serve(transcriber, receiver, port, max_workers, debug) + + q = TaskQueue(cnt_workers) + with ThreadPoolExecutor(max_threads) as executor: + [Worker(transcriber, receiver, executor, q) for _ in range(cnt_workers)] + serve(executor, q, port, debug) if __name__ == "__main__": diff --git a/subtitles/taskqueue.py b/subtitles/taskqueue.py new file mode 100644 index 0000000..1b114a6 --- /dev/null +++ b/subtitles/taskqueue.py @@ -0,0 +1,13 @@ +import queue + +from tasks import StopTask + + +class TaskQueue(queue.Queue): + def __init__(self, task_worker_cnt: int = 1): + super().__init__() + self.task_worker_cnt = task_worker_cnt + + def stop(self): + for _ in range(self.task_worker_cnt): + self.put(StopTask()) \ No newline at end of file diff --git a/subtitles/tasks.py b/subtitles/tasks.py new file mode 100644 index 0000000..6a30d4a --- /dev/null +++ b/subtitles/tasks.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass + + +@dataclass +class GenerationTask: + source: str + language: str + stream_id: str + + +class StopTask: + pass diff --git a/subtitles/worker.py b/subtitles/worker.py new file mode 100644 index 0000000..d6365d7 --- /dev/null +++ b/subtitles/worker.py @@ -0,0 +1,42 @@ +import logging +from concurrent.futures import ThreadPoolExecutor +import subtitles_pb2 +from tasks import GenerationTask, StopTask +from taskqueue import TaskQueue +from transcriber import Transcriber +from client import receive + + +class Worker: + """Thread for subtitle generation""" + + def __init__(self, + transcriber: Transcriber, + receiver: str, + executor: ThreadPoolExecutor, + taskqueue: TaskQueue): + """Start the generator threads.""" + executor.submit(run, transcriber, receiver, taskqueue) + + +def run(transcriber: Transcriber, receiver: str, taskqueue: TaskQueue) -> None: + while True: + logging.info('worker: waiting for task...') + task = taskqueue.get() + if isinstance(task, GenerationTask): + logging.info('worker: starting to generate subtitles...') + logging.debug(f'worker: task: {task}') + generate(transcriber, receiver, task) + elif isinstance(task, StopTask): + break + + +def generate(transcriber: Transcriber, receiver: str, task: GenerationTask) -> None: + subtitles, language = transcriber.generate(task.source, task.language) + + logging.info(f'worker: sending receive message to receiver @ {receiver}') + receive(receiver, + req=subtitles_pb2.ReceiveRequest( + stream_id=task.stream_id, + subtitles=subtitles, + language=language))