Skip to content

Commit

Permalink
Merge pull request #229 from taosdata/enh/xftan/TD-27498/compress-3.0
Browse files Browse the repository at this point in the history
enh: support websocket compression
huskar-t authored Dec 29, 2023
2 parents 9dd19ef + 03094d1 commit 3d3f581
Showing 17 changed files with 125 additions and 66 deletions.
22 changes: 12 additions & 10 deletions README-CN.md
Original file line number Diff line number Diff line change
@@ -10,16 +10,17 @@

v2 与 v3 版本不兼容,与 TDengine 版本对应如下:

| **driver-go 版本** | **TDengine 版本** | **主要功能** |
|------------------|-----------------|--------------------------------|
| v3.5.0 | 3.0.5.0+ | 获取消费进度及按照指定进度开始消费 |
| v3.3.1 | 3.0.4.1+ | 基于 websocket 的 schemaless 协议写入 |
| v3.1.0 | 3.0.2.2+ | 提供贴近 kafka 的订阅 api |
| v3.0.4 | 3.0.2.2+ | 新增 request id 相关接口 |
| v3.0.3 | 3.0.1.5+ | 基于 websocket 的 statement 写入 |
| v3.0.2 | 3.0.1.5+ | 基于 websocket 的数据查询和写入 |
| v3.0.1 | 3.0.0.0+ | 基于 websocket 的消息订阅 |
| v3.0.0 | 3.0.0.0+ | 适配 TDengine 3.0 查询和写入 |
| **driver-go 版本** | **TDengine 版本** | **主要功能** |
|------------------|----------------------|--------------------------------|
| v3.5.1 | 3.2.1.0+ / 3.1.1.13+ | 原生 stmt 查询和 geometry 类型支持 |
| v3.5.0 | 3.0.5.0+ | 获取消费进度及按照指定进度开始消费 |
| v3.3.1 | 3.0.4.1+ | 基于 websocket 的 schemaless 协议写入 |
| v3.1.0 | 3.0.2.2+ | 提供贴近 kafka 的订阅 api |
| v3.0.4 | 3.0.2.2+ | 新增 request id 相关接口 |
| v3.0.3 | 3.0.1.5+ | 基于 websocket 的 statement 写入 |
| v3.0.2 | 3.0.1.5+ | 基于 websocket 的数据查询和写入 |
| v3.0.1 | 3.0.0.0+ | 基于 websocket 的消息订阅 |
| v3.0.0 | 3.0.0.0+ | 适配 TDengine 3.0 查询和写入 |

## 安装

@@ -483,6 +484,7 @@ DSN 格式为:

- `writeTimeout` 通过 websocket 发送数据的超时时间。
- `readTimeout` 通过 websocket 接收响应数据的超时时间。
- `enableCompression` 是否压缩传输数据,默认为 `false` 不发送压缩数据。

## 通过 websocket 使用 tmq

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ v2 is not compatible with v3 version and corresponds to the TDengine version as

| **driver-go version** | **TDengine version** | **major features** |
|-----------------------|----------------------|----------------------------------------|
| v3.5.1 | 3.2.1.0+ / 3.1.1.13+ | native stmt query and geometry support |
| v3.5.0 | 3.0.5.0+ | tmq: get assignment and seek offset |
| v3.3.1 | 3.0.4.1+ | schemaless insert over websocket |
| v3.1.0 | 3.0.2.2+ | provide tmq apis close to kafka |
@@ -484,6 +485,7 @@ Parameters:

- `writeTimeout` The timeout to send data via websocket.
- `readTimeout` The timeout to receive response data via websocket.
- `enableCompression` Whether to compress the transmitted data, the default is `false` and no compressed data is sent.

## Using tmq over websocket

