This repository has been archived by the owner on Sep 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 172
/
Copy pathfile_cache.go
133 lines (117 loc) · 2.64 KB
/
file_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package torus
import (
"time"
"golang.org/x/net/context"
)
type fileCache interface {
newINode(ref INodeRef)
writeToBlock(ctx context.Context, i, from, to int, data []byte) (int, error)
getBlock(ctx context.Context, i int) ([]byte, error)
sync(context.Context) error
}
type singleBlockCache struct {
// half-finished blocks
openIdx int
openData []byte
openWrote bool
ref INodeRef
blocks Blockset
readIdx int
readData []byte
blkSize uint64
}
func newSingleBlockCache(bs Blockset, blkSize uint64) *singleBlockCache {
return &singleBlockCache{
readIdx: -1,
openIdx: -1,
blocks: bs,
blkSize: blkSize,
}
}
func (sb *singleBlockCache) newINode(ref INodeRef) {
sb.ref = ref
}
func (sb *singleBlockCache) openBlock(ctx context.Context, i int) error {
if sb.openIdx == i && sb.openData != nil {
return nil
}
if sb.openWrote {
err := sb.sync(ctx)
if err != nil {
return err
}
}
if i == sb.blocks.Length() {
sb.openData = make([]byte, sb.blkSize)
sb.openIdx = i
return nil
}
if i > sb.blocks.Length() {
panic("writing beyond the end of a file without calling Truncate")
}
if sb.readIdx == i {
sb.openIdx = i
sb.openData = sb.readData
sb.readData = nil
sb.readIdx = -1
return nil
}
start := time.Now()
d, err := sb.blocks.GetBlock(ctx, i)
if err != nil {
return err
}
delta := time.Since(start)
promFileBlockRead.Observe(float64(delta.Nanoseconds()) / 1000)
sb.openData = d
sb.openIdx = i
return nil
}
func (sb *singleBlockCache) writeToBlock(ctx context.Context, i, from, to int, data []byte) (int, error) {
if sb.openIdx != i {
err := sb.openBlock(ctx, i)
if err != nil {
return 0, err
}
}
sb.openWrote = true
if (to - from) != len(data) {
panic("server: different write lengths?")
}
return copy(sb.openData[from:to], data), nil
}
func (sb *singleBlockCache) sync(ctx context.Context) error {
if !sb.openWrote {
return nil
}
start := time.Now()
err := sb.blocks.PutBlock(ctx, sb.ref, sb.openIdx, sb.openData)
delta := time.Since(start)
promFileBlockWrite.Observe(float64(delta.Nanoseconds()) / 1000)
sb.openWrote = false
return err
}
func (sb *singleBlockCache) openRead(ctx context.Context, i int) error {
start := time.Now()
d, err := sb.blocks.GetBlock(ctx, i)
if err != nil {
return err
}
delta := time.Since(start)
promFileBlockRead.Observe(float64(delta.Nanoseconds()) / 1000)
sb.readData = d
sb.readIdx = i
return nil
}
func (sb *singleBlockCache) getBlock(ctx context.Context, i int) ([]byte, error) {
if sb.openIdx == i {
return sb.openData, nil
}
if sb.readIdx != i {
err := sb.openRead(ctx, i)
if err != nil {
return nil, err
}
}
return sb.readData, nil
}