diff --git a/golang/service/service.go b/golang/service/service.go index 3a323f1..071459c 100644 --- a/golang/service/service.go +++ b/golang/service/service.go @@ -1,6 +1,7 @@ package service import ( + "bufio" "bytes" "crypto/md5" "encoding/hex" @@ -72,6 +73,40 @@ type RuntimeOptions struct { var processStartTime int64 = time.Now().UnixNano() / 1e6 var seqId int64 = 0 +type SSEEvent struct { + ID *string + Event *string + Data *string + Retry *int +} + +func parseEvent(eventLines []string) (SSEEvent, error) { + var event SSEEvent + + for _, line := range eventLines { + if strings.HasPrefix(line, "data:") { + event.Data = tea.String(tea.StringValue(event.Data) + strings.TrimPrefix(line, "data:") + "\n") + } else if strings.HasPrefix(line, "id:") { + id := strings.TrimPrefix(line, "id:") + event.ID = tea.String(strings.Trim(id, " ")) + } else if strings.HasPrefix(line, "event:") { + eventName := strings.TrimPrefix(line, "event:") + event.Event = tea.String(strings.Trim(eventName, " ")) + } else if strings.HasPrefix(line, "retry:") { + trimmedLine := strings.TrimPrefix(line, "retry:") + trimmedLine = strings.Trim(trimmedLine, " ") + retryValue, _err := strconv.Atoi(trimmedLine) + if _err != nil { + return event, fmt.Errorf("retry %v is not a int", trimmedLine) + } + event.Retry = tea.Int(retryValue) + } + } + data := strings.TrimRight(tea.StringValue(event.Data), "\n") + event.Data = tea.String(strings.Trim(data, " ")) + return event, nil +} + func getGID() uint64 { // https://blog.sgmansfield.com/2015/12/goroutine-ids/ b := make([]byte, 64) @@ -555,3 +590,41 @@ func ToArray(in interface{}) []map[string]interface{} { } return tmp } + +func ReadAsSSE(body io.ReadCloser) (<-chan SSEEvent, <-chan error) { + eventChannel := make(chan SSEEvent) + errorChannel := make(chan error) + + go func() { + defer body.Close() + defer close(eventChannel) + + reader := bufio.NewReader(body) + var eventLines []string + + for { + line, err := reader.ReadString('\n') + if err == io.EOF { + break + } + if err != nil { + errorChannel <- err + } + + line = strings.TrimRight(line, "\n") + if line == "" { + if len(eventLines) > 0 { + event, err := parseEvent(eventLines) + if err != nil { + errorChannel <- err + } + eventChannel <- event + eventLines = []string{} + } + continue + } + eventLines = append(eventLines, line) + } + }() + return eventChannel, errorChannel +} diff --git a/golang/service/service_test.go b/golang/service/service_test.go index 8b8a8a4..c8d9151 100644 --- a/golang/service/service_test.go +++ b/golang/service/service_test.go @@ -1,7 +1,9 @@ package service import ( + "fmt" "io/ioutil" + "net/http" "strings" "testing" "time" @@ -10,6 +12,22 @@ import ( "github.com/alibabacloud-go/tea/utils" ) +type mockHandler struct { + content string +} + +func (mock *mockHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + for i := 0; i < 5; i++ { + time.Sleep(1 * time.Second) + w.Write([]byte("data: {count:" + fmt.Sprint(i) + "}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n")) + flusher.Flush() + } +} + func Test_SetFunc(t *testing.T) { runtime := new(RuntimeOptions).SetAutoretry(true). SetBackoffPeriod(10). @@ -126,6 +144,32 @@ func Test_ReadAsJSON(t *testing.T) { utils.AssertEqual(t, "test", res["cleint"]) } +func Test_ReadAsSSE(t *testing.T) { + mux := http.NewServeMux() + mux.Handle("/", &mockHandler{}) + var server *http.Server + server = &http.Server{ + Addr: ":9000", + WriteTimeout: time.Second * 20, + Handler: mux, + } + go server.ListenAndServe() + + request, err := http.NewRequest("GET", "http://127.0.0.1:9000", strings.NewReader("test")) + utils.AssertNil(t, err) + response, err := http.DefaultClient.Do(request) + utils.AssertNil(t, err) + events, _ := ReadAsSSE(response.Body) + i := 0 + for event := range events { + utils.AssertEqual(t, "sse-test", tea.StringValue(event.ID)) + utils.AssertEqual(t, "flow", tea.StringValue(event.Event)) + utils.AssertEqual(t, "{count:"+fmt.Sprint(i)+"}", tea.StringValue(event.Data)) + utils.AssertEqual(t, 3, tea.IntValue(event.Retry)) + i++ + } +} + func Test_GetNonce(t *testing.T) { nonce := GetNonce() utils.AssertEqual(t, 32, len(tea.StringValue(nonce))) diff --git a/main.tea b/main.tea index 4332b2e..a418201 100644 --- a/main.tea +++ b/main.tea @@ -75,6 +75,13 @@ static async function readAsJSON(stream: readable): any { return parseJSON(readAsString(stream)); } +/** + * Read data from a readable stream, and parse it by event iterator format + * @param stream the readable stream + * @return the parsed result + */ +static async function readAsSSE(stream: readable): asyncIterator[object]; + /** * Generate a nonce string * @return the nonce string