From 6296427c09e9136b9ab0bac66173bc485a2f52ce Mon Sep 17 00:00:00 2001 From: "luohuaqing.2018" Date: Thu, 19 Dec 2024 14:00:05 +0800 Subject: [PATCH] refactor: callbacks -- move some code into internal/callbacks -- rm deprecated code -- refactor duplicated code Change-Id: I443b3074426bf406c4ad68ad027f1f9e143b288f --- _typos.toml | 1 + callbacks/aspect_inject.go | 123 ++--------------- callbacks/aspect_inject_test.go | 4 +- callbacks/handler_builder.go | 145 ++++++-------------- callbacks/interface.go | 43 ++---- callbacks/internal/manager.go | 21 --- callbacks/manager.go | 222 ------------------------------- callbacks/manager_test.go | 142 -------------------- compose/utils.go | 151 +++++++++++---------- internal/callbacks/inject.go | 145 ++++++++++++++++++++ internal/callbacks/interface.go | 52 ++++++++ internal/callbacks/manager.go | 69 ++++++++++ utils/callbacks/template_test.go | 14 +- utils/generic/generic.go | 7 + 14 files changed, 416 insertions(+), 723 deletions(-) delete mode 100644 callbacks/internal/manager.go delete mode 100644 callbacks/manager.go delete mode 100644 callbacks/manager_test.go create mode 100644 internal/callbacks/inject.go create mode 100644 internal/callbacks/interface.go create mode 100644 internal/callbacks/manager.go diff --git a/_typos.toml b/_typos.toml index da8f582..699cc05 100644 --- a/_typos.toml +++ b/_typos.toml @@ -10,6 +10,7 @@ outter = "outter" Opion = "Opion" TOpion = "TOpion" renderring = "renderring" +inouts = "inouts" [files] diff --git a/callbacks/aspect_inject.go b/callbacks/aspect_inject.go index cfc2c92..64f01b3 100644 --- a/callbacks/aspect_inject.go +++ b/callbacks/aspect_inject.go @@ -19,6 +19,7 @@ package callbacks import ( "context" + "github.com/cloudwego/eino/internal/callbacks" "github.com/cloudwego/eino/schema" ) @@ -48,21 +49,11 @@ import ( // return resp, nil // } // + // OnStart invokes the OnStart logic for the particular context, ensuring that all registered // handlers are executed in reverse order (compared to add order) when a process begins. -func OnStart(ctx context.Context, input CallbackInput) context.Context { - mgr, ok := managerFromCtx(ctx) - if !ok { - return ctx - } - - for i := len(mgr.handlers) - 1; i >= 0; i-- { - handler := mgr.handlers[i] - timingChecker, ok := handler.(TimingChecker) - if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnStart) { - ctx = handler.OnStart(ctx, mgr.runInfo, input) - } - } +func OnStart[T any](ctx context.Context, input T) context.Context { + ctx, _ = callbacks.On(ctx, input, callbacks.OnStartHandle[T], TimingOnStart) return ctx } @@ -70,19 +61,8 @@ func OnStart(ctx context.Context, input CallbackInput) context.Context { // OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup // and finalization when a process ends. // handlers are executed in normal order (compared to add order). -func OnEnd(ctx context.Context, output CallbackOutput) context.Context { - mgr, ok := managerFromCtx(ctx) - if !ok { - return ctx - } - - for i := 0; i < len(mgr.handlers); i++ { - handler := mgr.handlers[i] - timingChecker, ok := handler.(TimingChecker) - if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnEnd) { - ctx = handler.OnEnd(ctx, mgr.runInfo, output) - } - } +func OnEnd[T any](ctx context.Context, output T) context.Context { + ctx, _ = callbacks.On(ctx, output, callbacks.OnEndHandle[T], TimingOnEnd) return ctx } @@ -93,37 +73,7 @@ func OnEnd(ctx context.Context, output CallbackOutput) context.Context { func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T]) { - mgr, ok := managerFromCtx(ctx) - if !ok { - return ctx, input - } - - if len(mgr.handlers) == 0 { - return ctx, input - } - - var neededHandlers []Handler - for i := range mgr.handlers { - h := mgr.handlers[i] - timingChecker, ok := h.(TimingChecker) - if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnStartWithStreamInput) { - neededHandlers = append(neededHandlers, h) - } - } - - if len(neededHandlers) == 0 { - return ctx, input - } - - cp := input.Copy(len(neededHandlers) + 1) - for i := len(neededHandlers) - 1; i >= 0; i-- { - h := neededHandlers[i] - ctx = h.OnStartWithStreamInput(ctx, mgr.runInfo, schema.StreamReaderWithConvert(cp[i], func(src T) (CallbackInput, error) { - return src, nil - })) - } - - return ctx, cp[len(cp)-1] + return callbacks.On(ctx, input, callbacks.OnStartWithStreamInputHandle[T], TimingOnStartWithStreamInput) } // OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic of the particular, ensuring that @@ -132,75 +82,24 @@ func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamRead func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T]) { - mgr, ok := managerFromCtx(ctx) - if !ok { - return ctx, output - } - - if len(mgr.handlers) == 0 { - return ctx, output - } - - var neededHandlers []Handler - for i := range mgr.handlers { - h := mgr.handlers[i] - timingChecker, ok := h.(TimingChecker) - if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnEndWithStreamOutput) { - neededHandlers = append(neededHandlers, h) - } - } - - if len(neededHandlers) == 0 { - return ctx, output - } - - cp := output.Copy(len(neededHandlers) + 1) - for i := 0; i < len(neededHandlers); i++ { - h := neededHandlers[i] - ctx = h.OnEndWithStreamOutput(ctx, mgr.runInfo, schema.StreamReaderWithConvert(cp[i], func(src T) (CallbackOutput, error) { - return src, nil - })) - } - - return ctx, cp[len(cp)-1] + return callbacks.On(ctx, output, callbacks.OnEndWithStreamOutputHandle[T], TimingOnEndWithStreamOutput) } // OnError invokes the OnError logic of the particular, notice that error in stream will not represent here. // handlers are executed in normal order (compared to add order). func OnError(ctx context.Context, err error) context.Context { - mgr, ok := managerFromCtx(ctx) - if !ok { - return ctx - } - - for i := 0; i < len(mgr.handlers); i++ { - handler := mgr.handlers[i] - timingChecker, ok := handler.(TimingChecker) - if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnError) { - ctx = handler.OnError(ctx, mgr.runInfo, err) - } - } + ctx, _ = callbacks.On(ctx, err, callbacks.OnErrorHandle, TimingOnError) return ctx } // SetRunInfo sets the RunInfo to be passed to Handler. func SetRunInfo(ctx context.Context, info *RunInfo) context.Context { - cbm, ok := managerFromCtx(ctx) - if !ok { - return ctx - } - - return ctxWithManager(ctx, cbm.withRunInfo(info)) + return callbacks.SetRunInfo(ctx, info) } // InitCallbacks initializes a new context with the provided RunInfo and handlers. // Any previously set RunInfo and Handlers for this ctx will be overwritten. func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context { - mgr, ok := newManager(info, handlers...) - if ok { - return ctxWithManager(ctx, mgr) - } - - return ctxWithManager(ctx, nil) + return callbacks.InitCallbacks(ctx, info, handlers...) } diff --git a/callbacks/aspect_inject_test.go b/callbacks/aspect_inject_test.go index e0f5d80..ee6fb5d 100644 --- a/callbacks/aspect_inject_test.go +++ b/callbacks/aspect_inject_test.go @@ -126,10 +126,8 @@ func TestAspectInject(t *testing.T) { return ctx }).Build() - manager, ok := newManager(nil, hb) - assert.True(t, ok) + ctx = InitCallbacks(ctx, nil, hb) - ctx = ctxWithManager(ctx, manager) ctx = OnStart(ctx, 1) ctx = OnEnd(ctx, 2) ctx = OnError(ctx, fmt.Errorf("3")) diff --git a/callbacks/handler_builder.go b/callbacks/handler_builder.go index 04bd54e..d8266a4 100644 --- a/callbacks/handler_builder.go +++ b/callbacks/handler_builder.go @@ -22,72 +22,7 @@ import ( "github.com/cloudwego/eino/schema" ) -// HandlerBuilder can be used to build a Handler with callback functions. -// e.g. -// -// handler := &HandlerBuilder{ -// OnStartFn: func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {} // self defined start callback function -// } -// -// graph := compose.NewGraph[inputType, outputType]() -// runnable, err := graph.Compile() -// if err != nil {...} -// runnable.Invoke(ctx, params, compose.WithCallback(handler)) // => only implement functions which you want to override -// -// Deprecated: In most situations, it is preferred to use callbacks.NewHandlerHelper. Otherwise, use NewHandlerBuilder().OnStartFn()...Build(). type HandlerBuilder struct { - OnStartFn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context - OnEndFn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context - OnErrorFn func(ctx context.Context, info *RunInfo, err error) context.Context - OnStartWithStreamInputFn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context - OnEndWithStreamOutputFn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context -} - -func (h *HandlerBuilder) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context { - if h.OnStartFn != nil { - return h.OnStartFn(ctx, info, input) - } - - return ctx -} - -func (h *HandlerBuilder) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context { - if h.OnEndFn != nil { - return h.OnEndFn(ctx, info, output) - } - - return ctx -} - -func (h *HandlerBuilder) OnError(ctx context.Context, info *RunInfo, err error) context.Context { - if h.OnErrorFn != nil { - return h.OnErrorFn(ctx, info, err) - } - - return ctx -} - -func (h *HandlerBuilder) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context { - if h.OnStartWithStreamInputFn != nil { - return h.OnStartWithStreamInputFn(ctx, info, input) - } - - input.Close() - - return ctx -} - -func (h *HandlerBuilder) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context { - if h.OnEndWithStreamOutputFn != nil { - return h.OnEndWithStreamOutputFn(ctx, info, output) - } - - output.Close() - - return ctx -} - -type handlerBuilder struct { onStartFn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context onEndFn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context onErrorFn func(ctx context.Context, info *RunInfo, err error) context.Context @@ -95,51 +30,35 @@ type handlerBuilder struct { onEndWithStreamOutputFn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context } -func (hb *handlerBuilder) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context { - if hb.onStartFn != nil { - return hb.onStartFn(ctx, info, input) - } - - return ctx +type handlerImpl struct { + HandlerBuilder } -func (hb *handlerBuilder) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context { - if hb.onEndFn != nil { - return hb.onEndFn(ctx, info, output) - } - - return ctx +func (hb *handlerImpl) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context { + return hb.onStartFn(ctx, info, input) } -func (hb *handlerBuilder) OnError(ctx context.Context, info *RunInfo, err error) context.Context { - if hb.onErrorFn != nil { - return hb.onErrorFn(ctx, info, err) - } - - return ctx +func (hb *handlerImpl) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context { + return hb.onEndFn(ctx, info, output) } -func (hb *handlerBuilder) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context { - if hb.onStartWithStreamInputFn != nil { - return hb.onStartWithStreamInputFn(ctx, info, input) - } +func (hb *handlerImpl) OnError(ctx context.Context, info *RunInfo, err error) context.Context { + return hb.onErrorFn(ctx, info, err) +} - input.Close() +func (hb *handlerImpl) OnStartWithStreamInput(ctx context.Context, info *RunInfo, + input *schema.StreamReader[CallbackInput]) context.Context { - return ctx + return hb.onStartWithStreamInputFn(ctx, info, input) } -func (hb *handlerBuilder) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context { - if hb.onEndWithStreamOutputFn != nil { - return hb.onEndWithStreamOutputFn(ctx, info, output) - } +func (hb *handlerImpl) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, + output *schema.StreamReader[CallbackOutput]) context.Context { - output.Close() - - return ctx + return hb.onEndWithStreamOutputFn(ctx, info, output) } -func (hb *handlerBuilder) Needed(_ context.Context, _ *RunInfo, timing CallbackTiming) bool { +func (hb *handlerImpl) Needed(_ context.Context, _ *RunInfo, timing CallbackTiming) bool { switch timing { case TimingOnStart: return hb.onStartFn != nil @@ -156,36 +75,50 @@ func (hb *handlerBuilder) Needed(_ context.Context, _ *RunInfo, timing CallbackT } } -func NewHandlerBuilder() *handlerBuilder { - return &handlerBuilder{} +// NewHandlerBuilder creates and returns a new HandlerBuilder instance. +// HandlerBuilder is used to construct a Handler with custom callback functions +func NewHandlerBuilder() *HandlerBuilder { + return &HandlerBuilder{} } -func (hb *handlerBuilder) OnStartFn(fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *handlerBuilder { +func (hb *HandlerBuilder) OnStartFn( + fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder { + hb.onStartFn = fn return hb } -func (hb *handlerBuilder) OnEndFn(fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *handlerBuilder { +func (hb *HandlerBuilder) OnEndFn( + fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder { + hb.onEndFn = fn return hb } -func (hb *handlerBuilder) OnErrorFn(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *handlerBuilder { +func (hb *HandlerBuilder) OnErrorFn( + fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder { + hb.onErrorFn = fn return hb } -func (hb *handlerBuilder) OnStartWithStreamInputFn(fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *handlerBuilder { +// OnStartWithStreamInputFn sets the callback function to be called. +func (hb *HandlerBuilder) OnStartWithStreamInputFn( + fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder { + hb.onStartWithStreamInputFn = fn return hb } -func (hb *handlerBuilder) OnEndWithStreamOutputFn(fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *handlerBuilder { +// OnEndWithStreamOutputFn sets the callback function to be called. +func (hb *HandlerBuilder) OnEndWithStreamOutputFn( + fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder { + hb.onEndWithStreamOutputFn = fn return hb } // Build returns a Handler with the functions set in the builder. -func (hb *handlerBuilder) Build() Handler { - return hb +func (hb *HandlerBuilder) Build() Handler { + return &handlerImpl{*hb} } diff --git a/callbacks/interface.go b/callbacks/interface.go index 5ace52d..1cd1e0d 100644 --- a/callbacks/interface.go +++ b/callbacks/interface.go @@ -17,18 +17,11 @@ package callbacks import ( - "context" - - "github.com/cloudwego/eino/components" - "github.com/cloudwego/eino/schema" + "github.com/cloudwego/eino/internal/callbacks" ) -// RunInfo is the info of run node. -type RunInfo struct { - Name string - Type string - Component components.Component -} +// RunInfo contains information about the running component. +type RunInfo = callbacks.RunInfo // CallbackInput is the input of the callback. // the type of input is defined by the component. @@ -50,37 +43,21 @@ type RunInfo struct { // // is not a model callback input, just ignore it // return // } -type CallbackInput any - -type CallbackOutput any +type CallbackInput = callbacks.CallbackInput -type Handler interface { - OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context - OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context +type CallbackOutput = callbacks.CallbackOutput - OnError(ctx context.Context, info *RunInfo, err error) context.Context - - OnStartWithStreamInput(ctx context.Context, info *RunInfo, - input *schema.StreamReader[CallbackInput]) context.Context - OnEndWithStreamOutput(ctx context.Context, info *RunInfo, - output *schema.StreamReader[CallbackOutput]) context.Context -} - -var globalHandlers []Handler +type Handler = callbacks.Handler // InitCallbackHandlers sets the global callback handlers. // It should be called BEFORE any callback handler by user. // It's useful when you want to inject some basic callbacks to all nodes. func InitCallbackHandlers(handlers []Handler) { - globalHandlers = handlers -} - -func GetGlobalHandlers() []Handler { - return globalHandlers + callbacks.GlobalHandlers = handlers } // CallbackTiming enumerates all the timing of callback aspects. -type CallbackTiming uint8 +type CallbackTiming = callbacks.CallbackTiming const ( TimingOnStart CallbackTiming = iota @@ -95,6 +72,4 @@ const ( // If a callback handler is created by using callbacks.HandlerHelper or handlerBuilder, then this interface is automatically implemented. // Eino's callback mechanism will try to use this interface to determine whether any handlers are needed for the given timing. // Also, the callback handler that is not needed for that timing will be skipped. -type TimingChecker interface { - Needed(ctx context.Context, info *RunInfo, timing CallbackTiming) bool -} +type TimingChecker = callbacks.TimingChecker diff --git a/callbacks/internal/manager.go b/callbacks/internal/manager.go deleted file mode 100644 index c6302b2..0000000 --- a/callbacks/internal/manager.go +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2024 CloudWeGo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package internal - -type CtxManagerKey struct{} - -// TODO: move callback manager to internal diff --git a/callbacks/manager.go b/callbacks/manager.go deleted file mode 100644 index be42cb1..0000000 --- a/callbacks/manager.go +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright 2024 CloudWeGo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package callbacks - -import ( - "context" - - "github.com/cloudwego/eino/callbacks/internal" - "github.com/cloudwego/eino/schema" -) - -// Manager is a callback manager of one running node. -// Deprecated: Manager will become the inner conception, use methods in aspect_inject.go instead -type Manager struct { - *manager -} - -type manager struct { - handlers []Handler - runInfo *RunInfo -} - -// NewManager creates a callback manager. -// It will return a nil manager if no callback handler is provided, please check the return value first before using. -// Deprecated: Manager will become the inner conception, use methods in aspect_inject.go instead -func NewManager(runInfo *RunInfo, handlers ...Handler) (*Manager, bool) { - m, ok := newManager(runInfo, handlers...) - if !ok { - return nil, false - } - - return &Manager{ - manager: m, - }, true -} - -func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) { - l := len(handlers) + len(globalHandlers) - if l == 0 { - return nil, false - } - hs := make([]Handler, 0, l) - hs = append(hs, globalHandlers...) - hs = append(hs, handlers...) - - return &manager{ - handlers: hs, - runInfo: runInfo, - }, true -} - -func (m *manager) Handlers() []Handler { - return m.handlers -} - -// Deprecated: Manager will become the inner conception, use methods in aspect_inject.go instead -func (mm *Manager) WithRunInfo(runInfo *RunInfo) *Manager { - if mm == nil { - return nil - } - - m := mm.manager.withRunInfo(runInfo) - - return &Manager{ - manager: m, - } -} - -func (m *manager) withRunInfo(runInfo *RunInfo) *manager { - if m == nil { - return nil - } - - return &manager{ - handlers: m.handlers, - runInfo: runInfo, - } -} - -func (m *manager) appendHandlers(handlers ...Handler) *manager { - if m == nil { - return nil - } - - return &manager{ - handlers: append(m.handlers, handlers...), - runInfo: m.runInfo, - } -} - -// Deprecated: Manager will become the inner conception, use methods in aspect_inject.go instead -func ManagerFromCtx(ctx context.Context) (*Manager, bool) { - internalM, ok := managerFromCtx(ctx) - if ok { - return &Manager{ - manager: internalM, - }, true - } - - return nil, false -} - -func managerFromCtx(ctx context.Context) (*manager, bool) { - v := ctx.Value(internal.CtxManagerKey{}) - m, ok := v.(*manager) - if ok && m != nil { - return &manager{ - handlers: m.handlers, - runInfo: m.runInfo, - }, true - } - - return nil, false -} - -// Deprecated: Manager will become the inner conception, use methods in aspect_inject.go instead -func CtxWithManager(ctx context.Context, manager *Manager) context.Context { - return ctxWithManager(ctx, manager.manager) -} - -func ctxWithManager(ctx context.Context, manager *manager) context.Context { - return context.WithValue(ctx, internal.CtxManagerKey{}, manager) -} - -func (m *manager) OnStart(ctx context.Context, input CallbackInput) context.Context { - if m == nil { - return ctx - } - - for i := len(m.handlers) - 1; i >= 0; i-- { - handler := m.handlers[i] - ctx = handler.OnStart(ctx, m.runInfo, input) - } - - return ctx -} - -func (m *manager) OnEnd(ctx context.Context, output CallbackOutput) context.Context { - if m == nil { - return ctx - } - - for i := 0; i < len(m.handlers); i++ { - handler := m.handlers[i] - ctx = handler.OnEnd(ctx, m.runInfo, output) - } - - return ctx -} - -func (m *manager) OnError(ctx context.Context, err error) context.Context { - if m == nil { - return ctx - } - - for i := 0; i < len(m.handlers); i++ { - handler := m.handlers[i] - ctx = handler.OnError(ctx, m.runInfo, err) - } - - return ctx -} - -func (m *manager) OnStartWithStreamInput( - ctx context.Context, input *schema.StreamReader[CallbackInput]) context.Context { - if m == nil { - if input != nil { - input.Close() - } - return ctx - } - - if len(m.handlers) == 0 { - input.Close() - return ctx - } - - ins := input.Copy(len(m.handlers)) - for i := len(m.handlers) - 1; i >= 0; i-- { - handler := m.handlers[i] - ctx = handler.OnStartWithStreamInput(ctx, m.runInfo, ins[i]) - } - - return ctx -} - -func (m *manager) OnEndWithStreamOutput( - ctx context.Context, output *schema.StreamReader[CallbackOutput]) context.Context { - if m == nil { - if output != nil { - output.Close() - } - return ctx - } - - if len(m.handlers) == 0 { - output.Close() - return ctx - } - - outs := output.Copy(len(m.handlers)) - for i := 0; i < len(m.handlers); i++ { - handler := m.handlers[i] - ctx = handler.OnEndWithStreamOutput(ctx, m.runInfo, outs[i]) - } - - return ctx -} diff --git a/callbacks/manager_test.go b/callbacks/manager_test.go deleted file mode 100644 index 7a47c29..0000000 --- a/callbacks/manager_test.go +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2024 CloudWeGo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package callbacks - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/cloudwego/eino/schema" -) - -func TestManager(t *testing.T) { - - t.Run("usable_manager", func(t *testing.T) { - defer func() { - globalHandlers = nil - }() - - var startCnt, endCnt, errCnt int - var globalKey, sessionKey = "global", "session" - - globalHandlers = []Handler{ - NewHandlerBuilder(). - OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context { - startCnt++ - return context.WithValue(ctx, globalKey, "start") - }). - OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context { - if ctx.Value(globalKey).(string) == "start" { - endCnt++ - return context.WithValue(ctx, globalKey, "end") - } - return ctx - }). - OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context { - if ctx.Value(globalKey).(string) == "start" { - errCnt++ - return context.WithValue(ctx, globalKey, "error") - } - return ctx - }).Build(), - } - - sessionHandler := NewHandlerBuilder(). - OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context { - startCnt++ - return context.WithValue(ctx, sessionKey, "start") - }). - OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context { - if ctx.Value(sessionKey).(string) == "start" { - endCnt++ - return context.WithValue(ctx, sessionKey, "end") - } - return ctx - }). - OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context { - if ctx.Value(sessionKey).(string) == "start" { - errCnt++ - return context.WithValue(ctx, sessionKey, "error") - } - return ctx - }).Build() - - manager, ok := newManager(&RunInfo{}, sessionHandler) - assert.True(t, ok) - c0 := context.Background() - c1 := ctxWithManager(c0, manager) - c2 := ctxWithManager(c1, nil) - - m0, ok := managerFromCtx(c0) - assert.False(t, ok) - assert.Nil(t, m0) - - m1, ok := managerFromCtx(c1) - assert.True(t, ok) - assert.NotNil(t, m1) - m2, ok := managerFromCtx(c2) - assert.False(t, ok) - assert.Nil(t, m2) - - c3 := manager.OnStart(context.Background(), nil) - c4 := manager.OnError(c3, fmt.Errorf("mock err")) - c5 := manager.OnEnd(c3, nil) - assert.Equal(t, startCnt, 2) - assert.Equal(t, endCnt, 2) - assert.Equal(t, errCnt, 2) - assert.Equal(t, c3.Value(globalKey).(string), "start") - assert.Equal(t, c3.Value(sessionKey).(string), "start") - assert.Equal(t, c4.Value(globalKey).(string), "error") - assert.Equal(t, c4.Value(sessionKey).(string), "error") - assert.Equal(t, c5.Value(globalKey).(string), "end") - assert.Equal(t, c5.Value(sessionKey).(string), "end") - }) - - t.Run("empty manager", func(t *testing.T) { - ctx := context.Background() - globalHandlers = nil - mgr, ok := newManager(nil) - assert.False(t, ok) - assert.Nil(t, mgr) - - nCtx := mgr.OnStart(ctx, nil) - assert.IsType(t, ctx, nCtx) - - ctx = mgr.OnEnd(ctx, nil) - assert.IsType(t, ctx, nCtx) - - ctx = mgr.OnError(ctx, fmt.Errorf("mock err")) - assert.IsType(t, ctx, nCtx) - - sri, _ := schema.Pipe[CallbackInput](1) - ctx = mgr.OnStartWithStreamInput(ctx, sri) - assert.IsType(t, ctx, nCtx) - - sro, _ := schema.Pipe[CallbackOutput](1) - ctx = mgr.OnEndWithStreamOutput(ctx, sro) - assert.IsType(t, ctx, nCtx) - - ctx = mgr.OnStartWithStreamInput(ctx, nil) - assert.IsType(t, ctx, nCtx) - - ctx = mgr.OnEndWithStreamOutput(ctx, nil) - assert.IsType(t, ctx, nCtx) - }) -} diff --git a/compose/utils.go b/compose/utils.go index 546458a..5a837fd 100644 --- a/compose/utils.go +++ b/compose/utils.go @@ -22,7 +22,9 @@ import ( "reflect" "github.com/cloudwego/eino/callbacks" + icb "github.com/cloudwego/eino/internal/callbacks" "github.com/cloudwego/eino/schema" + "github.com/cloudwego/eino/utils/generic" ) func mergeMap(vs []any) (any, error) { @@ -87,119 +89,116 @@ func mergeValues(vs []any) (any, error) { return nil, fmt.Errorf("(mergeValues) unsupported type: %v", t0) } -func invokeWithCallbacks[I, O, TOption any](i Invoke[I, O, TOption]) Invoke[I, O, TOption] { - return func(ctx context.Context, input I, opts ...TOption) (output O, err error) { - defer func() { - if err != nil { - _ = callbacks.OnError(ctx, err) - return - } +type on[T any] func(context.Context, T) (context.Context, T) - _ = callbacks.OnEnd(ctx, output) +func onStart[T any](ctx context.Context, input T) (context.Context, T) { + return icb.On(ctx, input, icb.OnStartHandle[T], callbacks.TimingOnStart) +} - }() +func onEnd[T any](ctx context.Context, output T) (context.Context, T) { + return icb.On(ctx, output, icb.OnEndHandle[T], callbacks.TimingOnEnd) +} - ctx = callbacks.OnStart(ctx, input) +func onStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) ( + context.Context, *schema.StreamReader[T]) { - return i(ctx, input, opts...) - } + return icb.On(ctx, input, icb.OnStartWithStreamInputHandle[T], callbacks.TimingOnStartWithStreamInput) } -func genericInvokeWithCallbacks(i invoke) invoke { - return func(ctx context.Context, input any, opts ...any) (output any, err error) { - defer func() { - if err != nil { - _ = callbacks.OnError(ctx, err) - return - } +func genericOnStartWithStreamInputHandle(ctx context.Context, input streamReader, + runInfo *icb.RunInfo, handlers []icb.Handler) (context.Context, streamReader) { - _ = callbacks.OnEnd(ctx, output) + generic.Reverse(handlers) - }() + cpy := input.copy - ctx = callbacks.OnStart(ctx, input) + handle := func(handler icb.Handler, in streamReader) context.Context { + in_, ok := unpackStreamReader[icb.CallbackInput](in) + if !ok { + panic("impossible") + } - return i(ctx, input, opts...) + return handler.OnStartWithStreamInput(ctx, runInfo, in_) } -} -func streamWithCallbacks[I, O, TOption any](s Stream[I, O, TOption]) Stream[I, O, TOption] { - return func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error) { - ctx = callbacks.OnStart(ctx, input) + return icb.OnWithStream(ctx, input, handlers, cpy, handle) +} - output, err = s(ctx, input, opts...) - if err != nil { - _ = callbacks.OnError(ctx, err) - return output, err - } +func genericOnStartWithStreamInput(ctx context.Context, input streamReader) (context.Context, streamReader) { + return icb.On(ctx, input, genericOnStartWithStreamInputHandle, callbacks.TimingOnStartWithStreamInput) +} - _, newS := callbacks.OnEndWithStreamOutput(ctx, output) +func onEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) ( + context.Context, *schema.StreamReader[T]) { - return newS, nil - } + return icb.On(ctx, output, icb.OnEndWithStreamOutputHandle[T], callbacks.TimingOnEndWithStreamOutput) } -func collectWithCallbacks[I, O, TOption any](c Collect[I, O, TOption]) Collect[I, O, TOption] { - return func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error) { - defer func() { - if err != nil { - _ = callbacks.OnError(ctx, err) - return - } - _ = callbacks.OnEnd(ctx, output) - }() +func genericOnEndWithStreamOutputHandle(ctx context.Context, output streamReader, + runInfo *icb.RunInfo, handlers []icb.Handler) (context.Context, streamReader) { + + cpy := output.copy - ctx, newS := callbacks.OnStartWithStreamInput(ctx, input) + handle := func(handler icb.Handler, out streamReader) context.Context { + out_, ok := unpackStreamReader[icb.CallbackOutput](out) + if !ok { + panic("impossible") + } - return c(ctx, newS, opts...) + return handler.OnEndWithStreamOutput(ctx, runInfo, out_) } + + return icb.OnWithStream(ctx, output, handlers, cpy, handle) } -func transformWithCallbacks[I, O, TOption any](t Transform[I, O, TOption]) Transform[I, O, TOption] { - return func(ctx context.Context, input *schema.StreamReader[I], - opts ...TOption) (output *schema.StreamReader[O], err error) { - ctx, input = callbacks.OnStartWithStreamInput(ctx, input) +func genericOnEndWithStreamOutput(ctx context.Context, output streamReader) (context.Context, streamReader) { + return icb.On(ctx, output, genericOnEndWithStreamOutputHandle, callbacks.TimingOnEndWithStreamOutput) +} - output, err = t(ctx, input, opts...) +func onError(ctx context.Context, err error) (context.Context, error) { + return icb.On(ctx, err, icb.OnErrorHandle, callbacks.TimingOnError) +} + +func runWithCallbacks[I, O, TOption any](r func(context.Context, I, ...TOption) (O, error), + onStart on[I], onEnd on[O], onError on[error]) func(context.Context, I, ...TOption) (O, error) { + + return func(ctx context.Context, input I, opts ...TOption) (output O, err error) { + ctx, input = onStart(ctx, input) + + output, err = r(ctx, input, opts...) if err != nil { - _ = callbacks.OnError(ctx, err) + ctx, err = onError(ctx, err) return output, err } - _, output = callbacks.OnEndWithStreamOutput(ctx, output) + ctx, output = onEnd(ctx, output) return output, nil } } -func genericTransformWithCallbacks(t transform) transform { - return func(ctx context.Context, input streamReader, opts ...any) (output streamReader, err error) { - inArr := input.copy(2) - is, ok := unpackStreamReader[callbacks.CallbackInput](inArr[1]) - if !ok { // unexpected - return t(ctx, inArr[0], opts...) - } +func invokeWithCallbacks[I, O, TOption any](i Invoke[I, O, TOption]) Invoke[I, O, TOption] { + return runWithCallbacks(i, onStart[I], onEnd[O], onError) +} - ctx, is = callbacks.OnStartWithStreamInput(ctx, is) - is.Close() // goroutine free copy buffer release +func genericInvokeWithCallbacks(i invoke) invoke { + return runWithCallbacks(i, onStart[any], onEnd[any], onError) +} - output, err = t(ctx, inArr[0], opts...) - if err != nil { - _ = callbacks.OnError(ctx, err) - return output, err - } +func streamWithCallbacks[I, O, TOption any](s Stream[I, O, TOption]) Stream[I, O, TOption] { + return runWithCallbacks(s, onStart[I], onEndWithStreamOutput[O], onError) +} - outArr := output.copy(2) - os, ok := unpackStreamReader[callbacks.CallbackOutput](outArr[1]) - if !ok { // unexpected - return outArr[0], nil - } +func collectWithCallbacks[I, O, TOption any](c Collect[I, O, TOption]) Collect[I, O, TOption] { + return runWithCallbacks(c, onStartWithStreamInput[I], onEnd[O], onError) +} - _, os = callbacks.OnEndWithStreamOutput(ctx, os) - os.Close() +func transformWithCallbacks[I, O, TOption any](t Transform[I, O, TOption]) Transform[I, O, TOption] { + return runWithCallbacks(t, onStartWithStreamInput[I], onEndWithStreamOutput[O], onError) +} - return outArr[0], nil - } +func genericTransformWithCallbacks(t transform) transform { + return runWithCallbacks(t, genericOnStartWithStreamInput, genericOnEndWithStreamOutput, onError) } func initGraphCallbacks(ctx context.Context, info *nodeInfo, meta *executorMeta, opts ...Option) context.Context { diff --git a/internal/callbacks/inject.go b/internal/callbacks/inject.go new file mode 100644 index 0000000..936c71b --- /dev/null +++ b/internal/callbacks/inject.go @@ -0,0 +1,145 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package callbacks + +import ( + "context" + + "github.com/cloudwego/eino/schema" + "github.com/cloudwego/eino/utils/generic" +) + +type CtxManagerKey struct{} + +func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context { + mgr, ok := newManager(info, handlers...) + if ok { + return ctxWithManager(ctx, mgr) + } + + return ctxWithManager(ctx, nil) +} + +func SetRunInfo(ctx context.Context, info *RunInfo) context.Context { + cbm, ok := managerFromCtx(ctx) + if !ok { + return ctx + } + + return ctxWithManager(ctx, cbm.withRunInfo(info)) +} + +type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T) + +func On[T any](ctx context.Context, inout T, handle Handle[T], timing CallbackTiming) (context.Context, T) { + mgr, ok := managerFromCtx(ctx) + if !ok { + return ctx, inout + } + + hs := make([]Handler, 0, len(mgr.handlers)) + for _, handler := range mgr.handlers { + timingChecker, ok_ := handler.(TimingChecker) + if !ok_ || timingChecker.Needed(ctx, mgr.runInfo, timing) { + hs = append(hs, handler) + } + } + + return handle(ctx, inout, mgr.runInfo, hs) +} + +func OnStartHandle[T any](ctx context.Context, input T, + runInfo *RunInfo, handlers []Handler) (context.Context, T) { + + for i := len(handlers) - 1; i >= 0; i-- { + ctx = handlers[i].OnStart(ctx, runInfo, input) + } + + return ctx, input +} + +func OnEndHandle[T any](ctx context.Context, output T, + runInfo *RunInfo, handlers []Handler) (context.Context, T) { + + for _, handler := range handlers { + ctx = handler.OnEnd(ctx, runInfo, output) + } + + return ctx, output +} + +func OnWithStream[S any]( + ctx context.Context, + inout S, + handlers []Handler, + cpy func(int) []S, + handle func(Handler, S) context.Context) (context.Context, S) { + + if len(handlers) == 0 { + return ctx, inout + } + + inouts := cpy(len(handlers) + 1) + + for i, handler := range handlers { + ctx = handle(handler, inouts[i]) + } + + return ctx, inouts[len(inouts)-1] +} + +func OnStartWithStreamInputHandle[T any](ctx context.Context, input *schema.StreamReader[T], + runInfo *RunInfo, handlers []Handler) (context.Context, *schema.StreamReader[T]) { + + generic.Reverse(handlers) + + cpy := input.Copy + + handle := func(handler Handler, in *schema.StreamReader[T]) context.Context { + in_ := schema.StreamReaderWithConvert(in, func(i T) (CallbackInput, error) { + return i, nil + }) + return handler.OnStartWithStreamInput(ctx, runInfo, in_) + } + + return OnWithStream(ctx, input, handlers, cpy, handle) +} + +func OnEndWithStreamOutputHandle[T any](ctx context.Context, output *schema.StreamReader[T], + runInfo *RunInfo, handlers []Handler) (context.Context, *schema.StreamReader[T]) { + + cpy := output.Copy + + handle := func(handler Handler, out *schema.StreamReader[T]) context.Context { + out_ := schema.StreamReaderWithConvert(out, func(i T) (CallbackOutput, error) { + return i, nil + }) + return handler.OnEndWithStreamOutput(ctx, runInfo, out_) + } + + return OnWithStream(ctx, output, handlers, cpy, handle) +} + +func OnErrorHandle(ctx context.Context, err error, + runInfo *RunInfo, handlers []Handler) (context.Context, error) { + + for _, handler := range handlers { + ctx = handler.OnError(ctx, runInfo, err) + } + + return ctx, err +} diff --git a/internal/callbacks/interface.go b/internal/callbacks/interface.go new file mode 100644 index 0000000..91b747b --- /dev/null +++ b/internal/callbacks/interface.go @@ -0,0 +1,52 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package callbacks + +import ( + "context" + + "github.com/cloudwego/eino/components" + "github.com/cloudwego/eino/schema" +) + +type RunInfo struct { + Name string + Type string + Component components.Component +} + +type CallbackInput any + +type CallbackOutput any + +type Handler interface { + OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context + OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context + + OnError(ctx context.Context, info *RunInfo, err error) context.Context + + OnStartWithStreamInput(ctx context.Context, info *RunInfo, + input *schema.StreamReader[CallbackInput]) context.Context + OnEndWithStreamOutput(ctx context.Context, info *RunInfo, + output *schema.StreamReader[CallbackOutput]) context.Context +} + +type CallbackTiming uint8 + +type TimingChecker interface { + Needed(ctx context.Context, info *RunInfo, timing CallbackTiming) bool +} diff --git a/internal/callbacks/manager.go b/internal/callbacks/manager.go new file mode 100644 index 0000000..d40c8c1 --- /dev/null +++ b/internal/callbacks/manager.go @@ -0,0 +1,69 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package callbacks + +import "context" + +type manager struct { + handlers []Handler + runInfo *RunInfo +} + +var GlobalHandlers []Handler + +func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) { + l := len(handlers) + len(GlobalHandlers) + if l == 0 { + return nil, false + } + hs := make([]Handler, 0, l) + hs = append(hs, GlobalHandlers...) + hs = append(hs, handlers...) + + return &manager{ + handlers: hs, + runInfo: runInfo, + }, true +} + +func ctxWithManager(ctx context.Context, manager *manager) context.Context { + return context.WithValue(ctx, CtxManagerKey{}, manager) +} + +func (m *manager) withRunInfo(runInfo *RunInfo) *manager { + if m == nil { + return nil + } + + return &manager{ + handlers: m.handlers, + runInfo: runInfo, + } +} + +func managerFromCtx(ctx context.Context) (*manager, bool) { + v := ctx.Value(CtxManagerKey{}) + m, ok := v.(*manager) + if ok && m != nil { + return &manager{ + handlers: m.handlers, + runInfo: m.runInfo, + }, true + } + + return nil, false +} diff --git a/utils/callbacks/template_test.go b/utils/callbacks/template_test.go index a621f58..4c15ab8 100644 --- a/utils/callbacks/template_test.go +++ b/utils/callbacks/template_test.go @@ -173,15 +173,15 @@ func TestNewComponentTemplate(t *testing.T) { ctx = context.Background() ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{Component: components.ComponentOfTransformer}, handler) - callbacks.OnStart(ctx, nil) + callbacks.OnStart[any](ctx, nil) assert.Equal(t, 22, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfPrompt}) - callbacks.OnStart(ctx, nil) + callbacks.OnStart[any](ctx, nil) assert.Equal(t, 23, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfIndexer}) - callbacks.OnEnd(ctx, nil) + callbacks.OnEnd[any](ctx, nil) assert.Equal(t, 23, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfEmbedding}) @@ -189,7 +189,7 @@ func TestNewComponentTemplate(t *testing.T) { assert.Equal(t, 24, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfLoader}) - callbacks.OnStart(ctx, nil) + callbacks.OnStart[any](ctx, nil) assert.Equal(t, 24, cnt) tpl.Transformer(&TransformerCallbackHandler{ @@ -236,15 +236,15 @@ func TestNewComponentTemplate(t *testing.T) { handler = tpl.Handler() ctx = context.Background() ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{Component: components.ComponentOfTransformer}, handler) - callbacks.OnEnd(ctx, nil) + callbacks.OnEnd[any](ctx, nil) assert.Equal(t, 25, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfIndexer}) - callbacks.OnStart(ctx, nil) + callbacks.OnStart[any](ctx, nil) assert.Equal(t, 26, cnt) ctx = callbacks.SetRunInfo(ctx, &callbacks.RunInfo{Component: components.ComponentOfLoader}) - callbacks.OnEnd(ctx, nil) + callbacks.OnEnd[any](ctx, nil) assert.Equal(t, 27, cnt) }) } diff --git a/utils/generic/generic.go b/utils/generic/generic.go index e0a794e..479919d 100644 --- a/utils/generic/generic.go +++ b/utils/generic/generic.go @@ -69,3 +69,10 @@ type Pair[F, S any] struct { First F Second S } + +// Reverse reverses the elements of the slice in place. +func Reverse[S ~[]E, E any](s S) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +}