Getting the exception "Queries with streaming sources must be executed with writeStream.start()" when using SHC with Spark Structured Streaming
I have a Spark Structured Streaming application where I'd like to write streaming data to HBase using SHC.
Does anyone know a solution or workaround that would still allow using SHC to write structured streaming data to HBase?
Thanks in advance!

My streaming read from Kafka and the HBase table catalog:
`val rowsdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("group.id", group_id)
  .option("maxOffsetsPerTrigger", 1000)
  .option("startingOffsets", "earliest")
  .load()

rowsdf.printSchema()

val catalog = """
{
  "table": {
    "namespace": "default",
    "name": "changes",
    "tableCoder": "PrimitiveType"
  },
  "rowkey": "consumer_id",
  "columns": {
    "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
    "audit_system_id": {"cf": "d", "col": "audit_system_id", "type": "string"},
    "object_path": {"cf": "d", "col": "object_path", "type": "string"},
    "object_type": {"cf": "d", "col": "object_type", "type": "string"},
    "what_action": {"cf": "d", "col": "what_action", "type": "string"},
    "when": {"cf": "t", "col": "when", "type": "bigint"},
    "where": {"cf": "d", "col": "where", "type": "string"},
    "who": {"cf": "d", "col": "who", "type": "string"},
    "workstation": {"cf": "d", "col": "workstation", "type": "string"}
  }
}"""`
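Note that the Kafka source only exposes the raw key/value columns (plus topic, partition, offset, timestamp), so the rows have to be parsed into the fields named in the catalog before they can be written to HBase. A minimal sketch of that step, assuming the Kafka payload is JSON whose field names match the catalog above (the schema and the changesDf name below are assumptions, not part of my original code):

`import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Schema mirroring the catalog columns; assumes the Kafka value is JSON
// with exactly these field names.
val changeSchema = StructType(Seq(
  StructField("consumer_id", StringType),
  StructField("audit_system_id", StringType),
  StructField("object_path", StringType),
  StructField("object_type", StringType),
  StructField("what_action", StringType),
  StructField("when", LongType),
  StructField("where", StringType),
  StructField("who", StringType),
  StructField("workstation", StringType)
))

// Parse the binary Kafka value into the columns the HBase catalog expects.
val changesDf = rowsdf
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), changeSchema).as("data"))
  .select("data.*")`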
The HBaseSinkProvider implementation:
`package org.apache.spark.sql.execution.datasources.hbase

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // JSON string with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Re-wrap the micro-batch so the batch write below is not run against
    // the original streaming plan.
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}`
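The query is then started against this sink roughly as follows (a sketch of the missing piece, using the parsed changesDf from the sketch above; the fully qualified provider class is used because the "hbase" short name would also need a META-INF/services registration, and the checkpoint path is an assumption):

`val query = changesDf.writeStream
  .format("org.apache.spark.sql.execution.datasources.hbase.HBaseSinkProvider")
  .option("hbasecat", catalog)                                 // read by HBaseSink above
  .option("checkpointLocation", "/tmp/hbase-sink-checkpoint")  // assumed path
  .outputMode("append")
  .start()

query.awaitTermination()`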
When running the application I'm getting the following message:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:381)
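For reference, this exception comes from Spark's UnsupportedOperationChecker: a batch-style action (df.rdd, df.write.save(), df.count(), ...) was executed against a plan that still contains a streaming source. In a custom Sink this typically happens when data.rdd is evaluated inside addBatch, because in some Spark versions the micro-batch DataFrame handed to the sink is still backed by the original streaming plan. A commonly suggested alternative on Spark 2.4+ that avoids a custom Sink entirely is foreachBatch, which hands every micro-batch to user code as a plain batch DataFrame that the existing SHC batch writer can save. A sketch, reusing changesDf and catalog from above (the checkpoint path and function name are assumptions):

`import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Explicitly typed function value (avoids the Scala 2.12 foreachBatch
// overload ambiguity); writes one micro-batch via the SHC batch path.
val writeToHBase: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  batchDf.write
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalog,
      HBaseTableCatalog.newTable -> "5"))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()
}

changesDf.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/hbase-foreachbatch-checkpoint")  // assumed path
  .foreachBatch(writeToHBase)
  .start()
  .awaitTermination()`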