Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
coanor committed Nov 10, 2023
1 parent 7df071a commit a92c749
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 55 deletions.
2 changes: 1 addition & 1 deletion diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
dataHeaderLen = 4

// EOFHint labels a file's end.
EOFHint = uint32(0xdeadbeef)
EOFHint = uint32(0xdeadbeef) // Deprecated
)

// Generic diskcache errors.
Expand Down
52 changes: 21 additions & 31 deletions diskcache/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
// Fn is the handler to eat cache from diskcache.
type Fn func([]byte) error

func (c *DiskCache) skipBadFile() error {
defer func() {
droppedBatchVec.WithLabelValues(c.path, reasonBadDataFile).Inc()
}()

if err := c.removeCurrentReadingFile(); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %w", err)
func (c *DiskCache) switchNextFile() error {
if c.curReadfile != "" {
if err := c.removeCurrentReadingFile(); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %w", err)
}
}

// clear .pos
Expand All @@ -32,15 +30,15 @@ func (c *DiskCache) skipBadFile() error {
}

// reopen next file to read
if err := c.switchNextFile(); err != nil {
return err
}
return c.doSwitchNextFile()
}

// swith to next file: to skip the bad file.
if err := c.switchNextFile(); err != nil {
return fmt.Errorf("switch to next file on bad file: %w", err)
}
return nil
func (c *DiskCache) skipBadFile() error {
defer func() {
droppedBatchVec.WithLabelValues(c.path, reasonBadDataFile).Inc()
}()

return c.switchNextFile()
}

// Get fetch new data from disk cache, then passing to fn
Expand Down Expand Up @@ -96,33 +94,25 @@ retry:

hdr := make([]byte, dataHeaderLen)
if n, err = c.rfd.Read(hdr); err != nil || n != dataHeaderLen {
//
// On bad datafile, just ignore and delete the file.
//
if err = c.skipBadFile(); err != nil {
return err
}

goto retry // read next new file
goto retry // read next new file to save another Get() calling.
}

// how many bytes of current data?
nbytes = int(binary.LittleEndian.Uint32(hdr[0:]))

if uint32(nbytes) == EOFHint { // EOF
if err = c.removeCurrentReadingFile(); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %w", err)
}

// clear .pos
if !c.noPos {
if err = c.pos.reset(); err != nil {
return err
}
}

// reopen next file to read
if err = c.switchNextFile(); err != nil {
return err
if err := c.switchNextFile(); err != nil {
return fmt.Errorf("switchNextFile: %w", err)
}

goto retry // read next new file
goto retry // read next new file to save another Get() calling.
}

databuf := make([]byte, nbytes)
Expand Down
39 changes: 22 additions & 17 deletions diskcache/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,39 @@ func TestDropInvalidDataFile(t *T.T) {
c, err := Open(WithPath(p))
require.NoError(t, err)

// put some data
// put some data and rotate 10 datafiles
data := make([]byte, 100)
assert.NoError(t, c.Put(data))
assert.NoError(t, c.rotate())

// rotate 2 data file
assert.NoError(t, c.Put(data))
assert.NoError(t, c.rotate())
for i := 0; i < 10; i++ {
assert.NoError(t, c.Put(data))
assert.NoError(t, c.rotate())

// distroy 1st data file
assert.NoError(t, os.Truncate(c.dataFiles[0], 0))
// destroy the datafile
if i%2 == 0 {
assert.NoError(t, os.Truncate(c.dataFiles[i], 0))
}
}

t.Logf("%q", c.dataFiles)
assert.Len(t, c.datafiles, 10)

assert.NoError(t, c.Get(func(get []byte) error {
// switch to 2nd file
assert.Len(t, c.dataFiles, 1)
assert.Equal(t, data, get)
for {
err := c.Get(func(get []byte) error {
// switch to 2nd file
assert.Equal(t, data, get)
return nil
})

return nil
}))
if err != nil {
require.ErrorIs(t, err, ErrEOF)
break
}
}

reg := prometheus.NewRegistry()
register(reg)
mfs, err := reg.Gather()
require.NoError(t, err)

assert.Equalf(t, float64(1),
assert.Equalf(t, float64(5),
metrics.GetMetricOnLabels(mfs,
"diskcache_dropped_total",
c.path,
Expand Down
4 changes: 3 additions & 1 deletion diskcache/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ func WithBatchSize(size int64) CacheOption {
}

// WithMaxDataSize set max single data size, default 32MB.
// NOTE: the max size of single data is 0xdeadbeef(~ 3.47GB), if size
// larger than 0xdeadbeef, reset to default value(32MB).
func WithMaxDataSize(size int32) CacheOption {
return func(c *DiskCache) {
if size > 0 {
if size > 0 && uint32(size) < EOFHint /* about 3.47GB */ {
c.maxDataSize = size
}
}
Expand Down
8 changes: 4 additions & 4 deletions diskcache/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ func (c *DiskCache) removeCurrentReadingFile() error {
c.rfd = nil
}

if fi, err := os.Stat(c.curReadfile); err == nil {
if fi, err := os.Stat(c.curReadfile); err == nil { // file exist
if fi.Size() > dataHeaderLen {
c.size -= (fi.Size() - dataHeaderLen) // EOF bytes do not counted in size
}
}

if err := os.Remove(c.curReadfile); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %s: %w", c.curReadfile, err)
if err := os.Remove(c.curReadfile); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %q: %w", c.curReadfile, err)
}
}

if len(c.dataFiles) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion diskcache/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *DiskCache) loadUnfinishedFile() error {
}

// open next read file.
func (c *DiskCache) switchNextFile() error {
func (c *DiskCache) doSwitchNextFile() error {
c.rwlock.Lock()
defer c.rwlock.Unlock()

Expand Down

0 comments on commit a92c749

Please sign in to comment.