Skip to content

Commit

Permalink
fix formatting with black
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukas Wingerberg committed Nov 20, 2024
1 parent 453a779 commit f08ef02
Showing 1 changed file with 73 additions and 46 deletions.
119 changes: 73 additions & 46 deletions src/server/grpc-ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,55 +24,68 @@
logger = logging.getLogger(__name__)

# Configuration
VALID_TOKEN = os.getenv('VALID_TOKEN', 'my_secret_token')
ALLOWED_BINARIES = ['ffmpeg', 'ffprobe', 'mediainfo', 'vainfo'] # Added 'mediainfo' to allowed binaries
BINARY_PATH_PREFIX = os.getenv('BINARY_PATH_PREFIX', '/usr/lib/jellyfin-ffmpeg/')
SSL_KEY_PATH = os.getenv('SSL_KEY_PATH', 'server.key')
SSL_CERT_PATH = os.getenv('SSL_CERT_PATH', 'server.crt')
USE_SSL = os.getenv('USE_SSL', 'false').lower() == 'true'
MAX_FFMPEG_WORKERS= os.getenv('MAX_FFMPEG_WORKERS', 10)
VALID_TOKEN = os.getenv("VALID_TOKEN", "my_secret_token")
ALLOWED_BINARIES = [
"ffmpeg",
"ffprobe",
"mediainfo",
"vainfo",
] # Added 'mediainfo' to allowed binaries
BINARY_PATH_PREFIX = os.getenv("BINARY_PATH_PREFIX", "/usr/lib/jellyfin-ffmpeg/")
SSL_KEY_PATH = os.getenv("SSL_KEY_PATH", "server.key")
SSL_CERT_PATH = os.getenv("SSL_CERT_PATH", "server.crt")
USE_SSL = os.getenv("USE_SSL", "false").lower() == "true"
MAX_FFMPEG_WORKERS = os.getenv("MAX_FFMPEG_WORKERS", 10)

# Health check variables
HEALTHCHECK_INTERVAL = 60 # Interval in seconds (1 hour)
HEALTHCHECK_FILE = '/app/healthcheck.mkv'
HEALTHCHECK_OUTPUT = '/tmp/healthcheck_output.mp4'
HEALTHCHECK_FILE = "/app/healthcheck.mkv"
HEALTHCHECK_OUTPUT = "/tmp/healthcheck_output.mp4"

# Variable to store health status
health_status = {'healthy': False}
health_status = {"healthy": False}

# Prometheus metrics
binary_counters = {
binary: Counter(f"{binary}_commands", f"Number of {binary} commands executed")
for binary in ALLOWED_BINARIES
}
ffmpeg_process_gauge = Gauge("ffmpeg_process_count", "Number of running ffmpeg processes")
ffmpeg_max_workers_gauge = Gauge('ffmpeg_max_workers', 'Maximum number of allowed ffmpeg processes based on thread pool size')
ffmpeg_process_gauge = Gauge(
"ffmpeg_process_count", "Number of running ffmpeg processes"
)
ffmpeg_max_workers_gauge = Gauge(
"ffmpeg_max_workers",
"Maximum number of allowed ffmpeg processes based on thread pool size",
)


class TokenAuthValidator(grpc.AuthMetadataPlugin):
def __call__(self, context, callback):
token = None
for key, value in context.invocation_metadata():
if key == 'authorization':
if key == "authorization":
token = value
break

if token is None or token != VALID_TOKEN:
callback(grpc.StatusCode.UNAUTHENTICATED, 'Invalid token')
callback(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
else:
callback(None, None)


class FFmpegService(ffmpeg_pb2_grpc.FFmpegServiceServicer):
async def ExecuteCommand(self, request, context):
command = request.command
logger.info(f'Received command: {command}')
logger.info(f"Received command: {command}")

# Tokenize the command
tokens = shlex.split(command)

# Check if the command is allowed
if tokens[0] not in ALLOWED_BINARIES:
yield ffmpeg_pb2.CommandResponse(output="Error: Command not allowed", exit_code=1)
yield ffmpeg_pb2.CommandResponse(
output="Error: Command not allowed", exit_code=1
)
return

# Prepend the binary path prefix
Expand All @@ -85,9 +98,7 @@ async def ExecuteCommand(self, request, context):

try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)

async def read_stream(stream, response_type, stream_name):
Expand All @@ -96,12 +107,16 @@ async def read_stream(stream, response_type, stream_name):
if not line:
break
logger.info(f'{stream_name}: {line.decode("utf-8").strip()}')
yield response_type(output=line.decode('utf-8'), stream=stream_name)
yield response_type(output=line.decode("utf-8"), stream=stream_name)

async for response in read_stream(process.stdout, ffmpeg_pb2.CommandResponse, "stdout"):
async for response in read_stream(
process.stdout, ffmpeg_pb2.CommandResponse, "stdout"
):
yield response

async for response in read_stream(process.stderr, ffmpeg_pb2.CommandResponse, "stderr"):
async for response in read_stream(
process.stderr, ffmpeg_pb2.CommandResponse, "stderr"
):
yield response

