Skip to content

Commit

Permalink
Fix bugs for rc/1.3.3 (#113)
Browse files Browse the repository at this point in the history
* Return error when constructing cluster session failed

* fix bugs
  • Loading branch information
shuwenwei authored Dec 11, 2024
1 parent a9ebffb commit e269bad
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 34 deletions.
3 changes: 2 additions & 1 deletion client/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ var UnmarkBitUtil = []byte{
}

func NewBitMap(size int) *BitMap {
// Need to maintain consistency with the calculation method on the IoTDB side.
bitMap := &BitMap{
size: size,
bits: make([]byte, (size+7)/8),
bits: make([]byte, size/8+1),
}
return bitMap
}
Expand Down
4 changes: 4 additions & 0 deletions client/rpcdataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (s *IoTDBRpcDataSet) getColumnType(columnName string) TSDataType {
return s.columnTypeDeduplicatedList[s.getColumnIndex(columnName)]
}

func (s *IoTDBRpcDataSet) isNullWithColumnName(columnName string) bool {
return s.isNull(int(s.getColumnIndex(columnName)), s.rowsIndex-1)
}

func (s *IoTDBRpcDataSet) isNull(columnIndex int, rowIndex int) bool {
if s.closed {
return true
Expand Down
52 changes: 25 additions & 27 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Session struct {
sessionId int64
trans thrift.TTransport
requestStatementId int64
protocolFactory thrift.TProtocolFactory
}

type endPoint struct {
Expand All @@ -83,7 +84,6 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
s.config.ConnectRetryMax = DefaultConnectRetryMax
}

var protocolFactory thrift.TProtocolFactory
var err error

// in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it returns one.
Expand All @@ -99,13 +99,10 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
return err
}
}
if enableRPCCompression {
protocolFactory = thrift.NewTCompactProtocolFactory()
} else {
protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
iprot := protocolFactory.GetProtocol(s.trans)
oprot := protocolFactory.GetProtocol(s.trans)
s.protocolFactory = getProtocolFactory(enableRPCCompression)
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)

s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password}
Expand Down Expand Up @@ -147,16 +144,11 @@ func (s *Session) OpenCluster(enableRPCCompression bool) error {
s.config.ConnectRetryMax = DefaultConnectRetryMax
}

var protocolFactory thrift.TProtocolFactory
var err error

if enableRPCCompression {
protocolFactory = thrift.NewTCompactProtocolFactory()
} else {
protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
iprot := protocolFactory.GetProtocol(s.trans)
oprot := protocolFactory.GetProtocol(s.trans)
s.protocolFactory = getProtocolFactory(enableRPCCompression)
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password}
Expand All @@ -170,14 +162,22 @@ func (s *Session) OpenCluster(enableRPCCompression bool) error {
return err
}

func (s *Session) Close() (r *common.TSStatus, err error) {
func getProtocolFactory(enableRPCCompression bool) thrift.TProtocolFactory {
if enableRPCCompression {
return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
} else {
return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
}
}

func (s *Session) Close() error {
req := rpc.NewTSCloseSessionReq()
req.SessionId = s.sessionId
_, err = s.client.CloseSession(context.Background(), req)
_, err := s.client.CloseSession(context.Background(), req)
if err != nil {
return nil, err
return err
}
return nil, s.trans.Close()
return s.trans.Close()
}

/*
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func NewSession(config *Config) Session {
return Session{config: config}
}

func NewClusterSession(clusterConfig *ClusterConfig) Session {
func NewClusterSession(clusterConfig *ClusterConfig) (Session, error) {
session := Session{}
node := endPoint{}
for i := 0; i < len(clusterConfig.NodeUrls); i++ {
Expand Down Expand Up @@ -1113,9 +1113,9 @@ func NewClusterSession(clusterConfig *ClusterConfig) Session {
}
}
if !session.trans.IsOpen() {
log.Fatal("No Server Can Connect")
return session, fmt.Errorf("no server can connect")
}
return session
return session, nil
}

func (s *Session) initClusterConn(node endPoint) error {
Expand Down Expand Up @@ -1148,10 +1148,8 @@ func (s *Session) initClusterConn(node endPoint) error {
s.config.ConnectRetryMax = DefaultConnectRetryMax
}

var protocolFactory thrift.TProtocolFactory
protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
iprot := protocolFactory.GetProtocol(s.trans)
oprot := protocolFactory.GetProtocol(s.trans)
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password}
Expand Down
4 changes: 4 additions & 0 deletions client/sessiondataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (s *SessionDataSet) GetText(columnName string) string {
return s.ioTDBRpcDataSet.getText(columnName)
}

func (s *SessionDataSet) IsNull(columnName string) bool {
return s.ioTDBRpcDataSet.isNullWithColumnName(columnName)
}

func (s *SessionDataSet) GetBool(columnName string) bool {
return s.ioTDBRpcDataSet.getBool(columnName)
}
Expand Down
7 changes: 5 additions & 2 deletions client/sessionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ func (spool *SessionPool) GetSession() (session Session, err error) {

func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
if len(config.NodeUrls) > 0 {
session = NewClusterSession(getClusterSessionConfig(config))
if err := session.OpenCluster(spool.enableCompression); err != nil {
session, err = NewClusterSession(getClusterSessionConfig(config))
if err != nil {
return session, err
}
if err = session.OpenCluster(spool.enableCompression); err != nil {
log.Print(err)
return session, err
}
Expand Down
2 changes: 2 additions & 0 deletions client/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error
}
// Mark the nil value position
t.bitMaps[columnIndex].Mark(rowIndex)
return nil
}

switch t.measurementSchemas[columnIndex].DataType {
Expand Down Expand Up @@ -296,6 +297,7 @@ func (t *Tablet) getValuesBytes() ([]byte, error) {
columnHasNil := bitMap != nil && !bitMap.IsAllUnmarked()
binary.Write(buff, binary.BigEndian, columnHasNil)
if columnHasNil {
// Need to maintain consistency with the calculation method on the IoTDB side.
binary.Write(buff, binary.BigEndian, bitMap.GetBits()[0:t.RowSize/8+1])
}
}
Expand Down
7 changes: 5 additions & 2 deletions example/session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ func connectCluster() {
UserName: "root",
Password: "root",
}
session = client.NewClusterSession(config)
if err := session.OpenCluster(false); err != nil {
session, err := client.NewClusterSession(config)
if err != nil {
log.Fatal(err)
}
if err = session.OpenCluster(false); err != nil {
log.Fatal(err)
}
}
Expand Down
6 changes: 4 additions & 2 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func (s *e2eTestSuite) SetupSuite() {
UserName: "root",
Password: "root",
}
s.session = client.NewClusterSession(&clusterConfig)
err := s.session.Open(false, 0)
session, err := client.NewClusterSession(&clusterConfig)
s.Require().NoError(err)
s.session = session
err = s.session.Open(false, 0)
s.Require().NoError(err)
}

Expand Down

0 comments on commit e269bad

Please sign in to comment.