Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Field not found in source schema #11843

Open
rohitanil opened this issue Dec 21, 2024 · 2 comments
Open

Field not found in source schema #11843

rohitanil opened this issue Dec 21, 2024 · 2 comments
Labels
question Further information is requested

Comments

@rohitanil
Copy link

rohitanil commented Dec 21, 2024

Query engine

Spark

Question

I am using a REST catalog, minio and spark iceberg on docker to read from Kafka topic, make some transformations and write it out as iceberg tables. I am able to print the transformed data to the console but throws an error when I write to Iceberg. I can see metadata in minio

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, min, max, avg, window
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Kafka-Iceberg-Stream-Processor").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType([
        StructField("BTC_EUR", DoubleType(), True),
        StructField("BTC_INR", DoubleType(), True),
        StructField("BTC_USD", DoubleType(), True),
        StructField("ETH_EUR", DoubleType(), True),
        StructField("ETH_INR", DoubleType(), True),
        StructField("ETH_USD", DoubleType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.1.16:9092") \
        .option("subscribe", "crypto") \
        .option("startingOffsets", "earliest") \
        .load()

    parsed_df = kafka_df.selectExpr(
        "CAST(value AS STRING) AS value",
        "timestamp"
    ).withColumn("parsed_json", from_json(col("value"), schema)) \
        .select(
        col("parsed_json.BTC_EUR").alias("BTC_EUR"),
        col("parsed_json.BTC_INR").alias("BTC_INR"),
        col("parsed_json.BTC_USD").alias("BTC_USD"),
        col("parsed_json.ETH_EUR").alias("ETH_EUR"),
        col("parsed_json.ETH_INR").alias("ETH_INR"),
        col("parsed_json.ETH_USD").alias("ETH_USD"),
        col("parsed_json.timestamp").alias("event_time"),
        col("timestamp").alias("processing_time")
    )

    windowed_df = parsed_df \
        .withWatermark("event_time", "1 minute") \
        .groupBy(window(col("event_time"), "5 minutes", "5 minutes")) \
        .agg(
        max("BTC_EUR").alias("max_BTC_EUR"),
        min("BTC_EUR").alias("min_BTC_EUR"),
        avg("BTC_EUR").alias("avg_BTC_EUR"),
        max("BTC_INR").alias("max_BTC_INR"),
        min("BTC_INR").alias("min_BTC_INR"),
        avg("BTC_INR").alias("avg_BTC_INR"),
        max("BTC_USD").alias("max_BTC_USD"),
        min("BTC_USD").alias("min_BTC_USD"),
        avg("BTC_USD").alias("avg_BTC_USD"),
        max("ETH_EUR").alias("max_ETH_EUR"),
        min("ETH_EUR").alias("min_ETH_EUR"),
        avg("ETH_EUR").alias("avg_ETH_EUR"),
        max("ETH_INR").alias("max_ETH_INR"),
        min("ETH_INR").alias("min_ETH_INR"),
        avg("ETH_INR").alias("avg_ETH_INR"),
        max("ETH_USD").alias("max_ETH_USD"),
        min("ETH_USD").alias("min_ETH_USD"),
        avg("ETH_USD").alias("avg_ETH_USD")
    ).withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window") \
        .select("window_start",
                "window_end",
                "max_BTC_EUR",
                "min_BTC_EUR",
                "avg_BTC_EUR",
                "max_BTC_INR",
                "min_BTC_INR",
                "avg_BTC_INR",
                "max_BTC_USD",
                "min_BTC_USD",
                "avg_BTC_USD",
                "max_ETH_EUR",
                "min_ETH_EUR",
                "avg_ETH_EUR",
                "max_ETH_INR",
                "min_ETH_INR",
                "avg_ETH_INR",
                "max_ETH_USD",
                "min_ETH_USD",
                "avg_ETH_USD")

    # Create the table if it doesn't exist
    spark.sql("""
        CREATE TABLE IF NOT EXISTS db.crypto_metrics4 (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            max_BTC_EUR DOUBLE,
            min_BTC_EUR DOUBLE,
            avg_BTC_EUR DOUBLE,
            max_BTC_INR DOUBLE,
            min_BTC_INR DOUBLE,
            avg_BTC_INR DOUBLE,
            max_BTC_USD DOUBLE,
            min_BTC_USD DOUBLE,
            avg_BTC_USD DOUBLE,
            max_ETH_EUR DOUBLE,
            min_ETH_EUR DOUBLE,
            avg_ETH_EUR DOUBLE,
            max_ETH_INR DOUBLE,
            min_ETH_INR DOUBLE,
            avg_ETH_INR DOUBLE,
            max_ETH_USD DOUBLE,
            min_ETH_USD DOUBLE,
            avg_ETH_USD DOUBLE
        ) USING iceberg
    """)

    print(spark.sql("SHOW TABLES IN rest.db").show())
    print(windowed_df.printSchema())
    # Write stream data to Iceberg
    query = windowed_df.writeStream \
        .outputMode("complete") \
        .format("iceberg") \
        .option("path", "s3://warehouse/crypto_metrics4") \
        .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics4") \
        .trigger(processingTime="1 minute") \
        .start()

    query.awaitTermination()

And I am submitting the spark job along with configurations as follows

spark-submit \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.rest.type=rest \
    --conf spark.sql.catalog.rest.uri=http://rest:8181 \
    --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.rest.warehouse=s3://warehouse \
    --conf spark.sql.defaultCatalog=rest \
    test.py

Error

py4j.protocol.Py4JJavaError: An error occurred while calling o181.start.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://warehouse/crypto_metrics4/metadata/version-hint.text
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
        at org.apache.iceberg.hadoop.HadoopTableOperations.getFileSystem(HadoopTableOperations.java:412)
        at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:315)
        at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
        at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
        at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
        at org.apache.iceberg.spark.SparkCatalog.loadFromPathIdentifier(SparkCatalog.java:972)
        at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:839)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:170)
        at org.apache.iceberg.spark.source.IcebergSource.getTable(IcebergSource.java:107)
        at org.apache.iceberg.spark.source.IcebergSource.inferPartitioning(IcebergSource.java:90)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:82)
        at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:399)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
        ... 25 more

What am I possibly doing wrong?

@rohitanil rohitanil added the question Further information is requested label Dec 21, 2024
@stym06
Copy link

stym06 commented Dec 21, 2024

This is possibly due to hadoop-aws module not available in classpath that contains hdfs Implementation classes.

you can import https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.4.1

@MonkeyCanCode
Copy link

@rohitanil as commented by @stym06 , you are missing couple things and one of them is aws jar. If u want to load hadoop-aws jars, you will need to load couple additional dependencies jars as well. Alternative, you can use iceberg-aws-bundle jar instead. This is documented in https://iceberg.apache.org/docs/1.7.1/aws/#spark

Other than this, you will need couple additional properties so Spark will know how to talk to S3, such as following:

spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
# If u are planning to pass in key and secret
spark.hadoop.fs.s3a.access.key=ACCESSKEY
spark.hadoop.fs.s3a.secret.key=SECRETKEY
## if u are planning to use assume role, then set the next one and remove key/secret above
## spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

3 participants