Skip to content

Commit

Permalink
v9.80
Browse files Browse the repository at this point in the history
1.add ReplaceFAEnd(v9.80)
2.try restart goroutine instead of disconnec
3.decoder did not recover panic anymore
  • Loading branch information
hadrianl committed Oct 22, 2020
1 parent df7d4a4 commit 94c5390
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ style="display: block; margin: 0 auto;"
src="http://interactivebrokers.github.io/tws-api/nav_iblogo.png"
/>

* Interactive Brokers API 9.79
* Interactive Brokers API 9.80
* pure golang Implement
* Unofficial, use at you own risk

Expand Down
34 changes: 20 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,14 @@ func (ic *IbClient) HandShake() error {
var msg bytes.Buffer
var msgBytes []byte
head := []byte("API\x00")
minVer := []byte(strconv.FormatInt(int64(MIN_CLIENT_VER), 10))
maxVer := []byte(strconv.FormatInt(int64(MAX_CLIENT_VER), 10))

connectOptions := []byte{}
connectOptions := ""
if ic.connectOptions != "" {
connectOptions = []byte(" " + ic.connectOptions)
connectOptions = " " + ic.connectOptions
}

clientVersion := bytes.Join([][]byte{[]byte("v"), minVer, []byte(".."), maxVer, connectOptions}, nil)
clientVersion := []byte(fmt.Sprintf("v%d..%d%s", MIN_CLIENT_VER, MAX_CLIENT_VER, connectOptions))

sizeofCV := make([]byte, 4)
binary.BigEndian.PutUint32(sizeofCV, uint32(len(clientVersion)))

Expand Down Expand Up @@ -222,7 +221,7 @@ func (ic *IbClient) HandShake() error {

// Init Decoder
ic.decoder.setVersion(ic.serverVersion)
ic.decoder.errChan = make(chan error, 100)
// ic.decoder.errChan = make(chan error, 100)
ic.decoder.setmsgID2process()

log.Info("init info", zap.Int("serverVersion", ic.serverVersion))
Expand Down Expand Up @@ -2852,7 +2851,9 @@ func (ic *IbClient) goRequest() {
if err := recover(); err != nil {
log.Error("requester got unexpected error", zap.Error(err.(error)))
ic.err = err.(error)
ic.Disconnect()
// ic.Disconnect()
log.Info("try to restart requester")
go ic.goRequest()
}
}()
defer log.Info("requester end")
Expand Down Expand Up @@ -2891,7 +2892,9 @@ func (ic *IbClient) goReceive() {
if err := recover(); err != nil {
log.Error("receiver got unexpected error", zap.Error(err.(error)))
ic.err = err.(error)
ic.Disconnect()
// ic.Disconnect()
log.Info("try to restart receiver")
go ic.goReceive()
}
}()
defer log.Info("receiver end")
Expand All @@ -2910,18 +2913,19 @@ func (ic *IbClient) goReceive() {
select {
case <-ic.terminatedSignal:
default:
err := ic.scanner.Err()
switch err {
switch err := ic.scanner.Err(); err {
case io.EOF:
err = errors.Wrap(err, "scanner Done")
ic.Disconnect()
case bufio.ErrTooLong:
errBytes := ic.scanner.Bytes()
ic.wrapper.Error(NO_VALID_ID, BAD_LENGTH.code, fmt.Sprintf("%s:%d:%s", BAD_LENGTH.msg, len(errBytes), errBytes))
err = errors.Wrap(err, BAD_LENGTH.msg)
panic(err)
default:
err = errors.Wrap(err, "scanner Error")
panic(err)
}
panic(err)
}

}
Expand All @@ -2933,7 +2937,9 @@ func (ic *IbClient) goDecode() {
if err := recover(); err != nil {
log.Error("decoder got unexpected error", zap.Error(err.(error)))
ic.err = err.(error)
ic.Disconnect()
// ic.Disconnect()
log.Info("try to restart decoder")
go ic.goDecode()
}
}()
defer log.Info("decoder end")
Expand All @@ -2948,8 +2954,8 @@ decodeLoop:
ic.decoder.interpret(m)
case e := <-ic.errChan:
log.Error("got client error in decode loop", zap.Error(e))
case e := <-ic.decoder.errChan:
ic.wrapper.Error(NO_VALID_ID, BAD_MESSAGE.code, BAD_MESSAGE.msg+e.Error())
// case e := <-ic.decoder.errChan:
// ic.wrapper.Error(NO_VALID_ID, BAD_MESSAGE.code, BAD_MESSAGE.msg+e.Error())
case <-ic.terminatedSignal:
break decodeLoop
}
Expand Down
6 changes: 4 additions & 2 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const (
mORDER_BOUND IN = 100
mCOMPLETED_ORDER IN = 101
mCOMPLETED_ORDERS_END IN = 102
mREPLACE_FA_END IN = 103
)

const (
Expand Down Expand Up @@ -269,9 +270,10 @@ const (
mMIN_SERVER_VER_ENCODE_MSG_ASCII7 Version = 153
mMIN_SERVER_VER_SEND_ALL_FAMILY_CODES Version = 154
mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE Version = 155
mMIN_SERVER_VER_REPLACE_FA_END Version = 157

MIN_CLIENT_VER = 100
MAX_CLIENT_VER = mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE
MIN_CLIENT_VER Version = 100
MAX_CLIENT_VER Version = mMIN_SERVER_VER_REPLACE_FA_END
)

// tick const
Expand Down
27 changes: 18 additions & 9 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ibDecoder struct {
wrapper IbWrapper
version Version
msgID2process map[IN]func(*MsgBuffer)
errChan chan error
// errChan chan error
}

func (d *ibDecoder) setVersion(version Version) {
Expand All @@ -34,13 +34,14 @@ func (d *ibDecoder) interpret(msgBytes []byte) {
return
}

// if decode error ocours,handle the error
defer func() {
if err := recover(); err != nil {
log.Error("failed to decode", zap.Error(err.(error)))
d.errChan <- err.(error)
}
}()
// try not to handle the error produced by decoder and wrapper, user should be responsible for this
// // if decode error ocours,handle the error
// defer func() {
// if err := recover(); err != nil {
// log.Error("failed to decode", zap.Error(err.(error)))
// d.errChan <- err.(error)
// }
// }()

// log.Debug("interpret", zap.Binary("MsgBytes", msgBuf.Bytes()))

Expand Down Expand Up @@ -156,7 +157,8 @@ func (d *ibDecoder) setmsgID2process() {
mTICK_BY_TICK: d.processTickByTickMsg,
mORDER_BOUND: d.processOrderBoundMsg,
mCOMPLETED_ORDER: d.processCompletedOrderMsg,
mCOMPLETED_ORDERS_END: d.processCompletedOrdersEndMsg}
mCOMPLETED_ORDERS_END: d.processCompletedOrdersEndMsg,
mREPLACE_FA_END: d.processReplaceFAEndMsg}

}

Expand Down Expand Up @@ -2145,3 +2147,10 @@ func (d *ibDecoder) processCompletedOrderMsg(msgBuf *MsgBuffer) {
func (d *ibDecoder) processCompletedOrdersEndMsg(msgBuf *MsgBuffer) {
d.wrapper.CompletedOrdersEnd()
}

func (d *ibDecoder) processReplaceFAEndMsg(msgBuf *MsgBuffer) {
reqID := msgBuf.readInt()
text := msgBuf.readString()

d.wrapper.ReplaceFAEnd(reqID, text)
}
5 changes: 5 additions & 0 deletions wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type IbWrapper interface {
CommissionReport(commissionReport CommissionReport)
ConnectAck()
ConnectionClosed()
ReplaceFAEnd(reqID int64, text string)
}

// Wrapper is the default wrapper provided by this golang implement.
Expand Down Expand Up @@ -668,3 +669,7 @@ func (w Wrapper) CompletedOrder(contract *Contract, order *Order, orderState *Or
func (w Wrapper) CompletedOrdersEnd() {
log.Info("<CompletedOrdersEnd>:")
}

func (w Wrapper) ReplaceFAEnd(reqID int64, text string) {
log.With(zap.Int64("reqID", reqID)).Info("<ReplaceFAEnd>", zap.String("text", text))
}

0 comments on commit 94c5390

Please sign in to comment.