Skip to content

Commit

Permalink
Revert refactoring http ammo
Browse files Browse the repository at this point in the history
Revert "refactoring http ammo"
This reverts commit 19bec38cfddef61db1c13b7f15ef8b542acf6a95.
  • Loading branch information
oke11o committed Jun 9, 2023
1 parent 9747a00 commit ab46885
Show file tree
Hide file tree
Showing 21 changed files with 475 additions and 412 deletions.
103 changes: 20 additions & 83 deletions components/providers/base/ammo.go
Original file line number Diff line number Diff line change
@@ -1,110 +1,47 @@
package base

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
urlpkg "net/url"
import "github.com/yandex/pandora/core/aggregator/netsample"

"github.com/yandex/pandora/components/providers/http/util"
"github.com/yandex/pandora/core/aggregator/netsample"
"github.com/yandex/pandora/lib/netutil"
)

func NewAmmo(method string, url string, body []byte, header http.Header, tag string) (*Ammo, error) {
if ok := netutil.ValidHTTPMethod(method); !ok {
return nil, errors.New("invalid HTTP method " + method)
}
if _, err := urlpkg.Parse(url); err != nil {
return nil, fmt.Errorf("invalid URL %s; err %w ", url, err)
}
return &Ammo{
method: method,
body: body,
url: url,
tag: tag,
header: header,
constructor: true,
}, nil
}

