From 4f044e0b90bab9ef4eea9ed5dd0438aaed7f49c0 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 11 Oct 2023 14:43:05 +0800 Subject: [PATCH] adding common shared service things and starting to use them in artifact service --- flyteartifacts/cmd/main.go | 162 +++++++------ flyteartifacts/cmd/shared/root.go | 108 +++++++++ flyteartifacts/cmd/shared/serve.go | 212 ++++++++++++++++++ flyteartifacts/cmd/shared/types.go | 11 + .../pkg/configuration/shared/config.go | 64 ++++++ flyteartifacts/pkg/server/server.go | 14 ++ 6 files changed, 488 insertions(+), 83 deletions(-) create mode 100644 flyteartifacts/cmd/shared/root.go create mode 100644 flyteartifacts/cmd/shared/serve.go create mode 100644 flyteartifacts/cmd/shared/types.go create mode 100644 flyteartifacts/pkg/configuration/shared/config.go diff --git a/flyteartifacts/cmd/main.go b/flyteartifacts/cmd/main.go index fc4e939d17..bcddb89ccf 100644 --- a/flyteartifacts/cmd/main.go +++ b/flyteartifacts/cmd/main.go @@ -1,96 +1,92 @@ package main import ( - "flag" - "fmt" - "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration" - "github.com/flyteorg/flyte/flyteartifacts/pkg/server" - "github.com/flyteorg/flyte/flytestdlib/config" - "github.com/flyteorg/flyte/flytestdlib/config/viper" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/golang/glog" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "google.golang.org/grpc" - "os" - "context" + sharedCmd "github.com/flyteorg/flyte/flyteartifacts/cmd/shared" + "github.com/flyteorg/flyte/flytestdlib/logger" _ "net/http/pprof" // Required to serve application. ) -var ( - cfgFile string - configAccessor = viper.NewAccessor(config.Options{}) -) - -var serveCmd = &cobra.Command{ - Use: "serve", - Short: "Launches the Flyte artifacts server", - RunE: func(cmd *cobra.Command, args []string) error { - ctx := context.Background() - cfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration) - fmt.Printf("cfg: [%+v]\n", cfg) - opts := make([]grpc.ServerOption, 0) - return server.Serve(ctx, opts...) - }, -} - -// RootCmd represents the base command when called without any subcommands -var RootCmd = &cobra.Command{ - Use: "artifacts", - Short: "Fill in later", - Long: ` -To get started run the serve subcommand which will start a server on localhost:50051 -`, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - return initConfig(cmd.Flags()) - }, -} - -func init() { - pflag.CommandLine.AddGoFlagSet(flag.CommandLine) - - // Add persistent flags - persistent flags persist through all sub-commands - RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./artifact_config.yaml)") - - // Allow viper to read the value of the flags - configAccessor.InitializePflags(RootCmd.PersistentFlags()) - - // Command information - RootCmd.AddCommand(serveCmd) - - err := flag.CommandLine.Parse([]string{}) - if err != nil { - fmt.Println(err) - os.Exit(-1) - } - -} - -func initConfig(flags *pflag.FlagSet) error { - configAccessor = viper.NewAccessor(config.Options{ - SearchPaths: []string{cfgFile, "./artifact_config.yaml", ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyte/flyteartifacts"}, - StrictMode: false, - }) - - logger.Infof(context.TODO(), "Using config file: %v", configAccessor.ConfigFilesUsed()) - - configAccessor.InitializePflags(flags) - - err := flag.CommandLine.Parse([]string{}) - if err != nil { - fmt.Println(err) - os.Exit(-1) - } - - return configAccessor.UpdateConfig(context.TODO()) -} +// +//var ( +// cfgFile string +// configAccessor = viper.NewAccessor(config.Options{}) +//) +// +//var serveCmd = &cobra.Command{ +// Use: "serve", +// Short: "Launches the Flyte artifacts server", +// RunE: func(cmd *cobra.Command, args []string) error { +// ctx := context.Background() +// cfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration) +// fmt.Printf("cfg: [%+v]\n", cfg) +// opts := make([]grpc.ServerOption, 0) +// return server.Serve(ctx, opts...) +// }, +//} +// +//// RootCmd represents the base command when called without any subcommands +//var RootCmd = &cobra.Command{ +// Use: "artifacts", +// Short: "Fill in later", +// Long: ` +//To get started run the serve subcommand which will start a server on localhost:50051 +//`, +// PersistentPreRunE: func(cmd *cobra.Command, args []string) error { +// return initConfig(cmd.Flags()) +// }, +//} +// +//func init() { +// pflag.CommandLine.AddGoFlagSet(flag.CommandLine) +// +// // Add persistent flags - persistent flags persist through all sub-commands +// RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./artifact_config.yaml)") +// +// // Allow viper to read the value of the flags +// configAccessor.InitializePflags(RootCmd.PersistentFlags()) +// +// // Command information +// RootCmd.AddCommand(serveCmd) +// +// err := flag.CommandLine.Parse([]string{}) +// if err != nil { +// fmt.Println(err) +// os.Exit(-1) +// } +// +//} +// +//func initConfig(flags *pflag.FlagSet) error { +// configAccessor = viper.NewAccessor(config.Options{ +// SearchPaths: []string{cfgFile, "./artifact_config.yaml", ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyte/flyteartifacts"}, +// StrictMode: false, +// }) +// +// logger.Infof(context.TODO(), "Using config file: %v", configAccessor.ConfigFilesUsed()) +// +// configAccessor.InitializePflags(flags) +// +// err := flag.CommandLine.Parse([]string{}) +// if err != nil { +// fmt.Println(err) +// os.Exit(-1) +// } +// +// return configAccessor.UpdateConfig(context.TODO()) +//} func main() { - glog.V(2).Info("Beginning Flyte Artifacts Service") - if err := RootCmd.Execute(); err != nil { - fmt.Println(err) + ctx := context.Background() + logger.Infof(ctx, "Beginning Flyte Artifacts Service") + rootCmd := sharedCmd.NewRootCmd("artifacts") + //if err := RootCmd.ExecuteC(); err != nil { + // fmt.Println(err) + // panic(err) + //} + err := rootCmd.ExecuteContext(ctx) + if err != nil { panic(err) } } diff --git a/flyteartifacts/cmd/shared/root.go b/flyteartifacts/cmd/shared/root.go new file mode 100644 index 0000000000..514133dce3 --- /dev/null +++ b/flyteartifacts/cmd/shared/root.go @@ -0,0 +1,108 @@ +package shared + +import ( + "context" + "flag" + "fmt" + sharedCfg "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration/shared" + "github.com/flyteorg/flyte/flytestdlib/config" + "github.com/flyteorg/flyte/flytestdlib/config/viper" + "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/profutils" + "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + "github.com/flyteorg/flyte/flytestdlib/storage" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "os" +) + +var ( + cfgFile string + configAccessor = viper.NewAccessor(config.Options{}) +) + +//var XXRootCmd = &cobra.Command{ +// Use: "artifacts", +// Short: "Fill in later", +// PersistentPreRunE: func(cmd *cobra.Command, args []string) error { +// return initConfig(cmd.Flags()) +// }, +//} + +// NewRootCmd represents the base command when called without any subcommands +func NewRootCmd(rootUse string, grpcHook GrpcRegistrationHook, httpHook HttpRegistrationHook) *cobra.Command { + + rootCmd := &cobra.Command{ + Use: rootUse, + Short: "Short description", + Long: "Long description to be filled in later", + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + err := initConfig(cmd, args) + if err != nil { + return err + } + + go func() { + ctx := context.Background() + sharedConfig := sharedCfg.SharedServerConfig.GetConfig().(*sharedCfg.ServerConfiguration) + err := profutils.StartProfilingServerWithDefaultHandlers(ctx, + sharedConfig.Metrics.Port.Port, nil) + if err != nil { + logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err) + } + }() + + return nil + }, + } + + initSubCommands(rootCmd, grpcHook, httpHook) + return rootCmd +} + +func init() { + // Set Keys + labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, + contextutils.DomainKey, storage.FailureTypeLabel) +} + +func initConfig(cmd *cobra.Command, _ []string) error { + configAccessor = viper.NewAccessor(config.Options{ + SearchPaths: []string{cfgFile, ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyte/flyteartifacts"}, + StrictMode: false, + }) + + fmt.Println("Using config file: ", configAccessor.ConfigFilesUsed()) + + // persistent flags were initially bound to the root command so we must bind to the same command to avoid + // overriding those initial ones. We need to traverse up to the root command and initialize pflags for that. + rootCmd := cmd + for rootCmd.Parent() != nil { + rootCmd = rootCmd.Parent() + } + + configAccessor.InitializePflags(rootCmd.PersistentFlags()) + + return configAccessor.UpdateConfig(context.TODO()) +} + +func initSubCommands(rootCmd *cobra.Command, grpcHook GrpcRegistrationHook, httpHook HttpRegistrationHook) { + // allows ` --logtostderr` to work + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + + // Add persistent flags - persistent flags persist through all sub-commands + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "artifact_config.yaml", "config file (default is ./artifact_config.yaml)") + + rootCmd.AddCommand(viper.GetConfigCommand()) + rootCmd.AddCommand(NewServeCmd(rootCmd.Use, grpcHook, httpHook)) + + // Allow viper to read the value of the flags + configAccessor.InitializePflags(rootCmd.PersistentFlags()) + + err := flag.CommandLine.Parse([]string{}) + if err != nil { + fmt.Println(err) + os.Exit(-1) + } +} diff --git a/flyteartifacts/cmd/shared/serve.go b/flyteartifacts/cmd/shared/serve.go new file mode 100644 index 0000000000..74e5b1e57f --- /dev/null +++ b/flyteartifacts/cmd/shared/serve.go @@ -0,0 +1,212 @@ +package shared + +import ( + "context" + "google.golang.org/grpc/reflection" + "net" + "net/http" + + sharedCfg "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration/shared" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" + grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func NewServeCmd(commandName string, grpcHook GrpcRegistrationHook, httpHook HttpRegistrationHook) *cobra.Command { + // serveCmd represents the serve command + return &cobra.Command{ + Use: "serve", + Short: "Launches the server", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + serverCfg := sharedCfg.SharedServerConfig.GetConfig().(*sharedCfg.ServerConfiguration) + return serveGateway(ctx, commandName, serverCfg, grpcHook, httpHook) + }, + } +} + +// Creates a new gRPC Server with all the configuration +func newGRPCServer(ctx context.Context, serviceName string, serverCfg *sharedCfg.ServerConfiguration, + grpcHook GrpcRegistrationHook, opts ...grpc.ServerOption) (*grpc.Server, error) { + + scope := promutils.NewScope(serverCfg.Metrics.MetricsScope) + + var grpcUnaryInterceptors = make([]grpc.UnaryServerInterceptor, 0) + var streamServerInterceptors = make([]grpc.StreamServerInterceptor, 0) + + serverOpts := []grpc.ServerOption{ + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcMiddleware.ChainUnaryServer( + grpcUnaryInterceptors..., + )), + grpc.ChainStreamInterceptor( + streamServerInterceptors..., + ), + } + + serverOpts = append(serverOpts, opts...) + + grpcServer := grpc.NewServer(serverOpts...) + grpcPrometheus.Register(grpcServer) + if grpcHook != nil { + err := grpcHook(ctx, grpcServer, scope) + if err != nil { + return nil, err + } + } + + healthServer := health.NewServer() + healthServer.SetServingStatus(serviceName, grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + if serverCfg.GrpcServerReflection { + reflection.Register(grpcServer) + } + + return grpcServer, nil +} + +// serveGateway launches the grpc and http servers. +func serveGateway(ctx context.Context, serviceName string, serverCfg *sharedCfg.ServerConfiguration, grpcHook GrpcRegistrationHook, httpHook HttpRegistrationHook) error { + + if grpcHook != nil { + if err := launchGrpcServer(ctx, serviceName, serverCfg, grpcHook); err != nil { + return err + } + } + + if httpHook != nil { + if err := launchHttpServer(ctx, serverCfg, httpHook); err != nil { + return err + } + } + + for { + <-ctx.Done() + return nil + } +} + +// launchGrpcServer launches grpc server with server config and also registers the grpc hook for the service. +func launchGrpcServer(ctx context.Context, serviceName string, serverCfg *sharedCfg.ServerConfiguration, grpcHook GrpcRegistrationHook) error { + var grpcServer *grpc.Server + if serverCfg.Security.Secure { + tlsCreds, err := credentials.NewServerTLSFromFile(serverCfg.Security.Ssl.CertificateFile, serverCfg.Security.Ssl.KeyFile) + if err != nil { + return err + } + + grpcServer, err = newGRPCServer(ctx, serviceName, serverCfg, grpcHook, grpc.Creds(tlsCreds)) + if err != nil { + return errors.Wrap(err, "failed to create secure GRPC server") + } + } else { + var err error + grpcServer, err = newGRPCServer(ctx, serviceName, serverCfg, grpcHook) + if err != nil { + return errors.Wrap(err, "failed to create GRPC server") + } + } + + return listenAndServe(ctx, grpcServer, serverCfg.GetGrpcHostAddress()) +} + +// listenAndServe on the grpcHost address and serve connections using the grpcServer +func listenAndServe(ctx context.Context, grpcServer *grpc.Server, grpcHost string) error { + conn, err := net.Listen("tcp", grpcHost) + if err != nil { + panic(err) + } + + go func() { + if err := grpcServer.Serve(conn); err != nil { + logger.Fatalf(ctx, "Failed to create GRPC Server, Err: ", err) + } + + logger.Infof(ctx, "Serving GRPC Gateway server on: %v", grpcHost) + }() + return nil +} + +// launchHttpServer launches an http server for converting REST calls to grpc internally +func launchHttpServer(ctx context.Context, cfg *sharedCfg.ServerConfiguration, httpHook HttpRegistrationHook) error { + logger.Infof(ctx, "Starting HTTP/1 Gateway server on %s", cfg.GetHttpHostAddress()) + + httpServer, serverMux, err := newHTTPServer(cfg.Security) + if err != nil { + return err + } + + if err = registerHttpHook(ctx, serverMux, cfg.GetGrpcHostAddress(), uint32(cfg.GrpcMaxResponseStatusBytes), httpHook, promutils.NewScope(cfg.Metrics.MetricsScope)); err != nil { + return err + } + + handler := getHTTPHandler(httpServer, cfg.Security) + + go func() { + err = http.ListenAndServe(cfg.GetHttpHostAddress(), handler) + if err != nil { + logger.Fatalf(ctx, "Failed to Start HTTP Server, Err: %v", err) + } + + logger.Infof(ctx, "Serving HTTP/1 on: %v", cfg.GetHttpHostAddress()) + }() + return nil +} + +// getHTTPHandler gets the http handler for the configured security options +func getHTTPHandler(httpServer *http.ServeMux, _ sharedCfg.ServerSecurityOptions) http.Handler { + // not really used yet (reserved for admin) + var handler http.Handler + handler = httpServer + return handler +} + +// registerHttpHook registers the http hook for a service which multiplexes to the grpc endpoint for that service. +func registerHttpHook(ctx context.Context, gwmux *runtime.ServeMux, grpcHost string, maxResponseStatusBytes uint32, httpHook HttpRegistrationHook, scope promutils.Scope) error { + if httpHook == nil { + return nil + } + grpcOptions := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithMaxHeaderListSize(maxResponseStatusBytes), + } + + return httpHook(ctx, gwmux, grpcHost, grpcOptions, scope) +} + +func healthCheckFunc(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) +} + +// newHTTPServer creates a new http sever +func newHTTPServer(_ sharedCfg.ServerSecurityOptions) (*http.ServeMux, *runtime.ServeMux, error) { + // Register the server that will serve HTTP/REST Traffic + mux := http.NewServeMux() + + // Register healthcheck + mux.HandleFunc("/healthcheck", healthCheckFunc) + + var gwmuxOptions = make([]runtime.ServeMuxOption, 0) + // This option means that http requests are served with protobufs, instead of json. We always want this. + gwmuxOptions = append(gwmuxOptions, runtime.WithMarshalerOption("application/octet-stream", &runtime.ProtoMarshaller{})) + + // Create the grpc-gateway server with the options specified + gwmux := runtime.NewServeMux(gwmuxOptions...) + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + //ctx := GetOrGenerateRequestIDForRequest(r) + ctx := r.Context() + logger.Debugf(ctx, "Running identity interceptor for http endpoint [%s]", r.URL.String()) + + gwmux.ServeHTTP(w, r.WithContext(ctx)) + }) + return mux, gwmux, nil +} diff --git a/flyteartifacts/cmd/shared/types.go b/flyteartifacts/cmd/shared/types.go new file mode 100644 index 0000000000..be1878a6d3 --- /dev/null +++ b/flyteartifacts/cmd/shared/types.go @@ -0,0 +1,11 @@ +package shared + +import ( + "context" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "google.golang.org/grpc" +) + +type GrpcRegistrationHook func(ctx context.Context, server *grpc.Server, scope promutils.Scope) error +type HttpRegistrationHook func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption, scope promutils.Scope) error diff --git a/flyteartifacts/pkg/configuration/shared/config.go b/flyteartifacts/pkg/configuration/shared/config.go new file mode 100644 index 0000000000..ce87276863 --- /dev/null +++ b/flyteartifacts/pkg/configuration/shared/config.go @@ -0,0 +1,64 @@ +package shared + +import ( + "fmt" + "github.com/flyteorg/flyte/flytestdlib/config" +) + +const sharedServer = "sharedServer" + +type Metrics struct { + MetricsScope string `json:"metricsScope" pflag:",MetricsScope"` + Port config.Port `json:"port" pflag:",Profile port to start listen for pprof and metric handlers on."` + ProfilerEnabled bool `json:"profilerEnabled" pflag:",Enable Profiler on server"` +} + +type SslOptions struct { + CertificateAuthorityFile string `json:"certificateAuthorityFile"` + CertificateFile string `json:"certificateFile"` + KeyFile string `json:"keyFile"` +} + +type ServerSecurityOptions struct { + Secure bool `json:"secure"` + Ssl SslOptions `json:"ssl"` + UseAuth bool `json:"useAuth"` + AllowLocalhostAccess bool `json:"allowLocalhostAccess" pflag:",Whether to permit localhost unauthenticated access to the server"` +} + +type ServerConfiguration struct { + Metrics Metrics `json:"metrics" pflag:",Metrics configuration"` + Port config.Port `json:"port" pflag:",On which grpc port to serve"` + HttpPort config.Port `json:"httpPort" pflag:",On which http port to serve"` + GrpcMaxResponseStatusBytes int32 `json:"grpcMaxResponseStatusBytes" pflag:", specifies the maximum (uncompressed) size of header list that the client is prepared to accept on grpc calls"` + GrpcServerReflection bool `json:"grpcServerReflection" pflag:",Enable GRPC Server Reflection"` + Security ServerSecurityOptions `json:"security"` + MaxConcurrentStreams int `json:"maxConcurrentStreams" pflag:",Limit on the number of concurrent streams to each ServerTransport."` +} + +var sharedServerConfiguration = ServerConfiguration{ + Metrics: Metrics{ + MetricsScope: "service:", + Port: config.Port{Port: 10254}, + ProfilerEnabled: false, + }, + Port: config.Port{Port: 8089}, + HttpPort: config.Port{Port: 8088}, + GrpcMaxResponseStatusBytes: 320000, + GrpcServerReflection: false, + Security: ServerSecurityOptions{ + Secure: false, + UseAuth: false, + }, + MaxConcurrentStreams: 100, +} + +func (s ServerConfiguration) GetGrpcHostAddress() string { + return fmt.Sprintf(":%s", s.Port.String()) +} + +func (s ServerConfiguration) GetHttpHostAddress() string { + return fmt.Sprintf(":%s", s.HttpPort.String()) +} + +var SharedServerConfig = config.MustRegisterSection(sharedServer, &sharedServerConfiguration) diff --git a/flyteartifacts/pkg/server/server.go b/flyteartifacts/pkg/server/server.go index 1c998248a8..3227029725 100644 --- a/flyteartifacts/pkg/server/server.go +++ b/flyteartifacts/pkg/server/server.go @@ -1,7 +1,11 @@ package server import ( + "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration" + "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/pkg/errors" "google.golang.org/grpc" "net" @@ -18,9 +22,19 @@ func NewArtifactService() *ArtifactService { return &ArtifactService{} } +func HttpRegistrationHook(ctx context.Context, gwmux *runtime.ServeMux, grpcAddress string, grpcConnectionOpts []grpc.DialOption, _ promutils.Scope) error { + //err := executionsvc.RegisterExecutionServiceHandlerFromEndpoint(ctx, gwmux, grpcAddress, grpcConnectionOpts) + if err != nil { + return errors.Wrap(err, "error registering execution service") + } + return nil +} + func Serve(ctx context.Context, opts ...grpc.ServerOption) error { var serverOpts []grpc.ServerOption + cfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration) + serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(3000000)) serverOpts = append(serverOpts, opts...) grpcServer := grpc.NewServer(serverOpts...)