Skip to content

Commit

Permalink
[#475]: fix: async updates handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Feb 29, 2024
2 parents c84c354 + beaf58f commit 1b0f945
Show file tree
Hide file tree
Showing 48 changed files with 2,190 additions and 984 deletions.
87 changes: 81 additions & 6 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,77 @@ on:
- stable

jobs:
# Runs the Temporal "updates" integration test package (./tests/updates) against a
# docker-compose provisioned environment and uploads the resulting coverage profile
# as the `coverage_1` artifact.
rrtemporal_updates_test:
  name: RR Temporal updates tests with (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
  runs-on: ${{ matrix.os }}
  timeout-minutes: 60
  strategy:
    matrix:
      php: [ "8.3" ]
      go: [ stable ]
      os: [ "ubuntu-latest" ]
  steps:
    - name: Set up Go ${{ matrix.go }}
      uses: actions/setup-go@v5 # action page: <https://github.com/actions/setup-go>
      with:
        go-version: ${{ matrix.go }}

    - name: Set up PHP ${{ matrix.php }}
      uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php>
      with:
        php-version: ${{ matrix.php }}
        extensions: sockets

    - name: Check out code
      uses: actions/checkout@v4

    # Expose Composer's cache directory as a step output so the cache step below can reuse it.
    - name: Get Composer Cache Directory
      id: composer-cache
      run: |
        cd tests/php_test_files
        echo "dir=$(composer config cache-files-dir)" >> "$GITHUB_OUTPUT"

    - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer>
      uses: actions/cache@v4
      with:
        path: ${{ steps.composer-cache.outputs.dir }}
        key: ${{ runner.os }}-composer-${{ matrix.php }}-${{ hashFiles('**/composer.json') }}
        restore-keys: ${{ runner.os }}-composer-

    - name: Install Composer dependencies
      run: cd tests/php_test_files && composer update --prefer-dist --no-progress --ansi

    - name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
      uses: actions/cache@v4
      with:
        path: ~/go/pkg/mod
        key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
        restore-keys: ${{ runner.os }}-go-

    - name: Install Go dependencies
      run: go mod download

    # The coverage profile written by `go test` below needs this directory to exist.
    - name: Create folders
      run: |
        mkdir ./tests/coverage-ci

    - name: Run Temporal tests with coverage
      run: |
        cd tests
        docker-compose -f env/docker-compose-temporal-updates.yaml up -d --remove-orphans
        go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/rrt_upd.out -covermode=atomic ./updates
        # Tear the environment down once the tests are done (previously this re-ran `up -d`).
        docker-compose -f env/docker-compose-temporal-updates.yaml down

    - name: Archive code coverage results
      uses: actions/upload-artifact@v4
      with:
        name: coverage_1
        path: ./tests/coverage-ci

rrtemporal_test:
name: RR Temporal plugin (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
name: RR Temporal general tests with (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
Expand Down Expand Up @@ -82,7 +151,7 @@ jobs:
cd tests
docker-compose -f env/docker-compose-temporal.yaml up -d --remove-orphans
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/rrt.out -covermode=atomic cancel_test.go child_test.go disaster_test.go general_test.go helpers.go hp_test.go interceptor_test.go metrics_test.go otlp_test.go query_test.go signal_test.go temporal_interceptor_plugin.go rpc_test.go
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/rrt.out -covermode=atomic ./general
docker-compose -f env/docker-compose-temporal.yaml up -d --remove-orphans
Expand All @@ -93,7 +162,7 @@ jobs:
path: ./tests/coverage-ci

rrtemporal_tls_test:
name: RR Temporal TLS plugin (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
name: RR Temporal TLS plugin tests with (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
Expand Down Expand Up @@ -156,14 +225,14 @@ jobs:
docker-compose -f env/temporal_tls/docker-compose.yml up -d --remove-orphans
sleep 60
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/rrt_tls.out -covermode=atomic ./tls/cancel_tls_test.go ./tls/child_tls_test.go ./tls/disaster_tls_test.go ./tls/hp_tls_test.go ./tls/metrics_tls_test.go ./tls/query_tls_test.go ./tls/signal_tls_test.go
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/rrt_tls.out -covermode=atomic ./tls
docker-compose -f env/temporal_tls/docker-compose.yml down
- name: Archive code coverage results
uses: actions/upload-artifact@v4
with:
name: coverage_1
name: coverage_3
path: ./tests/coverage-ci

codecov:
Expand All @@ -172,6 +241,7 @@ jobs:
needs:
- rrtemporal_test
- rrtemporal_tls_test
- rrtemporal_updates_test

timeout-minutes: 60
steps:
Expand All @@ -186,9 +256,14 @@ jobs:
echo 'mode: atomic' > summary.txt
tail -q -n +2 *.out >> summary.txt
sed -i '2,${/roadrunner/!d}' summary.txt
cd ../coverage_3
echo 'mode: atomic' > summary.txt
tail -q -n +2 *.out >> summary.txt
sed -i '2,${/roadrunner/!d}' summary.txt
- name: upload to codecov
uses: codecov/codecov-action@v4 # Docs: <https://github.com/codecov/codecov-action>
with:
files: ./coverage_1/summary.txt,./coverage_2/summary.txt
files: ./coverage_1/summary.txt,./coverage_2/summary.txt,./coverage_3/summary.txt

fail_ci_if_error: false
118 changes: 73 additions & 45 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
const (
completed string = "completed"
// update types
validate string = "validate"
execute string = "execute"
valExec string = "validate_execute"
)

// execution context.
Expand All @@ -38,56 +37,52 @@ func (wp *Workflow) getContext() *internal.Context {
func (wp *Workflow) handleUpdate(name string, id string, input *commonpb.Payloads, header *commonpb.Header, callbacks bindings.UpdateCallbacks) {
wp.log.Debug("update request received", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name), zap.String("id", id))

wp.updatesQueue = append(wp.updatesQueue, name)
// save update name
wp.updatesQueue[name] = struct{}{}
rid := wp.env.WorkflowInfo().WorkflowExecution.RunID

// this callback executed in the OnTick function
callback := func() {
// we don't execute update validation during replay
if !wp.env.IsReplaying() {
result, err := wp.runCommand(internal.InvokeUpdate{
RunID: rid,
UpdateID: id,
Name: name,
Type: validate,
}, input, header)

if err != nil {
callbacks.Reject(err)
return
updatesQueueCb := func() {
// validate callback
wp.updateValidateCb[id] = func(msg *internal.Message) {
wp.log.Debug("validate request callback", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name), zap.String("id", id), zap.Bool("is_replaying", wp.env.IsReplaying()), zap.Any("result", msg))
if !wp.env.IsReplaying() {
// before accept we have only one option - reject
if msg.Failure != nil {
callbacks.Reject(temporal.GetDefaultFailureConverter().FailureToError(msg.Failure))
return
}
}

// before accept we have only one option - reject
if result.Failure != nil {
callbacks.Reject(temporal.GetDefaultFailureConverter().FailureToError(result.Failure))
return
}
// update should be accepted on validate
callbacks.Accept()
}

callbacks.Accept()

result, err := wp.runCommand(internal.InvokeUpdate{
RunID: rid,
UpdateID: id,
Name: name,
Type: execute,
}, input, header)

if err != nil {
callbacks.Complete(nil, err)
return
}
// execute callback
wp.updateCompleteCb[id] = func(msg *internal.Message) {
wp.log.Debug("update request callback", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name), zap.String("id", id), zap.Any("result", msg))
if msg.Failure != nil {
callbacks.Complete(nil, temporal.GetDefaultFailureConverter().FailureToError(msg.Failure))
return
}

wp.log.Debug("update request result", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name), zap.String("id", id), zap.Any("result", result))
if result.Failure != nil {
callbacks.Complete(nil, temporal.GetDefaultFailureConverter().FailureToError(result.Failure))
return
callbacks.Complete(msg.Payloads, nil)
}

callbacks.Complete(result.Payloads, nil)
// push validate command
wp.mq.PushCommand(
&internal.InvokeUpdate{
RunID: rid,
UpdateID: id,
Name: name,
Type: valExec,
},
input,
header,
)
}

wp.env.QueueUpdate(name, callback)
wp.env.QueueUpdate(name, updatesQueueCb)
}

// schedule cancel command
Expand Down Expand Up @@ -234,6 +229,44 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
wp.createContinuableCallback(msg.ID, "SideEffect"),
)

case *internal.UpdateCompleted:
wp.log.Debug("complete update request", zap.String("update id", command.ID))

if command.ID == "" {
wp.log.Error("update id is empty, can't complete update", zap.String("workflow id", wp.env.WorkflowInfo().WorkflowExecution.ID), zap.String("run id", wp.env.WorkflowInfo().WorkflowExecution.RunID))
return errors.Str("update id is empty, can't complete update")
}

if _, ok := wp.updateCompleteCb[command.ID]; !ok {
wp.log.Warn("no such update ID, can't complete update", zap.String("requested id", command.ID))
// TODO(rustatian): error here?
return nil
}

wp.updateCompleteCb[command.ID](msg)
delete(wp.updateCompleteCb, command.ID)

case *internal.UpdateValidated:
wp.log.Debug("validate update request", zap.String("update id", command.ID))

if command.ID == "" {
wp.log.Error("update id is empty, can't validate update", zap.String("workflow id", wp.env.WorkflowInfo().WorkflowExecution.ID), zap.String("run id", wp.env.WorkflowInfo().WorkflowExecution.RunID))
return errors.Str("update id is empty, can't validate update")
}

if _, ok := wp.updateValidateCb[command.ID]; !ok {
wp.log.Warn("no such update ID, can't validate update", zap.String("requested id", command.ID))
// TODO(rustatian): error here?
return nil
}

wp.updateValidateCb[command.ID](msg)
delete(wp.updateValidateCb, command.ID)
// delete updateCompleteCb in case of error
if msg.Failure != nil {
delete(wp.updateCompleteCb, command.ID)
}

case *internal.CompleteWorkflow:
wp.log.Debug("complete workflow request", zap.Uint64("ID", msg.ID))
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
Expand Down Expand Up @@ -450,11 +483,6 @@ func (wp *Workflow) flushQueue() error {
return err
}
wp.mq.Flush()

if err != nil {
return errors.E(op, err)
}

wp.pipeline = append(wp.pipeline, msgs...)

return nil
Expand Down
43 changes: 31 additions & 12 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ type Workflow struct {
ids *registry.IDRegistry
seqID uint64
pipeline []*internal.Message
updatesQueue []string
updatesQueue map[string]struct{}
callbacks []Callback
canceller *canceller.Canceller
inLoop uint32

// updates
updateCompleteCb map[string]func(res *internal.Message)
updateValidateCb map[string]func(res *internal.Message)

log *zap.Logger
mh temporalClient.MetricsHandler

Expand All @@ -81,7 +85,12 @@ func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger
// DO NOT USE THIS FUNCTION DIRECTLY!!!!
func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition {
return &Workflow{
rrID: uuid.NewString(),
rrID: uuid.NewString(),
// updates logic
updateCompleteCb: make(map[string]func(res *internal.Message)),
updateValidateCb: make(map[string]func(res *internal.Message)),
updatesQueue: map[string]struct{}{},
// -- updates
pool: wp.pool,
codec: wp.codec,
log: wp.log,
Expand All @@ -101,7 +110,6 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
wp.env = env
wp.header = header
wp.seqID = 0
wp.updatesQueue = make([]string, 0, 1)
wp.canceller = new(canceller.Canceller)

// sequenceID shared for all pool workflows
Expand Down Expand Up @@ -160,20 +168,21 @@ func (wp *Workflow) OnWorkflowTaskStarted(t time.Duration) {

wp.callbacks = nil

// at first, we should flush our queue with command, e.g.: startWorkflow
err = wp.flushQueue()
if err != nil {
panic(err)
}

// handle updates
if len(wp.updatesQueue) > 0 {
for i := 0; i < len(wp.updatesQueue); i++ {
wp.env.HandleQueuedUpdates(wp.updatesQueue[i])
for k := range wp.updatesQueue {
wp.env.HandleQueuedUpdates(k)
delete(wp.updatesQueue, k)
}
}
// clean
wp.updatesQueue = make([]string, 0, 1)
wp.updatesQueue = map[string]struct{}{}

// at first, we should flush our queue with command, e.g.: startWorkflow
err = wp.flushQueue()
if err != nil {
panic(err)
}

for len(wp.pipeline) > 0 {
msg := wp.pipeline[0]
Expand Down Expand Up @@ -228,6 +237,16 @@ func (wp *Workflow) Close() {
if wp.env.DrainUnhandledUpdates() {
wp.log.Info("drained unhandled updates")
}

// clean the map
for k := range wp.updatesQueue {
delete(wp.updatesQueue, k)
}

for k := range wp.updateCompleteCb {
delete(wp.updateCompleteCb, k)
}

// send destroy command
_, _ = wp.runCommand(internal.DestroyWorkflow{RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID}, nil, wp.header)
// flush queue
Expand Down
Loading

0 comments on commit 1b0f945

Please sign in to comment.