Skip to content

Commit

Permalink
Large Segment Fetcher implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sanzmauro committed Oct 24, 2024
1 parent cc49e5b commit e084fa5
Show file tree
Hide file tree
Showing 4 changed files with 1,852 additions and 0 deletions.
119 changes: 119 additions & 0 deletions splitio/proxy/service/api/large_segment_http_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package api

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"

cmnConf "github.com/splitio/go-split-commons/v6/conf"
cmnDTOs "github.com/splitio/go-split-commons/v6/dtos"
cmnService "github.com/splitio/go-split-commons/v6/service"
cmnAPI "github.com/splitio/go-split-commons/v6/service/api"
"github.com/splitio/go-toolkit/v5/logging"
"github.com/splitio/split-synchronizer/v5/splitio/proxy/service/dtos"
)

var MEM_VERSION_10 = "1.0"

const (
// Unknown format
Unknown = iota
// Csv format
Csv
)

type LargeSegmentFetcher interface {
Fetch(name string, fetchOptions *cmnService.SegmentRequestParams) (*dtos.LargeSegmentDTO, error)
}

type HTTPLargeSegmentFetcher struct {
client cmnAPI.Client
logger logging.LoggerInterface
memVersion *string
httpClient *http.Client
}

// NewHTTPLargeSegmentsFetcher
func NewHTTPLargeSegmentFetcher(apikey string, cfg cmnConf.AdvancedConfig, logger logging.LoggerInterface, metadata cmnDTOs.Metadata) LargeSegmentFetcher {
return &HTTPLargeSegmentFetcher{
client: cmnAPI.NewHTTPClient(apikey, cfg, cfg.SdkURL, logger, metadata),
logger: logger,
memVersion: &MEM_VERSION_10,
httpClient: &http.Client{},
}
}

func (f *HTTPLargeSegmentFetcher) Fetch(name string, fetchOptions *cmnService.SegmentRequestParams) (*dtos.LargeSegmentDTO, error) {
var bufferQuery bytes.Buffer
bufferQuery.WriteString("/proxy/largeSegment/")
bufferQuery.WriteString(name)

data, err := f.client.Get(bufferQuery.String(), fetchOptions)
if err != nil {
f.logger.Error(err.Error())
return nil, err
}

var rfeDTO dtos.RfeDTO
err = json.Unmarshal(data, &rfeDTO)
if err != nil {
f.logger.Error("Error getting Request for Export: ", name, err)
return nil, err
}

keys, err := f.downloadAndParse(rfeDTO)
if err != nil {
return nil, err
}

return &dtos.LargeSegmentDTO{
Name: name,
Keys: keys,
}, nil
}

func (f *HTTPLargeSegmentFetcher) downloadAndParse(rfe dtos.RfeDTO) ([]string, error) {
method := rfe.Params.Method
if len(method) == 0 {
method = http.MethodGet
}

req, _ := http.NewRequest(method, rfe.Params.URL, nil)
req.Header = rfe.Params.Headers
response, err := f.httpClient.Do(req)
if err != nil {
return nil, err
}
fmt.Println(response.StatusCode)
if response.StatusCode < 200 || response.StatusCode >= 300 {
return nil, cmnDTOs.HTTPError{
Code: response.StatusCode,
Message: response.Status,
}
}
defer response.Body.Close()

switch rfe.Format {
case Csv:
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}

return csv(rfe.Version, body)
default:
return nil, fmt.Errorf("unsupported file format")
}
}

func csv(version string, body []byte) ([]string, error) {
switch version {
case MEM_VERSION_10:
return strings.Split(string(body), "\n"), nil
default:
return nil, fmt.Errorf("unsupported csv version %s", version)
}
}
209 changes: 209 additions & 0 deletions splitio/proxy/service/api/large_segment_http_fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package api

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"os"

cmnConf "github.com/splitio/go-split-commons/v6/conf"
cmnDTOs "github.com/splitio/go-split-commons/v6/dtos"
cmnService "github.com/splitio/go-split-commons/v6/service"
"github.com/splitio/go-toolkit/v5/logging"
"github.com/splitio/split-synchronizer/v5/splitio/proxy/service/dtos"
"github.com/stretchr/testify/assert"
)

func TestFetchCsvFormat(t *testing.T) {
logger := logging.NewLogger(&logging.LoggerOptions{})

test_csv, _ := os.ReadFile("testdata/large_segment_test.csv")
fileServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(test_csv)
}))
defer fileServer.Close()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, _ := json.Marshal(dtos.RfeDTO{
Params: dtos.ParamsDTO{
Method: "GET",
URL: fileServer.URL,
},
Format: Csv,
TotalKeys: 1500,
Size: 100,
ChangeNumber: 100,
Name: "large_segment_test",
Version: "1.0",
})
w.Write(data)
}))
defer ts.Close()

