-
Notifications
You must be signed in to change notification settings - Fork 0
/
7.ingest_lap_times_file.py
57 lines (37 loc) · 1.61 KB
/
7.ingest_lap_times_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# Databricks notebook source
# MAGIC %md
# MAGIC ### Ingest lap_times folder
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 1 - Read the CSV file using the spark dataframe reader API
# COMMAND ----------
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# COMMAND ----------
lap_times_schema = StructType(fields=[StructField("raceId", IntegerType(), False),
StructField("driverId", IntegerType(), True),
StructField("lap", IntegerType(), True),
StructField("position", IntegerType(), True),
StructField("time", StringType(), True),
StructField("milliseconds", IntegerType(), True)
])
# COMMAND ----------
lap_times_df = spark.read \
.schema(lap_times_schema) \
.csv("/mnt/formula1dl/raw/lap_times")
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 2 - Rename columns and add new columns
# MAGIC 1. Rename driverId and raceId
# MAGIC 1. Add ingestion_date with current timestamp
# COMMAND ----------
from pyspark.sql.functions import current_timestamp
# COMMAND ----------
final_df = lap_times_df.withColumnRenamed("driverId", "driver_id") \
.withColumnRenamed("raceId", "race_id") \
.withColumn("ingestion_date", current_timestamp())
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Step 3 - Write to output to processed container in parquet format
# COMMAND ----------
final_df.write.mode("overwrite").parquet("/mnt/formula1dl/processed/lap_times")
# COMMAND ----------