Skip to content

Commit

Permalink
SNOW-1693064 Compile with Scala 2.13 (#167)
Browse files Browse the repository at this point in the history
* change doc style

* fix all scala 2.13 changes

* support scala 2.13

* add test

* update test

* fix test

* fix udf test

* fix test
  • Loading branch information
sfc-gh-bli authored Oct 2, 2024
1 parent 030b86a commit f51773f
Show file tree
Hide file tree
Showing 84 changed files with 13,152 additions and 11,906 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/precommit-multiple-sdk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Pre-commit check: compiles the project once for every Scala version listed in
# `crossScalaVersions` in build.sbt. This workflow only compiles; it does not run tests.
name: precommit test - Compile with Multiple SDK
on:
  push:
    branches: [ main ]
  pull_request:
    branches: '**'

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Setup JDK
        uses: actions/setup-java@v3
        with:
          distribution: temurin
          java-version: 8
      - name: Decrypt profile.properties
        run: .github/scripts/decrypt_profile.sh
        env:
          PROFILE_PASSWORD: ${{ secrets.PROFILE_PASSWORD }}
      # The `+` prefix makes sbt run `compile` for each entry of crossScalaVersions.
      - name: Cross-compile
        run: sbt clean +compile
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ align = none
align.openParenDefnSite = false
align.openParenCallSite = false
align.tokens = []
docstrings.style = Asterisk
optIn = {
configStyleArguments = false
}
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy val root = (project in file("."))
name := snowparkName,
version := "1.15.0-SNAPSHOT",
scalaVersion := sys.props.getOrElse("SCALA_VERSION", default = "2.12.18"),
crossScalaVersions := Seq("2.12.18", "2.13.15"),
organization := "com.snowflake",
javaOptions ++= Seq("-source", "1.8", "-target", "1.8"),
licenses := Seq("The Apache Software License, Version 2.0" ->
Expand Down Expand Up @@ -58,7 +59,7 @@ lazy val root = (project in file("."))
// "junit" % "junit" % "4.13.1" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test,
"org.mockito" % "mockito-core" % "2.23.0" % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
),
scalafmtOnCompile := true,
javafmtOnCompile := true,
Expand Down
260 changes: 135 additions & 125 deletions src/main/scala/com/snowflake/snowpark/AsyncJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,138 +4,146 @@ import com.snowflake.snowpark.internal.{CloseableIterator, ErrorMessage}
import com.snowflake.snowpark.internal.analyzer.SnowflakePlan
import scala.reflect.runtime.universe.{TypeTag, typeOf}

/**
 * Provides a way to track an asynchronous query in Snowflake.
 *
 * You can use this object to check the status of an asynchronous query and retrieve the results.
 *
 * To check the status of an asynchronous query that you submitted earlier, call
 * [[Session.createAsyncJob]], and pass in the query ID. This returns an `AsyncJob` object that you
 * can use to check the status of the query and retrieve the query results.
 *
 * Example 1: Create an AsyncJob by specifying a valid `<query_id>`, check whether the query has
 * completed, and get the result rows.
 * {{{
 *   val asyncJob = session.createAsyncJob(<query_id>)
 *   println(s"Is query \${asyncJob.getQueryId()} done? \${asyncJob.isDone()}")
 *   val rows = asyncJob.getRows()
 * }}}
 *
 * Example 2: Create an AsyncJob by specifying a valid `<query_id>` and cancel the query if it is
 * still running.
 * {{{
 *   session.createAsyncJob(<query_id>).cancel()
 * }}}
 *
 * @since 0.11.0
 */
class AsyncJob private[snowpark] (queryID: String, session: Session, plan: Option[SnowflakePlan]) {

  /**
   * Get the query ID for the underlying query.
   *
   * @since 0.11.0
   * @return
   *   a query ID
   */
  def getQueryId(): String = queryID

  /**
   * Returns an iterator of [[Row]] objects that you can use to retrieve the results for the
   * underlying query.
   *
   * Unlike the [[getRows]] method, this method does not load all data into memory at once.
   *
   * @since 0.11.0
   * @param maxWaitTimeInSeconds
   *   The maximum number of seconds to wait for the query to complete before attempting to retrieve
   *   the results. The default value is the value of the `snowpark_request_timeout_in_seconds`
   *   configuration property.
   * @return
   *   An Iterator of [[Row]] objects
   */
  def getIterator(maxWaitTimeInSeconds: Int = session.requestTimeoutInSeconds): Iterator[Row] =
    // getAsyncResult returns a tuple; only its first element (the row iterator) is exposed here.
    session.conn.getAsyncResult(queryID, maxWaitTimeInSeconds, plan)._1

  /**
   * Returns an Array of [[Row]] objects that represent the results of the underlying query.
   *
   * This materializes everything produced by [[getIterator]] into a single array.
   *
   * @since 0.11.0
   * @param maxWaitTimeInSeconds
   *   The maximum number of seconds to wait for the query to complete before attempting to retrieve
   *   the results. The default value is the value of the `snowpark_request_timeout_in_seconds`
   *   configuration property.
   * @return
   *   An Array of [[Row]] objects
   */
  def getRows(maxWaitTimeInSeconds: Int = session.requestTimeoutInSeconds): Array[Row] =
    getIterator(maxWaitTimeInSeconds).toArray

  /**
   * Returns true if the underlying query completed.
   *
   * Completion may be due to query success, cancellation or failure, in all of these cases, this
   * method will return true.
   *
   * @since 0.11.0
   * @return
   *   true if this query completed.
   */
  def isDone(): Boolean = session.conn.isDone(queryID)

  /**
   * Cancel the underlying query if it is running.
   *
   * The cancellation is requested by running `SYSTEM$$CANCEL_QUERY` with this job's query ID.
   *
   * @since 0.11.0
   */
  def cancel(): Unit = {
    // The ID is embedded in a single-quoted SQL literal. Escape the backslash first and then
    // double any single quote so that an unexpected ID cannot terminate the literal early.
    val escapedQueryId = queryID.replace("\\", "\\\\").replace("'", "''")
    session.conn.runQuery(s"SELECT SYSTEM$$CANCEL_QUERY('$escapedQueryId')")
  }
}

/** Provides a way to track an asynchronously executed action in a DataFrame.
*
* To get the result of the action (e.g. the number of results from a `count()` action or an Array
* of [[Row]] objects from the `collect()` action), call the [[getResult]] method.
*
* To perform an action on a DataFrame asynchronously, call an action method on the
* [[DataFrameAsyncActor]] object returned by [[DataFrame.async]]. For example:
* {{{
* val asyncJob1 = df.async.collect()
* val asyncJob2 = df.async.toLocalIterator()
* val asyncJob3 = df.async.count()
* }}}
* Each of these methods returns a TypedAsyncJob object that you can use to get the results of the
* action.
*
* @since 0.11.0
*/
/**
* Provides a way to track an asynchronously executed action in a DataFrame.
*
* To get the result of the action (e.g. the number of results from a `count()` action or an Array
* of [[Row]] objects from the `collect()` action), call the [[getResult]] method.
*
* To perform an action on a DataFrame asynchronously, call an action method on the
* [[DataFrameAsyncActor]] object returned by [[DataFrame.async]]. For example:
* {{{
* val asyncJob1 = df.async.collect()
* val asyncJob2 = df.async.toLocalIterator()
* val asyncJob3 = df.async.count()
* }}}
* Each of these methods returns a TypedAsyncJob object that you can use to get the results of the
* action.
*
* @since 0.11.0
*/
class TypedAsyncJob[T: TypeTag] private[snowpark] (
queryID: String,
session: Session,
plan: Option[SnowflakePlan])
extends AsyncJob(queryID, session, plan) {

/** Returns the result for the specific DataFrame action.
*
* Example 1: Create a TypedAsyncJob by asynchronously executing a DataFrame action `collect()`,
* check whether the job is running or not, and get the action result with [[getResult]]. NOTE:
* The returned type for [[getResult]] in this example is `Array[Row]`.
* {{{
* val df = session.table("t1")
* val asyncJob = df.async.collect()
* println(s"Is query \${asyncJob.getQueryId()} done? \${asyncJob.isDone()}")
* val rowResult = asyncJob.getResult()
* }}}
*
* Example 2: Create a TypedAsyncJob by asynchronously executing a DataFrame action count() and
* get the action result with [[getResult]]. NOTE: The returned type for [[getResult]] in this
* example is `Long`.
* {{{
* val asyncJob = df.async.count()
* val longResult = asyncJob.getResult()
* }}}
*
* @since 0.11.0
* @param maxWaitTimeInSeconds
* The maximum number of seconds to wait for the query to complete before attempting to
* retrieve the results. The default value is the value of the
* `snowpark_request_timeout_in_seconds` configuration property.
* @return
* The result for the specific action
*/
/**
* Returns the result for the specific DataFrame action.
*
* Example 1: Create a TypedAsyncJob by asynchronously executing a DataFrame action `collect()`,
* check whether the job is running or not, and get the action result with [[getResult]]. NOTE:
* The returned type for [[getResult]] in this example is `Array[Row]`.
* {{{
* val df = session.table("t1")
* val asyncJob = df.async.collect()
* println(s"Is query \${asyncJob.getQueryId()} done? \${asyncJob.isDone()}")
* val rowResult = asyncJob.getResult()
* }}}
*
* Example 2: Create a TypedAsyncJob by asynchronously executing a DataFrame action count() and
* get the action result with [[getResult]]. NOTE: The returned type for [[getResult]] in this
* example is `Long`.
* {{{
* val asyncJob = df.async.count()
* val longResult = asyncJob.getResult()
* }}}
*
* @since 0.11.0
* @param maxWaitTimeInSeconds
* The maximum number of seconds to wait for the query to complete before attempting to retrieve
* the results. The default value is the value of the `snowpark_request_timeout_in_seconds`
* configuration property.
* @return
* The result for the specific action
*/
def getResult(maxWaitTimeInSeconds: Int = session.requestTimeoutInSeconds): T = {
val tpe = typeOf[T]
tpe match {
Expand Down Expand Up @@ -181,27 +189,29 @@ class TypedAsyncJob[T: TypeTag] private[snowpark] (
}
}

/**
 * Tracks an action of a MergeBuilder that was executed asynchronously.
 *
 * @since 1.3.0
 */
class MergeTypedAsyncJob private[snowpark] (
    queryID: String,
    session: Session,
    plan: Option[SnowflakePlan],
    mergeBuilder: MergeBuilder)
    extends TypedAsyncJob[MergeResult](queryID, session, plan) {

  /**
   * Returns the [[MergeResult]] of the MergeBuilder's action.
   *
   * The rows of the underlying query are fetched with `getRows` and then handed, together with
   * the originating MergeBuilder, to `MergeBuilder.getMergeResult`.
   *
   * @since 1.3.0
   * @param maxWaitTimeInSeconds
   *   The maximum number of seconds to wait for the query to complete before attempting to retrieve
   *   the results. The default value is the value of the `snowpark_request_timeout_in_seconds`
   *   configuration property.
   * @return
   *   The [[MergeResult]]
   */
  override def getResult(
      maxWaitTimeInSeconds: Int = session.requestTimeoutInSeconds): MergeResult = {
    val resultRows = getRows(maxWaitTimeInSeconds)
    MergeBuilder.getMergeResult(resultRows, mergeBuilder)
  }
}
Loading

0 comments on commit f51773f

Please sign in to comment.