This repository has been archived by the owner on Nov 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer_cache_test.go
98 lines (92 loc) · 1.96 KB
/
buffer_cache_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"fmt"
"io"
"log"
"net/http"
_ "net/http/pprof"
"os"
"testing"
"time"
)
// TestBufferCache_Read streams the file "test_file_large" through a buffer
// cache and checks that a concurrently running consumer reads exactly as many
// bytes as the producer managed to write.
//
// The test goroutine is the producer: it reads the file in SlotSize-sized
// chunks and writes each chunk into the cache, allowing up to three Write
// attempts per chunk. Only bytes that Write reports as accepted are counted,
// so a chunk that could not be fully written fails the test through t.Error
// but does not skew the produced/consumed comparison.
func TestBufferCache_Read(t *testing.T) {
	filename := "test_file_large"
	fp, err := os.Open(filename)
	if err != nil {
		t.Fatalf("failed to open test file %s: %v", filename, err)
	}
	// The handle is read-only, so a Close error carries no useful information.
	defer func() { _ = fp.Close() }()

	bc := newBufferCache()
	if bc == nil {
		// consumer returns without reporting when handed a nil cache, which
		// would leave this test blocked on the channels below.
		t.Fatal("newBufferCache returned nil")
	}

	// Expose pprof while the test runs. ListenAndServe only returns on error
	// (for example when the port is already taken); the error is printed and
	// otherwise ignored because profiling is optional for the test.
	go func() {
		fmt.Println(http.ListenAndServe("localhost:8000", nil))
	}()

	ch := make(chan bool)
	consumed := make(chan int64)
	go consumer(ch, consumed, bc)

	total := int64(0)
	s := make([]byte, SlotSize)
	for {
		n, err := fp.Read(s)

		// Hand over whatever was read before looking at err: a Reader may
		// return n > 0 together with an error.
		ret := 0
		for i := 0; i < 3 && ret < n; i++ {
			nW, werr := bc.Write(s[ret:n])
			if werr != nil {
				t.Error(werr)
				continue
			}
			total += int64(nW)
			ret += nW
		}

		if err == io.EOF {
			log.Println("producer Load file finish")
			break
		}
		if err != nil {
			// Any other read error is fatal for the producer; without this
			// check a persistent error would keep the loop spinning forever.
			t.Errorf("failed to read test file %s: %v", filename, err)
			break
		}
	}
	log.Printf("INFO : producer written %d bytes\n", total)

	// Wait (bounded) for the cache to empty before signalling the consumer, so
	// the reported total does not depend on how bytes still buffered at cancel
	// time are treated. If the deadline passes, the comparison below reports
	// the mismatch.
	deadline := time.Now().Add(2 * time.Second)
	for bc.Len() > 0 && time.Now().Before(deadline) {
		time.Sleep(time.Millisecond)
	}

	ch <- true
	consumedSize := <-consumed
	if total != consumedSize {
		t.Errorf("test failed. produce %d , consume %d\n", total, consumedSize)
	}
}
// consumer reads from cache until a value arrives on cancel, then sends the
// total number of bytes it has read on ret and returns.
//
// While no cancel signal is pending it polls the cache without blocking,
// reading into a SlotSize-sized scratch buffer whenever cache.Len() reports
// pending data. Once the cancel signal arrives, bytes still held by the cache
// are read before the total is reported, so the total covers everything that
// was written ahead of the signal.
//
// A nil cache makes consumer return immediately without sending on ret.
func consumer(cancel chan bool, ret chan int64, cache *bufferCache) {
	if cache == nil {
		return
	}
	total := int64(0)
	s := make([]byte, SlotSize)

	// readOnce performs at most one Read and reports whether it obtained any
	// bytes; a false result means the cache was empty, the read failed, or
	// the read returned no data.
	readOnce := func() bool {
		if cache.Len() <= 0 {
			return false
		}
		n, e := cache.Read(s)
		if e != nil {
			log.Printf("WARN: consumer faild to read data\n")
			return false
		}
		total += int64(n)
		return n > 0
	}

	for {
		select {
		case <-cancel:
			// Drain what is still buffered. The loop ends as soon as a read
			// makes no progress, so a failing Read cannot spin forever.
			for readOnce() {
			}
			log.Printf("INFO: consumer read %d bytes\n", total)
			ret <- total
			return
		default:
			readOnce()
		}
	}
}