Skip to content

Commit

Permalink
simplify and serialize nginx (#131)
Browse files Browse the repository at this point in the history
* prevent parallel reloads of nginx

* fix nil pointer error scenario on shutdown

* skip config random delay if less than a second

* add log lines for nginx restart flow

* add service ID to test service logger
  • Loading branch information
Amir Arad authored Jan 6, 2020
1 parent fefebcd commit cad38eb
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 40 deletions.
12 changes: 8 additions & 4 deletions boyar/boyar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/orbs-network/scribe/log"
"github.com/pkg/errors"
"strings"
"sync"
)

type Cache struct {
Expand All @@ -36,10 +37,11 @@ type Boyar interface {
}

type boyar struct {
strelets strelets.Strelets
config config.NodeConfiguration
cache *Cache
logger log.Logger
nginxLock sync.Mutex
strelets strelets.Strelets
config config.NodeConfiguration
cache *Cache
logger log.Logger
}

type errorContainer struct {
Expand Down Expand Up @@ -93,6 +95,8 @@ func (b *boyar) ProvisionVirtualChains(ctx context.Context) error {
}

func (b *boyar) ProvisionHttpAPIEndpoint(ctx context.Context) error {
b.nginxLock.Lock()
defer b.nginxLock.Unlock()
// TODO is there a better way to get a loopback interface?
nginxConfig := getNginxConfig(b.config)

Expand Down
16 changes: 10 additions & 6 deletions services/core_boyar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewCoreBoyarService(logger log.Logger) *BoyarService {

func (coreBoyar *BoyarService) OnConfigChange(ctx context.Context, cfg config.NodeConfiguration) error {

orchestrator, err := adapter.NewDockerSwarm(cfg.OrchestratorOptions())
orchestrator, err := adapter.NewDockerSwarm(cfg.OrchestratorOptions(), coreBoyar.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -58,11 +58,15 @@ func (coreBoyar *BoyarService) OnConfigChange(ctx context.Context, cfg config.No
return nil
}

func randomDelay(ctx context.Context, cfg config.NodeConfiguration, maxDelay time.Duration, logger log.Logger) {
func maybeDelayConfigUpdate(ctx context.Context, cfg config.NodeConfiguration, maxDelay time.Duration, logger log.Logger) {
reloadTimeDelay := cfg.ReloadTimeDelay(maxDelay)
logger.Info("waiting to apply new configuration", log.String("delay", maxDelay.String()))
select {
case <-time.After(reloadTimeDelay):
case <-ctx.Done():
if reloadTimeDelay.Seconds() > 1 { // the delay is designed to break symmetry between nodes. less than a second is practically zero
logger.Info("waiting to update configuration", log.String("delay", maxDelay.String()))
select {
case <-time.After(reloadTimeDelay):
case <-ctx.Done():
}
} else {
logger.Info("updating configuration immediately")
}
}
15 changes: 12 additions & 3 deletions services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ func Execute(ctx context.Context, flags *config.Flags, logger log.Logger) (govnr

// wire cfg and boyar
supervisor.Supervise(govnr.Forever(ctx, "apply config changes", utils.NewLogErrors("apply config changes", logger), func() {
cfg := <-cfgFetcher.Output

var cfg config.NodeConfiguration = nil
select {
case <-ctx.Done():
return
case cfg = <-cfgFetcher.Output:
}
if cfg == nil {
return
}
// random delay when provisioning change (that is, not bootstrap flow or repairing broken system)
if coreBoyar.healthy {
randomDelay(ctx, cfg, flags.MaxReloadTimeDelay, coreBoyar.logger)
maybeDelayConfigUpdate(ctx, cfg, flags.MaxReloadTimeDelay, coreBoyar.logger)
} else {
logger.Info("applying new configuration immediately")
}

ctx, cancel := context.WithTimeout(ctx, flags.Timeout)
Expand Down
2 changes: 1 addition & 1 deletion services/report_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func formatAsISO6801(t time.Time) string {

func reportStatus(ctx context.Context, logger log.Logger, since time.Duration) error {
// We really don't need any options here since we're just observing
orchestrator, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
orchestrator, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, logger)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions strelets/adapter/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/orbs-network/scribe/log"
"github.com/pkg/errors"
"os"
)

type dockerSwarmOrchestrator struct {
client *client.Client
options OrchestratorOptions
logger log.Logger
}

type dockerSwarmSecretsConfig struct {
Expand All @@ -29,14 +31,14 @@ type dockerSwarmNginxSecretsConfig struct {
sslPrivateKeyId string
}

func NewDockerSwarm(options OrchestratorOptions) (Orchestrator, error) {
func NewDockerSwarm(options OrchestratorOptions, logger log.Logger) (Orchestrator, error) {
client, err := client.NewClientWithOpts(client.WithVersion(DOCKER_API_VERSION))

if err != nil {
return nil, err
}

return &dockerSwarmOrchestrator{client: client, options: options}, nil
return &dockerSwarmOrchestrator{client: client, options: options, logger: logger}, nil
}

func (d *dockerSwarmOrchestrator) PullImage(ctx context.Context, imageName string) error {
Expand Down Expand Up @@ -67,12 +69,17 @@ func (d *dockerSwarmOrchestrator) ServiceRemove(ctx context.Context, serviceName
Filters: filters.NewArgs(filters.KeyValuePair{Key: "name", Value: serviceName}),
})
if err != nil {
return fmt.Errorf("could not list swarm services: %s", err)
return fmt.Errorf("could not list swarm services: %s \n %v", serviceName, err)
}
if len(services) == 0 {
d.logger.Info(fmt.Sprintf("no service found for removal: %s", serviceName))
return nil
}
for _, service := range services {

if err := d.client.ServiceRemove(ctx, service.ID); err != nil {
return fmt.Errorf("failed to remove service %s with id %s", serviceName, service.ID)
} else {
d.logger.Info(fmt.Sprintf("successfully removed service %s with id %s", serviceName, service.ID))
}
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions strelets/adapter/swarm_get_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
dockerSwarm "github.com/docker/docker/api/types/swarm"
dockerClient "github.com/docker/docker/client"
"github.com/orbs-network/boyarin/test/helpers"
"github.com/orbs-network/scribe/log"
"github.com/stretchr/testify/require"
"strings"
"testing"
Expand All @@ -20,7 +21,7 @@ func TestDockerSwarm_GetStatusIfUnableToStart(t *testing.T) {
serviceId := startDefunctContainer(t)
defer destroyDefunctBusybox(t, serviceId)

swarm, err := NewDockerSwarm(OrchestratorOptions{})
swarm, err := NewDockerSwarm(OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)

require.True(t, helpers.Eventually(30*time.Second, func() bool {
Expand All @@ -46,7 +47,7 @@ func TestDockerSwarm_GetStatusIfExitsImmediately(t *testing.T) {
serviceId := startReloadingContainer(t)
defer destroyDefunctBusybox(t, serviceId)

swarm, err := NewDockerSwarm(OrchestratorOptions{})
swarm, err := NewDockerSwarm(OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)

require.True(t, helpers.Eventually(30*time.Second, func() bool {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/e2e_boyar_with_signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/orbs-network/boyarin/strelets"
"github.com/orbs-network/boyarin/strelets/adapter"
"github.com/orbs-network/boyarin/test/helpers"
"github.com/orbs-network/scribe/log"
"github.com/stretchr/testify/require"
"testing"
)
Expand All @@ -16,7 +17,7 @@ func TestE2ESingleVchainWithSignerWithSwarmAndBoyar(t *testing.T) {
// helpers.SkipOnCI(t)
helpers.WithContext(func(ctx context.Context) {
helpers.InitSwarmEnvironment(t, ctx)
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)
s := strelets.NewStrelets(swarm)

Expand Down
5 changes: 3 additions & 2 deletions test/e2e/e2e_reverse_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/orbs-network/boyarin/strelets"
"github.com/orbs-network/boyarin/strelets/adapter"
"github.com/orbs-network/boyarin/test/helpers"
"github.com/orbs-network/scribe/log"
"github.com/stretchr/testify/require"
"io/ioutil"
"net/http"
Expand All @@ -24,7 +25,7 @@ func Test_UpdateReverseProxyWithSwarm(t *testing.T) {
server.Start()
defer server.Shutdown()

api, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
api, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)

s := strelets.NewStrelets(api)
Expand Down Expand Up @@ -77,7 +78,7 @@ func Test_CreateReverseProxyWithSSL(t *testing.T) {
server.Start()
defer server.Shutdown()

api, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
api, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)

s := strelets.NewStrelets(api)
Expand Down
9 changes: 5 additions & 4 deletions test/e2e/e2e_strelets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/orbs-network/boyarin/strelets"
"github.com/orbs-network/boyarin/strelets/adapter"
"github.com/orbs-network/boyarin/test/helpers"
"github.com/orbs-network/scribe/log"
"github.com/stretchr/testify/require"
"testing"
"time"
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestE2EWithDockerSwarm(t *testing.T) {
// helpers.SkipOnCI(t)
helpers.WithContext(func(ctx context.Context) {
helpers.InitSwarmEnvironment(t, ctx)
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)
s := strelets.NewStrelets(swarm)

Expand All @@ -91,7 +92,7 @@ func TestE2EKeepVolumesBetweenReloadsWithSwarm(t *testing.T) {
helpers.SkipOnCI(t)
helpers.WithContext(func(ctx context.Context) {
helpers.InitSwarmEnvironment(t, ctx)
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)
s := strelets.NewStrelets(swarm)

Expand Down Expand Up @@ -133,7 +134,7 @@ func TestCreateServiceSysctls(t *testing.T) {
}
defer client.Close()

swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)
s := strelets.NewStrelets(swarm)

Expand Down Expand Up @@ -192,7 +193,7 @@ func TestCreateSignerService(t *testing.T) {
}
defer client.Close()

swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{})
swarm, err := adapter.NewDockerSwarm(adapter.OrchestratorOptions{}, log.GetLogger())
require.NoError(t, err)
s := strelets.NewStrelets(swarm)

Expand Down
27 changes: 14 additions & 13 deletions test/helpers/service_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func LogSwarmServices(t *testing.T, ctx context.Context) {
}

type LogLine struct {
ServiceName string
IsError bool
Text string
Prefix string
IsError bool
Text string
}

type Log = <-chan *LogLine
Expand All @@ -40,7 +40,7 @@ func PrintLog(log Log, w io.Writer) {
if l.IsError {
prefix = "ERROR"
}
_, err := fmt.Fprintln(w, l.ServiceName, ":", prefix, l.Text)
_, err := fmt.Fprintln(w, l.Prefix, ":", prefix, l.Text)
if err != nil {
fmt.Println("error printing log line", err)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func multiplexLogs(ctx context.Context, logsLock *sync.Mutex, logs map[string]Lo
return multiLog
}

func writerLogPipe(ctx context.Context, serviceName string, isError bool) (io.WriteCloser, Log) {
func writerLogPipe(ctx context.Context, prefix string, isError bool) (io.WriteCloser, Log) {
src := ioutils.NewBytesPipe()
dst := make(chan *LogLine)
scanner := bufio.NewScanner(src)
Expand All @@ -143,9 +143,9 @@ func writerLogPipe(ctx context.Context, serviceName string, isError bool) (io.Wr
return
}
dst <- &LogLine{
ServiceName: serviceName,
IsError: isError,
Text: scanner.Text(),
Prefix: prefix,
IsError: isError,
Text: scanner.Text(),
}
}
}()
Expand All @@ -162,19 +162,20 @@ func ReadServiceLog(ctx context.Context, cli *client.Client, service swarm.Servi
if err != nil {
return nil, err
}
stdOut, stdOutLog := writerLogPipe(ctx, service.Spec.Name, false)
stdErr, stdErrLog := writerLogPipe(ctx, service.Spec.Name, true)
servicePrefix := service.Spec.Name + "|" + service.ID
stdOut, stdOutLog := writerLogPipe(ctx, servicePrefix, false)
stdErr, stdErrLog := writerLogPipe(ctx, servicePrefix, true)

go func() {
defer closeCloser(muxLogs, "muxLogs")
defer closeCloser(stdOut, "stdOut")
defer closeCloser(stdErr, "stdErr")
fmt.Println("started reading logs from", service.Spec.Name)
fmt.Println("started reading logs from", servicePrefix)
_, err = stdcopy.StdCopy(stdOut, stdErr, muxLogs)
if err != nil && ctx.Err() == nil {
fmt.Println("error reading", service.Spec.Name, err)
fmt.Println("error reading", servicePrefix, err)
} else {
fmt.Println("stopped reading logs from", service.Spec.Name)
fmt.Println("stopped reading logs from", servicePrefix)
}
}()
return merge(stdOutLog, stdErrLog), nil
Expand Down

0 comments on commit cad38eb

Please sign in to comment.