Skip to content

Commit

Permalink
feat: delete staging and load files from object storage post successful sync
Browse files (browse the repository at this point in the history)
  • Loading branch information
shekhar-rudder committed Jan 14, 2025
1 parent e1b756b commit ad4fab8
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff/v4"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

Expand Down Expand Up @@ -99,6 +100,7 @@ type UploadJob struct {
maxParallelLoadsWorkspaceIDs map[string]interface{}
columnsBatchSize int
longRunningUploadStatThresholdInMin time.Duration
cleanupStorageFiles bool
}

errorHandler ErrorHandler
Expand Down Expand Up @@ -198,6 +200,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
uj.config.minUploadBackoff = f.conf.GetDurationVar(60, time.Second, "Warehouse.minUploadBackoff", "Warehouse.minUploadBackoffInS")
uj.config.maxUploadBackoff = f.conf.GetDurationVar(1800, time.Second, "Warehouse.maxUploadBackoff", "Warehouse.maxUploadBackoffInS")
uj.config.retryTimeWindow = f.conf.GetDurationVar(180, time.Minute, "Warehouse.retryTimeWindow", "Warehouse.retryTimeWindowInMins")
uj.config.cleanupStorageFiles = f.conf.GetBool("Warehouse.cleanupStorageFiles", true)

uj.stats.uploadTime = uj.timerStat("upload_time")
uj.stats.userTablesLoadTime = uj.timerStat("user_tables_load_time")
Expand Down Expand Up @@ -364,6 +367,32 @@ func (job *UploadJob) run() (err error) {
break
}
newStatus = nextUploadState.completed
// Optional post-sync cleanup, gated by the cleanupStorageFiles config flag.
// Builds a file manager for the destination's object storage and issues two
// Delete calls: one derived from the job's staging file IDs, one derived from
// the load files looked up for those staging files.
// This runs after newStatus has already been assigned on the preceding line,
// so every break below leaves that assignment in place.
if job.config.cleanupStorageFiles {
warehouse := job.warehouse
storageProvider := whutils.ObjectStorageType(warehouse.Destination.DestinationDefinition.Name, warehouse.Destination.Config, job.upload.UseRudderStorage)
// The := declares a new err scoped to this if-block; the later `err =` and
// `loadingFiles, err :=` statements reuse this block-local variable.
// NOTE(review): if the enclosing function reports failure through an outer
// err (e.g. a named result), none of the break paths below assign to it, so
// cleanup failures would go unreported — confirm.
fm, err := filemanager.New(&filemanager.Settings{Provider: storageProvider, Config: warehouse.Destination.Config})
if err != nil {
break
}
// Convert the numeric staging file IDs to their base-10 string form.
stagingFileIds := make([]string, len(job.stagingFileIDs))
for i, num := range job.stagingFileIDs {
stagingFileIds[i] = strconv.FormatInt(num, 10)
}
// NOTE(review): Delete receives stringified numeric IDs here. If the file
// manager expects object-storage keys/paths, these values will not identify
// any stored object — verify against the filemanager API.
if err = fm.Delete(job.ctx, stagingFileIds); err != nil {
break
}
// Look up the load files associated with the same staging file IDs.
loadingFiles, err := job.loadFilesRepo.GetByStagingFiles(job.ctx, job.stagingFileIDs)
if err != nil {
break
}
// Same ID-to-string conversion, using each load file's ID field.
loadingFileIds := make([]string, len(loadingFiles))
for i, loadingFile := range loadingFiles {
loadingFileIds[i] = strconv.FormatInt(loadingFile.ID, 10)
}
// NOTE(review): same concern as above — IDs, not storage keys, are passed.
if err = fm.Delete(job.ctx, loadingFileIds); err != nil {
break
}
}

default:
// If unknown state, start again
Expand Down

0 comments on commit ad4fab8

Please sign in to comment.