Skip to content

Commit

Permalink
Print encoding of workflow results (temporalio#579)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored May 29, 2024
1 parent 69af57c commit 231b456
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 23 deletions.
12 changes: 6 additions & 6 deletions temporalcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error)
// We do not put codec on data converter here, it is applied via
// interceptor. Same for failure conversion.
// XXX: If this is altered to be more dynamic, have to also update
// everywhere dataConverter is used.
DataConverter: dataConverter,
// everywhere DataConverterWithRawValue is used.
DataConverter: DataConverterWithRawValue,
}

// API key
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s stringMapHeadersProvider) GetHeaders(context.Context) (map[string]string
return s, nil
}

var dataConverter = converter.NewCompositeDataConverter(
var DataConverterWithRawValue = converter.NewCompositeDataConverter(
rawValuePayloadConverter{},
converter.NewNilPayloadConverter(),
converter.NewByteSlicePayloadConverter(),
Expand All @@ -194,14 +194,14 @@ var dataConverter = converter.NewCompositeDataConverter(
converter.NewJSONPayloadConverter(),
)

type rawValue struct{ payload *common.Payload }
type RawValue struct{ Payload *common.Payload }

type rawValuePayloadConverter struct{}

func (rawValuePayloadConverter) ToPayload(value any) (*common.Payload, error) {
// Only convert if value is a raw value
if r, ok := value.(rawValue); ok {
return r.payload, nil
if r, ok := value.(RawValue); ok {
return r.Payload, nil
}
return nil, nil
}
Expand Down
23 changes: 16 additions & 7 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,11 @@ func printTextResult(
}
cctx.Printer.Println(color.MagentaString("Results:"))
result := struct {
RunTime string `cli:",cardOmitEmpty"`
Status string
Result json.RawMessage `cli:",cardOmitEmpty"`
Failure string `cli:",cardOmitEmpty"`
RunTime string `cli:",cardOmitEmpty"`
Status string
Result json.RawMessage `cli:",cardOmitEmpty"`
ResultEncoding string `cli:",cardOmitEmpty"`
Failure string `cli:",cardOmitEmpty"`
}{}
if duration > 0 {
result.RunTime = duration.Truncate(10 * time.Millisecond).String()
Expand All @@ -183,11 +184,19 @@ func printTextResult(
case enums.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
result.Status = color.GreenString("COMPLETED")
var err error
result.Result, err = cctx.MarshalFriendlyJSONPayloads(
closeEvent.GetWorkflowExecutionCompletedEventAttributes().GetResult())
resultPayloads := closeEvent.GetWorkflowExecutionCompletedEventAttributes().GetResult()
result.Result, err = cctx.MarshalFriendlyJSONPayloads(resultPayloads)
if err != nil {
return fmt.Errorf("failed marshaling result: %w", err)
}
if resultPayloads != nil && len(resultPayloads.Payloads) > 0 {
metadata := resultPayloads.Payloads[0].GetMetadata()
if metadata != nil {
if enc, ok := metadata["encoding"]; ok {
result.ResultEncoding = string(enc)
}
}
}
case enums.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
result.Status = color.RedString("FAILED")
result.Failure = cctx.MarshalFriendlyFailureBodyText(
Expand Down Expand Up @@ -286,7 +295,7 @@ func (p *PayloadInputOptions) buildRawInput() ([]any, error) {
// Convert to raw values that our special data converter understands
ret := make([]any, len(payloads.Payloads))
for i, payload := range payloads.Payloads {
ret[i] = rawValue{payload}
ret[i] = RawValue{payload}
}
return ret, nil
}
Expand Down
67 changes: 57 additions & 10 deletions temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"go.temporal.io/api/common/v1"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -107,6 +108,51 @@ func (s *SharedServerSuite) TestWorkflow_Describe_Completed() {
s.Equal(map[string]any{"foo": "bar"}, jsonOut["result"])
}

func (s *SharedServerSuite) TestWorkflow_Describe_NotDecodable() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
return temporalcli.RawValue{
Payload: &common.Payload{
Metadata: map[string][]byte{"encoding": []byte("some-encoding")},
Data: []byte("some-data"),
},
}, nil
})
// Start the workflow and wait until it has at least reached activity failure
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue},
DevWorkflow,
nil, // input is irrelevant
)
s.NoError(err)
s.NoError(run.Get(s.Context, nil))

// Text
res := s.Execute(
"workflow", "describe",
"--address", s.Address(),
"-w", run.GetID(),
)
s.NoError(res.Err)
out := res.Stdout.String()
s.ContainsOnSameLine(out, "Status", "COMPLETED")
s.ContainsOnSameLine(out, "ResultEncoding", "some-encoding")

// TODO: Enable once updated to api-go >= 1.33
// JSON
//res = s.Execute(
// "workflow", "describe",
// "-o", "json",
// "--address", s.Address(),
// "-w", run.GetID(),
//)
//s.NoError(res.Err)
//var jsonOut map[string]any
//s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
//s.NotNil(jsonOut["closeEvent"])
//s.Equal("some-encoding", jsonOut["resultEncoding"])
}

func (s *SharedServerSuite) TestWorkflow_Describe_ResetPoints() {
// Start the workflow and wait until it has at least reached activity failure
run, err := s.Client.ExecuteWorkflow(
Expand Down Expand Up @@ -173,7 +219,7 @@ func (s *SharedServerSuite) testWorkflowShowFollow(eventDetails bool) {
)
s.NoError(err)

doneFollowingCh := make(chan struct{})
outputCh := make(chan *CommandResult)
// Follow the workflow
go func() {
args := []string{"workflow", "show",
Expand All @@ -184,21 +230,22 @@ func (s *SharedServerSuite) testWorkflowShowFollow(eventDetails bool) {
args = append(args, "--event-details")
}
res := s.Execute(args...)
s.NoError(res.Err)
out := res.Stdout.String()
if eventDetails {
s.Contains(out, "my-signal")
}
s.Contains(out, "Result \"hi!\"")
close(doneFollowingCh)
outputCh <- res
close(outputCh)
}()

// Send signals to complete
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))

// Ensure following completes
<-doneFollowingCh
res := <-outputCh
s.NoError(res.Err)
output := res.Stdout.String()
if eventDetails {
s.Contains(output, "my-signal")
}
s.ContainsOnSameLine(output, "Result", `"hi!"`)
s.NoError(run.Get(s.Context, nil))
}

Expand Down Expand Up @@ -251,7 +298,7 @@ func (s *SharedServerSuite) testWorkflowShowNoFollow(eventDetails bool) {
if eventDetails {
s.Contains(out, "my-signal")
}
s.Contains(out, "Result \"hi!\"")
s.ContainsOnSameLine(out, "Result", `"hi!"`)
}

func (s *SharedServerSuite) TestWorkflow_Show_JSON() {
Expand Down
1 change: 1 addition & 0 deletions temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
if d.Options.ClientOptions.Identity == "" {
d.Options.ClientOptions.Identity = "cli-test-client"
}
d.Options.ClientOptions.DataConverter = temporalcli.DataConverterWithRawValue
if d.Options.DynamicConfigValues == nil {
d.Options.DynamicConfigValues = map[string]any{}
}
Expand Down

0 comments on commit 231b456

Please sign in to comment.