Skip to content

Commit

Permalink
Parallel for mlchain 0.2.5 (#48)
Browse files Browse the repository at this point in the history
* Increase MAX_CONTENT_LENGTH to 1GB

* Default starlette -> flask, add parallel sync to compatible with gevent

* Handle ast.literal_eval by using json.loads

* Add Background Sync

* Update requirements

* Add itsdangerous and remove python-Levenshtein

* Fixed Click version for Flask 1

* Remove Werkzeug due to Flask 1.1.4 error

* Remove Jinja2 due to Flask 1.1.4

* Fixed Jinja2 and Werkzeug

* Fixed h11 issue

* Replace fuzzywuzzy by thefuzz and drop support python 3.6

* Fixed MarkupSafe

* Remove reduntdant MarkupSafe>=1.1.1

* Re-update sentry-sdk[flask]

* Does not use opencv-python 4.5 because of failed coverage test

Co-authored-by: Hoang Viet <[email protected]>
  • Loading branch information
lamhoangtung and meocong authored Mar 11, 2022
1 parent 33fe470 commit 5c00860
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
python-version: [3.6, 3.7, 3.8, 3.9]
python-version: [3.7, 3.8, 3.9]

if: "!contains(github.event.head_commit.message, 'ci skip')"

Expand Down
3 changes: 2 additions & 1 deletion mlchain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
)

# Parameters of MLchain
__version__ = "0.2.3"
__version__ = "0.2.5"

HOST = "https://www.api.mlchain.ml"
WEB_HOST = HOST
API_ADDRESS = HOST
Expand Down
38 changes: 25 additions & 13 deletions mlchain/base/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ def import_cv2():
import cv2 as cv
cv2 = cv

def ast_json_parse_string(value: str):
try:
l = ast.literal_eval(value)
return l
except Exception as ex:
try:
l = json.loads(value)
return l
except Exception as ex1:
raise MLChainAssertionError("Can't convert {0} to Python list, dict, set. Please check the variable {1}".format(value, mlchain_context.CONVERT_VARIABLE))

def bytes2ndarray(value: bytes) -> np.ndarray:
import_cv2()
nparr = np.fromstring(value, np.uint8)
Expand Down Expand Up @@ -81,9 +92,10 @@ def str2ndarray(value: str) -> np.ndarray:
pass

try:
l = ast_json_parse_string(value)

# If it is a string array
import ast
arr = np.array(ast.literal_eval(value))
arr = np.array(l)
if arr is not None:
return arr
except:
Expand Down Expand Up @@ -118,25 +130,24 @@ def str2bool(value: str) -> bool:


def str2list(value: str) -> List:
try:
l = ast.literal_eval(value)
return l
except:
try:
l = ast_json_parse_string(value)
return l
except Exception as ex:
return [value]


def str2dict(value: str) -> dict:
try:
l = ast.literal_eval(value)
l = ast_json_parse_string(value)
return l
except:
except Exception as ex:
raise MLChainAssertionError("Can't convert {0} to dict. Please check the variable {1}".format(value, mlchain_context.CONVERT_VARIABLE))

def str2set(value: str) -> set:
try:
l = ast.literal_eval(value)
l = ast_json_parse_string(value)
return l
except:
except Exception as ex:
raise MLChainAssertionError("Can't convert {0} to set. Please check the variable {1}".format(value, mlchain_context.CONVERT_VARIABLE))

def str2bytes(value: str) -> bytes:
Expand Down Expand Up @@ -501,9 +512,10 @@ def str2pil(value: str) -> Image.Image:
pass

try:
l = ast_json_parse_string(value)

# If it is a string array
import ast
return Image.fromarray(ast.literal_eval(value))
return Image.fromarray(l)
except:
raise MLChainAssertionError(
"There's no way to convert to PIL Image with variable {0}. Please check the variable {1}".format(value, mlchain_context.CONVERT_VARIABLE))
Expand Down
2 changes: 1 addition & 1 deletion mlchain/base/serve_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import types
from mlchain.context import mlchain_context
from .exceptions import MLChainAssertionError, MlChainError, MLChain404Error
from fuzzywuzzy import process as fuzzywuzzy_process
from thefuzz import process as fuzzywuzzy_process

def non_thread(timeout=-1):
if timeout is None or (isinstance(timeout, (float, int)) and timeout <= 0):
Expand Down
13 changes: 8 additions & 5 deletions mlchain/cli/mlconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ host: 0.0.0.0 # Host of service
port: 8001 # Port service