await process.wait()
Expand Down Expand Up @@ -143,7 +158,9 @@ async def run_health_check(self):
await self.run_command(cleanup_command)

# Run ffmpeg conversion test
ffmpeg_command = f"{BINARY_PATH_PREFIX}ffmpeg -i {HEALTHCHECK_FILE} {HEALTHCHECK_OUTPUT}"
ffmpeg_command = (
f"{BINARY_PATH_PREFIX}ffmpeg -i {HEALTHCHECK_FILE} {HEALTHCHECK_OUTPUT}"
)
ffmpeg_output = await self.run_command(ffmpeg_command)

if "Conversion failed" in ffmpeg_output:
Expand All @@ -162,13 +179,11 @@ async def run_health_check(self):

def update_health_status(self, is_healthy):
global health_status
health_status['healthy'] = is_healthy
health_status["healthy"] = is_healthy

async def run_command(self, command):
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await process.communicate()
Expand All @@ -178,7 +193,7 @@ async def run_command(self, command):
if process.returncode != 0:
logger.error(f"Command '{command}' failed with error: {error}")
return f"Command '{command}' failed with error: {error}"

return output

async def is_file_valid(self, filename):
Expand All @@ -189,25 +204,26 @@ async def is_file_valid(self, filename):
logger.error(f"Error checking file {filename}: {str(e)}")
return False


async def start_grpc_server():
server = grpc.aio.server(ThreadPoolExecutor(max_workers=MAX_FFMPEG_WORKERS))
ffmpeg_pb2_grpc.add_FFmpegServiceServicer_to_server(FFmpegService(), server)

# Set the ffmpeg_max_workers metric to the max_workers value
ffmpeg_max_workers_gauge.set(MAX_FFMPEG_WORKERS)

listen_addr = '0.0.0.0:50051'
listen_addr = "0.0.0.0:50051"
if USE_SSL:
with open(SSL_CERT_PATH, 'rb') as f:
with open(SSL_CERT_PATH, "rb") as f:
certificate_chain = f.read()
with open(SSL_KEY_PATH, 'rb') as f:
with open(SSL_KEY_PATH, "rb") as f:
private_key = f.read()
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain),))
server.add_secure_port(listen_addr, server_creds)
logger.info(f'Server started with SSL on {listen_addr}')
logger.info(f"Server started with SSL on {listen_addr}")
else:
server.add_insecure_port(listen_addr)
logger.info(f'Server started without SSL on {listen_addr}')
logger.info(f"Server started without SSL on {listen_addr}")

await server.start()
try:
Expand All @@ -217,26 +233,31 @@ async def start_grpc_server():
finally:
await server.stop(0)


async def start_http_server():
async def health_check(request):
if health_status['healthy']:
if health_status["healthy"]:
return web.Response(text="OK")
else:
return web.Response(text="Health check failed", status=500)

async def metrics(request):

return web.Response(body=generate_latest(), headers={"Content-Type": CONTENT_TYPE_LATEST})

return web.Response(
body=generate_latest(), headers={"Content-Type": CONTENT_TYPE_LATEST}
)

app = web.Application()
app.router.add_get('/health', health_check)
app.router.add_get('/metrics', metrics) # Prometheus metrics endpoint
app.router.add_get("/health", health_check)
app.router.add_get("/metrics", metrics) # Prometheus metrics endpoint

runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', 8080)
site = web.TCPSite(runner, "0.0.0.0", 8080)
await site.start()
logger.info('http endpoint server started on http://localhost:8080 /health and /metrics')
logger.info(
"http endpoint server started on http://localhost:8080 /health and /metrics"
)

# Keep the server alive until shutdown
try:
Expand All @@ -258,32 +279,38 @@ async def ffmpeg_server():
logger.info("Server tasks canceled. Cleaning up...")
raise


def handle_signals():
loop = asyncio.get_event_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(getattr(signal, signame), lambda: asyncio.create_task(shutdown(signame)))
for signame in {"SIGINT", "SIGTERM"}:
loop.add_signal_handler(
getattr(signal, signame), lambda: asyncio.create_task(shutdown(signame))
)


async def shutdown(signame):
logger.info(f"Received signal {signame}, shutting down...")
tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]

# Terminate all subprocesses if any exist
for task in tasks:
if hasattr(task, 'process') and task.process is not None:
if hasattr(task, "process") and task.process is not None:
task.process.terminate()
else:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True, timeout=5)
logger.info("Shutdown complete.")


async def main():
try:
await ffmpeg_server()
except asyncio.CancelledError:
logger.info("Main task canceled.")
raise

if __name__ == '__main__':

if __name__ == "__main__":
handle_signals()
try:
asyncio.run(main())
Expand All @@ -293,4 +320,4 @@ async def main():
sys.exit(0) # Exit with 0 on Ctrl+C
except Exception as e:
logger.error(f"Unhandled exception: {e}")
sys.exit(1) # Exit with 1 on unhandled exceptions
sys.exit(1) # Exit with 1 on unhandled exceptions

0 comments on commit f08ef02

Please sign in to comment.