Fix multiple issues with a driver pool #250

Merged 4 commits on Feb 13, 2019.
cmd/bblfshd/main.go (18 additions, 0 deletions)
```diff
@@ -5,12 +5,15 @@ import (
 	"flag"
 	"fmt"
 	"net"
+	"net/http"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
 	"time"
 
+	_ "net/http/pprof"
+
 	"github.com/bblfsh/bblfshd/daemon"
 	"github.com/bblfsh/bblfshd/runtime"
```
```diff
@@ -46,6 +49,10 @@
 		format *string
 		fields *string
 	}
+	pprof struct {
+		enabled *bool
+		address *string
+	}
 	cmd *flag.FlagSet
 
 	usrListener net.Listener
```
```diff
@@ -66,6 +73,8 @@ func init() {
 	log.level = cmd.String("log-level", "info", "log level: panic, fatal, error, warning, info, debug.")
 	log.format = cmd.String("log-format", "text", "format of the logs: text or json.")
 	log.fields = cmd.String("log-fields", "", "extra fields to add to every log line in json format.")
+	pprof.enabled = cmd.Bool("profiler", false, "run profiler http endpoint (pprof).")
+	pprof.address = cmd.String("profiler-address", ":6060", "profiler address to listen on.")
 	cmd.Parse(os.Args[1:])
 
 	buildLogger()
```
```diff
@@ -101,6 +110,15 @@ func installRecommended(d *daemon.Daemon) error {
 func main() {
 	logrus.Infof("bblfshd version: %s (build: %s)", version, build)
 
+	if *pprof.enabled {
+		logrus.Infof("running pprof on %s", *pprof.address)
+		go func() {
+			if err := http.ListenAndServe(*pprof.address, nil); err != nil {
+				logrus.Errorf("cannot start pprof: %v", err)
+			}
+		}()
+	}
+
 	c, err := jaegercfg.FromEnv()
 	if err != nil {
 		logrus.Errorf("error configuring tracer: %s", err)
```
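With this change, starting bblfshd with `--profiler` registers the standard `net/http/pprof` handlers on the default mux and serves them from a background goroutine. Assuming the default `--profiler-address` of `:6060`, profiles can then be pulled with the stock Go tooling, e.g. `go tool pprof http://localhost:6060/debug/pprof/heap` for heap usage, or `go tool pprof http://localhost:6060/debug/pprof/profile` for a CPU profile.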
daemon/common_test.go (2 additions, 1 deletion)
```diff
@@ -1,6 +1,7 @@
 package daemon
 
 import (
+	"context"
 	"encoding/hex"
 	"fmt"
 	"os"
```
```diff
@@ -44,7 +45,7 @@ func (d *mockDriver) ServiceV2() protocol2.DriverClient {
 	return nil
 }
 
-func (d *mockDriver) Start() error {
+func (d *mockDriver) Start(ctx context.Context) error {
 	return nil
 }
```
daemon/daemon.go (20 additions, 7 deletions)
```diff
@@ -29,7 +29,7 @@ type Daemon struct {
 	runtime   *runtime.Runtime
 	driverEnv []string
 
-	mu   sync.Mutex
+	mu   sync.RWMutex
 	pool map[string]*DriverPool
 }
```
```diff
@@ -131,10 +131,17 @@ func (d *Daemon) RemoveDriver(language string) error {
 }
 
 func (d *Daemon) DriverPool(ctx context.Context, language string) (*DriverPool, error) {
+	d.mu.RLock()
+	dp, ok := d.pool[language]
+	d.mu.RUnlock()
+	if ok {
+		return dp, nil
+	}
+
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
-	if dp, ok := d.pool[language]; ok {
+	dp, ok = d.pool[language]
+	if ok {
 		return dp, nil
 	}
```
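The lookup now follows the classic double-checked locking pattern: serve the common case under the shared read lock, and re-check under the write lock before creating a new pool, because another goroutine may have created it between the two locks. A minimal sketch of the pattern (the names here are illustrative, not from the codebase):

```go
package example

import "sync"

// registry is an illustrative stand-in for Daemon: a map guarded by an
// RWMutex, where lookups vastly outnumber insertions.
type registry struct {
	mu    sync.RWMutex
	pools map[string]*pool
}

type pool struct{ lang string }

func (r *registry) get(lang string) *pool {
	// Fast path: concurrent readers share the read lock.
	r.mu.RLock()
	p, ok := r.pools[lang]
	r.mu.RUnlock()
	if ok {
		return p
	}

	// Slow path: take the write lock and re-check, since another
	// goroutine may have inserted the entry between the two locks.
	r.mu.Lock()
	defer r.mu.Unlock()
	if p, ok := r.pools[lang]; ok {
		return p
	}
	p = &pool{lang: lang}
	r.pools[lang] = p
	return p
}
```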
```diff
@@ -179,7 +186,7 @@ func (d *Daemon) newDriverPool(rctx context.Context, language string, image runt
 		return nil, err
 	}
 
-	if err := driver.Start(); err != nil {
+	if err := driver.Start(rctx); err != nil {
 		return nil, err
 	}
```
```diff
@@ -191,8 +198,12 @@
 		"language": language,
 	})
 
+	if err := dp.Start(); err != nil {
+		return nil, err
+	}
+
 	d.pool[language] = dp
-	return dp, dp.Start()
+	return dp, nil
 }
 
 func (d *Daemon) removePool(language string) error {
```
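The reordering here is itself a fix: `dp.Start()` now runs before the pool is stored in `d.pool`, so a pool that fails to start is never registered and handed out to other callers.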
```diff
@@ -211,8 +222,8 @@
 
 // Current returns the current list of driver pools.
 func (d *Daemon) Current() map[string]*DriverPool {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.mu.RLock()
+	defer d.mu.RUnlock()
 
 	out := make(map[string]*DriverPool, len(d.pool))
 	for k, pool := range d.pool {
```
```diff
@@ -224,6 +235,8 @@
 
 // Stop stops all the pools and containers.
 func (d *Daemon) Stop() error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
 	var err error
 	for _, dp := range d.pool {
 		if cerr := dp.Stop(); cerr != nil && err != nil {
```
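Taking the write lock in `Stop` also means shutdown now excludes concurrent `DriverPool` calls, so a new pool can no longer be created while the daemon is tearing the existing ones down.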
daemon/driver.go (15 additions, 10 deletions)
```diff
@@ -24,7 +24,7 @@ import (
 
 type Driver interface {
 	ID() string
-	Start() error
+	Start(ctx context.Context) error
 	Stop() error
 	Status() (protocol.Status, error)
 	State() (*protocol.DriverInstanceState, error)
```
```diff
@@ -47,10 +47,9 @@
 }
 
 const (
-	DriverBinary      = "/opt/driver/bin/driver"
-	GRPCSocket        = "rpc.sock"
-	TmpPathPattern    = "/tmp/%s"
-	ConnectionTimeout = 5 * time.Second
+	DriverBinary   = "/opt/driver/bin/driver"
+	GRPCSocket     = "rpc.sock"
+	TmpPathPattern = "/tmp/%s"
 )
 
 type Options struct {
```
```diff
@@ -116,12 +115,13 @@ func (i *DriverInstance) ID() string {
 }
 
 // Start starts a container and connects to it.
-func (i *DriverInstance) Start() error {
+func (i *DriverInstance) Start(ctx context.Context) error {
 	if err := i.Container.Start(); err != nil {
 		return err
 	}
 
-	if err := i.dial(); err != nil {
+	if err := i.dial(ctx); err != nil {
+		_ = i.Container.Stop()
 		return err
 	}
```
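The added `_ = i.Container.Stop()` is a cleanup-on-error fix: if the gRPC socket never becomes dialable, the already-started container is torn down instead of being left running after `Start` returns an error.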
```diff
@@ -132,19 +132,24 @@
 	return nil
 }
 
-func (i *DriverInstance) dial() error {
+func (i *DriverInstance) dial(ctx context.Context) error {
 	addr := filepath.Join(i.tmp, GRPCSocket)
 
 	opts := []grpc.DialOption{
 		grpc.WithDialer(func(addr string, t time.Duration) (net.Conn, error) {
 			return net.DialTimeout("unix", addr, t)
 		}),
+		// always wait for the connection to become active
 		grpc.WithBlock(),
-		grpc.WithTimeout(ConnectionTimeout),
+		// we want to know sooner rather than later
+		// TODO(dennwc): sometimes the initialization of the container takes >5 sec
+		// meaning that the time between Container.Start and the actual
+		// execution of a Go server (not the native driver) takes this long
+		grpc.WithBackoffMaxDelay(time.Second),
 		grpc.WithInsecure(),
 	}
 	opts = append(opts, protocol2.DialOptions()...)
-	conn, err := grpc.Dial(addr, opts...)
+	conn, err := grpc.DialContext(ctx, addr, opts...)
 
 	i.conn = conn
 	i.srv1 = protocol1.NewProtocolServiceClient(conn)
```

Review thread on the backoff change:

**Contributor:** Just curious, could you please help me understand where this time would be visible in our current instrumentation?

**Member (author):** It will be included in the `bblfshd.pool.newDriverPool` span in Jaeger. But there is no direct measurement for this specific delay; it will account for the whole "start driver, wait for the gRPC server" step. I plan to add more instrumentation later, because it requires changes to the SDK and drivers.
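The net effect of this hunk: the hard-coded five-second `ConnectionTimeout` is gone, the blocking dial is bounded by the caller's context instead, and the retry backoff is capped at one second so readiness is detected promptly. A minimal sketch of the same pattern using the grpc-go API of that era (the socket path and the 30-second budget are illustrative assumptions):

```go
package example

import (
	"context"
	"net"
	"time"

	"google.golang.org/grpc"
)

// dialUnix blocks until the gRPC server behind the unix socket is ready,
// or until ctx expires; the caller decides the budget.
func dialUnix(ctx context.Context, sock string) (*grpc.ClientConn, error) {
	opts := []grpc.DialOption{
		// Dial over a unix socket instead of TCP.
		grpc.WithDialer(func(addr string, t time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, t)
		}),
		grpc.WithBlock(),                      // wait for the connection to become active
		grpc.WithBackoffMaxDelay(time.Second), // retry promptly while the server boots
		grpc.WithInsecure(),
	}
	return grpc.DialContext(ctx, sock, opts...)
}

func dialExample() (*grpc.ClientConn, error) {
	// Give a slow-starting container up to 30 seconds instead of a fixed 5.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return dialUnix(ctx, "/tmp/driver/rpc.sock")
}
```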
daemon/driver_test.go (9 additions, 2 deletions)
```diff
@@ -1,6 +1,7 @@
 package daemon
 
 import (
+	"context"
 	"io/ioutil"
 	"os"
 	"testing"
```
```diff
@@ -32,7 +33,10 @@ func TestNewDriver(t *testing.T) {
 
 	require.NoError(err)
 
-	err = i.Start()
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	err = i.Start(ctx)
 	require.NoError(err)
 
 	time.Sleep(50 * time.Millisecond)
```
```diff
@@ -60,7 +64,10 @@ func TestDriverInstance_State(t *testing.T) {
 	require.Len(state.Processes, 0)
 	require.True(state.Created.IsZero())
 
-	err = i.Start()
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	err = i.Start(ctx)
 	require.NoError(err)
 	defer func() {
 		err = i.Stop()
```
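In both tests the previously implicit five-second dial budget (the removed `ConnectionTimeout` constant) becomes an explicit context deadline at the call site.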
daemon/pool_test.go (4 additions, 2 deletions)
```diff
@@ -92,16 +92,18 @@ func TestDriverPoolExecute_Recovery(t *testing.T) {
 	err := dp.Start()
 	require.NoError(err)
 
+	ctx := context.Background()
+
 	for i := 0; i < 100; i++ {
-		err := dp.Execute(func(_ context.Context, d Driver) error {
+		err := dp.ExecuteCtx(ctx, func(_ context.Context, d Driver) error {
 			require.NotNil(d)
 
 			if i%10 == 0 {
 				d.(*mockDriver).MockStatus = protocol.Stopped
 			}
 
 			return nil
-		}, 0)
+		})
 
 		require.Nil(err)
 		require.Len(dp.Current(), 1)
```
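The call-site change implies an API migration in the pool: the old `Execute(fn, 0)` took an explicit timeout argument, while `ExecuteCtx(ctx, fn)` leaves deadlines to the caller's context. A hypothetical compatibility shim, inferred only from these call sites (`DriverFunc` is an assumed name, not taken from the codebase):

```go
package daemon

import (
	"context"
	"time"
)

// DriverFunc matches the callback shape used in the tests above; the
// type name is assumed for illustration.
type DriverFunc func(ctx context.Context, d Driver) error

// Execute is what the legacy entry point could look like on top of the
// new context-based ExecuteCtx: a zero timeout means no deadline.
func (dp *DriverPool) Execute(fn DriverFunc, timeout time.Duration) error {
	ctx := context.Background()
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	return dp.ExecuteCtx(ctx, fn)
}
```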