Skip to content

Commit

Permalink
Fix tcoudp trigger take much of CPU (#143)
Browse files Browse the repository at this point in the history
* FIx tcoudp trigger take much of CPU

* listen only once if no delimiter

* revert to keep previous code

* Fixed that we need send data first as long as data is not empty
  • Loading branch information
lixingwang authored Aug 17, 2021
1 parent 1dc6403 commit 11a8a44
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 37 deletions.
2 changes: 1 addition & 1 deletion trigger/tcpudp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ flogo install github.com/project-flogo/contrib/trigger/tcpudp
| network | string | Network type. Supported types: tcp,tcp4,tcp6,udp,udp4,udp6 - ***REQUIRED***
| host | string | Host IP or DNS resolvable name
| port | string | Port to listen on - ***REQUIRED***
| delimiter | string | Delimiter for read and write. If not set, trigger will read data until EOF
| delimiter | string | Delimiter for read and write. If not set, trigger will take line delimiter '\n' as default value
| timeout | integer | Read and Write timeout in milliseconds. To disable timeout, set value to 0.


Expand Down
73 changes: 37 additions & 36 deletions trigger/tcpudp/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tcpudp

import (
"bufio"
"bytes"
"context"
"errors"
"io"
Expand Down Expand Up @@ -111,9 +110,7 @@ func (t *Trigger) handleNewConnection(conn net.Conn) {

//Gather connection list for later cleanup
t.connections = append(t.connections, conn)

for {

if t.settings.TimeOut > 0 {
t.logger.Info("Setting timeout: ", t.settings.TimeOut)
conn.SetDeadline(time.Now().Add(time.Duration(t.settings.TimeOut) * time.Millisecond))
Expand All @@ -123,9 +120,13 @@ func (t *Trigger) handleNewConnection(conn net.Conn) {

if t.delimiter != 0 {
data, err := bufio.NewReader(conn).ReadBytes(t.delimiter)
if len(data) > 0 {
output.Data = string(data)
t.triggerFlow(conn, output)
}
if err != nil {
errString := err.Error()
if !strings.Contains(errString, "use of closed network connection") {
if !strings.Contains(errString, "use of closed network connection") && err != io.EOF {
t.logger.Error("Error reading data from connection: ", err.Error())
} else {
t.logger.Info("Connection is closed.")
Expand All @@ -134,16 +135,16 @@ func (t *Trigger) handleNewConnection(conn net.Conn) {
// Return if not timeout error
return
}

} else {
output.Data = string(data[:len(data)-1])
}
} else {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
data, err := bufio.NewReader(conn).ReadBytes('\n')
if len(data) > 0 {
output.Data = string(data)
t.triggerFlow(conn, output)
}
if err != nil {
errString := err.Error()
if !strings.Contains(errString, "use of closed network connection") {
if !strings.Contains(errString, "use of closed network connection") && err != io.EOF {
t.logger.Error("Error reading data from connection: ", err.Error())
} else {
t.logger.Info("Connection is closed.")
Expand All @@ -152,38 +153,38 @@ func (t *Trigger) handleNewConnection(conn net.Conn) {
// Return if not timeout error
return
}
} else {
output.Data = string(buf.Bytes())
}
}
}
}

if output.Data != "" {
var replyData []string
for i := 0; i < len(t.handlers); i++ {
results, err := t.handlers[i].Handle(context.Background(), output)
if err != nil {
t.logger.Error("Error invoking action : ", err.Error())
continue
}
func (t *Trigger) triggerFlow(conn net.Conn, output *Output) {
if output.Data != "" {
var replyData []string
for i := 0; i < len(t.handlers); i++ {
results, err := t.handlers[i].Handle(context.Background(), output)
if err != nil {
t.logger.Error("Error invoking action : ", err.Error())
continue
}

reply := &Reply{}
err = reply.FromMap(results)
if err != nil {
t.logger.Error("Failed to convert flow output : ", err.Error())
continue
}
if reply.Reply != "" {
replyData = append(replyData, reply.Reply)
}
reply := &Reply{}
err = reply.FromMap(results)
if err != nil {
t.logger.Error("Failed to convert flow output : ", err.Error())
continue
}
if reply.Reply != "" {
replyData = append(replyData, reply.Reply)
}
}

if len(replyData) > 0 {
replyToSend := strings.Join(replyData, string(t.delimiter))
// Send a response back to client contacting us.
_, err := conn.Write([]byte(replyToSend + "\n"))
if err != nil {
t.logger.Error("Failed to write to connection : ", err.Error())
}
if len(replyData) > 0 {
replyToSend := strings.Join(replyData, string(t.delimiter))
// Send a response back to client contacting us.
_, err := conn.Write([]byte(replyToSend + "\n"))
if err != nil {
t.logger.Error("Failed to write to connection : ", err.Error())
}
}
}
Expand Down

0 comments on commit 11a8a44

Please sign in to comment.