From c5de052890003e2d4b725319ee30bcc2f37b85cc Mon Sep 17 00:00:00 2001 From: softwarespot Date: Sun, 20 Oct 2024 14:19:05 +0300 Subject: [PATCH] Initial commit --- .github/workflows/go.yml | 23 +++ .gitignore | 26 +++ LICENSE | 20 +++ Makefile | 4 + README.md | 55 ++++++ config.go | 42 +++++ examples/custom_event_server/main.go | 83 +++++++++ examples/event_server/main.go | 27 +++ examples/ping_cmd_server/main.go | 162 ++++++++++++++++++ go.mod | 5 + go.sum | 2 + handler.go | 243 +++++++++++++++++++++++++++ handler_example_test.go | 27 +++ handler_test.go | 77 +++++++++ helpers_test.go | 30 ++++ 15 files changed, 826 insertions(+) create mode 100644 .github/workflows/go.yml create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 config.go create mode 100644 examples/custom_event_server/main.go create mode 100644 examples/event_server/main.go create mode 100644 examples/ping_cmd_server/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handler.go create mode 100644 handler_example_test.go create mode 100644 handler_test.go create mode 100644 helpers_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..a1e466b --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,23 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go +on: [push, pull_request] + +jobs: + test: + strategy: + matrix: + go-version: [1.23.x] + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + + - name: Test + run: go test -cover -v ./... diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d8409a --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Taken from URL: https://github.com/github/gitignore/blob/main/Go.gitignore +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work +go.work.sum + +# env file +.env diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1b2ef48 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2024 SoftwareSpot Apps + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4d0ac76 --- /dev/null +++ b/Makefile @@ -0,0 +1,4 @@ +test: + go test -cover -v ./... + +.PHONY: test diff --git a/README.md b/README.md new file mode 100644 index 0000000..97b2695 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# Server-Sent Events (SSE) handler + +[![Go Reference](https://pkg.go.dev/badge/github.com/softwarespot/sse.svg)](https://pkg.go.dev/github.com/softwarespot/sse) ![Go Tests](https://github.com/softwarespot/replay/actions/workflows/go.yml/badge.svg) + +**Server-Sent Events (SSE) handler** is a generic and [http.Handler](https://pkg.go.dev/net/http#Handler) compliant module, that implements real-time event streaming from a server to web clients using the [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) protocol. It's a robust module for managing multiple client connections, broadcasting events, and handling client registrations and unregistrations efficiently. + +Examples of using this module can be found from the [./examples](./examples/) directory. + +## Prerequisites + +- Go 1.23.0 or above + +## Installation + +```bash +go get -u github.com/softwarespot/sse +``` + +## Usage + +A basic example of using **SSE**. + +```Go +package main + +import ( + "fmt" + "net/http" + "time" + + "github.com/softwarespot/sse" +) + +func main() { + // Use the default configuration + h := sse.New[int](nil) + defer h.Close() + + go func() { + var evt int + for { + fmt.Println("sse handler: broadcast event", h.Broadcast(evt)) + time.Sleep(64 * time.Millisecond) + evt++ + } + }() + + http.Handle("/events", h) + http.ListenAndServe(":3000", nil) +} +``` + +## License + +The code has been licensed under the [MIT](https://opensource.org/license/mit) license. diff --git a/config.go b/config.go new file mode 100644 index 0000000..e4adfe4 --- /dev/null +++ b/config.go @@ -0,0 +1,42 @@ +package sse + +import "time" + +// Config defines the configuration settings for the Server-Sent Events (SSE) handler. +type Config[T any] struct { + // How often to flush the events to the connected clients. Default is 256ms + FlushFrequency time.Duration + + // How long to wait for all connected client to gracefully close. Default is 30s + CloseTimeout time.Duration + + // Replay events when a client connects + Replay struct { + // How many events to send in chunks, when a client connects. Default is 256 events + Initial int + + // How many events to keep in memory. Default is 2048 events + Maximum int + + // How long an event should be kept in memory for. Default is 30s + Expiry time.Duration + } + + // Events encoder function, which returns a slice of bytes that will then be converted to a string. Default is json.Marshal() + Encoder func([]T) ([]byte, error) +} + +// NewConfig initializes a configuration instance with reasonable defaults. +func NewConfig[T any]() *Config[T] { + cfg := &Config[T]{} + cfg.FlushFrequency = 256 * time.Millisecond + cfg.CloseTimeout = 30 * time.Second + cfg.Replay.Initial = 256 + cfg.Replay.Maximum = 2048 + cfg.Replay.Expiry = 30 * time.Second + + // Use the default encoder + cfg.Encoder = nil + + return cfg +} diff --git a/examples/custom_event_server/main.go b/examples/custom_event_server/main.go new file mode 100644 index 0000000..2427c17 --- /dev/null +++ b/examples/custom_event_server/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "crypto/rand" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/softwarespot/sse" +) + +type CustomEvent struct { + ID string `json:"id"` +} + +func main() { + ctx, _ := signalTrap(context.Background(), os.Interrupt, syscall.SIGTERM) + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + // Use the default configuration + h := sse.New[CustomEvent](nil) + go func() { + for { + id := must(generateID[string]()) + evt := CustomEvent{ + ID: id, + } + fmt.Println("sse handler: broadcast event", h.Broadcast(evt)) + time.Sleep(64 * time.Millisecond) + } + }() + + // Start the server on port "3000" as non-blocking + go func() { + http.Handle("/events", h) + http.ListenAndServe(":3000", nil) + }() + + // Wait for either a termination signal or timeout of the context + <-ctx.Done() + + if err := h.Close(); err != nil { + fmt.Println("sse handler: server shutdown with error:", err) + os.Exit(1) + } + fmt.Println("sse handler: server shutdown gracefully") +} + +// Helpers + +func generateID[T ~string]() (T, error) { + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { + return "", fmt.Errorf("creating a new ID: %w", err) + } + return T(fmt.Sprintf("%x-%d", b, time.Now().UnixMilli())), nil +} + +func must[T any](res T, err error) T { + if err != nil { + panic(err) + } + return res +} + +func signalTrap(ctx context.Context, sig ...os.Signal) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(ctx) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, sig...) + select { + case <-ctx.Done(): + case <-signals: + } + cancel() + }() + return ctx, cancel +} diff --git a/examples/event_server/main.go b/examples/event_server/main.go new file mode 100644 index 0000000..d46c6eb --- /dev/null +++ b/examples/event_server/main.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "math/rand/v2" + "net/http" + "time" + + "github.com/softwarespot/sse" +) + +func main() { + // Use the default configuration + h := sse.New[int64](nil) + defer h.Close() + + go func() { + for { + evt := rand.Int64N(1000) + fmt.Println("sse handler: broadcast event", h.Broadcast(evt)) + time.Sleep(64 * time.Millisecond) + } + }() + + http.Handle("/events", h) + http.ListenAndServe(":3000", nil) +} diff --git a/examples/ping_cmd_server/main.go b/examples/ping_cmd_server/main.go new file mode 100644 index 0000000..147ced8 --- /dev/null +++ b/examples/ping_cmd_server/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "os/exec" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/softwarespot/sse" +) + +type PingEvent struct { + Duration string `json:"duration"` + Host string `json:"host"` +} + +func main() { + ctx, _ := signalTrap(context.Background(), os.Interrupt, syscall.SIGTERM) + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + var wg sync.WaitGroup + s := http.Server{ + Addr: ":3000", + } + + // Start the server on port "3000" as non-blocking + go func() { + var connectedClients atomic.Int64 + http.HandleFunc("/state", func(w http.ResponseWriter, _ *http.Request) { + res := map[string]any{ + "clients": connectedClients.Load(), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(res); err != nil { + fmt.Println("unable to encode the route /state JSON response. Error:", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + }) + http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + wg.Add(1) + defer wg.Done() + + connectedClients.Add(1) + defer connectedClients.Add(-1) + + queryParams := r.URL.Query() + if !queryParams.Has("host") { + http.Error(w, "missing host query param", http.StatusBadRequest) + return + } + host := queryParams.Get("host") + if host == "" { + http.Error(w, "empty host query param", http.StatusBadRequest) + return + } + + // Use the default configuration + h := sse.New[PingEvent](nil) + fmt.Println("sse handler: started handler") + + ctxRequest, requestCancel := context.WithCancelCause(r.Context()) + defer requestCancel(nil) + + go monitorPing(ctxRequest, requestCancel, h, host) + + fmt.Println("sse handler: client connected") + err := h.ServeSSE(w, r.WithContext(ctxRequest)) + fmt.Println("sse handler: client disconnected", err) + + fmt.Println("sse handler: closed handler", h.Close()) + }) + s.ListenAndServe() + }() + + // Wait for either a termination signal or timeout of the context + <-ctx.Done() + + fmt.Println("start server shutdown") + ctxShutdown, shutdownCancel := context.WithTimeout(ctx, 5*time.Second) + defer shutdownCancel() + + err := s.Shutdown(ctxShutdown) + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Println("server shutdown with error:", err) + os.Exit(1) + } + + wg.Wait() + fmt.Println("server shutdown gracefully") +} + +func monitorPing(ctx context.Context, cancel context.CancelCauseFunc, h *sse.Handler[PingEvent], host string) { + defer cancel(nil) + + cmd := exec.CommandContext(ctx, "ping", host) + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel(fmt.Errorf("unable to connect to ping stdout pipe: %w", err)) + return + } + if err := cmd.Start(); err != nil { + cancel(fmt.Errorf("unable to start ping command: %w", err)) + return + } + + fmt.Println("sse handler: strated ping command with PID:", cmd.Process.Pid) + + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + duration := parsePingDuration(scanner.Text()) + if duration == "" { + continue + } + + evt := PingEvent{ + Duration: duration, + Host: host, + } + if err := h.Broadcast(evt); err != nil { + cancel(fmt.Errorf("unable to broadcast ping command result: %w", err)) + return + } + } + if err := cmd.Wait(); err != nil { + cancel(fmt.Errorf("unable to wait for ping command: %w", err)) + } +} + +func parsePingDuration(res string) string { + _, after, ok := strings.Cut(res, "time=") + if !ok { + return "" + } + return strings.ReplaceAll(after, " ", "") +} + +func signalTrap(ctx context.Context, sig ...os.Signal) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(ctx) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, sig...) + select { + case <-ctx.Done(): + case <-signals: + } + cancel() + }() + return ctx, cancel +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..83ee05d --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/softwarespot/sse + +go 1.23.0 + +require github.com/softwarespot/replay v1.0.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9409356 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/softwarespot/replay v1.0.0 h1:z8TI1hR2pZcD/QbWCRAjV3LloxMVOnJRBdIOGDUZFms= +github.com/softwarespot/replay v1.0.0/go.mod h1:MtkQWeeo8y3Uv1OPfbFSJshR9bacj5Uhl6oQuyUu54Y= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..08e0ad5 --- /dev/null +++ b/handler.go @@ -0,0 +1,243 @@ +package sse + +import ( + "encoding/json" + "errors" + "fmt" + "iter" + "net/http" + "time" + + "github.com/softwarespot/replay" +) + +type empty struct{} + +// Handler is a generic Server-Sent Events (SSE) handler. +type Handler[T any] struct { + cfg *Config[T] + + closingCh chan empty + completeCh chan empty + + clientRegisterCh chan chan []T + clientUnregisterCh chan chan []T + clientEvtsChs map[chan []T]empty + + evtsReplay *replay.Replay[T] + evtsCh chan T + + evtsEncoder func([]T) ([]byte, error) +} + +// New initializes a Server-Sent Events (SSE) handler, with an optional configuration. +// If the provided configuration is nil, it will use default settings. +func New[T any](cfg *Config[T]) *Handler[T] { + if cfg == nil { + cfg = NewConfig[T]() + } + h := &Handler[T]{ + cfg: cfg, + + closingCh: make(chan empty), + completeCh: make(chan empty), + + clientRegisterCh: make(chan chan []T), + clientUnregisterCh: make(chan chan []T), + clientEvtsChs: map[chan []T]empty{}, + + evtsReplay: replay.New[T](cfg.Replay.Maximum, cfg.Replay.Expiry), + evtsCh: make(chan T), + + evtsEncoder: defaultEventsEncoder[T], + } + if h.cfg.Encoder != nil { + h.evtsEncoder = h.cfg.Encoder + } + + go h.start() + + return h +} + +func (h *Handler[T]) start() { + flushTicker := time.NewTicker(h.cfg.FlushFrequency) + defer flushTicker.Stop() + + var ( + isClosing bool + cleanup = func() bool { + isCleanable := isClosing && len(h.clientEvtsChs) == 0 + if !isCleanable { + return false + } + + close(h.clientRegisterCh) + close(h.clientUnregisterCh) + close(h.evtsCh) + close(h.completeCh) + return true + } + evts []T + ) + for { + select { + case <-h.closingCh: + isClosing = true + if cleanup() { + return + } + case clientEvtsCh := <-h.clientRegisterCh: + h.clientEvtsChs[clientEvtsCh] = empty{} + for evts := range h.replayedEvents() { + clientEvtsCh <- evts + } + case clientEvtsCh := <-h.clientUnregisterCh: + close(clientEvtsCh) + delete(h.clientEvtsChs, clientEvtsCh) + + if cleanup() { + return + } + case evt := <-h.evtsCh: + evts = append(evts, evt) + case <-flushTicker.C: + if len(evts) == 0 { + break + } + for clientEvtsCh := range h.clientEvtsChs { + clientEvtsCh <- evts + } + for _, evt := range evts { + h.evtsReplay.Add(evt) + } + evts = nil + } + } +} + +// Close closes the Server-Sent Events (SSE) handler. +// It waits for all clients to complete/close, with a timeout defined in the configuration. +func (h *Handler[T]) Close() error { + if h.isClosing() { + return errors.New("sse-handler: handler is closed") + } + + h.closingCh <- empty{} + close(h.closingCh) + + // Wait for all clients to complete/close with a timeout + select { + case <-h.completeCh: + h.evtsReplay.Clear() + return nil + case <-time.After(h.cfg.CloseTimeout): + return errors.New("sse-handler: timeout waiting for clients to close") + } +} + +func (h *Handler[T]) isClosing() bool { + select { + case <-h.closingCh: + return true + case <-h.completeCh: + return true + default: + return false + } +} + +// ServeSSE serves the Server-Sent Events (SSE) to the HTTP response writer. +// It sets the appropriate headers and streams events to the client until the connection is closed. +func (h *Handler[T]) ServeSSE(w http.ResponseWriter, r *http.Request) error { + flusher, ok := w.(http.Flusher) + if !ok { + return errors.New("sse-handler: not supported") + } + if h.isClosing() { + return errors.New("sse-handler: handler is closed") + } + + hdrs := w.Header() + hdrs.Set("Content-Type", "text/event-stream") + hdrs.Set("Cache-Control", "no-cache") + hdrs.Set("Connection", "keep-alive") + + clientEvtsCh := h.register() + defer h.unregister(clientEvtsCh) + + for { + select { + case <-r.Context().Done(): + return nil + case <-h.closingCh: + return nil + case evts := <-clientEvtsCh: + data, err := h.evtsEncoder(evts) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { + return fmt.Errorf("sse-handler: unable to write events: %w", err) + } + flusher.Flush() + } + } +} + +// ServeHTTP implements the http.Handler interface for the SSE handler. +// It calls ServeSSE and handles any errors by writing an HTTP error response. +func (h *Handler[T]) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if err := h.ServeSSE(w, r); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func (h *Handler[T]) register() chan []T { + clientEvtsCh := make(chan []T) + h.clientRegisterCh <- clientEvtsCh + return clientEvtsCh +} + +func (h *Handler[T]) unregister(clientEvtsCh chan []T) { + h.clientUnregisterCh <- clientEvtsCh +} + +func (h *Handler[T]) replayedEvents() iter.Seq[[]T] { + return func(yield func([]T) bool) { + chunk := make([]T, 0, h.cfg.Replay.Initial) + for evt := range h.evtsReplay.All() { + chunk = append(chunk, evt) + if len(chunk) == h.cfg.Replay.Initial { + if !yield(chunk) { + return + } + + // Reset the underlying array, so the length is 0 + chunk = chunk[:0] + } + } + if len(chunk) > 0 { + yield(chunk) + } + } +} + +// Broadcast sends an event to all connected clients. +// It returns an error if the handler is closed. +func (h *Handler[T]) Broadcast(evt T) error { + if h.isClosing() { + return errors.New("sse-handler: handler is closed") + } + + h.evtsCh <- evt + return nil +} + +func defaultEventsEncoder[T any](evts []T) ([]byte, error) { + b, err := json.Marshal(evts) + if err != nil { + return nil, fmt.Errorf("sse-handler: unable to encode events: %w", err) + } + return b, nil +} diff --git a/handler_example_test.go b/handler_example_test.go new file mode 100644 index 0000000..3489504 --- /dev/null +++ b/handler_example_test.go @@ -0,0 +1,27 @@ +package sse_test + +import ( + "fmt" + "net/http" + "time" + + "github.com/softwarespot/sse" +) + +func ExampleNew() { + // Use the default configuration + h := sse.New[int](nil) + defer h.Close() + + go func() { + var evt int + for { + fmt.Println("sse handler: broadcast event", h.Broadcast(evt)) + evt++ + time.Sleep(64 * time.Millisecond) + } + }() + + http.Handle("/events", h) + http.ListenAndServe(":3000", nil) +} diff --git a/handler_test.go b/handler_test.go new file mode 100644 index 0000000..50646fc --- /dev/null +++ b/handler_test.go @@ -0,0 +1,77 @@ +package sse_test + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/softwarespot/sse" +) + +func Test_Handler(t *testing.T) { + cfg := sse.NewConfig[string]() + cfg.FlushFrequency = 128 * time.Millisecond + + h := sse.New(cfg) + + w1 := httptest.NewRecorder() + r1 := httptest.NewRequest("GET", "/events", nil) + go h.ServeHTTP(w1, r1) + + // Should send events to the connected client + var ( + event1 = "Event 1" + event2 = "Event 2" + event3 = "Event 3" + ) + assertBroadcastEvents(t, h, event1) + assertBroadcastEvents(t, h, event2, event3) + + res1 := w1.Result() + defer res1.Body.Close() + + assertEqual(t, res1.StatusCode, http.StatusOK) + + b1, err := io.ReadAll(res1.Body) + assertNoError(t, err) + + wantRes1 := `data: ["Event 1"]` + "\n\n" + `data: ["Event 2","Event 3"]` + "\n\n" + assertEqual(t, string(b1), wantRes1) + + // Should replay the events for a connected client + + w2 := httptest.NewRecorder() + r2 := httptest.NewRequest("GET", "/events", nil) + go h.ServeHTTP(w2, r2) + + // Wait for the event(s) to be flushed + time.Sleep(256 * time.Millisecond) + + res2 := w2.Result() + defer res2.Body.Close() + + assertEqual(t, res2.StatusCode, http.StatusOK) + b2, err := io.ReadAll(res2.Body) + assertNoError(t, err) + + wantRes2 := `data: ["Event 1","Event 2","Event 3"]` + "\n\n" + assertEqual(t, string(b2), wantRes2) + + assertNoError(t, h.Close()) + + // Should return error when the handler is closed + assertError(t, h.Broadcast(event3)) + assertError(t, h.Close()) +} + +func assertBroadcastEvents[T any](t testing.TB, h *sse.Handler[T], evts ...T) { + t.Helper() + for _, evt := range evts { + assertNoError(t, h.Broadcast(evt)) + } + + // Wait for the event(s) to be flushed + time.Sleep(256 * time.Millisecond) +} diff --git a/helpers_test.go b/helpers_test.go new file mode 100644 index 0000000..0441ffd --- /dev/null +++ b/helpers_test.go @@ -0,0 +1,30 @@ +package sse_test + +import ( + "reflect" + "testing" +) + +// assertEqual checks if two values are equal. If they are not, it logs using t.Fatalf() +func assertEqual[T any](t testing.TB, got, correct T) { + t.Helper() + if !reflect.DeepEqual(got, correct) { + t.Fatalf("assertEqual: expected values to be equal, got:\n%+v\ncorrect:\n%+v", got, correct) + } +} + +// assertError checks if an error is not nil. If it's nil, it logs using t.Fatalf() +func assertError(t testing.TB, err error) { + t.Helper() + if err == nil { + t.Fatalf("assertError: expected an error, got nil") + } +} + +// assertNoError checks if an error is nil. If it's not nil, it logs using t.Fatalf() +func assertNoError(t testing.TB, err error) { + t.Helper() + if err != nil { + t.Fatalf("assertNoError: expected no error, got %+v", err) + } +}