Commit
Merge pull request #44 from Menghuan1918/dev
Dev
Menghuan1918 authored Oct 28, 2024
2 parents 748a3d7 + 4964c7f commit 712acd1
Showing 3 changed files with 84 additions and 32 deletions.
45 changes: 27 additions & 18 deletions src/pdfdeal/Doc2X/ConvertV2.py
@@ -55,22 +55,24 @@ async def upload_pdf(apikey: str, pdffile: str, ocr: bool = True) -> str:
             },
             content=file,
         )
+        trace_id = post_res.headers.get("trace-id", "Failed to get trace-id ")
         if post_res.status_code == 200:
             response_data = json.loads(post_res.content.decode("utf-8"))
             uid = response_data.get("data", {}).get("uid")
-            trace_id = post_res.headers.get("trace-id")

             await code_check(
                 code=response_data.get("code", response_data), uid=uid, trace_id=trace_id
             )
             return uid

         if post_res.status_code == 429:
-            raise RateLimit()
+            raise RateLimit(trace_id=trace_id)
         if post_res.status_code == 400:
-            raise RequestError(post_res.text)
+            raise RequestError(error_code=post_res.text, trace_id=trace_id)

-        raise Exception(f"Upload file error! {post_res.status_code}:{post_res.text}")
+        raise Exception(
+            f"Upload file error,trace_id{trace_id}:{post_res.status_code}:{post_res.text}"
+        )


 @async_retry()
@@ -194,19 +196,19 @@ async def uid_status(
         response_data = await client.get(
             url, headers={"Authorization": f"Bearer {apikey}"}
         )
+        trace_id = response_data.headers.get("trace-id", "Failed to get trace-id ")
         if response_data.status_code != 200:
             raise Exception(
-                f"Get status error! {response_data.status_code}:{response_data.text}"
+                f"Get status error! Trace-id:{trace_id}:{response_data.status_code}:{response_data.text}"
             )

         try:
             data = json.loads(response_data.content.decode("utf-8"))
         except Exception as e:
             raise Exception(
-                f"Get status error with {e}! {response_data.status_code}:{response_data.text}"
+                f"Get status error with {e}! Trace-id:{trace_id}:{response_data.status_code}:{response_data.text}"
             )

-        trace_id = response_data.headers.get("trace-id")
         await code_check(data.get("code", response_data), uid, trace_id=trace_id)

         progress, status = data["data"].get("progress", 0), data["data"].get("status", "")
@@ -216,9 +218,13 @@ async def uid_status(
         texts, locations = await decode_data(data["data"], convert)
         return 100, "Success", texts, locations
     elif status == "failed":
-        raise RequestError(f"Failed to deal with file! {response_data.text}")
+        raise RequestError(
+            f"Failed to deal with file uid {uid}! Trace-id:{trace_id}:{response_data.text}"
+        )
     else:
-        logger.warning(f"Unknown status: {status}")
+        logger.warning(
+            f"Unknown status: {status} in uid {uid} file! Trace-id:{trace_id}:{response_data.text}"
+        )
         return progress, status, [], []


@@ -258,14 +264,14 @@ async def convert_parse(
         response_data = await client.post(
             url, json=payload, headers={"Authorization": f"Bearer {apikey}"}
         )
-
+        trace_id = response_data.headers.get("trace-id", "Failed to get trace-id ")
         if response_data.status_code != 200:
             raise Exception(
-                f"Conversion request failed: {response_data.status_code}:{response_data.text}"
+                f"Conversion request failed: Trace-id:{trace_id}:{response_data.status_code}:{response_data.text}"
             )

         data = response_data.json()
-        trace_id = response_data.headers.get("trace-id")

         await code_check(data.get("code", response_data), uid, trace_id=trace_id)
         status = data["data"]["status"]
         url = data["data"].get("url", "")
@@ -275,7 +281,9 @@ async def convert_parse(
     elif status == "success":
         return "Success", url
     else:
-        raise RequestError(f"Conversion uid {uid} file failed: {data}")
+        raise RequestError(
+            f"Conversion uid {uid} file failed in Trace-id:{trace_id}:{data}"
+        )


 @async_retry()
@@ -301,14 +309,13 @@ async def get_convert_result(apikey: str, uid: str) -> Tuple[str, str]:
         response = await client.get(
             url, params=params, headers={"Authorization": f"Bearer {apikey}"}
         )
-
+        trace_id = response.headers.get("trace-id", "Failed to get trace-id ")
         if response.status_code != 200:
             raise Exception(
-                f"Get conversion result failed: {response.status_code}:{response.text}"
+                f"Get conversion result failed: Trace-id:{trace_id}:{response.status_code}:{response.text}"
             )

         data = response.json()
-        trace_id = response.headers.get("trace-id")
         await code_check(data.get("code", response), uid, trace_id=trace_id)
         status = data["data"]["status"]
         url = data["data"].get("url", "")
@@ -318,7 +325,9 @@ async def get_convert_result(apikey: str, uid: str) -> Tuple[str, str]:
     elif status == "success":
         return "Success", url
     else:
-        raise RequestError(f"Get conversion result for uid {uid} failed: {data}")
+        raise RequestError(
+            f"Get conversion result for uid {uid} failed:Trace-id:{trace_id}:{data}"
+        )


 @async_retry()
9 changes: 7 additions & 2 deletions src/pdfdeal/Doc2X/Exception.py
@@ -8,7 +8,7 @@

 async def code_check(code: str, uid: str = None, trace_id: str = None):
     if code in ["parse_page_limit_exceeded", "parse_concurrency_limit"]:
-        raise RateLimit()
+        raise RateLimit(trace_id=trace_id)
     if code in RequestError.ERROR_CODES:
         raise RequestError(code, uid=uid, trace_id=trace_id)
     if code not in ["ok", "success"]:
@@ -20,8 +20,13 @@ class RateLimit(Exception):
     Error when rate limit is reached.
     """

+    def __init__(self, trace_id: str = None):
+        self.trace_id = trace_id
+        super().__init__()
+
     def __str__(self):
-        return "Rate limit reached. Please wait a moment and try again. (速率限制,请稍后重试)"
+        trace_msg = f" (Trace ID: {self.trace_id})" if self.trace_id else ""
+        return f"Rate limit reached. Please wait a moment and try again. (速率限制,请稍后重试){trace_msg}"


 class RequestError(Exception):
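For illustration (not part of the commit): with the change above, RateLimit now carries the server's trace ID, so callers can log it while retrying. A minimal sketch assuming only the RateLimit class shown in this file; the retry wrapper and its names are hypothetical:

    import asyncio

    from pdfdeal.Doc2X.Exception import RateLimit

    async def call_with_backoff(coro_factory, retries: int = 3):
        # Hypothetical helper: retry an async call, surfacing the trace ID
        # that RateLimit instances now carry.
        for attempt in range(retries):
            try:
                return await coro_factory()
            except RateLimit as e:
                # str(e) appends "(Trace ID: ...)" when the server returned one.
                print(f"Attempt {attempt + 1} rate-limited: {e} (trace_id={e.trace_id})")
                await asyncio.sleep(2**attempt)
        raise RateLimit()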
62 changes: 50 additions & 12 deletions src/pdfdeal/doc2x.py
@@ -28,15 +28,30 @@ async def parse_pdf(
     convert: bool,
 ) -> Tuple[str, List[str], List[dict]]:
     """Parse PDF file and return uid and extracted text"""
+    thread_lock = False

     async def retry_upload():
         for _ in range(maxretry):
             try:
                 return await upload_pdf(apikey, pdf_path, ocr)
             except RateLimit:
+                global limit_lock, get_max_limit, max_threads, full_speed, thread_min
+                nonlocal thread_lock
+                if full_speed:
+                    if not get_max_limit and not thread_lock:
+                        thread_lock = True
+                        get_max_limit = True
+                        async with limit_lock:
+                            max_threads = max(thread_min, max_threads - 1)
+                else:
+                    if not thread_lock:
+                        max_threads = max(thread_min, max_threads - 1)
+                        thread_lock = True
                 logger.warning("Rate limit reached, retrying...")
                 await asyncio.sleep(wait_time)
-        raise RequestError("Max retry reached for upload_pdf")
+        raise RequestError(
+            "Max retry reached for upload_pdf, this may be a rate limit issue, try to reduce the number of threads."
+        )

     logger.info(f"Uploading {pdf_path}...")
     try:
@@ -81,6 +96,7 @@ async def convert_to_format(
             target_filename=output_name or uid,
         )
     elif status == "Processing":
+        logger.info(f"Converting {uid} {output_format} file...")
         await asyncio.sleep(3)
         status, url = await get_convert_result(apikey, uid)
     else:
@@ -97,6 +113,7 @@ def __init__(
         retry_time: int = 5,
         max_time: int = 90,
         debug: bool = False,
+        full_speed: bool = False,
     ) -> None:
         """
         Initialize a Doc2X client.
@@ -108,7 +125,7 @@ def __init__(
             retry_time (int, optional): The number of retry attempts. Defaults to 5.
             max_time (int, optional): The maximum time (in seconds) to wait for a response. Defaults to 90.
             debug (bool, optional): Whether to enable debug logging. Defaults to False.
-            request_interval (float, optional): Interval between requests in seconds. Defaults to 0.25.
+            full_speed (bool, optional): **Experimental function**. Whether to enable automatic sniffing of the concurrency limit. Defaults to False.

         Raises:
             ValueError: If no API key is found.
@@ -122,10 +139,9 @@ def __init__(
         self.retry_time = retry_time
         self.max_time = max_time
         self.thread = thread
-        self.parse_thread = thread
-        self.convert_thread = thread
         self.max_pages = max_pages
-        self.request_interval = 0.2
+        self.request_interval = 0.1
+        self.full_speed = full_speed

         handler = logging.StreamHandler()
         formatter = logging.Formatter(
@@ -177,6 +193,16 @@ async def pdf2file_back(
         convert_tasks = set()
         results = [None] * len(pdf_file)
         parse_results = [None] * len(pdf_file)
+        global limit_lock, get_max_limit, max_threads, full_speed, thread_min
+        thread_min = self.thread
+        full_speed = self.full_speed
+        limit_lock = asyncio.Lock()
+        get_max_limit = False
+        max_threads = self.thread
+
+        if full_speed:
+            self.max_time = 180
+            self.retry_time = 10

         async def process_file(index, pdf, name):
             try:
@@ -229,7 +255,11 @@ async def process_file(index, pdf, name):
                 convert_tasks.add(task)

             except asyncio.TimeoutError:
-                results[index] = ("", "Operation timed out", False)
+                results[index] = (
+                    "",
+                    "Operation timed out, this may be a rate limit issue or network issue, try to reduce the number of threads.",
+                    False,
+                )
             except Exception as e:
                 results[index] = ("", str(e), False)
             finally:
@@ -277,17 +307,24 @@ async def convert_file(index, name):

                 results[index] = (result, "", True)
             except asyncio.TimeoutError:
-                results[index] = ("", "Operation timed out", False)
+                results[index] = (
+                    "",
+                    "Operation timed out, this may be a rate limit issue or network issue, try to reduce the number of threads.",
+                    False,
+                )
             except Exception as e:
                 results[index] = ("", str(e), False)

         # Create and run parse tasks with controlled concurrency
         for i, (pdf, name) in enumerate(zip(pdf_file, output_names)):
-            while len(parse_tasks) >= self.parse_thread:
+            while len(parse_tasks) >= max_threads:
                 done, parse_tasks = await asyncio.wait(
                     parse_tasks, return_when=asyncio.FIRST_COMPLETED
                 )

+            if full_speed:
+                async with limit_lock:
+                    if not get_max_limit:
+                        max_threads = max_threads + 1
             task = asyncio.create_task(process_file(i, pdf, name))
             parse_tasks.add(task)

@@ -298,7 +335,8 @@ async def convert_file(index, name):
         # Wait for remaining convert tasks
         if convert_tasks:
             await asyncio.wait(convert_tasks)
-
+        if full_speed:
+            logger.info(f"Convert tasks done with {max_threads} threads.")
         success_files = [r[0] if r[2] else "" for r in results]
         failed_files = [
             {"error": r[1], "path": pdf} if not r[2] else {"error": "", "path": ""}
@@ -337,9 +375,9 @@ def pdf2file(
             pdf_file (str | List[str]): Path to a single PDF file or a list of PDF file paths.
             output_names (List[str], optional): List of output file names. Defaults to None.
             output_path (str, optional): Directory path for output files. Defaults to "./Output".
-            output_format (str, optional): Desired output format. Defaults to `md_dollar`. Supported formats include:`md_dollar`|`md`|`tex`|`docx`, support output variable: `txt`|`txts`|`detailed`
+            output_format (str, optional): Desired output format. Defaults to `md_dollar`. File formats `md_dollar`|`md`|`tex`|`docx` return the paths of the saved files; variable formats `text`|`texts`|`detailed` return the content itself (a single md-format string, a list of strings split by page, and a list of per-page entries with detailed page information, respectively).
             ocr (bool, optional): Whether to use OCR. Defaults to True.
-            convert (bool, optional): Whether to convert Convert "[" and "[[" to "$" and "$$", only valid if `output_format` is a variable format(`txt`|`txts`|`detailed`). Defaults to False.
+            convert (bool, optional): Whether to convert "[" and "[[" to "$" and "$$", only valid if `output_format` is a variable format (`text`|`texts`|`detailed`). Defaults to False.

         Returns:
             Tuple[List[str], List[dict], bool]: A tuple containing:
