Skip to content

Commit

Permalink
[#599]: feature: support typed search attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jan 13, 2025
2 parents 82b2269 + 3580ee6 commit 5cce3d5
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 4,086 deletions.
138 changes: 138 additions & 0 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregatedpool

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -304,6 +305,143 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

case *internal.UpsertWorkflowTypedSearchAttributes:
wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID), zap.Any("search_attributes", command.SearchAttributes))
var sau []temporal.SearchAttributeUpdate

for k, v := range command.SearchAttributes {
switch v.Type {
case internal.BoolType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueUnset())
continue
}
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(bool); ok {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.FloatType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(float64); ok {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.IntType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(int); ok {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt)))
} else {
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordListType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.([]string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.StringType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.DatetimeType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
tm, err := time.Parse(time.RFC3339, tt)
if err != nil {
return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err))
}

sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm))
} else {
wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}
}
}

if len(sau) == 0 {
wp.log.Warn("search attributes called, but no attributes were set")
return nil
}

err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...))
if err != nil {
return errors.E(op, err)
}

case *internal.SignalExternalWorkflow:
wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID))
wp.env.SignalExternalWorkflow(
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/roadrunner-server/pool v1.1.2
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally/v4 v4.1.17-0.20240412215630-22fe011f5ff0
go.temporal.io/api v1.43.0
go.temporal.io/api v1.43.1
go.temporal.io/sdk v1.31.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.26.2
Expand Down Expand Up @@ -69,6 +69,6 @@ require (
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/grpc v1.69.2
google.golang.org/grpc v1.69.4
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qq
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.1 h1:44Q12pUczfGkcAwZtJNhfv3+L6RFzL3kNk547/r8QY8=
go.temporal.io/api v1.43.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
Expand Down Expand Up @@ -390,8 +390,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
Loading

0 comments on commit 5cce3d5

Please sign in to comment.