Skip to content

Commit

Permalink
Fixed libre office process multi-port startup issues in 'sync' mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Jul 26, 2023
1 parent fad1f0c commit de7c812
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 51 deletions.
4 changes: 3 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import multiprocessing
import os
import multiprocessing

from sys import platform

OCR_SERVICE_VERSION = "0.2.0"
Expand Down Expand Up @@ -77,6 +78,7 @@
# Exclusive upper bound for listener ports: the default port plus one slot per web-service worker.
LIBRE_OFFICE_PORT_CAP = DEFAULT_LIBRE_OFFICE_SERVER_PORT + OCR_WEB_SERVICE_WORKERS

# range() excludes its stop value, so this spans OCR_WEB_SERVICE_WORKERS consecutive ports
# beginning at DEFAULT_LIBRE_OFFICE_SERVER_PORT.
LIBRE_OFFICE_LISTENER_PORT_RANGE = range(DEFAULT_LIBRE_OFFICE_SERVER_PORT, LIBRE_OFFICE_PORT_CAP)

# Interface name handed to unoserver via its "--interface" option when a server is launched.
LIBRE_OFFICE_NETWORK_INTERFACE = "localhost"


Expand Down
23 changes: 23 additions & 0 deletions gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import time
import sys
import traceback

from ocr_service.utils.utils import sync_port_mapping

# NOTE(review): this executes after the ocr_service import above, so it cannot
# affect that import — confirm the ordering is intended.
sys.path.append(".")

# Number of times pre_fork has run so far; post_fork derives the worker id as (counter - 1).
counter = 0

def pre_fork(server, worker):
    """
        :description: Hook that staggers worker start-up. It waits 2 seconds plus 1.5 seconds
            for every previous invocation, then bumps the module-level invocation counter.
        :param server: unused
        :param worker: unused
    """
    global counter

    startup_delay = 2 + counter * 1.5
    time.sleep(startup_delay)

    counter = counter + 1

def post_fork(server, worker):
    """
        :description: Hook that registers the worker in the shared port mapping.
            The worker id is (counter - 1) and the pid is taken from worker.pid; after a
            successful registration the hook pauses for one second.
            Any exception is reported on stderr and swallowed.
        :param server: unused
        :param worker: object exposing the worker process id as ``pid``
    """
    try:
        sync_port_mapping(worker_id=(counter - 1), worker_pid=worker.pid)
        time.sleep(1)
    except Exception:
        # print_exc() writes the traceback itself and returns None; wrapping it
        # in print() only produced an extra "None" line on stdout.
        traceback.print_exc()

68 changes: 25 additions & 43 deletions ocr_service/app/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import atexit
import logging
import os
import subprocess
import sys
Expand All @@ -12,57 +11,34 @@
from config import *
from ocr_service.api import api
from ocr_service.processor.processor import Processor
from ocr_service.utils.utils import is_port_in_use
from ocr_service.utils.utils import setup_logging,get_assigned_port

sys.path.append("..")

def setup_logging():
    """
        :description: Configure and setup a default logging handler to print messages to stdout.
            The level comes from the LOG_LEVEL environment variable, falling back to the
            LOG_LEVEL configuration value. The root logger is also published as the
            module-level name ``root_logger``.
    """
    global root_logger
    root_logger = logging.getLogger()
    log_format = '[%(asctime)s] [%(levelname)s] %(name)s: %(message)s'
    app_log_level = os.getenv("LOG_LEVEL", LOG_LEVEL)
    log_handler = logging.StreamHandler(sys.stdout)
    log_handler.setFormatter(logging.Formatter(fmt=log_format))
    log_handler.setLevel(level=app_log_level)

    # only add the handler if an equivalent one does not exist yet; levels are
    # compared by value, since identity of int objects is not guaranteed
    handler_exists = any(
        isinstance(h, logging.StreamHandler) and h.level == log_handler.level
        for h in root_logger.handlers
    )

    if not handler_exists:
        root_logger.addHandler(log_handler)
app = Flask(__name__, instance_relative_config=True)

