Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delete staging and load files post successful sync #5428

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions integration_test/warehouse/warehouse_test.go
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func TestMain(m *testing.M) {

func TestUploads(t *testing.T) {
t.Run("tracks loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, bcConfig := setupServer(t, false, func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["cleanupObjectStorageFiles"] = true
}, nil)

var (
ctx = context.Background()
Expand All @@ -95,6 +97,7 @@ func TestUploads(t *testing.T) {
uuid.New().String(),
)
}), "\n")
useRudderStorage := false

require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
Expand All @@ -104,7 +107,7 @@ func TestUploads(t *testing.T) {
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
UseRudderStorage: useRudderStorage,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]any{
"tracks": {
Expand Down Expand Up @@ -132,9 +135,11 @@ func TestUploads(t *testing.T) {
{A: "status", B: exportedData},
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
// All the files should be deleted from the object storage since cleanupObjectStorageFiles is set to true
requireObjectStorageCount(t, ctx, bcConfig, useRudderStorage, 0)
})
t.Run("user and identifies loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, bcConfig := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand All @@ -153,6 +158,7 @@ func TestUploads(t *testing.T) {
})
eventsPayload := strings.Join(append(append([]string{}, userEvents...), identifyEvents...), "\n")

useRudderStorage := false
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
SourceID: sourceID,
Expand All @@ -161,7 +167,7 @@ func TestUploads(t *testing.T) {
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
UseRudderStorage: useRudderStorage,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]any{
"users": {
Expand Down Expand Up @@ -194,10 +200,13 @@ func TestUploads(t *testing.T) {
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "users"), events)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "identifies"), events)
// Files should not be deleted from the object storage since cleanupObjectStorageFiles is not set to true
// 1 staging file + 2 load files (users and identifies)
requireObjectStorageCount(t, ctx, bcConfig, useRudderStorage, 3)
})
t.Run("schema change", func(t *testing.T) {
t.Run("add columns", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -607,7 +616,7 @@ func TestUploads(t *testing.T) {
})
t.Run("reports", func(t *testing.T) {
t.Run("succeeded", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -668,7 +677,7 @@ func TestUploads(t *testing.T) {
}...)
})
t.Run("aborted", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false,
db, minioResource, whClient, _ := setupServer(t, false,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["port"] = "5432"
},
Expand Down Expand Up @@ -740,7 +749,7 @@ func TestUploads(t *testing.T) {
})
})
t.Run("retries then aborts", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false,
db, minioResource, whClient, _ := setupServer(t, false,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].Destinations[0].Config["port"] = "5432"
},
Expand Down Expand Up @@ -797,7 +806,7 @@ func TestUploads(t *testing.T) {
}...)
})
t.Run("discards", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -856,7 +865,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "rudder_discards"), events/2)
})
t.Run("archiver", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil,
db, minioResource, whClient, _ := setupServer(t, false, nil,
func(minioResource *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.archiveUploadRelatedRecords", B: true},
Expand Down Expand Up @@ -923,7 +932,7 @@ func TestUploads(t *testing.T) {
})
t.Run("sync behaviour", func(t *testing.T) {
t.Run("default behaviour", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -998,7 +1007,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
})
t.Run("allowMerge=false,preferAppend=false", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil,
db, minioResource, whClient, _ := setupServer(t, false, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1079,7 +1088,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=true,preferAppend=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: true},
Expand Down Expand Up @@ -1160,7 +1169,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1241,7 +1250,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,isSourceETL=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true, nil,
db, minioResource, whClient, _ := setupServer(t, true, nil,
func(_ *minio.Resource) []lo.Tuple2[string, any] {
return []lo.Tuple2[string, any]{
{A: "Warehouse.postgres.allowMerge", B: false},
Expand Down Expand Up @@ -1325,7 +1334,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,IsReplaySource=true", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true,
db, minioResource, whClient, _ := setupServer(t, true,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].OriginalID = sourceID
},
Expand Down Expand Up @@ -1409,7 +1418,7 @@ func TestUploads(t *testing.T) {
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events*2)
})
t.Run("allowMerge=false,preferAppend=true,sourceCategory=cloud", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, true,
db, minioResource, whClient, _ := setupServer(t, true,
func(m map[string]backendconfig.ConfigT, _ *minio.Resource) {
m[workspaceID].Sources[0].SourceDefinition.Category = "cloud"
},
Expand Down Expand Up @@ -1494,7 +1503,7 @@ func TestUploads(t *testing.T) {
})
})
t.Run("id resolution", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
db, minioResource, whClient, _ := setupServer(t, false, nil, nil)

ctx := context.Background()
events := 100
Expand Down Expand Up @@ -2330,6 +2339,26 @@ func requireDownstreamEventsCount(
)
}

// requireObjectStorageCount asserts that the destination's object storage for
// the test workspace contains exactly expectedCount objects. It builds a file
// manager from the first destination's config (honoring useRudderStorage when
// resolving the storage provider) and drains the paginated listing iterator,
// collecting every object key so a failed assertion lists the unexpected files.
func requireObjectStorageCount(t testing.TB, ctx context.Context, bcConfig map[string]backendconfig.ConfigT, useRudderStorage bool, expectedCount int) {
	t.Helper()

	destination := bcConfig[workspaceID].Sources[0].Destinations[0]
	storageProvider := whutils.ObjectStorageType(destination.DestinationDefinition.Name, destination.Config, useRudderStorage)
	fm, err := filemanager.New(&filemanager.Settings{Provider: storageProvider, Config: destination.Config})
	require.NoError(t, err)

	// Drain the iterator page by page; an empty page signals the end of the listing.
	fileIter := fm.ListFilesWithPrefix(ctx, "", fm.Prefix(), 1000)
	files := make([]string, 0)
	for {
		fileInfo, err := fileIter.Next()
		require.NoError(t, err)
		if len(fileInfo) == 0 {
			break
		}
		for _, file := range fileInfo {
			files = append(files, file.Key)
		}
	}
	require.Len(t, files, expectedCount)
}
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved

func requireReportsCount(
t testing.TB,
ctx context.Context,
Expand Down Expand Up @@ -2372,6 +2401,7 @@ func setupServer(
*sqlmw.DB,
*minio.Resource,
*whclient.Warehouse,
map[string]backendconfig.ConfigT,
) {
t.Helper()

Expand Down Expand Up @@ -2411,5 +2441,5 @@ func setupServer(
serverURL := fmt.Sprintf("http://localhost:%d", webPort)
health.WaitUntilReady(ctx, t, serverURL+"/health", time.Second*30, 100*time.Millisecond, t.Name())

return sqlmw.New(pgResource.DB), minioResource, whclient.NewWarehouse(serverURL)
return sqlmw.New(pgResource.DB), minioResource, whclient.NewWarehouse(serverURL), bcConfig
}
35 changes: 35 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"github.com/cenkalti/backoff/v4"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

Expand Down Expand Up @@ -363,6 +364,9 @@
if err = job.exportData(); err != nil {
break
}
if err = job.cleanupObjectStorageFiles(); err != nil {
break

Check warning on line 368 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L368

Added line #L368 was not covered by tests
}
Comment on lines +368 to +370

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shekhar-rudder Say if this fails. Would the job be retried? If yes, in next retry would it continue from just cleanupObjectStorageFiles?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shekhar-rudder @achettyiitr Also from PRD doc, did we close on failing the job or considering metric and internal alert for now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RanjeetMishra If the cleanup fails, the job will be marked as failed. On the next retry, I would assume it will proceed as it would for any other error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the deletion is now config-based, I think it's fine to mark the job as failed. If the customer doesn't want the syncs to error out, they can always turn it off.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine if we are aligning on marking sync jobs as failed. Regarding the destination config for old and new destinations, let's finalize tomorrow. This PR is good to go in that case.

newStatus = nextUploadState.completed

default:
Expand Down Expand Up @@ -427,6 +431,37 @@
return nil
}

func (job *UploadJob) cleanupObjectStorageFiles() error {
destination := job.warehouse.Destination
cleanupObjectStorageFiles, _ := destination.Config["cleanupObjectStorageFiles"].(bool)
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
if cleanupObjectStorageFiles {
storageProvider := whutils.ObjectStorageType(destination.DestinationDefinition.Name, destination.Config, job.upload.UseRudderStorage)
fm, err := filemanager.New(&filemanager.Settings{Provider: storageProvider, Config: destination.Config})
if err != nil {
return err
}

Check warning on line 442 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L441-L442

Added lines #L441 - L442 were not covered by tests
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
stagingFileKeys := make([]string, len(job.stagingFiles))
for i, file := range job.stagingFiles {
stagingFileKeys[i] = fm.GetDownloadKeyFromFileLocation(file.Location)
}
if err = fm.Delete(job.ctx, stagingFileKeys); err != nil {
return err
}

Check warning on line 449 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L448-L449

Added lines #L448 - L449 were not covered by tests
loadingFiles, err := job.loadFilesRepo.GetByStagingFiles(job.ctx, job.stagingFileIDs)
if err != nil {
return err
}

Check warning on line 453 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L452-L453

Added lines #L452 - L453 were not covered by tests
loadingFileKeys := make([]string, len(loadingFiles))
for i, file := range loadingFiles {
loadingFileKeys[i] = fm.GetDownloadKeyFromFileLocation(file.Location)
}
if err = fm.Delete(job.ctx, loadingFileKeys); err != nil {
return err
}

Check warning on line 460 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L459-L460

Added lines #L459 - L460 were not covered by tests
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved

// CanAppend returns true if:
// * the source is not an ETL source
// * the source is not a replay source
Expand Down
Loading