From 94c5390a4177d722130c5aa6e57bc013762ac6ff Mon Sep 17 00:00:00 2001 From: hadrianl <137150224@qq.com> Date: Thu, 22 Oct 2020 18:43:33 +0800 Subject: [PATCH] v9.80 1.add ReplaceFAEnd(v9.80) 2.try restart goroutine instead of disconnec 3.decoder did not recover panic anymore --- README.md | 2 +- client.go | 34 ++++++++++++++++++++-------------- const.go | 6 ++++-- decoder.go | 27 ++++++++++++++++++--------- wrapper.go | 5 +++++ 5 files changed, 48 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 5d4d231..17e6bee 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ style="display: block; margin: 0 auto;" src="http://interactivebrokers.github.io/tws-api/nav_iblogo.png" /> -* Interactive Brokers API 9.79 +* Interactive Brokers API 9.80 * pure golang Implement * Unofficial, use at you own risk diff --git a/client.go b/client.go index d15c1f2..3ec2119 100644 --- a/client.go +++ b/client.go @@ -182,15 +182,14 @@ func (ic *IbClient) HandShake() error { var msg bytes.Buffer var msgBytes []byte head := []byte("API\x00") - minVer := []byte(strconv.FormatInt(int64(MIN_CLIENT_VER), 10)) - maxVer := []byte(strconv.FormatInt(int64(MAX_CLIENT_VER), 10)) - connectOptions := []byte{} + connectOptions := "" if ic.connectOptions != "" { - connectOptions = []byte(" " + ic.connectOptions) + connectOptions = " " + ic.connectOptions } - clientVersion := bytes.Join([][]byte{[]byte("v"), minVer, []byte(".."), maxVer, connectOptions}, nil) + clientVersion := []byte(fmt.Sprintf("v%d..%d%s", MIN_CLIENT_VER, MAX_CLIENT_VER, connectOptions)) + sizeofCV := make([]byte, 4) binary.BigEndian.PutUint32(sizeofCV, uint32(len(clientVersion))) @@ -222,7 +221,7 @@ func (ic *IbClient) HandShake() error { // Init Decoder ic.decoder.setVersion(ic.serverVersion) - ic.decoder.errChan = make(chan error, 100) + // ic.decoder.errChan = make(chan error, 100) ic.decoder.setmsgID2process() log.Info("init info", zap.Int("serverVersion", ic.serverVersion)) @@ -2852,7 +2851,9 @@ func (ic *IbClient) goRequest() { if err := recover(); err != nil { log.Error("requester got unexpected error", zap.Error(err.(error))) ic.err = err.(error) - ic.Disconnect() + // ic.Disconnect() + log.Info("try to restart requester") + go ic.goRequest() } }() defer log.Info("requester end") @@ -2891,7 +2892,9 @@ func (ic *IbClient) goReceive() { if err := recover(); err != nil { log.Error("receiver got unexpected error", zap.Error(err.(error))) ic.err = err.(error) - ic.Disconnect() + // ic.Disconnect() + log.Info("try to restart receiver") + go ic.goReceive() } }() defer log.Info("receiver end") @@ -2910,18 +2913,19 @@ func (ic *IbClient) goReceive() { select { case <-ic.terminatedSignal: default: - err := ic.scanner.Err() - switch err { + switch err := ic.scanner.Err(); err { case io.EOF: err = errors.Wrap(err, "scanner Done") + ic.Disconnect() case bufio.ErrTooLong: errBytes := ic.scanner.Bytes() ic.wrapper.Error(NO_VALID_ID, BAD_LENGTH.code, fmt.Sprintf("%s:%d:%s", BAD_LENGTH.msg, len(errBytes), errBytes)) err = errors.Wrap(err, BAD_LENGTH.msg) + panic(err) default: err = errors.Wrap(err, "scanner Error") + panic(err) } - panic(err) } } @@ -2933,7 +2937,9 @@ func (ic *IbClient) goDecode() { if err := recover(); err != nil { log.Error("decoder got unexpected error", zap.Error(err.(error))) ic.err = err.(error) - ic.Disconnect() + // ic.Disconnect() + log.Info("try to restart decoder") + go ic.goDecode() } }() defer log.Info("decoder end") @@ -2948,8 +2954,8 @@ decodeLoop: ic.decoder.interpret(m) case e := <-ic.errChan: log.Error("got client error in decode loop", zap.Error(e)) - case e := <-ic.decoder.errChan: - ic.wrapper.Error(NO_VALID_ID, BAD_MESSAGE.code, BAD_MESSAGE.msg+e.Error()) + // case e := <-ic.decoder.errChan: + // ic.wrapper.Error(NO_VALID_ID, BAD_MESSAGE.code, BAD_MESSAGE.msg+e.Error()) case <-ic.terminatedSignal: break decodeLoop } diff --git a/const.go b/const.go index 5e5f1c5..7a5736c 100644 --- a/const.go +++ b/const.go @@ -92,6 +92,7 @@ const ( mORDER_BOUND IN = 100 mCOMPLETED_ORDER IN = 101 mCOMPLETED_ORDERS_END IN = 102 + mREPLACE_FA_END IN = 103 ) const ( @@ -269,9 +270,10 @@ const ( mMIN_SERVER_VER_ENCODE_MSG_ASCII7 Version = 153 mMIN_SERVER_VER_SEND_ALL_FAMILY_CODES Version = 154 mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE Version = 155 + mMIN_SERVER_VER_REPLACE_FA_END Version = 157 - MIN_CLIENT_VER = 100 - MAX_CLIENT_VER = mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE + MIN_CLIENT_VER Version = 100 + MAX_CLIENT_VER Version = mMIN_SERVER_VER_REPLACE_FA_END ) // tick const diff --git a/decoder.go b/decoder.go index 513c71a..6d37db8 100644 --- a/decoder.go +++ b/decoder.go @@ -16,7 +16,7 @@ type ibDecoder struct { wrapper IbWrapper version Version msgID2process map[IN]func(*MsgBuffer) - errChan chan error + // errChan chan error } func (d *ibDecoder) setVersion(version Version) { @@ -34,13 +34,14 @@ func (d *ibDecoder) interpret(msgBytes []byte) { return } - // if decode error ocours,handle the error - defer func() { - if err := recover(); err != nil { - log.Error("failed to decode", zap.Error(err.(error))) - d.errChan <- err.(error) - } - }() + // try not to handle the error produced by decoder and wrapper, user should be responsible for this + // // if decode error ocours,handle the error + // defer func() { + // if err := recover(); err != nil { + // log.Error("failed to decode", zap.Error(err.(error))) + // d.errChan <- err.(error) + // } + // }() // log.Debug("interpret", zap.Binary("MsgBytes", msgBuf.Bytes())) @@ -156,7 +157,8 @@ func (d *ibDecoder) setmsgID2process() { mTICK_BY_TICK: d.processTickByTickMsg, mORDER_BOUND: d.processOrderBoundMsg, mCOMPLETED_ORDER: d.processCompletedOrderMsg, - mCOMPLETED_ORDERS_END: d.processCompletedOrdersEndMsg} + mCOMPLETED_ORDERS_END: d.processCompletedOrdersEndMsg, + mREPLACE_FA_END: d.processReplaceFAEndMsg} } @@ -2145,3 +2147,10 @@ func (d *ibDecoder) processCompletedOrderMsg(msgBuf *MsgBuffer) { func (d *ibDecoder) processCompletedOrdersEndMsg(msgBuf *MsgBuffer) { d.wrapper.CompletedOrdersEnd() } + +func (d *ibDecoder) processReplaceFAEndMsg(msgBuf *MsgBuffer) { + reqID := msgBuf.readInt() + text := msgBuf.readString() + + d.wrapper.ReplaceFAEnd(reqID, text) +} diff --git a/wrapper.go b/wrapper.go index 376d968..14a81d2 100644 --- a/wrapper.go +++ b/wrapper.go @@ -92,6 +92,7 @@ type IbWrapper interface { CommissionReport(commissionReport CommissionReport) ConnectAck() ConnectionClosed() + ReplaceFAEnd(reqID int64, text string) } // Wrapper is the default wrapper provided by this golang implement. @@ -668,3 +669,7 @@ func (w Wrapper) CompletedOrder(contract *Contract, order *Order, orderState *Or func (w Wrapper) CompletedOrdersEnd() { log.Info(":") } + +func (w Wrapper) ReplaceFAEnd(reqID int64, text string) { + log.With(zap.Int64("reqID", reqID)).Info("", zap.String("text", text)) +}