From 8cf97b85d4cb02ff3f2235502aa4681345591224 Mon Sep 17 00:00:00 2001
From: Konstantinos Passadis | Azure MVP | MCT
Date: Mon, 11 Nov 2024 01:58:57 +0200
Subject: [PATCH] Delete new-users-index.ipynb

---
 new-users-index.ipynb | 1 -
 1 file changed, 1 deletion(-)
 delete mode 100644 new-users-index.ipynb

diff --git a/new-users-index.ipynb b/new-users-index.ipynb
deleted file mode 100644
index 5861960..0000000
--- a/new-users-index.ipynb
+++ /dev/null
@@ -1 +0,0 @@

The deleted notebook consists of one markdown cell and two code cells:

### New Users Update notebook

- This notebook updates the Users index whenever a new user registers and the Users table in Azure SQL is updated.
- It runs from a CDC trigger that starts the data pipeline which fetches the Users table.
- It compares the incoming records against the current Users index, updates only the new or changed users, and creates and saves their embeddings.

```python
import json

# Define input and output file paths
input_file = '/lakehouse/default/Files/userdata.json'
output_file = '/lakehouse/default/Files/udata.json'

# Read the exported user data
with open(input_file, 'r') as file:
    user_data = json.load(file)

# Transform the Genres field from a JSON-encoded string into an actual list
for user in user_data:
    try:
        user['Genres'] = json.loads(user['Genres'])
    except json.JSONDecodeError as e:
        print(f"Invalid JSON format for Genres in user ID {user.get('id', 'unknown')}: {user['Genres']}. Error: {e}")
        user['Genres'] = []  # Fall back to an empty list so downstream steps can continue

# Write the transformed data to a new JSON file
with open(output_file, 'w') as file:
    json.dump(user_data, file, indent=4)

print(f"Transformed data saved to {output_file}")
```
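For illustration, here is how that transform reshapes a single record; the field values below are hypothetical, not taken from the real dataset, but the shape matches what the validation step in the next cell expects (`Genres` arrives as a JSON-encoded string):

```python
import json

# Hypothetical record as exported from the Users table (values are made up)
raw_user = {"UserId": 101, "Age": 34, "Genres": '["Drama", "Sci-Fi"]'}

# The same parsing step the loop above applies to every record
raw_user["Genres"] = json.loads(raw_user["Genres"])

print(raw_user)  # {'UserId': 101, 'Age': 34, 'Genres': ['Drama', 'Sci-Fi']}
```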
The second code cell compares the incoming records against the index, uploads the new or changed users, and generates their embeddings:

```python
import concurrent.futures
import json
import os
import time

import openai  # uses the pre-1.0 openai SDK interface (openai<1.0)
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient, SearchIndexingBufferedSender

# Configuration -- secrets are read from the environment rather than hardcoded
openai.api_type = "azure"
openai.api_base = "https://dev-oai-kpass.openai.azure.com/"
openai.api_version = "2024-02-01"
openai.api_key = os.environ["AZURE_OPENAI_API_KEY"]
deployment_id = "text-embedding-ada-002"

search_service_name = "azaivztqx"
search_index_name = "users-index"
admin_key = os.environ["AZURE_SEARCH_ADMIN_KEY"]
endpoint = f"https://{search_service_name}.search.windows.net"

# Initialize the search client
search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, credential=AzureKeyCredential(admin_key))

# Step 1: Fetch existing user IDs and their data from the search index
def get_existing_users():
    existing_users = {}
    results = search_client.search("*", select=["UserId", "Genres", "Age"], include_total_count=True)
    for result in results:
        existing_users[result["UserId"]] = result
    return existing_users

existing_users = get_existing_users()
print(f"Fetched {len(existing_users)} existing users from the index.")

# Step 2: Load the transformed user data produced by the first cell
input_file = '/lakehouse/default/Files/udata.json'
with open(input_file, 'r') as file:
    user_data = json.load(file)

# Step 3: Validate each document and separate valid and invalid documents
valid_documents = []
invalid_documents = []

def validate_documents(user_data):
    for doc in user_data:
        valid = True

        # Normalize UserId to a string; the index key and the keys returned by
        # get_existing_users() are strings, so comparisons stay consistent
        if 'UserId' in doc:
            doc['UserId'] = str(doc['UserId'])

        # Validate that Genres is a list of strings
        if 'Genres' in doc:
            if isinstance(doc['Genres'], str):
                try:
                    doc['Genres'] = json.loads(doc['Genres'])
                except json.JSONDecodeError:
                    print(f"Invalid JSON format for Genres in document ID {doc['UserId']}: {doc['Genres']}")
                    invalid_documents.append(doc)
                    valid = False
            elif isinstance(doc['Genres'], list):
                if not all(isinstance(genre, str) for genre in doc['Genres']):
                    print(f"Unexpected format for Genres in document ID {doc['UserId']}: {doc['Genres']}")
                    invalid_documents.append(doc)
                    valid = False
            else:
                print(f"Unexpected format for Genres in document ID {doc['UserId']}: {doc['Genres']}")
                invalid_documents.append(doc)
                valid = False

        if valid:
            valid_documents.append(doc)

# Run validation
validate_documents(user_data)

print(f"Valid documents: {len(valid_documents)}")
print(f"Invalid documents: {len(invalid_documents)}")

# Step 4: Filter out users who already exist in the index with unchanged data
def filter_new_or_updated_users(existing_users, valid_documents):
    new_or_updated_users = []
    for doc in valid_documents:
        user_id = doc.get('UserId')
        if user_id in existing_users:
            existing_doc = existing_users[user_id]
            # Skip users whose Genres and Age match the indexed document
            if doc.get('Genres') == existing_doc.get('Genres') and doc.get('Age') == existing_doc.get('Age'):
                continue
        new_or_updated_users.append(doc)
    return new_or_updated_users

new_users = filter_new_or_updated_users(existing_users, valid_documents)
print(f"Found {len(new_users)} new or updated users to process.")

# Step 5: Upload the new or updated users to the Azure Search index
def upload_documents_to_index(documents):
    try:
        if documents:
            result = search_client.upload_documents(documents=documents)
            print(f"Uploaded {len(documents)} documents to the Azure Search index.")
        else:
            print("No new or updated documents to upload.")
    except Exception as e:
        print(f"Error uploading documents: {e}")

upload_documents_to_index(new_users)

# Step 6: Generate embeddings for new or updated users, retrying with
# exponential backoff when the OpenAI endpoint rate-limits the request
def generate_embeddings_batch(texts, max_retries=7, backoff_factor=2):
    embeddings = []
    for text in texts:
        for attempt in range(max_retries):
            try:
                response = openai.Embedding.create(input=text, engine=deployment_id)
                embeddings.append(response['data'][0]['embedding'])
                break
            except openai.error.RateLimitError:
                wait_time = backoff_factor * (2 ** attempt)
                print(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            except Exception as e:
                print(f"Error generating embedding: {e}")
                raise e
        else:
            # All retries exhausted: fail loudly instead of silently returning
            # fewer embeddings than texts, which would misalign the two lists
            raise RuntimeError(f"Rate limited on every attempt for text: {text!r}")
        time.sleep(0.5)  # brief pause between texts to stay under the rate limit
    return embeddings
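# With the defaults above (max_retries=7, backoff_factor=2), the waits follow
# backoff_factor * 2**attempt: 2, 4, 8, 16, 32, 64, 128 seconds, i.e. a little
# over four minutes of cumulative backoff before a text is abandoned.
assert [2 * (2 ** attempt) for attempt in range(7)] == [2, 4, 8, 16, 32, 64, 128]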
def process_documents(documents, batch_size=5, max_workers=8):
    # Fan the batches out to a thread pool; each worker embeds one batch
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            texts = [f"{' '.join(doc['Genres'])} {doc['Age']}" for doc in batch]
            future = executor.submit(generate_embeddings_batch, texts)
            futures[future] = (batch, texts, i)

        for future in concurrent.futures.as_completed(futures):
            embeddings = future.result()
            batch, texts, start_index = futures[future]
            for j, embedding in enumerate(embeddings):
                # Store the embedding and the text it was computed from
                batch[j]['Embedding'] = embedding
                batch[j]['searchContent'] = texts[j]

# Step 7: Process and generate embeddings for the filtered users
if new_users:
    process_documents(new_users)

# Step 8: Save the documents with embeddings to the lakehouse,
# ensuring the output directory exists first
output_dir = "/lakehouse/default/Files/embeddings"
os.makedirs(output_dir, exist_ok=True)

output_file = os.path.join(output_dir, "updated_users.json")
with open(output_file, 'w') as file:
    json.dump(new_users, file, indent=2)

print(f"Updated users with embeddings saved to {output_file}")

# Step 9: Upload the documents with embeddings to the index
try:
    with SearchIndexingBufferedSender(
        endpoint=endpoint,
        index_name=search_index_name,
        credential=AzureKeyCredential(admin_key),
    ) as batch_client:
        with open(output_file, 'r') as file:
            documents = json.load(file)
        batch_client.upload_documents(documents=documents)

    print(f"Uploaded {len(documents)} documents in total")
except Exception as e:
    print(f"Error uploading documents: {e}")
```

(Notebook metadata: Synapse PySpark kernel, nbformat 4.5; default lakehouse "Datasets".)
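As written, each new or updated user is pushed to the index twice: once without embeddings in Step 5 and again with embeddings in Step 9. Below is a minimal sketch of collapsing that into a single upsert after embeddings are attached, using the buffered sender's `merge_or_upload_documents` action; treat it as one possible simplification under the same index schema, not part of the original notebook:

```python
import os

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchIndexingBufferedSender

endpoint = "https://azaivztqx.search.windows.net"  # same service as above
credential = AzureKeyCredential(os.environ["AZURE_SEARCH_ADMIN_KEY"])

def upsert_users(documents):
    # merge_or_upload inserts documents whose key is new and merges fields
    # into documents that already exist, keyed on UserId -- so one call after
    # process_documents() would replace the two separate uploads
    with SearchIndexingBufferedSender(
        endpoint=endpoint,
        index_name="users-index",
        credential=credential,
    ) as sender:
        sender.merge_or_upload_documents(documents=documents)
        # exiting the `with` block flushes any remaining buffered actions

# e.g. upsert_users(new_users), called once embeddings have been generated
```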