Skip to content

Commit

Permalink
feat: add zip file func to simplify working with gzip encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Jun 14, 2024
1 parent 4c712d2 commit e57c4f7
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 25 deletions.
98 changes: 98 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package fdk

import (
"compress/gzip"
"encoding/json"
"errors"
"io"
"sync"
"sync/atomic"
)

// File represents a response that is a response body. The runner is in charge
// of getting the contents to the destination. The metadata will be received.
type File struct {
ContentType string `json:"content_type"`
Encoding string `json:"encoding"`
Filename string `json:"filename"`
Contents io.ReadCloser `json:"-"`
}

// MarshalJSON marshals the file metadata.
func (f File) MarshalJSON() ([]byte, error) {
type alias File
return json.Marshal(alias(f))
}

// FileGZip writes the
func FileGZip(filename, contentType string, contents io.ReadCloser) File {
return File{
ContentType: contentType,
Encoding: "gzip",
Filename: filename,
Contents: newCompressorGZIP(contents),
}
}

type compressorGZIP struct {
rc io.ReadCloser
pr *io.PipeReader

pwClosed bool
pw *io.PipeWriter
gwClosed bool
gw *gzip.Writer

mu sync.Mutex
started atomic.Int32
closeErr error
copyErr error
}

func newCompressorGZIP(rc io.ReadCloser) *compressorGZIP {
pr, pw := io.Pipe()
return &compressorGZIP{
rc: rc,
pw: pw,
pr: pr,
gw: gzip.NewWriter(pw),
}
}

func (c *compressorGZIP) Read(p []byte) (int, error) {
if c.started.CompareAndSwap(0, 1) {
go c.compressInput()
}
return c.pr.Read(p)
}

func (c *compressorGZIP) compressInput() {
defer func() {
c.gwClosed, c.pwClosed = true, true
err := c.pw.CloseWithError(c.gw.Close())
c.mu.Lock()
c.closeErr = err
c.mu.Unlock()
}()
if _, err := io.Copy(c.gw, c.rc); err != nil && !errors.Is(err, io.EOF) {
c.mu.Lock()
c.copyErr = err
c.mu.Unlock()
}
}

