-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path8.ingest_qualifying_file.py
67 lines (45 loc) · 2.11 KB
/
8.ingest_qualifying_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Databricks notebook source
# MAGIC %md
# MAGIC ### Ingest qualifying json files
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 1 - Read the JSON files using the Spark DataFrame reader API
# COMMAND ----------
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# COMMAND ----------
# Explicit schema for the qualifying records, declared as
# (column name, Spark data type, nullable) triples and expanded into
# StructField objects in the same order as the original definition.
qualifying_schema = StructType(fields=[
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in (
        ("qualifyId", IntegerType(), False),
        ("raceId", IntegerType(), True),
        ("driverId", IntegerType(), True),
        ("constructorId", IntegerType(), True),
        ("number", IntegerType(), True),
        ("position", IntegerType(), True),
        ("q1", StringType(), True),
        ("q2", StringType(), True),
        ("q3", StringType(), True),
    )
])
# COMMAND ----------
# Read the raw qualifying data with the explicit schema instead of letting
# Spark infer one. The multiLine option is enabled so that a single JSON
# document is allowed to span several lines (see the Spark JSON reader docs).
qualifying_df = (
    spark.read
    .schema(qualifying_schema)
    .option("multiLine", True)
    .json("/mnt/formula1dl/raw/qualifying")
)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 2 - Rename columns and add new columns
# MAGIC 1. Rename qualifyId, driverId, constructorId and raceId to snake_case
# MAGIC 1. Add ingestion_date with current timestamp
# COMMAND ----------
from pyspark.sql.functions import current_timestamp
# COMMAND ----------
# Convert the camelCase identifier columns to snake_case, one rename at a
# time and in the same order as before, then stamp every row with the
# ingestion timestamp.
final_df = qualifying_df
for source_name, target_name in (
    ("qualifyId", "qualify_id"),
    ("driverId", "driver_id"),
    ("raceId", "race_id"),
    ("constructorId", "constructor_id"),
):
    final_df = final_df.withColumnRenamed(source_name, target_name)
final_df = final_df.withColumn("ingestion_date", current_timestamp())
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 3 - Write the output to the processed container in parquet format
# COMMAND ----------
# Persist the transformed data as parquet; "overwrite" mode replaces
# whatever already exists at the target path.
final_df.write.parquet(
    "/mnt/formula1dl/processed/qualifying",
    mode="overwrite",
)
# COMMAND ----------
# Read the parquet output back from storage and render it in the notebook
# as a visual check of what was written.
processed_qualifying_df = spark.read.parquet('/mnt/formula1dl/processed/qualifying')
display(processed_qualifying_df)
# COMMAND ----------