-
Notifications
You must be signed in to change notification settings - Fork 0
/
ops.go
134 lines (110 loc) · 2.95 KB
/
ops.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package barrel
import (
"bytes"
"fmt"
"hash/crc32"
"time"
"github.com/deepgolani4/LogVaultDB/internal/datafile/internal/datafile"
)
func (b *Barrel) get(k string) (Record, error) {
// Check for entry in KeyDir.
meta, ok := b.keydir[k]
if !ok {
return Record{}, ErrNoKey
}
var (
// Header object for decoding the binary data into it.
header Header
reader *datafile.DataFile
)
// Set the current file ID as the default.
reader = b.df
// Check if the ID is different from the current ID.
if meta.FileID != b.df.ID() {
reader, ok = b.stale[meta.FileID]
if !ok {
return Record{}, fmt.Errorf("error looking up for the db file for the given id: %d", meta.FileID)
}
}
// Read the file with the given offset.
data, err := reader.Read(meta.RecordPos, meta.RecordSize)
if err != nil {
return Record{}, fmt.Errorf("error reading data from file: %v", err)
}
// Decode the header.
if err := header.decode(data); err != nil {
return Record{}, fmt.Errorf("error decoding header: %v", err)
}
var (
// Get the offset position in record to start reading the value from.
valPos = meta.RecordSize - int(header.ValSize)
// Read the value from the record.
val = data[valPos:]
)
record := Record{
Header: header,
Key: k,
Value: val,
}
return record, nil
}
func (b *Barrel) put(df *datafile.DataFile, k string, val []byte, expiry *time.Time) error {
// Prepare header.
header := Header{
Checksum: crc32.ChecksumIEEE(val),
Timestamp: uint32(time.Now().Unix()),
KeySize: uint32(len(k)),
ValSize: uint32(len(val)),
}
// Check for expiry.
if expiry != nil {
header.Expiry = uint32(expiry.Unix())
} else {
header.Expiry = 0
}
// Prepare the record.
record := Record{
Key: k,
Value: val,
}
// Get the buffer from the pool for writing data.
buf := b.bufPool.Get().(*bytes.Buffer)
defer b.bufPool.Put(buf)
// Resetting the buffer is important since the length of bytes written should be reset on each `set` operation.
defer buf.Reset()
// Encode header.
header.encode(buf)
// Write key/value.
buf.WriteString(k)
buf.Write(val)
// Append to underlying file.
offset, err := df.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("error writing data to file: %v", err)
}
// Add entry to KeyDir.
// We just save the value of key and some metadata for faster lookups.
// The value is only stored in disk.
b.keydir[k] = Meta{
Timestamp: int(record.Header.Timestamp),
RecordSize: len(buf.Bytes()),
RecordPos: offset + len(buf.Bytes()),
FileID: df.ID(),
}
// Ensure filesystem's in memory buffer is flushed to disk.
if b.opts.alwaysFSync {
if err := df.Sync(); err != nil {
return fmt.Errorf("error syncing file to disk: %v", err)
}
}
return nil
}
func (b *Barrel) delete(k string) error {
// Store an empty tombstone value for the given key.
if err := b.put(b.df, k, []byte{}, nil); err != nil {
return err
}
// Delete it from the map as well.
delete(b.keydir, k)
return nil
}