Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade the locking system #1024

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions pipelinewise/cli/commands.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""
PipelineWise CLI - Commands
"""
import contextlib
import os
import shlex
import logging
import json
import time
from typing import Callable, Optional
import sherlock

from dataclasses import dataclass
from subprocess import PIPE, STDOUT, Popen
from subprocess import PIPE, STDOUT, Popen, TimeoutExpired

from . import utils
from .errors import StreamBufferTooLargeException
Expand Down Expand Up @@ -478,13 +481,20 @@ def log_file_with_status(log_file: str, status: str) -> str:
return f'{log_file}.{status}'


# pylint: disable=too-many-locals
def run_command(command: str, log_file: str = None, line_callback: callable = None):
# TODO: This method is too complex! make its complexity less than 15!
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
def run_command( # noqa: C901
command: str,
lock: Optional[sherlock.lock.BaseLock] = None,
log_file: Optional[str] = None,
line_callback: Optional[Callable[[str], str]] = None,
):
"""
Runs a shell command with or without log file with STDOUT and STDERR

Args:
command: A unix command to run
lock: An optional distributed lock to ensure only one instance
log_file: Write stdout and stderr to log file
line_callback: function to call on each line on stdout and stderr
"""
Expand All @@ -505,10 +515,15 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
log_file_success = log_file_with_status(log_file, STATUS_SUCCESS)

# Start command
with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc:
with open(log_file_running, 'a+', encoding='utf-8') as logfile:
last_renew = time.time()
try:
with lock or contextlib.nullcontext() as lock_obj, \
Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc, \
open(log_file_running, 'a+', encoding='utf-8') as logfile:
# Prevent reading from stdout blocking.
os.set_blocking(proc.stdout.fileno(), False)
stdout = ''
while True:
while proc.poll() is None:
line = proc.stdout.readline()
if line:
decoded_line = line.decode('utf-8')
Expand All @@ -520,8 +535,21 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No

logfile.write(decoded_line)
logfile.flush()
if proc.poll() is not None:
break

if lock_obj is not None:
# Check if the lock is going to expire in the next 5 seconds
# and so we should renew the lock.
now = time.time()
if now - last_renew > lock_obj.expire - 5:
if lock_obj.renew() is False:
# We've failed to renew the lock for some reason so we should probably
# exit.
proc.terminate()
last_renew = now
except Exception:
# If the subprocess failed for any reason then make sure to rename the logfile.
os.rename(log_file_running, log_file_failed)
raise

proc_rc = proc.poll()
if proc_rc != 0:
Expand All @@ -542,13 +570,30 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
return [proc_rc, stdout, None]

# No logfile needed: STDOUT and STDERR returns in an array once the command finished
with Popen(shlex.split(piped_command), stdout=PIPE, stderr=PIPE) as proc:
proc_tuple = proc.communicate()
last_renew = time.time()
with lock or contextlib.nullcontext() as lock_obj, \
Popen(shlex.split(piped_command), stdout=PIPE, stderr=PIPE) as proc:
while True:
try:
proc_tuple = proc.communicate(timeout=1.0)
break
except TimeoutExpired:
if lock_obj is not None:
# Check if the lock is going to expire in the next 5 seconds
# and so we should renew the lock.
now = time.time()
if now - last_renew > lock_obj.expire - 5:
if lock_obj.renew() is False:
# We've failed to renew the lock for some reason so we should probably
# exit.
proc.terminate()
last_renew = now

proc_rc = proc.returncode
stdout = proc_tuple[0].decode('utf-8')
stderr = proc_tuple[1].decode('utf-8')

if proc_rc != 0:
LOGGER.error(stderr)
if proc_rc != 0:
LOGGER.error(stderr)

return [proc_rc, stdout, stderr]
1 change: 0 additions & 1 deletion pipelinewise/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def get_connector_files(connector_dir: str) -> Dict:
'state': os.path.join(connector_dir, 'state.json'),
'transformation': os.path.join(connector_dir, 'transformation.json'),
'selection': os.path.join(connector_dir, 'selection.json'),
'pidfile': os.path.join(connector_dir, 'pipelinewise.pid'),
}

def save(self):
Expand Down
Loading