1.1.3 fixes (#488)
* fix waitgroup scoping issues

* trust time is always idx 0 for influxql responses

* fix concurrency issue in series merge

* ensure backfill tolerance only applies to realtime

* update image tag in docker-compose demo project (was using `1.1.0-beta` instead of `1.1`)

* improve clickhouse time series / cacheable detection

* update go 1.15

* update dependencies
James Ranson authored Sep 18, 2020
1 parent 4f553c5 commit 5ebdfcd
Showing 501 changed files with 47,053 additions and 24,515 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -8,7 +8,7 @@ matrix:
 
   - language: go
     go:
-    - "1.14.x"
+    - "1.15.x"
     - master
     before_install:
     - go get github.com/mattn/goveralls
2 changes: 1 addition & 1 deletion cmd/trickster/main.go
@@ -33,7 +33,7 @@ var (
 
 const (
 	applicationName = "trickster"
-	applicationVersion = "1.1.2"
+	applicationVersion = "1.1.3"
 )
 
 var fatalStartupErrors = true
2 changes: 1 addition & 1 deletion deploy/trickster-demo/docker-compose.yml
@@ -66,7 +66,7 @@ services:
     restart: always
 
   trickster:
-    image: tricksterproxy/trickster:1.1.0-beta
+    image: tricksterproxy/trickster:1.1
     depends_on:
       - prometheus
       - mockster
30 changes: 14 additions & 16 deletions go.mod
@@ -4,34 +4,32 @@ require (
 	github.com/BurntSushi/toml v0.3.1
 	github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
 	github.com/alicebob/miniredis v2.5.0+incompatible
-	github.com/coreos/bbolt v1.3.3
-	github.com/dgraph-io/badger v1.6.0
+	github.com/dgraph-io/badger v1.6.2
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
-	github.com/go-kit/kit v0.9.0
-	github.com/go-logfmt/logfmt v0.5.0 // indirect
-	github.com/go-redis/redis v6.15.6+incompatible
+	github.com/go-kit/kit v0.10.0
+	github.com/go-redis/redis v6.15.9+incompatible
 	github.com/go-stack/stack v1.8.0
 	github.com/golang/snappy v0.0.1
 	github.com/gomodule/redigo v2.0.0+incompatible // indirect
-	github.com/gorilla/handlers v1.4.2
-	github.com/gorilla/mux v1.7.4
-	github.com/influxdata/influxdb v1.8.0
+	github.com/gorilla/handlers v1.5.1
+	github.com/gorilla/mux v1.8.0
+	github.com/influxdata/influxdb v1.8.2
 	github.com/onsi/ginkgo v1.10.1 // indirect
 	github.com/onsi/gomega v1.7.0 // indirect
-	github.com/prometheus/client_golang v1.5.0
-	github.com/prometheus/common v0.9.1
+	github.com/prometheus/client_golang v1.7.1
+	github.com/prometheus/common v0.13.0
 	github.com/stretchr/testify v1.5.1 // indirect
-	github.com/tinylib/msgp v1.1.1
+	github.com/tinylib/msgp v1.1.2
 	github.com/tricksterproxy/mockster v1.1.1
 	github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
-	go.etcd.io/bbolt v1.3.2 // indirect
+	go.etcd.io/bbolt v1.3.5
 	go.opentelemetry.io/otel v0.6.0
 	go.opentelemetry.io/otel/exporters/trace/jaeger v0.6.0
 	go.opentelemetry.io/otel/exporters/trace/zipkin v0.6.0
-	golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
-	google.golang.org/api v0.24.0 // indirect
-	google.golang.org/grpc v1.29.1
+	golang.org/x/net v0.0.0-20200904194848-62affa334b73
+	google.golang.org/api v0.32.0 // indirect
+	google.golang.org/grpc v1.32.0
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 )
 
-go 1.14
+go 1.15
344 changes: 321 additions & 23 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cache/bbolt/bbolt.go
@@ -30,7 +30,7 @@ import (
 	"github.com/tricksterproxy/trickster/pkg/locks"
 	"github.com/tricksterproxy/trickster/pkg/util/log"
 
-	"github.com/coreos/bbolt"
+	"go.etcd.io/bbolt"
 )
 
 // Cache describes a BBolt Cache
11 changes: 5 additions & 6 deletions pkg/proxy/engines/client_test.go
@@ -503,24 +503,24 @@ func (me *MatrixEnvelope) Merge(sort bool, collection ...timeseries.Timeseries)
 	for _, ts := range collection {
 		if ts != nil {
 			me2 := ts.(*MatrixEnvelope)
+			wg2 := sync.WaitGroup{}
 			for _, s := range me2.Data.Result {
-				wg.Add(1)
+				wg2.Add(1)
 				go func(t *model.SampleStream) {
+					defer wg2.Done()
 					mtx.Lock()
 					name := t.Metric.String()
 					if _, ok := meMetrics[name]; !ok {
 						meMetrics[name] = t
 						me.Data.Result = append(me.Data.Result, t)
 						mtx.Unlock()
-						wg.Done()
 						return
 					}
 					meMetrics[name].Values = append(meMetrics[name].Values, t.Values...)
 					mtx.Unlock()
-					wg.Done()
 				}(s)
 			}
-			wg.Wait()
+			wg2.Wait()
 			me.ExtentList = append(me.ExtentList, me2.ExtentList...)
 		}
 	}
@@ -740,13 +740,12 @@ func (me *MatrixEnvelope) Sort() {
 	}
 
 	tsm := map[time.Time]bool{}
-	wg := sync.WaitGroup{}
 	mtx := sync.Mutex{}
 
 	for i, s := range me.Data.Result { // []SampleStream
 		m := make(map[time.Time]model.SamplePair)
 		keys := make(times.Times, 0, len(m))
-
+		wg := sync.WaitGroup{}
 		for _, v := range s.Values { // []SamplePair
 			wg.Add(1)
 			go func(sp model.SamplePair) {
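
The scoping change above gives each collection entry its own WaitGroup (wg2), so every Add is matched by exactly one Done before Wait returns, instead of reusing a single group across batches and calling Done on only some code paths. A minimal standalone sketch of that per-batch pattern (the mergeBatches helper and its types are illustrative, not Trickster code):

    package main

    import (
        "fmt"
        "sync"
    )

    // mergeBatches folds each batch into dst. A fresh WaitGroup is scoped to
    // every batch, so Wait only observes the goroutines launched for that batch.
    func mergeBatches(dst map[string][]int, batches []map[string][]int) {
        mtx := sync.Mutex{}
        for _, batch := range batches {
            wg := sync.WaitGroup{} // scoped per batch, mirroring wg2 above
            for k, vals := range batch {
                wg.Add(1)
                go func(k string, vals []int) {
                    defer wg.Done() // always paired with the Add above
                    mtx.Lock()
                    dst[k] = append(dst[k], vals...)
                    mtx.Unlock()
                }(k, vals)
            }
            wg.Wait() // this batch is fully merged before the next one starts
        }
    }

    func main() {
        out := map[string][]int{}
        mergeBatches(out, []map[string][]int{
            {"a": {1, 2}}, {"a": {3}, "b": {4}},
        })
        fmt.Println(out)
    }
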
2 changes: 1 addition & 1 deletion pkg/proxy/engines/deltaproxycache.go
@@ -80,7 +80,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
 	bf := timeseries.Extent{Start: time.Unix(0, 0), End: trq.Extent.End}
 	bt := trq.GetBackfillTolerance(oc.BackfillTolerance)
 
-	if !trq.IsOffset && bt > 0 {
+	if !trq.IsOffset && bt > 0 && !time.Now().Add(-bt).After(bf.End) {
 		bf.End = bf.End.Add(-bt)
 	}
 
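
The added clause only trims the backfill window when time.Now().Add(-bt) has not yet passed bf.End, i.e. when the requested range actually reaches into the tolerance window near the present; purely historical queries keep their original end time. A small self-contained sketch of that guard (clampBackfill is an illustrative name; the real code also checks trq.IsOffset):

    package main

    import (
        "fmt"
        "time"
    )

    // clampBackfill trims end by the backfill tolerance bt, but only when end
    // is recent enough to fall inside the tolerance window (a realtime query).
    func clampBackfill(end time.Time, bt time.Duration) time.Time {
        if bt > 0 && !time.Now().Add(-bt).After(end) {
            return end.Add(-bt)
        }
        return end // historical ranges are left untouched
    }

    func main() {
        bt := 5 * time.Minute
        fmt.Println(clampBackfill(time.Now(), bt))                    // trimmed by bt
        fmt.Println(clampBackfill(time.Now().Add(-24*time.Hour), bt)) // unchanged
    }
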
8 changes: 5 additions & 3 deletions pkg/proxy/origins/clickhouse/handler_query.go
@@ -21,16 +21,18 @@ import (
 	"strings"
 
 	"github.com/tricksterproxy/trickster/pkg/proxy/engines"
+	"github.com/tricksterproxy/trickster/pkg/proxy/params"
 	"github.com/tricksterproxy/trickster/pkg/proxy/urls"
 )
 
 // QueryHandler handles timeseries requests for ClickHouse and processes them through the delta proxy cache
 func (c *Client) QueryHandler(w http.ResponseWriter, r *http.Request) {
 
-	rqlc := strings.Replace(strings.ToLower(r.URL.RawQuery), "%20", "+", -1)
+	qp, _, isBody := params.GetRequestValues(r)
+	q := strings.ToLower(qp.Get(upQuery))
 	// if it's not a select statement, just proxy it instead
-	if (!strings.HasPrefix(rqlc, "query=select+")) && (!(strings.Index(rqlc, "&query=select+") > 0)) &&
-		(!strings.HasSuffix(rqlc, "format+json")) {
+	if isBody || (!strings.Contains(q, "select ") &&
+		(!strings.HasSuffix(q, " format json"))) {
 		c.ProxyHandler(w, r)
 		return
 	}
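
After this change the cacheability test runs against the parsed, lower-cased query value rather than the raw query string: body-borne queries are always proxied, and a request is treated as a cacheable time series only if its query contains a SELECT or ends with FORMAT JSON. A rough sketch of that predicate (isCacheableQuery is an illustrative name; the handler above obtains the value via params.GetRequestValues and the upQuery parameter):

    package main

    import (
        "fmt"
        "strings"
    )

    // isCacheableQuery mirrors the inverted condition above: cacheable means
    // not a body-borne query, and either a SELECT statement or a query that
    // explicitly requests JSON output.
    func isCacheableQuery(rawQuery string, isBody bool) bool {
        if isBody {
            return false
        }
        q := strings.ToLower(rawQuery)
        return strings.Contains(q, "select ") || strings.HasSuffix(q, " format json")
    }

    func main() {
        fmt.Println(isCacheableQuery("SELECT t, v FROM m FORMAT JSON", false)) // true
        fmt.Println(isCacheableQuery("SHOW TABLES", false))                    // false
        fmt.Println(isCacheableQuery("SELECT 1", true))                        // false: body queries are proxied
    }
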
12 changes: 6 additions & 6 deletions pkg/proxy/origins/clickhouse/series.go
@@ -40,12 +40,12 @@ func (re *ResultsEnvelope) SetStep(step time.Duration) {
 // and optionally sorts the merged Timeseries
 func (re *ResultsEnvelope) Merge(sort bool, collection ...timeseries.Timeseries) {
 
-	wg := sync.WaitGroup{}
 	mtx := sync.Mutex{}
 
 	for _, ts := range collection {
 		if ts != nil {
 			re2 := ts.(*ResultsEnvelope)
+			wg := sync.WaitGroup{}
 			for k, s := range re2.Data {
 				wg.Add(1)
 				go func(l string, d *DataSet) {
@@ -359,24 +359,24 @@ func (re *ResultsEnvelope) Sort() {
 	}
 
 	tsm := map[time.Time]bool{}
-	wg := sync.WaitGroup{}
 	mtx := sync.Mutex{}
 
 	for i, s := range re.Data {
 		m := make(map[time.Time]Point)
 		keys := make(times.Times, 0, len(s.Points))
+		wg := sync.WaitGroup{}
 		for _, v := range s.Points {
 			wg.Add(1)
-			go func(sp Point) {
+			go func(sp Point, l map[time.Time]Point) {
 				mtx.Lock()
-				if _, ok := m[sp.Timestamp]; !ok {
+				if _, ok := l[sp.Timestamp]; !ok {
 					keys = append(keys, sp.Timestamp)
-					m[sp.Timestamp] = sp
+					l[sp.Timestamp] = sp
 				}
 				tsm[sp.Timestamp] = true
 				mtx.Unlock()
 				wg.Done()
-			}(v)
+			}(v, m)
 		}
 		wg.Wait()
 		sort.Sort(keys)
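
Besides moving the WaitGroup inside the loop, the Sort hunk passes the per-series map into each goroutine as a parameter (l) instead of letting the closure capture it, so every goroutine writes to the map owned by its own iteration. A standalone sketch of the pattern with simplified types (not the Trickster DataSet):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        series := [][]time.Time{
            {time.Unix(10, 0), time.Unix(20, 0)},
            {time.Unix(30, 0)},
        }
        mtx := sync.Mutex{}
        out := make([]map[time.Time]bool, len(series))

        for i, pts := range series {
            m := make(map[time.Time]bool) // map owned by this iteration
            wg := sync.WaitGroup{}        // WaitGroup scoped to this iteration
            for _, ts := range pts {
                wg.Add(1)
                // pass the map explicitly so the goroutine cannot observe a
                // later iteration's value of m
                go func(ts time.Time, l map[time.Time]bool) {
                    defer wg.Done()
                    mtx.Lock()
                    l[ts] = true
                    mtx.Unlock()
                }(ts, m)
            }
            wg.Wait()
            out[i] = m
        }
        fmt.Println(out)
    }
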
107 changes: 54 additions & 53 deletions pkg/proxy/origins/influxdb/series.go
@@ -138,16 +138,17 @@ func (se *SeriesEnvelope) Merge(sort bool, collection ...timeseries.Timeseries)
 	defer se.updateLock.Unlock()
 
 	series := make(map[seriesKey]*models.Row)
-	for i, r := range se.Results {
+	for i := range se.Results {
 		for j := range se.Results[i].Series {
 			wg.Add(1)
-			go func(s *models.Row) {
+			go func(l, m int) {
 				mtx.Lock()
-				series[seriesKey{ResultID: i, StatementID: r.StatementID, Name: s.Name,
+				s := &se.Results[l].Series[m]
+				series[seriesKey{ResultID: l, StatementID: se.Results[l].StatementID, Name: s.Name,
 					Tags: tags(s.Tags).String(), Columns: strings.Join(s.Columns, ",")}] = s
 				mtx.Unlock()
 				wg.Done()
-			}(&se.Results[i].Series[j])
+			}(i, j)
 		}
 	}
 	wg.Wait()
@@ -156,31 +157,30 @@ func (se *SeriesEnvelope) Merge(sort bool, collection ...timeseries.Timeseries)
 		if ts != nil {
 			se2 := ts.(*SeriesEnvelope)
 			for g, r := range se2.Results {
-
 				if g >= len(se.Results) {
 					mtx.Lock()
 					se.Results = append(se.Results, se2.Results[g:]...)
 					mtx.Unlock()
 					break
 				}
-
 				for i := range r.Series {
 					wg.Add(1)
-					go func(s *models.Row, resultID int) {
+					go func(l, m int) {
 						mtx.Lock()
-						sk := seriesKey{ResultID: g, StatementID: r.StatementID, Name: s.Name,
+						s := &se2.Results[l].Series[m]
+						sk := seriesKey{ResultID: l, StatementID: se.Results[l].StatementID, Name: s.Name,
 							Tags: tags(s.Tags).String(), Columns: strings.Join(s.Columns, ",")}
 						if _, ok := series[sk]; !ok {
 							series[sk] = s
-							se.Results[resultID].Series = append(se.Results[resultID].Series, *s)
+							se.Results[l].Series = append(se.Results[l].Series, *s)
 							mtx.Unlock()
 							wg.Done()
 							return
 						}
 						series[sk].Values = append(series[sk].Values, s.Values...)
 						mtx.Unlock()
 						wg.Done()
-					}(&r.Series[i], g)
+					}(g, i)
 				}
 			}
 			wg.Wait()
Expand Down Expand Up @@ -424,55 +424,56 @@ func (se *SeriesEnvelope) Sort() {
return
}

wg := sync.WaitGroup{}
mtx := sync.Mutex{}

var hasWarned bool
tsm := map[time.Time]bool{}
if ti := str.IndexOfString(se.Results[0].Series[0].Columns, "time"); ti != -1 {
for ri := range se.Results {
seriesWG := sync.WaitGroup{}
for si := range se.Results[ri].Series {
seriesWG.Add(1)
go func(j int) {
tsLookup := make(map[int64][]interface{})
timestamps := make([]int64, 0, len(se.Results[ri].Series[j].Values))
for _, v := range se.Results[ri].Series[j].Values {
wg.Add(1)
go func(s []interface{}) {
if tf, ok := s[ti].(float64); ok {
t := int64(tf)
mtx.Lock()
if _, ok := tsLookup[t]; !ok {
timestamps = append(timestamps, t)
tsLookup[t] = s
}
tsm[time.Unix(t/1000, 0)] = true
mtx.Unlock()
} else if !hasWarned {
hasWarned = true
// this makeshift warning is temporary during the beta cycle to help
// troubleshoot #433
fmt.Println("WARN", "could not convert influxdb time to a float64:",
s[ti], "resultSet:", se)
for ri := range se.Results {
seriesWG := sync.WaitGroup{}
for si := range se.Results[ri].Series {
seriesWG.Add(1)
go func(j int) {
wg := sync.WaitGroup{}
tsLookup := make(map[int64][]interface{})
timestamps := make([]int64, 0, len(se.Results[ri].Series[j].Values))
for _, v := range se.Results[ri].Series[j].Values {
wg.Add(1)
go func(s []interface{}) {
defer wg.Done()
if len(s) == 0 {
return
}
if tf, ok := s[0].(float64); ok {
t := int64(tf)
mtx.Lock()
if _, ok := tsLookup[t]; !ok {
timestamps = append(timestamps, t)
tsLookup[t] = s
}
wg.Done()
}(v)
}
wg.Wait()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
sm := make([][]interface{}, len(timestamps))
for i, key := range timestamps {
sm[i] = tsLookup[key]
}
se.Results[ri].Series[j].Values = sm
seriesWG.Done()
}(si)
}
seriesWG.Wait()
tsm[time.Unix(t/1000, 0)] = true
mtx.Unlock()
} else if !hasWarned {
hasWarned = true
// this makeshift warning is temporary during the beta cycle to help
// troubleshoot #433
fmt.Println("WARN", "could not convert influxdb time to a float64:",
s[0], "resultSet:", se)
}
}(v)
}
wg.Wait()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
sm := make([][]interface{}, len(timestamps))
for i, key := range timestamps {
sm[i] = tsLookup[key]
}
se.Results[ri].Series[j].Values = sm
seriesWG.Done()
}(si)
}
seriesWG.Wait()
}

sort.Sort(se.ExtentList)
Expand Down
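
In line with the "trust time is always idx 0" bullet, the rewritten Sort reads the timestamp from column 0 of each value row rather than searching Columns for a "time" entry, skips empty rows, and scopes the inner WaitGroup to each series goroutine. A simplified, single-goroutine sketch of the row ordering and de-duplication (sortValuesByTime is an illustrative name, not the Trickster method):

    package main

    import (
        "fmt"
        "sort"
    )

    // sortValuesByTime orders InfluxQL-style value rows by the timestamp in
    // column 0 and drops duplicate timestamps, mirroring the rewritten Sort
    // above without the goroutine fan-out.
    func sortValuesByTime(values [][]interface{}) [][]interface{} {
        lookup := make(map[int64][]interface{})
        timestamps := make([]int64, 0, len(values))
        for _, row := range values {
            if len(row) == 0 {
                continue // guard for empty rows, as added in 1.1.3
            }
            tf, ok := row[0].(float64) // time is trusted to be column 0
            if !ok {
                continue
            }
            t := int64(tf)
            if _, seen := lookup[t]; !seen {
                timestamps = append(timestamps, t)
                lookup[t] = row
            }
        }
        sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
        out := make([][]interface{}, len(timestamps))
        for i, t := range timestamps {
            out[i] = lookup[t]
        }
        return out
    }

    func main() {
        rows := [][]interface{}{
            {float64(3000), 1.5}, {float64(1000), 0.5}, {}, {float64(1000), 0.7},
        }
        fmt.Println(sortValuesByTime(rows))
    }
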
2 changes: 1 addition & 1 deletion pkg/proxy/origins/irondb/handler_histogram_test.go
@@ -163,7 +163,7 @@ func TestHistogramHandlerParseTimeRangeQuery(t *testing.T) {
 
 	// case where the period can't be parsed
 	r.URL.Path = "/histogram/0/900/z/00112233-4455-6677-8899-aabbccddeeff/metric"
-	expected2 = `unable to parse duration zs: time: invalid duration zs`
+	expected2 = `unable to parse duration zs: time: invalid duration "zs"`
 	_, err = client.histogramHandlerParseTimeRangeQuery(r)
 	if err == nil || err.Error() != expected2 {
 		t.Errorf("expected %s got %s", expected2, err.Error())
2 changes: 1 addition & 1 deletion pkg/proxy/origins/irondb/handler_rollup_test.go
@@ -161,7 +161,7 @@ func TestRollupHandlerParseTimeRangeQuery(t *testing.T) {
 	// unparsable rollup_span param
 	r.URL.RawQuery = "start_ts=9012&end_ts=3456&rollup_span=pqrs"
 	_, err = client.rollupHandlerParseTimeRangeQuery(r)
-	expectedS = `unable to parse duration pqrs: time: invalid duration pqrs`
+	expectedS = `unable to parse duration pqrs: time: invalid duration "pqrs"`
 	if err.Error() != expectedS {
 		t.Errorf("expected %s got %s", expectedS, err.Error())
 	}
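
Both expected-error updates follow from the move to Go 1.15, where time.ParseDuration began quoting the offending input in its error text, so the message reads `time: invalid duration "zs"` instead of `time: invalid duration zs`. A quick standalone check of the new wording (run under Go 1.15 or later):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Under Go 1.15+ this prints: time: invalid duration "zs"
        if _, err := time.ParseDuration("zs"); err != nil {
            fmt.Println(err)
        }
    }
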