Skip to content

Commit

Permalink
feat: workflow
Browse files Browse the repository at this point in the history
Change-Id: I7b5fc7df7b03a414c7c876db01c4112e5220eb54
  • Loading branch information
shentongmartin committed Jan 7, 2025
1 parent 10cdbd9 commit e45266e
Show file tree
Hide file tree
Showing 13 changed files with 1,386 additions and 12 deletions.
228 changes: 228 additions & 0 deletions compose/field_mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package compose

import (
"errors"
"fmt"
"reflect"

"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino/utils/generic"
)

func takeOne(input any, from string) (any, error) {
if len(from) == 0 {
return input, nil
}

inputValue := reflect.ValueOf(input)

f, err := checkAndExtractFromField(from, inputValue)
if err != nil {
return nil, err
}

return f.Interface(), nil
}

func assignOne[T any](dest T, taken any, to string) (T, error) {
destValue := reflect.ValueOf(dest)

if !destValue.CanAddr() {
destValue = reflect.ValueOf(&dest).Elem()
}

if len(to) == 0 { // assign to output directly
toSet := reflect.ValueOf(taken)
if !toSet.Type().AssignableTo(destValue.Type()) {
return dest, fmt.Errorf("mapping entire value has a mismatched type. from=%v, to=%v", toSet.Type(), destValue.Type())
}

destValue.Set(toSet)

return destValue.Interface().(T), nil
}

toSet := reflect.ValueOf(taken)

field, err := checkAndExtractToField(to, destValue, toSet)
if err != nil {
return dest, err
}

field.Set(toSet)
return destValue.Interface().(T), nil

}

func convertTo[T any](mappings map[string]any) (T, error) {
t := generic.NewInstance[T]()
if len(mappings) == 0 {
return t, errors.New("mapper has no Mappings")
}

var err error
for fieldName, taken := range mappings {
t, err = assignOne(t, taken, fieldName)
if err != nil {
return t, err
}
}

return t, nil
}

type fieldMapFn func(any) (map[string]any, error)
type streamFieldMapFn func(streamReader) streamReader

func mappingAssign[T any](in map[string]any) (any, error) {
return convertTo[T](in)
}

func mappingStreamAssign[T any](in streamReader) streamReader {
return packStreamReader(schema.StreamReaderWithConvert(in.toAnyStreamReader(), func(v any) (T, error) {
var t T
mappings, ok := v.(map[string]any)
if !ok {
return t, fmt.Errorf("stream mapping expects trunk of type map[Mapping]any, but got %T", v)
}

return convertTo[T](mappings)
}))
}

func fieldMap(mappings []*Mapping) fieldMapFn {
return func(input any) (map[string]any, error) {
result := make(map[string]any, len(mappings))
for _, mapping := range mappings {
taken, err := takeOne(input, mapping.from)
if err != nil {
return nil, err
}

if _, ok := result[mapping.to]; ok {
return nil, fmt.Errorf("mapping has duplicate to. field=%v", mapping.to)
}

result[mapping.to] = taken
}

return result, nil
}
}

func streamFieldMap(mappings []*Mapping) streamFieldMapFn {
return func(input streamReader) streamReader {
return packStreamReader(schema.StreamReaderWithConvert(input.toAnyStreamReader(), fieldMap(mappings)))
}
}

var anyType = reflect.TypeOf((*any)(nil)).Elem()

func checkAndExtractFromField(fromField string, input reflect.Value) (reflect.Value, error) {
if input.Kind() == reflect.Ptr {
input = input.Elem()
}

if input.Kind() != reflect.Struct {
return reflect.Value{}, fmt.Errorf("mapping has from but input is not struct or struct ptr, type= %v", input.Type())
}

f := input.FieldByName(fromField)
if !f.IsValid() {
return reflect.Value{}, fmt.Errorf("mapping has from not found. field=%v, inputType=%v", fromField, input.Type())
}

if !f.CanInterface() {
return reflect.Value{}, fmt.Errorf("mapping has from not exported. field= %v, inputType=%v", fromField, input.Type())
}

return f, nil
}

func checkAndExtractToField(toField string, output, toSet reflect.Value) (reflect.Value, error) {
if output.Kind() == reflect.Ptr {
output = output.Elem()
}

if output.Kind() != reflect.Struct {
return reflect.Value{}, fmt.Errorf("mapping has to but output is not a struct, type=%v", output.Type())
}

field := output.FieldByName(toField)
if !field.IsValid() {
return reflect.Value{}, fmt.Errorf("mapping has to not found. field=%v, outputType=%v", toField, output.Type())
}

if !field.CanSet() {
return reflect.Value{}, fmt.Errorf("mapping has to not exported. field=%v, outputType=%v", toField, output.Type())
}

if !toSet.Type().AssignableTo(field.Type()) {
return reflect.Value{}, fmt.Errorf("mapping to has a mismatched type. field=%s, from=%v, to=%v", toField, toSet.Type(), field.Type())
}

return field, nil
}

func checkAndExtractFieldType(field string, typ reflect.Type) (reflect.Type, error) {
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}

if typ.Kind() != reflect.Struct {
return nil, fmt.Errorf("type[%v] is not a struct", typ)
}

f, ok := typ.FieldByName(field)
if !ok {
return nil, fmt.Errorf("type[%v] has no field[%s]", typ, field)
}

if !f.IsExported() {
return nil, fmt.Errorf("type[%v] has an unexported field[%s]", typ.String(), field)
}

return f.Type, nil
}

func checkMappingGroup(mappings []*Mapping) error {
if len(mappings) <= 1 {
return nil
}

var toMap = make(map[string]bool, len(mappings))

for _, mapping := range mappings {
if mapping.empty() {
return errors.New("multiple mappings have an empty mapping")
}

if len(mapping.to) == 0 {
return fmt.Errorf("multiple mappings have a mapping to entire output, mapping= %s", mapping)
}

if _, ok := toMap[mapping.to]; ok {
return fmt.Errorf("multiple mappings have the same To = %s, mappings=%v", mapping.to, mappings)
}

toMap[mapping.to] = true
}

return nil
}
Loading

0 comments on commit e45266e

Please sign in to comment.