Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: TSA available on the StartWorkflow stage #605

Merged
merged 22 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregatedpool

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -98,6 +99,7 @@ func (wp *Workflow) handleCancel() {

// schedule the signal processing
func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *commonpb.Header) error {
wp.log.Debug("signal request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name))
wp.mq.PushCommand(
internal.InvokeSignal{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Expand All @@ -113,6 +115,9 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
// Handle query in blocking mode.
func (wp *Workflow) handleQuery(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) {
const op = errors.Op("workflow_process_handle_query")

wp.log.Debug("query request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", queryType))

result, err := wp.runCommand(internal.InvokeQuery{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Name: queryType,
Expand Down Expand Up @@ -304,6 +309,172 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

case *internal.UpsertWorkflowTypedSearchAttributes:
wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID), zap.Any("search_attributes", command.SearchAttributes))
var sau []temporal.SearchAttributeUpdate

for k, v := range command.SearchAttributes {
switch v.Type {
case internal.BoolType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueUnset())
continue
}
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(bool); ok {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.FloatType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(float64); ok {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.IntType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

switch ti := v.Value.(type) {
case float64:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int64:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(ti))
case int32:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int16:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int8:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case string:
i, err := strconv.ParseInt(ti, 10, 64)
if err != nil {
wp.log.Warn("failed to parse int", zap.Error(err))
continue
}
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(i))
default:
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.KeywordType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordListType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

switch tt := v.Value.(type) {
case []string:
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
case []any:
var res []string
for _, v := range tt {
if s, ok := v.(string); ok {
res = append(res, s)
}
}
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(res))
default:
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.StringType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.DatetimeType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
tm, err := time.Parse(time.RFC3339, tt)
if err != nil {
return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err))
}

sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm))
} else {
wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}
}
}

if len(sau) == 0 {
wp.log.Warn("search attributes called, but no attributes were set")
return nil
}

err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...))
if err != nil {
return errors.E(op, err)
}

case *internal.SignalExternalWorkflow:
wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID))
wp.env.SignalExternalWorkflow(
Expand Down
132 changes: 125 additions & 7 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aggregatedpool
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/temporalio/roadrunner-temporal/v5/queue"
"github.com/temporalio/roadrunner-temporal/v5/registry"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
temporalClient "go.temporal.io/sdk/client"
bindings "go.temporal.io/sdk/internalbindings"
"go.uber.org/zap"
Expand Down Expand Up @@ -131,23 +133,139 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
env.RegisterQueryHandler(wp.handleQuery)
env.RegisterUpdateHandler(wp.handleUpdate)

var lastCompletion = bindings.GetLastCompletionResult(env)
var lastCompletionOffset = 0
// check if we have some TSA
tsa := env.TypedSearchAttributes()
// start workflow command
stwfcmd := internal.StartWorkflow{
Info: env.WorkflowInfo(),
}

// search attributes types are:
/*
INDEXED_VALUE_TYPE_TEXT IndexedValueType = 1
INDEXED_VALUE_TYPE_KEYWORD IndexedValueType = 2
INDEXED_VALUE_TYPE_INT IndexedValueType = 3
INDEXED_VALUE_TYPE_DOUBLE IndexedValueType = 4
INDEXED_VALUE_TYPE_BOOL IndexedValueType = 5
INDEXED_VALUE_TYPE_DATETIME IndexedValueType = 6
INDEXED_VALUE_TYPE_KEYWORD_LIST IndexedValueType = 7

*/
// only process if there're values, obviously
if tsa.Size() > 0 {
untuped := tsa.GetUntypedValues()
tsaParsed := make(map[string]*internal.TypedSearchAttribute, tsa.Size())
for k, v := range untuped {
vt := k.GetValueType()
switch vt {
// just for the linters, should be never reached
case enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED:
continue
case enumspb.INDEXED_VALUE_TYPE_TEXT:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.StringType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string[keyword]", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_INT:
switch tt := v.(type) {
case int:
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: tt,
}
case int64:
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: tt,
}
case string:
res, err := strconv.Atoi(tt)
if err != nil {
wp.log.Warn("typed search attribute found, but it is not an int", zap.Error(err), zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: res,
}
default:
wp.log.Warn("typed search attribute found, but it is not an int", zap.String("key", k.GetName()))
continue
}
case enumspb.INDEXED_VALUE_TYPE_DOUBLE:
str, ok := v.(float64)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a float64", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.FloatType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_BOOL:
str, ok := v.(bool)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a bool", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.BoolType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_DATETIME:
str, ok := v.(time.Time)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a datetime", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.DatetimeType,
Value: str.Format(time.RFC3339),
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST:
str, ok := v.([]string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a []string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordListType,
Value: str,
}
}
}

// set typed search attributes
stwfcmd.SearchAttributes = tsaParsed
}

var lastCompletion = bindings.GetLastCompletionResult(env)
if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
if input == nil {
input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
}

input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
lastCompletionOffset = len(lastCompletion.Payloads)
stwfcmd.LastCompletion = len(lastCompletion.Payloads)
}

wp.mq.PushCommand(
internal.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
stwfcmd,
input,
wp.header,
)
Expand Down
Loading
Loading