# Server config
server: starlette # Option flask or starlette or grpc
server: flask # Option flask or starlette or grpc
wrapper: gunicorn # Option None or gunicorn
cors: true # Auto enable CORS
cors_allow_origins: # Allow origins for CORS
Expand All @@ -20,10 +20,13 @@ template_folder: # template folder for TemplateResponse

# Gunicorn config - Use gunicorn for general case
gunicorn:
timeout: 200 # The requests will be maximum 200 seconds in default, then when the requests is done, the worker will be restarted
max_requests: 0 # Maximum serving requests until workers restart to handle over memory in Python
workers: 1 # Number of duplicate workers
threads: 1 # Number of simultaneous threads in workers
timeout: 200 # The requests will be maximum 200 seconds in default, then when the requests is done, the worker will be restarted
max_requests: 0 # Maximum serving requests until workers restart to handle over memory in Python
workers: 1 # Number of duplicate workers
threads: 1 # Number of simultaneous threads in workers
worker_class: gthread # The base worker_class, can use gevent (For better IO) or uvicorn.workers.UvicornWorker (starlette - For Async)
max_requests_jitter: 50 # Restart worker different time
accesslog: mlchain-server.log # Log file for gunicorn

bind:
- 'unix:/tmp/gunicorn.sock' # Using sock to make gunicorn faster
Expand Down
2 changes: 1 addition & 1 deletion mlchain/server/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import warnings
from inspect import signature, _empty
from collections import defaultdict
from fuzzywuzzy.fuzz import ratio
from thefuzz.fuzz import ratio
from mlchain.base import ServeModel
from mlchain.base.log import logger
from mlchain.base.serializer import JsonSerializer, MsgpackSerializer, MsgpackBloscSerializer
Expand Down
3 changes: 3 additions & 0 deletions mlchain/workflows_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .background import Background
from .parallel import Parallel
from .task import Task
117 changes: 117 additions & 0 deletions mlchain/workflows_sync/background.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import inspect
import time
from threading import Thread, Event
from .task import Task
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
import logging
import traceback

class BackgroundTask(Thread):
def __init__(self, interval, task, max_repeat, callback=None, max_thread:int=1, pass_fail_job:bool=False):
assert callable(task)

Thread.__init__(self)
self.stopped = Event()
self.is_done = False
self.interval = interval
self.task = task
self.max_repeat = max_repeat
self.callback = callback
self.output = None
self.pool_limit = ThreadPoolExecutor(max_workers=max_thread)
self.pass_fail_job = pass_fail_job

if callback is not None:
self.pool_limit_callback = ThreadPoolExecutor(max_workers=1)

def stop(self):
self.stopped.set()
self.join()

def get_output(self, task, *args, **kwargs):
try:
self.output = task(*args, **kwargs)
except Exception as ex:
self.output = ("MLCHAIN_BACKGROUND_ERROR", traceback.format_exc())
self.call_the_callback()

def call_the_callback(self):
if self.callback:
self.pool_limit_callback.submit(self.callback)

if isinstance(self.output, tuple) and len(self.output) == 2 and self.output[0] == "MLCHAIN_BACKGROUND_ERROR":
if self.pass_fail_job:
logging.error("BACKGROUND CALL ERROR: {0}".format(self.output[1]))
else:
raise Exception("BACKGROUND CALL ERROR: {0}".format(self.output[1]))

def run(self):
if self.interval is not None:
count_repeat = 0
while (self.max_repeat < 0 or count_repeat < self.max_repeat) \
and (not self.stopped.wait(self.interval.total_seconds())):

if isinstance(type(self.task), Task) \
or issubclass(type(self.task), Task):
self.pool_limit.submit(self.get_output, self.task.func_, *self.task.args, **self.task.kwargs)
else:
self.pool_limit.submit(self.get_output, self.task)
count_repeat += 1
else:
if isinstance(type(self.task), Task) \
or issubclass(type(self.task), Task):
self.pool_limit.submit(self.get_output, self.task.func_, *self.task.args, **self.task.kwargs)
else:
self.pool_limit.submit(self.get_output, self.task)

self.pool_limit.shutdown(wait=True)
self.is_done = True

if isinstance(self.output, tuple) and len(self.output) == 2 and self.output[0] == "MLCHAIN_BACKGROUND_ERROR":
if self.pass_fail_job:
logging.error("BACKGROUND CALL ERROR: {0}".format(self.output[1]))
else:
raise Exception("BACKGROUND CALL ERROR: {0}".format(self.output[1]))

if self.callback is not None:
self.pool_limit_callback.shutdown(wait=True)
self.is_done = True

