Skip to content

Commit

Permalink
improve and debug!
Browse files Browse the repository at this point in the history
  • Loading branch information
hadrianl committed Sep 18, 2019
1 parent 1da5ac8 commit 9d8990e
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 163 deletions.
98 changes: 64 additions & 34 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package ibapi
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -43,6 +43,7 @@ type IbClient struct {
connTime time.Time
extraAuth bool
wg sync.WaitGroup
ctx context.Context
}

// NewIbClient create IbClient with wrapper
Expand Down Expand Up @@ -79,21 +80,27 @@ func (ic *IbClient) GetReqID() int64 {
// Set the Wrapper to IbClient
func (ic *IbClient) SetWrapper(wrapper IbWrapper) {
ic.wrapper = wrapper
log.Debug("setWrapper:", wrapper)
log.Infof("Set Wrapper: %v", wrapper)
ic.decoder = &ibDecoder{wrapper: ic.wrapper}
}

// Set the Connection Context to IbClient
func (ic *IbClient) SetContext(ctx context.Context) {
ic.ctx = ctx
}

// Connect try to connect the TWS or IB GateWay, after this, handshake should be call to get the connection done
func (ic *IbClient) Connect(host string, port int, clientID int64) error {

ic.host = host
ic.port = port
ic.clientID = clientID
ic.setConnState(CONNECTING)
if err := ic.conn.connect(host, port); err != nil {
return err
ic.wrapper.Error(NO_VALID_ID, CONNECT_FAIL.code, CONNECT_FAIL.msg)
return CONNECT_FAIL
}

ic.setConnState(CONNECTING)
return nil
// 连接后开始
}
Expand Down Expand Up @@ -129,7 +136,7 @@ func (ic *IbClient) startAPI() error {
startAPI = makeMsgBytes(int64(mSTART_API), int64(v), ic.clientID)
}

log.Debug("Start API:", startAPI)
log.Info("Start API:", startAPI)
if _, err := ic.writer.Write(startAPI); err != nil {
return err
}
Expand All @@ -141,8 +148,10 @@ func (ic *IbClient) startAPI() error {

// HandShake with the TWS or GateWay to ensure the version
func (ic *IbClient) HandShake() error {
log.Debug("Try to handShake with TWS or GateWay...")
log.Info("Try to handShake with TWS or GateWay...")
var msg bytes.Buffer
var msgBytes []byte
var err error
head := []byte("API\x00")
minVer := []byte(strconv.FormatInt(int64(MIN_CLIENT_VER), 10))
maxVer := []byte(strconv.FormatInt(int64(MAX_CLIENT_VER), 10))
Expand All @@ -153,30 +162,30 @@ func (ic *IbClient) HandShake() error {
msg.Write(head)
msg.Write(sizeofCV)
msg.Write(clientVersion)
log.Debug("HandShake Init...")
if _, err := ic.writer.Write(msg.Bytes()); err != nil {
log.Info("HandShake Init...")
if _, err = ic.writer.Write(msg.Bytes()); err != nil {
return err
}

if err := ic.writer.Flush(); err != nil {
if err = ic.writer.Flush(); err != nil {
return err
}

log.Debug("Recv ServerInitInfo...")
if msgBytes, err := readMsgBytes(ic.reader); err != nil {
log.Info("Recv ServerInitInfo...")
if msgBytes, err = readMsgBytes(ic.reader); err != nil {
return err
} else {
serverInfo := splitMsgBytes(msgBytes)
v, _ := strconv.Atoi(string(serverInfo[0]))
ic.serverVersion = Version(v)
ic.connTime = bytesToTime(serverInfo[1])
ic.decoder.setVersion(ic.serverVersion) // Init Decoder
ic.decoder.setmsgID2process()
log.Info("ServerVersion:", ic.serverVersion)
log.Info("ConnectionTime:", ic.connTime)
}

if err := ic.startAPI(); err != nil {
serverInfo := splitMsgBytes(msgBytes)
v, _ := strconv.Atoi(string(serverInfo[0]))
ic.serverVersion = Version(v)
ic.connTime = bytesToTime(serverInfo[1])
ic.decoder.setVersion(ic.serverVersion) // Init Decoder
ic.decoder.setmsgID2process()
log.Info("ServerVersion:", ic.serverVersion)
log.Info("ConnectionTime:", ic.connTime)

if err = ic.startAPI(); err != nil {
return err
}

Expand All @@ -190,11 +199,11 @@ comfirmReadyLoop:
f := splitMsgBytes(m)
MsgID, _ := strconv.ParseInt(string(f[0]), 10, 64)

msgBuf := &msgBuffer{
bytes.NewBuffer(m)}
msgBuf := NewMsgBuffer(m)
// msgBuf := &msgBuffer{
// bytes.NewBuffer(m)}

ic.decoder.interpret(msgBuf)
log.Debug(MsgID)
for i, ID := range comfirmMsgIDs {
if MsgID == int64(ID) {
comfirmMsgIDs = append(comfirmMsgIDs[:i], comfirmMsgIDs[i+1:]...)
Expand All @@ -205,7 +214,13 @@ comfirmReadyLoop:
ic.wrapper.ConnectAck()
break comfirmReadyLoop
}
case <-time.After(10 * time.Second):
case <-time.After(60 * time.Second):
ic.setConnState(DISCONNECTED)
ic.wrapper.Error(NO_VALID_ID, ALREADY_CONNECTED.code, ALREADY_CONNECTED.msg)
return ALREADY_CONNECTED
case <-ic.ctx.Done():
ic.setConnState(DISCONNECTED)
ic.wrapper.Error(NO_VALID_ID, ALREADY_CONNECTED.code, ALREADY_CONNECTED.msg)
return ALREADY_CONNECTED
}
}
Expand All @@ -214,7 +229,7 @@ comfirmReadyLoop:
}

func (ic *IbClient) reset() {
log.Debug("reset IbClient.")
log.Info("reset IbClient.")
ic.reqIDSeq = 0
ic.conn = &IbConnection{}
ic.conn.reset()
Expand All @@ -226,6 +241,10 @@ func (ic *IbClient) reset() {
ic.terminatedSignal = make(chan int, 3)
ic.wg = sync.WaitGroup{}

if ic.ctx == nil {
ic.ctx = context.TODO()
}

}

// ---------------req func ----------------------------------------------
Expand Down Expand Up @@ -436,7 +455,7 @@ func (ic *IbClient) ReqTickByTickData(reqID int64, contract *Contract, tickType
fields = append(fields, numberOfTicks, ignoreSize)
}

msg := makeMsgBytes(fields)
msg := makeMsgBytes(fields...)

ic.reqChan <- msg
}
Expand Down Expand Up @@ -1197,9 +1216,18 @@ func (ic *IbClient) ReqGlobalCancel() {
ic.reqChan <- msg
}

func (ic *IbClient) ReqIDs(numIDs int) {
/*
Call this function to request from TWS the next valid ID that
can be used when placing an order. After calling this function, the
nextValidId() event will be triggered, and the id returned is that next
valid ID. That ID will reflect any autobinding that has occurred (which
generates new IDs and increments the next valid ID therein).
numIds:int - deprecated
*/
func (ic *IbClient) ReqIDs() {
v := 1
msg := makeMsgBytes(mREQ_IDS, v, numIDs)
msg := makeMsgBytes(mREQ_IDS, v, 0)

ic.reqChan <- msg
}
Expand Down Expand Up @@ -2635,13 +2663,14 @@ func (ic *IbClient) goDecode() {
defer ic.wg.Done()

ic.wg.Add(1)
msgBuf := NewMsgBuffer(nil)

decodeLoop:
for {
select {
case m := <-ic.msgChan:
msgBuf := &msgBuffer{
bytes.NewBuffer(m)}
msgBuf.Reset()
msgBuf.Write(m)
ic.decoder.interpret(msgBuf)
case e := <-ic.errChan:
log.Error(e)
Expand All @@ -2656,10 +2685,11 @@ decodeLoop:

// Run make the event loop run, all make sense after run!
func (ic *IbClient) Run() error {
if ic.conn.state == DISCONNECTED {
return errors.New("ibClient is DISCONNECTED")
if !ic.IsConnected() {
ic.wrapper.Error(NO_VALID_ID, NOT_CONNECTED.code, NOT_CONNECTED.msg)
return NOT_CONNECTED
}
log.Println("RUN Client")
log.Info("RUN Client")

go ic.goRequest()
go ic.goDecode()
Expand Down
73 changes: 60 additions & 13 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

func TestClient(t *testing.T) {

// log.SetLevel(log.DebugLevel)
var err error
ibwrapper := Wrapper{}
ibwrapper := new(Wrapper)
ic := NewIbClient(ibwrapper)
err = ic.Connect("192.168.2.226", 4002, 19)
if err != nil {
log.Panic("Connect failed:", err)
log.Info("Connect failed:", err)
return
}

Expand All @@ -24,17 +24,64 @@ func TestClient(t *testing.T) {
log.Println("HandShake failed:", err)
return
}
ic.Run()

ic.ReqCurrentTime()
ic.ReqAutoOpenOrders(true)
ic.ReqAccountUpdates(true, "")
ic.ReqExecutions(ic.GetReqID(), ExecutionFilter{})
// ic.ReqCurrentTime()
// ic.ReqAutoOpenOrders(true)
// ic.ReqAccountUpdates(true, "")
// ic.ReqExecutions(ic.GetReqID(), ExecutionFilter{})

hsi1909 := Contract{ContractID: 351872027, Symbol: "HSI", SecurityType: "FUT", Exchange: "HKFE"}
ic.ReqHistoricalData(ic.GetReqID(), &hsi1909, "", "4800 S", "1 min", "TRADES", false, 1, true, nil)
ic.ReqMktDepth(ic.GetReqID(), &hsi1909, 5, true, nil)
ic.Run()
// hsi1909 := Contract{ContractID: 351872027, Symbol: "HSI", SecurityType: "FUT", Exchange: "HKFE"}
// ic.ReqHistoricalData(ic.GetReqID(), &hsi1909, "", "4800 S", "1 min", "TRADES", false, 1, true, nil)
// ic.ReqMktDepth(ic.GetReqID(), &hsi1909, 5, true, nil)
// ic.ReqContractDetails(ic.GetReqID(), &hsi1909)
// ic.ReqAllOpenOrders()
// ic.ReqMktData(ic.GetReqID(), &hsi1909, "", false, false, nil)
// ic.ReqPositions()
// ic.ReqRealTimeBars(ic.GetReqID(), &hsi1909, 5, "TRADES", false, nil)

// tags := []string{"AccountType", "NetLiquidation", "TotalCashValue", "SettledCash",
// "AccruedCash", "BuyingPower", "EquityWithLoanValue",
// "PreviousEquityWithLoanValue", "GrossPositionValue", "ReqTEquity",
// "ReqTMargin", "SMA", "InitMarginReq", "MaintMarginReq", "AvailableFunds",
// "ExcessLiquidity", "Cushion", "FullInitMarginReq", "FullMaintMarginReq",
// "FullAvailableFunds", "FullExcessLiquidity", "LookAheadNextChange",
// "LookAheadInitMarginReq", "LookAheadMaintMarginReq",
// "LookAheadAvailableFunds", "LookAheadExcessLiquidity",
// "HighestSeverity", "DayTradesRemaining", "Leverage", "$LEDGER:ALL"}
// ic.ReqAccountSummary(ic.GetReqID(), "All", strings.Join(tags, ","))
// ic.ReqFamilyCodes()
// ic.ReqMatchingSymbols(ic.GetReqID(), "HSI")
// ic.ReqScannerParameters()
// ic.ReqTickByTickData(ic.GetReqID(), &hsi1909, "Last", 5, false)
// ic.ReqHistoricalTicks(ic.GetReqID(), &hsi1909, "20190916 09:15:00", "", 100, "Trades", false, false, nil)
// ic.ReqManagedAccts()
// ic.ReqSoftDollarTiers(ic.GetReqID())
// ic.ReqNewsProviders()
// ic.ReqMarketDataType(1)
// ic.ReqPnLSingle(ic.GetReqID(), "DU1382837", "", 351872027)
// ic.ReqNewsBulletins(true)
// ic.ReqSmartComponents(ic.GetReqID(), "a6")
// ic.ReqMktDepthExchanges()
// ic.ReqMatchingSymbols(ic.GetReqID(), "HSI")
// ic.ReqSecDefOptParams(ic.GetReqID(), "HSI", "", "IND", 1328298)
// ic.ReqHistogramData(ic.GetReqID(), &hsi1909, false, "3 days")
// ic.ReqGlobalCancel()
// ic.ReqIDs()
// ic.ReqAccountUpdatesMulti(ic.GetReqID(), "DU1382837", "", true)
// ic.ReqPositionsMulti(ic.GetReqID(), "DU1382837", "")
// lmtOrder := NewLimitOrder("BUY", 26640, 1)
// mktOrder := NewMarketOrder("BUY", 1)
// ic.PlaceOrder(ibwrapper.GetNextOrderID(), &hsi1909, lmtOrder)
// ic.CancelOrder(ibwrapper.OrderID() - 1)

loop:
for {
select {
case <-time.After(time.Second * 20):
ic.Disconnect()
break loop
}
}

time.Sleep(time.Second * 10)
ic.Disconnect()
}
18 changes: 9 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ type IbConnection struct {
numMsgRecv int
}

func (ibconn *IbConnection) Write(msg []byte) (int, error) {
n, err := ibconn.conn.Write(msg)
func (ibconn *IbConnection) Write(bs []byte) (int, error) {
n, err := ibconn.conn.Write(bs)

ibconn.numBytesSent += n
ibconn.numMsgSent++

log.WithFields(log.Fields{"func": "write", "count": n}).Debug(msg)
log.WithFields(log.Fields{"func": "write", "count": n}).Debug(bs)
return n, err
}

func (ibconn *IbConnection) Read(b []byte) (int, error) {
n, err := ibconn.conn.Read(b)
func (ibconn *IbConnection) Read(bs []byte) (int, error) {
n, err := ibconn.conn.Read(bs)
ibconn.numBytesRecv += n
ibconn.numMsgRecv++
log.WithFields(log.Fields{"func": "read", "count": n}).Debug(b)
log.WithFields(log.Fields{"func": "read", "count": n}).Debug(bs)

return n, err
}
Expand Down Expand Up @@ -65,16 +65,16 @@ func (ibconn *IbConnection) connect(host string, port int) error {
server := ibconn.host + ":" + strconv.Itoa(port)
addr, err = net.ResolveTCPAddr("tcp4", server)
if err != nil {
log.Printf("ResolveTCPAddr Error: %v", err)
log.Errorf("ResolveTCPAddr Error: %v", err)
return err
}
ibconn.conn, err = net.DialTCP("tcp4", nil, addr)
if err != nil {
log.Printf("DialTCP Error: %v", err)
log.Errorf("DialTCP Error: %v", err)
return err
}

log.Println("TCP Socket Connected to:", ibconn.conn.RemoteAddr())
log.Debugf("TCP Socket Connected to: %v", ibconn.conn.RemoteAddr())

return err
}
Loading

0 comments on commit 9d8990e

Please sign in to comment.