Skip to content

Commit

Permalink
chore: sync release v1.40.2 to main branch (#5433)
Browse files Browse the repository at this point in the history
  • Loading branch information
devops-github-rudderstack authored Jan 14, 2025
1 parent 7bb084e commit ce9bd79
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [1.40.2](https://github.com/rudderlabs/rudder-server/compare/v1.40.1...v1.40.2) (2025-01-14)


### Bug Fixes

* databricks external location ([#5429](https://github.com/rudderlabs/rudder-server/issues/5429)) ([7a2bcf9](https://github.com/rudderlabs/rudder-server/commit/7a2bcf9e877a901ffcb309fb56fadcf35ff15952))


## [1.40.1](https://github.com/rudderlabs/rudder-server/compare/v1.40.0...v1.40.1) (2025-01-08)


Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (d *Deltalake) tableLocationQuery(tableName string) string {
enableExternalLocation := d.Warehouse.GetBoolDestinationConfig(model.EnableExternalLocationSetting)
externalLocation := d.Warehouse.GetStringDestinationConfig(d.conf, model.ExternalLocationSetting)

if enableExternalLocation || externalLocation == "" {
if !enableExternalLocation || externalLocation == "" {
return ""
}

Expand Down
50 changes: 50 additions & 0 deletions warehouse/integrations/deltalake/deltalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,8 @@ func TestIntegration(t *testing.T) {

err = d.CreateTable(ctx, tableName, schemaInWarehouse)
require.NoError(t, err)
metadata := tableMetadata(t, d.DB.DB, namespace, tableName)
require.Equal(t, "MANAGED", metadata["Type"])

loadTableStat, err := d.LoadTable(ctx, tableName)
require.NoError(t, err)
Expand Down Expand Up @@ -1173,6 +1175,34 @@ func TestIntegration(t *testing.T) {
require.Equal(t, records, whth.SampleTestRecords())
})
})
t.Run("external tables", func(t *testing.T) {
tableName := "external_tables"
uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)

externalWarehouse := th.Clone(t, warehouse)
externalWarehouse.Destination.Config["enableExternalLocation"] = true
externalWarehouse.Destination.Config["externalLocation"] = "/path/to/delta/table"

d := deltalake.New(config.New(), logger.NOP, stats.NOP)
require.NoError(t, d.Setup(ctx, externalWarehouse, mockUploader))
require.NoError(t, d.CreateSchema(ctx))
t.Cleanup(func() {
dropSchema(t, d.DB.DB, namespace)
})

require.NoError(t, d.CreateTable(ctx, tableName, schemaInWarehouse))
metadata := tableMetadata(t, d.DB.DB, namespace, tableName)
require.Equal(t, "EXTERNAL", metadata["Type"])
require.Equal(t, "dbfs:/path/to/delta/table/"+namespace+"/"+tableName, metadata["Location"])

loadTableStat, err := d.LoadTable(ctx, tableName)
require.NoError(t, err)
require.Equal(t, loadTableStat.RowsInserted, int64(14))
require.Equal(t, loadTableStat.RowsUpdated, int64(0))
})
})

t.Run("Fetch Schema", func(t *testing.T) {
Expand Down Expand Up @@ -1291,6 +1321,26 @@ func TestIntegration(t *testing.T) {
})
}

func tableMetadata(t *testing.T, db *sql.DB, namespace, tableName string) map[string]string {
t.Helper()
t.Log("table metadata for", namespace, tableName)

var database, table, isTemporary, information string
err := db.QueryRowContext(context.Background(), fmt.Sprintf("SHOW TABLE EXTENDED IN %s LIKE '%s'", namespace, tableName)).Scan(&database, &table, &isTemporary, &information)
require.NoError(t, err)

metadataMap := make(map[string]string)
for _, line := range strings.Split(information, "\n") {
parts := strings.SplitN(line, ":", 2)
if len(parts) >= 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
metadataMap[key] = value
}
}
return metadataMap
}

func dropSchema(t *testing.T, db *sql.DB, namespace string) {
t.Helper()
t.Log("dropping schema", namespace)
Expand Down

0 comments on commit ce9bd79

Please sign in to comment.