From 6698a000e677697b0159853a392232324343ea37 Mon Sep 17 00:00:00 2001
From: Dhruv Kadam <136492453+DhruvKadam-git@users.noreply.github.com>
Date: Sun, 29 Sep 2024 14:49:40 +0530
Subject: [PATCH 1/2] Update worker.py

1. Error Handling: Added try-except blocks for file operations and network requests.
2. Logging Enhancements: Improved logging to capture more detailed information.
3. Code Refactoring: Created download_file and upload_index functions to avoid code repetition.
4. Configuration: Used constants for MIN_TOKENS, MAX_TOKENS, and RECURSION_DEPTH.
---
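Note on the new download_file helper: it buffers the whole response in memory and uses requests' default of no timeout, and ingest_worker now calls it before os.makedirs(full_path), so the destination directory may not exist yet on a fresh job. A possible hardening, sketched here only as a suggestion (it assumes the module's existing os/requests/logging imports and the same /api/download behaviour), would be to create the directory, set a timeout, and stream to disk:

    def download_file(url, params, dest_path, timeout=30):
        # Make sure the destination directory exists before writing
        # (ingest_worker builds dest_path inside a per-job directory).
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        try:
            # Stream the body to disk instead of buffering it in memory,
            # and fail if the API does not answer within `timeout` seconds.
            with requests.get(url, params=params, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(dest_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
        except requests.RequestException as e:
            logging.error(f"Error downloading file: {e}")
            raise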
""" - # directory = 'inputs' or 'temp' - # formats = [".rst", ".md"] input_files = None recursive = True limit = None exclude = True - # name_job = 'job1' - # filename = 'install.rst' - # user = 'local' sample = False token_check = True - min_tokens = 150 - max_tokens = 1250 - recursion_depth = 2 full_path = os.path.join(directory, user, name_job) logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job}) - # check if API_URL env variable is set file_data = {"name": name_job, "file": filename, "user": user} - response = requests.get( - urljoin(settings.API_URL, "/api/download"), params=file_data - ) - file = response.content + download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename)) if not os.path.exists(full_path): os.makedirs(full_path) - with open(os.path.join(full_path, filename), "wb") as f: - f.write(file) # check if file is .zip and extract it if filename.endswith(".zip"): extract_zip_recursive( - os.path.join(full_path, filename), full_path, 0, recursion_depth + os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH ) self.update_state(state="PROGRESS", meta={"current": 1}) @@ -132,8 +155,8 @@ def ingest_worker( ).load_data() raw_docs = group_split( documents=raw_docs, - min_tokens=min_tokens, - max_tokens=max_tokens, + min_tokens=MIN_TOKENS, + max_tokens=MAX_TOKENS, token_check=token_check, ) @@ -148,28 +171,13 @@ def ingest_worker( for i in range(min(5, len(raw_docs))): logging.info(f"Sample document {i}: {raw_docs[i]}") - # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl - # and send them to the server (provide user and name in form) - file_data = { - "name": name_job, - "user": user, + file_data.update({ "tokens": tokens, "retriever": retriever, "id": str(id), "type": "local", - } - if settings.VECTOR_STORE == "faiss": - files = { - "file_faiss": open(full_path + "/index.faiss", "rb"), - "file_pkl": open(full_path + "/index.pkl", "rb"), - } - response = requests.post( - urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data - ) - else: - response = requests.post( - urljoin(settings.API_URL, "/api/upload_index"), data=file_data - ) + }) + upload_index(full_path, file_data) # delete local shutil.rmtree(full_path) @@ -183,7 +191,6 @@ def ingest_worker( "limited": False, } - def remote_worker( self, source_data, @@ -197,16 +204,14 @@ def remote_worker( doc_id=None, ): token_check = True - min_tokens = 150 - max_tokens = 1250 - full_path = directory + "/" + user + "/" + name_job + full_path = os.path.join(directory, user, name_job) if not os.path.exists(full_path): os.makedirs(full_path) self.update_state(state="PROGRESS", meta={"current": 1}) logging.info( f"Remote job: {full_path}", - extra={"user": user, "job": name_job, source_data: source_data}, + extra={"user": user, "job": name_job, "source_data": source_data}, ) remote_loader = RemoteCreator.create_loader(loader) @@ -214,11 +219,10 @@ def remote_worker( docs = group_split( documents=raw_docs, - min_tokens=min_tokens, - max_tokens=max_tokens, + min_tokens=MIN_TOKENS, + max_tokens=MAX_TOKENS, token_check=token_check, ) - # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] tokens = count_tokens_docs(docs) if operation_mode == "upload": id = ObjectId() @@ -230,7 +234,6 @@ def remote_worker( call_openai_api(docs, full_path, id, self) self.update_state(state="PROGRESS", meta={"current": 100}) - # Proceed with uploading and cleaning as in the original function file_data = { "name": 
From 5e604950c5d4da5c97be648755c044ade1b846de Mon Sep 17 00:00:00 2001
From: DhruvKadam-git
Date: Tue, 1 Oct 2024 17:39:31 +0530
Subject: [PATCH 2/2] Add .env-template

---
 .env-template | 9 +++++++++
 1 file changed, 9 insertions(+)
 create mode 100644 .env-template

diff --git a/.env-template b/.env-template
new file mode 100644
index 000000000..e93f03635
--- /dev/null
+++ b/.env-template
@@ -0,0 +1,9 @@
+API_KEY=
+LLM_NAME=docsgpt
+VITE_API_STREAMING=true
+
+#For Azure (you can delete it if you don't use Azure)
+OPENAI_API_BASE=
+OPENAI_API_VERSION=
+AZURE_DEPLOYMENT_NAME=
+AZURE_EMBEDDINGS_DEPLOYMENT_NAME=
\ No newline at end of file
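Note on the template: a .env-template is typically copied to a local .env and the blank values filled in; the Azure block only matters when an Azure endpoint is used. A minimal sketch of reading these values once they are exported into the environment (variable names taken from the template above; everything else is illustrative):

    import os

    api_key = os.getenv("API_KEY", "")
    llm_name = os.getenv("LLM_NAME", "docsgpt")  # default shown in the template
    # Treat the Azure block as active only when its base URL has been filled in.
    use_azure = bool(os.getenv("OPENAI_API_BASE"))
    azure_deployment = os.getenv("AZURE_DEPLOYMENT_NAME") if use_azure else None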