-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdb.go
146 lines (131 loc) · 4.38 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package go_lsm
import (
"errors"
"fmt"
"go-lsm/compact"
"go-lsm/future"
"go-lsm/kv"
"go-lsm/state"
"go-lsm/txn"
"log/slog"
"sync/atomic"
"time"
)
// DbAlreadyStoppedErr is the sentinel error returned by Read, Write and Scan
// when they are invoked after the Db has been closed via Close.
var DbAlreadyStoppedErr = errors.New("db is stopped, can not perform the operation")
// Db represents the key/value database (/storage engine).
// It ties together the storage state, the transaction oracle and the signalling
// needed to stop the background compaction goroutine started by Open.
type Db struct {
// storageState is handed to every transaction and is the target of compaction results.
storageState *state.StorageState
// oracle is handed to every transaction and to the compaction; it is closed by Close.
oracle *txn.Oracle
// stopped is flipped to true exactly once by Close; Read, Write and Scan check it before doing any work.
stopped atomic.Bool
// stopChannel is closed by Close to tell the compaction goroutine to exit.
stopChannel chan struct{}
}
// KeyValue is an abstraction which contains a key/value pair.
// It is returned from the Scan operation.
type KeyValue struct {
// Key holds the raw bytes of the key as reported by the scan iterator.
Key []byte
// Value holds the bytes of the value as reported by the scan iterator.
Value []byte
}
// Open builds a Db on top of the storage state described by options.
// The storage state is created first; if that fails the error is returned as-is and no Db is produced.
// On success the transaction oracle is seeded with the storage state's last commit timestamp
// and the background compaction goroutine is started before the Db is handed back.
func Open(options state.StorageOptions) (*Db, error) {
	storageState, err := state.NewStorageStateWithOptions(options)
	if err != nil {
		return nil, err
	}

	// Build the oracle from its parts, in the same order the arguments are evaluated.
	executor := txn.NewExecutor(storageState)
	lastCommitTimestamp := storageState.LastCommitTimestamp()
	oracle := txn.NewOracleWithLastCommitTimestamp(executor, lastCommitTimestamp)

	db := &Db{
		storageState: storageState,
		oracle:       oracle,
		stopChannel:  make(chan struct{}),
	}
	db.startCompaction()

	return db, nil
}
// Read runs callback against a fresh readonly txn.Transaction (created via txn.NewReadonlyTransaction).
// The transaction handed to the callback is readonly; it will panic on any form of write and commit operations.
// The transaction's begin timestamp is released through the oracle once the callback returns.
// Read returns DbAlreadyStoppedErr if the Db has already been closed, nil otherwise.
func (db *Db) Read(callback func(transaction *txn.Transaction)) error {
	if isStopped := db.stopped.Load(); isStopped {
		return DbAlreadyStoppedErr
	}

	readonlyTxn := txn.NewReadonlyTransaction(db.oracle, db.storageState)
	defer db.oracle.FinishBeginTimestamp(readonlyTxn)

	callback(readonlyTxn)
	return nil
}
// Write runs callback against a fresh readwrite txn.Transaction (created via txn.NewReadwriteTransaction)
// and then commits that transaction.
// The transaction handed to the callback supports both read and write operations.
// The transaction's begin timestamp is released through the oracle after the commit call returns.
// Write returns DbAlreadyStoppedErr if the Db has already been closed;
// otherwise it returns whatever the commit produced (a *future.Future and an error).
func (db *Db) Write(callback func(transaction *txn.Transaction)) (*future.Future, error) {
	if isStopped := db.stopped.Load(); isStopped {
		return nil, DbAlreadyStoppedErr
	}

	readwriteTxn := txn.NewReadwriteTransaction(db.oracle, db.storageState)
	defer db.oracle.FinishBeginTimestamp(readwriteTxn)

	callback(readwriteTxn)

	commitFuture, err := readwriteTxn.Commit()
	return commitFuture, err
}
// Scan collects every key/value pair that falls inside keyRange.
// It runs inside a readonly transaction and walks the transaction's scan iterator from start to end,
// returning the pairs in increasing order if no error occurs.
// Only kv.InclusiveKeyRange is supported, there is no support for Open and HalfOpen ranges.
// Scan returns DbAlreadyStoppedErr if the Db has already been closed, and any error reported
// while creating or advancing the iterator; in every error case the returned slice is nil.
func (db *Db) Scan(keyRange kv.InclusiveKeyRange[kv.RawKey]) ([]KeyValue, error) {
	if db.stopped.Load() {
		return nil, DbAlreadyStoppedErr
	}

	readonlyTxn := txn.NewReadonlyTransaction(db.oracle, db.storageState)
	defer db.oracle.FinishBeginTimestamp(readonlyTxn)

	it, err := readonlyTxn.Scan(keyRange)
	if err != nil {
		return nil, err
	}
	defer it.Close()

	var pairs []KeyValue
	for it.IsValid() {
		pair := KeyValue{
			Key:   it.Key().RawBytes(),
			Value: it.Value().Bytes(),
		}
		pairs = append(pairs, pair)

		// Advance only after the current pair has been captured.
		if err := it.Next(); err != nil {
			return nil, err
		}
	}
	return pairs, nil
}
// Close closes the database.
// It is safe to call more than once: only the first call does any work.
// It involves:
// 1. Signalling the compaction goroutine to stop (by closing stopChannel).
// 2. Closing txn.Oracle.
// 3. Closing state.StorageState.
func (db *Db) Close() {
	if !db.stopped.CompareAndSwap(false, true) {
		return
	}
	// Signal the compaction goroutine before tearing down the oracle and the
	// storage state it works with, so that it does not begin a new compaction
	// round against components that are already closed.
	// NOTE(review): this does not wait for a compaction that is already in
	// flight; doing so would need a completion signal from the goroutine.
	close(db.stopChannel)
	db.oracle.Close()
	db.storageState.Close()
}
// startCompaction launches the background compaction goroutine.
// The goroutine waits on a timer armed with the configured compaction duration and, each time it fires,
// runs one compaction pass over a snapshot of the storage state.
// If the pass yields a state.StorageStateChangeEvent with changes, the event is applied to state.StorageState.
// The timer is re-armed after every successful pass.
// The goroutine exits when stopChannel is closed, or after logging the first error from
// either the compaction or the apply step.
func (db *Db) startCompaction() {
	go func() {
		timer := time.NewTimer(db.storageState.Options().CompactionOptions.Duration)
		defer timer.Stop()

		compaction := compact.NewCompaction(db.oracle, db.storageState.SSTableIdGenerator(), db.storageState.Options())

		// compactOnce performs a single compaction pass.
		// It reports false when the pass failed and the goroutine should give up.
		compactOnce := func() bool {
			event, err := compaction.Start(db.storageState.Snapshot())
			if err != nil {
				slog.Error(fmt.Sprintf("error in starting compaction %v", err))
				return false
			}
			if !event.HasAnyChanges() {
				return true
			}
			if err := db.storageState.Apply(event, false); err != nil {
				slog.Error(fmt.Sprintf("error in apply state change event %v", err))
				return false
			}
			return true
		}

		for {
			select {
			case <-db.stopChannel:
				return
			case <-timer.C:
				if !compactOnce() {
					return
				}
				// The timer has fired and its channel has been drained, so it can be re-armed directly.
				timer.Reset(db.storageState.Options().CompactionOptions.Duration)
			}
		}
	}()
}