Skip to content

Commit

Permalink
influxdb performance
Browse files Browse the repository at this point in the history
  • Loading branch information
James Ranson committed Jun 6, 2020
1 parent 0e6685e commit a7c2203
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 31 deletions.
70 changes: 39 additions & 31 deletions pkg/proxy/origins/influxdb/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (se *SeriesEnvelope) CropToRange(e timeseries.Extent) {
// Sort sorts all Values in each Series chronologically by their timestamp
func (se *SeriesEnvelope) Sort() {

if se.isSorted || len(se.Results) == 0 || len(se.Results[0].Series) == 0 {
if se.isSorted || len(se.Results) == 0 {
return
}

Expand All @@ -429,41 +429,49 @@ func (se *SeriesEnvelope) Sort() {

var hasWarned bool
tsm := map[time.Time]bool{}
m := make(map[int64][]interface{})
if ti := str.IndexOfString(se.Results[0].Series[0].Columns, "time"); ti != -1 {
for ri := range se.Results {
seriesWG := sync.WaitGroup{}
for si := range se.Results[ri].Series {
keys := make([]int64, 0, len(m))
for _, v := range se.Results[ri].Series[si].Values {
wg.Add(1)
go func(s []interface{}) {
if tf, ok := s[ti].(float64); ok {
t := int64(tf)
mtx.Lock()
if _, ok := m[t]; !ok {
keys = append(keys, t)
m[t] = s
seriesWG.Add(1)
go func(j int) {
tsLookup := make(map[int64][]interface{})
timestamps := make([]int64, 0, len(se.Results[ri].Series[j].Values))
for _, v := range se.Results[ri].Series[j].Values {
wg.Add(1)
go func(s []interface{}) {
if tf, ok := s[ti].(float64); ok {
t := int64(tf)
mtx.Lock()
if _, ok := tsLookup[t]; !ok {
timestamps = append(timestamps, t)
tsLookup[t] = s
}
tsm[time.Unix(t/1000, 0)] = true
mtx.Unlock()
} else if !hasWarned {
hasWarned = true
// this makeshift warning is temporary during the beta cycle to help
// troubleshoot #433
fmt.Println("WARN", "could not convert influxdb time to a float64:",
s[ti], "resultSet:", se)
}
tsm[time.Unix(t/1000, 0)] = true
mtx.Unlock()
} else if !hasWarned {
hasWarned = true
// this makeshift warning is temporary during the beta cycle to help
// troubleshoot #433
fmt.Println("WARN", "could not convert influxdb time to a float64:",
s[ti], "resultSet:", se)
}
wg.Done()
}(v)
}
wg.Wait()
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
sm := make([][]interface{}, 0, len(keys))
for _, key := range keys {
sm = append(sm, m[key])
}
se.Results[ri].Series[si].Values = sm
wg.Done()
}(v)
}
wg.Wait()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
sm := make([][]interface{}, len(timestamps))
for i, key := range timestamps {
sm[i] = tsLookup[key]
}
se.Results[ri].Series[j].Values = sm
seriesWG.Done()
}(si)
}
seriesWG.Wait()
}
}

Expand Down
138 changes: 138 additions & 0 deletions pkg/proxy/origins/influxdb/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,144 @@ func TestSort(t *testing.T) {
if se.isCounted {
t.Errorf("got %t expected %t", se.isCounted, false)
}

tests := []struct {
before, after *SeriesEnvelope
}{
// case 0
{
before: &SeriesEnvelope{
Results: []Result{
{
Series: []models.Row{
{
Name: "a",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(15000), 1.5},
{float64(5000), 1.5},
{float64(10000), 1.5},
},
},
{
Name: "b",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(15000), 1.5},
{float64(5000), 1.5},
{float64(10000), 1.5},
},
},
},
},
{
Series: []models.Row{
{
Name: "c",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(15000), 1.5},
{float64(5000), 1.5},
{float64(10000), 1.5},
},
},
{
Name: "d",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(15000), 1.5},
{float64(5000), 1.5},
{float64(10000), 1.5},
},
},
},
},
},
ExtentList: timeseries.ExtentList{
timeseries.Extent{Start: time.Unix(5, 0), End: time.Unix(15, 0)},
},
StepDuration: time.Duration(5) * time.Second,
timestamps: map[time.Time]bool{time.Unix(5, 0): true, time.Unix(10, 0): true, time.Unix(15, 0): true},
tslist: times.Times{time.Unix(5, 0), time.Unix(10, 0), time.Unix(15, 0)},
isSorted: false,
isCounted: false,
},
after: &SeriesEnvelope{
Results: []Result{
{
Series: []models.Row{
{
Name: "a",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(5000), 1.5},
{float64(10000), 1.5},
{float64(15000), 1.5},
},
},
{
Name: "b",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(5000), 1.5},
{float64(10000), 1.5},
{float64(15000), 1.5},
},
},
},
},
{
Series: []models.Row{
{
Name: "c",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(5000), 1.5},
{float64(10000), 1.5},
{float64(15000), 1.5},
},
},
{
Name: "d",
Columns: []string{"time", "units"},
Tags: map[string]string{"tagName1": "tagValue1"},
Values: [][]interface{}{
{float64(5000), 1.5},
{float64(10000), 1.5},
{float64(15000), 1.5},
},
},
},
},
},
ExtentList: timeseries.ExtentList{
timeseries.Extent{Start: time.Unix(5, 0), End: time.Unix(15, 0)},
},
StepDuration: time.Duration(5) * time.Second,
timestamps: map[time.Time]bool{time.Unix(5, 0): true, time.Unix(10, 0): true, time.Unix(15, 0): true},
tslist: times.Times{time.Unix(5, 0), time.Unix(10, 0), time.Unix(15, 0)},
isSorted: true,
isCounted: true,
},
},
}

for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
test.before.Sort()
if !reflect.DeepEqual(test.before, test.after) {
t.Errorf("mismatch\nexpected=%v\ngot =%v", test.after, test.before)
}
})
}

}

func TestSize(t *testing.T) {
Expand Down

0 comments on commit a7c2203

Please sign in to comment.