Skip to content

Commit

Permalink
refactor cloud operations
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed Oct 30, 2024
1 parent 793aa21 commit a162c7d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 51 deletions.
15 changes: 3 additions & 12 deletions src/it/scala/net/snowflake/spark/snowflake/io/StageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,13 @@ class StageSuite extends IntegrationSuiteBase {
try {
// The credential for the external stage is fake.
val azureExternalStage = ExternalAzureStorage(
param,
containerName = "test_fake_container",
azureAccount = "test_fake_account",
azureEndpoint = "blob.core.windows.net",
azureSAS =
"?sig=test_test_test_test_test_test_test_test_test_test_test_test" +
"_test_test_test_test_test_fak&spr=https&sp=rwdl&sr=c",
param.proxyInfo,
param.maxRetryCount,
param.sfURL,
param.useExponentialBackoff,
param.expectedPartitionCount,
pref = "test_dir",
connection = connection
Expand Down Expand Up @@ -367,13 +364,10 @@ class StageSuite extends IntegrationSuiteBase {
try {
// The credential for the external stage is fake.
val s3ExternalStage = ExternalS3Storage(
param,
bucketName = "test_fake_bucket",
awsId = "TEST_TEST_TEST_TEST1",
awsKey = "TEST_TEST_TEST_TEST_TEST_TEST_TEST_TEST2",
param.proxyInfo,
param.maxRetryCount,
param.sfURL,
param.useExponentialBackoff,
param.expectedPartitionCount,
pref = "test_dir",
connection = connection,
Expand Down Expand Up @@ -487,13 +481,10 @@ class StageSuite extends IntegrationSuiteBase {
try {
// The credential for the external stage is fake.
val s3ExternalStage = ExternalS3Storage(
param,
bucketName = "test_fake_bucket",
awsId = "TEST_TEST_TEST_TEST1",
awsKey = "TEST_TEST_TEST_TEST_TEST_TEST_TEST_TEST2",
param.proxyInfo,
param.maxRetryCount,
param.sfURL,
param.useExponentialBackoff,
param.expectedPartitionCount,
pref = "test_dir",
connection = connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,11 @@ object CloudStorageOperations {

(
ExternalAzureStorage(
param = param,
containerName = container,
azureAccount = account,
azureEndpoint = endpoint,
azureSAS = azureSAS,
param.proxyInfo,
param.maxRetryCount,
param.sfURL,
param.useExponentialBackoff,
param.expectedPartitionCount,
pref = path,
connection = conn
Expand All @@ -321,13 +318,10 @@ object CloudStorageOperations {

(
ExternalS3Storage(
param = param,
bucketName = bucket,
awsId = param.awsAccessKey.get,
awsKey = param.awsSecretKey.get,
param.proxyInfo,
param.maxRetryCount,
param.sfURL,
param.useExponentialBackoff,
param.expectedPartitionCount,
pref = prefix,
connection = conn,
Expand Down Expand Up @@ -495,14 +489,15 @@ private[io] object StorageInfo {
}

sealed trait CloudStorage {
protected val param: MergedParameters
protected val RETRY_SLEEP_TIME_UNIT_IN_MS: Int = 1500
protected val MAX_SLEEP_TIME_IN_MS: Int = 3 * 60 * 1000
private var processedFileCount = 0
protected val connection: ServerConnection
protected val maxRetryCount: Int
protected val proxyInfo: Option[ProxyInfo]
protected val sfURL: String
protected val useExponentialBackoff: Boolean
protected val maxRetryCount: Int = param.maxRetryCount
protected val proxyInfo: Option[ProxyInfo] = param.proxyInfo
protected val sfURL: String = param.sfURL
protected val useExponentialBackoff: Boolean = param.useExponentialBackoff

// The first 10 sleep time in second will be like
// 3, 6, 12, 24, 48, 96, 192, 300, 300, 300, etc
Expand Down Expand Up @@ -921,15 +916,11 @@ sealed trait CloudStorage {
def fileExists(fileName: String): Boolean
}

case class InternalAzureStorage(param: MergedParameters,
case class InternalAzureStorage(override protected val param: MergedParameters,
stageName: String,
@transient override val connection: ServerConnection)
extends CloudStorage {

override val maxRetryCount = param.maxRetryCount
override val proxyInfo: Option[ProxyInfo] = param.proxyInfo
override val sfURL = param.sfURL
override val useExponentialBackoff = param.useExponentialBackoff

override protected def getStageInfo(
isWrite: Boolean,
Expand Down Expand Up @@ -1150,14 +1141,11 @@ case class InternalAzureStorage(param: MergedParameters,
}
}

case class ExternalAzureStorage(containerName: String,
case class ExternalAzureStorage(override protected val param: MergedParameters,
containerName: String,
azureAccount: String,
azureEndpoint: String,
azureSAS: String,
override val proxyInfo: Option[ProxyInfo],
override val maxRetryCount: Int,
override val sfURL: String,
override val useExponentialBackoff: Boolean,
fileCountPerPartition: Int,
pref: String = "",
@transient override val connection: ServerConnection)
Expand Down Expand Up @@ -1302,16 +1290,12 @@ case class ExternalAzureStorage(containerName: String,
}
}

case class InternalS3Storage(param: MergedParameters,
case class InternalS3Storage(override protected val param: MergedParameters,
stageName: String,
@transient override val connection: ServerConnection,
parallelism: Int =
CloudStorageOperations.DEFAULT_PARALLELISM)
extends CloudStorage {
override val maxRetryCount = param.maxRetryCount
override val proxyInfo: Option[ProxyInfo] = param.proxyInfo
override val sfURL = param.sfURL
override val useExponentialBackoff = param.useExponentialBackoff

override protected def getStageInfo(
isWrite: Boolean,
Expand Down Expand Up @@ -1550,13 +1534,10 @@ case class InternalS3Storage(param: MergedParameters,
}
}

case class ExternalS3Storage(bucketName: String,
case class ExternalS3Storage(override protected val param: MergedParameters,
bucketName: String,
awsId: String,
awsKey: String,
override val proxyInfo: Option[ProxyInfo],
override val maxRetryCount: Int,
override val sfURL: String,
override val useExponentialBackoff: Boolean,
fileCountPerPartition: Int,
awsToken: Option[String] = None,
pref: String = "",
Expand Down Expand Up @@ -1704,18 +1685,12 @@ case class ExternalS3Storage(bucketName: String,

// Internal CloudStorage for GCS (Google Cloud Storage).
// NOTE: External storage for GCS is not supported.
case class InternalGcsStorage(param: MergedParameters,
case class InternalGcsStorage(override protected val param: MergedParameters,
stageName: String,
@transient override val connection: ServerConnection,
@transient stageManager: SFInternalStage)
extends CloudStorage {

override val proxyInfo: Option[ProxyInfo] = param.proxyInfo
// Max retry count to upload a file
override val maxRetryCount: Int = param.maxRetryCount
override val sfURL = param.sfURL
override val useExponentialBackoff = param.useExponentialBackoff

// Generate file transfer metadata objects for file upload. On GCS,
// the file transfer metadata is pre-signed URL and related metadata.
// This function is called on Master node.
Expand Down

0 comments on commit a162c7d

Please sign in to comment.