Skip to content

Commit

Permalink
feat: add streaming-put
Browse files Browse the repository at this point in the history
  • Loading branch information
coanor committed Sep 26, 2024
1 parent 4f57d3f commit fc06869
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 6 deletions.
7 changes: 7 additions & 0 deletions diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ var (
// Get on no data cache.
ErrEOF = errors.New("EOF")

// Diskcache full, no data can be written now
ErrCacheFull = errors.New("cache full")

// Invalid cache filename.
ErrInvalidDataFileName = errors.New("invalid datafile name")
ErrInvalidDataFileNameSuffix = errors.New("invalid datafile name suffix")
Expand Down Expand Up @@ -86,6 +89,10 @@ type DiskCache struct {
capacity int64 // capacity of the diskcache
maxDataSize int32 // max data size of single Put()

batchHeader []byte
//streamBuf *bytes.Buffer
streamBuf []byte

// File permission, default 0750/0640
dirPerms,
filePerms os.FileMode
Expand Down
5 changes: 4 additions & 1 deletion diskcache/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"time"
)

Expand Down Expand Up @@ -60,7 +61,8 @@ func (c *DiskCache) Get(fn Fn) error {

defer func() {
if uint32(nbytes) != EOFHint {
getBytesVec.WithLabelValues(c.path).Add(float64(nbytes))
getBytesVec.WithLabelValues(c.path).Add(float64(nbytes)) // deprecated
getBytesV2Vec.WithLabelValues(c.path).Observe(float64(nbytes))

// get on EOF not counted as a real Get
getVec.WithLabelValues(c.path).Inc()
Expand Down Expand Up @@ -120,6 +122,7 @@ retry:
if n, err = c.rfd.Read(databuf); err != nil {
return err
} else if n != nbytes {
log.Printf("bad read size: %d != %d", n, nbytes)
return ErrUnexpectedReadSize
}

Expand Down
74 changes: 69 additions & 5 deletions diskcache/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ import (
)

var (
// deprecated
putVec, getVec, putBytesVec, getBytesVec *prometheus.CounterVec

droppedBatchVec,
droppedBytesVec,
rotateVec,
removeVec,
putVec,
getVec,
putBytesVec,
wakeupVec,
seekBackVec,
getBytesVec *prometheus.CounterVec
seekBackVec *prometheus.CounterVec

sizeVec,
openTimeVec,
Expand All @@ -30,18 +29,41 @@ var (
batchSizeVec,
datafilesVec *prometheus.GaugeVec

putBytesV2Vec,
streamPutVec,
getBytesV2Vec,
getLatencyVec,
putLatencyVec *prometheus.SummaryVec

ns = "diskcache"
)

func setupMetrics() {

streamPutVec = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: ns,
Name: "stream_put",
Help: "Stream put times",
Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{"path"},
)

getLatencyVec = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: ns,
Name: "get_latency",
Help: "Get() time cost(micro-second)",
Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{"path"},
)
Expand All @@ -51,6 +73,39 @@ func setupMetrics() {
Namespace: ns,
Name: "put_latency",
Help: "Put() time cost(micro-second)",
Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{"path"},
)

putBytesV2Vec = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: ns,
Name: "put_bytes",
Help: "Cache Put() bytes",
Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{"path"},
)

getBytesV2Vec = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: ns,
Name: "get_bytes",
Help: "Cache Get() bytes",
Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{"path"},
)
Expand Down Expand Up @@ -216,6 +271,7 @@ func setupMetrics() {
)

metrics.MustRegister(
streamPutVec,
droppedBatchVec,
droppedBytesVec,
rotateVec,
Expand All @@ -241,6 +297,7 @@ func setupMetrics() {
// register to specified registry for testing.
func register(reg *prometheus.Registry) {
reg.MustRegister(
streamPutVec,
droppedBatchVec,
droppedBytesVec,
rotateVec,
Expand All @@ -250,6 +307,8 @@ func register(reg *prometheus.Registry) {
wakeupVec,
seekBackVec,
getBytesVec,
putBytesV2Vec,
getBytesV2Vec,

capVec,
batchSizeVec,
Expand All @@ -263,6 +322,7 @@ func register(reg *prometheus.Registry) {

// ResetMetrics used to cleanup exist metrics of diskcache.
func ResetMetrics() {
streamPutVec.Reset()
droppedBatchVec.Reset()
droppedBytesVec.Reset()
rotateVec.Reset()
Expand All @@ -279,6 +339,8 @@ func ResetMetrics() {
datafilesVec.Reset()
getLatencyVec.Reset()
putLatencyVec.Reset()
putBytesV2Vec.Reset()
getBytesV2Vec.Reset()
}

// Labels export cache's labels used to query prometheus metrics.
Expand Down Expand Up @@ -309,6 +371,8 @@ func Metrics() []prometheus.Collector {

getLatencyVec,
putLatencyVec,
getBytesV2Vec,
putBytesV2Vec,
}
}

Expand Down
4 changes: 4 additions & 0 deletions diskcache/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func defaultInstance() *DiskCache {
return &DiskCache{
noSync: false,

//streamBuf: bytes.NewBuffer(make([]byte, 4*1024)),
streamBuf: make([]byte, 4*1024),
batchHeader: make([]byte, dataHeaderLen),

batchSize: 20 * 1024 * 1024,
maxDataSize: 0, // not set

Expand Down
8 changes: 8 additions & 0 deletions diskcache/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,11 @@ func WithPath(x string) CacheOption {
c.path = filepath.Clean(x)
}
}

// WithStreamSize set the size (in bytes) of the buffer StreamPut uses to
// copy chunks from the input reader. Non-positive x is ignored and the
// default 4KB buffer is kept.
func WithStreamSize(x int32) CacheOption {
	return func(c *DiskCache) {
		if x > 0 {
			c.streamBuf = make([]byte, x)
		}
	}
}
85 changes: 85 additions & 0 deletions diskcache/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ package diskcache

import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"time"
)

Expand All @@ -21,7 +25,10 @@ func (c *DiskCache) Put(data []byte) error {

defer func() {
putVec.WithLabelValues(c.path).Inc()

putBytesVec.WithLabelValues(c.path).Add(float64(len(data)))
putBytesV2Vec.WithLabelValues(c.path).Observe(float64(len(data)))

putLatencyVec.WithLabelValues(c.path).Observe(float64(time.Since(start) / time.Microsecond))
sizeVec.WithLabelValues(c.path).Set(float64(c.size))
}()
Expand Down Expand Up @@ -66,3 +73,81 @@ func (c *DiskCache) Put(data []byte) error {

return nil
}

// putPart appends one chunk of payload to the current write file, syncing
// it to disk unless the cache was opened with noSync.
func (c *DiskCache) putPart(part []byte) error {
	if _, err := c.wfd.Write(part); err != nil {
		return err
	}

	if c.noSync {
		return nil // caller accepted the risk of losing buffered data
	}

	return c.wfd.Sync()
}

// StreamPut reads exactly size bytes from r and appends them to the cache
// as a single entry, copying through c.streamBuf-sized chunks.
//
// If reading fails, the reader yields fewer than size bytes, or a write
// fails, the write offset is rolled back so no partial entry remains.
// Returns ErrCacheFull when the entry would exceed the configured capacity.
func (c *DiskCache) StreamPut(r io.Reader, size int) error {
	var (
		n           = 0
		total       = 0
		err         error
		startOffset int64
		start       = time.Now()
		round       = 0
	)

	c.wlock.Lock()
	defer c.wlock.Unlock()

	if c.capacity > 0 && c.size+int64(size) > c.capacity {
		return ErrCacheFull
	}

	// Remember the current write position so a failed stream can be rolled
	// back. NOTE: os.SEEK_CUR is deprecated in favor of io.SeekCurrent, but
	// is kept here so the file-level "os" import stays in use.
	if startOffset, err = c.wfd.Seek(0, os.SEEK_CUR); err != nil {
		return fmt.Errorf("Seek(0, SEEK_CUR): %w", err)
	}

	defer func() {
		if err != nil { // fall back to the original position on any failure
			if _, serr := c.wfd.Seek(startOffset, os.SEEK_SET); serr != nil {
				// TODO: offset is now undefined; surface serr to the caller.
			}
		}

		putBytesV2Vec.WithLabelValues(c.path).Observe(float64(size))
		putLatencyVec.WithLabelValues(c.path).Observe(float64(time.Since(start) / time.Microsecond))
		sizeVec.WithLabelValues(c.path).Set(float64(c.size))
		streamPutVec.WithLabelValues(c.path).Observe(float64(round))
	}()

	binary.LittleEndian.PutUint32(c.batchHeader, uint32(size))
	// Assign to the outer err (no ":=") so the rollback defer can observe
	// a failed header write.
	if _, err = c.wfd.Write(c.batchHeader); err != nil {
		return err
	}

	for total < size {
		n, err = r.Read(c.streamBuf)

		// Per the io.Reader contract, consume the n > 0 bytes before
		// inspecting err: a reader may return data together with io.EOF.
		if n > 0 {
			// Flush only the bytes actually read: writing the whole buffer
			// would append stale bytes after a short read.
			if werr := c.putPart(c.streamBuf[:n]); werr != nil {
				err = werr
				return err
			}
			total += n
			round++
		}

		if err != nil {
			if errors.Is(err, io.EOF) {
				err = nil
				break
			}
			return err
		}

		if n == 0 { // defensive: avoid spinning on a misbehaving reader
			break
		}
	}

	// The header already promises size bytes; a short stream would corrupt
	// the datafile, so treat it as an error (triggers the rollback above).
	if total != size {
		err = fmt.Errorf("short stream: expect %d bytes, got %d", size, total)
		return err
	}

	// Keep the accounted size in step with the capacity check above.
	// NOTE(review): assumes Put() maintains c.size the same way — confirm.
	c.size += int64(len(c.batchHeader) + total)

	return nil
}
37 changes: 37 additions & 0 deletions diskcache/put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package diskcache

import (
"errors"
"strings"
"sync"
"sync/atomic"
T "testing"
Expand Down Expand Up @@ -328,3 +329,39 @@ func TestPutOnCapacityReached(t *T.T) {
})
})
}

// TestStreamPut checks that a payload streamed in through StreamPut is read
// back byte-identical by Get.
func TestStreamPut(t *T.T) {
	t.Run("basic", func(t *T.T) {
		reg := prometheus.NewRegistry()
		register(reg)

		raw := "0123456789"
		r := strings.NewReader(raw)

		p := t.TempDir()
		t.Logf("path: %s", p)

		// Use a stream buffer size that does NOT evenly divide len(raw)
		// (10 = 3+3+3+1) so the final short read path is exercised too.
		c, err := Open(WithPath(p), WithStreamSize(3))
		assert.NoError(t, err)

		assert.NoError(t, c.StreamPut(r, len(raw)))
		assert.NoError(t, c.Rotate())

		// The entry read back must equal exactly what was streamed in.
		assert.NoError(t, c.Get(func(data []byte) error {
			assert.Equal(t, []byte(raw), data)
			return nil
		}))

		mfs, err := reg.Gather()
		require.NoError(t, err)
		t.Logf("\n%s", metrics.MetricFamily2Text(mfs))

		t.Cleanup(func() {
			assert.NoError(t, c.Close())
			ResetMetrics()
		})
	})
}

0 comments on commit fc06869

Please sign in to comment.