Skip to content

Commit

Permalink
chore: Fix shenanigans with ThreadPoolExecutor when sending the same …
Browse files Browse the repository at this point in the history
…file to different instances
  • Loading branch information
TheophileDiot committed Aug 7, 2024
1 parent 8a32358 commit 171af43
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions src/common/utils/ApiCaller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python3

from concurrent.futures import ThreadPoolExecutor, as_completed
from copy import deepcopy
from io import BytesIO
from os import getenv, sep
from os.path import join
from sys import path as sys_path
from tarfile import open as tar_open
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

# Update system path for dependencies
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",))]:
if deps_path not in sys_path:
sys_path.append(deps_path)
Expand All @@ -28,20 +30,21 @@ def send_to_apis(
files: Optional[Dict[str, BytesIO]] = None,
data: Optional[Dict[str, Any]] = None,
response: bool = False,
) -> Tuple[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
def send_request(api):
if files is not None:
for buffer in files.values():
buffer.seek(0)
) -> Tuple[bool, Optional[Dict[str, Any]]]:
def send_request(api, files):
sent, err, status, resp = api.request(method, url, files=files, data=data)
return api, sent, err, status, resp

ret = True
url = url if not url.startswith("/") else url[1:]
responses = {}
url = url.lstrip("/")
responses = {} if response else None

if files:
for buffer in files.values():
buffer.seek(0, 0) # Ensure the file pointer is at the beginning

with ThreadPoolExecutor() as executor:
future_to_api = {executor.submit(send_request, api): api for api in self.apis}
future_to_api = {executor.submit(send_request, api, deepcopy(files) if files else None): api for api in self.apis}
for future in as_completed(future_to_api):
api = future_to_api[future]
try:
Expand All @@ -52,33 +55,23 @@ def send_request(api):
else:
if status != 200:
ret = False
self.__logger.error(f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}")
self.__logger.error(f"Error while sending API request to {api.endpoint}{url} : status = {status}, msg = {resp.get('msg')}")
else:
self.__logger.info(
f"Successfully sent API request to {api.endpoint}{url}",
)
self.__logger.info(f"Successfully sent API request to {api.endpoint}{url}")

if response:
instance = api.endpoint.replace("http://", "").split(":")[0]
if isinstance(resp, dict):
responses[instance] = resp
else:
responses[instance] = resp.json()
responses[instance] = resp if isinstance(resp, dict) else resp.json()
except Exception as exc:
ret = False
self.__logger.error(f"API request generated an exception: {exc}")

if response:
return ret, responses
return ret
return ret, responses

def send_files(self, path: str, url: str) -> bool:
ret = True
with BytesIO() as tgz:
with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
tf.add(path, arcname=".")
tgz.seek(0)
tgz.seek(0, 0)
files = {"archive.tar.gz": tgz}
if not self.send_to_apis("POST", url, files=files):
ret = False
return ret
return self.send_to_apis("POST", url, files=files)[0]

0 comments on commit 171af43

Please sign in to comment.