Document how to override default Java Driver configuration #141

Open
msmygit opened this issue May 4, 2023 · 1 comment
Labels: documentation (Improvements or additions to documentation), help wanted (Extra attention is needed)

Comments

msmygit (Collaborator) commented May 4, 2023

This is for the 3.3.0_stable branch, but it would be helpful to clarify the same in the main branch too.

Today, there is no manual/documentation explaining how to pass in Java Driver configuration to override parameters such as request timeouts, heartbeat timeouts, etc.
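For illustration, the kind of override we are looking to document would resemble the HOCON snippet below. The option names come from the Java Driver's reference configuration; how exactly this file gets wired into CDM is precisely what needs documenting, so treat this as a sketch rather than a working recipe.

```hocon
# Hypothetical application.conf fragment raising driver timeouts.
# Option paths are taken from the Java Driver's reference.conf.
datastax-java-driver {
  basic.request.timeout = 60 seconds
  advanced.heartbeat {
    interval = 30 seconds
    timeout = 90 seconds
  }
}
```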

Original Error

Stacktrace 1:
23/05/03 11:50:06 ERROR DiffJobSession: Could not perform diff for Key: 10061674 %% 1 %% ABCPTSQDBRRLCKJ07
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.connection.HeartbeatException
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at datastax.astra.migrate.DiffJobSession.diffAndClear(DiffJobSession.java:107)
        at datastax.astra.migrate.DiffJobSession.lambda$getDataAndDiff$0(DiffJobSession.java:85)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at com.datastax.oss.driver.internal.core.cql.PagingIterableSpliterator.forEachRemaining(PagingIterableSpliterator.java:118)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at datastax.astra.migrate.DiffJobSession.getDataAndDiff(DiffJobSession.java:69)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$3(DiffData.scala:28)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$3$adapted(DiffData.scala:26)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$2(DiffData.scala:26)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$2$adapted(DiffData.scala:25)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$1(DiffData.scala:25)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$1$adapted(DiffData.scala:24)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.connection.HeartbeatException
        at com.datastax.oss.driver.internal.core.channel.HeartbeatHandler$HeartbeatRequest.fail(HeartbeatHandler.java:109)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.fail(ChannelHandlerRequest.java:62)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.onTimeout(ChannelHandlerRequest.java:108)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at com.datastax.oss.driver.shaded.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Heartbeat request: timed out after 5000 ms
        ... 10 more
Stacktrace 2:
23/05/02 13:16:35 ERROR CopyJobSession: Error occurred during Attempt#: 1
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at datastax.astra.migrate.CopyJobSession.iterateAndClearWriteResults(CopyJobSession.java:197)
        at datastax.astra.migrate.CopyJobSession.getDataAndInsert(CopyJobSession.java:109)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$3(Migrate.scala:30)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$3$adapted(Migrate.scala:28)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$2(Migrate.scala:28)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$2$adapted(Migrate.scala:27)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$1(Migrate.scala:27)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$1$adapted(Migrate.scala:26)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
23/05/02 13:16:35 ERROR CopyJobSession: Error with PartitionRange -- ThreadID: 123 Processing min: 3335171328526692640 max: 3337016002934063595 -- Attempt# 1
23/05/02 13:16:35 ERROR CopyJobSession: Error stats Read#: 47, Wrote#: 0, Skipped#: 0, Error#: 47

What options were tried

Option 1

Added spark.cassandra.connection.timeoutMS 90000 in cdm.properties. This did not increase the timeout value.

Option 2

Added --conf datastax-java-driver.advanced.heartbeat.timeout="90 seconds" --conf datastax-java-driver.basic.request.timeout="60 seconds" to the ./spark-submit command; this had no effect on the timeouts.

Option 3

Attempted the below without any luck:

./spark-submit \
--files /path/to/application.conf \
--conf spark.cassandra.connection.config.profile.path=application.conf \
...

In this case, it only considers properties from application.conf and ignores everything else in cdm.properties, as confirmed by the warning below:

23/05/02 16:20:56 WARN CassandraConnectionFactory: Ignoring all programmatic configuration, only using configuration from application.conf

Option 4

Attempted to add --driver-java-options "-Ddriver.basic.request.timeout='60 seconds'" to the ./spark-submit command; this had no effect either.
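One detail worth capturing in the docs: the Java Driver reads system-property overrides under the full datastax-java-driver. prefix, so the -Ddriver.… flag above may simply be missing that prefix. Also, in a Spark job the sessions are opened on the executors, not just the driver JVM, so the property would likely need to reach the executors too. A sketch, untested against CDM:

```shell
# Sketch: driver config overrides via system properties use the
# datastax-java-driver. prefix, and must be set on executors as well.
./spark-submit \
  --driver-java-options "-Ddatastax-java-driver.basic.request.timeout='60 seconds'" \
  --conf "spark.executor.extraJavaOptions=-Ddatastax-java-driver.basic.request.timeout='60 seconds'" \
  ...
```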

msmygit added the documentation and help wanted labels May 4, 2023
yukim (Contributor) commented Jun 6, 2023

CDM uses SparkConf to create CassandraConnector, so spark.cassandra.* properties should work.
You need to use spark.cassandra.read.timeoutMS for request timeout though.
spark.cassandra.connection.timeoutMS sets connection timeout.

If you really need to set driver options precisely, your option would be to specify spark.cassandra.connection.config.profile.path. However, the current CDM implementation shares SparkConf for both origin and target connections, so I imagine this does not work, as you already tested.
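Putting the advice above together, a command along these lines should raise the request timeout through the connector's own properties (values are in milliseconds; the jar name and main class here are illustrative, taken from the stacktraces):

```shell
# Sketch: spark.cassandra.* properties flow through SparkConf into
# CassandraConnector, so read/connection timeouts can be raised here.
./spark-submit \
  --properties-file cdm.properties \
  --conf spark.cassandra.read.timeoutMS=60000 \
  --conf spark.cassandra.connection.timeoutMS=90000 \
  --class datastax.astra.migrate.Migrate \
  cassandra-data-migrator.jar
```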