def wait(self, interval: float = 0.1):
while not self.is_done:
time.sleep(interval)
return self.output

def wait(self, interval: float = 0.1):
while not self.is_done:
time.sleep(interval)
return self.output

class Background:
"""
Run a task in background using Threading.Event
:task: [Task, function] item
:interval: timedelta or float seconds
"""

def __init__(self, task, interval:float=None, max_repeat:int=-1, callback=None):
assert callable(task), 'You have to transfer a callable instance or an mlchain.Task'
assert (max_repeat > 0 and interval is not None and interval > 0) or max_repeat == -1, "interval need to be set when max_repeat > 0"
assert callback is None or callable(callback), "callback need to be callable"

if interval is not None:
if isinstance(interval, int) or isinstance(interval, float):
interval = timedelta(seconds = interval)

self.task = task
self.interval = interval
self.max_repeat = max_repeat
self.callback = callback

def run(self, max_thread:int=1, pass_fail_job:bool=False):
task = BackgroundTask(interval=self.interval, task=self.task,
max_repeat=self.max_repeat, callback=self.callback, max_thread=max_thread, pass_fail_job=pass_fail_job)
task.start()

return task
116 changes: 116 additions & 0 deletions mlchain/workflows_sync/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
from multiprocessing.pool import ThreadPool
from mlchain.base.log import format_exc, except_handler, logger
from typing import List

class TrioProgress:
def __init__(self, total, notebook_mode=False, **kwargs):
if notebook_mode: # pragma: no cover
from tqdm.notebook import tqdm
else:
from tqdm import tqdm

self.tqdm = tqdm(total=total, **kwargs)
self.count = 0
self.total = total

def task_processed(self):
self.tqdm.update(1)
self.count += 1
if self.count == self.total:
self.tqdm.close()

class Parallel:
"""
Build a collection of tasks to be executed in parallel
:tasks: List of [Task, function] items
:max_threads: Maximum Threads for this Parallel
:max_retries: Maximum retry time when a task fail
:pass_fail_job: Pass or Raise error when a task run fail
:verbose: Print error or not
"""

def __init__(
self,
tasks: List,
max_threads: int = 10,
max_retries: int = 0,
pass_fail_job: bool = False,
verbose: bool = True,
):
"""
:tasks: [Task, function] items
:max_threads: Maximum threads to Parallel, max_threads=0 means no limitation
:max_retries: How many time retry when job fail
:pass_fail_job: No exeption when a job fail
:verbose: Verbose or not
"""

assert isinstance(tasks, list) and all(
callable(task) for task in tasks
), "You have to transfer a list of callable instances or mlchain.Task"
self.tasks = tasks
if max_threads == -1:
max_threads = 100
elif max_threads == 0:
max_threads = os.cpu_count()
self.max_threads = max(0, max_threads)

self.max_retries = max(max_retries + 1, 1)
self.pass_fail_job = pass_fail_job
self.verbose = verbose
self.show_progress_bar = False
self.progress_bar = None

def update_progress_bar(self):
if self.show_progress_bar:
self.progress_bar.task_processed()

def exec_task(self, task, idx=None):
for retry_idx in range(self.max_retries):
try:
output = task.exec()
self.update_progress_bar()
return output
except Exception as ex:
if retry_idx == self.max_retries - 1 and not self.pass_fail_job:
return ex
if retry_idx < self.max_retries - 1 or not self.verbose:
logger.error(
"PARALLEL ERROR in {0}th task and retry task, "
"run times = {1}".format(idx, retry_idx + 1)
)
else:
logger.debug(
"PASSED PARALLEL ERROR in {}th task:".format(idx, format_exc(name="mlchain.workflows.parallel"))
)
return None

def run(self, progress_bar: bool = False, notebook_mode: bool = False):
"""
When you run parallel in root, please use this function
:progress_bar: Use tqdm to show the progress of calling Parallel
:notebook_mode: Put it to true if run mlchain inside notebook
"""
pool = ThreadPool(max(1, self.max_threads))
if progress_bar:
self.show_progress_bar = True
self.progress_bar = TrioProgress(
total=len(self.tasks), notebook_mode=notebook_mode
)

async_result = [
pool.apply_async(self.exec_task, args=[task, idx])
for idx, task in enumerate(self.tasks)
]

results = []
for result in async_result:
output = result.get()
if isinstance(output, Exception):
pool.terminate()
pool.close()
raise output
results.append(output)
pool.close()
return results
Loading

0 comments on commit 5c00860

Please sign in to comment.