-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathfiles.go
143 lines (122 loc) · 2.9 KB
/
files.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
	"encoding/hex"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)
// Storage-limit settings.
var (
	// maxStorageString is the raw value of the -maxSize command-line flag;
	// it defaults to the empty string.
	maxStorageString = flag.String("maxSize", "",
		"Approximate maximum amount of space to allocate")

	// maxStorage is an int64 storage limit. It is not assigned in this
	// section; presumably it is the parsed form of maxStorageString and is
	// populated elsewhere (TODO confirm).
	maxStorage int64
)
// hashFilename maps a hex hash string to the on-disk location of its blob:
// base + "/" + <first two characters of hstr> + "/" + hstr.
//
// The part below base is built as a rooted path and passed through
// filepath.Clean before being appended, so "." and ".." elements in hstr are
// resolved against that root and cannot climb above base.
//
// hstr must be at least two bytes long; shorter input panics on the slice
// expression.
func hashFilename(base, hstr string) string {
	shard := hstr[:2]
	relative := filepath.Clean("/" + shard + "/" + hstr)
	return base + relative
}
// ReadSeekCloser is the set of operations needed on an opened blob:
// reading, seeking and closing. *os.File satisfies it.
type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}
// openLocalBlob opens, read-only, the file under *root that stores the blob
// with hash hstr.
//
// The result of os.Open is returned as-is. Callers must check the error
// rather than comparing the returned value with nil: on failure the
// interface wraps a nil *os.File and therefore is not itself nil.
func openLocalBlob(hstr string) (ReadSeekCloser, error) {
	blobPath := hashFilename(*root, hstr)
	return os.Open(blobPath)
}
// removeObject deletes the local copy of blob h, but only after
// maybeRemoveBlobOwnership(h) succeeds.
//
// If maybeRemoveBlobOwnership returns an error, the file is left untouched
// and that error is returned. Otherwise the file is removed, the outcome is
// logged, and the result of os.Remove is returned.
func removeObject(h string) error {
	if err := maybeRemoveBlobOwnership(h); err != nil {
		return err
	}

	rmErr := os.Remove(hashFilename(*root, h))
	log.Printf("Removed local copy of %v, result=%v",
		h, errorOrSuccess(rmErr))
	return rmErr
}
// forceRemoveObject unconditionally drops this server's ownership record for
// blob h and then deletes the blob's file under *root.
//
// Any result of removeBlobOwnershipRecord is not inspected here; only the
// error from os.Remove is returned.
func forceRemoveObject(h string) error {
	// The ownership record is removed before the path is even computed,
	// so it is gone regardless of what happens to the file.
	removeBlobOwnershipRecord(h, serverId)

	target := hashFilename(*root, h)
	return os.Remove(target)
}
// verifyObjectHash re-hashes the local file for blob h and checks that the
// hex-encoded digest equals h.
//
// It returns nil when the digests match. Errors from opening or reading the
// file are returned unchanged. On a mismatch the file is force-removed (the
// removal outcome is only logged) and an error naming both hashes is
// returned.
func verifyObjectHash(h string) error {
	blob, err := openLocalBlob(h)
	if err != nil {
		return err
	}
	defer blob.Close()

	// Stream the file through the hasher instead of loading it in memory.
	hasher := getHash()
	if _, err := io.Copy(hasher, blob); err != nil {
		return err
	}

	actual := hex.EncodeToString(hasher.Sum(nil))
	if actual == h {
		return nil
	}

	// Content does not match its name: drop the corrupt copy.
	rmErr := forceRemoveObject(h)
	log.Printf("Removed corrupt file from disk: %v (was %v), result=%v",
		h, actual, errorOrSuccess(rmErr))
	return fmt.Errorf("Hash from disk of %v was %v", h, actual)
}
// shouldVerifyObject reports whether blob h is due for a full hash check.
//
// It returns true when the ownership record cannot be loaded, or when the
// timestamp stored for this server in the record's Nodes, advanced by
// globalConfig.ReconcileAge, already lies in the past.
func shouldVerifyObject(h string) bool {
	ownership, err := getBlobOwnership(h)
	if err != nil {
		// No usable record: err on the side of verifying.
		return true
	}

	// Presumably the time this server last verified the blob (TODO confirm).
	lastSeen := ownership.Nodes[serverId]
	due := lastSeen.Add(globalConfig.ReconcileAge)
	return due.Before(time.Now())
}
// verifyWorker consumes file entries from ch until the channel is closed.
//
// For each entry whose name shouldVerifyObject flags as due, the file is
// re-hashed with verifyObjectHash. Entries that pass (or were not due) are
// recorded via recordBlobOwnership; the final argument is true only when the
// hash was actually re-verified. Entries that fail have this server's
// ownership record removed and, if a node list is available, are handed to
// salvageBlob (presumably to recover a copy from another node - confirm).
//
// The node list is fetched once at start-up; if that fails the worker still
// runs, but with an empty list, so no salvage is attempted.
func verifyWorker(ch chan os.FileInfo) {
	nodes, err := findAllNodes()
	if err != nil {
		log.Printf("Couldn't find node list during verify: %v", err)
		nodes = NodeList{}
	}

	for info := range ch {
		name := info.Name()

		var verifyErr error
		checked := shouldVerifyObject(name)
		if checked {
			verifyErr = verifyObjectHash(name)
		}

		if verifyErr == nil {
			recordBlobOwnership(name, info.Size(), checked)
			continue
		}

		log.Printf("Invalid hash for object %v found at verification: %v",
			name, verifyErr)
		removeBlobOwnershipRecord(name, serverId)
		if len(nodes) > 0 {
			salvageBlob(name, "", 1, nodes)
		}
	}
}
// quickVerifyWorker consumes file entries from ch until the channel is
// closed and records ownership of each one by name and size, without hashing
// the file contents.
func quickVerifyWorker(ch chan os.FileInfo) {
	for {
		entry, open := <-ch
		if !open {
			return
		}
		recordBlobOwnership(entry.Name(), entry.Size(), false)
	}
}
// reconcileWith walks the tree under *root and feeds every candidate blob
// file to a pool of *verifyWorkers goroutines, each running wf.
//
// A file is a candidate when it is not a directory, its name does not start
// with "tmp", and its name is exactly as long as a hex-encoded hash
// (getHash().Size() * 2 characters).
//
// An error reported by filepath.Walk for any path aborts the walk and is
// returned. In every case the channel is closed and the function waits for
// all workers to return before it does, so entries already handed to a
// worker are fully processed by the time the caller regains control.
//
// NOTE: the channel is unbuffered, so with *verifyWorkers <= 0 the first
// send blocks forever.
func reconcileWith(wf func(chan os.FileInfo)) error {
	explen := getHash().Size() * 2

	vch := make(chan os.FileInfo)

	var wg sync.WaitGroup
	for i := 0; i < *verifyWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wf(vch)
		}()
	}
	// Deferred calls run last-in first-out: the channel is closed first so
	// the workers' receive loops end, then we wait for them to finish.
	defer wg.Wait()
	defer close(vch)

	return filepath.Walk(*root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || strings.HasPrefix(info.Name(), "tmp") {
			return nil
		}
		if len(info.Name()) == explen {
			vch <- info
		}
		return nil
	})
}
// reconcile runs reconcileWith using verifyWorker as the per-file worker and
// returns its error unchanged.
func reconcile() error {
return reconcileWith(verifyWorker)
}
// quickReconcile runs reconcileWith using quickVerifyWorker as the per-file
// worker and returns its error unchanged.
func quickReconcile() error {
return reconcileWith(quickVerifyWorker)
}