Skip to content

Commit

Permalink
feat: delete staging and load files from object storage post successful sync
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Jan 21, 2025
1 parent 2541b1c commit 47e7504
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
68 changes: 49 additions & 19 deletions integration_test/warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func TestMain(m *testing.M) {

func TestUploads(t *testing.T) {
t.Run("tracks loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, bcConfig := setupServer(t, false, func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["cleanupObjectStorageFiles"] = true
}, nil)

var (
ctx = context.Background()
Expand All @@ -95,6 +97,7 @@ func TestUploads(t *testing.T) {
uuid.New().String(),
)
}), "\n")
useRudderStorage := false

require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
Expand All @@ -104,7 +107,7 @@ func TestUploads(t *testing.T) {
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
UseRudderStorage: useRudderStorage,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]any{
"tracks": {
Expand Down Expand Up @@ -132,9 +135,11 @@ func TestUploads(t *testing.T) {
{A: "status", B: exportedData},
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
// All the files should be deleted from the object storage since cleanupObjectStorageFiles is set to true
requireObjectStorageCount(t, ctx, bcConfig, useRudderStorage, 0)
})
t.Run("user and identifies loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, bcConfig := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand All @@ -153,6 +158,7 @@ func TestUploads(t *testing.T) {
})
eventsPayload := strings.Join(append(append([]string{}, userEvents...), identifyEvents...), "\n")

useRudderStorage := false
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
SourceID: sourceID,
Expand All @@ -161,7 +167,7 @@ func TestUploads(t *testing.T) {
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
UseRudderStorage: useRudderStorage,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]any{
"users": {
Expand Down Expand Up @@ -194,10 +200,13 @@ func TestUploads(t *testing.T) {
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "users"), events)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "identifies"), events)
// Files should not be deleted from the object storage since cleanupObjectStorageFiles is not set to true
// 1 staging file + 2 load files (users and identifies)
requireObjectStorageCount(t, ctx, bcConfig, useRudderStorage, 3)
})
t.Run("schema change", func(t *testing.T) {
t.Run("add columns", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -607,7 +616,7 @@ func TestUploads(t *testing.T) {
})
t.Run("reports", func(t *testing.T) {
t.Run("succeeded", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -668,7 +677,7 @@ func TestUploads(t *testing.T) {
}...)
})
t.Run("aborted", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false,
db, minioResource, whClient, _ := setupServer(t, false,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["port"] = "5432"
},
Expand Down Expand Up @@ -740,7 +749,7 @@ func TestUploads(t *testing.T) {
})
})
t.Run("retries then aborts", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false,
db, minioResource, whClient, _ := setupServer(t, false,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["port"] = "5432"
},
Expand Down Expand Up @@ -797,7 +806,7 @@ func TestUploads(t *testing.T) {
}...)
})
t.Run("discards", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -856,7 +865,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "rudder_discards"), events/2)
})
t.Run("archiver", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil,
db, minioResource, whClient, _ := setupServer(t, false, nil,
func(minioResource *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.archiveUploadRelatedRecords", B: true},
Expand Down Expand Up @@ -923,7 +932,7 @@ func TestUploads(t *testing.T) {
})
t.Run("sync behaviour", func(t *testing.T) {
t.Run("default behaviour", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -998,7 +1007,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
})
t.Run("allowMerge=false,preferAppend=false", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil,
db, minioResource, whClient, _ := setupServer(t, false, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1079,7 +1088,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=true,preferAppend=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: true},
Expand Down Expand Up @@ -1160,7 +1169,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1241,7 +1250,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,isSourceETL=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1325,7 +1334,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,IsReplaySource=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true,
db, minioResource, whClient, _ := setupServer(t, true,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].OriginalID = sourceID
},
Expand Down Expand Up @@ -1409,7 +1418,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,sourceCategory=cloud", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true,
db, minioResource, whClient, _ := setupServer(t, true,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].SourceDefinition.Category = "cloud"
},
Expand Down Expand Up @@ -1494,7 +1503,7 @@ func TestUploads(t *testing.T) {
})
})
t.Run("id resolution", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -2330,6 +2339,26 @@ func requireDownstreamEventsCount(
)
}

