Skip to content

Commit

Permalink
feat: update Request to take stream instead of json.RawMessage
Browse files Browse the repository at this point in the history
this enables us to support a streaming API :yaaaas:. With
this we can enable our entire system to work with a streamed
input. Locally, that means working with a multipart/form body
file and the meta field.
  • Loading branch information
jsteenb2 committed Aug 15, 2024
1 parent 9d21731 commit 8783e1b
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 31 deletions.
2 changes: 1 addition & 1 deletion handler_fns.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (h HandlerFn) Handle(ctx context.Context, r Request) Response {
func HandleFnOf[T any](fn func(context.Context, RequestOf[T]) Response) Handler {
return HandlerFn(func(ctx context.Context, r Request) Response {
var v T
if err := json.Unmarshal(r.Body, &v); err != nil {
if err := json.NewDecoder(r.Body).Decode(&v); err != nil {
return Response{Errors: []APIError{{Code: http.StatusBadRequest, Message: "failed to unmarshal payload: " + err.Error()}}}
}

Expand Down
5 changes: 3 additions & 2 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package fdk

import (
"encoding/json"
"io"
"net/http"
"net/url"
)

type (
// Request defines a request structure that is given to the runner. The Body is set to
// json.RawMessage, to enable decoration/middleware.
Request RequestOf[json.RawMessage]
// io.Reader, to enable decoration/middleware.
Request RequestOf[io.Reader]

// RequestOf provides a generic body we can target our unmarshaling into.
RequestOf[T any] struct {
Expand Down
105 changes: 81 additions & 24 deletions runner_http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fdk

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
Expand All @@ -13,6 +14,7 @@ import (
"net/url"
"os"
"strconv"
"strings"
"time"
)

Expand All @@ -27,7 +29,16 @@ type runnerHTTP struct{}
func (r *runnerHTTP) Run(ctx context.Context, logger *slog.Logger, h Handler) {
mux := http.NewServeMux()
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
r, err := toRequest(req)
defer func() {
if n, err := io.Copy(io.Discard, req.Body); err != nil {
logger.Error("failed to drain request body", "err", err.Error(), "bytes_drained", n)
}
if err := req.Body.Close(); err != nil {
logger.Error("failed to close request body", "err", err.Error())
}
}()

r, closeFn, err := toRequest(req)
if err != nil {
logger.Error("failed to create request", "err", err)
writeErr := writeResponse(logger, w, ErrResp(APIError{Code: http.StatusInternalServerError, Message: "unable to process incoming request"}))
Expand All @@ -36,6 +47,11 @@ func (r *runnerHTTP) Run(ctx context.Context, logger *slog.Logger, h Handler) {
}
return
}
defer func() {
if err := closeFn(); err != nil {
logger.Error("failed to close request body", "err", err.Error())
}
}()

const ctxKeyTraceID = "_traceid"
ctx = context.WithValue(ctx, ctxKeyTraceID, r.TraceID)
Expand Down Expand Up @@ -104,28 +120,15 @@ func (r *runnerHTTP) Run(ctx context.Context, logger *slog.Logger, h Handler) {
}
}

func toRequest(req *http.Request) (Request, error) {
var r struct {
FnID string `json:"fn_id"`
FnVersion int `json:"fn_version"`
Body json.RawMessage `json:"body"`
Context json.RawMessage `json:"context"`
AccessToken string `json:"access_token"`
Method string `json:"method"`
Params struct {
Header http.Header `json:"header"`
Query url.Values `json:"query"`
} `json:"params"`
URL string `json:"url"`
TraceID string `json:"trace_id"`
}
payload, err := io.ReadAll(io.LimitReader(req.Body, 5*mb))
if err != nil {
return Request{}, fmt.Errorf("failed to read request body: %s", err)
func toRequest(req *http.Request) (Request, func() error, error) {
fromFn := fromJSONReq
if strings.HasPrefix(req.Header.Get("Content-Type"), "multipart/form-data") {
fromFn = fromMultipartReq
}

if err = json.Unmarshal(payload, &r); err != nil {
return Request{}, fmt.Errorf("failed to unmarshal request body: %s", err)
r, body, err := fromFn(req)
if err != nil {
return Request{}, nil, fmt.Errorf("failed to prepare request: %w", err)
}

// Ensure headers are canonically formatted else header.Get("my-key") won't necessarily work.
Expand All @@ -137,10 +140,28 @@ func toRequest(req *http.Request) (Request, error) {
}
r.Params.Header = hCanon

out := Request{
return reqMetaToRequest(r, body), body.Close, nil
}

type reqMeta struct {
FnID string `json:"fn_id"`
FnVersion int `json:"fn_version"`
Context json.RawMessage `json:"context"`
AccessToken string `json:"access_token"`
Method string `json:"method"`
Params struct {
Header http.Header `json:"header"`
Query url.Values `json:"query"`
} `json:"params"`
URL string `json:"url"`
TraceID string `json:"trace_id"`
}

func reqMetaToRequest(r reqMeta, body io.Reader) Request {
return Request{
FnID: r.FnID,
FnVersion: r.FnVersion,
Body: r.Body,
Body: body,
Context: r.Context,
Params: struct {
Header http.Header
Expand All @@ -151,7 +172,43 @@ func toRequest(req *http.Request) (Request, error) {
TraceID: r.TraceID,
AccessToken: r.AccessToken,
}
return out, nil
}

func fromMultipartReq(req *http.Request) (reqMeta, io.ReadCloser, error) {
meta := req.FormValue("meta")
if meta == "" {
return reqMeta{}, nil, errors.New("no meta field provided in multipart form submission")
}

var reqFn reqMeta
err := json.Unmarshal([]byte(meta), &reqFn)
if err != nil {
return reqMeta{}, nil, fmt.Errorf("failed to json unmarshal meta from multipart field: %w", err)
}

body, _, err := req.FormFile("body")
if err != nil {
return reqMeta{}, nil, fmt.Errorf("failed to read multipart body form file: %w", err)
}

return reqFn, body, nil
}

func fromJSONReq(req *http.Request) (reqMeta, io.ReadCloser, error) {
var r struct {
reqMeta
Body json.RawMessage `json:"body"`
}
payload, err := io.ReadAll(io.LimitReader(req.Body, 5*mb))
if err != nil {
return reqMeta{}, nil, fmt.Errorf("failed to read request body: %s", err)
}

if err = json.Unmarshal(payload, &r); err != nil {
return reqMeta{}, nil, fmt.Errorf("failed to unmarshal request body: %s", err)
}

return r.reqMeta, io.NopCloser(bytes.NewReader(r.Body)), nil
}

func writeResponse(logger *slog.Logger, w http.ResponseWriter, resp Response) error {
Expand Down
8 changes: 5 additions & 3 deletions runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,18 +1108,20 @@ func newJSONBodyHandler(cfg config) fdk.Handler {
Message: "gots the error",
})
}
return fdk.HandleFnOf(func(ctx context.Context, r fdk.RequestOf[json.RawMessage]) fdk.Response {
return newEchoResp(ctx, cfg, fdk.Request(r))
return fdk.HandlerFn(func(ctx context.Context, r fdk.Request) fdk.Response {
return newEchoResp(ctx, cfg, r)
})
}

func newEchoResp(ctx context.Context, cfg config, r fdk.Request) fdk.Response {
traceID, _ := ctx.Value("_traceid").(string)
bodyB, _ := io.ReadAll(r.Body)
return fdk.Response{

Body: fdk.JSON(echoReq{
Config: cfg,
Req: echoInputs{
Body: r.Body,
Body: bodyB,
Context: r.Context,
Headers: r.Params.Header,
Queries: r.Params.Query,
Expand Down
3 changes: 2 additions & 1 deletion sdk_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fdk_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -36,7 +37,7 @@ func TestHandlerFnOfOK(t *testing.T) {
})
mustNoErr(t, err)

resp := h.Handle(context.TODO(), fdk.Request{Body: b})
resp := h.Handle(context.TODO(), fdk.Request{Body: bytes.NewReader(b)})
fdk.EqualVals(t, http.StatusInternalServerError, resp.Code)

wantErrs := []fdk.APIError{
Expand Down

0 comments on commit 8783e1b

Please sign in to comment.