Implement FlintJob Logic for EMR-S (opensearch-project#52)
* Implement FlintJob Logic for EMR-S

This commit introduces FlintJob logic for EMR-S, mirroring the existing SQLJob implementation for EMR clusters. The key differences in FlintJob (sketched below) are:
1. It reads OpenSearch host information from Spark command parameters.
2. It ensures that a result index with the correct mapping exists in OpenSearch, creating it if necessary. This check runs in parallel with SQL query execution.
3. It reports an error if the result index mapping is incorrect.
4. It saves a failure status if the SQL execution fails.
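
A minimal sketch of that flow, under stated assumptions: the helper names `ensureResultIndexWithMapping` and `writeResult` are hypothetical stand-ins for the real OpenSearch calls in the new FlintJob sources, not the committed code.

```scala
// Hypothetical sketch of the FlintJob flow described above.
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FlintJobSketch {
  def main(args: Array[String]): Unit = {
    // (1) Query, result index, and data source name arrive as command
    // parameters; OpenSearch host details come in via spark.datasource.flint.* conf.
    val Array(query, resultIndex, dataSourceName) = args
    val spark = SparkSession.builder().appName("FlintJob").getOrCreate()
    implicit val ec: ExecutionContext = ExecutionContext.global

    // (2) Verify or create the result index while the query runs.
    val indexReady: Future[Unit] = Future(ensureResultIndexWithMapping(resultIndex))
    val result: Try[DataFrame] = Try(spark.sql(query))

    // (3) Surfaces an error here if the existing index mapping is incorrect.
    Await.result(indexReady, 5.minutes)

    result match {
      case Success(df) =>
        writeResult(df, resultIndex, dataSourceName, status = "SUCCESS", error = "")
      case Failure(e) =>
        // (4) Persist a FAILED status document when SQL execution fails.
        writeResult(spark.emptyDataFrame, resultIndex, dataSourceName,
          status = "FAILED", error = e.getMessage)
    }
    spark.stop()
  }

  // Stubs standing in for the OpenSearch interaction.
  private def ensureResultIndexWithMapping(index: String): Unit = ???
  private def writeResult(df: DataFrame, index: String, dataSource: String,
      status: String, error: String): Unit = ???
}
```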

Testing:
1. Manual testing was conducted using the EMR-S CLI.
2. New unit tests were added to verify the functionality.

Signed-off-by: Kaituo Li <[email protected]>

* address comments

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Oct 4, 2023
1 parent 6d87b81 commit 71d67a0
Showing 5 changed files with 547 additions and 7 deletions.
1 change: 1 addition & 0 deletions .scalafmt.conf
@@ -0,0 +1 @@
version = 2.7.5
29 changes: 27 additions & 2 deletions build.sbt
@@ -165,12 +165,37 @@ lazy val standaloneCosmetic = project
Compile / packageBin := (flintSparkIntegration / assembly).value)

lazy val sparkSqlApplication = (project in file("spark-sql-application"))
// dependency will be provided at runtime, so it doesn't need to be included in the assembled JAR
.dependsOn(flintSparkIntegration % "provided")
.settings(
commonSettings,
name := "sql-job",
scalaVersion := scala212,
libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion))
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
// Assembly settings
// the sbt assembly plugin found multiple copies of module-info.class with
// different contents in the jars it merged for the flintCore dependencies.
// This can happen when multiple dependencies pull in the same library
// but at different versions.
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / test := (Test / test).value
)

lazy val sparkSqlApplicationCosmetic = project
.settings(
86 changes: 81 additions & 5 deletions spark-sql-application/README.md
@@ -1,13 +1,25 @@
# Spark SQL Application

This application execute sql query and store the result in OpenSearch index in following format
We have two applications: SQLJob and FlintJob.

SQLJob is designed for EMR Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:
```
"stepId":"<emr-step-id>",
"applicationId":"<spark-application-id>"
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob"
```

FlintJob is designed for EMR Serverless Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:

```
"jobRunId":"<emrs-job-id>",
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob",
"dataSourceName":"<opensearch-data-source-name>"
```

## Prerequisites

+ Spark 3.3.1
@@ -16,8 +28,9 @@ This application execute sql query and store the result in OpenSearch index in f

## Usage

To use this application, you can run Spark with Flint extension:
To use these applications, you can run Spark with the Flint extension:

SQLJob
```
./bin/spark-submit \
--class org.opensearch.sql.SQLJob \
--jars <flint-spark-integration-jar> \
sql-job.jar \
<spark-sql-query> \
<opensearch-index> \
<opensearch-host> \
<opensearch-port> \
<opensearch-scheme> \
<opensearch-auth> \
<opensearch-region> \
```

FlintJob
```
aws emr-serverless start-job-run \
--region <region-name> \
--application-id <application-id> \
--execution-role-arn <execution-role> \
--job-driver '{"sparkSubmit": {"entryPoint": "<flint-job-s3-path>", \
"entryPointArguments":["'<sql-query>'", "<result-index>", "<data-source-name>"], \
"sparkSubmitParameters":"--class org.opensearch.sql.FlintJob \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory \
--conf spark.hive.metastore.glue.role.arn=<role-to-access-s3-and-opensearch> \
--conf spark.jars=<path-to-AWSGlueDataCatalogHiveMetaStoreAuth-jar> \
--conf spark.jars.packages=<flint-spark-integration-jar-name> \
--conf spark.jars.repositories=<path-to-download_spark-integration-jar> \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.executorEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.datasource.flint.host=<opensearch-url> \
--conf spark.datasource.flint.port=<opensearch-port> \
--conf spark.datasource.flint.scheme=<http-or-https> \
--conf spark.datasource.flint.auth=<auth-type> \
--conf spark.datasource.flint.region=<region-name> \
--conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions \
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory "}}'
<data-source-name>
```
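
Here, `entryPointArguments` supplies FlintJob's three positional arguments: the SQL query to run, the result index to write to, and the data source name recorded in each result document's `dataSourceName` field.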

## Result Specifications

The following example shows how the result is written to an OpenSearch index after query execution.

Let's assume sql query result is
Let's assume the SQL query result is
```
+------+------+
|Letter|Number|
+------+------+
|A     |1     |
|B     |2     |
|C     |3     |
+------+------+
```
OpenSearch index document will look like
For SQLJob, the OpenSearch index document will look like
```json
{
  "_index" : ".query_execution_result",
  "_id" : "A2WOsYgBMUoqCqlDJHrn",
  "_score" : 1.0,
  "_source" : {
    "result" : [
      "{'Letter':'A','Number':1}",
      "{'Letter':'B','Number':2}",
      "{'Letter':'C','Number':3}"
    ],
    "schema" : [
      "{'column_name':'Letter','data_type':'string'}",
      "{'column_name':'Number','data_type':'integer'}"
    ],
    "stepId" : "s-JZSB1139WIVU",
    "applicationId" : "application_1687726870985_0003"
  }
}

For FlintJob, the OpenSearch index document will look like
```json
{
  "_index" : ".query_execution_result",
  "_id" : "A2WOsYgBMUoqCqlDJHrn",
  "_score" : 1.0,
  "_source" : {
    "result" : [
      "{'Letter':'A','Number':1}",
      "{'Letter':'B','Number':2}",
      "{'Letter':'C','Number':3}"
    ],
    "schema" : [
      "{'column_name':'Letter','data_type':'string'}",
      "{'column_name':'Number','data_type':'integer'}"
    ],
    "jobRunId" : "s-JZSB1139WIVU",
    "applicationId" : "application_1687726870985_0003",
    "dataSourceName": "myS3Glue",
    "status": "SUCCESS",
    "error": ""
  }
}
```

## Build

To build and run this application with Spark, you can run:

```
sbt clean sparkSqlApplicationCosmetic/publishM2
```

The jar file is located in the `spark-sql-application/target/scala-2.12` folder.

## Test

To run tests, you can use:

```
sbt test
```

## Scalastyle

To check code with scalastyle, you can run:

```
sbt scalastyle
```

To check test code with scalastyle, you can run:

```
sbt testScalastyle
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).
