[WIP] Provide spark catalog, dsv2 and use parquet for copy/unload #120
Conversation
…563_remove-itests-from-public Remove itests. Fix jdbc url. Update Redshift jdbc driver
…488_cleanup-fix-double-to-float Fix double type to float and cleanup
…486_avoid-log-creds datalake-486 avoid log creds
…4899_empty-string-to-null Empty string is converted to null
…sion 3.0.0 release
…un - fix for STS token aws access in progress
…ion between different library versions! Tests pass and can compile spark-on-paasta and spark successfully (git add src/ project/)
Merge branch 'lbc/build-catalog' into parisni-dsv2
Apparently this does not work. More importantly, aggregate pushdown puts the load on Redshift, and this might not be a good idea.
@@ -0,0 +1 @@
sbt.version=1.7.1
what is this file about? I don't think it's needed, and it also doesn't match the other sbt.version
project/plugins.sbt
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0") | ||
|
||
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5") | ||
|
||
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") | ||
|
||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0") | ||
|
||
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0") | ||
|
||
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0") | ||
|
||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") | ||
|
Why remove these plugins? Some, like dependency-graph, are just nice to have, but I'm pretty sure others like sbt-release and sbt-pgp are needed in order to release the jars to Sonatype.
Will revert that commit about sbt. For obscure reasons I need them to build in my current dev setup.
build.sbt
releaseCrossBuild := true,
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0")),
releasePublishArtifactsAction := PgpKeys.publishSigned.value,
I'm not sure about changes in this file either
Same, ignore this, will revert
@@ -570,6 +607,12 @@ and use that as a temp location for this data.
<td>Determined by the JDBC URL's subprotocol</td>
<td>The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL's subprotocol.</td>
</tr>
Let's add a README entry for the unload format too.
"unloadformat" -> "csv", | ||
"table_minutes_ttl" -> "-1" |
If I understand the PR correctly, only the v2 sources implement these parameters. Do you intend to make the v1 sources respect these parameters too? I think it's OK if they don't, but then we should probably make a distinct parameters class for DSv2 so it's clearer to users what is available in each version.
That would also let us make the v2 sources default to the parquet unload format without breaking backwards compatibility.
I could backport the TTL for data source v1. But your proposal makes sense.
DSv2 provides limit pushdown in Spark 3.3.x, so DSv1 and a lot of the code around the CSV format could just be removed instead. That's the alternative I'd like to discuss.
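For reference, a minimal usage sketch of the two new options from the snippet above, assuming the v2 source keeps the existing format name and the usual url/dbtable/tempdir options; the values shown (parquet format, 60-minute TTL) are illustrative only:

import org.apache.spark.sql.SparkSession

object ReadWithUnloadOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("redshift-dsv2-read").getOrCreate()

    val df = spark.read
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass")
      .option("dbtable", "my_table")
      .option("tempdir", "s3a://my-bucket/tmp/")
      .option("unloadformat", "parquet")     // the default above is "csv"
      .option("table_minutes_ttl", "60")     // -1 disables the cached-unload TTL
      .load()

    df.show()
  }
}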
import io.github.spark_redshift_community.spark.redshift

class RedshiftCatalog extends JDBCTableCatalog {
can we add any test coverage of the catalog capabilities?
Yeah, the catalog is a good way to test the whole thing.
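A rough sketch of how such a test could look, assuming the catalog is wired up the way Spark catalog plugins usually are (spark.sql.catalog.<name> plus per-catalog options such as the JDBC url); the option keys and table names below are placeholders, not the PR's actual test:

import org.apache.spark.sql.SparkSession

object RedshiftCatalogSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("redshift-catalog-smoke-test")
      .config("spark.sql.catalog.redshift",
        "io.github.spark_redshift_community.spark.redshift.RedshiftCatalog")
      .config("spark.sql.catalog.redshift.url",
        "jdbc:redshift://host:5439/db?user=user&password=pass")
      .getOrCreate()

    // Listing and reading through the catalog exercises namespace resolution
    // and the DSv2 read path end to end.
    spark.sql("SHOW TABLES IN redshift.public").show()
    spark.sql("SELECT * FROM redshift.public.my_table LIMIT 10").show()
  }
}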
(ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
}
override def invalidateTable(ident: Identifier): Unit = {
  // TODO: when the table is refreshed, drop the S3 folder
Is this a TODO within this PR or for later? One of the README entries mentioned we could invalidate the cache by doing a table refresh. I'm not sure if that is the same thing as invalidateTable.
You're right, and I am about to implement it.
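A sketch of what that TODO might look like, assuming the cached UNLOAD output lives under a per-table prefix below the temp directory (the layout and helper name are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

object CacheInvalidation {
  def dropCachedUnloadFolder(hadoopConf: Configuration, tempDir: String, tableId: String): Unit = {
    val root = tempDir.stripSuffix("/")
    val folder = new Path(s"$root/$tableId")
    val fs = FileSystem.get(new URI(tempDir), hadoopConf)
    if (fs.exists(folder)) {
      // Recursive delete so the next read re-unloads instead of hitting a stale cache.
      fs.delete(folder, true)
    }
  }
}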
val convertedReadSchema = StructType(readDataSchema()
  .copy().map(field => field.copy(dataType = StringType)))
val convertedDataSchema = StructType(dataSchema.copy().map(x => x.copy(dataType = StringType)))
Nitpicking: we only need to do these conversions if we're in CSV mode.
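A small sketch of that suggestion: only force columns to StringType when the unload format is CSV, since a parquet unload already carries typed columns (the unloadFormat argument stands in for however the option is plumbed through):

import org.apache.spark.sql.types.{StringType, StructType}

object CsvSchemaConversion {
  def maybeToStringSchema(schema: StructType, unloadFormat: String): StructType =
    if (unloadFormat.equalsIgnoreCase("csv")) {
      // CSV output is untyped, so everything is read back as strings.
      StructType(schema.map(_.copy(dataType = StringType)))
    } else {
      schema
    }
}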
/**
 * A name to identify this table. Implementations should provide a meaningful name, like the
 * database and table name from catalog, or the location of files for this table.
 */
override def name(): String = "redshift"
I don't know how this name is used in Spark. Do we need to provide something more descriptive if we want users to be able to load multiple tables? We have the getTableNameOrSubquery param, which might be nice to include in this name, depending on how it is used.
val jdbcWrapper: JDBCWrapper = DefaultJDBCWrapper

private def buildUnloadStmt(
Related to the other comment about v1 sources supporting TTL and parquet: IIRC there is a build-unload-statement method somewhere already, and we might be able to share code between the v1 and v2 sources to have a single unload statement builder?
Yeah refactoring this would be valuable
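A sketch of what a shared builder could look like; the signature and the credentials clause are assumptions rather than the existing method, and the CSV branch is deliberately minimal:

object UnloadStatements {
  def buildUnloadStmt(
      query: String,
      s3Prefix: String,
      credsClause: String,
      unloadFormat: String): String = {
    // Single quotes inside the UNLOAD query need escaping (doubled here).
    val escapedQuery = query.replace("'", "''")
    val formatClause =
      if (unloadFormat.equalsIgnoreCase("parquet")) "FORMAT AS PARQUET"
      else "ESCAPE" // plain text unload for the CSV-style path
    s"UNLOAD ('$escapedQuery') TO '$s3Prefix' $credsClause $formatClause ALLOWOVERWRITE"
  }
}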
src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftCatalog.scala
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
Curious why these are imported here instead of at the top of the file like usual.
Well, in case I'd like to move the logic somewhere else, this will help a bit. Nothing really important.
This avoids S3 file listings to find the last cache candidate. Also use Hadoop FS instead of the AWS low-level client.
@@ -179,15 +181,44 @@ private[redshift] class JDBCWrapper {
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val columnType = getCatalystType(dataType, fieldSize, fieldScale, isSigned)
val comment = comments.get(columnName)
if(!comment.isEmpty){
  fields(i) = StructField(columnName, columnType, nullable, comment.get)
Nit: Indentation.
@@ -165,7 +166,8 @@ private[redshift] class JDBCWrapper {
// the underlying JDBC driver implementation implements PreparedStatement.getMetaData() by
// executing the query. It looks like the standard Redshift and Postgres JDBC drivers don't do
// this but we leave the LIMIT condition here as a safety-net to guard against perf regressions.
val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 1")
val comments = resolveComments(conn, table)
val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 0")
Is the LIMIT changed to 0 since we only care about the column metadata, making it slightly more performant?
Yeah definitely
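For context, a minimal sketch of why LIMIT 0 is enough: preparing the statement still exposes the result-set metadata (column names, types, nullability) without the driver having to fetch a row:

import java.sql.{Connection, ResultSetMetaData}

object SchemaProbe {
  def tableMetadata(conn: Connection, table: String): ResultSetMetaData = {
    // LIMIT 0 keeps the probe cheap; only the metadata of the result set is needed.
    val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 0")
    ps.getMetaData
  }
}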
@@ -179,15 +181,44 @@ private[redshift] class JDBCWrapper {
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val columnType = getCatalystType(dataType, fieldSize, fieldScale, isSigned)
val comment = comments.get(columnName)
I believe this is added to preserve the comments?
indeed
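As a side note, a sketch of an alternative way to attach the resolved comment: StructField.withComment stores it in the field metadata, which is what DESCRIBE TABLE surfaces in Spark:

import org.apache.spark.sql.types.StructField

object ColumnComments {
  // Only attach a comment when one was resolved from the database.
  def withOptionalComment(field: StructField, comment: Option[String]): StructField =
    comment.fold(field)(field.withComment)
}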
Found 2 issues with this:
checkAnswer(
  sqlContext.sql("select * from test_table"),
  TestUtils.expectedData)
withUnloadFormat {
If I understand correctly, RedshiftReadSuite only caters to testing the 'csv' format?
If so, are there plans to extend this to the 'parquet' format too?
If not, I believe the changes in this particular file are a no-op.
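For illustration, a sketch of what a withUnloadFormat helper might look like: run the enclosed assertions once per unload format so the same suite covers both csv and parquet (how the chosen format reaches the reader options is an assumption here):

trait UnloadFormatFixture {
  // The format currently under test; the reader helper would pick this up.
  protected var unloadFormat: String = "csv"

  def withUnloadFormat(body: => Unit): Unit = {
    Seq("csv", "parquet").foreach { format =>
      unloadFormat = format
      body // re-run the enclosed assertions with the current format
    }
  }
}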
/**
 * Create a new DataFrameReader using common options for reading from Redshift.
 */
override protected def read: DataFrameReader = {
Is this test file complete? Are there plans to add more tests similar to RedshiftReadSuite.scala?
/**
 * The AWS SSE-KMS key to use for encryption during UNLOAD operations
 * instead of AWS's default encryption
 */
def sseKmsKey: Option[String] = parameters.get("sse_kms_key")

/**
 * The Int value to write for nulls when using CSV.
Slightly confused by this comment.
Do you intend to write, "The Int value to write for ttl when using CSV."?
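For comparison, a sketch of a parameter accessor matching the "table_minutes_ttl" option shown earlier, where the default of -1 would mean the cached unload never expires (the method name and the plain parameters map are assumptions about the parameters class):

object TtlParameter {
  def tableMinutesTtl(parameters: Map[String, String]): Int =
    parameters.getOrElse("table_minutes_ttl", "-1").toInt
}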
Because sensitive material was recently introduced, I had to rewrite history using the procedure here: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/removing-sensitive-data-from-a-repository This created a lot more conflicts in this pull request. If this PR is still wanted, it is probably better to open a new one instead. In addition, the AWS contribution has brought along many improvements that include some of the intended features of this original PR. Check #128
This PR: