Skip to content

Commit

Permalink
Accept search attributes for dev server and disable cache by default (t…
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jun 17, 2024
1 parent f5013ad commit 102da77
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 0 deletions.
2 changes: 2 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,7 @@ type TemporalServerStartDevCommand struct {
SqlitePragma []string
DynamicConfigValue []string
LogConfig bool
SearchAttribute []string
}

func NewTemporalServerStartDevCommand(cctx *CommandContext, parent *TemporalServerCommand) *TemporalServerStartDevCommand {
Expand Down Expand Up @@ -1278,6 +1279,7 @@ func NewTemporalServerStartDevCommand(cctx *CommandContext, parent *TemporalServ
s.Command.Flags().StringArrayVar(&s.SqlitePragma, "sqlite-pragma", nil, "Specify SQLite pragma statements in pragma=value format.")
s.Command.Flags().StringArrayVar(&s.DynamicConfigValue, "dynamic-config-value", nil, "Dynamic config value, as KEY=JSON_VALUE (string values need quotes).")
s.Command.Flags().BoolVar(&s.LogConfig, "log-config", false, "Log the server config being used to stderr.")
s.Command.Flags().StringArrayVar(&s.SearchAttribute, "search-attribute", nil, "Search attributes to register, in key=type format.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
84 changes: 84 additions & 0 deletions temporalcli/commands.server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@ package temporalcli
import (
"encoding/json"
"fmt"
"strings"

"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/devserver"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var defaultDynamicConfigValues = map[string]any{
"system.forceSearchAttributesCacheRefreshOnRead": true,
}

func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) error {
// Have to assume "localhost" is 127.0.0.1 for server to work (it expects IP)
if t.Ip == "localhost" {
Expand Down Expand Up @@ -87,6 +96,22 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
}

// Apply set of default dynamic config values if not already present
for k, v := range defaultDynamicConfigValues {
if _, ok := opts.DynamicConfigValues[k]; !ok {
if opts.DynamicConfigValues == nil {
opts.DynamicConfigValues = map[string]any{}
}
opts.DynamicConfigValues[k] = v
}
}

// Prepare search attributes for adding before starting server
searchAttrs, err := t.prepareSearchAttributes()
if err != nil {
return err
}

// If not using DB file, set persistent cluster ID
if t.DbFilename == "" {
opts.ClusterID = persistentClusterID()
Expand Down Expand Up @@ -114,6 +139,11 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
defer s.Stop()

// Register search attributes
if err := t.registerSearchAttributes(cctx, searchAttrs, opts.Namespaces); err != nil {
return err
}

friendlyIP := t.Ip
if friendlyIP == "127.0.0.1" {
friendlyIP = "localhost"
Expand Down Expand Up @@ -147,3 +177,57 @@ func persistentClusterID() string {
_ = writeEnvConfigFile(file, map[string]map[string]string{"default": {"cluster-id": id}})
return id
}

func (t *TemporalServerStartDevCommand) prepareSearchAttributes() (map[string]enums.IndexedValueType, error) {
opts, err := stringKeysValues(t.SearchAttribute)
if err != nil {
return nil, fmt.Errorf("invalid search attributes: %w", err)
}
attrs := make(map[string]enums.IndexedValueType, len(opts))
for k, v := range opts {
// Case-insensitive index type lookup
var valType enums.IndexedValueType
for valTypeName, valTypeOrd := range enums.IndexedValueType_shorthandValue {
if strings.EqualFold(v, valTypeName) {
valType = enums.IndexedValueType(valTypeOrd)
break
}
}
if valType == 0 {
return nil, fmt.Errorf("invalid search attribute value type %q", v)
}
attrs[k] = valType
}
return attrs, nil
}

func (t *TemporalServerStartDevCommand) registerSearchAttributes(
cctx *CommandContext,
attrs map[string]enums.IndexedValueType,
namespaces []string,
) error {
if len(attrs) == 0 {
return nil
}

conn, err := grpc.NewClient(
fmt.Sprintf("%v:%v", t.Ip, t.Port),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("failed creating client to register search attributes: %w", err)
}
defer conn.Close()
client := operatorservice.NewOperatorServiceClient(conn)
// Call for each namespace
for _, ns := range namespaces {
_, err := client.AddSearchAttributes(cctx, &operatorservice.AddSearchAttributesRequest{
Namespace: ns,
SearchAttributes: attrs,
})
if err != nil {
return fmt.Errorf("failed registering search attributes: %w", err)
}
}
return nil
}
78 changes: 78 additions & 0 deletions temporalcli/commands.server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/temporalio/cli/temporalcli/devserver"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
)

// TODO(cretz): To test:
Expand Down Expand Up @@ -122,6 +124,82 @@ func TestServer_StartDev_ConcurrentStarts(t *testing.T) {
wg.Wait()
}

func TestServer_StartDev_WithSearchAttributes(t *testing.T) {
h := NewCommandHarness(t)
defer h.Close()

// Start in background, then wait for client to be able to connect
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
resCh := make(chan *CommandResult, 1)
go func() {
resCh <- h.Execute(
"server", "start-dev",
"-p", port,
"--headless",
"--search-attribute", "search-attr-1=Int",
"--search-attribute", "search-attr-2=kEyWoRdLiSt",
)
}()

// Try to connect for a bit while checking for error
var cl client.Client
h.EventuallyWithT(func(t *assert.CollectT) {
select {
case res := <-resCh:
if res.Err != nil {
panic(res.Err)
}
default:
}
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port})
if !assert.NoError(t, err) {
return
}
// Confirm search attributes are present
resp, err := cl.OperatorService().ListSearchAttributes(
context.Background(), &operatorservice.ListSearchAttributesRequest{Namespace: "default"})
if !assert.NoError(t, err) {
return
}
assert.Contains(t, resp.CustomAttributes, "search-attr-1")
assert.Contains(t, resp.CustomAttributes, "search-attr-2")

}, 3*time.Second, 200*time.Millisecond)
defer cl.Close()

// Do a workflow start with the search attributes
run, err := cl.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{
TaskQueue: "my-task-queue",
TypedSearchAttributes: temporal.NewSearchAttributes(
temporal.NewSearchAttributeKeyInt64("search-attr-1").ValueSet(123),
temporal.NewSearchAttributeKeyKeywordList("search-attr-2").ValueSet([]string{"foo", "bar"}),
),
},
"MyWorkflow",
)
h.NoError(err)
h.NotEmpty(run.GetRunID())

// Check that they are there
desc, err := cl.DescribeWorkflowExecution(context.Background(), run.GetID(), "")
h.NoError(err)
sa := desc.WorkflowExecutionInfo.SearchAttributes.IndexedFields
h.JSONEq("123", string(sa["search-attr-1"].Data))
h.JSONEq(`["foo","bar"]`, string(sa["search-attr-2"].Data))

// Send an interrupt by cancelling context
h.CancelContext()
select {
case <-time.After(20 * time.Second):
h.Fail("didn't cleanup after 20 seconds")
case res := <-resCh:
h.NoError(res.Err)
}
}

type testLogger struct {
t *testing.T
}
Expand Down
1 change: 1 addition & 0 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ To persist Workflows across runs, use:
* `--sqlite-pragma` (string[]) - Specify SQLite pragma statements in pragma=value format.
* `--dynamic-config-value` (string[]) - Dynamic config value, as KEY=JSON_VALUE (string values need quotes).
* `--log-config` (bool) - Log the server config being used to stderr.
* `--search-attribute` (string[]) - Search attributes to register, in key=type format.

### temporal task-queue: Manage Task Queues.

Expand Down

0 comments on commit 102da77

Please sign in to comment.