From b7c1d76ded1d6cafac54a427fa30ff9c003e1378 Mon Sep 17 00:00:00 2001
From: Alexey Kuzin
Date: Wed, 1 Mar 2023 18:42:36 -0500
Subject: [PATCH 1/3] Add Serializable to config classes and special
 OptionalDataException handling

---
 .../spark/connector/config/ReadConfig.scala  |  2 +-
 .../spark/connector/config/WriteConfig.scala |  2 +-
 .../connector/rdd/TarantoolWriteRDD.scala    | 89 +++++++++++--------
 3 files changed, 52 insertions(+), 41 deletions(-)

diff --git a/src/main/scala/io/tarantool/spark/connector/config/ReadConfig.scala b/src/main/scala/io/tarantool/spark/connector/config/ReadConfig.scala
index dc28a1a..da4bc61 100644
--- a/src/main/scala/io/tarantool/spark/connector/config/ReadConfig.scala
+++ b/src/main/scala/io/tarantool/spark/connector/config/ReadConfig.scala
@@ -10,7 +10,7 @@ case class ReadConfig(
   batchSize: Int = ReadConfig.DEFAULT_BATCH_SIZE,
   partitioner: TarantoolPartitioner = new TarantoolSinglePartitioner(),
   conditions: Conditions = Conditions.any()
-) {
+) extends Serializable {
 
   def withConditions(conditions: Conditions): ReadConfig =
     copy(conditions = conditions)
diff --git a/src/main/scala/io/tarantool/spark/connector/config/WriteConfig.scala b/src/main/scala/io/tarantool/spark/connector/config/WriteConfig.scala
index 86d1b0e..ca5153f 100644
--- a/src/main/scala/io/tarantool/spark/connector/config/WriteConfig.scala
+++ b/src/main/scala/io/tarantool/spark/connector/config/WriteConfig.scala
@@ -10,7 +10,7 @@ case class WriteConfig(
   stopOnError: Boolean = true,
   rollbackOnError: Boolean = true,
   transformFieldNames: FieldNameTransformation = FieldNameTransformations.NONE
-) {}
+) extends Serializable
 
 object WriteConfig extends TarantoolConfigBase {
 
diff --git a/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala b/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
index b597ac5..09ee9bf 100644
--- a/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
+++ b/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
@@ -15,7 +15,7 @@ import org.apache.spark.sql.tarantool.MapFunctions.rowToTuple
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.SparkContext
 
-import java.io.{PrintWriter, StringWriter}
+import java.io.{OptionalDataException, PrintWriter, StringWriter}
 import java.util.Collections
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicLong
@@ -74,50 +74,61 @@ class TarantoolWriteRDD[R] private[spark] (
     data: DataFrame,
     overwrite: Boolean
   ): Unit =
-    data.foreachPartition((partition: Iterator[Row]) =>
-      if (partition.nonEmpty) {
-        val client = connection.client(globalConfig)
-        val spaceMetadata = client.metadata().getSpaceByName(space).get()
-        val tupleFactory = new DefaultTarantoolTupleFactory(messagePackMapper, spaceMetadata)
+    try {
+      data.foreachPartition((partition: Iterator[Row]) =>
+        if (partition.nonEmpty) {
+          val client = connection.client(globalConfig)
+          val spaceMetadata = client.metadata().getSpaceByName(space).get()
+          val tupleFactory = new DefaultTarantoolTupleFactory(messagePackMapper, spaceMetadata)
 
-        val options: Either[ProxyReplaceManyOptions, ProxyInsertManyOptions] = if (overwrite) {
-          Left(
-            ProxyReplaceManyOptions
-              .create()
-              .withRollbackOnError(writeConfig.rollbackOnError)
-              .withStopOnError(writeConfig.stopOnError)
-          )
-        } else {
-          Right(
-            ProxyInsertManyOptions
-              .create()
-              .withRollbackOnError(writeConfig.rollbackOnError)
-              .withStopOnError(writeConfig.stopOnError)
-          )
-        }
-        val operation = options match {
-          case Left(options) =>
-            (tuples: Iterable[TarantoolTuple]) =>
-              client
-                .space(space)
-                .replaceMany(JavaConverters.seqAsJavaListConverter(tuples.toList).asJava, options)
-          case Right(options) =>
-            (tuples: Iterable[TarantoolTuple]) =>
-              client
-                .space(space)
-                .insertMany(JavaConverters.seqAsJavaListConverter(tuples.toList).asJava, options)
-        }
+          val options: Either[ProxyReplaceManyOptions, ProxyInsertManyOptions] = if (overwrite) {
+            Left(
+              ProxyReplaceManyOptions
+                .create()
+                .withRollbackOnError(writeConfig.rollbackOnError)
+                .withStopOnError(writeConfig.stopOnError)
+            )
+          } else {
+            Right(
+              ProxyInsertManyOptions
+                .create()
+                .withRollbackOnError(writeConfig.rollbackOnError)
+                .withStopOnError(writeConfig.stopOnError)
+            )
+          }
+          val operation = options match {
+            case Left(options) =>
+              (tuples: Iterable[TarantoolTuple]) =>
+                client
+                  .space(space)
+                  .replaceMany(JavaConverters.seqAsJavaListConverter(tuples.toList).asJava, options)
+            case Right(options) =>
+              (tuples: Iterable[TarantoolTuple]) =>
+                client
+                  .space(space)
+                  .insertMany(JavaConverters.seqAsJavaListConverter(tuples.toList).asJava, options)
+          }
 
-        val tupleStream: Iterator[TarantoolTuple] =
-          partition.map(row => rowToTuple(tupleFactory, row, writeConfig.transformFieldNames))
+          val tupleStream: Iterator[TarantoolTuple] =
+            partition.map(row => rowToTuple(tupleFactory, row, writeConfig.transformFieldNames))
 
-        if (writeConfig.stopOnError) {
-          writeSync(tupleStream, operation)
-        } else {
-          writeAsync(tupleStream, operation)
-        }
-      }
-    )
+          if (writeConfig.stopOnError) {
+            writeSync(tupleStream, operation)
+          } else {
+            writeAsync(tupleStream, operation)
+          }
+        }
+      )
+    } catch {
+      case e: OptionalDataException => {
+        val message = if (e.length > 0 && e.eof == false) {
+          s"Deserialization error: Object expected, but primitive data found in the next ${e.length} bytes"
+        } else {
+          "Deserialization error: Object expected, but EOF found"
+        }
+        throw TarantoolSparkException(message, e)
+      }
+    }
 
   type AsyncTarantoolResult = CompletableFuture[TarantoolResult[TarantoolTuple]]
 

From 8e6d28082c2287451bf1305ba2e94d8ae50fb1ca Mon Sep 17 00:00:00 2001
From: Alexey Kuzin
Date: Mon, 13 Mar 2023 18:40:42 -0400
Subject: [PATCH 2/3] Move global mapper variable to the task closure context
 level

A highly loaded and concurrent Spark environment produced strange
OptionalDataException errors when deserializing the write tasks. This
turned out to be a problem with an object that is modified concurrently
while it is being serialized, which corrupts the serialized data. By
deduction and elimination, it became clear that the object subject to
this race condition is the mapper, which is apparently modified by
different threads that use the same RDD for different Tarantool spaces.

With this patch it is no longer possible to use a custom mapper; a
proper implementation of that feature will require fixing the potential
mapper race condition in the driver and re-implementing mapper passing
without implicits.
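The sketch below is illustrative only and is not part of the diff: it
assumes `data: DataFrame`, `spaceMetadata` and `writeConfig` are in scope
as they are in TarantoolWriteRDD, and it only shows the shape of the
change.

    // Before: the mapper is a field of the RDD (an implicit constructor
    // argument), so the task closure captures the enclosing instance and the
    // mapper is Java-serialized together with the task; another thread using
    // the same RDD can mutate the mapper while that serialization runs.
    data.foreachPartition((partition: Iterator[Row]) => {
      val tupleFactory = new DefaultTarantoolTupleFactory(messagePackMapper, spaceMetadata)
      partition.foreach(row => rowToTuple(tupleFactory, row, writeConfig.transformFieldNames))
    })

    // After: each task builds its own mapper inside the partition function,
    // so no shared mutable object crosses the serialization boundary.
    val func = (partition: Iterator[Row]) => {
      val messagePackMapper: MessagePackMapper =
        DefaultMessagePackMapperFactory.getInstance().defaultComplexTypesMapper()
      val tupleFactory = new DefaultTarantoolTupleFactory(messagePackMapper, spaceMetadata)
      partition.foreach(row => rowToTuple(tupleFactory, row, writeConfig.transformFieldNames))
    }
    data.foreachPartition(func)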
---
 .../connector/rdd/TarantoolWriteRDD.scala | 61 +++++++++++--------
 1 file changed, 34 insertions(+), 27 deletions(-)

diff --git a/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala b/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
index 09ee9bf..190df16 100644
--- a/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
+++ b/src/main/scala/io/tarantool/spark/connector/rdd/TarantoolWriteRDD.scala
@@ -1,26 +1,23 @@
 package io.tarantool.spark.connector.rdd
 
+import io.tarantool.driver.api.TarantoolResult
 import io.tarantool.driver.api.conditions.Conditions
+import io.tarantool.driver.api.space.options.proxy.{ProxyInsertManyOptions, ProxyReplaceManyOptions}
 import io.tarantool.driver.api.tuple.{DefaultTarantoolTupleFactory, TarantoolTuple}
-import io.tarantool.driver.api.TarantoolResult
-import io.tarantool.driver.api.space.options.proxy.ProxyInsertManyOptions
-import io.tarantool.driver.api.space.options.proxy.ProxyReplaceManyOptions
 import io.tarantool.driver.mappers.MessagePackMapper
 import io.tarantool.driver.mappers.factories.DefaultMessagePackMapperFactory
-import io.tarantool.spark.connector.{Logging, TarantoolSparkException}
 import io.tarantool.spark.connector.config.{TarantoolConfig, WriteConfig}
 import io.tarantool.spark.connector.connection.TarantoolConnection
 import io.tarantool.spark.connector.util.ScalaToJavaHelper.{toJavaBiFunction, toJavaFunction}
+import io.tarantool.spark.connector.{Logging, TarantoolSparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.tarantool.MapFunctions.rowToTuple
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.SparkContext
 
 import java.io.{OptionalDataException, PrintWriter, StringWriter}
-import java.util.Collections
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicLong
-import java.util.{List => JList}
-import java.util.{LinkedList => JLinkedList}
+import java.util.{Collections, LinkedList => JLinkedList, List => JList}
 import scala.collection.JavaConversions.asScalaBuffer
 import scala.collection.JavaConverters
 import scala.collection.mutable.ListBuffer
@@ -40,9 +37,7 @@ class TarantoolWriteRDD[R] private[spark] (
   val space: String,
   val writeConfig: WriteConfig
 )(
-  implicit ct: ClassTag[R],
-  implicit val messagePackMapper: MessagePackMapper =
-    DefaultMessagePackMapperFactory.getInstance().defaultComplexTypesMapper()
+  implicit val ct: ClassTag[R]
 ) extends TarantoolBaseRDD
     with Serializable
     with Logging {
@@ -75,10 +70,12 @@ class TarantoolWriteRDD[R] private[spark] (
     overwrite: Boolean
   ): Unit =
     try {
-      data.foreachPartition((partition: Iterator[Row]) =>
+      val func = (partition: Iterator[Row]) =>
         if (partition.nonEmpty) {
           val client = connection.client(globalConfig)
           val spaceMetadata = client.metadata().getSpaceByName(space).get()
+          val messagePackMapper: MessagePackMapper =
+            DefaultMessagePackMapperFactory.getInstance().defaultComplexTypesMapper()
           val tupleFactory = new DefaultTarantoolTupleFactory(messagePackMapper, spaceMetadata)
 
           val options: Either[ProxyReplaceManyOptions, ProxyInsertManyOptions] = if (overwrite) {
@@ -113,20 +110,27 @@ class TarantoolWriteRDD[R] private[spark] (
             partition.map(row => rowToTuple(tupleFactory, row, writeConfig.transformFieldNames))
 
           if (writeConfig.stopOnError) {
-            writeSync(tupleStream, operation)
+            writeSync(tupleStream, operation, messagePackMapper)
           } else {
-            writeAsync(tupleStream, operation)
+            writeAsync(tupleStream, operation, messagePackMapper)
           }
         }
-      )
+      data.foreachPartition(func)
     } catch {
-      case e: OptionalDataException => {
-        val message = if (e.length > 0 && e.eof == false) {
-          s"Deserialization error: Object expected, but primitive data found in the next ${e.length} bytes"
+      case e: Throwable => {
+        var inner = e
+        while (Option(inner).isDefined && !inner.isInstanceOf[OptionalDataException]) inner = inner.getCause
+        if (Option(inner).isDefined) {
+          val exc: OptionalDataException = inner.asInstanceOf[OptionalDataException]
+          val message = if (exc.length > 0 && !exc.eof) {
+            s"Deserialization error: Object expected, but primitive data found in the next ${exc.length} bytes"
+          } else {
+            "Deserialization error: Object expected, but EOF found"
+          }
+          throw TarantoolSparkException(message, exc)
         } else {
-          "Deserialization error: Object expected, but EOF found"
+          throw e
         }
-        throw TarantoolSparkException(message, e)
       }
     }
 
@@ -134,7 +138,8 @@ class TarantoolWriteRDD[R] private[spark] (
 
   private def writeSync(
     tupleStream: Iterator[TarantoolTuple],
-    operation: Iterable[TarantoolTuple] => AsyncTarantoolResult
+    operation: Iterable[TarantoolTuple] => AsyncTarantoolResult,
+    messagePackMapper: MessagePackMapper
   ): Unit = {
     val rowCount: AtomicLong = new AtomicLong(0)
     val tuples: ListBuffer[TarantoolTuple] = ListBuffer()
@@ -152,7 +157,7 @@ class TarantoolWriteRDD[R] private[spark] (
         operation(batch)
           .thenApply(toJavaFunction { result: TarantoolResult[TarantoolTuple] =>
             if (result.size != expectedCount) {
-              throw batchUnsuccessfulException(tuples)
+              throw batchUnsuccessfulException(tuples, messagePackMapper)
             }
             rowCount.getAndAdd(expectedCount)
             result
@@ -180,7 +185,7 @@ class TarantoolWriteRDD[R] private[spark] (
       operation(tuples)
         .thenApply(toJavaFunction { result: TarantoolResult[TarantoolTuple] =>
           if (result.size != expectedCount) {
-            throw batchUnsuccessfulException(tuples)
+            throw batchUnsuccessfulException(tuples, messagePackMapper)
           }
           rowCount.getAndAdd(expectedCount)
           result
@@ -210,7 +215,8 @@ class TarantoolWriteRDD[R] private[spark] (
 
   private def writeAsync(
     tupleStream: Iterator[TarantoolTuple],
-    operation: Iterable[TarantoolTuple] => AsyncTarantoolResult
+    operation: Iterable[TarantoolTuple] => AsyncTarantoolResult,
+    messagePackMapper: MessagePackMapper
   ): Unit = {
     val rowCount: AtomicLong = new AtomicLong(0)
     val failedRowsExceptions: JList[Throwable] =
@@ -230,7 +236,7 @@ class TarantoolWriteRDD[R] private[spark] (
           })
           .thenApply(toJavaFunction { result: TarantoolResult[TarantoolTuple] =>
             if (result.size != expectedCount) {
-              val exception = batchUnsuccessfulException(tuples)
+              val exception = batchUnsuccessfulException(tuples, messagePackMapper)
               failedRowsExceptions.add(exception)
               throw exception
             }
@@ -253,7 +259,7 @@ class TarantoolWriteRDD[R] private[spark] (
           })
           .thenApply(toJavaFunction { result: TarantoolResult[TarantoolTuple] =>
             if (result.size != expectedCount) {
-              val exception = batchUnsuccessfulException(tuples)
+              val exception = batchUnsuccessfulException(tuples, messagePackMapper)
               failedRowsExceptions.add(exception)
               throw exception
             }
@@ -302,7 +308,8 @@ class TarantoolWriteRDD[R] private[spark] (
   }
 
   private def batchUnsuccessfulException(
-    tuples: ListBuffer[TarantoolTuple]
+    tuples: ListBuffer[TarantoolTuple],
+    messagePackMapper: MessagePackMapper
   ): TarantoolSparkException = {
     val batch = tuples
       .map(tuple => tuple.toMessagePackValue(messagePackMapper).toString)

From 264578e2cc35e312c9bcbe7947afae202fc6db16 Mon Sep 17 00:00:00 2001
From: Alexey Kuzin
Date: Mon, 13 Mar 2023 18:41:40 -0400
Subject: [PATCH 3/3] Fix SLF4J errors and absence of the test logs

The latest netty and driver versions introduced a dependency conflict.
---
 build.sbt | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index bd0cda9..e79b187 100644
--- a/build.sbt
+++ b/build.sbt
@@ -39,7 +39,6 @@ val commonDependencies = Seq(
   "org.scalatest" %% "scalatest" % "3.2.14" % Test,
   "org.scalamock" %% "scalamock" % "5.2.0" % Test,
   "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.12" % Test,
-  "org.slf4j" % "slf4j-api" % "1.7.36" % Test,
   "ch.qos.logback" % "logback-core" % "1.2.5" % Test,
   "ch.qos.logback" % "logback-classic" % "1.2.5" % Test,
   "org.apache.derby" % "derby" % "10.11.1.1" % Test,
@@ -50,6 +49,7 @@ val commonDependencies = Seq(
     .exclude("io.netty", "netty-handler")
     .exclude("io.netty", "netty-codec")
     .exclude("io.netty", "netty-codec-http")
+    .exclude("org.slf4j", "slf4j-api")
 )
 
 lazy val root = (project in file("."))
@@ -70,7 +70,8 @@ lazy val root = (project in file("."))
       "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
      "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3",
      "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
-      "io.netty" % "netty-all" % "4.1.70.Final"
+      "io.netty" % "netty-all" % "4.1.70.Final",
+      "org.slf4j" % "slf4j-api" % "1.7.36" % Test
     ),
     // Compiler options
     javacOptions ++= Seq(