Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: TSA available on the StartWorkflow stage #605

Merged
merged 22 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregatedpool

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -98,6 +99,7 @@ func (wp *Workflow) handleCancel() {

// schedule the signal processing
func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *commonpb.Header) error {
wp.log.Debug("signal request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name))
wp.mq.PushCommand(
internal.InvokeSignal{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Expand All @@ -113,6 +115,9 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
// Handle query in blocking mode.
func (wp *Workflow) handleQuery(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) {
const op = errors.Op("workflow_process_handle_query")

wp.log.Debug("query request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", queryType))

result, err := wp.runCommand(internal.InvokeQuery{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Name: queryType,
Expand Down Expand Up @@ -304,6 +309,143 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

case *internal.UpsertWorkflowTypedSearchAttributes:
wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID), zap.Any("search_attributes", command.SearchAttributes))
var sau []temporal.SearchAttributeUpdate

for k, v := range command.SearchAttributes {
switch v.Type {
case internal.BoolType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueUnset())
continue
}
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(bool); ok {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.FloatType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(float64); ok {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.IntType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(int); ok {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt)))
} else {
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordListType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.([]string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.StringType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.DatetimeType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
tm, err := time.Parse(time.RFC3339, tt)
if err != nil {
return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err))
}

sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm))
} else {
wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}
}
}

if len(sau) == 0 {
wp.log.Warn("search attributes called, but no attributes were set")
return nil
}

err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...))
if err != nil {
return errors.E(op, err)
}

case *internal.SignalExternalWorkflow:
wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID))
wp.env.SignalExternalWorkflow(
Expand Down
115 changes: 108 additions & 7 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/temporalio/roadrunner-temporal/v5/queue"
"github.com/temporalio/roadrunner-temporal/v5/registry"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
temporalClient "go.temporal.io/sdk/client"
bindings "go.temporal.io/sdk/internalbindings"
"go.uber.org/zap"
Expand Down Expand Up @@ -131,23 +132,123 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
env.RegisterQueryHandler(wp.handleQuery)
env.RegisterUpdateHandler(wp.handleUpdate)

var lastCompletion = bindings.GetLastCompletionResult(env)
var lastCompletionOffset = 0
// check if we have some TSA
tsa := env.TypedSearchAttributes()
// start workflow command
stwfcmd := internal.StartWorkflow{
Info: env.WorkflowInfo(),
}

// search attributes types are:
/*
INDEXED_VALUE_TYPE_TEXT IndexedValueType = 1
INDEXED_VALUE_TYPE_KEYWORD IndexedValueType = 2
INDEXED_VALUE_TYPE_INT IndexedValueType = 3
INDEXED_VALUE_TYPE_DOUBLE IndexedValueType = 4
INDEXED_VALUE_TYPE_BOOL IndexedValueType = 5
INDEXED_VALUE_TYPE_DATETIME IndexedValueType = 6
INDEXED_VALUE_TYPE_KEYWORD_LIST IndexedValueType = 7

*/
// only process if there're values, obviously
if tsa.Size() > 0 {
untuped := tsa.GetUntypedValues()
tsaParsed := make(map[string]*internal.TypedSearchAttribute, tsa.Size())
for k, v := range untuped {
vt := k.GetValueType()
switch vt {
// just for the linters, should be never reached
case enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED:
continue
case enumspb.INDEXED_VALUE_TYPE_TEXT:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.StringType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string[keyword]", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_INT:
str, ok := v.(int)
if !ok {
wp.log.Warn("typed search attribute found, but it is not an int", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_DOUBLE:
str, ok := v.(float64)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a float64", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.FloatType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_BOOL:
str, ok := v.(bool)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a bool", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.BoolType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_DATETIME:
str, ok := v.(time.Time)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a datetime", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.DatetimeType,
Value: str.Format(time.RFC3339),
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST:
str, ok := v.([]string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a []string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordListType,
Value: str,
}
}
}

// set typed search attributes
stwfcmd.SearchAttributes = tsaParsed
}

var lastCompletion = bindings.GetLastCompletionResult(env)
if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
if input == nil {
input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
}

input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
lastCompletionOffset = len(lastCompletion.Payloads)
stwfcmd.LastCompletion = len(lastCompletion.Payloads)
}

wp.mq.PushCommand(
internal.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
stwfcmd,
input,
wp.header,
)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/roadrunner-server/pool v1.1.2
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally/v4 v4.1.17-0.20240412215630-22fe011f5ff0
go.temporal.io/api v1.43.0
go.temporal.io/api v1.43.1
go.temporal.io/sdk v1.31.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.26.2
Expand Down Expand Up @@ -69,6 +69,6 @@ require (
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/grpc v1.69.2
google.golang.org/grpc v1.69.4
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qq
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.1 h1:44Q12pUczfGkcAwZtJNhfv3+L6RFzL3kNk547/r8QY8=
go.temporal.io/api v1.43.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
Expand Down Expand Up @@ -390,8 +390,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
Loading
Loading