type Ammo struct {
Req *http.Request
method string
body []byte
url string
tag string
header http.Header
id uint64
isInvalid bool
constructor bool
type Ammo[R any] struct {
Req *R
tag string
id uint64
isInvalid bool
}

func (a *Ammo) Request() (*http.Request, *netsample.Sample) {
if a.Req == nil {
_ = a.BuildRequest() // TODO: what if error. There isn't a logger
}
func (a *Ammo[R]) Request() (*R, *netsample.Sample) {
sample := netsample.Acquire(a.Tag())
sample.SetID(a.ID())
return a.Req, sample
}

func (a *Ammo) SetID(id uint64) {
func (a *Ammo[R]) Reset(req *R, tag string) {
a.Req = req
a.tag = tag
a.id = 0
a.isInvalid = false
}

func (a *Ammo[_]) SetID(id uint64) {
a.id = id
}

func (a *Ammo) ID() uint64 {
func (a *Ammo[_]) ID() uint64 {
return a.id
}

func (a *Ammo) Invalidate() {
func (a *Ammo[_]) Invalidate() {
a.isInvalid = true
}

func (a *Ammo) IsInvalid() bool {
func (a *Ammo[_]) IsInvalid() bool {
return a.isInvalid
}

func (a *Ammo) IsValid() bool {
func (a *Ammo[_]) IsValid() bool {
return !a.isInvalid
}

func (a *Ammo) SetTag(tag string) {
a.tag = tag
}

func (a *Ammo) Tag() string {
func (a *Ammo[_]) Tag() string {
return a.tag
}

func (a *Ammo) FromConstructor() bool {
return a.constructor
}

// use NewAmmo() for skipping error here
func (a *Ammo) BuildRequest() error {
var buff io.Reader
if a.body != nil {
buff = bytes.NewReader(a.body)
}
req, err := http.NewRequest(a.method, a.url, buff)
if err != nil {
return fmt.Errorf("cant create request: %w", err)
}
a.Req = req
util.EnrichRequestWithHeaders(req, a.header)
return nil
}

func (a *Ammo) Reset() {
a.Req = nil
a.method = ""
a.body = nil
a.url = ""
a.tag = ""
a.header = nil
a.id = 0
a.isInvalid = false
}
17 changes: 17 additions & 0 deletions components/providers/base/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package base

import "sync"

type Decoder[R any] struct {
Sink chan<- *Ammo[R]
Pool *sync.Pool
}

func NewDecoder[R any](sink chan<- *Ammo[R]) Decoder[R] {
return Decoder[R]{
Sink: sink,
Pool: &sync.Pool{New: func() any {
return new(Ammo[R])
}},
}
}
2 changes: 1 addition & 1 deletion components/providers/base/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type ProviderBase struct {
Deps core.ProviderDeps
core.ProviderDeps
FS afero.Fs
idCounter atomic.Uint64
}
Expand Down
2 changes: 1 addition & 1 deletion components/providers/http/ammo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ type Request interface {
http.Request
}

var _ phttp.Ammo = (*base.Ammo)(nil)
var _ phttp.Ammo = (*base.Ammo[http.Request])(nil)
3 changes: 1 addition & 2 deletions components/providers/http/decoders/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"net/http"

"github.com/yandex/pandora/components/providers/base"
"github.com/yandex/pandora/components/providers/http/config"
"github.com/yandex/pandora/components/providers/http/util"
)
Expand All @@ -25,7 +24,7 @@ var (

type Decoder interface {
// Decode(context.Context, chan<- *base.Ammo[http.Request], io.ReadSeeker) error
Scan(context.Context) (*base.Ammo, error)
Scan(context.Context) (*http.Request, string, error)
}

type protoDecoder struct {
Expand Down
74 changes: 38 additions & 36 deletions components/providers/http/decoders/jsonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"net/http"
"strings"

"github.com/yandex/pandora/components/providers/base"
"github.com/yandex/pandora/components/providers/http/config"
"github.com/yandex/pandora/components/providers/http/decoders/jsonline"
"github.com/yandex/pandora/components/providers/http/util"
"golang.org/x/xerrors"
)

Expand All @@ -35,51 +35,53 @@ type jsonlineDecoder struct {
line uint
}

func (d *jsonlineDecoder) Scan(ctx context.Context) (*base.Ammo, error) {
func (d *jsonlineDecoder) Scan(ctx context.Context) (*http.Request, string, error) {
if d.config.Limit != 0 && d.ammoNum >= d.config.Limit {
return nil, ErrAmmoLimit
return nil, "", ErrAmmoLimit
}
for {
if d.config.Passes != 0 && d.passNum >= d.config.Passes {
return nil, ErrPassLimit
for ; ; d.line++ {
if ctx.Err() != nil {
return nil, "", ctx.Err()
}

for d.scanner.Scan() {
d.line++
data := d.scanner.Bytes()
if len(strings.TrimSpace(string(data))) == 0 {
continue
}
d.ammoNum++
ammo, err := jsonline.DecodeAmmo(data, d.decodedConfigHeaders)
if err != nil {
if !d.config.ContinueOnError {
return nil, xerrors.Errorf("failed to decode ammo at line: %v; data: %q, with err: %w", d.line+1, data, err)
if !d.scanner.Scan() {
if d.scanner.Err() == nil { // assume as io.EOF; FIXME: check possible nil error with other reason
d.line = 0
d.passNum++
if d.config.Passes != 0 && d.passNum >= d.config.Passes {
return nil, "", ErrPassLimit
}
if d.ammoNum == 0 {
return nil, "", ErrNoAmmo
}
// TODO: add log message about error
continue // skipping ammo
_, err := d.file.Seek(0, io.SeekStart)
if err != nil {
return nil, "", err
}
d.scanner = bufio.NewScanner(d.file)
if d.config.MaxAmmoSize != 0 {
var buffer []byte
d.scanner.Buffer(buffer, d.config.MaxAmmoSize)
}
continue
}
return ammo, err
return nil, "", d.scanner.Err()
}

err := d.scanner.Err()
if err != nil {
return nil, err
}
if d.ammoNum == 0 {
return nil, ErrNoAmmo
data := d.scanner.Bytes()
if len(strings.TrimSpace(string(data))) == 0 {
continue
}
d.line = 0
d.passNum++
d.ammoNum++

_, err = d.file.Seek(0, io.SeekStart)
req, tag, err := jsonline.DecodeAmmo(data)
if err != nil {
return nil, err
}
d.scanner = bufio.NewScanner(d.file)
if d.config.MaxAmmoSize != 0 {
var buffer []byte
d.scanner.Buffer(buffer, d.config.MaxAmmoSize)
if !d.config.ContinueOnError {
return nil, "", xerrors.Errorf("failed to decode ammo at line: %v; data: %q, with err: %w", d.line+1, data, err)
}
// TODO: add log message about error
continue // skipping ammo
}
util.EnrichRequestWithHeaders(req, d.decodedConfigHeaders)
return req, tag, err
}
}
32 changes: 18 additions & 14 deletions components/providers/http/decoders/jsonline/data.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//go:generate github.com/pquerna/ffjson@latest data_ffjson.go
//go:generate github.com/pquerna/ffjson data_ffjson.go

package jsonline

import (
"net/http"
"strings"

"github.com/pkg/errors"
"github.com/yandex/pandora/components/providers/base"
)

// ffjson: noencoder
Expand All @@ -24,20 +24,24 @@ type data struct {
Body string `json:"body"`
}

func DecodeAmmo(jsonDoc []byte, headers http.Header) (*base.Ammo, error) {
var d = new(data)
if err := d.UnmarshalJSON(jsonDoc); err != nil {
err = errors.WithStack(err)
return nil, err
func (d *data) ToRequest() (*http.Request, error) {
uri := "http://" + d.Host + d.URI
req, err := http.NewRequest(d.Method, uri, strings.NewReader(d.Body))
if err != nil {
return nil, errors.WithStack(err)
}

for k, v := range d.Headers {
headers.Set(k, v)
req.Header.Set(k, v)
}
url := "http://" + d.Host + d.URI
var body []byte
if d.Body != "" {
body = []byte(d.Body)
return req, err
}

func DecodeAmmo(jsonDoc []byte) (*http.Request, string, error) {
var data = new(data)
if err := data.UnmarshalJSON(jsonDoc); err != nil {
err = errors.WithStack(err)
return nil, data.Tag, err
}
return base.NewAmmo(d.Method, url, body, headers, d.Tag)
req, err := data.ToRequest()
return req, data.Tag, err
}
Loading

0 comments on commit ab46885

Please sign in to comment.