def start_office_server(port_num):
    """
        :description: Launch one unoserver process bound to LIBRE_OFFICE_NETWORK_INTERFACE on the
            given port, with TMP_FILE_DIR as its working directory.
        :param port_num: port the unoserver instance should listen on
        :return: dict describing the launched server:
            "process" - the subprocess.Popen handle,
            "used"    - always False at start-up,
            "pid"     - pid of the launched process,
            "port"    - the port as a string
    """
    process = subprocess.Popen(
        args=[LIBRE_OFFICE_PYTHON_PATH, "-m", "unoserver.server",
              "--interface", LIBRE_OFFICE_NETWORK_INTERFACE,
              "--executable", LIBRE_OFFICE_EXEC_PATH,
              "--port", str(port_num)],
        cwd=TMP_FILE_DIR, close_fds=True, shell=False)

    # The two alternative dict tails are merged into one literal, so every key
    # that either variant provided is still available.
    loffice_process = {
        "process": process,
        "used": False,
        "pid": process.pid,
        "port": str(port_num),
    }

    return loffice_process

def start_office_converter_servers():
loffice_processes = {}

loffice_processes = {}
port_count = 0

for port_num in LIBRE_OFFICE_LISTENER_PORT_RANGE:
if OCR_WEB_SERVICE_WORKERS > 1:
if port_num not in list(loffice_processes.keys()):
port_count += 1
if is_port_in_use(port_num) == False:
loffice_processes[port_num] = start_office_server(port_num)
else:
break
else:
print("WOREKER TRYING PORT " + str(port_num))
if is_port_in_use(port_num) == False and port_count < OCR_WEB_SERVICE_THREADS:
loffice_processes[port_num] = start_office_server(port_num)
port_count += 1
else:
if (port_count < OCR_WEB_SERVICE_WORKERS or port_count < OCR_WEB_SERVICE_THREADS):
port_count += 1
print("STARTED WORKER ON PORT: " + str(port_num))
process = start_office_server(port_num)
loffice_processes[port_num] = process
if port_num == get_assigned_port(os.getpid()) and OCR_WEB_SERVICE_THREADS == 1:
break

elif OCR_WEB_SERVICE_WORKERS == 1:
continue

return loffice_processes

Expand All @@ -74,15 +50,18 @@ def create_app():

try:
setup_logging()
loffice_processes = start_office_converter_servers()
global _loffice_processes
_loffice_processes = {}
_loffice_processes.update(start_office_converter_servers())

app = Flask(__name__, instance_relative_config=True)
app.register_blueprint(api)

# share processes for api call resource allocation
api.processor = Processor(loffice_processes)
api.processor = Processor()
api.processor.loffice_process_list.update(_loffice_processes)

proc_listener_thread = threading.Thread(target=process_listener, name="loffice_proc_listener")
proc_listener_thread.daemon = True
proc_listener_thread.start()

except Exception:
Expand All @@ -99,10 +78,13 @@ def process_listener():
if psutil.pid_exists(p.pid) is False or p.is_running() is False or p.status() is psutil.STATUS_ZOMBIE:
print("Libreoffice port:" + str(port_num) + "unoserver is DOWN, restarting.....")
exit_handler(port_num)
api.processor.loffice_process_list[port_num] = start_office_server(port_num)
process = start_office_server(port_num)
api.processor.loffice_process_list[port_num] = process

print("Checking soffice pid: " + str(p.pid) + " | port: " + str(port_num))
print("Checking loffice subproceess status " + str(p.name))

_loffice_processes = api.processor.loffice_process_list
time.sleep(LIBRE_OFFICE_PROCESSES_LISTENER_INTERVAL)
except Exception:
raise
Expand Down
5 changes: 2 additions & 3 deletions ocr_service/processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@
class Processor:

@injector.inject
def __init__(self, loffice_process_list: dict = None):
    """
        :description: Set up the processor's logger and its registry of LibreOffice server processes.
        :param loffice_process_list: optional dict of already started server processes;
            when omitted an empty dict is created so callers can fill it afterwards
    """
    app_log_level = os.getenv("LOG_LEVEL", LOG_LEVEL)
    self.log = logging.getLogger(self.__class__.__name__)
    self.log.setLevel(level=app_log_level)
    # logging interpolates its arguments into the message, so a "%s" placeholder
    # is required; without it the record failed to format
    self.log.debug("Processor log level set to : %s", app_log_level)

    # never leave the attribute as None: callers call .update() on it
    self.loffice_process_list = {} if loffice_process_list is None else loffice_process_list