// requireObjectStorageCount asserts that the destination's object storage
// contains exactly expectedCount files, by listing every key under the file
// manager's default prefix. The destination config is taken from the first
// destination of the first source of the workspace in bcConfig.
func requireObjectStorageCount(t testing.TB, ctx context.Context, bcConfig map[string]backendconfig.ConfigT, useRudderStorage bool, expectedCount int) {
	// Mark as a helper so assertion failures are reported at the caller's line.
	t.Helper()

	destination := bcConfig[workspaceID].Sources[0].Destinations[0]
	storageProvider := whutils.ObjectStorageType(destination.DestinationDefinition.Name, destination.Config, useRudderStorage)
	fm, err := filemanager.New(&filemanager.Settings{Provider: storageProvider, Config: destination.Config})
	require.NoError(t, err)

	// Collect every object key, paging through the iterator until an empty
	// page signals exhaustion.
	var files []string
	fileIter := fm.ListFilesWithPrefix(ctx, "", fm.Prefix(), 1000)
	for {
		fileInfo, err := fileIter.Next()
		require.NoError(t, err)
		if len(fileInfo) == 0 {
			break
		}
		for _, file := range fileInfo {
			files = append(files, file.Key)
		}
	}
	// require.Len (rather than a plain count comparison) prints the offending
	// keys on failure, which makes debugging leftover files easier.
	require.Len(t, files, expectedCount)
}

func requireReportsCount(
t testing.TB,
ctx context.Context,
Expand Down Expand Up @@ -2372,6 +2401,7 @@ func setupServer(
*sqlmw.DB,
*minio.Resource,
*whclient.Warehouse,
map[string]backendconfig.ConfigT,
) {
t.Helper()

Expand Down Expand Up @@ -2411,5 +2441,5 @@ func setupServer(
serverURL := fmt.Sprintf("http://localhost:%d", webPort)
health.WaitUntilReady(ctx, t, serverURL+"/health", time.Second*30, 100*time.Millisecond, t.Name())

return sqlmw.New(pgResource.DB), minioResource, whclient.NewWarehouse(serverURL)
return sqlmw.New(pgResource.DB), minioResource, whclient.NewWarehouse(serverURL), bcConfig
}
35 changes: 35 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff/v4"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

Expand Down Expand Up @@ -363,6 +364,9 @@ func (job *UploadJob) run() (err error) {
if err = job.exportData(); err != nil {
break
}
if err = job.cleanupObjectStorageFiles(); err != nil {
break

Check warning on line 368 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L368

Added line #L368 was not covered by tests
}
newStatus = nextUploadState.completed

default:
Expand Down Expand Up @@ -427,6 +431,37 @@ func (job *UploadJob) run() (err error) {
return nil
}

func (job *UploadJob) cleanupObjectStorageFiles() error {
destination := job.warehouse.Destination
cleanupObjectStorageFiles, _ := destination.Config["cleanupObjectStorageFiles"].(bool)
if cleanupObjectStorageFiles {
storageProvider := whutils.ObjectStorageType(destination.DestinationDefinition.Name, destination.Config, job.upload.UseRudderStorage)
fm, err := filemanager.New(&filemanager.Settings{Provider: storageProvider, Config: destination.Config})
if err != nil {
return err
}

Check warning on line 442 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L441-L442

Added lines #L441 - L442 were not covered by tests
stagingFileKeys := make([]string, len(job.stagingFiles))
for i, file := range job.stagingFiles {
stagingFileKeys[i] = fm.GetDownloadKeyFromFileLocation(file.Location)
}
if err = fm.Delete(job.ctx, stagingFileKeys); err != nil {
return err
}

Check warning on line 449 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L448-L449

Added lines #L448 - L449 were not covered by tests
loadingFiles, err := job.loadFilesRepo.GetByStagingFiles(job.ctx, job.stagingFileIDs)
if err != nil {
return err
}

Check warning on line 453 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L452-L453

Added lines #L452 - L453 were not covered by tests
loadingFileKeys := make([]string, len(loadingFiles))
for i, file := range loadingFiles {
loadingFileKeys[i] = fm.GetDownloadKeyFromFileLocation(file.Location)
}
if err = fm.Delete(job.ctx, loadingFileKeys); err != nil {
return err
}

Check warning on line 460 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L459-L460

Added lines #L459 - L460 were not covered by tests
}
return nil
}

// CanAppend returns true if:
// * the source is not an ETL source
// * the source is not a replay source
Expand Down

0 comments on commit 47e7504

Please sign in to comment.