Skip to content

Commit

Permalink
fix: workflow definition race
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Sep 10, 2023
1 parent 2700a47 commit ec45e21
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 266 deletions.
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package roadrunner_temporal //nolint:revive,stylecheck
package rrtemporal

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion info.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package roadrunner_temporal //nolint:revive,stylecheck
package rrtemporal

import (
"context"
Expand Down
159 changes: 159 additions & 0 deletions internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package rrtemporal

import (
"context"
"time"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/pool"
"github.com/temporalio/roadrunner-temporal/v4/aggregatedpool"
"github.com/temporalio/roadrunner-temporal/v4/data_converter"
"github.com/temporalio/roadrunner-temporal/v4/internal/codec/proto"
"github.com/temporalio/roadrunner-temporal/v4/internal/logger"
tclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/worker"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

func (p *Plugin) initPool() error {
var err error
ap, err := p.server.NewPool(context.Background(), p.config.Activities, map[string]string{RrMode: pluginName, RrCodec: RrCodecVal}, p.log)
if err != nil {
return err
}

dc := data_converter.NewDataConverter(converter.GetDefaultDataConverter())
codec := proto.NewCodec(p.log, dc)

actDef := aggregatedpool.NewActivityDefinition(codec, ap, p.log)

// ---------- WORKFLOW POOL -------------
wp, err := p.server.NewPool(
context.Background(),
&pool.Config{
NumWorkers: 1,
Command: p.config.Activities.Command,
AllocateTimeout: time.Hour * 240,
DestroyTimeout: time.Second * 30,
// no supervisor for the workflow worker
Supervisor: nil,
},
map[string]string{RrMode: pluginName, RrCodec: RrCodecVal},
nil,
)
if err != nil {
return err
}

wfDef := aggregatedpool.NewWorkflowDefinition(codec, wp, p.log)

// get worker information
wi, err := WorkerInfo(codec, wp, p.rrVersion)
if err != nil {
return err
}

if len(wi) == 0 {
return errors.Str("worker info should contain at least 1 worker")
}

err = p.initTemporalClient(wi[0].PhpSdkVersion, dc)
if err != nil {
return err
}

workers, err := aggregatedpool.TemporalWorkers(wfDef, actDef, wi, p.log, p.temporal.client, p.temporal.interceptors)
if err != nil {
return err
}

for i := 0; i < len(workers); i++ {
err = workers[i].Start()
if err != nil {
return err
}
}

if len(wp.Workers()) < 1 {
return errors.E(errors.Str("failed to allocate a workflow worker"))
}

// set all fields
// we have only 1 worker for the workflow pool
p.wwPID = int(wp.Workers()[0].Pid())

p.temporal.rrWorkflowDef = wfDef
p.temporal.rrActivityDef = actDef
p.temporal.workers = workers
p.codec = codec

p.temporal.activities = ActivitiesInfo(wi)
p.temporal.workflows = WorkflowsInfo(wi)
p.actP = ap
p.wfP = wp

return nil
}

func (p *Plugin) getWfDef() *aggregatedpool.Workflow {
p.mu.RLock()
defer p.mu.RUnlock()
return p.temporal.rrWorkflowDef
}

func (p *Plugin) getActDef() *aggregatedpool.Activity {
p.mu.RLock()
defer p.mu.RUnlock()
return p.temporal.rrActivityDef
}

func (p *Plugin) initTemporalClient(phpSdkVersion string, dc converter.DataConverter) error {
if phpSdkVersion == "" {
phpSdkVersion = clientBaselineVersion
}
p.log.Debug("PHP-SDK version: " + phpSdkVersion)
worker.SetStickyWorkflowCacheSize(p.config.CacheSize)

opts := tclient.Options{
HostPort: p.config.Address,
MetricsHandler: p.temporal.mh,
Namespace: p.config.Namespace,
Logger: logger.NewZapAdapter(p.log),
DataConverter: dc,
ConnectionOptions: tclient.ConnectionOptions{
TLS: p.temporal.tlsCfg,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(rewriteNameAndVersion(phpSdkVersion)),
},
},
}

var err error
p.temporal.client, err = tclient.Dial(opts)
if err != nil {
return err
}

p.log.Info("connected to temporal server", zap.String("address", p.config.Address))

return nil
}

func rewriteNameAndVersion(phpSdkVersion string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
md, _, _ := metadata.FromOutgoingContextRaw(ctx)
if md == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}

md.Set(clientNameHeaderName, clientNameHeaderValue)
md.Set(clientVersionHeaderName, phpSdkVersion)

ctx = metadata.NewOutgoingContext(ctx, md)

return invoker(ctx, method, req, reply, cc, opts...)
}
}
29 changes: 28 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package roadrunner_temporal //nolint:revive,stylecheck
package rrtemporal

import (
"io"
"time"

"github.com/cactus/go-statsd-client/v5/statsd"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/metrics"
"github.com/roadrunner-server/sdk/v4/state/process"
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/prometheus"
tclient "go.temporal.io/sdk/client"
ttally "go.temporal.io/sdk/contrib/tally" // temporal tally hanlders
statsdreporter "go.temporal.io/server/common/metrics/tally/statsd"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -117,3 +120,27 @@ func newStatsdScope(statsdConfig *Statsd) (tally.Scope, io.Closer, error) {

return scope, closer, nil
}

// init RR metrics
func initMetrics(cfg *Config, log *zap.Logger) (tclient.MetricsHandler, io.Closer, error) {
switch cfg.Metrics.Driver {
case driverPrometheus:
ms, cl, err := newPrometheusScope(prometheus.Configuration{
ListenAddress: cfg.Metrics.Prometheus.Address,
TimerType: cfg.Metrics.Prometheus.Type,
}, cfg.Metrics.Prometheus.Prefix, log)
if err != nil {
return nil, nil, err
}

return ttally.NewMetricsHandler(ms), cl, nil
case driverStatsd:
ms, cl, err := newStatsdScope(cfg.Metrics.Statsd)
if err != nil {
return nil, nil, err
}
return ttally.NewMetricsHandler(ms), cl, nil
default:
return nil, nil, errors.Errorf("unknown driver provided: %s", cfg.Metrics.Driver)
}
}
Loading

0 comments on commit ec45e21

Please sign in to comment.