Skip to content

Commit

Permalink
feat: llm bridge server communicates with zipper in memory way (#996)
Browse files Browse the repository at this point in the history
# Description

1. Implement `frame.Listener` using a golang channel.
2. Zipper supports managing multiple `frame.Listener` instances.
3. The LLM bridge server communicates with Zipper using the golang
channel implementation of `frame.Listener`.
  • Loading branch information
woorui authored Jan 26, 2025
1 parent 6152a12 commit 2fe44be
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 66 deletions.
19 changes: 16 additions & 3 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (

"github.com/spf13/cobra"
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/ylog"
pkgconfig "github.com/yomorun/yomo/pkg/config"
"github.com/yomorun/yomo/pkg/listener/mem"
"github.com/yomorun/yomo/pkg/log"
"github.com/yomorun/yomo/pkg/trace"

Expand Down Expand Up @@ -68,15 +70,18 @@ var serveCmd = &cobra.Command{
// listening address.
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)

// memory listener
var listener *mem.Listener

options := []yomo.ZipperOption{}
tokenString := ""
if _, ok := conf.Auth["type"]; ok {
if tokenString, ok = conf.Auth["token"]; ok {
options = append(options, yomo.WithAuth("token", tokenString))
}
}
// check llm bridge server config
// parse the llm bridge config

// check and parse the llm bridge server config
bridgeConf := conf.Bridge
aiConfig, err := ai.ParseConfig(bridgeConf)
if err != nil {
Expand All @@ -88,8 +93,10 @@ var serveCmd = &cobra.Command{
}
}
if aiConfig != nil {
listener = mem.Listen()
// add AI connection middleware
options = append(options, yomo.WithZipperConnMiddleware(ai.RegisterFunctionMW()))
options = append(options, yomo.WithFrameListener(listener))
}
// new zipper
zipper, err := yomo.NewZipper(
Expand All @@ -108,7 +115,13 @@ var serveCmd = &cobra.Command{
registerAIProvider(aiConfig)
// start the llm api server
go func() {
err := ai.Serve(aiConfig, listenAddr, fmt.Sprintf("token:%s", tokenString), ylog.Default())
conn, _ := listener.Dial()
source := ai.NewSource(conn, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))

conn2, _ := listener.Dial()
reducer := ai.NewReducer(conn2, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))

err := ai.Serve(aiConfig, ylog.Default(), source, reducer)
if err != nil {
log.FailureStatusEvent(os.Stdout, err.Error())
return
Expand Down
2 changes: 0 additions & 2 deletions cli/serverless/golang/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (s *GolangServerless) Init(opts *serverless.Options) error {

// Build compiles the serverless to executable
func (s *GolangServerless) Build(clean bool) error {
log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...")
// check if the file exists
appPath := s.source
if _, err := os.Stat(appPath); os.IsNotExist(err) {
Expand Down Expand Up @@ -203,7 +202,6 @@ func (s *GolangServerless) Build(clean bool) error {
if clean {
file.Remove(s.tempDir)
}
log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!")
return nil
}

Expand Down
42 changes: 29 additions & 13 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,32 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {

defer closeServer(s.downstreams, s.connector, s.listener, s.router)

errCount := 0
for {
fconn, err := s.listener.Accept(s.ctx)
if err != nil {
if err == s.ctx.Err() {
return ErrServerClosed
listeners := append(s.opts.listeners, s.listener)

var wg sync.WaitGroup
for _, l := range listeners {
wg.Add(1)
go func(l frame.Listener) {
errCount := 0
for {
fconn, err := l.Accept(s.ctx)
if err != nil {
if err == s.ctx.Err() {
wg.Done()
return
}
errCount++
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
continue
}

go s.handleFrameConn(fconn, s.logger)
}
errCount++
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
continue
}

go s.handleFrameConn(fconn, s.logger)
}(l)
}

wg.Wait()
return ErrServerClosed
}

func (s *Server) handleFrameConn(fconn frame.Conn, logger *slog.Logger) {
Expand Down Expand Up @@ -380,7 +392,11 @@ func (s *Server) routingDataFrame(c *Context) error {

// dispatch every DataFrames to all downstreams
func (s *Server) dispatchToDownstreams(c *Context) error {
dataFrame := c.Frame
dataFrame := &frame.DataFrame{
Tag: c.Frame.Tag,
Payload: c.Frame.Payload,
Metadata: c.Frame.Metadata,
}
if c.Connection.ClientType() == ClientTypeUpstreamZipper {
c.Logger.Debug("ignored client", "client_type", c.Connection.ClientType().String())
// loop protection
Expand Down
9 changes: 9 additions & 0 deletions core/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/core/ylog"
)
Expand Down Expand Up @@ -38,6 +39,7 @@ type serverOptions struct {
router router.Router
connMiddlewares []ConnMiddleware
frameMiddlewares []FrameMiddleware
listeners []frame.Listener
}

func defaultServerOptions() *serverOptions {
Expand Down Expand Up @@ -120,3 +122,10 @@ func WithConnMiddleware(mws ...ConnMiddleware) ServerOption {
o.connMiddlewares = append(o.connMiddlewares, mws...)
}
}

// WithFrameListener adds a Listener other than a quic.Listener.
func WithFrameListener(l ...frame.Listener) ServerOption {
return func(o *serverOptions) {
o.listeners = append(o.listeners, l...)
}
}
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/router"
)

Expand Down Expand Up @@ -147,4 +148,11 @@ var (
o.serverOption = append(o.serverOption, core.WithFrameMiddleware(mw...))
}
}

// WithFrameListener adds a Listener other than a quic.Listener.
WithFrameListener = func(l ...frame.Listener) ZipperOption {
return func(o *zipperOptions) {
o.serverOption = append(o.serverOption, core.WithFrameListener(l...))
}
}
)
23 changes: 10 additions & 13 deletions pkg/bridge/ai/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

openai "github.com/sashabaranov/go-openai"
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/ai"
"github.com/yomorun/yomo/pkg/bridge/ai/provider"
"github.com/yomorun/yomo/pkg/bridge/ai/register"
Expand All @@ -34,23 +35,20 @@ const (

// BasicAPIServer provides restful service for end user
type BasicAPIServer struct {
zipperAddr string
credential string
httpHandler http.Handler
}

// Serve starts the Basic API Server
func Serve(config *Config, zipperListenAddr string, credential string, logger *slog.Logger) error {
func Serve(config *Config, logger *slog.Logger, source yomo.Source, reducer yomo.StreamFunction) error {
provider, err := provider.GetProvider(config.Server.Provider)
if err != nil {
return err
}
srv, err := NewBasicAPIServer(config, zipperListenAddr, credential, provider, logger)
srv, err := NewBasicAPIServer(config, provider, source, reducer, logger)
if err != nil {
return err
}

logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())
return http.ListenAndServe(config.Server.Addr, srv.httpHandler)
}

Expand Down Expand Up @@ -80,24 +78,23 @@ func DecorateHandler(h http.Handler, decorates ...func(handler http.Handler) htt
}

// NewBasicAPIServer creates a new restful service
func NewBasicAPIServer(config *Config, zipperAddr, credential string, provider provider.LLMProvider, logger *slog.Logger) (*BasicAPIServer, error) {
zipperAddr = parseZipperAddr(zipperAddr)

func NewBasicAPIServer(config *Config, provider provider.LLMProvider, source yomo.Source, reducer yomo.StreamFunction, logger *slog.Logger) (*BasicAPIServer, error) {
logger = logger.With("service", "llm-bridge")

service := NewService(zipperAddr, provider, &ServiceOptions{
opts := &ServiceOptions{
Logger: logger,
CredentialFunc: func(r *http.Request) (string, error) { return credential, nil },
})
SourceBuilder: func() yomo.Source { return source },
ReducerBuilder: func() yomo.StreamFunction { return reducer },
}
service := NewService(provider, opts)

mux := NewServeMux(service)

server := &BasicAPIServer{
zipperAddr: zipperAddr,
credential: credential,
httpHandler: DecorateHandler(mux, decorateReqContext(service, logger)),
}

logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())
return server, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bridge/ai/api_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func TestServer(t *testing.T) {
return mockCaller(nil), err
}

service := newService("fake_zipper_addr", pd, newCaller, &ServiceOptions{
SourceBuilder: func(_, _ string) yomo.Source { return flow },
ReducerBuilder: func(_, _ string) yomo.StreamFunction { return flow },
service := newService(pd, newCaller, &ServiceOptions{
SourceBuilder: func() yomo.Source { return flow },
ReducerBuilder: func() yomo.StreamFunction { return flow },
MetadataExchanger: func(_ string) (metadata.M, error) { return metadata.M{"hello": "llm bridge"}, nil },
})

Expand Down
129 changes: 129 additions & 0 deletions pkg/bridge/ai/reducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package ai

import (
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/core/serverless"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/listener/mem"
)

var _ yomo.Source = &memSource{}

type memSource struct {
cred *auth.Credential
conn *mem.FrameConn
}

func NewSource(conn *mem.FrameConn, cred *auth.Credential) yomo.Source {
return &memSource{
conn: conn,
cred: cred,
}
}

func (m *memSource) Connect() error {
hf := &frame.HandshakeFrame{
Name: "fc-source",
ID: id.New(),
ClientType: byte(core.ClientTypeSource),
AuthName: m.cred.Name(),
AuthPayload: m.cred.Payload(),
Version: core.Version,
}

return m.conn.Handshake(hf)
}

func (m *memSource) Write(tag uint32, data []byte) error {
df := &frame.DataFrame{
Tag: tag,
Payload: data,
}
return m.conn.WriteFrame(df)
}

func (m *memSource) Close() error { return nil }
func (m *memSource) SetErrorHandler(_ func(_ error)) {}
func (m *memSource) WriteWithTarget(_ uint32, _ []byte, _ string) error { return nil }

type memStreamFunction struct {
observedTags []uint32
handler core.AsyncHandler
cred *auth.Credential
conn *mem.FrameConn
}

// NewReducer creates a new instance of memory StreamFunction.
func NewReducer(conn *mem.FrameConn, cred *auth.Credential) yomo.StreamFunction {
return &memStreamFunction{
conn: conn,
cred: cred,
}
}

func (m *memStreamFunction) Close() error {
return nil
}

func (m *memStreamFunction) Connect() error {
hf := &frame.HandshakeFrame{
Name: "fc-reducer",
ID: id.New(),
ClientType: byte(core.ClientTypeStreamFunction),
AuthName: m.cred.Name(),
AuthPayload: m.cred.Payload(),
ObserveDataTags: m.observedTags,
Version: core.Version,
}

if err := m.conn.Handshake(hf); err != nil {
return nil
}

go func() {
for {
f, err := m.conn.ReadFrame()
if err != nil {
return
}

switch ff := f.(type) {
case *frame.DataFrame:
go m.onDataFrame(ff)
default:
return
}
}
}()

return nil
}

func (m *memStreamFunction) onDataFrame(dataFrame *frame.DataFrame) {
md, err := metadata.Decode(dataFrame.Metadata)
if err != nil {
return
}

serverlessCtx := serverless.NewContext(m.conn, dataFrame.Tag, md, dataFrame.Payload)
m.handler(serverlessCtx)
}

func (m *memStreamFunction) SetHandler(fn core.AsyncHandler) error {
m.handler = fn
return nil
}

func (m *memStreamFunction) Init(_ func() error) error { return nil }
func (m *memStreamFunction) SetCronHandler(_ string, _ core.CronHandler) error { return nil }
func (m *memStreamFunction) SetErrorHandler(_ func(err error)) {}
func (m *memStreamFunction) SetObserveDataTags(tags ...uint32) { m.observedTags = tags }
func (m *memStreamFunction) SetPipeHandler(fn core.PipeHandler) error { return nil }
func (m *memStreamFunction) SetWantedTarget(string) {}
func (m *memStreamFunction) Wait() {}

var _ yomo.StreamFunction = &memStreamFunction{}
Loading

0 comments on commit 2fe44be

Please sign in to comment.