Go Connector for TDengine

TDengine provides a Go database/sql driver, taosSql.

Notice

driver-go v2 is not compatible with v3. The driver-go versions correspond to TDengine versions as follows:

| driver-go version | TDengine version | Major features |
|---|---|---|
| v3.6.0 | 3.3.5.0 and above | stmt2 native interface, DSN supports passwords containing special characters (url.QueryEscape) |
| v3.5.8 | 3.3.2.0 and above / 3.1.2.0 and above | fix null pointer exception |
| v3.5.7 | 3.3.2.0 and above / 3.1.2.0 and above | taosWS and taosRestful support request id |
| v3.5.6 | 3.3.2.0 and above / 3.1.2.0 and above | WebSocket performance improvements |
| v3.5.5 | 3.2.3.0 and above / 3.1.1.27 and above | support skip http ssl check |
| v3.5.4 | 3.2.3.0 and above / 3.1.1.27 and above | compatible with TDengine 3.3.0.0 tmq raw data |
| v3.5.3 | 3.2.3.0 and above / 3.1.1.27 and above | refactor taosWS |
| v3.5.2 | 3.2.3.0 and above / 3.1.1.27 and above | websocket compression and optimize tmq poll |
| v3.5.1 | 3.2.1.0 and above / 3.1.1.13 and above | native stmt query and geometry support |
| v3.5.0 | 3.0.5.0 and above | tmq: get assignment and seek offset |
| v3.3.1 | 3.0.4.1 and above | schemaless insert over websocket |
| v3.1.0 | 3.0.2.2 and above | provide tmq apis close to kafka |
| v3.0.4 | 3.0.2.2 and above | add request id |
| v3.0.3 | 3.0.1.5 and above | statement insert over websocket |
| v3.0.2 | 3.0.1.5 and above | bulk pulling over websocket |
| v3.0.1 | 3.0.0.0 and above | tmq over websocket |
| v3.0.0 | 3.0.0.0 and above | adapt to TDengine 3.0 query and insert |

Install

Go 1.14+ is highly recommended for newly created projects.

go mod init taos-demo

Import taosSql:

import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosSql"
)

Use go mod for module management:

go mod tidy

Or use go get to install it directly:

go get github.com/taosdata/driver-go/v3/taosSql

Usage

database/sql Standard

A simple use case:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/taosdata/driver-go/v3/taosSql"
)

func main() {
    var taosUri = "root:taosdata@tcp(localhost:6030)/"
    taos, err := sql.Open("taosSql", taosUri)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
    defer taos.Close()
    taos.Exec("create database if not exists test")
    taos.Exec("use test")
    taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
    _, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
    if err != nil {
        fmt.Println("failed to insert, err:", err)
        return
    }
    rows, err := taos.Query("select * from tb1")
    if err != nil {
        fmt.Println("failed to select from table, err:", err)
        return
    }

    defer rows.Close()
    for rows.Next() {
        var r struct {
            ts time.Time
            a  int
        }
        err := rows.Scan(&r.ts, &r.a)
        if err != nil {
            fmt.Println("scan error:\n", err)
            return
        }
        fmt.Println(r.ts, r.a)
    }
}

APIs worth noting:

  • sql.Open(driverName string, dataSourceName string) (*DB, error)

    Creates a database/sql DB object and returns it as *DB together with an error. Set driverName to taosSql and dataSourceName to a URI of the form user:password@tcp(host:port)/dbname. For high-availability (HA) deployments, use user:password@cfg(/etc/taos)/dbname to apply the configuration in /etc/taos/taos.cfg.

  • func (db *DB) Exec(query string, args ...interface{}) (Result, error)

    Executes SQL statements that return no result set, such as create and alter.

  • func (db *DB) Query(query string, args ...interface{}) (*Rows, error)

    Executes a query that returns a result set.

  • func (db *DB) Close() error

    Closes the DB object and disconnects.
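
The DSN form described above can be assembled with a small helper. The following is a minimal sketch, not part of driver-go: nativeDSN is a hypothetical name, and the sketch assumes the driver decodes a password escaped with url.QueryEscape, which the version table lists as supported from v3.6.0.

```go
package main

import (
	"fmt"
	"net/url"
)

// nativeDSN builds a DSN of the form user:password@tcp(host:port)/dbname.
// The password is escaped with url.QueryEscape so that characters such as
// '@', '/' or ':' cannot be mistaken for DSN delimiters.
// Hypothetical helper for illustration; it is not part of driver-go.
func nativeDSN(user, password, host string, port int, db string) string {
	return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, url.QueryEscape(password), host, port, db)
}

func main() {
	// A plain password passes through unchanged.
	fmt.Println(nativeDSN("root", "taosdata", "localhost", 6030, ""))
	// A password with special characters is percent-encoded.
	fmt.Println(nativeDSN("root", "p@ss/w:rd", "localhost", 6030, "test"))
}
```

The returned string can be passed as the second argument of sql.Open. With driver versions older than v3.6.0, an escaped password may not be decoded, so passwords without special characters are the safer choice there.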

Subscription

Create consumer:

func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)

Subscribe to a single topic:

func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

Subscribe to multiple topics:

func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

Poll message:

func (c *Consumer) Poll(timeoutMs int) tmq.Event

Commit message:

func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

Get assignment:

func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

Seek offset:

func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

Unsubscribe:

func (c *Consumer) Unsubscribe() error

Close consumer:

func (c *Consumer) Close() error

Example code: examples/tmq/main.go.
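
The Poll and Commit calls listed above are typically combined in a loop. The sketch below shows only the shape of such a loop. To stay compilable without the driver, it uses local stand-ins (event, topicPartition, poller, fakeConsumer) in place of tmq.Event, tmq.TopicPartition and *Consumer; these names are hypothetical. It also assumes that Poll returns nil when nothing arrives within the timeout.

```go
package main

import "fmt"

// event and topicPartition are local stand-ins for tmq.Event and
// tmq.TopicPartition so that the sketch compiles without the driver.
type event interface{}
type topicPartition struct{}

// poller lists the two consumer methods the loop relies on.
type poller interface {
	Poll(timeoutMs int) event
	Commit() ([]topicPartition, error)
}

// consume polls up to maxPolls times, passes every non-nil event to handle
// and commits after each handled event. It returns the number of events
// that were handled and committed.
func consume(c poller, maxPolls, timeoutMs int, handle func(event)) (int, error) {
	handled := 0
	for i := 0; i < maxPolls; i++ {
		ev := c.Poll(timeoutMs)
		if ev == nil { // assumption: nil means nothing arrived in time
			continue
		}
		handle(ev)
		if _, err := c.Commit(); err != nil {
			return handled, err
		}
		handled++
	}
	return handled, nil
}

// fakeConsumer replays a fixed list of events; it exists only for the demo.
type fakeConsumer struct {
	events  []event
	commits int
}

func (f *fakeConsumer) Poll(timeoutMs int) event {
	if len(f.events) == 0 {
		return nil
	}
	ev := f.events[0]
	f.events = f.events[1:]
	return ev
}

func (f *fakeConsumer) Commit() ([]topicPartition, error) {
	f.commits++
	return nil, nil
}

func main() {
	c := &fakeConsumer{events: []event{"row-1", nil, "row-2"}}
	n, err := consume(c, 5, 500, func(ev event) { fmt.Println("got:", ev) })
	fmt.Println("handled:", n, "commits:", c.commits, "err:", err)
}
```

In real code the handle callback would type-switch on the concrete event types; examples/tmq/main.go remains the authoritative reference.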

Schemaless

InfluxDB format:

func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error

Example code: examples/schemaless/influx/main.go.

OpenTSDB telnet format:

func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error

Example code: examples/schemaless/telnet/main.go.

OpenTSDB JSON format:

func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error

Example code: examples/schemaless/json/main.go.
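
The lines passed to InfluxDBInsertLines and OpenTSDBInsertTelnetLines are plain strings. As a rough illustration of their layout, the sketch below formats one line for each protocol. The helpers influxLine and telnetLine and the sample measurement names are hypothetical, and the sketch assumes keys and values contain no characters that need escaping.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// kv is an ordered key/value pair; a slice of pairs keeps the output
// deterministic, unlike iterating over a map.
type kv struct{ key, value string }

// joinPairs renders pairs as key=value items separated by sep.
func joinPairs(pairs []kv, sep string) string {
	parts := make([]string, len(pairs))
	for i, p := range pairs {
		parts[i] = p.key + "=" + p.value
	}
	return strings.Join(parts, sep)
}

// influxLine formats "measurement,tag_set field_set timestamp".
func influxLine(measurement string, tags, fields []kv, ts int64) string {
	line := measurement
	if len(tags) > 0 {
		line += "," + joinPairs(tags, ",")
	}
	return line + " " + joinPairs(fields, ",") + " " + strconv.FormatInt(ts, 10)
}

// telnetLine formats "metric timestamp value tagk=tagv ...".
func telnetLine(metric string, ts int64, value float64, tags []kv) string {
	v := strconv.FormatFloat(value, 'f', -1, 64)
	return fmt.Sprintf("%s %d %s %s", metric, ts, v, joinPairs(tags, " "))
}

func main() {
	tags := []kv{{"location", "California.LosAngeles"}, {"groupid", "2"}}
	fields := []kv{{"current", "11.8"}, {"voltage", "221"}}
	fmt.Println(influxLine("meters", tags, fields, 1648432611249))
	fmt.Println(telnetLine("meters.current", 1648432611249, 10.3, tags))
}
```

For the InfluxDB format, the timestamp unit must match the precision argument passed alongside the lines; the millisecond timestamp used here would pair with a millisecond precision value.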

stmt insert

Prepare sql:

func (stmt *InsertStmt) Prepare(sql string) error

Set the child table name:

func (stmt *InsertStmt) SetSubTableName(name string) error

Set the table name:

func (stmt *InsertStmt) SetTableName(name string) error

Set the subtable name and tags:

func (stmt *InsertStmt) SetTableNameWithTags(tableName string, tags *param.Param) error

Bind parameters:

func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

Add batch:

func (stmt *InsertStmt) AddBatch() error

Execute:

func (stmt *InsertStmt) Execute() error

Get the number of affected rows:

func (stmt *InsertStmt) GetAffectedRows() int

Close stmt:

func (stmt *InsertStmt) Close() error

Example code: examples/stmtinsert/main.go.

RESTful implementation of the database/sql standard interface

A simple use case:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/taosdata/driver-go/v3/taosRestful"
)

func main() {
    var taosDSN = "root:taosdata@http(localhost:6041)/"
    taos, err := sql.Open("taosRestful", taosDSN)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
    defer taos.Close()
    taos.Exec("create database if not exists test")
    taos.Exec("create table if not exists test.tb1 (ts timestamp, a int)")
    _, err = taos.Exec("insert into test.tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
    if err != nil {
        fmt.Println("failed to insert, err:", err)
        return
    }
    rows, err := taos.Query("select * from test.tb1")
    if err != nil {
        fmt.Println("failed to select from table, err:", err)
        return
    }

    defer rows.Close()
    for rows.Next() {
        var r struct {
            ts time.Time
            a  int
        }
        err := rows.Scan(&r.ts, &r.a)
        if err != nil {
            fmt.Println("scan error:\n", err)
            return
        }
        fmt.Println(r.ts, r.a)
    }
}

Usage of taosRestful

import

import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosRestful"
)

The driverName for sql.Open is taosRestful.

The DSN format is:

username:password@protocol(host:port)/[database][?parameters]

Example:

root:taosdata@http(localhost:6041)/test?readBufferSize=52428800

Parameters:

  • disableCompression Whether to refuse compressed data. The default is true, which means compressed data is not accepted. Set it to false if the transferred data is compressed with gzip.
  • readBufferSize The size of the buffer for reading data. The default is 4K (4096); increase it when query results are large.
  • skipVerify Whether to skip verification of the server certificate. The default is false, which means the certificate is verified. Set it to true to skip verification.
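
To show how these parameters fit into the DSN, here is a minimal sketch that appends them as a query string. restDSN is a hypothetical helper, not part of driver-go, and the sketch assumes multiple parameters are joined with "&" as in a URL query.

```go
package main

import (
	"fmt"
	"net/url"
)

// restDSN builds user:password@http(addr)/db and appends params as a
// query string. Hypothetical helper for illustration only.
func restDSN(user, password, addr, db string, params map[string]string) string {
	dsn := fmt.Sprintf("%s:%s@http(%s)/%s", user, password, addr, db)
	if len(params) == 0 {
		return dsn
	}
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	// Encode sorts by key, so the result is deterministic.
	return dsn + "?" + q.Encode()
}

func main() {
	fmt.Println(restDSN("root", "taosdata", "localhost:6041", "test", map[string]string{
		"readBufferSize":     "52428800",
		"disableCompression": "false",
		"skipVerify":         "true",
	}))
}
```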

Usage restrictions

Because the RESTful interface is stateless, the use db statement has no effect. Put the database name into the SQL statement instead: for example, change create table if not exists tb1 (ts timestamp, a int) to create table if not exists test.tb1 (ts timestamp, a int). Otherwise the error [0x217] Database not specified or available is reported.

You can also put the database name in the DSN by changing root:taosdata@http(localhost:6041)/ to root:taosdata@http(localhost:6041)/test. If the specified database does not exist, executing the create database statement does not report an error, but any other query or insert does. The example is as follows:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/taosdata/driver-go/v3/taosRestful"
)

func main() {
    var taosDSN = "root:taosdata@http(localhost:6041)/test"
    taos, err := sql.Open("taosRestful", taosDSN)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
    defer taos.Close()
    taos.Exec("create database if not exists test")
    taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
    _, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
    if err != nil {
        fmt.Println("failed to insert, err:", err)
        return
    }
    rows, err := taos.Query("select * from tb1")
    if err != nil {
        fmt.Println("failed to select from table, err:", err)
        return
    }

    defer rows.Close()
    for rows.Next() {
        var r struct {
            ts time.Time
            a  int
        }
        err := rows.Scan(&r.ts, &r.a)
        if err != nil {
            fmt.Println("scan error:\n", err)
            return
        }
        fmt.Println(r.ts, r.a)
    }
}

WebSocket implementation of the database/sql standard interface

A simple use case:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/taosdata/driver-go/v3/taosWS"
)

func main() {
    var taosDSN = "root:taosdata@ws(localhost:6041)/"
    taos, err := sql.Open("taosWS", taosDSN)
    if err != nil {
        fmt.Println("failed to connect TDengine, err:", err)
        return
    }
    defer taos.Close()
    taos.Exec("create database if not exists test")
    taos.Exec("create table if not exists test.tb1 (ts timestamp, a int)")
    _, err = taos.Exec("insert into test.tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
    if err != nil {
        fmt.Println("failed to insert, err:", err)
        return
    }
    rows, err := taos.Query("select * from test.tb1")
    if err != nil {
        fmt.Println("failed to select from table, err:", err)
        return
    }

    defer rows.Close()
    for rows.Next() {
        var r struct {
            ts time.Time
            a  int
        }
        err := rows.Scan(&r.ts, &r.a)
        if err != nil {
            fmt.Println("scan error:\n", err)
            return
        }
        fmt.Println(r.ts, r.a)
    }
}

Usage of websocket

import

import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosWS"
)

The driverName for sql.Open is taosWS.

The DSN format is:

username:password@protocol(host:port)/[database][?parameters]

Example:

root:taosdata@ws(localhost:6041)/test?writeTimeout=10s&readTimeout=10m

Parameters:

  • writeTimeout The timeout for sending data over WebSocket.
  • readTimeout The timeout for receiving response data over WebSocket.
  • enableCompression Whether to compress the transmitted data. The default is false, which means no compressed data is sent.

Using tmq over websocket

Use tmq over WebSocket. The server must be running taosAdapter.

Configuration-related API

  • func NewConfig(url string, chanLength uint) *Config

Create a configuration, pass in the websocket address and the length of the sending channel.

  • func (c *Config) SetConnectUser(user string) error

Set username.

  • func (c *Config) SetConnectPass(pass string) error

Set password.

  • func (c *Config) SetClientID(clientID string) error

Set the client ID.

  • func (c *Config) SetGroupID(groupID string) error

Set the subscription group ID.

  • func (c *Config) SetWriteWait(writeWait time.Duration) error

Set the waiting time for sending messages.

  • func (c *Config) SetMessageTimeout(timeout time.Duration) error

Set the message timeout.

  • func (c *Config) SetErrorHandler(f func(consumer *Consumer, err error))

Set the error handler.

  • func (c *Config) SetCloseHandler(f func())

Set the close handler.

Subscription-related API

  • func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)

Create a consumer.

  • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

Subscribe to a topic.

  • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

Subscribe to multiple topics.

  • func (c *Consumer) Poll(timeoutMs int) tmq.Event

Poll messages.

  • func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

Commit message.

  • func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

Get assignment.

  • func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

Seek offset.

  • func (c *Consumer) Close() error

Close the connection.

Example code: examples/tmqoverws/main.go.

Parameter binding via WebSocket

Use stmt over WebSocket. The server must be running taosAdapter.

Configuration-related API

  • func NewConfig(url string, chanLength uint) *Config

    Create a configuration, pass in the websocket address and the length of the sending channel.

  • func (c *Config) SetCloseHandler(f func())

    Set close handler.

  • func (c *Config) SetConnectDB(db string) error

    Set connect DB.

  • func (c *Config) SetConnectPass(pass string) error

    Set password.

  • func (c *Config) SetConnectUser(user string) error

    Set username.

  • func (c *Config) SetErrorHandler(f func(connector *Connector, err error))

    Set error handler.

  • func (c *Config) SetMessageTimeout(timeout time.Duration) error

    Set the message timeout.

  • func (c *Config) SetWriteWait(writeWait time.Duration) error

    Set the waiting time for sending messages.

Parameter-binding-related API

  • func NewConnector(config *Config) (*Connector, error)

    Create a connection.

  • func (c *Connector) Init() (*Stmt, error)

    Initialize a statement (Stmt) for parameter binding.

  • func (c *Connector) Close() error

    Close the connection.

  • func (s *Stmt) Prepare(sql string) error

    Prepare the SQL statement for parameter binding.

  • func (s *Stmt) SetTableName(name string) error

    Bind the table name.

  • func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) error

    Bind tags.

  • func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

    Bind multiple rows of data.

  • func (s *Stmt) AddBatch() error

    Add the bound parameters to the batch.

  • func (s *Stmt) Exec() error

    Execute the bound statement.

  • func (s *Stmt) GetAffectedRows() int

    Get the number of rows inserted by the bound statement.

  • func (s *Stmt) Close() error

    Close the statement.

For a complete example of parameter binding, see the example file on GitHub.

Directory structure

driver-go
├── af           // advanced functions
├── common       // common functions and constants
├── errors       // error types
├── examples     // examples
├── taosRestful  // database operation standard interface (restful)
├── taosSql      // database operation standard interface
├── types        // internal types
├── wrapper      // cgo wrapper
└── ws           // websocket

Link

driver-go: https://github.com/taosdata/driver-go

TDengine: https://github.com/taosdata/TDengine