Skip to content

Commit

Permalink
feat(outputs.sql): Add column (if missing), use Prepared statements
Browse files Browse the repository at this point in the history
Automate table updates, when the table does not contain a field

Default to prepared statements for table insert
  • Loading branch information
hautecodure committed Feb 24, 2025
1 parent 01c633f commit 30b5ab0
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 29 deletions.
24 changes: 22 additions & 2 deletions plugins/outputs/sql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ driver selected.
Through the nature of the inputs plugins, the amounts of columns inserted within
rows for a given metric may differ. Since the tables are created based on the
tags and fields available within an input metric, it's possible the created
table won't contain all the necessary columns. You might need to initialize
the schema yourself, to avoid this scenario.
table won't contain all the necessary columns.

## Advanced options

Expand Down Expand Up @@ -106,6 +105,17 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"

## Table update template
## Available template variables:
## {TABLE} - table name as a quoted identifier
## {COLUMNS} - column definitions (list of quoted identifiers and types)
# column_template = "ALTER TABLE {TABLE} ADD COLUMN IF NOT EXISTS {COLUMN}"

## Table columns query template
## Available template variables:
## {TABLE} - table name as a quoted identifier
# column_query_template = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}"

## Initialization SQL
# init_sql = ""

Expand Down Expand Up @@ -183,6 +193,16 @@ It is not supported on windows/386, mips, and mips64 platforms.
The DSN is a filename or url with scheme "file:". See the [driver
docs](https://modernc.org/sqlite) for details.

As SQLite does not support `INFORMATION_SCHEMA`, you'll have to update
`column_template` to something like:

```toml
# Save metrics to an SQL Database
[[outputs.sql]]
## ...
column_query_template = "SELECT name AS column_name FROM pragma_table_info({TABLE})"
```

### clickhouse

#### DSN
Expand Down
11 changes: 11 additions & 0 deletions plugins/outputs/sql/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"

## Table update template
## Available template variables:
## {TABLE} - table name as a quoted identifier
## {COLUMNS} - column definitions (list of quoted identifiers and types)
# column_template = "ALTER TABLE {TABLE} ADD COLUMN IF NOT EXISTS {COLUMN}"

## Table columns query template
## Available template variables:
## {TABLE} - table name as a quoted identifier
# column_query_template = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}"

## Initialization SQL
# init_sql = ""

Expand Down
162 changes: 135 additions & 27 deletions plugins/outputs/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "embed"
"fmt"
"net/url"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -45,6 +46,8 @@ type SQL struct {
TimestampColumn string `toml:"timestamp_column"`
TableTemplate string `toml:"table_template"`
TableExistsTemplate string `toml:"table_exists_template"`
ColumnTemplate string `toml:"column_template"`
ColumnQueryTemplate string `toml:"column_query_template"`
InitSQL string `toml:"init_sql"`
Convert ConvertStruct `toml:"convert"`
ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"`
Expand All @@ -54,7 +57,7 @@ type SQL struct {
Log telegraf.Logger `toml:"-"`

db *gosql.DB
tables map[string]bool
tables map[string][]string
}

func (*SQL) SampleConfig() string {
Expand Down Expand Up @@ -90,7 +93,7 @@ func (p *SQL) Connect() error {
}

p.db = db
p.tables = make(map[string]bool)
p.tables = make(map[string][]string)

return nil
}
Expand Down Expand Up @@ -176,6 +179,18 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
return query
}

func (p *SQL) generateAddColumn(tablename string, column string, columnType string) string {
var columnContent string

columnContent = fmt.Sprintf("%s %s", quoteIdent(column), columnType)

query := p.ColumnTemplate
query = strings.ReplaceAll(query, "{TABLE}", quoteIdent(tablename))
query = strings.ReplaceAll(query, "{COLUMN}", columnContent)

return query
}

func (p *SQL) generateInsert(tablename string, columns []string) string {
placeholders := make([]string, 0, len(columns))
quotedColumns := make([]string, 0, len(columns))
Expand Down Expand Up @@ -207,21 +222,94 @@ func (p *SQL) tableExists(tableName string) bool {
return err == nil
}

func (p *SQL) fetchColumns(tableName string) error {
stmt := p.ColumnQueryTemplate
stmt = strings.ReplaceAll(stmt, "{TABLE}", quoteStr(tableName))

columns, err := p.db.Query(stmt)

if err != nil {
return err
}

defer columns.Close()

for columns.Next() {
var columnName string

if err := columns.Scan(&columnName); err != nil {
return err
}

if !slices.Contains(p.tables[tableName], columnName) {
p.tables[tableName] = append(p.tables[tableName], columnName)
}
}

return nil
}

func (p *SQL) addColumnIfDoesNotExist(tablename string, column string, columnType string) error {
if !slices.Contains(p.tables[tablename], column) {
createColumn := p.generateAddColumn(tablename, column, columnType)

_, err := p.db.Exec(createColumn)

if err != nil {
return err
}

err = p.fetchColumns(tablename)

if err != nil {
return err
}
}

return nil
}

func (p *SQL) sortFields(columns []string, params []interface{}) ([]string, []interface{}) {
sortMap := make(map[string]int)

sortedColumns := slices.Clone(columns)

slices.Sort(sortedColumns)

for i, column := range sortedColumns {
sortMap[column] = i
}

sortedParams := make([]interface{}, len(params))

for i := range len(params) {
sortedParams[sortMap[columns[i]]] = params[i]
}

return sortedColumns, sortedParams
}

func (p *SQL) Write(metrics []telegraf.Metric) error {
var err error
batchMetrics := make(map[string][][]interface{})

for _, metric := range metrics {
tablename := metric.Name()

// create table if needed
if !p.tables[tablename] && !p.tableExists(tablename) {
if _, isTableCached := p.tables[tablename]; !isTableCached {
createStmt := p.generateCreateTable(metric)
_, err := p.db.Exec(createStmt)

if err != nil {
return err
}

err = p.fetchColumns(tablename)

if err != nil {
return err
}
}
p.tables[tablename] = true

var columns []string
var values []interface{}
Expand All @@ -234,43 +322,61 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
for column, value := range metric.Tags() {
columns = append(columns, column)
values = append(values, value)

err := p.addColumnIfDoesNotExist(tablename, column, p.Convert.Text)

if err != nil {
return err
}
}

for column, value := range metric.Fields() {
columns = append(columns, column)
values = append(values, value)
}

sql := p.generateInsert(tablename, columns)
err := p.addColumnIfDoesNotExist(tablename, column, p.deriveDatatype(value))

switch p.Driver {
case "clickhouse":
// ClickHouse needs to batch inserts with prepared statements
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
return err
}
defer stmt.Close() //nolint:revive,gocritic // done on purpose, closing will be executed properly
}

_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("execution failed: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
default:
_, err = p.db.Exec(sql, values...)
// sort the fields to improve the "Prepared" statement throughput
sortedColumns, sortedParams := p.sortFields(columns, values)

sql := p.generateInsert(tablename, sortedColumns)

batchMetrics[sql] = append(batchMetrics[sql], sortedParams)
}

for sql, sliceParams := range batchMetrics {
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}

batch, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer batch.Close() //nolint:revive,gocritic // done on purpose, closing will be executed properly

for _, params := range sliceParams {
_, err = batch.Exec(params...)
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
return fmt.Errorf("execution failed: %w, unable to rollback: %w", err, rollbackErr)
}
return fmt.Errorf("execution failed: %w", err)
}
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
}

return nil
}

Expand All @@ -282,6 +388,8 @@ func newSQL() *SQL {
return &SQL{
TableTemplate: "CREATE TABLE {TABLE}({COLUMNS})",
TableExistsTemplate: "SELECT 1 FROM {TABLE} LIMIT 1",
ColumnTemplate: "ALTER TABLE {TABLE} ADD COLUMN IF NOT EXISTS {COLUMN}",
ColumnQueryTemplate: "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}",
TimestampColumn: "timestamp",
Convert: ConvertStruct{
Integer: "INT",
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/sql/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestSqlite(t *testing.T) {
p.Log = testutil.Logger{}
p.Driver = "sqlite"
p.DataSourceName = address
p.ColumnQueryTemplate = "SELECT name AS column_name FROM pragma_table_info({TABLE})"

require.NoError(t, p.Connect())
defer p.Close()
Expand Down

0 comments on commit 30b5ab0

Please sign in to comment.