Add a step parameter to control query precision #33

Open · wants to merge 1 commit into master
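For context, here is a minimal sketch of how a caller might fill the query once this lands. It is not part of the diff; it assumes cmodel.GraphQueryParam carries the Endpoint, Counter, Start, End, ConsolFun and new Step fields that Query reads in this patch, and the RPC transport is omitted.

package main

import (
	"fmt"
	"time"

	cmodel "github.com/open-falcon/common/model"
)

func main() {
	end := time.Now().Unix()
	param := cmodel.GraphQueryParam{
		Endpoint:  "host01",   // placeholder endpoint
		Counter:   "cpu.idle", // placeholder counter
		Start:     end - 3600, // last hour
		End:       end,
		ConsolFun: "AVERAGE", // AVERAGE / MAX / MIN, matching consolFun below
		Step:      300,       // new: ask for one consolidated point every 300s
	}
	fmt.Printf("%+v\n", param)
	// resp := &cmodel.GraphQueryResponse{}
	// err := rpcClient.Call("Graph.Query", param, resp) // transport assumed
}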
142 changes: 89 additions & 53 deletions api/graph.go
@@ -3,7 +3,7 @@ package api
import (
	"fmt"
	"math"
	"time"
	//"log"

	cmodel "github.com/open-falcon/common/model"
	cutils "github.com/open-falcon/common/utils"
@@ -86,6 +86,75 @@ func handleItems(items []*cmodel.GraphItem) {
	}
}

func consolFun(CF string, array []float64) float64 {
	if CF == "AVERAGE" {
		var sum float64 = 0
		var count int = 0
		for _, value := range(array) {
			sum += value
			count++
		}
		if count == 0 {
			return math.NaN()
		} else {
			return sum / float64(count)
		}
	} else if CF == "MAX" {
		var max float64 = math.NaN()
		for _, value := range(array) {
			if math.IsNaN(max) || value > max {
				max = value
			}
		}
		return max
	} else if CF == "MIN" {
		var min float64 = math.NaN()
		for _, value := range(array) {
			if math.IsNaN(min) || value < min {
				min = value
			}
		}
		return min
	}
	return math.NaN()
}

func calc(start_ts int64, end_ts int64, dsType string, step int, items []*cmodel.GraphItem, CF string, dst_step int) []*cmodel.RRDData {
	var array []float64 = []float64{}
	var idx int = 0
	var val float64
	var ret []*cmodel.RRDData = make([]*cmodel.RRDData, 0)

	//log.Printf("calc: start_ts=%d, end_ts=%d, dsType=%s, step=%d, len(items)=%d, CF=%s, dst_step=%d\n", start_ts, end_ts, dsType, step, len(items), CF, dst_step)

	for ts := start_ts; ts <= end_ts; ts += int64(step) {
		for ; idx < len(items); idx++ {
			if items[idx].Timestamp == ts {
				if dsType == g.GAUGE {
					array = append(array, items[idx].Value)
				} else if (dsType == g.COUNTER || dsType == g.DERIVE) && idx < len(items) - 1 {
					array = append(array, float64(items[idx+1].Value - items[idx].Value) / float64(items[idx+1].Timestamp - items[idx].Timestamp))
				}
			} else if items[idx].Timestamp > ts {
				break
			}
		}

		if ts % int64(dst_step) == 0 {
			if dsType == g.GAUGE {
				val = consolFun(CF, array)
			} else if dsType == g.COUNTER || dsType == g.DERIVE {
				val = consolFun(CF, array)
			}
			//fmt.Println("ConsolFun: ", val)
			ret = append(ret, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(val)})
			array = []float64{}
		}

	}
	return ret
}

func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryResponse) error {
	var (
		datas []*cmodel.RRDData
@@ -108,7 +177,7 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe
	resp.DsType = dsType
	resp.Step = step

	start_ts := param.Start - param.Start%int64(step)
	start_ts := param.Start - param.Start%int64(step) - int64(step)
	end_ts := param.End - param.End%int64(step) + int64(step)
	if end_ts-start_ts-int64(step) < 1 {
		return nil
@@ -118,6 +187,8 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe
	key := g.FormRrdCacheKey(md5, dsType, step)
	filename := g.RrdFileName(cfg.RRD.Storage, md5, dsType, step)

	//log.Printf("[DEBUG] Query endpoint = %s, counter = %s, dsType = %s, step = %d, md5 = %s, key = %s, file_name = %s\n", param.Endpoint, param.Counter, dsType, step, md5, key, filename);

	// read cached items
	items, flag := store.GraphItems.FetchAll(key)
	items_size := len(items)
@@ -135,72 +206,34 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe
		<-done
		// fetch data from remote
		datas = res.Values
		resp.Step = res.Step
		datas_size = len(datas)
	} else {
		// read data from rrd file
		datas, _ = rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, step)
		datas, resp.Step, _ = rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, param.Step)
		datas_size = len(datas)
	}

	nowTs := time.Now().Unix()
	lastUpTs := nowTs - nowTs%int64(step)
	rra1StartTs := lastUpTs - int64(rrdtool.RRA1PointCnt*step)

	// consolidated, do not merge
	if start_ts < rra1StartTs {
		resp.Values = datas
		goto _RETURN_OK
	}

	// no cached items, do not merge
	if items_size < 1 {
		resp.Values = datas
		goto _RETURN_OK
	}

	// guard against division by zero in unexpected cases
	if resp.Step == 0 {
		resp.Step = step
	}

	// merge
	{
		// fmt cached items
		var val cmodel.JsonFloat
		cache := make([]*cmodel.RRDData, 0)

		ts := items[0].Timestamp
		itemEndTs := items[items_size-1].Timestamp
		itemIdx := 0
		if dsType == g.DERIVE || dsType == g.COUNTER {
			for ts < itemEndTs {
				if itemIdx < items_size-1 && ts == items[itemIdx].Timestamp &&
					ts == items[itemIdx+1].Timestamp-int64(step) {
					val = cmodel.JsonFloat(items[itemIdx+1].Value-items[itemIdx].Value) / cmodel.JsonFloat(step)
					if val < 0 {
						val = cmodel.JsonFloat(math.NaN())
					}
					itemIdx++
				} else {
					// missing
					val = cmodel.JsonFloat(math.NaN())
				}

				if ts >= start_ts && ts <= end_ts {
					cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val})
				}
				ts = ts + int64(step)
			}
		} else if dsType == g.GAUGE {
			for ts <= itemEndTs {
				if itemIdx < items_size && ts == items[itemIdx].Timestamp {
					val = cmodel.JsonFloat(items[itemIdx].Value)
					itemIdx++
				} else {
					// missing
					val = cmodel.JsonFloat(math.NaN())
				}

				if ts >= start_ts && ts <= end_ts {
					cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val})
				}
				ts = ts + int64(step)
			}
		if ts < end_ts {
			cache = calc(items[0].Timestamp, items[items_size-1].Timestamp, dsType, step, items, param.ConsolFun, resp.Step)
		}
		cache_size := len(cache)

@@ -228,7 +261,7 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe
			}

			// fix missing
			for ts := lastTs + int64(step); ts < cache[0].Timestamp; ts += int64(step) {
			for ts := lastTs + int64(resp.Step); ts < cache[0].Timestamp; ts += int64(resp.Step) {
				merged = append(merged, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())})
			}

@@ -248,18 +281,20 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe
		mergedSize := len(merged)

		// fmt result
		ret_size := int((end_ts - start_ts) / int64(step))
		ret_size := int((end_ts - start_ts) / int64(resp.Step))
		ret := make([]*cmodel.RRDData, ret_size, ret_size)
		mergedIdx := 0
		ts = start_ts
		if len(merged) > 0 {
			ts = merged[0].Timestamp
		}
		for i := 0; i < ret_size; i++ {
			if mergedIdx < mergedSize && ts == merged[mergedIdx].Timestamp {
				ret[i] = merged[mergedIdx]
				mergedIdx++
			} else {
				ret[i] = &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())}
			}
			ts += int64(step)
			ts += int64(resp.Step)
		}
		resp.Values = ret
	}
@@ -270,6 +305,7 @@ _RETURN_OK:
	return nil
}


func (this *Graph) Info(param cmodel.GraphInfoParam, resp *cmodel.GraphInfoResp) error {
	// statistics
	proc.GraphInfoCnt.Incr()
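To make the new consolidation concrete: with step=60, dst_step=300 and CF="AVERAGE", calc gathers the raw points that fall inside each 300-second window and emits their mean (MAX and MIN keep the extreme instead). Below is a stand-alone re-statement of that windowed average, for illustration only; it does not call the unexported consolFun/calc above.

package main

import "fmt"

// windowAverage mirrors the AVERAGE branch of consolFun applied per window:
// every consecutive group of n raw samples collapses into its mean.
func windowAverage(values []float64, n int) []float64 {
	out := make([]float64, 0, len(values)/n)
	for i := 0; i+n <= len(values); i += n {
		sum := 0.0
		for _, v := range values[i : i+n] {
			sum += v
		}
		out = append(out, sum/float64(n))
	}
	return out
}

func main() {
	// five 60s GAUGE samples -> one 300s point
	fmt.Println(windowAverage([]float64{10, 20, 30, 40, 50}, 5)) // [30]
}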
3 changes: 1 addition & 2 deletions g/git.go
@@ -1,5 +1,4 @@
package g

const (
COMMIT = "gitversion"
COMMIT = "15b66f2"
)
11 changes: 6 additions & 5 deletions rrdtool/rrdtool.go
@@ -169,7 +169,7 @@ func FlushFile(filename string, items []*cmodel.GraphItem) error {
	return <-done
}

func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {
func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, int, error) {
	done := make(chan error, 1)
	task := &io_task_t{
		method: IO_TASK_M_FETCH,
@@ -184,17 +184,18 @@ func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RR
	}
	io_task_chan <- task
	err := <-done
	return task.args.(*fetch_t).data, err
	return task.args.(*fetch_t).data, task.args.(*fetch_t).step, err
}

func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {
func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, int, error) {
	start_t := time.Unix(start, 0)
	end_t := time.Unix(end, 0)
	step_t := time.Duration(step) * time.Second

	fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t)
	if err != nil {
		return []*cmodel.RRDData{}, err
		log.Printf("[ERROR] fetch data failed, file_name = %s, err = %s", filename, err.Error())
		return []*cmodel.RRDData{}, 0, err
	}

	defer fetchRes.FreeValues()
@@ -215,7 +216,7 @@ func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RR
		ret[i] = d
	}

	return ret, nil
	return ret, int(step_s), nil
}

func FlushAll(force bool) {
Expand Down
2 changes: 1 addition & 1 deletion rrdtool/sync_disk.go
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ func ioWorker() {
				}
			} else if task.method == IO_TASK_M_FETCH {
				if args, ok := task.args.(*fetch_t); ok {
					args.data, err = fetch(args.filename, args.cf, args.start, args.end, args.step)
					args.data, args.step, err = fetch(args.filename, args.cf, args.start, args.end, args.step)
					task.done <- err
				}
			}
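A closing note on the Fetch change: the extra return value reports the RRA step rrdtool actually served, which may be coarser than the one requested. The sketch below shows how a caller inside the graph daemon might adapt to the new signature; the import path is assumed and the helper is not part of this diff.

package example // illustration only, not part of the diff

import (
	cmodel "github.com/open-falcon/common/model"
	"github.com/open-falcon/graph/rrdtool" // assumed import path for this repo
)

// fetchWithResolution wraps the new three-value rrdtool.Fetch: callers should
// space their timestamps by the returned step, not by the requested one.
func fetchWithResolution(filename, cf string, start, end int64, wantStep int) ([]*cmodel.RRDData, int, error) {
	datas, usedStep, err := rrdtool.Fetch(filename, cf, start, end, wantStep)
	if err != nil {
		return nil, 0, err
	}
	return datas, usedStep, nil
}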