Skip to content

Commit

Permalink
bind results channel method in Gun interface, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
direvius committed Jan 15, 2016
1 parent b1f3ce9 commit b474ee9
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 68 deletions.
3 changes: 2 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestGlobalConfig(t *testing.T) {
gc := &Gun{
GunType: "spdy",
Parameters: map[string]interface{}{
"Target": "localhost:3000",
"Target": "localhost:3000",
"PingPeriod": "5",
},
}
globalConfig := &Global{
Expand Down
4 changes: 2 additions & 2 deletions engine/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (u *User) Run(ctx context.Context) error {
}()
control := u.Limiter.Control()
source := u.Ammunition.Source()
sink := u.Results.Sink()
u.Gun.BindResultsTo(u.Results.Sink())
loop:
for {
select {
Expand All @@ -40,7 +40,7 @@ loop:
}
_, more = <-control
if more {
u.Gun.Shoot(ctx, ammo, sink)
u.Gun.Shoot(ctx, ammo)
u.Ammunition.Release(ammo)
} else {
log.Println("Limiter ended.")
Expand Down
3 changes: 2 additions & 1 deletion gun/gun.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ import (
)

type Gun interface {
Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error
Shoot(context.Context, ammo.Ammo) error
BindResultsTo(chan<- *aggregate.Sample)
}
20 changes: 12 additions & 8 deletions gun/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,27 @@ const (
)

type HttpGun struct {
target string
ssl bool
client *http.Client
target string
ssl bool
client *http.Client
results chan<- *aggregate.Sample
}

func (hg *HttpGun) BindResultsTo(results chan<- *aggregate.Sample) {
hg.results = results
}

// Shoot to target, this method is not thread safe
func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo,
results chan<- *aggregate.Sample) error {
func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo) error {

if hg.client == nil {
hg.Connect(results)
hg.Connect()
}
start := time.Now()
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
hg.results <- ss
}()
// now send the request to obtain a http response
ha, ok := a.(*ammo.Http)
Expand Down Expand Up @@ -99,7 +103,7 @@ func (hg *HttpGun) Close() {
hg.client = nil
}

func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) {
func (hg *HttpGun) Connect() {
hg.Close()
config := tls.Config{
InsecureSkipVerify: true,
Expand Down
14 changes: 8 additions & 6 deletions gun/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func TestHttpGunWithSsl(t *testing.T) {
defer ts.Close()

gun := &HttpGun{
target: ts.Listener.Addr().String(),
ssl: true,
target: ts.Listener.Addr().String(),
ssl: true,
results: result,
}
promise := utils.Promise(func() error {
defer close(result)
Expand All @@ -48,7 +49,7 @@ func TestHttpGunWithSsl(t *testing.T) {
"Host": "example.org",
"User-Agent": "Pandora/0.0.1",
},
}, result)
})
})
results := aggregate.Drain(ctx, result)
require.Len(t, results, 1)
Expand Down Expand Up @@ -92,8 +93,9 @@ func TestHttpGunWithHttp(t *testing.T) {
defer ts.Close()

gun := &HttpGun{
target: ts.Listener.Addr().String(),
ssl: false,
target: ts.Listener.Addr().String(),
ssl: false,
results: result,
}
promise := utils.Promise(func() error {
defer close(result)
Expand All @@ -107,7 +109,7 @@ func TestHttpGunWithHttp(t *testing.T) {
"Host": "example.org",
"User-Agent": "Pandora/0.0.1",
},
}, result)
})
})
results := aggregate.Drain(ctx, result)
require.Len(t, results, 1)
Expand Down
28 changes: 9 additions & 19 deletions gun/log.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package gun

