Skip to content

Commit

Permalink
Latest purge changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dimm0 committed Feb 6, 2019
1 parent 7d737ec commit bc6bc6c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,22 @@ var (
pathPurgeParam = purgeCommand.Arg("path", "The path to scan, relative to the mount").Required().String()
)

var purgeQueue = make(chan string, 1000000)

func main() {
rand.Seed(time.Now().UTC().UnixNano())

switch kingpin.MustParse(app.Parse(os.Args[1:])) {
case worker.FullCommand():
readWorkerConfig()

go func() {
file, _ := os.OpenFile("/tmp/purge_queue", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660);
for f := range purgeQueue {
file.WriteString(f+"\n")
}
}()

var checkMountpoints []string

c := make(chan os.Signal, 1)
Expand Down
35 changes: 23 additions & 12 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ func processFiles(fromDataStore storage_backend, toDataStore storage_backend, ta
}
}
case "purge":
skipped := 0

for _, filepath := range taskStruct.ItemPath {
logger.Debugf("Processing %s", filepath)
sourceFileMeta, err := fromDataStore.GetMetadata(filepath)
Expand All @@ -328,25 +330,34 @@ func processFiles(fromDataStore storage_backend, toDataStore storage_backend, ta
continue
}

skipped := 0

switch mode := sourceFileMeta.Mode(); {
case mode.IsRegular():
sourceMtime := sourceFileMeta.ModTime()
sourceStat := sourceFileMeta.Sys().(*syscall.Stat_t)
sourceAtime := getAtime(sourceStat)

if fromDataStore.GetPurgeFilesOlder() > 0 && time.Since(sourceMtime).Hours()*24 > float64(fromDataStore.GetPurgeFilesOlder()) {
logger.Infof("Purging the file %s", filepath)
if fromDataStore.GetPurgeFilesOlder() > 0 && (time.Since(sourceAtime).Hours()/24.0 > float64(fromDataStore.GetPurgeFilesOlder())) {
//logger.Error(filepath)
if err := fromDataStore.Remove(filepath); err != nil {
logger.Errorf("Error deleting file %s: %s", filepath, err.Error())
}
atomic.AddUint64(&FilesRemovedCount, 1)
continue
} else {
atomic.AddUint64(&FilesSkippedCount, 1)
skipped++
}

atomic.AddUint64(&FilesSkippedCount, 1)
skipped++;
default: {
skipped++
}
}

}

if skipped == 0 { // dir is empty now
if len(strings.Split(filepath, string(os.PathSeparator))) > 3 {
logger.Infof("Purging now empty folder %s", path.Dir(filepath))
if skipped == 0 && len(taskStruct.ItemPath) > 0 { // dir is now empty
if len(strings.Split(taskStruct.ItemPath[0], string(os.PathSeparator))) > 3 {
//purgeQueue <- "Dir "+path.Dir(taskStruct.ItemPath[0])
//logger.Error("Dir "+path.Dir(taskStruct.ItemPath[0]))
if err := fromDataStore.Remove(path.Dir(taskStruct.ItemPath[0])); err != nil {
logger.Errorf("Error deleting folder %s: %s", path.Dir(taskStruct.ItemPath[0]), err.Error())
}
}
}
Expand Down

0 comments on commit bc6bc6c

Please sign in to comment.