Skip to content

Commit

Permalink
1.fix scanner splitfunc
Browse files Browse the repository at this point in the history
2.use goroutine to interpret the msg
3.add more code annotation
  • Loading branch information
hadrianl committed Mar 12, 2020
1 parent 4d90f67 commit 59ca2b1
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 67 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func main(){
var err error
ibwrapper := Wrapper{}
ibwrapper := &Wrapper{}
ic := NewIbClient(ibwrapper)
err = ic.Connect("172.0.0.1", 4002, 0)
if err != nil {
Expand Down
94 changes: 69 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (ic *IbClient) ConnState() int {
func (ic *IbClient) setConnState(connState int) {
OldConnState := ic.conn.state
ic.conn.state = connState
log.Infof("ConnState: %v -> %v", OldConnState, connState)
log.Debugf("ConnState: %v -> %v", OldConnState, connState)
}

// GetReqID before request data or place order
Expand Down Expand Up @@ -100,9 +101,11 @@ func (ic *IbClient) Connect(host string, port int, clientID int64) error {
ic.host = host
ic.port = port
ic.clientID = clientID
log.Debugf("Connecting to %s:%d clientid:%d", host, port, clientID)
ic.setConnState(CONNECTING)
if err := ic.conn.connect(host, port); err != nil {
ic.wrapper.Error(NO_VALID_ID, CONNECT_FAIL.code, CONNECT_FAIL.msg)
ic.reset()
return CONNECT_FAIL
}

Expand All @@ -122,7 +125,8 @@ func (ic *IbClient) Disconnect() error {

defer log.Info("Disconnected!")
ic.wg.Wait()
ic.setConnState(DISCONNECTED)
ic.wrapper.ConnectionClosed()
ic.reset()
return nil
}

Expand Down Expand Up @@ -190,7 +194,10 @@ func (ic *IbClient) HandShake() error {
v, _ := strconv.Atoi(string(serverInfo[0]))
ic.serverVersion = Version(v)
ic.connTime = bytesToTime(serverInfo[1])
ic.decoder.setVersion(ic.serverVersion) // Init Decoder

// Init Decoder
ic.decoder.setVersion(ic.serverVersion)
ic.decoder.errChan = make(chan error, 100)
ic.decoder.setmsgID2process()
log.Info("ServerVersion:", ic.serverVersion)
log.Info("ConnectionTime:", ic.connTime)
Expand Down Expand Up @@ -238,25 +245,54 @@ comfirmReadyLoop:
return nil
}

func (ic IbClient) ServerVersion() Version {
return ic.serverVersion
}

func (ic IbClient) ConnectionTime() time.Time {
return ic.connTime
}

func (ic *IbClient) reset() {
log.Info("Reset IbClient.")
ic.reqIDSeq = 0
ic.conn = &IbConnection{}
ic.conn.reset()
ic.host = ""
ic.port = -1
ic.extraAuth = false
ic.clientID = -1
ic.serverVersion = -1
ic.connTime = time.Time{}
ic.reader = bufio.NewReader(ic.conn)
ic.writer = bufio.NewWriter(ic.conn)
ic.reqChan = make(chan []byte, 10)
ic.errChan = make(chan error, 10)
ic.msgChan = make(chan []byte, 100)
ic.terminatedSignal = make(chan int, 3)
ic.wg = sync.WaitGroup{}
ic.connectOptions = ""
ic.setConnState(DISCONNECTED)

if ic.ctx == nil {
ic.ctx = context.TODO()
}

}

func (ic *IbClient) SetServerLogLevel(logLevel int64) {
v := 1
fields := make([]interface{}, 0, 3)
fields = append(fields,
mSET_SERVER_LOGLEVEL,
v,
logLevel,
)

msg := makeMsgBytes(fields...)

ic.reqChan <- msg
}

// ---------------req func ----------------------------------------------

/*
Expand Down Expand Up @@ -393,8 +429,11 @@ The API can receive frozen market data from Trader
trading day, market data will automatically switch back to real-time
market data.
marketDataType:int - 1 for real-time streaming market data or 2 for
frozen market data
marketDataType:int -
1 -> realtime streaming market data
2 -> frozen market data
3 -> delayed market data
4 -> delayed frozen market data
*/
func (ic *IbClient) ReqMarketDataType(marketDataType int64) {
if ic.serverVersion < mMIN_SERVER_VER_REQ_MARKET_DATA_TYPE {
Expand Down Expand Up @@ -2611,13 +2650,12 @@ requestLoop:
}

nn, err := ic.writer.Write(req)
err = ic.writer.Flush()
log.Debug(nn, req)
if err != nil {
log.Print(err)
ic.writer.Reset(ic.conn)
ic.errChan <- err
}
ic.writer.Flush()
case <-ic.terminatedSignal:
break requestLoop
}
Expand All @@ -2629,20 +2667,21 @@ requestLoop:
//goReceive handle the msgBuf which is different from the offical.Not continuously read, but split first and then decode
func (ic *IbClient) goReceive() {
log.Info("Receiver START!")
defer log.Info("Receiver END!")
defer func() {
if err := recover(); err != nil {
log.Errorf("Receiver got unexpected error: %v", err)
log.Errorf("Receiver got unexpected error: %v... \ntry to reset the Receiver", err)
go ic.goReceive()
}
}()
defer log.Info("Receiver END!")
defer ic.wg.Done()

ic.wg.Add(1)

ic.reader.Reset(ic.conn)
scanner := bufio.NewScanner(ic.reader)
scanner.Split(scanFields)

scanLoop:
// scanLoop:
for scanner.Scan() {
msgBytes := scanner.Bytes()
ic.msgChan <- msgBytes
Expand All @@ -2653,16 +2692,16 @@ scanLoop:
}

err := scanner.Err()
if err, ok := err.(*net.OpError); ok && !err.Temporary() {
log.Panicf("Receiver Panic: %v", err)
return
} else if err != nil {
log.Errorf("Receiver Temporary Error: %v", err)
ic.errChan <- err
ic.reader.Reset(ic.conn)
goto scanLoop
if err, ok := err.(*net.OpError); ok {
if err.Temporary() {
// ic.errChan <- err
log.Panicf("Receiver Temporary Error: %v", err) // HELP: it is ok to panic if the error is temporary
// goto scanLoop
} else {
log.Panicf("Receiver Panic: %v", err)
}
} else {
log.Panicf("Scanner Panic: %v", scanner.Err())
log.Panicf("Scanner Panic: %v", err)
}
}

Expand All @@ -2673,17 +2712,22 @@ func (ic *IbClient) goDecode() {
defer ic.wg.Done()

ic.wg.Add(1)
msgBuf := NewMsgBuffer(nil)

decodeLoop:
for {
select {
case m := <-ic.msgChan:
msgBuf.Reset()
msgBuf.Write(m)
ic.decoder.interpret(msgBuf)
if l := len(m); l > MAX_MSG_LEN {
ic.wrapper.Error(NO_VALID_ID, BAD_LENGTH.code, fmt.Sprintf("%s:%d:%s", BAD_LENGTH.msg, l, m))
ic.Disconnect()
}

msgBuf := NewMsgBuffer(m) // FIXME: use object pool
go ic.decoder.interpret(msgBuf)
case e := <-ic.errChan:
log.Error(e)
case e := <-ic.decoder.errChan:
go ic.wrapper.Error(NO_VALID_ID, BAD_MESSAGE.code, BAD_MESSAGE.msg+e.Error())
case <-ic.terminatedSignal:
break decodeLoop
}
Expand Down
33 changes: 18 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ibapi

import (
"runtime"
"strings"
"testing"
"time"

Expand All @@ -10,10 +12,11 @@ import (

func TestClient(t *testing.T) {
// log.SetLevel(log.DebugLevel)
runtime.GOMAXPROCS(4)
var err error
ibwrapper := new(Wrapper)
ic := NewIbClient(ibwrapper)
err = ic.Connect("192.168.2.226", 4002, 19)
err = ic.Connect("192.168.2.226", 4001, 19)
if err != nil {
log.Info("Connect failed:", err)
return
Expand All @@ -29,32 +32,32 @@ func TestClient(t *testing.T) {

ic.ReqCurrentTime()
// ic.ReqAutoOpenOrders(true)
// ic.ReqAccountUpdates(true, "")
ic.ReqAccountUpdates(true, "")
// ic.ReqExecutions(ic.GetReqID(), ExecutionFilter{})

hsi2003 := Contract{ContractID: 376399002, Symbol: "HSI", SecurityType: "FUT", Exchange: "HKFE"}
ic.ReqHistoricalData(ic.GetReqID(), &hsi2003, "", "4800 S", "1 min", "TRADES", false, 1, true, nil)
// ic.ReqMktDepth(ic.GetReqID(), &hsi1909, 5, true, nil)
ic.ReqContractDetails(ic.GetReqID(), &hsi2003)
// ic.ReqAllOpenOrders()
// ic.ReqMktData(ic.GetReqID(), &hsi1909, "", false, false, nil)
ic.ReqMktData(ic.GetReqID(), &hsi2003, "", false, false, nil)
// ic.ReqPositions()
// ic.ReqRealTimeBars(ic.GetReqID(), &hsi1909, 5, "TRADES", false, nil)

// tags := []string{"AccountType", "NetLiquidation", "TotalCashValue", "SettledCash",
// "AccruedCash", "BuyingPower", "EquityWithLoanValue",
// "PreviousEquityWithLoanValue", "GrossPositionValue", "ReqTEquity",
// "ReqTMargin", "SMA", "InitMarginReq", "MaintMarginReq", "AvailableFunds",
// "ExcessLiquidity", "Cushion", "FullInitMarginReq", "FullMaintMarginReq",
// "FullAvailableFunds", "FullExcessLiquidity", "LookAheadNextChange",
// "LookAheadInitMarginReq", "LookAheadMaintMarginReq",
// "LookAheadAvailableFunds", "LookAheadExcessLiquidity",
// "HighestSeverity", "DayTradesRemaining", "Leverage", "$LEDGER:ALL"}
// ic.ReqAccountSummary(ic.GetReqID(), "All", strings.Join(tags, ","))
tags := []string{"AccountType", "NetLiquidation", "TotalCashValue", "SettledCash",
"AccruedCash", "BuyingPower", "EquityWithLoanValue",
"PreviousEquityWithLoanValue", "GrossPositionValue", "ReqTEquity",
"ReqTMargin", "SMA", "InitMarginReq", "MaintMarginReq", "AvailableFunds",
"ExcessLiquidity", "Cushion", "FullInitMarginReq", "FullMaintMarginReq",
"FullAvailableFunds", "FullExcessLiquidity", "LookAheadNextChange",
"LookAheadInitMarginReq", "LookAheadMaintMarginReq",
"LookAheadAvailableFunds", "LookAheadExcessLiquidity",
"HighestSeverity", "DayTradesRemaining", "Leverage", "$LEDGER:ALL"}
ic.ReqAccountSummary(ic.GetReqID(), "All", strings.Join(tags, ","))
// ic.ReqFamilyCodes()
// ic.ReqMatchingSymbols(ic.GetReqID(), "HSI")
// ic.ReqScannerParameters()
// ic.ReqTickByTickData(ic.GetReqID(), &hsi1909, "Last", 5, false)
// ic.ReqTickByTickData(ic.GetReqID(), &hsi2003, "Last", 5, false)
// ic.ReqHistoricalTicks(ic.GetReqID(), &hsi1909, "20190916 09:15:00", "", 100, "Trades", false, false, nil)
// ic.ReqManagedAccts()
// ic.ReqSoftDollarTiers(ic.GetReqID())
Expand All @@ -79,7 +82,7 @@ func TestClient(t *testing.T) {
loop:
for {
select {
case <-time.After(time.Second * 20):
case <-time.After(time.Second * 60 * 60 * 24):
ic.Disconnect()
break loop
}
Expand Down
10 changes: 5 additions & 5 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ const (
mMIN_SERVER_VER_MKT_DEPTH_PRIM_EXCHANGE Version = 149
mMIN_SERVER_VER_COMPLETED_ORDERS Version = 150
mMIN_SERVER_VER_PRICE_MGMT_ALGO Version = 151
mMIN_SERVER_VER_STOCK_TYPE Version = 152
mMIN_SERVER_VER_ENCODE_MSG_ASCII7 Version = 153
mMIN_SERVER_VER_SEND_ALL_FAMILY_CODES Version = 154
mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE Version = 155
mMIN_SERVER_VER_STOCK_TYPE Version = 152
mMIN_SERVER_VER_ENCODE_MSG_ASCII7 Version = 153
mMIN_SERVER_VER_SEND_ALL_FAMILY_CODES Version = 154
mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE Version = 155

MIN_CLIENT_VER = 100
MAX_CLIENT_VER = mMIN_SERVER_VER_PRICE_MGMT_ALGO
MAX_CLIENT_VER = mMIN_SERVER_VER_NO_DEFAULT_OPEN_CLOSE
)

// tick const
Expand Down
4 changes: 2 additions & 2 deletions contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type Contract struct {
Currency string
LocalSymbol string
TradingClass string
PrimaryExchange string
PrimaryExchange string // pick an actual (ie non-aggregate) exchange that the contract trades on. DO NOT SET TO SMART.
IncludeExpired bool
SecurityIDType string
SecurityIDType string // CUSIP;SEDOL;ISIN;RIC
SecurityID string

// combos les
Expand Down
3 changes: 2 additions & 1 deletion decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (d *ibDecoder) interpret(msgBuf *msgBuffer) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Deocde error -> %v", err) //TODO: handle error
d.errChan <- err.(error)
}
}()

Expand All @@ -44,7 +45,7 @@ func (d *ibDecoder) interpret(msgBuf *msgBuffer) {
if processer, ok := d.msgID2process[IN(MsgID)]; ok {
processer(msgBuf)
} else {
log.Infof("MsgId: %v NOT FOUND!!!-> MsgBytes: %v", MsgID, msgBuf.Bytes())
log.Warnf("MsgId: %v NOT FOUND!!!-> MsgBytes: %v", MsgID, msgBuf.Bytes())
}

}
Expand Down
20 changes: 5 additions & 15 deletions order.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ type Order struct {
TriggerMethod int64 // 0=Default, 1=Double_Bid_Ask, 2=Last, 3=Double_Last, 4=Bid_Ask, 7=Last_or_Bid_Ask, 8=Mid-point
OutsideRTH bool
Hidden bool
GoodAfterTime string
GoodTillDate string
GoodAfterTime string // Format: 20060505 08:00:00 {time zone}
GoodTillDate string // Format: 20060505 08:00:00 {time zone}
OverridePercentageConstraints bool
Rule80A string // Individual = 'I', Agency = 'A', AgentOtherMember = 'W', IndividualPTIA = 'J', AgencyPTIA = 'U', AgentOtherMemberPTIA = 'M', IndividualPT = 'K', AgencyPT = 'Y', AgentOtherMemberPT = 'N'
AllOrNone bool
MinQty int64 `default:"UNSETINT"`
PercentOffset float64 `default:"UNSETFLOAT"`
PercentOffset float64 `default:"UNSETFLOAT"` // REL orders only
TrailStopPrice float64 `default:"UNSETFLOAT"`
TrailingPercent float64 `default:"UNSETFLOAT"`
TrailingPercent float64 `default:"UNSETFLOAT"` // TRAILLIMIT orders only
//---- financial advisors only -----
FAGroup string
FAProfile string
Expand All @@ -61,7 +61,7 @@ type Order struct {
Origin int64 // 0=Customer, 1=Firm
ShortSaleSlot int64 // 1 if you hold the shares, 2 if they will be delivered from elsewhere. Only for Action=SSHORT
DesignatedLocation string // used only when shortSaleSlot=2
ExemptCode int64
ExemptCode int64 `default:-1`
// ---------------------------------
// ------- SMART routing only ------
DiscretionaryAmount float64
Expand Down Expand Up @@ -331,13 +331,3 @@ func NewMarketOrder(action string, quantity float64) *Order {

return o
}

func NewOrderState() *OrderState {
orderState := &OrderState{}

orderState.Commission = UNSETFLOAT
orderState.MinCommission = UNSETFLOAT
orderState.MaxCommission = UNSETFLOAT

return orderState
}
Loading

0 comments on commit 59ca2b1

Please sign in to comment.