Skip to content

Commit

Permalink
[golang]feat: support reader for sse response
Browse files Browse the repository at this point in the history
  • Loading branch information
yndu13 committed Sep 13, 2024
1 parent 77c287f commit 0cca81e
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 0 deletions.
73 changes: 73 additions & 0 deletions golang/service/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"bufio"
"bytes"
"crypto/md5"
"encoding/hex"
Expand Down Expand Up @@ -72,6 +73,40 @@ type RuntimeOptions struct {
var processStartTime int64 = time.Now().UnixNano() / 1e6
var seqId int64 = 0

type SSEEvent struct {
ID *string
Event *string
Data *string
Retry *int
}

func parseEvent(eventLines []string) (SSEEvent, error) {
var event SSEEvent

for _, line := range eventLines {
if strings.HasPrefix(line, "data:") {
event.Data = tea.String(tea.StringValue(event.Data) + strings.TrimPrefix(line, "data:") + "\n")
} else if strings.HasPrefix(line, "id:") {
id := strings.TrimPrefix(line, "id:")
event.ID = tea.String(strings.Trim(id, " "))
} else if strings.HasPrefix(line, "event:") {
eventName := strings.TrimPrefix(line, "event:")
event.Event = tea.String(strings.Trim(eventName, " "))
} else if strings.HasPrefix(line, "retry:") {
trimmedLine := strings.TrimPrefix(line, "retry:")
trimmedLine = strings.Trim(trimmedLine, " ")
retryValue, _err := strconv.Atoi(trimmedLine)
if _err != nil {
return event, fmt.Errorf("retry %v is not a int", trimmedLine)
}
event.Retry = tea.Int(retryValue)
}
}
data := strings.TrimRight(tea.StringValue(event.Data), "\n")
event.Data = tea.String(strings.Trim(data, " "))
return event, nil
}

func getGID() uint64 {
// https://blog.sgmansfield.com/2015/12/goroutine-ids/
b := make([]byte, 64)
Expand Down Expand Up @@ -555,3 +590,41 @@ func ToArray(in interface{}) []map[string]interface{} {
}
return tmp
}

func ReadAsSSE(body io.ReadCloser) (<-chan SSEEvent, <-chan error) {
eventChannel := make(chan SSEEvent)
errorChannel := make(chan error)

go func() {
defer body.Close()
defer close(eventChannel)

reader := bufio.NewReader(body)
var eventLines []string

for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
errorChannel <- err
}

line = strings.TrimRight(line, "\n")
if line == "" {
if len(eventLines) > 0 {
event, err := parseEvent(eventLines)
if err != nil {
errorChannel <- err
}
eventChannel <- event
eventLines = []string{}
}
continue
}
eventLines = append(eventLines, line)
}
}()
return eventChannel, errorChannel
}
44 changes: 44 additions & 0 deletions golang/service/service_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package service

import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
Expand All @@ -10,6 +12,22 @@ import (
"github.com/alibabacloud-go/tea/utils"
)

type mockHandler struct {
content string
}

func (mock *mockHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Connection", "keep-alive")
w.WriteHeader(http.StatusOK)
flusher, _ := w.(http.Flusher)
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
w.Write([]byte("data: {count:" + fmt.Sprint(i) + "}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n"))
flusher.Flush()
}
}

func Test_SetFunc(t *testing.T) {
runtime := new(RuntimeOptions).SetAutoretry(true).
SetBackoffPeriod(10).
Expand Down Expand Up @@ -126,6 +144,32 @@ func Test_ReadAsJSON(t *testing.T) {
utils.AssertEqual(t, "test", res["cleint"])
}

func Test_ReadAsSSE(t *testing.T) {
mux := http.NewServeMux()
mux.Handle("/", &mockHandler{})
var server *http.Server
server = &http.Server{
Addr: ":9000",
WriteTimeout: time.Second * 20,
Handler: mux,
}
go server.ListenAndServe()

request, err := http.NewRequest("GET", "http://127.0.0.1:9000", strings.NewReader("test"))
utils.AssertNil(t, err)
response, err := http.DefaultClient.Do(request)
utils.AssertNil(t, err)
events, _ := ReadAsSSE(response.Body)
i := 0
for event := range events {
utils.AssertEqual(t, "sse-test", tea.StringValue(event.ID))
utils.AssertEqual(t, "flow", tea.StringValue(event.Event))
utils.AssertEqual(t, "{count:"+fmt.Sprint(i)+"}", tea.StringValue(event.Data))
utils.AssertEqual(t, 3, tea.IntValue(event.Retry))
i++
}
}

func Test_GetNonce(t *testing.T) {
nonce := GetNonce()
utils.AssertEqual(t, 32, len(tea.StringValue(nonce)))
Expand Down
7 changes: 7 additions & 0 deletions main.tea
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ static async function readAsJSON(stream: readable): any {
return parseJSON(readAsString(stream));
}

/**
* Read data from a readable stream, and parse it by event iterator format
* @param stream the readable stream
* @return the parsed result
*/
static async function readAsSSE(stream: readable): asyncIterator[object];

/**
* Generate a nonce string
* @return the nonce string
Expand Down

0 comments on commit 0cca81e

Please sign in to comment.