fix test
sfc-gh-bli committed Feb 2, 2024
1 parent ee7719f commit f15815a
Showing 1 changed file with 5 additions and 40 deletions.
@@ -243,7 +243,11 @@ class SparkConnectorContextSuite extends IntegrationSuiteBase {
 
     // Stop the application, it will trigger the Application End event.
     sparkSession.stop()
-    Thread.sleep(5000)
+    Thread.sleep(10000)
+
+    // no query can be retried after session closed
+    assert(SparkConnectorContext.closedApplicationIDs.contains(appId))
+    assert(!SparkConnectorContext.getRunningQueries.contains(appId))
 
     var (message, queryText) = getQueryMessage(conn, queryID, sessionID)
     var tryCount: Int = 0
@@ -276,43 +280,4 @@ class SparkConnectorContextSuite extends IntegrationSuiteBase {
       Await.ready(f2, Duration.Inf)
     }
   }
-
-  test("Disable retry after application closed") {
-    val sc = sparkSession.sparkContext
-    val appId = sc.applicationId
-
-    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
-      .options(connectorOptionsNoTable)
-      .option(Parameters.PARAM_SUPPORT_SHARE_CONNECTION, "false")
-      .option("query", "select count(*) from table(generator(timelimit=>100))")
-      .load()
-
-    import scala.concurrent.ExecutionContext.Implicits.global
-    val f = Future {
-      df.collect()
-    }
-    Thread.sleep(10000)
-    var queries = SparkConnectorContext.getRunningQueries.get(appId)
-    assert(queries.isDefined)
-    assert(queries.get.size == 1)
-    sparkSession.stop()
-    Thread.sleep(10000)
-    queries = SparkConnectorContext.getRunningQueries.get(appId)
-    SparkConnectorContext.closedApplicationIDs.contains(appId)
-    assert(queries.isEmpty)
-
-    // Wait for child thread done to avoid affect other test cases.
-    Await.ready(f, Duration.Inf)
-
-    // Recreate spark session to avoid affect following test cases
-    sparkSession = SparkSession.builder
-      .master("local")
-      .appName("SnowflakeSourceSuite")
-      .config("spark.sql.shuffle.partitions", "6")
-      // "spark.sql.legacy.timeParserPolicy = LEGACY" is added to allow
-      // spark 3.0 to support legacy conversion for unix_timestamp().
-      // It may not be necessary for spark 2.X.
-      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
-      .getOrCreate()
-  }
 }
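
Note: the added assertions exercise the two pieces of SparkConnectorContext state that govern retries. After sparkSession.stop(), the application ID should be recorded in closedApplicationIDs and should no longer appear in getRunningQueries. A minimal sketch of how a retry path could consult that state is shown below; the shouldRetry helper is a hypothetical name introduced here for illustration, not the connector's actual implementation.

    object RetryGuardSketch {
      // Hypothetical helper, not part of the connector API: a retry path
      // could consult the same state the test asserts on.
      def shouldRetry(appId: String): Boolean = {
        // Never retry queries for an application that has already ended.
        val appClosed = SparkConnectorContext.closedApplicationIDs.contains(appId)
        // Only applications that still have tracked running queries are retry candidates.
        val stillRunning = SparkConnectorContext.getRunningQueries.contains(appId)
        !appClosed && stillRunning
      }
    }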
