diff --git a/api/graph.go b/api/graph.go index 5d1192e..bce0ecc 100644 --- a/api/graph.go +++ b/api/graph.go @@ -3,7 +3,7 @@ package api import ( "fmt" "math" - "time" + //"log" cmodel "github.com/open-falcon/common/model" cutils "github.com/open-falcon/common/utils" @@ -86,6 +86,75 @@ func handleItems(items []*cmodel.GraphItem) { } } +func consolFun(CF string, array []float64) float64 { + if CF == "AVERAGE" { + var sum float64 = 0 + var count int = 0 + for _, value := range(array) { + sum += value + count++ + } + if count == 0 { + return math.NaN() + } else { + return sum / float64(count) + } + } else if CF == "MAX" { + var max float64 = math.NaN() + for _, value := range(array) { + if math.IsNaN(max) || value > max { + max = value + } + } + return max + } else if CF == "MIN" { + var min float64 = math.NaN() + for _, value := range(array) { + if math.IsNaN(min) || value < min { + min = value + } + } + return min + } + return math.NaN() +} + +func calc(start_ts int64, end_ts int64, dsType string, step int, items []*cmodel.GraphItem, CF string, dst_step int) []*cmodel.RRDData { + var array []float64 = []float64{} + var idx int = 0 + var val float64 + var ret []*cmodel.RRDData = make([]*cmodel.RRDData, 0) + + //log.Printf("calc: start_ts=%d, end_ts=%d, dsType=%s, step=%d, len(items)=%d, CF=%s, dst_step=%d\n", start_ts, end_ts, dsType, step, len(items), CF, dst_step) + + for ts := start_ts; ts <= end_ts; ts += int64(step) { + for ; idx < len(items); idx++ { + if items[idx].Timestamp == ts { + if dsType == g.GAUGE { + array = append(array, items[idx].Value) + } else if (dsType == g.COUNTER || dsType == g.DERIVE) && idx < len(items) - 1 { + array = append(array, float64(items[idx+1].Value - items[idx].Value) / float64(items[idx+1].Timestamp - items[idx].Timestamp)) + } + } else if items[idx].Timestamp > ts { + break + } + } + + if ts % int64(dst_step) == 0 { + if dsType == g.GAUGE { + val = consolFun(CF, array) + } else if dsType == g.COUNTER || dsType == g.DERIVE { + val = consolFun(CF, array) + } + //fmt.Println("ConsolFun: ", val) + ret = append(ret, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(val)}) + array = []float64{} + } + + } + return ret +} + func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryResponse) error { var ( datas []*cmodel.RRDData @@ -108,7 +177,7 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe resp.DsType = dsType resp.Step = step - start_ts := param.Start - param.Start%int64(step) + start_ts := param.Start - param.Start%int64(step) - int64(step) end_ts := param.End - param.End%int64(step) + int64(step) if end_ts-start_ts-int64(step) < 1 { return nil @@ -118,6 +187,8 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe key := g.FormRrdCacheKey(md5, dsType, step) filename := g.RrdFileName(cfg.RRD.Storage, md5, dsType, step) + //log.Printf("[DEBUG] Query endpoint = %s, counter = %s, dsType = %s, step = %d, md5 = %s, key = %s, file_name = %s\n", param.Endpoint, param.Counter, dsType, step, md5, key, filename); + // read cached items items, flag := store.GraphItems.FetchAll(key) items_size := len(items) @@ -135,72 +206,34 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe <-done // fetch data from remote datas = res.Values + resp.Step = res.Step datas_size = len(datas) } else { // read data from rrd file - datas, _ = rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, step) + datas, resp.Step, _ = rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, param.Step) datas_size = len(datas) } - nowTs := time.Now().Unix() - lastUpTs := nowTs - nowTs%int64(step) - rra1StartTs := lastUpTs - int64(rrdtool.RRA1PointCnt*step) - - // consolidated, do not merge - if start_ts < rra1StartTs { - resp.Values = datas - goto _RETURN_OK - } - // no cached items, do not merge if items_size < 1 { resp.Values = datas goto _RETURN_OK } + // 防止意外情况下,出现除0运算 + if resp.Step == 0 { + resp.Step = step + } + // merge { // fmt cached items - var val cmodel.JsonFloat cache := make([]*cmodel.RRDData, 0) ts := items[0].Timestamp - itemEndTs := items[items_size-1].Timestamp - itemIdx := 0 - if dsType == g.DERIVE || dsType == g.COUNTER { - for ts < itemEndTs { - if itemIdx < items_size-1 && ts == items[itemIdx].Timestamp && - ts == items[itemIdx+1].Timestamp-int64(step) { - val = cmodel.JsonFloat(items[itemIdx+1].Value-items[itemIdx].Value) / cmodel.JsonFloat(step) - if val < 0 { - val = cmodel.JsonFloat(math.NaN()) - } - itemIdx++ - } else { - // missing - val = cmodel.JsonFloat(math.NaN()) - } - if ts >= start_ts && ts <= end_ts { - cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val}) - } - ts = ts + int64(step) - } - } else if dsType == g.GAUGE { - for ts <= itemEndTs { - if itemIdx < items_size && ts == items[itemIdx].Timestamp { - val = cmodel.JsonFloat(items[itemIdx].Value) - itemIdx++ - } else { - // missing - val = cmodel.JsonFloat(math.NaN()) - } - - if ts >= start_ts && ts <= end_ts { - cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val}) - } - ts = ts + int64(step) - } + if ts < end_ts { + cache = calc(items[0].Timestamp, items[items_size-1].Timestamp, dsType, step, items, param.ConsolFun, resp.Step) } cache_size := len(cache) @@ -228,7 +261,7 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe } // fix missing - for ts := lastTs + int64(step); ts < cache[0].Timestamp; ts += int64(step) { + for ts := lastTs + int64(resp.Step); ts < cache[0].Timestamp; ts += int64(resp.Step) { merged = append(merged, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())}) } @@ -248,10 +281,12 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe mergedSize := len(merged) // fmt result - ret_size := int((end_ts - start_ts) / int64(step)) + ret_size := int((end_ts - start_ts) / int64(resp.Step)) ret := make([]*cmodel.RRDData, ret_size, ret_size) mergedIdx := 0 - ts = start_ts + if len(merged) > 0 { + ts = merged[0].Timestamp + } for i := 0; i < ret_size; i++ { if mergedIdx < mergedSize && ts == merged[mergedIdx].Timestamp { ret[i] = merged[mergedIdx] @@ -259,7 +294,7 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe } else { ret[i] = &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())} } - ts += int64(step) + ts += int64(resp.Step) } resp.Values = ret } @@ -270,6 +305,7 @@ _RETURN_OK: return nil } + func (this *Graph) Info(param cmodel.GraphInfoParam, resp *cmodel.GraphInfoResp) error { // statistics proc.GraphInfoCnt.Incr() diff --git a/g/git.go b/g/git.go index 696d844..e066b92 100644 --- a/g/git.go +++ b/g/git.go @@ -1,5 +1,4 @@ package g - const ( - COMMIT = "gitversion" + COMMIT = "15b66f2" ) diff --git a/rrdtool/rrdtool.go b/rrdtool/rrdtool.go index 0b9cb1d..a3bb654 100644 --- a/rrdtool/rrdtool.go +++ b/rrdtool/rrdtool.go @@ -169,7 +169,7 @@ func FlushFile(filename string, items []*cmodel.GraphItem) error { return <-done } -func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) { +func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, int, error) { done := make(chan error, 1) task := &io_task_t{ method: IO_TASK_M_FETCH, @@ -184,17 +184,18 @@ func Fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RR } io_task_chan <- task err := <-done - return task.args.(*fetch_t).data, err + return task.args.(*fetch_t).data, task.args.(*fetch_t).step, err } -func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) { +func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, int, error) { start_t := time.Unix(start, 0) end_t := time.Unix(end, 0) step_t := time.Duration(step) * time.Second fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t) if err != nil { - return []*cmodel.RRDData{}, err + log.Printf("[ERROR] fetch data failed, file_name = %s, err = %s", filename, err.Error()) + return []*cmodel.RRDData{}, 0, err } defer fetchRes.FreeValues() @@ -215,7 +216,7 @@ func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RR ret[i] = d } - return ret, nil + return ret, int(step_s), nil } func FlushAll(force bool) { diff --git a/rrdtool/sync_disk.go b/rrdtool/sync_disk.go index 5ae9164..2f495c2 100644 --- a/rrdtool/sync_disk.go +++ b/rrdtool/sync_disk.go @@ -96,7 +96,7 @@ func ioWorker() { } } else if task.method == IO_TASK_M_FETCH { if args, ok := task.args.(*fetch_t); ok { - args.data, err = fetch(args.filename, args.cf, args.start, args.end, args.step) + args.data, args.step, err = fetch(args.filename, args.cf, args.start, args.end, args.step) task.done <- err } }