Skip to content

Commit

Permalink
Fix ClusterSessionInit
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou authored Aug 29, 2024
1 parent bf6a3e3 commit bb812b8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 51 deletions.
26 changes: 12 additions & 14 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,23 +1137,21 @@ func NewClusterSession(clusterConfig *ClusterConfig) Session {
session.trans = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{
ConnectTimeout: time.Duration(0), // Use 0 for no timeout
})
if err == nil {
// session.trans = thrift.NewTFramedTransport(session.trans) // deprecated
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf)
if !session.trans.IsOpen() {
err = session.trans.Open()
if err != nil {
log.Println(err)
} else {
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
break
}
// session.trans = thrift.NewTFramedTransport(session.trans) // deprecated
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf)
if !session.trans.IsOpen() {
err = session.trans.Open()
if err != nil {
log.Println(err)
} else {
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
break
}
}
}
if err != nil {
if !session.trans.IsOpen() {
log.Fatal("No Server Can Connect")
}
return session
Expand Down
28 changes: 15 additions & 13 deletions example/session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func main() {
}
defer session.Close()

//connectCluster()

setStorageGroup("root.ln1")
deleteStorageGroup("root.ln1")

Expand Down Expand Up @@ -143,6 +145,19 @@ func main() {
deleteTimeseries("root.ln.device1.*")
}

// If your IotDB is a cluster version, you can use the following code for multi node connection
func connectCluster() {
config := &client.ClusterConfig{
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","),
UserName: "root",
Password: "root",
}
session = client.NewClusterSession(config)
if err := session.OpenCluster(false); err != nil {
log.Fatal(err)
}
}

func printDevice1(sds *client.SessionDataSet) {
showTimestamp := !sds.IsIgnoreTimeStamp()
if showTimestamp {
Expand Down Expand Up @@ -667,16 +682,3 @@ func checkError(status *common.TSStatus, err error) {
}
}
}

// If your IotDB is a cluster version, you can use the following code for multi node connection
func connectCluster() {
config := &client.ClusterConfig{
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","),
UserName: "root",
Password: "root",
}
session = client.NewClusterSession(config)
if err := session.OpenCluster(false); err != nil {
log.Fatal(err)
}
}
39 changes: 20 additions & 19 deletions example/session_pool/session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
}()

}
//useNodeUrls()
setStorageGroup("root.ln1")
setStorageGroup("root.ln2")
deleteStorageGroups("root.ln1", "root.ln2")
Expand Down Expand Up @@ -139,6 +140,25 @@ func main() {

}

// If your IoTDB is a cluster version, you can use the following code for session pool connection
func useNodeUrls() {

config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}

}

func setStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
Expand Down Expand Up @@ -762,22 +782,3 @@ func checkError(status *common.TSStatus, err error) {
}
}
}

// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection
func useSessionPool() {

config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}

}
9 changes: 4 additions & 5 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/iotdb-client-go/common"
"log"
"math/rand"
"strings"
"testing"
"time"

Expand All @@ -41,14 +42,12 @@ func TestE2ETestSuite(t *testing.T) {
}

func (s *e2eTestSuite) SetupSuite() {
config := &client.Config{
Host: "iotdb",
Port: "6667",
clusterConfig := client.ClusterConfig{
NodeUrls: strings.Split("iotdb:6668,iotdb:6667,iotdb:6669", ","),
UserName: "root",
Password: "root",
}

s.session = client.NewSession(config)
s.session = client.NewClusterSession(&clusterConfig)
err := s.session.Open(false, 0)
s.Require().NoError(err)
}
Expand Down

0 comments on commit bb812b8

Please sign in to comment.