Skip to content

Commit

Permalink
Feature Add queue (#25)
Browse files Browse the repository at this point in the history
* Update README.md

* Add queueing

* Move default properties to properties.py

* Reorganize

* Make worker cnt configurable

* Update README.md
  • Loading branch information
MatthiasReumann authored Jan 31, 2023
1 parent 3a7826e commit 75fbf09
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 84 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Microservice that generates subtitles for [TUM-Live](https://live.rbg.tum.de).
```bash
$ grpcurl -plaintext localhost:50055 list live.voice.v1.SubtitleGenerator

voice.SubtitleGenerator.Generate
live.voice.v1.SubtitleGenerator.Generate
```

```bash
Expand Down Expand Up @@ -88,7 +88,8 @@ VOSK_MODEL_DIR=/data
VOSK_DWNLD_URLS=https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip,https://alphacephei.com/vosk/models/vosk-model-small-de-0.15.zip
VOSK_MODELS=model-fr:fr,model-en:en
WHISPER_MODEL=medium
MAX_WORKERS=10
MAX_THREADS=10
CNT_WORKERS=3
```
</p>
</details>
Expand All @@ -115,7 +116,8 @@ vosk:
lang: 'de'
whisper:
model: 'tiny'
max_workers: 10
max_threads: 12
cnt_workers: 3
```
</p>
</details>
Expand Down
3 changes: 2 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ vosk:
lang: 'de'
whisper:
model: 'tiny'
max_workers: 12
max_threads: 12
cnt_workers: 3
17 changes: 17 additions & 0 deletions subtitles/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Implements gRPC Client facade"""

import logging
import grpc
import subtitles_pb2_grpc, subtitles_pb2
from grpc._channel import _InactiveRpcError


def receive(receiver: str, req: subtitles_pb2.ReceiveRequest):
with grpc.insecure_channel(receiver) as channel:
stub = subtitles_pb2_grpc.SubtitleReceiverStub(channel)
try:
stub.Receive(req)
except _InactiveRpcError as grpc_err:
logging.error(grpc_err.details())
except Exception as err:
logging.error(err)
24 changes: 21 additions & 3 deletions subtitles/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
import os.path
import yaml

DEFAULT_PROPERTIES = {
'api': {'port': 50055},
'receiver': {'host': 'localhost', 'port': '50053'},
'transcriber': 'whisper',
'vosk': {
'model_dir': '/tmp',
'download_urls': [],
'models': []
},
'whisper': {'model': 'tiny'},
'max_threads': None,
'cnt_workers': 1,
}


class PropertyError(Exception):
pass
Expand Down Expand Up @@ -77,9 +91,13 @@ def get(self) -> dict:

properties['whisper']['model'] = os.getenv('WHISPER_MODEL', properties['whisper']['model'])

max_workers = os.getenv('MAX_WORKERS', properties['max_workers'])
if max_workers:
properties['max_workers'] = int(max_workers)
max_threads = os.getenv('MAX_THREADS', properties['max_threads'])
if max_threads:
properties['max_threads'] = int(max_threads)

cnt_workers = os.getenv('CNT_WORKERS', properties['cnt_workers'])
if cnt_workers:
properties['cnt_workers'] = int(cnt_workers)

return properties

Expand Down
121 changes: 44 additions & 77 deletions subtitles/subtitles.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,31 @@
import os
from signal import signal, SIGTERM, SIGINT, SIGQUIT, strsignal
from concurrent.futures import ThreadPoolExecutor
from properties import YAMLPropertiesFile, EnvProperties, PropertyError
from properties import YAMLPropertiesFile, EnvProperties, PropertyError, DEFAULT_PROPERTIES
from grpc_reflection.v1alpha import reflection
from grpc._channel import _InactiveRpcError
from google.protobuf import empty_pb2
from model_loader import download_models, ModelLoadError
import grpc
import subtitles_pb2
import subtitles_pb2_grpc
from taskqueue import TaskQueue
from vosk_transcriber import VoskTranscriber
from whisper_transcriber import WhisperTranscriber
from transcriber import Transcriber
from tasks import GenerationTask
from worker import Worker


class SubtitleServerService(subtitles_pb2_grpc.SubtitleGeneratorServicer):
"""grpc service for subtitles"""

def __init__(self, transcriber: Transcriber, receiver: str, executor: ThreadPoolExecutor) -> None:
"""Initialize service.
Args:
transcriber: The transcriber used for subtitle generation.
receiver: The address of the receiver service.
executor: Threadpool for jobs.
"""
self.__transcriber = transcriber
self.__receiver = receiver
self.__executor = executor
def __init__(self, queue: TaskQueue) -> None:
"""Initialize service"""
self.__queue = queue

def Generate(self, req: subtitles_pb2.GenerateRequest,
context: grpc.ServicerContext) -> empty_pb2.Empty:
""" Handler function for an incoming Generate request.
"""Handler function for an incoming Generate request.
Args:
req: An object holding the grpc message data.
Expand All @@ -50,71 +44,50 @@ def Generate(self, req: subtitles_pb2.GenerateRequest,
logging.debug(f'checking if {source} exists')
if not os.path.isfile(source):
context.abort(grpc.StatusCode.NOT_FOUND, f'can not find source file: {source}')
return empty_pb2.Empty()

logging.debug('starting thread to generate subtitles')
self.__executor.submit(self.__generate, self.__transcriber, source, stream_id, language)
return empty_pb2.Empty()
return

def __generate(self, transcriber: Transcriber, source: str, stream_id: str, language: str) -> None:
subtitles, language = transcriber.generate(source, language)
logging.debug('enqueue request')
self.__queue.put(GenerationTask(source, language, stream_id))

logging.info(f'trying to connect to receiver @ {self.__receiver}')
with grpc.insecure_channel(self.__receiver) as channel:
stub = subtitles_pb2_grpc.SubtitleReceiverStub(channel)
request = subtitles_pb2.ReceiveRequest(
stream_id=stream_id,
subtitles=subtitles,
language=language)

try:
stub.Receive(request)
logging.info('subtitle-request sent')
except _InactiveRpcError as grpc_err:
logging.error(grpc_err.details())
except Exception as err:
logging.error(err)
return empty_pb2.Empty()


def serve(transcriber: Transcriber,
receiver: str,
def serve(executor: ThreadPoolExecutor,
q: TaskQueue,
port: int,
max_workers: int,
debug: bool = False) -> None:
"""Starts the grpc server.
Args:
transcriber: The transcriber used.
receiver: The network address of the receiver.
executor: The pool of threads
q: Queue of tasks
port: The port on which the voice service listens.
max_workers: The maximum number of threads that can be used to execute the given calls.
debug: Whether the server should be started in debug mode or not.
"""
server = grpc.server(executor)
subtitles_pb2_grpc.add_SubtitleGeneratorServicer_to_server(
servicer=SubtitleServerService(q),
server=server)

with ThreadPoolExecutor(max_workers) as executor:
server = grpc.server(executor)
subtitles_pb2_grpc.add_SubtitleGeneratorServicer_to_server(
servicer=SubtitleServerService(transcriber, receiver, executor),
server=server)
if debug:
activate_reflection(server)

if debug:
activate_reflection(server)
logging.info(f'listening at :{port}')
server.add_insecure_port(f'[::]:{port}')
server.start()

logging.info(f'listening at :{port}')
server.add_insecure_port(f'[::]:{port}')
server.start()
def handle_shutdown(signum, *_):
logging.info(f'received "{strsignal(signum)}" signal')
all_requests_done = server.stop(16)
all_requests_done.wait(16)
q.stop()
executor.shutdown(wait=True)
logging.info('shut down gracefully')

def handle_shutdown(signum, *_):
logging.info(f'received "{strsignal(signum)}" signal')
all_requests_done = server.stop(30)
all_requests_done.wait(30)
executor.shutdown(wait=True)
logging.info('shut down gracefully')

signal(SIGTERM, handle_shutdown)
signal(SIGINT, handle_shutdown)
signal(SIGQUIT, handle_shutdown)
server.wait_for_termination()
signal(SIGTERM, handle_shutdown)
signal(SIGINT, handle_shutdown)
signal(SIGQUIT, handle_shutdown)
server.wait_for_termination()


def get_transcriber(properties: dict, debug: bool) -> Transcriber:
Expand All @@ -141,18 +114,7 @@ def main():
debug = os.getenv('DEBUG', '') != ""
logging.basicConfig(level=(logging.INFO, logging.DEBUG)[debug])

properties = {
'api': {'port': 50055},
'receiver': {'host': 'localhost', 'port': '50053'},
'transcriber': 'whisper',
'vosk': {
'model_dir': '/tmp',
'download_urls': [],
'models': []
},
'whisper': {'model': 'tiny'},
'max_workers': None,
}
properties = DEFAULT_PROPERTIES

try:
config_file = os.getenv("CONFIG_FILE")
Expand All @@ -177,10 +139,15 @@ def main():
transcriber = get_transcriber(properties, debug)
receiver = f'{properties["receiver"]["host"]}:{properties["receiver"]["port"]}'
port = properties['api']['port']
max_workers = properties['max_workers']
max_threads = properties['max_threads']
cnt_workers = properties['cnt_workers']

logging.debug(properties)
serve(transcriber, receiver, port, max_workers, debug)

q = TaskQueue(cnt_workers)
with ThreadPoolExecutor(max_threads) as executor:
[Worker(transcriber, receiver, executor, q) for _ in range(cnt_workers)]
serve(executor, q, port, debug)


if __name__ == "__main__":
Expand Down
13 changes: 13 additions & 0 deletions subtitles/taskqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import queue

from tasks import StopTask


class TaskQueue(queue.Queue):
def __init__(self, task_worker_cnt: int = 1):
super().__init__()
self.task_worker_cnt = task_worker_cnt

def stop(self):
for _ in range(self.task_worker_cnt):
self.put(StopTask())
12 changes: 12 additions & 0 deletions subtitles/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dataclasses import dataclass


@dataclass
class GenerationTask:
source: str
language: str
stream_id: str


class StopTask:
pass
42 changes: 42 additions & 0 deletions subtitles/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from concurrent.futures import ThreadPoolExecutor
import subtitles_pb2
from tasks import GenerationTask, StopTask
from taskqueue import TaskQueue
from transcriber import Transcriber
from client import receive


class Worker:
"""Thread for subtitle generation"""

def __init__(self,
transcriber: Transcriber,
receiver: str,
executor: ThreadPoolExecutor,
taskqueue: TaskQueue):
"""Start the generator threads."""
executor.submit(run, transcriber, receiver, taskqueue)


def run(transcriber: Transcriber, receiver: str, taskqueue: TaskQueue) -> None:
while True:
logging.info('worker: waiting for task...')
task = taskqueue.get()
if isinstance(task, GenerationTask):
logging.info('worker: starting to generate subtitles...')
logging.debug(f'worker: task: {task}')
generate(transcriber, receiver, task)
elif isinstance(task, StopTask):
break


def generate(transcriber: Transcriber, receiver: str, task: GenerationTask) -> None:
subtitles, language = transcriber.generate(task.source, task.language)

logging.info(f'worker: sending receive message to receiver @ {receiver}')
receive(receiver,
req=subtitles_pb2.ReceiveRequest(
stream_id=task.stream_id,
subtitles=subtitles,
language=language))

0 comments on commit 75fbf09

Please sign in to comment.