fetcher := NewHTTPLargeSegmentFetcher(
"api-key",
cmnConf.AdvancedConfig{
EventsURL: ts.URL,
SdkURL: ts.URL,
},
logger,
cmnDTOs.Metadata{},
)

lsData, err := fetcher.Fetch("large_segment_test", &cmnService.SegmentRequestParams{})
if err != nil {
t.Error(err)
}

assert.Equal(t, "large_segment_test", lsData.Name)
assert.Equal(t, 1500, len(lsData.Keys))
}

func TestFetchCsvFormatWithOtherVersion(t *testing.T) {
logger := logging.NewLogger(&logging.LoggerOptions{})

test_csv, _ := os.ReadFile("testdata/large_segment_test.csv")
fileServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(test_csv)
}))
defer fileServer.Close()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, _ := json.Marshal(dtos.RfeDTO{
Params: dtos.ParamsDTO{
Method: "GET",
URL: fileServer.URL,
},
Format: Csv,
TotalKeys: 1500,
Size: 100,
ChangeNumber: 100,
Name: "large_segment_test",
Version: "1111.0",
})
w.Write(data)
}))
defer ts.Close()

fetcher := NewHTTPLargeSegmentFetcher(
"api-key",
cmnConf.AdvancedConfig{
EventsURL: ts.URL,
SdkURL: ts.URL,
},
logger,
cmnDTOs.Metadata{},
)

lsData, err := fetcher.Fetch("large_segment_test", &cmnService.SegmentRequestParams{})

assert.Equal(t, "unsupported csv version 1111.0", err.Error())
assert.Equal(t, (*dtos.LargeSegmentDTO)(nil), lsData)
}

func TestFetchUnknownFormat(t *testing.T) {
logger := logging.NewLogger(&logging.LoggerOptions{})

test_csv, _ := os.ReadFile("testdata/large_segment_test.csv")
fileServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(test_csv)
}))
defer fileServer.Close()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, _ := json.Marshal(dtos.RfeDTO{
Params: dtos.ParamsDTO{
Method: "GET",
URL: fileServer.URL,
},
Format: Unknown,
TotalKeys: 1500,
Size: 100,
ChangeNumber: 100,
Name: "large_segment_test",
Version: "1.0",
})
w.Write(data)
}))
defer ts.Close()

fetcher := NewHTTPLargeSegmentFetcher(
"api-key",
cmnConf.AdvancedConfig{
EventsURL: ts.URL,
SdkURL: ts.URL,
},
logger,
cmnDTOs.Metadata{},
)

lsData, err := fetcher.Fetch("large_segment_test", &cmnService.SegmentRequestParams{})

assert.Equal(t, "unsupported file format", err.Error())
assert.Equal(t, (*dtos.LargeSegmentDTO)(nil), lsData)
}

func TestFetchAPIError(t *testing.T) {
logger := logging.NewLogger(&logging.LoggerOptions{})

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}))
defer ts.Close()

fetcher := NewHTTPLargeSegmentFetcher(
"api-key",
cmnConf.AdvancedConfig{
EventsURL: ts.URL,
SdkURL: ts.URL,
},
logger,
cmnDTOs.Metadata{},
)

lsData, err := fetcher.Fetch("large_segment_test", &cmnService.SegmentRequestParams{})
assert.Equal(t, "500 Internal Server Error", err.Error())
assert.Equal(t, (*dtos.LargeSegmentDTO)(nil), lsData)
}

func TestFetchDownloadServerError(t *testing.T) {
logger := logging.NewLogger(&logging.LoggerOptions{})

fileServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}))
defer fileServer.Close()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, _ := json.Marshal(dtos.RfeDTO{
Params: dtos.ParamsDTO{
Method: "GET",
URL: fileServer.URL,
},
Format: Csv,
TotalKeys: 1500,
Size: 100,
ChangeNumber: 100,
Name: "large_segment_test",
Version: "1.0",
})
w.Write(data)
}))
defer ts.Close()

fetcher := NewHTTPLargeSegmentFetcher(
"api-key",
cmnConf.AdvancedConfig{
EventsURL: ts.URL,
SdkURL: ts.URL,
},
logger,
cmnDTOs.Metadata{},
)

lsData, err := fetcher.Fetch("large_segment_test", &cmnService.SegmentRequestParams{})
assert.Equal(t, "500 Internal Server Error", err.Error())
assert.Equal(t, (*dtos.LargeSegmentDTO)(nil), lsData)
}
Loading

0 comments on commit e084fa5

Please sign in to comment.