diff --git a/Notebooks/Books-Pipeline/1-cleanCsv.ipynb b/Notebooks/Books-Pipeline/1-cleanCsv.ipynb deleted file mode 100644 index 5df9f89..0000000 --- a/Notebooks/Books-Pipeline/1-cleanCsv.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[{"cell_type":"code","source":["# Welcome to your new notebook\n","# Type here in the cell editor to add code!\n","# %pip install pandas\n","# abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/booksdata.csv\n","# Files/booksdata.csv"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"5f68967e-253c-4d35-be30-7a0b16a7e0fb"},{"cell_type":"markdown","source":["- **CLEAN CSV**"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"2b2668cd-20f5-48d7-88a3-47303d54203f"},{"cell_type":"code","source":["from pyspark.sql.functions import udf, col\n","from pyspark.sql.types import BooleanType\n","import re\n","\n","# Read the CSV file into a Spark DataFrame\n","df = spark.read.format(\"csv\").option(\"header\", \"true\").load(\"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/books.csv\")\n","display(df)\n","\n","# Define the UDF to check for non-ASCII characters\n","def has_non_ascii(value):\n"," if value is None:\n"," return False\n"," return bool(re.search(r'[^\\x00-\\x7F]', value))\n","\n","# Register the UDF\n","has_non_ascii_udf = udf(has_non_ascii, BooleanType())\n","\n","# Apply the UDF to each column that needs to be checked\n","df_clean = df.filter(~has_non_ascii_udf(col(\"Book\")) & \n"," ~has_non_ascii_udf(col(\"Author\")) & \n"," ~has_non_ascii_udf(col(\"Genres\")))\n","\n","# Display the cleaned DataFrame\n","display(df_clean)\n","\n","# Coalesce the DataFrame to a single partition\n","df_clean_coalesced = df_clean.coalesce(1)\n","\n","# Save the coalesced DataFrame to a new CSV file in Azure Data Lake Storage\n","cleaned_file_path = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/booksclean\"\n","df_clean_coalesced.write.format(\"csv\").option(\"header\", \"true\").mode(\"overwrite\").save(cleaned_file_path)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"549e66cd-6ac5-4ebf-8187-e6e768211dbc"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default"},"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","known_lakehouses":[{"id":"83b65b13-7f82-4177-838c-f19a8134860b"}],"default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Notebooks/Books-Pipeline/1-cleanbooksCsv.ipynb b/Notebooks/Books-Pipeline/1-cleanbooksCsv.ipynb new file mode 100644 index 0000000..58a5a2b --- /dev/null +++ b/Notebooks/Books-Pipeline/1-cleanbooksCsv.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"markdown","source":["- **CLEAN 
BOOKS DATASET CSV**"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"2b2668cd-20f5-48d7-88a3-47303d54203f"},{"cell_type":"code","source":["from pyspark.sql import SparkSession\n","from pyspark.sql.functions import udf, col, size, split\n","from pyspark.sql.types import StringType, BooleanType, StructType, StructField, FloatType\n","import re\n","\n","# Initialize Spark Session\n","spark = SparkSession.builder.appName(\"CleanCSVData\").getOrCreate()\n","\n","# Define schema to ensure columns are interpreted correctly\n","schema = StructType([\n"," StructField(\"id\", StringType(), True),\n"," StructField(\"Book\", StringType(), True),\n"," StructField(\"Author\", StringType(), True),\n"," StructField(\"Description\", StringType(), True),\n"," StructField(\"Genres\", StringType(), True),\n"," StructField(\"Rating\", FloatType(), True)\n","])\n","\n","# Read the CSV file into a Spark DataFrame with proper handling of special characters\n","df = spark.read.format(\"csv\") \\\n"," .option(\"header\", \"true\") \\\n"," .option(\"quote\", '\"') \\\n"," .option(\"escape\", '\\\\') \\\n"," .option(\"delimiter\", \",\") \\\n"," .option(\"multiLine\", \"true\") \\\n"," .schema(schema) \\\n"," .load(\"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/books.csv\")\n","\n","# Display the original DataFrame\n","display(df)\n","\n","# Define a function to check if the row has the expected number of columns\n","def is_valid_row(*columns):\n"," # Check if all columns have non-null values\n"," if len(columns) != 6:\n"," return False\n"," return all(columns)\n","\n","is_valid_row_udf = udf(is_valid_row, BooleanType())\n","\n","# Apply the UDF to filter rows with missing or misaligned columns\n","df_clean = df.filter(is_valid_row_udf(\n"," col(\"id\"),\n"," col(\"Book\"),\n"," col(\"Author\"),\n"," col(\"Description\"),\n"," col(\"Genres\"),\n"," col(\"Rating\")\n","))\n","\n","# Additional filtering to catch multi-line anomalies in the 'Description' field\n","df_clean = df_clean.filter(~col(\"Description\").contains(\"\\n\"))\n","\n","# Display the cleaned DataFrame\n","display(df_clean)\n","\n","# Coalesce the DataFrame to a single partition for saving\n","df_clean_coalesced = df_clean.coalesce(1)\n","\n","# Save the cleaned DataFrame to a new CSV file in Azure Data Lake Storage\n","cleaned_file_path = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/booksclean\"\n","df_clean_coalesced.write.format(\"csv\") \\\n"," .option(\"header\", \"true\") \\\n"," .mode(\"overwrite\") \\\n"," .save(cleaned_file_path)\n","\n","print(f\"Cleaned data saved to: {cleaned_file_path}\")\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"549e66cd-6ac5-4ebf-8187-e6e768211dbc"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse 
PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","known_lakehouses":[{"id":"83b65b13-7f82-4177-838c-f19a8134860b"}],"default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5"},"environment":{"environmentId":"d5a85687-f4c8-4dae-86ec-ba90dc32a717","workspaceId":"9750728a-936e-41b9-a6cd-1247d645f4c5"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Notebooks/Books-Pipeline/2- ConvertToJson.ipynb b/Notebooks/Books-Pipeline/2- ConvertToJson.ipynb deleted file mode 100644 index d5d588f..0000000 --- a/Notebooks/Books-Pipeline/2- ConvertToJson.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[{"cell_type":"markdown","source":["- **CSV to JSON**"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c65c4afc-dde1-4e03-94f4-521b780dbbe7"},{"cell_type":"code","source":["from pyspark.sql import SparkSession\n","from pyspark.sql.functions import col, udf\n","from pyspark.sql.types import ArrayType, StringType\n","import ast\n","\n","# Initialize Spark session\n","spark = SparkSession.builder.appName(\"TransformGenres\").getOrCreate()\n","\n","# Path to your CSV file\n","input_file = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/bookers.csv\"\n","\n","# Load data into Spark DataFrame\n","df = spark.read.format(\"csv\").option(\"header\", \"true\").load(input_file)\n","\n","# Function to convert genres string to a list of strings with error handling\n","def convert_genres_to_list(genres_str):\n"," try:\n"," return ast.literal_eval(genres_str)\n"," except (ValueError, SyntaxError):\n"," return []\n","\n","# Register the UDF\n","convert_genres_udf = udf(convert_genres_to_list, ArrayType(StringType()))\n","\n","# Apply the conversion UDF to the Genres column\n","df = df.withColumn(\"Genres\", convert_genres_udf(col(\"Genres\")))\n","\n","# Rename columns to match the index headers\n","df = df.withColumnRenamed(\"_c0\", \"id\") \\\n"," .withColumnRenamed(\"Book\", \"Title\") \\\n"," .withColumnRenamed(\"Avg_Rating\", \"Rating\")\n","\n","# Ensure the id field is a string\n","df = df.withColumn(\"id\", df[\"id\"].cast(StringType()))\n","\n","# Show the transformed DataFrame to verify\n","df.show(truncate=False)\n","\n","# Save the transformed DataFrame to a JSON file\n","output_file = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/bookers.json\" # Adjust the path as needed\n","df.write.json(output_file, mode='overwrite')\n","\n","# Verify the JSON structure by reading it back\n","transformed_df = spark.read.json(output_file)\n","transformed_df.show(truncate=False)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"143fec06-0102-45e8-ba65-21cb8e1a11d8"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse 
PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default"},"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5","known_lakehouses":[{"id":"83b65b13-7f82-4177-838c-f19a8134860b"}]}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Notebooks/Books-Pipeline/2-bookstojson.ipynb b/Notebooks/Books-Pipeline/2-bookstojson.ipynb new file mode 100644 index 0000000..8ba28d0 --- /dev/null +++ b/Notebooks/Books-Pipeline/2-bookstojson.ipynb @@ -0,0 +1,140 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "c65c4afc-dde1-4e03-94f4-521b780dbbe7", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "- **Convert Books Dataset from CSV to JSON**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "143fec06-0102-45e8-ba65-21cb8e1a11d8", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import col, udf\n", + "from pyspark.sql.types import ArrayType, StringType\n", + "import ast\n", + "\n", + "# Initialize Spark session\n", + "spark = SparkSession.builder.appName(\"TransformGenres\").getOrCreate()\n", + "\n", + "# Path to your CSV file\n", + "input_file = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/bookers.csv\"\n", + "\n", + "# Load data into Spark DataFrame\n", + "df = spark.read.format(\"csv\").option(\"header\", \"true\").load(input_file)\n", + "\n", + "# Function to convert genres string to a list of strings with error handling\n", + "def convert_genres_to_list(genres_str):\n", + " try:\n", + " return ast.literal_eval(genres_str)\n", + " except (ValueError, SyntaxError):\n", + " return []\n", + "\n", + "# Register the UDF\n", + "convert_genres_udf = udf(convert_genres_to_list, ArrayType(StringType()))\n", + "\n", + "# Apply the conversion UDF to the Genres column\n", + "df = df.withColumn(\"Genres\", convert_genres_udf(col(\"Genres\")))\n", + "\n", + "# Rename columns to match the index headers\n", + "df = df.withColumnRenamed(\"_c0\", \"id\") \\\n", + " .withColumnRenamed(\"Book\", \"Title\") \\\n", + " .withColumnRenamed(\"Avg_Rating\", \"Rating\")\n", + "\n", + "# Ensure the id field is a string\n", + "df = df.withColumn(\"id\", df[\"id\"].cast(StringType()))\n", + "\n", + "# Show the transformed DataFrame to verify\n", + "df.show(truncate=False)\n", + "\n", + "# Save the transformed DataFrame to a JSON file\n", + "output_file = \"abfss://Data@onelake.dfs.fabric.microsoft.com/Datasets.Lakehouse/Files/bookers.json\" # Adjust the path as needed\n", + "df.write.json(output_file, mode='overwrite')\n", + "\n", + "# Verify the JSON structure by reading it back\n", + "transformed_df = spark.read.json(output_file)\n", + "transformed_df.show(truncate=False)\n" + ] + } + ], + "metadata": { + "dependencies": { + "lakehouse": 
{ + "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b", + "default_lakehouse_name": "Datasets", + "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5", + "known_lakehouses": [ + { + "id": "83b65b13-7f82-4177-838c-f19a8134860b" + } + ] + } + }, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Notebooks/Books-Pipeline/3-JsonToArray.ipynb b/Notebooks/Books-Pipeline/3-JsonToArray.ipynb deleted file mode 100644 index c32cda6..0000000 --- a/Notebooks/Books-Pipeline/3-JsonToArray.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[{"cell_type":"code","source":["# Welcome to your new notebook\n","# Type here in the cell editor to add code!\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"30b7f257-9b1e-48cb-836a-19012389ad54"},{"cell_type":"code","source":["import json\n","\n","# Read the newline-delimited JSON file\n","input_file = '/lakehouse/default/Files/bookerset.json'\n","output_file = '/lakehouse/default/Files/bookersdata.json'\n","\n","with open(input_file, 'r') as f:\n"," lines = f.readlines()\n","\n","# Parse each line as a JSON object and add to a list\n","books = [json.loads(line) for line in lines]\n","\n","# Write the list of books to a single JSON array file\n","with open(output_file, 'w') as f:\n"," json.dump(books, f, indent=2)\n","\n","print(f\"Converted {len(books)} records to a single JSON array in {output_file}\")\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"5086f6a2-3d8e-45d7-86cf-cc148fc5d9f4"},{"cell_type":"code","source":["\n","\n","import pandas as pd\n","# Load data into pandas DataFrame from \"/lakehouse/default/\" + \"Files/bookdata.json\"\n","df = pd.read_json(\"/lakehouse/default/\" + \"Files/bookersdata.json\",typ=\"series\")\n","display(df)\n"],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":5,"statement_ids":[5],"livy_statement_state":"available","session_id":"3ef4e2cf-f9ca-4d52-94ee-f5367d86fe1f","state":"finished","normalized_state":"finished","queued_time":"2024-07-22T20:32:47.9039111Z","session_start_time":"2024-07-22T20:32:48.2842773Z","execution_start_time":"2024-07-22T20:33:59.6471994Z","execution_finish_time":"2024-07-22T20:34:02.9482071Z","parent_msg_id":"7c99bc29-5896-494c-b5a2-163f99422971"},"text/plain":"StatementMeta(, 3ef4e2cf-f9ca-4d52-94ee-f5367d86fe1f, 5, Finished, Available, Finished)"},"metadata":{}},{"output_type":"display_data","data":{"text/plain":"0 {'id': '0', 'Title': 'To Kill a Mockingbird', ...\n1 {'id': '1', 'Title': 'Harry Potter and the Phi...\n2 {'id': '2', 'Title': 'Pride and Prejudice', 'A...\n3 {'id': '3', 'Title': 'The Diary of a 
Young Gir...\n4 {'id': '4', 'Title': 'Animal Farm', 'Author': ...\n ... \n9532 {'id': '9995', 'Title': 'Breeders (Breeders Tr...\n9533 {'id': '9996', 'Title': 'Dynamo', 'Author': 'E...\n9534 {'id': '9997', 'Title': 'The Republic of Trees...\n9535 {'id': '9998', 'Title': 'Waking Up (Healing He...\n9536 {'id': '9999', 'Title': 'Bits and Pieces: Tale...\nLength: 9537, dtype: object"},"metadata":{}}],"execution_count":1,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c46008dd-f103-4834-88b9-3a236b43ab33"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Notebooks/Books-Pipeline/3-booksJsontoArray.ipynb b/Notebooks/Books-Pipeline/3-booksJsontoArray.ipynb new file mode 100644 index 0000000..be2eeda --- /dev/null +++ b/Notebooks/Books-Pipeline/3-booksJsontoArray.ipynb @@ -0,0 +1,136 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "4404b1a6-09e0-487e-a05b-23adbce1cc7f", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "-**Transform Books Json to Array**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5086f6a2-3d8e-45d7-86cf-cc148fc5d9f4", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import json\n", + "\n", + "# Read the newline-delimited JSON file\n", + "input_file = '/lakehouse/default/Files/bookerset.json'\n", + "output_file = '/lakehouse/default/Files/bookersdata.json'\n", + "\n", + "with open(input_file, 'r') as f:\n", + " lines = f.readlines()\n", + "\n", + "# Parse each line as a JSON object and add to a list\n", + "books = [json.loads(line) for line in lines]\n", + "\n", + "# Write the list of books to a single JSON array file\n", + "with open(output_file, 'w') as f:\n", + " json.dump(books, f, indent=2)\n", + "\n", + "print(f\"Converted {len(books)} records to a single JSON array in {output_file}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c46008dd-f103-4834-88b9-3a236b43ab33", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "\n", + "\n", + "import pandas as pd\n", + "# Load data into pandas DataFrame from \"/lakehouse/default/\" + \"Files/bookdata.json\"\n", + "df = pd.read_json(\"/lakehouse/default/\" + \"Files/bookersdata.json\",typ=\"series\")\n", + "display(df)\n" + ] + } + ], + 
"metadata": { + "dependencies": { + "environment": { + "environmentId": "d5a85687-f4c8-4dae-86ec-ba90dc32a717", + "workspaceId": "9750728a-936e-41b9-a6cd-1247d645f4c5" + }, + "lakehouse": { + "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b", + "default_lakehouse_name": "Datasets", + "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5" + } + }, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Notebooks/Books-Pipeline/4-JsontoAiIndex.ipynb b/Notebooks/Books-Pipeline/4-JsontoAiIndex.ipynb deleted file mode 100644 index e4013f5..0000000 --- a/Notebooks/Books-Pipeline/4-JsontoAiIndex.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[{"cell_type":"code","execution_count":null,"id":"12c5384c-1c93-404d-bac9-d7058d261c9d","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["#%pip install pandas azure-search-documents==11.6.0b3 azure-core\n"]},{"cell_type":"code","execution_count":null,"id":"eb23d98f-6200-4e00-90af-2f9bf294d334","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["from azure.core.credentials import AzureKeyCredential\n","from azure.search.documents.indexes import SearchIndexClient\n","from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType, SearchField, ComplexField\n","\n","# Define your Azure Cognitive Search service and API key\n","service_name = 'xxxx'\n","admin_key = 'xxxxxxxxxxxxxxxxxxxxxx'\n","index_name = 'xxxx-index'\n","\n","# Create a client\n","endpoint = f'https://{service_name}.search.windows.net'\n","admin_client = SearchIndexClient(endpoint=endpoint, credential=AzureKeyCredential(admin_key))\n","\n","# Define the fields of the index\n","fields = [\n"," SimpleField(name=\"id\", type=SearchFieldDataType.String, key=True, retrievable=True, stored=True),\n"," SearchField(name=\"Title\", type=SearchFieldDataType.String, searchable=True, filterable=True, retrievable=True, stored=True, analyzer_name=\"standard.lucene\"),\n"," SearchField(name=\"Author\", type=SearchFieldDataType.String, searchable=True, filterable=True, retrievable=True, stored=True, sortable=True, facetable=True, analyzer_name=\"standard.lucene\"),\n"," SearchField(name=\"Genres\", type=SearchFieldDataType.Collection(SearchFieldDataType.String), searchable=True, filterable=True, retrievable=True, stored=True, facetable=True, analyzer_name=\"standard.lucene\"),\n"," SimpleField(name=\"Rating\", type=SearchFieldDataType.Double, filterable=True, retrievable=True, stored=True, sortable=True, facetable=True)\n","]\n","\n","# Define the index\n","index = SearchIndex(name=index_name, fields=fields)\n","\n","# Create the 
index\n","admin_client.create_index(index)\n","print(f'Index \"{index_name}\" created successfully.')\n"]},{"cell_type":"code","execution_count":null,"id":"45d32031-8192-4040-a445-03761a2b2623","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["from azure.search.documents import SearchClient\n","from azure.core.credentials import AzureKeyCredential\n","import json\n","\n","# Define your Azure Cognitive Search credentials and endpoint\n","service_endpoint = \"https://xxxxxx.search.windows.net\"\n","index_name = \"xxxxx-index\"\n","admin_key = \"xxxxxxxxxxxxxxxxxxxxxxxx\"\n","\n","# Create a SearchClient\n","client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=AzureKeyCredential(admin_key))\n","\n","# Load the JSON data\n","output_file = \"/lakehouse/default/Files/bookersdata.json\"\n","with open(output_file, 'r') as file:\n"," documents = json.load(file) # Load the JSON array directly\n","\n","# Lists to hold valid and invalid documents\n","valid_documents = []\n","invalid_documents = []\n","\n","# Validate each document\n","for doc in documents:\n"," valid = True\n"," \n"," # Validate and convert Rating to float\n"," if 'Rating' in doc:\n"," try:\n"," doc['Rating'] = float(doc['Rating'])\n"," except ValueError:\n"," print(f\"Invalid value for Rating in document ID {doc['id']}: {doc['Rating']}\")\n"," invalid_documents.append(doc)\n"," valid = False\n"," \n"," # Validate Genres is a list of strings\n"," if 'Genres' in doc:\n"," if isinstance(doc['Genres'], str):\n"," try:\n"," doc['Genres'] = json.loads(doc['Genres'])\n"," except json.JSONDecodeError:\n"," print(f\"Invalid JSON format for Genres in document ID {doc['id']}: {doc['Genres']}\")\n"," invalid_documents.append(doc)\n"," valid = False\n"," elif isinstance(doc['Genres'], list):\n"," if not all(isinstance(genre, str) for genre in doc['Genres']):\n"," print(f\"Unexpected format for Genres in document ID {doc['id']}: {doc['Genres']}\")\n"," invalid_documents.append(doc)\n"," valid = False\n"," else:\n"," print(f\"Unexpected format for Genres in document ID {doc['id']}: {doc['Genres']}\")\n"," invalid_documents.append(doc)\n"," valid = False\n"," \n"," if valid:\n"," valid_documents.append(doc)\n","\n","# Log the number of valid and invalid documents\n","print(f\"Valid documents: {len(valid_documents)}\")\n","print(f\"Invalid documents: {len(invalid_documents)}\")\n","\n","# Upload valid documents to the Azure Search index\n","if valid_documents:\n"," result = client.upload_documents(documents=valid_documents)\n"," print(f\"Uploaded {len(valid_documents)} documents to the Azure Search index. 
Results: {result}\")\n","else:\n"," print(\"No valid documents to upload.\")\n"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5"}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5} diff --git a/Notebooks/Books-Pipeline/4-booksjsonToindex.ipynb b/Notebooks/Books-Pipeline/4-booksjsonToindex.ipynb new file mode 100644 index 0000000..87cc2ea --- /dev/null +++ b/Notebooks/Books-Pipeline/4-booksjsonToindex.ipynb @@ -0,0 +1,221 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6bcb953c-1e64-4b32-8b62-e28a4760b51a", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "**Copy Books Dataset to Azure AI Search**\n", + "\n", + "- Create the Index\n", + "- Validate Data\n", + "- Copy Data to index\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eb23d98f-6200-4e00-90af-2f9bf294d334", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.core.credentials import AzureKeyCredential\n", + "from azure.search.documents.indexes import SearchIndexClient\n", + "from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType, SearchField, ComplexField\n", + "\n", + "# Define your Azure Cognitive Search service and API key\n", + "service_name = 'xxxxxxxxx'\n", + "admin_key = 'xxxxxxxxxx'\n", + "index_name = 'books-index'\n", + "\n", + "# Create a client\n", + "endpoint = f'https://{service_name}.search.windows.net'\n", + "admin_client = SearchIndexClient(endpoint=endpoint, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Define the fields of the index\n", + "fields = [\n", + " SimpleField(name=\"id\", type=SearchFieldDataType.String, key=True, retrievable=True, stored=True),\n", + " SearchField(name=\"Title\", type=SearchFieldDataType.String, searchable=True, filterable=True, retrievable=True, stored=True, analyzer_name=\"standard.lucene\"),\n", + " SearchField(name=\"Description\", type=SearchFieldDataType.String, searchable=True, filterable=False, retrievable=True, stored=True, analyzer_name=\"standard.lucene\"),\n", + " SearchField(name=\"Author\", type=SearchFieldDataType.String, searchable=True, filterable=True, retrievable=True, stored=True, sortable=True, facetable=True, analyzer_name=\"standard.lucene\"),\n", + " SearchField(name=\"Genres\", type=SearchFieldDataType.Collection(SearchFieldDataType.String), searchable=True, filterable=True, retrievable=True, stored=True, facetable=True, analyzer_name=\"standard.lucene\"),\n", + " SimpleField(name=\"Rating\", type=SearchFieldDataType.Double, filterable=True, retrievable=True, stored=True, sortable=True, facetable=True)\n", + "]\n", + "\n", + "# Define 
the index\n", + "index = SearchIndex(name=index_name, fields=fields)\n", + "\n", + "# Create the index\n", + "admin_client.create_index(index)\n", + "print(f'Index \"{index_name}\" created successfully.')\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fb1350c8-4100-4ec3-a488-af2f42604c8c", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.search.documents import SearchClient\n", + "from azure.core.credentials import AzureKeyCredential\n", + "import json\n", + "\n", + "# Define your Azure Cognitive Search credentials and endpoint\n", + "service_endpoint = \"https://azaivztqx.search.windows.net\"\n", + "index_name = \"books-index\"\n", + "admin_key = \"UvNc9RS47BkkZi0Hz7XPdSkpvi9QXDuqbg6rrejGw5AzSeBxWhxe\"\n", + "\n", + "# Create a SearchClient\n", + "client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Load the JSON data\n", + "output_file = \"/lakehouse/default/Files/bookersdata.json\"\n", + "with open(output_file, 'r') as file:\n", + " documents = json.load(file) # Load the JSON array directly\n", + "\n", + "# Initialize lists for valid and invalid documents\n", + "valid_documents = []\n", + "invalid_documents = [] # <-- Declare this list here\n", + "\n", + "# Ensure all required fields, including the description, are present\n", + "for doc in documents:\n", + " if 'Description' not in doc:\n", + " doc['Description'] = '' # Fill missing descriptions with an empty string\n", + "\n", + "# Validate each document\n", + "for doc in documents:\n", + " valid = True\n", + " \n", + " # Validate and convert Rating to float\n", + " if 'Rating' in doc:\n", + " try:\n", + " doc['Rating'] = float(doc['Rating'])\n", + " except ValueError:\n", + " print(f\"Invalid value for Rating in document ID {doc.get('id', 'unknown')}: {doc['Rating']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " \n", + " # Validate Genres is a list of strings\n", + " if 'Genres' in doc:\n", + " if isinstance(doc['Genres'], str):\n", + " try:\n", + " doc['Genres'] = json.loads(doc['Genres'])\n", + " except json.JSONDecodeError:\n", + " print(f\"Invalid JSON format for Genres in document ID {doc.get('id', 'unknown')}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " elif isinstance(doc['Genres'], list):\n", + " if not all(isinstance(genre, str) for genre in doc['Genres']):\n", + " print(f\"Unexpected format for Genres in document ID {doc.get('id', 'unknown')}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " else:\n", + " print(f\"Unexpected format for Genres in document ID {doc.get('id', 'unknown')}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " \n", + " if valid:\n", + " valid_documents.append(doc)\n", + "\n", + "# Log the number of valid and invalid documents\n", + "print(f\"Valid documents: {len(valid_documents)}\")\n", + "print(f\"Invalid documents: {len(invalid_documents)}\")\n", + "\n", + "# Upload valid documents to the Azure Search index\n", + "if valid_documents:\n", + " result = client.upload_documents(documents=valid_documents)\n", + " print(f\"Uploaded {len(valid_documents)} documents to the Azure Search index. 
Results: {result}\")\n", + "else:\n", + " print(\"No valid documents to upload.\")\n" + ] + } + ], + "metadata": { + "dependencies": { + "environment": { + "environmentId": "d5a85687-f4c8-4dae-86ec-ba90dc32a717", + "workspaceId": "9750728a-936e-41b9-a6cd-1247d645f4c5" + }, + "lakehouse": { + "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b", + "default_lakehouse_name": "Datasets", + "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5" + } + }, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Notebooks/Books-Pipeline/5-AiSearch-embeddings.ipynb b/Notebooks/Books-Pipeline/5-AiSearch-embeddings.ipynb deleted file mode 100644 index 54feb5e..0000000 --- a/Notebooks/Books-Pipeline/5-AiSearch-embeddings.ipynb +++ /dev/null @@ -1 +0,0 @@ -{"cells":[{"cell_type":"code","execution_count":null,"id":"952b0652-488d-4784-8137-f2259f7f0213","metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"outputs":[],"source":["# Welcome to your new notebook\n","# Type here in the cell editor to add code!\n","# %pip install azure-search-documents azure-search azure-core azure-search-documents==11.6.0b3\n","\n"]},{"cell_type":"markdown","id":"adc24797-f859-4cba-81d6-c33c78a572b9","metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"source":["**NEW SERIES**\n"]},{"cell_type":"markdown","id":"ed133835-ce51-4dc0-8e2c-0a1369ff041d","metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"source":["- **Connect to Azure Ai Search Index**\n","- **Configure Vector Search & Update**"]},{"cell_type":"code","execution_count":null,"id":"1d3cea20-d017-4b20-ab28-4cc87a5d08a6","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# %pip install azure-search-documents azure-search azure-core openai==0.28"]},{"cell_type":"code","execution_count":null,"id":"adefbefa-df7c-440f-a24d-5223468c3da8","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["import openai\n","from azure.search.documents.indexes import SearchIndexClient\n","from azure.search.documents.indexes.models import (\n"," SimpleField, SearchFieldDataType, SearchField,\n"," VectorSearch, HnswAlgorithmConfiguration, VectorSearchProfile,\n"," SemanticConfiguration, SemanticPrioritizedFields, SemanticField, SemanticSearch\n",")\n","from azure.core.credentials import AzureKeyCredential\n","\n","# Configuration\n","openai.api_type = \"azure\"\n","openai.api_base = \"https://xxxxxxxxxxxxxx.openai.azure.com/\"\n","openai.api_version = 
\"2023-11-01\"\n","openai.api_key = \"xxxxxxxxxxxxxxxxxxxx\"\n","\n","search_service_name = \"xxxxxxxxxxxxxx\"\n","search_index_name = \"xxxxxxxxxxx-index\"\n","admin_key = \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"\n","endpoint = f\"https://{search_service_name}.search.windows.net\"\n","\n","# Create a search index client\n","index_client = SearchIndexClient(endpoint=endpoint, credential=AzureKeyCredential(admin_key))\n","\n","# Retrieve the existing index\n","existing_index = index_client.get_index(search_index_name)\n","\n","# Define new fields if necessary (e.g., contentVector, searchContent)\n","new_fields = [\n"," SearchField(name=\"Embedding\", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), \n"," searchable=True, vector_search_dimensions=1536, vector_search_profile_name=\"myHnswProfile\"),\n"," SearchField(name=\"searchContent\", type=SearchFieldDataType.String, searchable=True)\n","]\n","\n","# Add new fields to the existing fields\n","fields = existing_index.fields\n","for new_field in new_fields:\n"," if new_field.name not in [field.name for field in fields]:\n"," fields.append(new_field)\n","\n","# Configure the vector search\n","vector_search = VectorSearch(\n"," algorithms=[\n"," HnswAlgorithmConfiguration(\n"," name=\"myHnsw\",\n"," parameters={\n"," \"m\": 8,\n"," \"efConstruction\": 800,\n"," \"efSearch\": 800,\n"," \"metric\": \"cosine\"\n"," }\n"," )\n"," ],\n"," profiles=[\n"," VectorSearchProfile(\n"," name=\"myHnswProfile\",\n"," algorithm_configuration_name=\"myHnsw\",\n"," )\n"," ]\n",")\n","\n","# Define semantic configuration\n","semantic_config = SemanticConfiguration(\n"," name=\"my-semantic-config\",\n"," prioritized_fields=SemanticPrioritizedFields(\n"," title_field=SemanticField(field_name=\"Title\"),\n"," keywords_fields=[SemanticField(field_name=\"Genres\")],\n"," content_fields=[SemanticField(field_name=\"searchContent\")]\n"," )\n",")\n","\n","# Create the semantic settings with the configuration\n","semantic_search = SemanticSearch(configurations=[semantic_config])\n","\n","# Update the search index with the new fields and configurations\n","existing_index.fields = fields\n","existing_index.vector_search = vector_search\n","existing_index.semantic_search = semantic_search\n","\n","result = index_client.create_or_update_index(existing_index)\n","print(f'Index {result.name} updated successfully')\n"]},{"cell_type":"markdown","id":"cdece52e-0c27-48c0-ba5e-e3374af8fbe3","metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"source":["- **Create Embeddings (rate limit patience)**\n","- **Get Embeddings as JSON**\n","- **Upload to Azure AI Search**\n","- **Batch - Parallel**"]},{"cell_type":"code","execution_count":null,"id":"1ca5f8ae-17e2-42c9-aa45-074d12578395","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["import openai\n","import json\n","import time\n","from azure.search.documents import SearchClient\n","from azure.core.credentials import AzureKeyCredential\n","import concurrent.futures\n","import os\n","\n","# Configuration\n","openai.api_type = \"azure\"\n","openai.api_base = \"https://xxxxxxxxxxxxxx.openai.azure.com/\"\n","openai.api_version = \"2024-05-01-preview\"\n","openai.api_key = \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"\n","deployment_id = \"text-embedding-ada-002\"\n","\n","search_service_name = 
\"xxxxxxxxxxxxxxxxxx\"\n","search_index_name = \"xxxxxxxxxxxx-index\"\n","admin_key = \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"\n","endpoint = f\"https://{search_service_name}.search.windows.net\"\n","\n","# Initialize the search client\n","search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, credential=AzureKeyCredential(admin_key))\n","\n","# Fetch all documents from the search index\n","results = search_client.search(search_text=\"*\", include_total_count=True)\n","documents = [doc for doc in results]\n","\n","# Function to generate embeddings for a batch of texts\n","def generate_embeddings_batch(texts, max_retries=7, backoff_factor=2):\n"," embeddings = []\n"," for text in texts:\n"," for attempt in range(max_retries):\n"," try:\n"," response = openai.Embedding.create(input=[text], deployment_id=deployment_id)\n"," embeddings.append(response['data'][0]['embedding'])\n"," break\n"," except openai.error.RateLimitError as e:\n"," if attempt < max_retries - 1:\n"," wait_time = backoff_factor * (2 ** attempt)\n"," print(f\"Rate limit exceeded. Retrying in {wait_time} seconds...\")\n"," time.sleep(wait_time)\n"," else:\n"," print(\"Max retries exceeded. Please try again later.\")\n"," raise e\n"," except Exception as e:\n"," print(f\"Unexpected error: {e}\")\n"," raise e\n"," time.sleep(1) # Add a delay between individual requests to reduce aggressiveness\n"," return embeddings\n","\n","def process_documents(documents, batch_size=5, max_workers=8):\n"," with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:\n"," futures = {}\n"," for i in range(0, len(documents), batch_size):\n"," batch = documents[i:i + batch_size]\n"," texts = [f\"{doc['Title']} {doc['Author']} {doc['Genres']} Rating: {doc['Rating']}\" for doc in batch]\n"," future = executor.submit(generate_embeddings_batch, texts)\n"," futures[future] = (batch, texts, i)\n","\n"," for future in concurrent.futures.as_completed(futures):\n"," try:\n"," embeddings = future.result()\n"," batch, texts, start_index = futures[future]\n"," print(f\"Processing batch starting at index {start_index}\")\n"," for j, embedding in enumerate(embeddings):\n"," documents[start_index + j]['Embedding'] = embedding\n"," documents[start_index + j]['searchContent'] = texts[j]\n"," \n"," except Exception as e:\n"," print(f\"Error processing batch: {e}\") \n","\n","# Process documents to generate embeddings\n","process_documents(documents)\n","\n","# Ensure the output directory exists\n","output_dir = \"/lakehouse/default/Files/embeddings\"\n","os.makedirs(output_dir, exist_ok=True)\n","\n","# Save the documents with embeddings to a JSON file in the lakehouse\n","output_file = os.path.join(output_dir, \"bookVectors.json\")\n","with open(output_file, 'w') as file:\n"," json.dump(documents, file, indent=2) # Adding indent parameter for pretty printing\n","\n","print(f\"Documents with embeddings saved to {output_file}\")\n"]},{"cell_type":"code","execution_count":null,"id":"31f5425f-8028-4180-b51f-85ba5f5555fb","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["from azure.search.documents import SearchClient\n","from azure.search.documents import SearchIndexingBufferedSender\n","\n","# Upload the documents with embeddings to the index\n","search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, 
credential=AzureKeyCredential(admin_key))\n","\n","# Using SearchIndexingBufferedSender to upload the documents in batches optimized for indexing\n","with SearchIndexingBufferedSender(\n"," endpoint=endpoint,\n"," index_name=search_index_name,\n"," credential=AzureKeyCredential(admin_key),\n",") as batch_client:\n"," # Add upload actions for all documents\n"," with open(\"/lakehouse/default/Files/embeddings/bookVectors.json\", 'r') as file:\n"," documents = json.load(file)\n"," batch_client.upload_documents(documents=documents)\n","\n","print(f\"Uploaded {len(documents)} documents in total\")\n"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5","known_lakehouses":[{"id":"83b65b13-7f82-4177-838c-f19a8134860b"}]}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5} diff --git a/Notebooks/Books-Pipeline/5-booksVectorEmbeddings.ipynb b/Notebooks/Books-Pipeline/5-booksVectorEmbeddings.ipynb new file mode 100644 index 0000000..656e1ad --- /dev/null +++ b/Notebooks/Books-Pipeline/5-booksVectorEmbeddings.ipynb @@ -0,0 +1,367 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ed133835-ce51-4dc0-8e2c-0a1369ff041d", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "**The Notebook updates the Index**\n", + "- **Connect to Azure Ai Search Index**\n", + "- **Configure Vector Search & Update**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d3cea20-d017-4b20-ab28-4cc87a5d08a6", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# %pip install azure-search-documents azure-search azure-core openai==0.28" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "adefbefa-df7c-440f-a24d-5223468c3da8", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import openai\n", + "from azure.search.documents.indexes import SearchIndexClient\n", + "from azure.search.documents.indexes.models import (\n", + " SimpleField, SearchFieldDataType, SearchField,\n", + " VectorSearch, HnswAlgorithmConfiguration, VectorSearchProfile,\n", + " SemanticConfiguration, SemanticPrioritizedFields, SemanticField, SemanticSearch\n", + ")\n", + "from azure.core.credentials import AzureKeyCredential\n", + "\n", + "# Configuration\n", + "openai.api_type = \"azure\"\n", + "openai.api_base = \"https://xxxxx.openai.azure.com/\"\n", + "openai.api_version = \"2023-11-01\"\n", + "openai.api_key = \"xxxxxx\"\n", + "\n", + "search_service_name = \"xxxxxx\"\n", + 
"search_index_name = \"books-index\"\n", + "admin_key = \"xxxxxxx\"\n", + "endpoint = f\"https://{search_service_name}.search.windows.net\"\n", + "\n", + "# Create a search index client\n", + "index_client = SearchIndexClient(endpoint=endpoint, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Retrieve the existing index\n", + "existing_index = index_client.get_index(search_index_name)\n", + "\n", + "# Define new fields if necessary (e.g., contentVector, searchContent)\n", + "new_fields = [\n", + " SearchField(name=\"Embedding\", type=SearchFieldDataType.Collection(SearchFieldDataType.Single),\n", + " searchable=True, vector_search_dimensions=1536, vector_search_profile_name=\"myHnswProfile\"),\n", + " SearchField(name=\"searchContent\", type=SearchFieldDataType.String, searchable=True),\n", + " SearchField(name=\"DescriptionEmbedding\", type=SearchFieldDataType.Collection(SearchFieldDataType.Single),\n", + " searchable=True, vector_search_dimensions=1536, vector_search_profile_name=\"myHnswProfile\")\n", + "]\n", + "\n", + "# Add new fields to the existing fields\n", + "fields = existing_index.fields\n", + "for new_field in new_fields:\n", + " if new_field.name not in [field.name for field in fields]:\n", + " fields.append(new_field)\n", + "\n", + "# Configure the vector search\n", + "vector_search = VectorSearch(\n", + " algorithms=[\n", + " HnswAlgorithmConfiguration(\n", + " name=\"myHnsw\",\n", + " parameters={\n", + " \"m\": 8,\n", + " \"efConstruction\": 800,\n", + " \"efSearch\": 800,\n", + " \"metric\": \"cosine\"\n", + " }\n", + " )\n", + " ],\n", + " profiles=[\n", + " VectorSearchProfile(\n", + " name=\"myHnswProfile\",\n", + " algorithm_configuration_name=\"myHnsw\",\n", + " )\n", + " ]\n", + ")\n", + "\n", + "# Define semantic configuration\n", + "semantic_config = SemanticConfiguration(\n", + " name=\"my-semantic-config\",\n", + " prioritized_fields=SemanticPrioritizedFields(\n", + " title_field=SemanticField(field_name=\"Title\"),\n", + " keywords_fields=[SemanticField(field_name=\"Genres\")],\n", + " content_fields=[SemanticField(field_name=\"searchContent\"), SemanticField(field_name=\"Description\")]\n", + " )\n", + ")\n", + "\n", + "# Create the semantic settings with the configuration\n", + "semantic_search = SemanticSearch(configurations=[semantic_config])\n", + "\n", + "# Update the search index with the new fields and configurations\n", + "existing_index.fields = fields\n", + "existing_index.vector_search = vector_search\n", + "existing_index.semantic_search = semantic_search\n", + "\n", + "result = index_client.create_or_update_index(existing_index)\n", + "print(f'Index {result.name} updated successfully')\n" + ] + }, + { + "cell_type": "markdown", + "id": "cdece52e-0c27-48c0-ba5e-e3374af8fbe3", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "- **Create Embeddings (rate limit patience)**\n", + "- **Get Embeddings as JSON**\n", + "- **Upload to Azure AI Search**\n", + "- **Batch - Parallel**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9a7cf705-308d-416d-acad-4d7d8972424c", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import openai\n", + "import json\n", + "import time\n", + 
"from azure.search.documents import SearchClient\n", + "from azure.core.credentials import AzureKeyCredential\n", + "from azure.search.documents import SearchIndexingBufferedSender\n", + "import concurrent.futures\n", + "import os\n", + "\n", + "# Configuration\n", + "openai.api_type = \"azure\"\n", + "openai.api_base = \"https://xxxxx.openai.azure.com/\"\n", + "openai.api_version = \"2024-05-01-preview\"\n", + "openai.api_key = \"xxxxxxx\"\n", + "deployment_id = \"text-embedding-ada-002\"\n", + "\n", + "search_service_name = \"xxxxxx\"\n", + "search_index_name = \"books-index\"\n", + "admin_key = \"xxxxxxxx\"\n", + "endpoint = f\"https://{search_service_name}.search.windows.net\"\n", + "\n", + "# Initialize the search client\n", + "search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Fetch all documents from the search index\n", + "results = search_client.search(search_text=\"*\", include_total_count=True)\n", + "documents = [doc for doc in results]\n", + "\n", + "# Function to generate embeddings for a batch of texts\n", + "def generate_embeddings_batch(texts, max_retries=7, backoff_factor=2):\n", + " embeddings = []\n", + " for text in texts:\n", + " for attempt in range(max_retries):\n", + " try:\n", + " response = openai.Embedding.create(input=[text], deployment_id=deployment_id)\n", + " embeddings.append(response['data'][0]['embedding'])\n", + " break\n", + " except openai.error.RateLimitError as e:\n", + " if attempt < max_retries - 1:\n", + " wait_time = backoff_factor * (2 ** attempt)\n", + " print(f\"Rate limit exceeded. Retrying in {wait_time} seconds...\")\n", + " time.sleep(wait_time)\n", + " else:\n", + " print(\"Max retries exceeded. Please try again later.\")\n", + " raise e\n", + " except Exception as e:\n", + " print(f\"Unexpected error: {e}\")\n", + " raise e\n", + " time.sleep(1) # Add a delay between individual requests to reduce aggressiveness\n", + " return embeddings\n", + "\n", + "def process_documents(documents, batch_size=5, max_workers=8):\n", + " with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:\n", + " futures = {}\n", + " for i in range(0, len(documents), batch_size):\n", + " batch = documents[i:i + batch_size]\n", + " \n", + " # Create searchContent text\n", + " search_texts = [f\"{doc.get('Title', '')} {doc.get('Author', '')} {doc.get('Genres', '')} Rating: {doc.get('Rating', '')}\" for doc in batch]\n", + " description_texts = [doc.get('Description', '') if 'Description' in doc else '' for doc in batch]\n", + " \n", + " # Generate embeddings for both searchContent and Description in parallel\n", + " future_search = executor.submit(generate_embeddings_batch, search_texts)\n", + " future_description = executor.submit(generate_embeddings_batch, description_texts)\n", + " \n", + " futures[future_search] = ('Embedding', batch, search_texts, i)\n", + " futures[future_description] = ('DescriptionEmbedding', batch, description_texts, i)\n", + "\n", + " # Process the completed futures\n", + " for future in concurrent.futures.as_completed(futures):\n", + " try:\n", + " field_name, batch, texts, start_index = futures[future]\n", + " embeddings = future.result()\n", + " print(f\"Processing batch starting at index {start_index} for {field_name}\")\n", + "\n", + " for j, embedding in enumerate(embeddings):\n", + " # Store the embeddings in the respective fields\n", + " documents[start_index + j][field_name] = embedding\n", + " if field_name == 
'Embedding':\n", + " documents[start_index + j]['searchContent'] = texts[j]\n", + " elif field_name == 'DescriptionEmbedding':\n", + " documents[start_index + j]['Description'] = texts[j] # Optional: store the text used for embedding\n", + "\n", + " except Exception as e:\n", + " print(f\"Error processing batch: {e}\") \n", + " \n", + "\n", + "# Process documents to generate embeddings\n", + "process_documents(documents)\n", + "\n", + "# Ensure the output directory exists\n", + "output_dir = \"/lakehouse/default/Files/embeddings\"\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "# Save the documents with embeddings to a JSON file in the lakehouse\n", + "output_file = os.path.join(output_dir, \"bookVectors.json\")\n", + "with open(output_file, 'w') as file:\n", + " json.dump(documents, file, indent=2)\n", + "\n", + "print(f\"Documents with embeddings saved to {output_file}\")\n", + "\n", + "# Upload the documents with embeddings to the index\n", + "# search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Using SearchIndexingBufferedSender to upload the documents in batches optimized for indexing\n", + "with SearchIndexingBufferedSender(\n", + " endpoint=endpoint,\n", + " index_name=search_index_name,\n", + " credential=AzureKeyCredential(admin_key),\n", + ") as batch_client:\n", + " # Add upload actions for all documents\n", + " with open(\"/lakehouse/default/Files/embeddings/bookVectors.json\", 'r') as file:\n", + " documents = json.load(file)\n", + " batch_client.upload_documents(documents=documents)\n", + "\n", + "print(f\"Uploaded {len(documents)} documents in total\")\n" + ] + } + ], + "metadata": { + "dependencies": { + "environment": { + "environmentId": "d5a85687-f4c8-4dae-86ec-ba90dc32a717", + "workspaceId": "9750728a-936e-41b9-a6cd-1247d645f4c5" + }, + "lakehouse": { + "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b", + "default_lakehouse_name": "Datasets", + "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5", + "known_lakehouses": [ + { + "id": "83b65b13-7f82-4177-838c-f19a8134860b" + } + ] + } + }, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Notebooks/Users-Pipeline/new-users-index.ipynb b/Notebooks/Users-Pipeline/new-users-index.ipynb new file mode 100644 index 0000000..17ab037 --- /dev/null +++ b/Notebooks/Users-Pipeline/new-users-index.ipynb @@ -0,0 +1,320 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "fde1d68a-b954-4562-a6a2-ea8a54f2294a", + "metadata": { + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### New Users Update notebook\n", + "- This Notebook updates the Users Index when a new user registers and updates the Users Table in Azure SQL\n", + "- The Notebook runs upon 
a CDC Trigger that initiates the Data Pipeline which fetches the Users Table\n", + "- The Notebook compares the current Users Index and updates only the new ones as well as creates and saves the embeddings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "602762e5-9478-4736-a62f-0c2c6201e218", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import json\n", + "\n", + "# Define input and output file paths\n", + "input_file = '/lakehouse/default/Files/userdata.json'\n", + "output_file = '/lakehouse/default/Files/udata.json'\n", + "\n", + "# Read the JSON file\n", + "with open(input_file, 'r') as file:\n", + " user_data = json.load(file)\n", + "\n", + "# Transform the Genres field from a JSON string to an actual list\n", + "for user in user_data:\n", + " try:\n", + " user['Genres'] = json.loads(user['Genres'])\n", + " except json.JSONDecodeError as e:\n", + " print(f\"Invalid JSON format for Genres in user ID {user.get('id', 'unknown')}: {user['Genres']}. Error: {e}\")\n", + " user['Genres'] = [] # Set to an empty list or handle it as per your requirement\n", + "\n", + "# Write the transformed data to a new JSON file\n", + "with open(output_file, 'w') as file:\n", + " json.dump(user_data, file, indent=4)\n", + "\n", + "print(f\"Transformed data saved to {output_file}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91057c70-74ec-4c28-a3c5-aab917fa316d", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark" + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import openai\n", + "import json\n", + "import time\n", + "import os\n", + "from azure.search.documents import SearchClient\n", + "from azure.core.credentials import AzureKeyCredential\n", + "import concurrent.futures\n", + "from azure.search.documents import SearchIndexingBufferedSender\n", + "\n", + "# Configuration\n", + "openai.api_type = \"azure\"\n", + "openai.api_base = \"https://xxxxx.openai.azure.com/\"\n", + "openai.api_version = \"2024-02-01\"\n", + "openai.api_key = \"xxxxxxxx\"\n", + "deployment_id = \"text-embedding-ada-002\"\n", + "\n", + "search_service_name = \"xxxxx\"\n", + "search_index_name = \"xxxxx-index\"\n", + "admin_key = \"xxxxxxxx\"\n", + "endpoint = f\"https://{search_service_name}.search.windows.net\"\n", + "\n", + "# Initialize the search client\n", + "search_client = SearchClient(endpoint=endpoint, index_name=search_index_name, credential=AzureKeyCredential(admin_key))\n", + "\n", + "# Step 1: Fetch existing user IDs and their data from the search index\n", + "def get_existing_users():\n", + " existing_users = {}\n", + " results = search_client.search(\"*\", select=[\"UserId\", \"Genres\", \"Age\"], include_total_count=True)\n", + " for result in results:\n", + " existing_users[result[\"UserId\"]] = result\n", + " return existing_users\n", + "\n", + "existing_users = get_existing_users()\n", + "print(f\"Fetched {len(existing_users)} existing users from the index.\")\n", + "\n", + "# Step 2: Load user data from JSON file\n", + "input_file = '/lakehouse/default/Files/udata.json'\n", + "with open(input_file, 'r') as file:\n", + " user_data = json.load(file)\n", + "\n", + "# Step 3: 
Validate each document and separate valid and invalid documents\n", + "valid_documents = []\n", + "invalid_documents = []\n", + "\n", + "def validate_documents(user_data):\n", + " for doc in user_data:\n", + " valid = True\n", + "\n", + " if 'UserId' in doc:\n", + " try:\n", + " doc['UserId'] = str(doc['UserId'])\n", + " except ValueError:\n", + " print(f\"Invalid value for UserId in document ID {doc['UserId']}: {doc['UserId']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " \n", + " # Validate Genres is a list of strings\n", + " if 'Genres' in doc:\n", + " if isinstance(doc['Genres'], str):\n", + " try:\n", + " doc['Genres'] = json.loads(doc['Genres'])\n", + " except json.JSONDecodeError:\n", + " print(f\"Invalid JSON format for Genres in document ID {doc['UserId']}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " elif isinstance(doc['Genres'], list):\n", + " if not all(isinstance(genre, str) for genre in doc['Genres']):\n", + " print(f\"Unexpected format for Genres in document ID {doc['UserId']}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " else:\n", + " print(f\"Unexpected format for Genres in document ID {doc['UserId']}: {doc['Genres']}\")\n", + " invalid_documents.append(doc)\n", + " valid = False\n", + " \n", + " if valid:\n", + " valid_documents.append(doc)\n", + "\n", + "# Run validation\n", + "validate_documents(user_data)\n", + "\n", + "print(f\"Valid documents: {len(valid_documents)}\")\n", + "print(f\"Invalid documents: {len(invalid_documents)}\")\n", + "\n", + "# Step 4: Filter out users who already exist and have the same data\n", + "def filter_new_or_updated_users(existing_users, valid_documents):\n", + " new_or_updated_users = []\n", + " for doc in valid_documents:\n", + " user_id = doc.get('UserId')\n", + " if user_id in existing_users:\n", + " existing_doc = existing_users[user_id]\n", + " # Check if the existing user data is the same\n", + " if doc['Genres'] == existing_doc.get('Genres') and doc['Age'] == existing_doc.get('Age'):\n", + " continue # Skip users whose data hasn't changed\n", + " new_or_updated_users.append(doc)\n", + " return new_or_updated_users\n", + "\n", + "new_users = filter_new_or_updated_users(existing_users, valid_documents)\n", + "print(f\"Found {len(new_users)} new or updated users to process.\")\n", + "\n", + "# Step 5: Upload the new or updated users to the Azure Search index\n", + "def upload_documents_to_index(documents):\n", + " try:\n", + " if documents:\n", + " result = search_client.upload_documents(documents=documents)\n", + " print(f\"Uploaded {len(documents)} documents to the Azure Search index.\")\n", + " else:\n", + " print(\"No new or updated documents to upload.\")\n", + " except Exception as e:\n", + " print(f\"Error uploading documents: {e}\")\n", + "\n", + "upload_documents_to_index(new_users)\n", + "\n", + "\n", + "\n", + "# Step 4: Generate embeddings for new or updated users\n", + "def generate_embeddings_batch(texts, max_retries=7, backoff_factor=2):\n", + " embeddings = []\n", + " for text in texts:\n", + " for attempt in range(max_retries):\n", + " try:\n", + " response = openai.Embedding.create(input=text, engine=deployment_id)\n", + " embeddings.append(response['data'][0]['embedding'])\n", + " break\n", + " except openai.error.RateLimitError:\n", + " wait_time = backoff_factor * (2 ** attempt)\n", + " print(f\"Rate limit exceeded. 
Retrying in {wait_time} seconds...\")\n", + " time.sleep(wait_time)\n", + " except Exception as e:\n", + " print(f\"Error generating embedding: {e}\")\n", + " raise e\n", + " time.sleep(0.5)\n", + " return embeddings\n", + "\n", + "def process_documents(documents, batch_size=5, max_workers=8):\n", + " with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:\n", + " futures = {}\n", + " for i in range(0, len(documents), batch_size):\n", + " batch = documents[i:i + batch_size]\n", + " texts = [f\"{' '.join(doc['Genres'])} {doc['Age']}\" for doc in batch]\n", + " future = executor.submit(generate_embeddings_batch, texts)\n", + " futures[future] = (batch, texts, i)\n", + "\n", + " for future in concurrent.futures.as_completed(futures):\n", + " embeddings = future.result()\n", + " batch, texts, start_index = futures[future]\n", + " for j, embedding in enumerate(embeddings):\n", + " # Store embeddings in the document\n", + " batch[j]['Embedding'] = embedding\n", + " batch[j]['searchContent'] = texts[j]\n", + "\n", + "# Step 5: Process and generate embeddings for filtered users\n", + "if new_users:\n", + " process_documents(new_users)\n", + "\n", + "\n", + "# Ensure the output directory exists\n", + "output_dir = \"/lakehouse/default/Files/embeddings\"\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "output_file = os.path.join(output_dir, \"updated_users.json\")\n", + "with open(output_file, 'w') as file:\n", + " json.dump(new_users, file, indent=2)\n", + "\n", + "print(f\"Updated users with embeddings saved to {output_file}\")\n", + "\n", + "\n", + "# Upload the documents with embeddings to the index\n", + "try:\n", + " with SearchIndexingBufferedSender(\n", + " endpoint=endpoint,\n", + " index_name=search_index_name,\n", + " credential=AzureKeyCredential(admin_key),\n", + " ) as batch_client:\n", + " with open(output_file, 'r') as file:\n", + " documents = json.load(file)\n", + " batch_client.upload_documents(documents=documents)\n", + "\n", + " print(f\"Uploaded {len(documents)} documents in total\")\n", + "except Exception as e:\n", + " print(f\"Error uploading documents: {e}\")\n" + ] + } + ], + "metadata": { + "dependencies": { + "environment": { + "environmentId": "d5a85687-f4c8-4dae-86ec-ba90dc32a717", + "workspaceId": "9750728a-936e-41b9-a6cd-1247d645f4c5" + }, + "lakehouse": { + "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b", + "default_lakehouse_name": "Datasets", + "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5" + } + }, + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python", + "language_group": "synapse_pyspark", + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default", + "session_options": { + "conf": { + "spark.synapse.nbs.session.timeout": "1200000" + } + } + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +}
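
The new-users notebook above follows a single incremental-update pattern: read the current Users index, keep only users that are new or whose profile changed, embed their genre/age text with exponential backoff on rate limits, and push the result through the buffered sender. The sketch below condenses that flow into one self-contained function. It is illustrative only, not the notebook's exact code: the endpoint, key, index, and deployment placeholders are assumptions you would replace with your own values, and error handling is reduced to the minimum needed to show the pattern.

```python
# Simplified, hypothetical sketch of the incremental users-index update.
# All credentials and resource names below are placeholders, not real values.
import time
import openai
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient, SearchIndexingBufferedSender

openai.api_type = "azure"
openai.api_base = "https://<your-openai-resource>.openai.azure.com/"
openai.api_version = "2024-02-01"
openai.api_key = "<openai-key>"
DEPLOYMENT = "text-embedding-ada-002"

ENDPOINT = "https://<search-service>.search.windows.net"
INDEX = "<users-index>"
CREDENTIAL = AzureKeyCredential("<admin-key>")


def embed_with_backoff(text, max_retries=5, backoff_factor=2):
    """Call the embeddings deployment, doubling the wait after each 429."""
    for attempt in range(max_retries):
        try:
            response = openai.Embedding.create(input=text, engine=DEPLOYMENT)
            return response["data"][0]["embedding"]
        except openai.error.RateLimitError:
            time.sleep(backoff_factor * (2 ** attempt))
    raise RuntimeError("Rate limited on every attempt")


def upsert_changed_users(incoming_users):
    """Embed and upload only users that are new or whose profile changed."""
    search_client = SearchClient(ENDPOINT, INDEX, CREDENTIAL)

    # Snapshot of what the index already holds, keyed by UserId.
    existing = {
        r["UserId"]: r
        for r in search_client.search("*", select=["UserId", "Genres", "Age"])
    }

    changed = []
    for user in incoming_users:
        current = existing.get(user["UserId"])
        if current and current.get("Genres") == user["Genres"] and current.get("Age") == user["Age"]:
            continue  # unchanged user: skip the embedding call entirely
        text = f"{' '.join(user['Genres'])} {user['Age']}"
        user["Embedding"] = embed_with_backoff(text)
        user["searchContent"] = text
        changed.append(user)

    if changed:
        # The buffered sender batches and retries the upload internally.
        with SearchIndexingBufferedSender(ENDPOINT, INDEX, CREDENTIAL) as sender:
            sender.upload_documents(documents=changed)
    return changed
```

Skipping unchanged users before calling the embeddings endpoint is the main cost saver here: the rate-limited embedding call is only made for documents that will actually be re-uploaded, which is the same reasoning the notebook applies when it filters against the existing index before generating vectors.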