Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding support to fetch WATER WASM files #628

Merged
merged 16 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions http-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ var (

algenevaAddr = flag.String("algeneva-addr", "", "Address at which to listen for algenAddr connections.")

waterAddr = flag.String("water-addr", "", "Address at which to listen for WATER connections.")
waterWASM = flag.String("water-wasm", "", "Base64 encoded WASM for WATER")
waterTransport = flag.String("water-transport", "", "WATER based transport name")
waterAddr = flag.String("water-addr", "", "Address at which to listen for WATER connections.")
waterWASM = flag.String("water-wasm", "", "Base64 encoded WASM for WATER")
waterWASMAvailableAt = flag.String("water-wasm-available-at", "", "URLs where the WATER WASM is available")
waterWASMHashsum = flag.String("water-wasm-hashsum", "", "Hashsum of the WATER WASM")
waterTransport = flag.String("water-transport", "", "WATER based transport name")

track = flag.String("track", "", "The track this proxy is running on")
)
Expand Down Expand Up @@ -473,6 +475,8 @@ func main() {
AlgenevaAddr: *algenevaAddr,
WaterAddr: *waterAddr,
WaterWASM: *waterWASM,
WaterWASMAvailableAt: *waterWASMAvailableAt,
WaterWASMHashsum: *waterWASMHashsum,
WaterTransport: *waterTransport,
}
if *maxmindLicenseKey != "" {
Expand Down
39 changes: 35 additions & 4 deletions http_proxy.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package proxy

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -185,9 +187,12 @@ type Proxy struct {

AlgenevaAddr string

WaterAddr string
WaterWASM string
WaterTransport string
// deprecated: use WaterWASMAvailableAt
WaterWASM string
WaterWASMAvailableAt string
WaterWASMHashsum string
WendelHime marked this conversation as resolved.
Show resolved Hide resolved
WaterTransport string
WaterAddr string

throttleConfig throttle.Config
instrument instrument.Instrument
Expand Down Expand Up @@ -988,8 +993,34 @@ func (p *Proxy) listenAlgeneva(baseListen func(string) (net.Listener, error)) li
// Currently water doesn't support customized TCP connections and we need to listen and receive requests directly from the WATER listener
func (p *Proxy) listenWATER(addr string) (net.Listener, error) {
ctx := context.Background()
waterListener, err := water.NewWATERListener(ctx, p.WaterTransport, addr, p.WaterWASM)
var wasm []byte
if p.WaterWASM != "" {
var err error
wasm, err = base64.StdEncoding.DecodeString(p.WaterWASM)
if err != nil {
log.Errorf("failed to decode WASM base64: %v", err)
return nil, err
}
}

if p.WaterWASMAvailableAt != "" {
wasmBuffer := new(bytes.Buffer)
d, err := water.NewWASMDownloader(strings.Split(p.WaterWASMAvailableAt, ","), &http.Client{Timeout: 1 * time.Minute})
if err != nil {
return nil, log.Errorf("failed to create wasm downloader: %w", err)
}

err = d.DownloadWASM(ctx, wasmBuffer)
if err != nil {
return nil, log.Errorf("unable to download water wasm: %w", err)
}
wasm = wasmBuffer.Bytes()
}

// currently the WATER listener doesn't accept a multiplexed connections, so we need to listen and accept connections directly from the listener
waterListener, err := water.NewWATERListener(ctx, nil, p.WaterTransport, addr, wasm)
if err != nil {
log.Errorf("failed to starte WATER listener: %w", err)
return nil, err
}

Expand Down
75 changes: 75 additions & 0 deletions water/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package water

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"strings"
)

//go:generate mockgen -package=water -destination=mocks_test.go . WASMDownloader

type WASMDownloader interface {
DownloadWASM(context.Context, io.Writer) error
}

type downloader struct {
urls []string
httpClient *http.Client
httpDownloader WASMDownloader
}

// NewWASMDownloader creates a new WASMDownloader instance.
func NewWASMDownloader(urls []string, client *http.Client) (WASMDownloader, error) {
if len(urls) == 0 {
return nil, log.Error("WASM downloader requires URLs to download but received empty list")
}
return &downloader{
urls: urls,
httpClient: client,
}, nil
}

// DownloadWASM downloads the WASM file from the given URLs, verifies the hash
// sum and writes the file to the given writer.
func (d *downloader) DownloadWASM(ctx context.Context, w io.Writer) error {
joinedErrs := errors.New("failed to download WASM from all URLs")
for _, url := range d.urls {
WendelHime marked this conversation as resolved.
Show resolved Hide resolved
if strings.HasPrefix(url, "magnet:?") {
// Skip magnet links for now
joinedErrs = errors.Join(joinedErrs, errors.New("magnet links are not supported"))
continue
}
tempBuffer := &bytes.Buffer{}
err := d.downloadWASM(ctx, tempBuffer, url)
if err != nil {
joinedErrs = errors.Join(joinedErrs, err)
continue
}

_, err = tempBuffer.WriteTo(w)
if err != nil {
joinedErrs = errors.Join(joinedErrs, err)
continue
}

return nil
}
return joinedErrs
}

// downloadWASM checks what kind of URL was given and downloads the WASM file
// from the URL. It can be a HTTPS URL or a magnet link.
func (d *downloader) downloadWASM(ctx context.Context, w io.Writer, url string) error {
switch {
case strings.HasPrefix(url, "http://"), strings.HasPrefix(url, "https://"):
if d.httpDownloader == nil {
d.httpDownloader = NewHTTPSDownloader(d.httpClient, url)
}
return d.httpDownloader.DownloadWASM(ctx, w)
default:
return log.Errorf("unsupported protocol: %s", url)
}
}
143 changes: 143 additions & 0 deletions water/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package water

import (
"bytes"
"context"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
)

func TestNewWASMDownloader(t *testing.T) {
var tests = []struct {
name string
givenURLs []string
givenHTTPClient *http.Client
assert func(*testing.T, WASMDownloader, error)
}{
{
name: "it should return an error when providing an empty list of URLs",
assert: func(t *testing.T, d WASMDownloader, err error) {
assert.Error(t, err)
assert.Nil(t, d)
},
},
{
name: "it should successfully return a wasm downloader",
givenURLs: []string{"http://example.com"},
givenHTTPClient: http.DefaultClient,
assert: func(t *testing.T, wDownloader WASMDownloader, err error) {
assert.NoError(t, err)
d := wDownloader.(*downloader)
assert.Equal(t, []string{"http://example.com"}, d.urls)
assert.Equal(t, http.DefaultClient, d.httpClient)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d, err := NewWASMDownloader(tt.givenURLs, tt.givenHTTPClient)
tt.assert(t, d, err)
})
}
}

func TestDownloadWASM(t *testing.T) {
ctx := context.Background()

contentMessage := "content"
var tests = []struct {
name string
givenHTTPClient *http.Client
givenURLs []string
givenWriter io.Writer
setupHTTPDownloader func(ctrl *gomock.Controller) WASMDownloader
assert func(*testing.T, io.Reader, error)
}{
{
name: "it should return an error telling magnet links are not supported",
givenURLs: []string{"magnet:?"},
assert: func(t *testing.T, r io.Reader, err error) {
b, berr := io.ReadAll(r)
require.NoError(t, berr)
assert.Empty(t, b)
assert.Error(t, err)
assert.ErrorContains(t, err, "magnet links are not supported")
},
},
{
name: "it should return an unupported protocol error when we provide an URL with not implemented downloader",
givenURLs: []string{
"udp://example.com",
},
assert: func(t *testing.T, r io.Reader, err error) {
b, berr := io.ReadAll(r)
require.NoError(t, berr)
assert.Empty(t, b)
assert.Error(t, err)
assert.ErrorContains(t, err, "unsupported protocol")
},
},
{
name: "it should return an error with the HTTP error",
givenURLs: []string{
"http://example.com",
},
setupHTTPDownloader: func(ctrl *gomock.Controller) WASMDownloader {
httpDownloader := NewMockWASMDownloader(ctrl)
httpDownloader.EXPECT().DownloadWASM(ctx, gomock.Any()).Return(assert.AnError)
return httpDownloader
},
assert: func(t *testing.T, r io.Reader, err error) {
b, berr := io.ReadAll(r)
require.NoError(t, berr)
assert.Empty(t, b)
assert.Error(t, err)
assert.ErrorContains(t, err, assert.AnError.Error())
assert.ErrorContains(t, err, "failed to download WASM from all URLs")
},
},
{
name: "it should return an io.Reader with the expected content",
givenURLs: []string{
"http://example.com",
},
setupHTTPDownloader: func(ctrl *gomock.Controller) WASMDownloader {
httpDownloader := NewMockWASMDownloader(ctrl)
httpDownloader.EXPECT().DownloadWASM(ctx, gomock.Any()).DoAndReturn(
func(ctx context.Context, w io.Writer) error {
_, err := w.Write([]byte(contentMessage))
return err
})
return httpDownloader
},
assert: func(t *testing.T, r io.Reader, err error) {
b, berr := io.ReadAll(r)
require.NoError(t, berr)
assert.NoError(t, err)
assert.Equal(t, contentMessage, string(b))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var httpDownloader WASMDownloader
if tt.setupHTTPDownloader != nil {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
httpDownloader = tt.setupHTTPDownloader(ctrl)
}

b := &bytes.Buffer{}
wDownloader, err := NewWASMDownloader(tt.givenURLs, tt.givenHTTPClient)
require.NoError(t, err)
wDownloader.(*downloader).httpDownloader = httpDownloader
err = wDownloader.DownloadWASM(ctx, b)
tt.assert(t, b, err)
})
}
}
44 changes: 44 additions & 0 deletions water/https_downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package water

import (
"context"
"fmt"
"io"
"net/http"
)

type httpsDownloader struct {
cli *http.Client
url string
}

func NewHTTPSDownloader(client *http.Client, url string) WASMDownloader {
return &httpsDownloader{cli: client, url: url}
}

func (d *httpsDownloader) DownloadWASM(ctx context.Context, w io.Writer) error {
if d.cli == nil {
d.cli = http.DefaultClient
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, http.NoBody)
if err != nil {
return fmt.Errorf("failed to create a new HTTP request: %w", err)
}
resp, err := d.cli.Do(req)
if err != nil {
return fmt.Errorf("failed to send a HTTP request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to download WASM file: %s", resp.Status)
}

_, err = io.Copy(w, resp.Body)
if err != nil {
return fmt.Errorf("failed to write the WASM file: %w", err)
}

return nil
}
Loading
Loading