Merge pull request #26 from passadis/dev

Dev

Showing 11 changed files with 1,185 additions and 5 deletions.
@@ -0,0 +1 @@
{"cells":[{"cell_type":"markdown","source":["- **CLEAN BOOKS DATASET CSV**"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"2b2668cd-20f5-48d7-88a3-47303d54203f"},{"cell_type":"code","source":["from pyspark.sql import SparkSession\n","from pyspark.sql.functions import udf, col, size, split\n","from pyspark.sql.types import StringType, BooleanType, StructType, StructField, FloatType\n","import re\n","\n","# Initialize Spark Session\n","spark = SparkSession.builder.appName(\"CleanCSVData\").getOrCreate()\n","\n","# Define schema to ensure columns are interpreted correctly\n","schema = StructType([\n"," StructField(\"id\", StringType(), True),\n"," StructField(\"Book\", StringType(), True),\n"," StructField(\"Author\", StringType(), True),\n"," StructField(\"Description\", StringType(), True),\n"," StructField(\"Genres\", StringType(), True),\n"," StructField(\"Rating\", FloatType(), True)\n","])\n","\n","# Read the CSV file into a Spark DataFrame with proper handling of special characters\n","df = spark.read.format(\"csv\") \\\n"," .option(\"header\", \"true\") \\\n"," .option(\"quote\", '\"') \\\n"," .option(\"escape\", '\\\\') \\\n"," .option(\"delimiter\", \",\") \\\n"," .option(\"multiLine\", \"true\") \\\n"," .schema(schema) \\\n"," .load(\"abfss://[email protected]/Datasets.Lakehouse/Files/books.csv\")\n","\n","# Display the original DataFrame\n","display(df)\n","\n","# Define a function to check if the row has the expected number of columns\n","def is_valid_row(*columns):\n"," # Check if all columns have non-null values\n"," if len(columns) != 6:\n"," return False\n"," return all(columns)\n","\n","is_valid_row_udf = udf(is_valid_row, BooleanType())\n","\n","# Apply the UDF to filter rows with missing or misaligned columns\n","df_clean = df.filter(is_valid_row_udf(\n"," col(\"id\"),\n"," col(\"Book\"),\n"," col(\"Author\"),\n"," col(\"Description\"),\n"," col(\"Genres\"),\n"," col(\"Rating\")\n","))\n","\n","# Additional filtering to catch multi-line anomalies in the 'Description' field\n","df_clean = df_clean.filter(~col(\"Description\").contains(\"\\n\"))\n","\n","# Display the cleaned DataFrame\n","display(df_clean)\n","\n","# Coalesce the DataFrame to a single partition for saving\n","df_clean_coalesced = df_clean.coalesce(1)\n","\n","# Save the cleaned DataFrame to a new CSV file in Azure Data Lake Storage\n","cleaned_file_path = \"abfss://[email protected]/Datasets.Lakehouse/Files/booksclean\"\n","df_clean_coalesced.write.format(\"csv\") \\\n"," .option(\"header\", \"true\") \\\n"," .mode(\"overwrite\") \\\n"," .save(cleaned_file_path)\n","\n","print(f\"Cleaned data saved to: {cleaned_file_path}\")\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"549e66cd-6ac5-4ebf-8187-e6e768211dbc"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"[email 
protected]"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"default_lakehouse":"83b65b13-7f82-4177-838c-f19a8134860b","known_lakehouses":[{"id":"83b65b13-7f82-4177-838c-f19a8134860b"}],"default_lakehouse_name":"Datasets","default_lakehouse_workspace_id":"9750728a-936e-41b9-a6cd-1247d645f4c5"},"environment":{"environmentId":"d5a85687-f4c8-4dae-86ec-ba90dc32a717","workspaceId":"9750728a-936e-41b9-a6cd-1247d645f4c5"}}},"nbformat":4,"nbformat_minor":5} |
@@ -0,0 +1,140 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "c65c4afc-dde1-4e03-94f4-521b780dbbe7",
      "metadata": {
        "microsoft": {
          "language": "python",
          "language_group": "synapse_pyspark"
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "- **Convert Books Dataset from CSV to JSON**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "143fec06-0102-45e8-ba65-21cb8e1a11d8",
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "microsoft": {
          "language": "python",
          "language_group": "synapse_pyspark"
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "from pyspark.sql import SparkSession\n",
        "from pyspark.sql.functions import col, udf\n",
        "from pyspark.sql.types import ArrayType, StringType\n",
        "import ast\n",
        "\n",
        "# Initialize Spark session\n",
        "spark = SparkSession.builder.appName(\"TransformGenres\").getOrCreate()\n",
        "\n",
        "# Path to your CSV file\n",
        "input_file = \"abfss://[email protected]/Datasets.Lakehouse/Files/bookers.csv\"\n",
        "\n",
        "# Load data into Spark DataFrame\n",
        "df = spark.read.format(\"csv\").option(\"header\", \"true\").load(input_file)\n",
        "\n",
        "# Function to convert genres string to a list of strings with error handling\n",
        "def convert_genres_to_list(genres_str):\n",
        "    try:\n",
        "        return ast.literal_eval(genres_str)\n",
        "    except (ValueError, SyntaxError):\n",
        "        return []\n",
        "\n",
        "# Register the UDF\n",
        "convert_genres_udf = udf(convert_genres_to_list, ArrayType(StringType()))\n",
        "\n",
        "# Apply the conversion UDF to the Genres column\n",
        "df = df.withColumn(\"Genres\", convert_genres_udf(col(\"Genres\")))\n",
        "\n",
        "# Rename columns to match the index headers\n",
        "df = df.withColumnRenamed(\"_c0\", \"id\") \\\n",
        "    .withColumnRenamed(\"Book\", \"Title\") \\\n",
        "    .withColumnRenamed(\"Avg_Rating\", \"Rating\")\n",
        "\n",
        "# Ensure the id field is a string\n",
        "df = df.withColumn(\"id\", df[\"id\"].cast(StringType()))\n",
        "\n",
        "# Show the transformed DataFrame to verify\n",
        "df.show(truncate=False)\n",
        "\n",
        "# Save the transformed DataFrame to a JSON file\n",
        "output_file = \"abfss://[email protected]/Datasets.Lakehouse/Files/bookers.json\"  # Adjust the path as needed\n",
        "df.write.json(output_file, mode='overwrite')\n",
        "\n",
        "# Verify the JSON structure by reading it back\n",
        "transformed_df = spark.read.json(output_file)\n",
        "transformed_df.show(truncate=False)\n"
      ]
    }
  ],
  "metadata": {
    "dependencies": {
      "lakehouse": {
        "default_lakehouse": "83b65b13-7f82-4177-838c-f19a8134860b",
        "default_lakehouse_name": "Datasets",
        "default_lakehouse_workspace_id": "9750728a-936e-41b9-a6cd-1247d645f4c5",
        "known_lakehouses": [
          {
            "id": "83b65b13-7f82-4177-838c-f19a8134860b"
          }
        ]
      }
    },
    "kernel_info": {
      "name": "synapse_pyspark"
    },
    "kernelspec": {
      "display_name": "Synapse PySpark",
      "language": "Python",
      "name": "synapse_pyspark"
    },
    "language_info": {
      "name": "python"
    },
    "microsoft": {
      "language": "python",
      "language_group": "synapse_pyspark",
      "ms_spell_check": {
        "ms_spell_check_language": "en"
      }
    },
    "nteract": {
      "version": "[email protected]"
    },
    "spark_compute": {
      "compute_id": "/trident/default",
      "session_options": {
        "conf": {
          "spark.synapse.nbs.session.timeout": "1200000"
        }
      }
    },
    "synapse_widget": {
      "state": {},
      "version": "0.1"
    },
    "widgets": {}
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
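The genre transformation above hinges on `ast.literal_eval`, which safely parses the stringified Python lists stored in the `Genres` column and raises on anything else; a standalone sketch of that fallback behavior (the sample strings below are made up for illustration):

```python
import ast

def convert_genres_to_list(genres_str):
    """Parse a stringified Python list; fall back to [] on malformed input."""
    try:
        return ast.literal_eval(genres_str)
    except (ValueError, SyntaxError):
        return []

# Well-formed value, as the Genres column stores it in the CSV
print(convert_genres_to_list("['Fantasy', 'Young Adult', 'Fiction']"))
# -> ['Fantasy', 'Young Adult', 'Fiction']

# Truncated or malformed input degrades to an empty list instead of raising
print(convert_genres_to_list("['Fantasy', 'Young"))  # -> []

# Spark passes null cells to Python UDFs as None; literal_eval raises
# ValueError on non-string input, so these also become []
print(convert_genres_to_list(None))  # -> []
```

Returning `[]` rather than `None` keeps the column consistently typed as `ArrayType(StringType())` in the written JSON.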