feat(outputs.sql): Add option to automate table schema updates #16544

Open
wants to merge 1 commit into
base: master
36 changes: 34 additions & 2 deletions plugins/outputs/sql/README.md
@@ -40,8 +40,8 @@ driver selected.
Through the nature of the inputs plugins, the amounts of columns inserted within
rows for a given metric may differ. Since the tables are created based on the
tags and fields available within an input metric, it's possible the created
-table won't contain all the necessary columns. You might need to initialize
-the schema yourself, to avoid this scenario.
+table won't contain all the necessary columns. If you wish to automate table
+updates, check out the [Schema updates](#schema-updates) section for more info.

## Advanced options

@@ -110,6 +110,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"

+## Table update template, available template variables:
+## {TABLE} - table name as a quoted identifier
+## {COLUMN} - column definition (quoted identifier and type)
+## NOTE: Ensure the database user Telegraf connects as has permission to alter tables
+##
+## Use the following setting for automatically adding columns:
+## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
+# table_update_template = ""
+
## Initialization SQL
# init_sql = ""

@@ -155,6 +164,29 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# # conversion_style = "unsigned_suffix"
```

+## Schema updates
+
+By default, this plugin creates a table's schema from the fields and tags of
+the metric being written when the table is created. Writing subsequent
+metrics with additional fields or tags will result in errors.
+
+If you want the plugin to add missing columns as new fields or tags appear,
+set the `table_update_template` option in your config file.
+
+> [!NOTE]
+> The suggested setting is a generic query that your database may not
+> support. Consult its documentation for proper syntax and table options.
+
+```toml
+# Save metrics to an SQL Database
+[[outputs.sql]]
+## Table update template, available template variables:
+## {TABLE} - table name as a quoted identifier
+## {COLUMN} - column definition (quoted identifier and type)
+## NOTE: Ensure the database user Telegraf connects as has permission to alter tables
+table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
+```
+
## Driver-specific information

### go-sql-driver/mysql
9 changes: 9 additions & 0 deletions plugins/outputs/sql/sample.conf
@@ -29,6 +29,15 @@
## {TABLE} - tablename as a quoted identifier
# table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"

+## Table update template, available template variables:
+## {TABLE} - table name as a quoted identifier
+## {COLUMN} - column definition (quoted identifier and type)
+## NOTE: Ensure the database user Telegraf connects as has permission to alter tables
+##
+## Use the following setting for automatically adding columns:
+## table_update_template = "ALTER TABLE {TABLE} ADD COLUMN {COLUMN}"
+# table_update_template = ""
+
## Initialization SQL
# init_sql = ""

129 changes: 108 additions & 21 deletions plugins/outputs/sql/sql.go
@@ -37,21 +37,23 @@ type ConvertStruct struct {
}

type SQL struct {
-Driver string `toml:"driver"`
-DataSourceName string `toml:"data_source_name"`
-TimestampColumn string `toml:"timestamp_column"`
-TableTemplate string `toml:"table_template"`
-TableExistsTemplate string `toml:"table_exists_template"`
-InitSQL string `toml:"init_sql"`
-Convert ConvertStruct `toml:"convert"`
-ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"`
-ConnectionMaxLifetime config.Duration `toml:"connection_max_lifetime"`
-ConnectionMaxIdle int `toml:"connection_max_idle"`
-ConnectionMaxOpen int `toml:"connection_max_open"`
-Log telegraf.Logger `toml:"-"`
+Driver string `toml:"driver"`
+DataSourceName string `toml:"data_source_name"`
+TimestampColumn string `toml:"timestamp_column"`
+TableTemplate string `toml:"table_template"`
+TableExistsTemplate string `toml:"table_exists_template"`
+TableUpdateTemplate string `toml:"table_update_template"`
+TableListColumnsTemplate string `toml:"-"`
+InitSQL string `toml:"init_sql"`
+Convert ConvertStruct `toml:"convert"`
+ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"`
+ConnectionMaxLifetime config.Duration `toml:"connection_max_lifetime"`
+ConnectionMaxIdle int `toml:"connection_max_idle"`
+ConnectionMaxOpen int `toml:"connection_max_open"`
+Log telegraf.Logger `toml:"-"`

db *gosql.DB
-tables map[string]bool
+tables map[string]map[string]bool
}

func (*SQL) SampleConfig() string {
@@ -87,7 +89,7 @@ func (p *SQL) Connect() error {
}

p.db = db
-p.tables = make(map[string]bool)
+p.tables = make(map[string]map[string]bool)

return nil
}
@@ -177,6 +179,14 @@ func (p *SQL) generateCreateTable(metric telegraf.Metric) string {
return query
}

+func (p *SQL) generateAddColumn(tablename, column, columnType string) string {
+query := p.TableUpdateTemplate
+query = strings.ReplaceAll(query, "{TABLE}", quoteIdent(tablename))
+query = strings.ReplaceAll(query, "{COLUMN}", quoteIdent(column)+" "+columnType)
+
+return query
+}
+
func (p *SQL) generateInsert(tablename string, columns []string) string {
placeholders := make([]string, 0, len(columns))
quotedColumns := make([]string, 0, len(columns))
@@ -201,28 +211,95 @@ func (p *SQL) generateInsert(tablename string, columns []string) string {
strings.Join(placeholders, ","))
}

+func (p *SQL) createTable(metric telegraf.Metric) error {
+tablename := metric.Name()
+if _, found := p.tables[tablename]; found {
+return nil
+}
+if !p.tableExists(tablename) {
+stmt := p.generateCreateTable(metric)
+if _, err := p.db.Exec(stmt); err != nil {
+return fmt.Errorf("creating table failed: %w", err)
+}
+}
+// register pre-existing tables too; otherwise the column cache for them
+// stays a nil map and updateTableCache panics when writing to it
+p.tables[tablename] = make(map[string]bool)
+if err := p.updateTableCache(tablename); err != nil {
+return fmt.Errorf("updating table cache failed: %w", err)
+}
+return nil
+}
+
+func (p *SQL) createColumn(tablename, column, columnType string) error {
+// modifying the table schema is opt-in
+if p.TableUpdateTemplate == "" {
+return nil
+}
+
+// update the table cache with the latest table schema;
+// some servers lack support for the "IF NOT EXISTS"
+// clause, in which case we don't want to burden users
+// with writing complex "IF NOT EXISTS" workarounds
+if !p.tables[tablename][column] {
+if err := p.updateTableCache(tablename); err != nil {
+return fmt.Errorf("updating table cache failed: %w", err)
+}
+}
+
+if !p.tables[tablename][column] {
+createColumn := p.generateAddColumn(tablename, column, columnType)
+if _, err := p.db.Exec(createColumn); err != nil {
+return fmt.Errorf("creating column failed: %w", err)
+}
+if err := p.updateTableCache(tablename); err != nil {
+return fmt.Errorf("updating table cache failed: %w", err)
+}
+}
+
+return nil
+}
+
func (p *SQL) tableExists(tableName string) bool {
stmt := strings.ReplaceAll(p.TableExistsTemplate, "{TABLE}", quoteIdent(tableName))

_, err := p.db.Exec(stmt)
return err == nil
}

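+func (p *SQL) updateTableCache(tablename string) error {
+// modifying the table schema is opt-in
+if p.TableUpdateTemplate == "" {
+return nil
+}
+
+stmt := strings.ReplaceAll(p.TableListColumnsTemplate, "{TABLE}", quoteStr(tablename))
+
+columns, err := p.db.Query(stmt)
+if err != nil {
+return fmt.Errorf("fetching columns for table(%s) failed: %w", tablename, err)
+}
+defer columns.Close()
+
+// guard against writing into a nil map for tables not yet cached
+if p.tables[tablename] == nil {
+p.tables[tablename] = make(map[string]bool)
+}
+
+for columns.Next() {
+var columnName string
+if err := columns.Scan(&columnName); err != nil {
+return fmt.Errorf("scanning column name for table(%s) failed: %w", tablename, err)
+}
+
+p.tables[tablename][columnName] = true
+}
+
+// surface errors encountered while iterating over the result set
+return columns.Err()
+}
+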
func (p *SQL) Write(metrics []telegraf.Metric) error {
var err error

for _, metric := range metrics {
tablename := metric.Name()

// create table if needed
-if !p.tables[tablename] && !p.tableExists(tablename) {
-createStmt := p.generateCreateTable(metric)
-_, err := p.db.Exec(createStmt)
-if err != nil {
-return err
-}
+if err := p.createTable(metric); err != nil {
+return err
Comment on lines -218 to +301

Member

To be honest, I would like to keep the condition here as otherwise this reads like "always create the table"!

Author

@hautecodure, Mar 4, 2025

I was trying to keep the Write loop easier to read and handle the minutiae in the respective function.

Should I revert to the previous statement?

}
-p.tables[tablename] = true

var columns []string
var values []interface{}
@@ -242,6 +319,12 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
values = append(values, value)
}

+for i := range len(columns) {
+if err := p.createColumn(tablename, columns[i], p.deriveDatatype(values[i])); err != nil {
+return err
+}
+}
+
sql := p.generateInsert(tablename, columns)

switch p.Driver {
@@ -289,6 +372,10 @@ func (p *SQL) Init() error {
p.TableTemplate = "CREATE TABLE {TABLE}({COLUMNS})"
}
}
+p.TableListColumnsTemplate = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}"
+if p.Driver == "sqlite" {
+p.TableListColumnsTemplate = "SELECT name AS column_name FROM pragma_table_info({TABLE})"
+}

return nil
}
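The driver-dependent choice of the column-listing query made in `Init` can be sketched as a standalone function. `listColumnsQuery` is a hypothetical name, and `quoteStr` below is a simplified stand-in that is only assumed to match the plugin's helper (single-quoted string literal with embedded quotes doubled).

```go
package main

import (
	"fmt"
	"strings"
)

// quoteStr is a simplified stand-in (assumption) for the plugin's helper:
// it renders s as a single-quoted SQL string literal.
func quoteStr(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// listColumnsQuery (hypothetical name) picks the column-listing template for
// the driver and expands {TABLE} as a string literal, not as an identifier.
func listColumnsQuery(driver, table string) string {
	tmpl := "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME={TABLE}"
	if driver == "sqlite" {
		// SQLite has no INFORMATION_SCHEMA; pragma_table_info lists columns.
		tmpl = "SELECT name AS column_name FROM pragma_table_info({TABLE})"
	}
	return strings.ReplaceAll(tmpl, "{TABLE}", quoteStr(table))
}

func main() {
	fmt.Println(listColumnsQuery("sqlite", "cpu"))
	// prints: SELECT name AS column_name FROM pragma_table_info('cpu')
	fmt.Println(listColumnsQuery("pgx", "cpu"))
	// prints: SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='cpu'
}
```

Both queries return a single `column_name` column, which is what allows one scan loop to serve every driver.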