From bb812b80b9ad6af6ce34d5f23a68a5fee9237468 Mon Sep 17 00:00:00 2001 From: Haonan Date: Thu, 29 Aug 2024 15:46:58 +0800 Subject: [PATCH] Fix ClusterSessionInit --- client/session.go | 26 ++++++------- example/session_example.go | 28 +++++++------- example/session_pool/session_pool_example.go | 39 ++++++++++---------- test/e2e/e2e_test.go | 9 ++--- 4 files changed, 51 insertions(+), 51 deletions(-) diff --git a/client/session.go b/client/session.go index 0b55866..47985f8 100644 --- a/client/session.go +++ b/client/session.go @@ -1137,23 +1137,21 @@ func NewClusterSession(clusterConfig *ClusterConfig) Session { session.trans = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{ ConnectTimeout: time.Duration(0), // Use 0 for no timeout }) - if err == nil { - // session.trans = thrift.NewTFramedTransport(session.trans) // deprecated - var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} - session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf) - if !session.trans.IsOpen() { - err = session.trans.Open() - if err != nil { - log.Println(err) - } else { - session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, - clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax) - break - } + // session.trans = thrift.NewTFramedTransport(session.trans) // deprecated + var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} + session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf) + if !session.trans.IsOpen() { + err = session.trans.Open() + if err != nil { + log.Println(err) + } else { + session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, + clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax) + break } } } - if err != nil { + if !session.trans.IsOpen() { log.Fatal("No Server Can Connect") } return session diff --git a/example/session_example.go b/example/session_example.go index b370435..fbfba4f 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -57,6 +57,8 @@ func main() { } defer session.Close() + //connectCluster() + setStorageGroup("root.ln1") deleteStorageGroup("root.ln1") @@ -143,6 +145,19 @@ func main() { deleteTimeseries("root.ln.device1.*") } +// If your IotDB is a cluster version, you can use the following code for multi node connection +func connectCluster() { + config := &client.ClusterConfig{ + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","), + UserName: "root", + Password: "root", + } + session = client.NewClusterSession(config) + if err := session.OpenCluster(false); err != nil { + log.Fatal(err) + } +} + func printDevice1(sds *client.SessionDataSet) { showTimestamp := !sds.IsIgnoreTimeStamp() if showTimestamp { @@ -667,16 +682,3 @@ func checkError(status *common.TSStatus, err error) { } } } - -// If your IotDB is a cluster version, you can use the following code for multi node connection -func connectCluster() { - config := &client.ClusterConfig{ - NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","), - UserName: "root", - Password: "root", - } - session = client.NewClusterSession(config) - if err := session.OpenCluster(false); err != nil { - log.Fatal(err) - } -} diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go index a4641d9..459393b 100644 --- a/example/session_pool/session_pool_example.go +++ b/example/session_pool/session_pool_example.go @@ -67,6 +67,7 @@ func main() { }() } + //useNodeUrls() setStorageGroup("root.ln1") setStorageGroup("root.ln2") deleteStorageGroups("root.ln1", "root.ln2") @@ -139,6 +140,25 @@ func main() { } +// If your IoTDB is a cluster version, you can use the following code for session pool connection +func useNodeUrls() { + + config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + +} + func setStorageGroup(sg string) { session, err := sessionPool.GetSession() defer sessionPool.PutBack(session) @@ -762,22 +782,3 @@ func checkError(status *common.TSStatus, err error) { } } } - -// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection -func useSessionPool() { - - config := &client.PoolConfig{ - UserName: user, - Password: password, - NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), - } - sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) - defer sessionPool.Close() - session, err := sessionPool.GetSession() - defer sessionPool.PutBack(session) - if err != nil { - log.Print(err) - return - } - -} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5509d40..c7a971e 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( "github.com/apache/iotdb-client-go/common" "log" "math/rand" + "strings" "testing" "time" @@ -41,14 +42,12 @@ func TestE2ETestSuite(t *testing.T) { } func (s *e2eTestSuite) SetupSuite() { - config := &client.Config{ - Host: "iotdb", - Port: "6667", + clusterConfig := client.ClusterConfig{ + NodeUrls: strings.Split("iotdb:6668,iotdb:6667,iotdb:6669", ","), UserName: "root", Password: "root", } - - s.session = client.NewSession(config) + s.session = client.NewClusterSession(&clusterConfig) err := s.session.Open(false, 0) s.Require().NoError(err) }