import (
"fmt"
"log"
"strconv"
"time"

"golang.org/x/net/context"

Expand All @@ -12,27 +11,18 @@ import (
"github.com/yandex/pandora/config"
)

type LogGun struct{}

func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error {
log.Println("Log message: ", a.(*ammo.Log).Message)
results <- &aggregate.Sample{}
return nil
type LogGun struct {
results chan<- *aggregate.Sample
}

type DummySample struct {
value int
func (l *LogGun) BindResultsTo(results chan<- *aggregate.Sample) {
l.results = results
}

func (ds *DummySample) String() string {
return fmt.Sprintf("My value is %d", ds.value)
}

func (ds *DummySample) AppendToPhout(dst []byte) []byte {
dst = append(dst, "My value is "...)
dst = strconv.AppendInt(dst, int64(ds.value), 10)
dst = append(dst, '\n')
return dst
func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo) error {
log.Println("Log message: ", a.(*ammo.Log).Message)
l.results <- aggregate.AcquireSample(float64(time.Now().UnixNano())/1e9, "REQUEST")
return nil
}

func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) {
Expand Down
56 changes: 32 additions & 24 deletions gun/spdy/spdy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ import (
)

type SpdyGun struct {
pingPeriod time.Duration
target string
client *spdy.Client
target string
client *spdy.Client
results chan<- *aggregate.Sample
closed bool
}

func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error {
func (sg *SpdyGun) BindResultsTo(results chan<- *aggregate.Sample) {
sg.results = results
}

func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo) error {
if sg.client == nil {
if err := sg.Connect(results); err != nil {
if err := sg.Connect(); err != nil {
return err
}
}
Expand All @@ -36,7 +41,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
sg.results <- ss
}()
// now send the request to obtain a http response
ha, ok := a.(*ammo.Http)
Expand Down Expand Up @@ -79,18 +84,19 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre
}

func (sg *SpdyGun) Close() {
sg.closed = true
if sg.client != nil {
sg.client.Close()
}
}

func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error {
func (sg *SpdyGun) Connect() error {
// FIXME: rewrite connection logic, it isn't thread safe right now.
start := time.Now()
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
sg.results <- ss
}()
config := tls.Config{
InsecureSkipVerify: true,
Expand Down Expand Up @@ -118,7 +124,7 @@ func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error {
return nil
}

func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) {
func (sg *SpdyGun) Ping() {
if sg.client == nil {
return
}
Expand All @@ -140,9 +146,22 @@ func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) {
ss.Err = err
ss.ProtoCode = 500
}
results <- ss
sg.results <- ss
if err != nil {
sg.Connect(results)
sg.Connect()
}
}

func (sg *SpdyGun) startAutoPing(pingPeriod time.Duration) {
if pingPeriod > 0 {
go func() {
for range time.NewTicker(pingPeriod).C {
if sg.closed {
return
}
sg.Ping()
}
}()
}
}

Expand Down Expand Up @@ -171,23 +190,12 @@ func New(c *config.Gun) (gun.Gun, error) {
switch t := target.(type) {
case string:
g = &SpdyGun{
pingPeriod: pingPeriod,
target: target.(string),
target: target.(string),
}
default:
return nil, fmt.Errorf("Target is of the wrong type."+
" Expected 'string' got '%T'", t)
}
// TODO: implement this logic somewhere
// if pingPeriod > 0 {
// go func() {
// for range time.NewTicker(pingPeriod).C {
// if g.closed {
// return
// }
// g.Ping(results)
// }
// }()
// }
g.(*SpdyGun).startAutoPing(pingPeriod)
return g, nil
}
39 changes: 32 additions & 7 deletions gun/spdy/spdy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/yandex/pandora/aggregate"
"github.com/yandex/pandora/ammo"
"github.com/yandex/pandora/config"
"github.com/yandex/pandora/utils"
"golang.org/x/net/context"
)
Expand All @@ -24,8 +25,8 @@ func TestSpdyGun(t *testing.T) {
result := make(chan *aggregate.Sample)

gun := &SpdyGun{
target: "localhost:3000",
pingPeriod: time.Second * 5,
target: "localhost:3000",
results: result,
}
promise := utils.Promise(func() error {
defer gun.Close()
Expand All @@ -40,7 +41,7 @@ func TestSpdyGun(t *testing.T) {
"Host": "example.org",
"User-Agent": "Pandora/0.0.1",
},
}, result)
})
})

results := aggregate.Drain(ctx, result)
Expand Down Expand Up @@ -76,16 +77,16 @@ func TestSpdyConnectPing(t *testing.T) {
result := make(chan *aggregate.Sample)

gun := &SpdyGun{
target: "localhost:3000",
pingPeriod: time.Second * 5,
target: "localhost:3000",
results: result,
}
promise := utils.Promise(func() error {
defer gun.Close()
defer close(result)
if err := gun.Connect(result); err != nil {
if err := gun.Connect(); err != nil {
return err
}
gun.Ping(result)
gun.Ping()
return nil
})

Expand All @@ -112,6 +113,30 @@ func TestSpdyConnectPing(t *testing.T) {

}

func TestNewSpdyGun(t *testing.T) {
spdyConfig := &config.Gun{
GunType: "spdy",
Parameters: map[string]interface{}{
"Target": "localhost:3000",
"PingPeriod": 5.0,
},
}
g, err := New(spdyConfig)
assert.NoError(t, err)
_, ok := g.(*SpdyGun)
assert.Equal(t, true, ok)

failSpdyConfig := &config.Gun{
GunType: "spdy",
Parameters: map[string]interface{}{
"Target": "localhost:3000",
"PingPeriod": "not-a-number",
},
}
_, err = New(failSpdyConfig)
assert.Error(t, err)
}

func runSpdyTestServer() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
Expand Down

0 comments on commit b474ee9

Please sign in to comment.