SNOW-1708660 Enable parquet by default #595

Merged: 5 commits merged on Nov 18, 2024
Changes from 4 commits
51 changes: 51 additions & 0 deletions src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala
@@ -25,6 +25,7 @@ class ParquetSuite extends IntegrationSuiteBase {
val test_column_map_not_match: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
val test_nested_dataframe: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
val test_no_staging_table: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
val test_table_name: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString

override def afterAll(): Unit = {
jdbcUpdate(s"drop table if exists $test_all_type")
@@ -41,6 +42,7 @@ class ParquetSuite extends IntegrationSuiteBase {
jdbcUpdate(s"drop table if exists $test_column_map_not_match")
jdbcUpdate(s"drop table if exists $test_nested_dataframe")
jdbcUpdate(s"drop table if exists $test_no_staging_table")
jdbcUpdate(s"drop table if exists $test_table_name")
super.afterAll()
}

@@ -707,4 +709,53 @@ class ParquetSuite extends IntegrationSuiteBase {
val res = sparkSession.sql(s"show tables like '%${test_all_type}_STAGING%'").collect()
assert(res.length == 0)
}

test("use parquet in structured type by default") {
// use CSV by default
sparkSession
.sql("select 1")
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_table_name)
.mode(SaveMode.Overwrite)
.save()
assert(Utils.getLastCopyLoad.contains("TYPE=CSV"))

// use Parquet on structured types
sparkSession
.sql("select array(1, 2)")
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_table_name)
.mode(SaveMode.Overwrite)
.save()
assert(Utils.getLastCopyLoad.contains("TYPE=PARQUET"))

// use JSON on structured types when PARAM_USE_JSON_IN_STRUCTURED_DATA is true
sparkSession
.sql("select array(1, 2)")
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_table_name)
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.mode(SaveMode.Overwrite)
.save()
assert(Utils.getLastCopyLoad.contains("TYPE = JSON"))

// PARAM_USE_PARQUET_IN_WRITE overrides PARAM_USE_JSON_IN_STRUCTURED_DATA
sparkSession
.sql("select array(1, 2)")
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_table_name)
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.option(Parameters.PARAM_USE_PARQUET_IN_WRITE, "true")
.mode(SaveMode.Overwrite)
.save()
assert(Utils.getLastCopyLoad.contains("TYPE=PARQUET"))
}
}
@@ -2004,6 +2004,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
.format(SNOWFLAKE_SOURCE_NAME)
.options(thisConnectorOptionsNoTable)
.option("dbtable", test_table_write)
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.mode(SaveMode.Overwrite)
.save()

@@ -2019,6 +2020,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
.format(SNOWFLAKE_SOURCE_NAME)
.options(thisConnectorOptionsNoTable)
.option("dbtable", test_table_write)
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.option(Parameters.PARAM_INTERNAL_QUOTE_JSON_FIELD_NAME, "false")
.mode(SaveMode.Overwrite)
.save()
@@ -2031,6 +2033,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
.options(thisConnectorOptionsNoTable)
.option("dbtable", test_table_write)
.option(Parameters.PARAM_INTERNAL_QUOTE_JSON_FIELD_NAME, "false")
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.mode(SaveMode.Overwrite)
.save()

@@ -164,6 +164,7 @@ class VariantTypeSuite extends IntegrationSuiteBase {
.options(connectorOptionsNoTable)
.option("dbtable", tableName2)
.option(Parameters.PARAM_INTERNAL_USE_PARSE_JSON_FOR_WRITE, "true")
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.mode(SaveMode.Overwrite)
.save()

@@ -242,6 +243,7 @@ class VariantTypeSuite extends IntegrationSuiteBase {
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", tableName4)
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
.mode(SaveMode.Overwrite)
.save()

28 changes: 25 additions & 3 deletions src/main/scala/net/snowflake/spark/snowflake/Parameters.scala
@@ -260,9 +260,18 @@ object Parameters {
val BOOLEAN_VALUES_FALSE: Set[String] =
Set("off", "no", "false", "0", "disabled")

// enable parquet format
// Enable Parquet format when loading data from Spark to Snowflake.
// When enabled, the Spark connector will use only the Parquet file format.
val PARAM_USE_PARQUET_IN_WRITE: String = knownParam("use_parquet_in_write")

// By default, the Spark connector uses CSV format when loading data from Spark to Snowflake.
// If the dataframe contains any structured type, the Spark connector uses Parquet
// format instead of CSV.
// When this parameter is enabled, the Spark connector uses JSON format instead of
// Parquet when loading structured data.
// This parameter is ignored if the USE_PARQUET_IN_WRITE parameter is enabled.
val PARAM_USE_JSON_IN_STRUCTURED_DATA: String = knownParam("use_json_in_structured_data")

/**
* Helper method to check if a given string represents some form
* of "true" value, see BOOLEAN_VALUES_TRUE
@@ -297,7 +306,8 @@ object Parameters {
PARAM_TIMESTAMP_LTZ_OUTPUT_FORMAT -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3",
PARAM_TIMESTAMP_TZ_OUTPUT_FORMAT -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3",
PARAM_TRIM_SPACE -> "false",
PARAM_USE_PARQUET_IN_WRITE -> "false"
PARAM_USE_PARQUET_IN_WRITE -> "false",
PARAM_USE_JSON_IN_STRUCTURED_DATA -> "false"

)

@@ -613,13 +623,25 @@ object Parameters {
def createPerQueryTempDir(): String = Utils.makeTempPath(rootTempDir)

/**
* Use parquet form in download by default
* Use parquet format when loading data from Spark to Snowflake
*/
def useParquetInWrite(): Boolean = {
isTrue(parameters.getOrElse(PARAM_USE_PARQUET_IN_WRITE, "false"))

}

/**
* Use JSON format when loading structured data from Spark to Snowflake
*/
def useJsonInWrite(): Boolean = {
if (useParquetInWrite()) {
// The USE_PARQUET_IN_WRITE parameter overrides this parameter
false
} else {
isTrue(parameters.getOrElse(PARAM_USE_JSON_IN_STRUCTURED_DATA, "false"))
}
}

/**
* The Snowflake table to be used as the target when loading or writing data.
*/
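To make the interaction of the two write-format parameters concrete, here is a minimal caller-side sketch (assumptions: connectorOptions stands in for the usual Snowflake connection options, "target_table" is a hypothetical table name, and SNOWFLAKE_SOURCE_NAME is the same source-name constant used in the test suites above; this snippet is not part of the PR diff):

import org.apache.spark.sql.SaveMode

// DataFrame with a structured (ARRAY) column
val df = sparkSession.sql("select array(1, 2) as a")

df.write
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(connectorOptions)                          // placeholder connection options
  .option("dbtable", "target_table")                  // hypothetical target table
  // Default after this PR: structured columns are written via Parquet.
  // .option("use_json_in_structured_data", "true")   // opt back into JSON for structured data
  // .option("use_parquet_in_write", "true")          // forces Parquet; overrides the JSON option
  .mode(SaveMode.Overwrite)
  .save()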
@@ -57,7 +57,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
if (params.useParquetInWrite()) {
SupportedFormat.PARQUET
} else if (Utils.containVariant(data.schema)){
SupportedFormat.JSON
if (params.useJsonInWrite()) SupportedFormat.JSON else SupportedFormat.PARQUET
}
else {
SupportedFormat.CSV
@@ -74,7 +74,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
)
params.setColumnMap(Option(data.schema), toSchema)
} finally conn.close()
} else if (params.columnMap.isDefined && params.useParquetInWrite()){
} else if (params.columnMap.isDefined && format == SupportedFormat.PARQUET){
val conn = jdbcWrapper.getConnector(params)
try {
val toSchema = Utils.removeQuote(
@@ -94,7 +94,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
} finally conn.close()
}

if (params.useParquetInWrite()){
if (format == SupportedFormat.PARQUET){
val conn = jdbcWrapper.getConnector(params)
try{
if (jdbcWrapper.tableExists(params, params.table.get.name)){
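For review, the effective format selection after this change can be summarized with a small sketch (simplified names, not connector code: useParquet, useJson, and containsVariant stand in for params.useParquetInWrite(), params.useJsonInWrite(), and Utils.containVariant(data.schema)):

// Sketch of the effective write-format decision after this change.
def chooseFormat(useParquet: Boolean, useJson: Boolean, containsVariant: Boolean): String =
  if (useParquet) "PARQUET"                // use_parquet_in_write always wins
  else if (containsVariant) {
    if (useJson) "JSON" else "PARQUET"     // structured/variant data now defaults to Parquet
  } else {
    "CSV"                                  // plain relational data still uses CSV
  }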
@@ -327,7 +327,7 @@ private[io] object StageWriter {
if (!tableExists)
{
writeTableState.createTable(tableName,
if (params.useParquetInWrite()) params.toSnowflakeSchema(schema) else schema,
if (format == SupportedFormat.PARQUET) params.toSnowflakeSchema(schema) else schema,
params)
} else if (params.truncateTable && saveMode == SaveMode.Overwrite) {
writeTableState.truncateTable(tableName)
@@ -425,7 +425,7 @@ private[io] object StageWriter {
if (saveMode == SaveMode.Overwrite || !tableExists)
{
conn.createTable(targetTable.name,
if (params.useParquetInWrite()) params.toSnowflakeSchema(schema) else schema,
if (format == SupportedFormat.PARQUET) params.toSnowflakeSchema(schema) else schema,
params,
overwrite = false, temporary = false)
}
@@ -904,7 +904,7 @@ private[io] object StageWriter {
val fromString = ConstantString(s"FROM @$tempStage/$prefix/") !

val mappingList: Option[List[(Int, String)]] =
if (params.useParquetInWrite()) None else params.columnMap match {
if (format == SupportedFormat.PARQUET) None else params.columnMap match {
case Some(map) =>
Some(map.toList.map {
case (key, value) =>