def _preprocess_html_to_img(self, stream: bytes, file_name: str) -> List[PILImage]:
""" Uses html2image to screenshot the page to an PIL image.
Expand Down
56 changes: 52 additions & 4 deletions ocr_service/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import fcntl
import json
import os
import sys
import psutil
import logging
import socket

from sys import platform
from typing import List
Expand Down Expand Up @@ -97,7 +98,54 @@ def get_process_id_by_process_name(process_name: str = "") -> int:

return pid

def is_port_in_use(port: int) -> bool:
    """
        :description: Check whether a TCP connection to localhost on the given port succeeds.
        :param port: TCP port to probe
        :return: True when the connection attempt succeeds, otherwise False
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        status = probe.connect_ex(('localhost', port))
    finally:
        probe.close()

    return status == 0
def sync_port_mapping(worker_id = None, worker_pid = None):
    """
        :description: Record which worker process owns which LibreOffice port.
            The port is the first entry of LIBRE_OFFICE_LISTENER_PORT_RANGE plus worker_id.
            The mapping lives in TMP_FILE_DIR/worker_process_data.txt as a JSON object of
            port -> pid (both stored as strings) and is updated under an exclusive file lock.
        :param worker_id: zero-based worker index, added to the first listener port
        :param worker_pid: process id of that worker
    """
    mapping_path = os.path.join(TMP_FILE_DIR, './worker_process_data.txt')

    with open(mapping_path, encoding="utf-8", mode='a+') as f:
        fcntl.lockf(f, fcntl.LOCK_EX)
        try:
            # 'a+' opens the file positioned at its end, so rewind before reading;
            # otherwise read() is always empty and the entries written by the
            # other workers would be thrown away on every call.
            f.seek(0)
            text = f.read()
            port_mapping = json.loads(text) if len(text) > 0 else {}

            port_mapping[str(LIBRE_OFFICE_LISTENER_PORT_RANGE[0] + worker_id)] = str(worker_pid)

            f.seek(0)
            f.truncate(0)
            f.write(json.dumps(port_mapping, indent=1))
            # hand the buffered data to the OS while the lock is still held, so
            # the next lock holder never reads a truncated file
            f.flush()
        finally:
            fcntl.lockf(f, fcntl.LOCK_UN)

def get_assigned_port(current_worker_pid):
    """
        :description: Look up the LibreOffice port registered for a worker process in
            TMP_FILE_DIR/worker_process_data.txt (a JSON object of port -> pid).
        :param current_worker_pid: pid of the worker (int or numeric string)
        :return: the port as an int, or False when no port is registered for that pid,
            including when the mapping file does not exist yet or is empty
    """
    mapping_path = os.path.join(TMP_FILE_DIR, './worker_process_data.txt')

    try:
        # 'r+' rather than 'r': an exclusive lockf() lock needs a descriptor opened for writing
        with open(mapping_path, encoding="utf-8", mode='r+') as f:
            fcntl.lockf(f, fcntl.LOCK_EX)
            try:
                text = f.read()
            finally:
                fcntl.lockf(f, fcntl.LOCK_UN)
    except FileNotFoundError:
        # nothing has been registered yet
        return False

    port_mapping = json.loads(text) if text.strip() else {}

    for port_num, worker_pid in port_mapping.items():
        if int(worker_pid) == int(current_worker_pid):
            return int(port_num)

    return False

def setup_logging():
    """
        :description: Configure and setup a default logging handler to print messages to stdout.
            The level comes from the LOG_LEVEL environment variable, falling back to the
            LOG_LEVEL configuration value. The root logger is also published as the
            module-level name ``root_logger``.
    """
    global root_logger
    root_logger = logging.getLogger()
    log_format = '[%(asctime)s] [%(levelname)s] %(name)s: %(message)s'
    app_log_level = os.getenv("LOG_LEVEL", LOG_LEVEL)
    log_handler = logging.StreamHandler(sys.stdout)
    log_handler.setFormatter(logging.Formatter(fmt=log_format))
    log_handler.setLevel(level=app_log_level)

    # only add the handler if an equivalent one does not exist yet; levels are
    # compared by value, since identity of int objects is not guaranteed
    handler_exists = any(
        isinstance(h, logging.StreamHandler) and h.level == log_handler.level
        for h in root_logger.handlers
    )

    if not handler_exists:
        root_logger.addHandler(log_handler)

0 comments on commit de7c812

Please sign in to comment.