17 changes: 8 additions & 9 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
@@ -68,15 +68,14 @@ func TestTmq(t *testing.T) {
assert.NoError(t, err)

consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
//"experimental.snapshot.enable": "true",
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
19 changes: 9 additions & 10 deletions examples/tmq/main.go
Original file line number Diff line number Diff line change
@@ -27,16 +27,15 @@ func main() {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
6 changes: 4 additions & 2 deletions taosWS/connection.go
Original file line number Diff line number Diff line change
@@ -57,11 +57,13 @@ func newTaosConn(cfg *config) (*taosConn, error) {
endpointUrl.RawQuery = fmt.Sprintf("token=%s", cfg.token)
}
endpoint := endpointUrl.String()
ws, _, err := common.DefaultDialer.Dial(endpoint, nil)
dialer := common.DefaultDialer
dialer.EnableCompression = cfg.enableCompression
ws, _, err := dialer.Dial(endpoint, nil)
if err != nil {
return nil, err
}
ws.SetReadLimit(common.BufferSize4M)
ws.EnableWriteCompression(cfg.enableCompression)
ws.SetReadDeadline(time.Now().Add(common.DefaultPongWait))
ws.SetPongHandler(func(string) error {
ws.SetReadDeadline(time.Now().Add(common.DefaultPongWait))
2 changes: 1 addition & 1 deletion taosWS/driver_test.go
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ var (
port = 6041
dbName = "test_taos_ws"
dataSourceName = fmt.Sprintf("%s:%s@ws(%s:%d)/", user, password, host, port)
dataSourceNameWithCompression = fmt.Sprintf("%s:%s@ws(%s:%d)/?disableCompression=false", user, password, host, port)
dataSourceNameWithCompression = fmt.Sprintf("%s:%s@ws(%s:%d)/?enableCompression=true", user, password, host, port)
)

type DBTest struct {
6 changes: 6 additions & 0 deletions taosWS/dsn.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ type config struct {
params map[string]string // Connection parameters
interpolateParams bool // Interpolate placeholders into query string
token string // cloud platform token
enableCompression bool // Enable write compression
readTimeout time.Duration // read message timeout
writeTimeout time.Duration // write message timeout
}
@@ -143,6 +144,11 @@ func parseDSNParams(cfg *config, params string) (err error) {
}
case "token":
cfg.token = value
case "enableCompression":
cfg.enableCompression, err = strconv.ParseBool(value)
if err != nil {
return &errors.TaosError{Code: 0xffff, ErrStr: "invalid enableCompression value: " + value}
}
case "readTimeout":
cfg.readTimeout, err = time.ParseDuration(value)
if err != nil {
9 changes: 9 additions & 0 deletions taosWS/dsn_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,15 @@ func TestParseDsn(t *testing.T) {
{dsn: "user:passwd@wss(:0)/?interpolateParams=false&test=1", want: &config{user: "user", passwd: "passwd", net: "wss", params: map[string]string{"test": "1"}}},
{dsn: "user:passwd@wss(:0)/?interpolateParams=false&token=token", want: &config{user: "user", passwd: "passwd", net: "wss", token: "token"}},
{dsn: "user:passwd@wss(:0)/?writeTimeout=8s&readTimeout=10m", want: &config{user: "user", passwd: "passwd", net: "wss", readTimeout: 10 * time.Minute, writeTimeout: 8 * time.Second, interpolateParams: true}},
{dsn: "user:passwd@wss(:0)/?writeTimeout=8s&readTimeout=10m&enableCompression=true", want: &config{
user: "user",
passwd: "passwd",
net: "wss",
readTimeout: 10 * time.Minute,
writeTimeout: 8 * time.Second,
interpolateParams: true,
enableCompression: true,
}},
}
for _, tc := range tests {
t.Run(tc.dsn, func(t *testing.T) {
23 changes: 15 additions & 8 deletions ws/schemaless/config.go
Original file line number Diff line number Diff line change
@@ -10,14 +10,15 @@ const (
)

type Config struct {
url string
chanLength uint
user string
password string
db string
readTimeout time.Duration
writeTimeout time.Duration
errorHandler func(error)
url string
chanLength uint
user string
password string
db string
readTimeout time.Duration
writeTimeout time.Duration
errorHandler func(error)
enableCompression bool
}

func NewConfig(url string, chanLength uint, opts ...func(*Config)) *Config {
@@ -64,3 +65,9 @@ func SetErrorHandler(errorHandler func(error)) func(*Config) {
c.errorHandler = errorHandler
}
}

func SetEnableCompression(enableCompression bool) func(*Config) {
return func(c *Config) {
c.enableCompression = enableCompression
}
}
5 changes: 4 additions & 1 deletion ws/schemaless/schemaless.go
Original file line number Diff line number Diff line change
@@ -47,7 +47,10 @@ func NewSchemaless(config *Config) (*Schemaless, error) {
if len(wsUrl.Path) == 0 || wsUrl.Path != "/rest/schemaless" {
wsUrl.Path = "/rest/schemaless"
}
ws, _, err := common.DefaultDialer.Dial(wsUrl.String(), nil)
dialer := common.DefaultDialer
dialer.EnableCompression = config.enableCompression
ws, _, err := dialer.Dial(wsUrl.String(), nil)
ws.EnableWriteCompression(config.enableCompression)
if err != nil {
return nil, fmt.Errorf("dial ws error: %s", err)
}
1 change: 1 addition & 0 deletions ws/schemaless/schemaless_test.go
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ func TestSchemaless_Insert(t *testing.T) {
SetWriteTimeout(10*time.Second),
SetUser("root"),
SetPassword("taosdata"),
SetEnableCompression(true),
SetErrorHandler(func(err error) {
t.Fatal(err)
}),
23 changes: 14 additions & 9 deletions ws/stmt/config.go
Original file line number Diff line number Diff line change
@@ -6,15 +6,16 @@ import (
)

type Config struct {
Url string
ChanLength uint
MessageTimeout time.Duration
WriteWait time.Duration
ErrorHandler func(connector *Connector, err error)
CloseHandler func()
User string
Password string
DB string
Url string
ChanLength uint
MessageTimeout time.Duration
WriteWait time.Duration
ErrorHandler func(connector *Connector, err error)
CloseHandler func()
User string
Password string
DB string
EnableCompression bool
}

func NewConfig(url string, chanLength uint) *Config {
@@ -60,3 +61,7 @@ func (c *Config) SetErrorHandler(f func(connector *Connector, err error)) {
func (c *Config) SetCloseHandler(f func()) {
c.CloseHandler = f
}

func (c *Config) SetEnableCompression(enableCompression bool) {
c.EnableCompression = enableCompression
}
5 changes: 4 additions & 1 deletion ws/stmt/connector.go
Original file line number Diff line number Diff line change
@@ -44,10 +44,13 @@ func NewConnector(config *Config) (*Connector, error) {
if config.WriteWait > 0 {
writeTimeout = config.WriteWait
}
ws, _, err := common.DefaultDialer.Dial(config.Url, nil)
dialer := common.DefaultDialer
dialer.EnableCompression = config.EnableCompression
ws, _, err := dialer.Dial(config.Url, nil)
if err != nil {
return nil, err
}
ws.EnableWriteCompression(config.EnableCompression)
defer func() {
if connector == nil {
ws.Close()
1 change: 1 addition & 0 deletions ws/stmt/stmt_test.go
Original file line number Diff line number Diff line change
@@ -164,6 +164,7 @@ func TestStmt(t *testing.T) {
config.SetConnectDB("test_ws_stmt")
config.SetMessageTimeout(common.DefaultMessageTimeout)
config.SetWriteWait(common.DefaultWriteWait)
config.SetEnableCompression(true)
config.SetErrorHandler(func(connector *Connector, err error) {
t.Log(err)
})
10 changes: 10 additions & 0 deletions ws/tmq/config.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ type config struct {
AutoCommitIntervalMS string
SnapshotEnable string
WithTableName string
EnableCompression bool
}

func newConfig(url string, chanLength uint) *config {
@@ -138,3 +139,12 @@ func (c *config) setWithTableName(withTableName tmq.ConfigValue) error {
}
return nil
}

func (c *config) setEnableCompression(enableCompression tmq.ConfigValue) error {
var ok bool
c.EnableCompression, ok = enableCompression.(bool)
if !ok {
return fmt.Errorf("ws.message.enableCompression requires bool got %T", enableCompression)
}
return nil
}
13 changes: 12 additions & 1 deletion ws/tmq/consumer.go
Original file line number Diff line number Diff line change
@@ -77,10 +77,13 @@ func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) {
autoCommitInterval = time.Millisecond * time.Duration(interval)
}

ws, _, err := common.DefaultDialer.Dial(config.Url, nil)
dialer := common.DefaultDialer
dialer.EnableCompression = config.EnableCompression
ws, _, err := dialer.Dial(config.Url, nil)
if err != nil {
return nil, err
}
ws.EnableWriteCompression(config.EnableCompression)
wsClient := client.NewClient(ws, config.ChanLength)

consumer := &Consumer{
@@ -168,6 +171,10 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) {
if err != nil {
return nil, err
}
enableCompression, err := m.Get("ws.message.enableCompression", false)
if err != nil {
return nil, err
}
config := newConfig(url.(string), chanLen.(uint))
err = config.setMessageTimeout(messageTimeout.(time.Duration))
if err != nil {
@@ -213,6 +220,10 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) {
if err != nil {
return nil, err
}
err = config.setEnableCompression(enableCompression)
if err != nil {
return nil, err
}
return config, nil
}

27 changes: 13 additions & 14 deletions ws/tmq/consumer_test.go
Original file line number Diff line number Diff line change
@@ -276,8 +276,8 @@ func TestSeek(t *testing.T) {
"client.id": "test_consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "false",
"msg.with.table.name": "true",
"ws.message.enableCompression": true,
})
if err != nil {
t.Error(err)
@@ -394,19 +394,18 @@ func TestAutoCommit(t *testing.T) {
}
defer cleanAutocommitEnv()
consumer, err := NewConsumer(&tmq.ConfigMap{
"ws.url": "ws://127.0.0.1:6041/rest/tmq",
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"group.id": "test",
"client.id": "test_consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"experimental.snapshot.enable": "false",
"msg.with.table.name": "true",
"ws.url": "ws://127.0.0.1:6041/rest/tmq",
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"group.id": "test",
"client.id": "test_consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"msg.with.table.name": "true",
})
assert.NoError(t, err)
if err != nil {

0 comments on commit 3d3f581

Please sign in to comment.