func (c *compressorGZIP) Close() error {
c.mu.Lock()
errs := []error{c.closeErr, c.copyErr}
c.mu.Unlock()
if !c.gwClosed {
errs = append(errs, c.gw.Close())
}
if !c.pwClosed {
errs = append(errs, c.pw.Close())
}

errs = append(errs, c.rc.Close(), c.pr.Close())

return errors.Join(errs...)
}
16 changes: 13 additions & 3 deletions runner_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r *runnerHTTP) Run(ctx context.Context, logger *slog.Logger, h Handler) {
resp := h.Handle(ctx, r)

if f, ok := resp.Body.(File); ok {
err := writeFile(f.Contents, f.Filename)
err := writeFile(logger, f.Contents, f.Filename)
if err != nil {
resp.Errors = append(resp.Errors, APIError{Code: http.StatusInternalServerError, Message: err.Error()})
writeErr := writeResponse(logger, w, resp)
Expand Down Expand Up @@ -155,18 +155,28 @@ func writeResponse(logger *slog.Logger, w http.ResponseWriter, resp Response) er
return err
}

func writeFile(r io.Reader, filename string) error {
func writeFile(logger *slog.Logger, r io.ReadCloser, filename string) error {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer func() { _ = f.Close() }() // just in case
defer func() {
// just in case
_ = f.Close()
_ = r.Close()
}()

_, err = io.Copy(f, r)
if err != nil {
return fmt.Errorf("failed to write contents to file: %w", err)
}

err = r.Close()
if err != nil {
// we swallow the error here, there's nothing we can do about it...
logger.Error("failed to close file contents", "err", err)
}

err = f.Close()
if err != nil {
return fmt.Errorf("failed to close file: %w", err)
Expand Down
74 changes: 68 additions & 6 deletions runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fdk_test

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -748,7 +749,7 @@ integer: 1`,
},
},
{
name: "POST to a endpoint that returns a sdk.File with encoding should pass",
name: "POST to an endpoint that returns a sdk.File with encoding should pass",
input: inputs{
body: newReqBody(t, fileInReq{
ContentType: "application/json",
Expand Down Expand Up @@ -777,10 +778,37 @@ integer: 1`,
equalFiles(t, got.Filename, `{"dodgers":"stink"}`)
},
},
{
name: "POST to an endpoint that returns a gzip compressed sdk.File should pass",
input: inputs{
body: newReqBody(t, fileInReq{
ContentType: "application/json",
DestFilename: filepath.Join(tmp, "third_file.json"),
V: `{"dodgers":"reallystank"}`,
}),
method: "POST",
path: "/compress-file",
},
newHandlerFn: func(ctx context.Context) fdk.Handler {
m := fdk.NewMux()
m.Post("/compress-file", fdk.HandleFnOf(newGzippedFileHandler))
return m
},
want: func(t *testing.T, resp *http.Response, got fdk.File) {
equalVals(t, 201, resp.StatusCode)

equalVals(t, "application/json", got.ContentType)
equalVals(t, "gzip", got.Encoding)

wantFilename := filepath.Join(tmp, "third_file.json")
equalVals(t, wantFilename, got.Filename)
equalGzipFiles(t, got.Filename, `{"dodgers":"reallystank"}`)
},
},
}

for _, tt := range tests {
fn := func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -814,8 +842,7 @@ integer: 1`,
mustNoErr(t, json.Unmarshal(b, &got))

tt.want(t, resp, got.File)
}
t.Run(tt.name, fn)
})
}
})
}
Expand Down Expand Up @@ -928,11 +955,18 @@ func newFileHandler(_ context.Context, r fdk.RequestOf[fileInReq]) fdk.Response
ContentType: r.Body.ContentType,
Encoding: r.Body.Encoding,
Filename: r.Body.DestFilename,
Contents: strings.NewReader(r.Body.V),
Contents: io.NopCloser(strings.NewReader(r.Body.V)),
},
}
}

func newGzippedFileHandler(_ context.Context, r fdk.RequestOf[fileInReq]) fdk.Response {
return fdk.Response{
Code: 201,
Body: fdk.FileGZip(r.Body.DestFilename, r.Body.ContentType, io.NopCloser(strings.NewReader(r.Body.V))),
}
}

func equalVals[T comparable](t testing.TB, want, got T) bool {
t.Helper()

Expand All @@ -946,7 +980,35 @@ func equalVals[T comparable](t testing.TB, want, got T) bool {
func equalFiles(t testing.TB, filename string, want string) {
t.Helper()

b, err := os.ReadFile(filename)
f, err := os.Open(filename)
mustNoErr(t, err)
defer func() { _ = f.Close() }()

equalReader(t, want, f)

err = f.Close()
if err != nil {
t.Errorf("failed to close file: " + err.Error())
}
}

func equalGzipFiles(t testing.TB, filename string, want string) {
t.Helper()

f, err := os.Open(filename)
mustNoErr(t, err)
defer func() { _ = f.Close() }()

gr, err := gzip.NewReader(f)
mustNoErr(t, err)

equalReader(t, want, gr)
}

func equalReader(t testing.TB, want string, got io.Reader) {
t.Helper()

b, err := io.ReadAll(got)
mustNoErr(t, err)

equalVals(t, want, string(b))
Expand Down
16 changes: 0 additions & 16 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
Expand Down Expand Up @@ -198,21 +197,6 @@ func (j jsoned) MarshalJSON() ([]byte, error) {
return json.Marshal(j.v)
}

// File represents a response that is a response body. The runner is in charge
// of getting the contents to the destination. The metadata will be received.
type File struct {
ContentType string `json:"content_type"`
Encoding string `json:"encoding"`
Filename string `json:"filename"`
Contents io.ReadSeeker `json:"-"`
}

// MarshalJSON marshals the file metadata.
func (f File) MarshalJSON() ([]byte, error) {
type alias File
return json.Marshal(alias(f))
}

// ErrHandler creates a new handler to respond with only errors.
func ErrHandler(errs ...APIError) Handler {
return HandlerFn(func(ctx context.Context, r Request) Response {
Expand Down

0 comments on commit e57c4f7

Please sign in to comment.