Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh:Add TDengine benchmark support #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ loaders: tsbs_load \
tsbs_load_siridb \
tsbs_load_timescaledb \
tsbs_load_victoriametrics \
tsbs_load_questdb
tsbs_load_questdb \
tsbs_load_tdengine

runners: tsbs_run_queries_akumuli \
tsbs_run_queries_cassandra \
Expand All @@ -38,7 +39,8 @@ runners: tsbs_run_queries_akumuli \
tsbs_run_queries_timescaledb \
tsbs_run_queries_timestream \
tsbs_run_queries_victoriametrics \
tsbs_run_queries_questdb
tsbs_run_queries_questdb \
tsbs_run_queries_tdengine

test:
$(GOTEST) -v ./...
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Current databases supported:
+ TimescaleDB [(supplemental docs)](docs/timescaledb.md)
+ Timestream [(supplemental docs)](docs/timestream.md)
+ VictoriaMetrics [(supplemental docs)](docs/victoriametrics.md)
+ TDengine [(supplemental docs)](docs/tdengine.md)

## Overview

Expand Down Expand Up @@ -81,6 +82,7 @@ cases are implemented for each database:
|TimescaleDB|X|X|
|Timestream|X||
|VictoriaMetrics|X²||
|TDengine|X|X|

¹ Does not support the `groupby-orderby-limit` query
² Does not support the `groupby-orderby-limit`, `lastpoint`, `high-cpu-1`, `high-cpu-all` queries
Expand Down Expand Up @@ -112,6 +114,10 @@ $ cd $GOPATH/src/github.com/timescale/tsbs
$ make
```

**`Notice`**

The make process will failed if the TDengine client isn't installed. Please refer to the [supplementary documentation for TDengine](docs/tdengine.md).

## How to use TSBS

Using TSBS for benchmarking involves 3 phases: data and query
Expand All @@ -135,7 +141,7 @@ Variables needed:
1. how much time should be between each reading per device, in seconds. E.g., `10s`
1. and which database(s) you want to generate for. E.g., `timescaledb`
(choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `questdb`, `siridb`,
`timescaledb` or `victoriametrics`)
`timescaledb` `victoriametrics` or `tdengine`)

Given the above steps you can now generate a dataset (or multiple
datasets, if you chose to generate for multiple databases) that can
Expand Down
5 changes: 4 additions & 1 deletion cmd/tsbs_generate_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
// MongoDB BSON format
// TimescaleDB pseudo-CSV format (the same as for ClickHouse)
// VictoriaMetrics bulk load format (the same as for InfluxDB)
// TDengine pseudo-CSV format

// Supported use cases:
// devops: scale is the number of hosts to simulate, with log messages
// every log-interval seconds.
//
// every log-interval seconds.
//
// cpu-only: same as `devops` but only generate metrics for CPU
package main

Expand Down
16 changes: 10 additions & 6 deletions cmd/tsbs_generate_queries/databases/akumuli/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ type tsdbAggregateAllQuery struct {
// SELECT minute, max(metric1), ..., max(metricN)
// FROM cpu
// WHERE
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// GROUP BY minute
// ORDER BY minute ASC
//
Expand Down Expand Up @@ -176,9 +178,11 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
// SELECT MAX(metric1), ..., MAX(metricN)
// FROM cpu
// WHERE
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// GROUP BY hour
// ORDER BY hour
//
Expand Down
16 changes: 10 additions & 6 deletions cmd/tsbs_generate_queries/databases/clickhouse/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ const clickhouseTimeStringFormat = "2006-01-02 15:04:05"
// SELECT MAX(metric1), ..., MAX(metricN)
// FROM cpu
// WHERE
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// GROUP BY hour
// ORDER BY hour
//
Expand Down Expand Up @@ -290,9 +292,11 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
// SELECT minute, max(metric1), ..., max(metricN)
// FROM cpu
// WHERE
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START'
// AND time < '$HOUR_END'
//
// GROUP BY minute
// ORDER BY minute ASC
//
Expand Down
3 changes: 2 additions & 1 deletion cmd/tsbs_generate_queries/databases/cratedb/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const hostnameField = "tags['hostname']"
// a set of column idents.
//
// For instance:
// max(cpu_time) AS max_cpu_time
//
// max(cpu_time) AS max_cpu_time
func (d *Devops) getSelectAggClauses(aggFunc string, idents []string) []string {
selectAggClauses := make([]string, len(idents))
for i, ident := range idents {
Expand Down
3 changes: 2 additions & 1 deletion cmd/tsbs_generate_queries/databases/questdb/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Devops struct {
// a set of column idents.
//
// For instance:
// max(cpu_time) AS max_cpu_time
//
// max(cpu_time) AS max_cpu_time
func (d *Devops) getSelectAggClauses(aggFunc string, idents []string) []string {
selectAggClauses := make([]string, len(idents))
for i, ident := range idents {
Expand Down
59 changes: 59 additions & 0 deletions cmd/tsbs_generate_queries/databases/tdengine/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tdengine

import (
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/pkg/query"
)

// BaseGenerator contains settings specific for Influx database.
type BaseGenerator struct {
}

func (g *BaseGenerator) GenerateEmptyQuery() query.Query {
return query.NewTDengine()
}

// fillInQuery fills the query struct with data.
func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, table, sql string) {
q := qi.(*query.TDengine)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(humanDesc)
q.Hypertable = []byte(table)
q.SqlQuery = []byte(sql)
}

// NewDevops creates a new devops use case query generator.
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
core, err := devops.NewCore(start, end, scale)

if err != nil {
return nil, err
}

devops := &Devops{
BaseGenerator: g,
Core: core,
}

return devops, nil
}

// NewIoT creates a new iot use case query generator.
func (g *BaseGenerator) NewIoT(start, end time.Time, scale int) (utils.QueryGenerator, error) {
core, err := iot.NewCore(start, end, scale)

if err != nil {
return nil, err
}

iot := &IoT{
BaseGenerator: g,
Core: core,
}

return iot, nil
}
174 changes: 174 additions & 0 deletions cmd/tsbs_generate_queries/databases/tdengine/devops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package tdengine

import (
"fmt"
"strings"
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/pkg/query"
)

// TODO: Remove the need for this by continuing to bubble up errors
func panicIfErr(err error) {
if err != nil {
panic(err.Error())
}
}

// Devops produces TimescaleDB-specific queries for all the devops query types.
type Devops struct {
*BaseGenerator
*devops.Core
}

// getHostWhereWithHostnames creates WHERE SQL statement for multiple hostnames.
// NOTE 'WHERE' itself is not included, just hostname filter clauses, ready to concatenate to 'WHERE' string
func (d *Devops) getHostWhereWithHostnames(hostnames []string) string {
var hostnameClauses []string
for _, s := range hostnames {
hostnameClauses = append(hostnameClauses, fmt.Sprintf("'%s'", s))
}
return fmt.Sprintf("tbname IN (%s)", strings.Join(hostnameClauses, ","))
}

// getHostWhereString gets multiple random hostnames and creates a WHERE SQL statement for these hostnames.
func (d *Devops) getHostWhereString(nHosts int) string {
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
return d.getHostWhereWithHostnames(hostnames)
}

func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string {
selectClauses := make([]string, len(metrics))
for i, m := range metrics {
selectClauses[i] = fmt.Sprintf("%s(%s)", agg, m)
}

return selectClauses
}

func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.MustRandWindow(timeRange)
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)
selectClauses := d.getSelectClausesAggMetrics("max", metrics)
if len(selectClauses) < 1 {
panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses)))
}

//SELECT _wstart as ts,max(usage_user) FROM cpu WHERE tbname IN ('host_249') AND ts >= 1451618560000 AND ts < 1451622160000 INTERVAL(1m) ;
//SELECT _wstart as ts,max(usage_user) FROM host_249 WHERE ts >= 1451618560000 AND ts < 1451622160000 INTERVAL(1m) ;
sql := ""
if nHosts == 1 {
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
sql = fmt.Sprintf(`SELECT _wstart as ts,%s FROM %s WHERE ts >= %d AND ts < %d INTERVAL(1m)`,
strings.Join(selectClauses, ", "),
hostnames[0],
interval.StartUnixMillis(),
interval.EndUnixMillis())

} else {
sql = fmt.Sprintf(`SELECT _wstart as ts,%s FROM cpu WHERE %s AND ts >= %d AND ts < %d INTERVAL(1m)`,
strings.Join(selectClauses, ", "),
d.getHostWhereString(nHosts),
interval.StartUnixMillis(),
interval.EndUnixMillis())
}

humanLabel := fmt.Sprintf("TDengine %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.MustRandWindow(time.Hour)
//SELECT _wstart as ts,max(usage_user) FROM cpu WHERE ts < 1451618228646 INTERVAL(1m) LIMIT 5;
sql := fmt.Sprintf(`SELECT _wstart as ts,max(usage_user) FROM cpu WHERE ts < %d INTERVAL(1m) LIMIT 5`,
interval.EndUnixMillis())

humanLabel := "TDengine max cpu over last 5 min-intervals (random end)"
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day,
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)

selectClauses := d.getSelectClausesAggMetrics("avg", metrics)
//SELECT _wstart as ts,tbname, avg(usage_user) from cpu where ts >= 1451733760646 and ts < 1451776960646 partition by tbname interval(1h) order by tbname,ts asc;
//SELECT _wstart as ts,tbname, avg(usage_user), avg(usage_system), avg(usage_idle), avg(usage_nice), avg(usage_iowait), avg(usage_irq), avg(usage_softirq), avg(usage_steal), avg(usage_guest), avg(usage_guest_nice) from cpu where ts >= 1451733760646 and ts < 1451776960646 partition by tbname interval(1h) order by tbname,ts asc;
sql := fmt.Sprintf("SELECT _wstart as ts,tbname,%s from cpu where ts >= %d and ts < %d partition by tbname interval(1h) order by tbname,ts asc", strings.Join(selectClauses, ", "), interval.StartUnixMillis(), interval.EndUnixMillis())

humanLabel := devops.GetDoubleGroupByLabel("TDengine", numMetrics)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

func (d *Devops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) {
interval := d.Interval.MustRandWindow(duration)

metrics := devops.GetAllCPUMetrics()
selectClauses := d.getSelectClausesAggMetrics("max", metrics)
//SELECT _wstart as ts,max(usage_user), max(usage_system), max(usage_idle), max(usage_nice), max(usage_iowait), max(usage_irq), max(usage_softirq), max(usage_steal), max(usage_guest), max(usage_guest_nice) FROM host_249 WHERE ts >= 1451648911646 AND ts < 1451677711646 interval(1h);
//SELECT_wstart as ts, max(usage_user), max(usage_system), max(usage_idle), max(usage_nice), max(usage_iowait), max(usage_irq), max(usage_softirq), max(usage_steal), max(usage_guest), max(usage_guest_nice) FROM cpu WHERE tbname IN ('host_249','host_403','host_435','host_39','host_139','host_75','host_315','host_121') AND ts >= 1451648911646 AND ts < 1451677711646 interval(1h)

sql := ""
if nHosts == 1 {
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
sql = fmt.Sprintf(`SELECT _wstart as ts,%s FROM %s WHERE ts >= %d AND ts < %d interval(1h)`,
strings.Join(selectClauses, ", "),
hostnames[0],
interval.StartUnixMillis(),
interval.EndUnixMillis())
} else {
sql = fmt.Sprintf(`SELECT _wstart as ts,%s FROM cpu WHERE %s AND ts >= %d AND ts < %d interval(1h)`,
strings.Join(selectClauses, ", "),
d.getHostWhereString(nHosts),
interval.StartUnixMillis(),
interval.EndUnixMillis())
}
humanLabel := devops.GetMaxAllLabel("TDengine", nHosts)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

func (d *Devops) LastPointPerHost(qi query.Query) {
//SELECT last_row(*),tbname from cpu group by tbname;
sql := "SELECT last_row(*),tbname from cpu group by tbname"
humanLabel := "TDengine last row per host"
humanDesc := humanLabel
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)
var hostWhereClause string
if nHosts == 0 {
hostWhereClause = ""
} else {
hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts))
}
//SELECT ts,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice FROM cpu WHERE usage_user > 90.0 and ts >= 1451777731138 AND ts < 1451820931138 AND tbname IN ('host_35')
//modify:SELECT * FROM host_35 WHERE usage_user > 90.0 and ts >= 1451777731138 AND ts < 1451820931138

sql := ""
if nHosts == 1 {
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
sql = fmt.Sprintf(`SELECT * FROM %s WHERE usage_user > 90.0 and ts >= %d AND ts < %d `,
hostnames[0], interval.StartUnixMillis(), interval.EndUnixMillis())
} else {
sql = fmt.Sprintf(`SELECT ts,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice FROM cpu WHERE usage_user > 90.0 and ts >= %d AND ts < %d %s`,
interval.StartUnixMillis(), interval.EndUnixMillis(), hostWhereClause)
}
humanLabel, err := devops.GetHighCPULabel("TDengine", nHosts)
panicIfErr(err)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}
Loading