from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, explode, split
# from delta import *  # only needed for the commented-out Delta Lake variant below
# Build a Spark session with the Iceberg runtime and SQL extensions.
# The "local" catalog is a Hadoop catalog whose tables live under ./warehouse.
spark = SparkSession \
    .builder \
    .appName("word_counter") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.1') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') \
    .config('spark.sql.catalog.spark_catalog.type', 'hive') \
    .config('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.local.type', 'hadoop') \
    .config('spark.sql.catalog.local.warehouse', 'warehouse') \
    .getOrCreate()
# Alternative: the same word counter backed by Delta Lake instead of Iceberg.
# spark = SparkSession \
#     .builder \
#     .appName("word_counter") \
#     .config('spark.jars.packages', 'io.delta:delta-core_2.12:2.1.0') \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .getOrCreate()
# Recreate the target table on each run; IF EXISTS keeps the first run from failing.
spark.sql('DROP TABLE IF EXISTS local.db.table')
spark.sql('CREATE TABLE local.db.table (unique_word string, count bigint) USING iceberg')
inputPath = "./text.txt"
schema1 = StructType([StructField('word', StringType(), True)])
print('step1')
# Read each line as a single string column; "\t" (not "/t") is the tab delimiter.
inputDF = spark.read.option("delimiter", "\t").schema(schema1).csv(inputPath)
inputDF.show()
print('step2')
# Split each line into an array of words.
inputDF = inputDF.select(split(col("word"), " ").alias("word"))
inputDF.show()
print('step3')
# Explode the word arrays so each word gets its own row; aliasing the exploded
# column directly replaces the explode/withColumnRenamed("col", ...) pair.
inputDF = inputDF.select(explode(inputDF.word).alias("unique_word"))
inputDF.show()
print('step4')
# Drop the empty strings produced by consecutive spaces.
inputDF = inputDF.filter(inputDF["unique_word"] != "")
inputDF.show()
print('step5')
# Count occurrences of each distinct word.
inputDF = inputDF.groupBy("unique_word").count()
inputDF.show()
print('writing to iceberg')
# The table already exists (created above), so append rows rather than create(),
# which would fail against an existing table.
# inputDF.writeTo("local.db.table").create()
inputDF.writeTo("local.db.table").append()
# Read the counts back from the Iceberg table to verify the write.
DF_from_iceberg = spark.table("local.db.table")
DF_from_iceberg.show()
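# Optional sketch: the append can also be confirmed through Iceberg's metadata
# tables. This assumes the standard "snapshots" metadata table that the Iceberg
# Spark runtime exposes alongside each table; each successful write should show
# up here as a new snapshot with operation "append".
spark.sql("SELECT snapshot_id, operation, committed_at FROM local.db.table.snapshots").show(truncate=False)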