From 2443ac6c826b11c44d6add7e8e3b5b26f6c05a2d Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 13 Jul 2023 16:09:37 +0300 Subject: [PATCH] Add support for native distributed execution This adds the `k6 agent` and `k6 coordinator` sub-commands and adds a very simple way to do distributed execution, including packaging and sending the script to agents, and setup() and teardown() handling. However, it doesn't include automatic metric handling (e.g. thresholds and the end-of-test summary). --- cmd/agent.go | 125 ++++ cmd/coordinator.go | 85 +++ cmd/root.go | 1 + cmd/run.go | 16 +- execution/distributed/agent.go | 193 ++++++ execution/distributed/coordinator.go | 284 +++++++++ execution/distributed/distributed.pb.go | 607 +++++++++++++++++++ execution/distributed/distributed.proto | 45 ++ execution/distributed/distributed_grpc.pb.go | 174 ++++++ execution/distributed/gen.go | 5 + 10 files changed, 1529 insertions(+), 6 deletions(-) create mode 100644 cmd/agent.go create mode 100644 cmd/coordinator.go create mode 100644 execution/distributed/agent.go create mode 100644 execution/distributed/coordinator.go create mode 100644 execution/distributed/distributed.pb.go create mode 100644 execution/distributed/distributed.proto create mode 100644 execution/distributed/distributed_grpc.pb.go create mode 100644 execution/distributed/gen.go diff --git a/cmd/agent.go b/cmd/agent.go new file mode 100644 index 00000000000..45bebf370ad --- /dev/null +++ b/cmd/agent.go @@ -0,0 +1,125 @@ +package cmd + +import ( + "bytes" + "encoding/json" + + "github.com/spf13/afero" + "github.com/spf13/cobra" + "go.k6.io/k6/cmd/state" + "go.k6.io/k6/execution" + "go.k6.io/k6/execution/distributed" + "go.k6.io/k6/js" + "go.k6.io/k6/lib" + "go.k6.io/k6/loader" + "go.k6.io/k6/metrics" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "gopkg.in/guregu/null.v3" +) + +// TODO: a whole lot of cleanup, refactoring, error handling and hardening +func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen + c := &cmdsRunAndAgent{gs: gs} + + c.loadConfiguredTest = func(cmd *cobra.Command, args []string) ( + *loadedAndConfiguredTest, execution.Controller, error, + ) { + // TODO: add some gRPC authentication + conn, err := grpc.Dial(args[0], grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, nil, err + } + c.testEndHook = func(err error) { + gs.Logger.Debugf("k6 agent run ended with err=%s", err) + _ = conn.Close() + } + + client := distributed.NewDistributedTestClient(conn) + + resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{}) + if err != nil { + return nil, nil, err + } + + controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger) + if err != nil { + return nil, nil, err + } + + var options lib.Options + if err = json.Unmarshal(resp.Options, &options); err != nil { + return nil, nil, err + } + + arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive)) + if err != nil { + return nil, nil, err + } + + registry := metrics.NewRegistry() + piState := &lib.TestPreInitState{ + Logger: gs.Logger, + RuntimeOptions: lib.RuntimeOptions{ + NoThresholds: null.BoolFrom(true), + NoSummary: null.BoolFrom(true), + Env: arc.Env, + CompatibilityMode: null.StringFrom(arc.CompatibilityMode), + }, + Registry: registry, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + } + + initRunner, err := js.NewFromArchive(piState, arc) + if err != nil { + return nil, nil, err + } + + test := &loadedTest{ + pwd: arc.Pwd, + sourceRootPath: arc.Filename, + source: &loader.SourceData{ + Data: resp.Archive, + URL: arc.FilenameURL, + }, + fs: afero.NewMemMapFs(), // TODO: figure out what should be here + fileSystems: arc.Filesystems, + preInitState: piState, + initRunner: initRunner, + } + + pseudoConsoldatedConfig := applyDefault(Config{Options: options}) + for _, thresholds := range pseudoConsoldatedConfig.Thresholds { + if err = thresholds.Parse(); err != nil { + return nil, nil, err + } + } + derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.Logger) + if err != nil { + return nil, nil, err + } + + configuredTest := &loadedAndConfiguredTest{ + loadedTest: test, + consolidatedConfig: pseudoConsoldatedConfig, + derivedConfig: derivedConfig, + } + + gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server + + return configuredTest, controller, nil // TODO + } + + agentCmd := &cobra.Command{ + Use: "agent", + Short: "Join a distributed load test", + Long: `TODO`, + Args: exactArgsWithMsg(1, "arg should either the IP and port of the controller k6 instance"), + RunE: c.run, + Hidden: true, // TODO: remove when officially released + } + + // TODO: add flags + + return agentCmd +} diff --git a/cmd/coordinator.go b/cmd/coordinator.go new file mode 100644 index 00000000000..7c561131aaf --- /dev/null +++ b/cmd/coordinator.go @@ -0,0 +1,85 @@ +package cmd + +import ( + "net" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "go.k6.io/k6/cmd/state" + "go.k6.io/k6/execution/distributed" + "google.golang.org/grpc" +) + +// cmdCoordinator handles the `k6 coordinator` sub-command +type cmdCoordinator struct { + gs *state.GlobalState + gRPCAddress string + instanceCount int +} + +func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { + test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig) + if err != nil { + return err + } + + coordinator, err := distributed.NewCoordinatorServer( + c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger, + ) + if err != nil { + return err + } + + c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress) + listener, err := net.Listen("tcp", c.gRPCAddress) + if err != nil { + return err + } + + grpcServer := grpc.NewServer() // TODO: add auth and a whole bunch of other options + distributed.RegisterDistributedTestServer(grpcServer, coordinator) + + go func() { + err := grpcServer.Serve(listener) + c.gs.Logger.Debugf("gRPC server end: %s", err) + }() + coordinator.Wait() + c.gs.Logger.Infof("All done!") + return nil +} + +func (c *cmdCoordinator) flagSet() *pflag.FlagSet { + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.SortFlags = false + flags.AddFlagSet(optionFlagSet()) + flags.AddFlagSet(runtimeOptionFlagSet(false)) + + // TODO: add support bi-directional gRPC authentication and authorization + flags.StringVar(&c.gRPCAddress, "grpc-addr", "localhost:6566", "address on which to bind the gRPC server") + + // TODO: add some better way to specify the test, e.g. an execution segment + // sequence + some sort of a way to map instances with specific segments + // (e.g. key-value tags that can be matched to every execution segment, with + // each instance advertising its own tags when it connects). + flags.IntVar(&c.instanceCount, "instance-count", 1, "number of distributed instances") + return flags +} + +func getCmdCoordnator(gs *state.GlobalState) *cobra.Command { + c := &cmdCoordinator{ + gs: gs, + } + + coordinatorCmd := &cobra.Command{ + Use: "coordinator", + Short: "Start a distributed load test", + Long: `TODO`, + RunE: c.run, + Hidden: true, // TODO: remove when officially released + } + + coordinatorCmd.Flags().SortFlags = false + coordinatorCmd.Flags().AddFlagSet(c.flagSet()) + + return coordinatorCmd +} diff --git a/cmd/root.go b/cmd/root.go index b50a7de56ad..61c7baa9571 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -64,6 +64,7 @@ func newRootCommand(gs *state.GlobalState) *rootCommand { getCmdArchive, getCmdCloud, getCmdNewScript, getCmdInspect, getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun, getCmdStats, getCmdStatus, getCmdVersion, + getCmdAgent, getCmdCoordnator, } for _, sc := range subCommands { diff --git a/cmd/run.go b/cmd/run.go index 953869912a6..ed1ba22edbb 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -37,12 +37,13 @@ import ( "go.k6.io/k6/ui/pb" ) -// cmdRun handles the `k6 run` sub-command -type cmdRun struct { +// cmdsRunAndAgent handles the `k6 run` and `k6 agent` sub-commands +type cmdsRunAndAgent struct { gs *state.GlobalState // TODO: figure out something more elegant? loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) + testEndHook func(err error) } const ( @@ -60,7 +61,7 @@ const ( // TODO: split apart some more // //nolint:funlen,gocognit,gocyclo,cyclop -func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { +func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { var logger logrus.FieldLogger = c.gs.Logger defer func() { if err == nil { @@ -68,6 +69,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } else { logger.WithError(err).Debug("Everything has finished, exiting k6 with an error!") } + if c.testEndHook != nil { + c.testEndHook(err) + } }() printBanner(c.gs) @@ -435,7 +439,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { return nil } -func (c *cmdRun) flagSet() *pflag.FlagSet { +func (c *cmdsRunAndAgent) flagSet() *pflag.FlagSet { flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags.SortFlags = false flags.AddFlagSet(optionFlagSet()) @@ -444,7 +448,7 @@ func (c *cmdRun) flagSet() *pflag.FlagSet { return flags } -func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error { +func (c *cmdsRunAndAgent) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error { ro := test.preInitState.RuntimeOptions if ro.TracesOutput.String == "none" { test.preInitState.TracerProvider = trace.NewNoopTracerProvider() @@ -461,7 +465,7 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu } func getCmdRun(gs *state.GlobalState) *cobra.Command { - c := &cmdRun{ + c := &cmdsRunAndAgent{ gs: gs, loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) { test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig) diff --git a/execution/distributed/agent.go b/execution/distributed/agent.go new file mode 100644 index 00000000000..b29e4b6de18 --- /dev/null +++ b/execution/distributed/agent.go @@ -0,0 +1,193 @@ +package distributed + +import ( + context "context" + "errors" + "sync" + + "github.com/sirupsen/logrus" + "go.k6.io/k6/execution" +) + +// AgentController implements the execution.Controller interface for distributed +// tests. Every `k6 agent` in the test can use it to synchronize itself with the +// other instances. Itq sends requests to the coordinator, listens to +// responses and controls the local test on the agent instance. +type AgentController struct { + instanceID uint32 + cnc DistributedTest_CommandAndControlClient + logger logrus.FieldLogger + + // TODO: something much more robust and nicer to use... + doneWaitQueuesLock sync.Mutex + doneWaitQueues map[string]chan *ControllerMessage_DoneWaitWithID + dataReceiveQueuesLock sync.Mutex + dataReceiveQueues map[string]chan *ControllerMessage_DataWithID + createDataQueuesLock sync.Mutex + createDataQueues map[string]chan *ControllerMessage_CreateDataWithID +} + +var _ execution.Controller = &AgentController{} + +// NewAgentController creates a new AgentController for the given instance. It +// uses the supplied k6 coordinator server to synchronize the current instance +// with other instances in the same test run. +func NewAgentController( + ctx context.Context, instanceID uint32, client DistributedTestClient, parentLogger logrus.FieldLogger, +) (*AgentController, error) { + cnc, err := client.CommandAndControl(ctx) + if err != nil { + return nil, err + } + + logger := parentLogger.WithField("component", "agent-controller") + logger.Debugf("Sending instance ID %d to coordinator", instanceID) + err = cnc.Send(&AgentMessage{Message: &AgentMessage_InitInstanceID{instanceID}}) + if err != nil { + return nil, err + } + + ac := &AgentController{ + instanceID: instanceID, + cnc: cnc, + logger: logger, + doneWaitQueues: make(map[string]chan *ControllerMessage_DoneWaitWithID), + dataReceiveQueues: make(map[string]chan *ControllerMessage_DataWithID), + createDataQueues: make(map[string]chan *ControllerMessage_CreateDataWithID), + } + + go func() { + for { + msgContainer, err := cnc.Recv() + if err != nil { + logger.WithError(err).Debug("received an unexpected error from recv stream") + return + } + + switch msg := msgContainer.Message.(type) { + case *ControllerMessage_DoneWaitWithID: + ac.doneWaitQueuesLock.Lock() + ac.doneWaitQueues[msg.DoneWaitWithID] <- msg + ac.doneWaitQueuesLock.Unlock() + case *ControllerMessage_DataWithID: + ac.dataReceiveQueuesLock.Lock() + ac.dataReceiveQueues[msg.DataWithID.Id] <- msg + ac.dataReceiveQueuesLock.Unlock() + case *ControllerMessage_CreateDataWithID: + ac.createDataQueuesLock.Lock() + ac.createDataQueues[msg.CreateDataWithID] <- msg + ac.createDataQueuesLock.Unlock() + default: + logger.Errorf("Unknown controller message type '%#v'", msg) + } + } + }() + + return ac, nil +} + +func errStr(err error) string { + if err != nil { + return err.Error() + } + return "" +} + +// GetOrCreateData requests the data chunk with the given id, if it already +// exists. If it doesn't (i.e. if this is the first time this function is called +// with that id), the given callback is called and its result and error are +// saved for the id. +// +// This is an atomic function, so any calls to it while the callback is being +// executed the the same ID will wait for it to finish. +func (c *AgentController) GetOrCreateData(dataID string, callback func() ([]byte, error)) ([]byte, error) { + c.logger.Debugf("GetOrCreateData(%s)", dataID) + + msg := &AgentMessage{Message: &AgentMessage_GetOrCreateDataWithID{dataID}} + c.dataReceiveQueuesLock.Lock() + chGetData := make(chan *ControllerMessage_DataWithID) + c.dataReceiveQueues[dataID] = chGetData + c.dataReceiveQueuesLock.Unlock() + + c.createDataQueuesLock.Lock() + chCreateData := make(chan *ControllerMessage_CreateDataWithID) + c.createDataQueues[dataID] = chCreateData + c.createDataQueuesLock.Unlock() + + if err := c.cnc.Send(msg); err != nil { + return nil, err + } + + var result []byte + var err error + select { + case <-chCreateData: + c.logger.Debugf("We get to create the data for %s", dataID) + result, err = callback() + msgBack := &AgentMessage{ + Message: &AgentMessage_CreatedData{CreatedData: &DataPacket{ + Id: dataID, + Data: result, + Error: errStr(err), + }}, + } + if serr := c.cnc.Send(msgBack); err != nil { + c.logger.Errorf("Could not send back data message: %s", serr) + } + case data := <-chGetData: + c.logger.Debugf("Received data for %s", dataID) + result = data.DataWithID.Data + if data.DataWithID.Error != "" { + err = errors.New(data.DataWithID.Error) + } + } + + c.dataReceiveQueuesLock.Lock() + delete(c.dataReceiveQueues, dataID) + c.dataReceiveQueuesLock.Unlock() + + c.createDataQueuesLock.Lock() + delete(c.createDataQueues, dataID) + c.createDataQueuesLock.Unlock() + + return result, err +} + +// Subscribe creates a listener for the specified event ID and returns a +// callback that can wait until all other instances have reache it. +func (c *AgentController) Subscribe(eventID string) (wait func() error) { + c.logger.Debugf("Subscribe(%s)", eventID) + + c.doneWaitQueuesLock.Lock() + ch := make(chan *ControllerMessage_DoneWaitWithID) + c.doneWaitQueues[eventID] = ch + c.doneWaitQueuesLock.Unlock() + + // TODO: implement proper error handling, network outage handling, etc. + return func() error { + <-ch + c.doneWaitQueuesLock.Lock() + delete(c.doneWaitQueues, eventID) + c.doneWaitQueuesLock.Unlock() + return nil + } +} + +// Signal sends a signal to the coordinator that the current instance has +// reached the given event ID, or that it has had an error. +func (c *AgentController) Signal(eventID string, sigErr error) error { + c.logger.Debugf("Signal(%s, %q)", eventID, sigErr) + + msg := &AgentMessage{Message: &AgentMessage_SignalAndWaitOnID{eventID}} + if sigErr != nil { + // TODO: something a bit more robust and information-packed, also + // including the event ID in the error + msg.Message = &AgentMessage_Error{sigErr.Error()} + } + + if err := c.cnc.Send(msg); err != nil { + c.logger.Errorf("Signal(%s) got an unexpected error: %s", eventID, err) + return err + } + return nil +} diff --git a/execution/distributed/coordinator.go b/execution/distributed/coordinator.go new file mode 100644 index 00000000000..42a22a067af --- /dev/null +++ b/execution/distributed/coordinator.go @@ -0,0 +1,284 @@ +package distributed + +import ( + "bytes" + context "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + "go.k6.io/k6/lib" +) + +// CoordinatorServer coordinates multiple k6 agents. +// +// TODO: something more robust and polished... +type CoordinatorServer struct { + UnimplementedDistributedTestServer + instanceCount int + test *lib.Archive + logger logrus.FieldLogger + + testStartTimeLock sync.Mutex + testStartTime *time.Time + + cc *coordinatorController + currentInstance int32 // TODO: something a bit better, support full execution plans from JSON? + ess lib.ExecutionSegmentSequence + archive []byte + wg *sync.WaitGroup +} + +// NewCoordinatorServer initializes and returns a new CoordinatorServer. +func NewCoordinatorServer( + instanceCount int, test *lib.Archive, logger logrus.FieldLogger, +) (*CoordinatorServer, error) { + segments, err := test.Options.ExecutionSegment.Split(int64(instanceCount)) + if err != nil { + return nil, err + } + ess, err := lib.NewExecutionSegmentSequence(segments...) + if err != nil { + return nil, err + } + + // TODO: figure out some way to add metrics from the instance to the metricsEngine + + buf := &bytes.Buffer{} + if err = test.Write(buf); err != nil { + return nil, err + } + + wg := &sync.WaitGroup{} + wg.Add(instanceCount) + + cs := &CoordinatorServer{ + instanceCount: instanceCount, + test: test, + logger: logger, + ess: ess, + cc: newCoordinatorController(instanceCount, logger), + archive: buf.Bytes(), + wg: wg, + } + + go cs.monitorProgress() + + return cs, nil +} + +func (cs *CoordinatorServer) monitorProgress() { + wg := cs.cc.getSignalWG("test-start") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Info("All instances ready to start initializing VUs...") + + wg = cs.cc.getSignalWG("test-ready-to-run-setup") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Info("VUs initialized, setup()...") + cs.testStartTimeLock.Lock() + t := time.Now() + cs.testStartTime = &t + cs.testStartTimeLock.Unlock() + + wg = cs.cc.getSignalWG("setup-done") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Info("setup() done, starting test!") + + wg = cs.cc.getSignalWG("test-done") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Info("Instances finished with the test") +} + +// GetCurrentTestRunDuration returns how long the current test has been running. +func (cs *CoordinatorServer) GetCurrentTestRunDuration() time.Duration { + cs.testStartTimeLock.Lock() + startTime := cs.testStartTime + cs.testStartTimeLock.Unlock() + + if startTime == nil { + return 0 + } + return time.Since(*startTime) +} + +// Register allows an instance to register itself to the coordinator. +func (cs *CoordinatorServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { + instanceID := atomic.AddInt32(&cs.currentInstance, 1) + if instanceID > int32(cs.instanceCount) { + return nil, fmt.Errorf("we don't need any more instances") + } + cs.logger.Infof("Instance %d of %d connected!", instanceID, cs.instanceCount) + + instanceOptions := cs.test.Options + instanceOptions.ExecutionSegment = cs.ess[instanceID-1] + instanceOptions.ExecutionSegmentSequence = &cs.ess + options, err := json.Marshal(instanceOptions) + if err != nil { + return nil, err + } + + return &RegisterResponse{ + InstanceID: uint32(instanceID), + Archive: cs.archive, + Options: options, + }, nil +} + +// CommandAndControl handles bi-directional messages from and to an individual +// k6 agent instance. +func (cs *CoordinatorServer) CommandAndControl(stream DistributedTest_CommandAndControlServer) error { + defer cs.wg.Done() + msgContainer, err := stream.Recv() + if err != nil { + return err + } + + initInstMsg, ok := msgContainer.Message.(*AgentMessage_InitInstanceID) + if !ok { + return fmt.Errorf("received wrong message type") + } + + return cs.cc.handleInstanceStream(initInstMsg.InitInstanceID, stream) +} + +// Wait blocks until all instances have disconnected. +func (cs *CoordinatorServer) Wait() { + cs.wg.Wait() +} + +type coordinatorController struct { + logger logrus.FieldLogger + + dataRegistryLock sync.Mutex + dataRegistry map[string]*dataWaiter + + signalsLock sync.Mutex + signals map[string]*sync.WaitGroup + + instanceCount int +} + +type dataWaiter struct { + once sync.Once + done chan struct{} + data []byte + err string +} + +func newCoordinatorController(instanceCount int, logger logrus.FieldLogger) *coordinatorController { + return &coordinatorController{ + logger: logger, + instanceCount: instanceCount, + dataRegistry: make(map[string]*dataWaiter), + signals: make(map[string]*sync.WaitGroup), + } +} + +func (cc *coordinatorController) getSignalWG(signalID string) *sync.WaitGroup { + cc.signalsLock.Lock() + wg, ok := cc.signals[signalID] + if !ok { + wg = &sync.WaitGroup{} + wg.Add(cc.instanceCount) + cc.signals[signalID] = wg + } + cc.signalsLock.Unlock() + return wg +} + +func (cc *coordinatorController) getDataWaiter(dwID string) *dataWaiter { + cc.dataRegistryLock.Lock() + dw, ok := cc.dataRegistry[dwID] + if !ok { + dw = &dataWaiter{ + done: make(chan struct{}), + } + cc.dataRegistry[dwID] = dw + } + cc.dataRegistryLock.Unlock() + return dw +} + +// TODO: split apart and simplify +func (cc *coordinatorController) handleInstanceStream( + instanceID uint32, stream DistributedTest_CommandAndControlServer, +) (err error) { + cc.logger.Debug("Starting to handle command and control stream for instance %d", instanceID) + defer cc.logger.Infof("Instance %d disconnected", instanceID) + + handleSignal := func(id string, wg *sync.WaitGroup) { + wg.Done() + wg.Wait() + err := stream.Send(&ControllerMessage{ + InstanceID: instanceID, + Message: &ControllerMessage_DoneWaitWithID{id}, + }) + if err != nil { + cc.logger.Error(err) + } + } + handleData := func(id string, dw *dataWaiter) { + thisInstanceCreatedTheData := false + dw.once.Do(func() { + err := stream.Send(&ControllerMessage{ + InstanceID: instanceID, + Message: &ControllerMessage_CreateDataWithID{id}, + }) + if err != nil { + cc.logger.Error(err) + } + <-dw.done + thisInstanceCreatedTheData = true + }) + if thisInstanceCreatedTheData { + return // nothing to do + } + err := stream.Send(&ControllerMessage{ + InstanceID: instanceID, + Message: &ControllerMessage_DataWithID{DataWithID: &DataPacket{ + Id: id, + Data: dw.data, + Error: dw.err, + }}, + }) + if err != nil { + cc.logger.Error(err) + } + } + + for { + msgContainer, err := stream.Recv() + if err != nil { + return err + } + + switch msg := msgContainer.Message.(type) { + case *AgentMessage_Error: + // TODO: handle errors from instances + + case *AgentMessage_SignalAndWaitOnID: + wg := cc.getSignalWG(msg.SignalAndWaitOnID) + go handleSignal(msg.SignalAndWaitOnID, wg) + + case *AgentMessage_GetOrCreateDataWithID: + dw := cc.getDataWaiter(msg.GetOrCreateDataWithID) + go handleData(msg.GetOrCreateDataWithID, dw) + + case *AgentMessage_CreatedData: + cc.dataRegistryLock.Lock() + dw, ok := cc.dataRegistry[msg.CreatedData.Id] + if !ok { + return fmt.Errorf("expected data waiter object for %s to be created already", msg.CreatedData.Id) + } + cc.dataRegistryLock.Unlock() + dw.data = msg.CreatedData.Data + dw.err = msg.CreatedData.Error + close(dw.done) + default: + return fmt.Errorf("unknown controller message type '%#v'", msg) + } + } +} diff --git a/execution/distributed/distributed.pb.go b/execution/distributed/distributed.pb.go new file mode 100644 index 00000000000..92221687a88 --- /dev/null +++ b/execution/distributed/distributed.pb.go @@ -0,0 +1,607 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.21.12 +// source: distributed.proto + +package distributed + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RegisterRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{0} +} + +type RegisterResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InstanceID uint32 `protobuf:"varint,1,opt,name=instanceID,proto3" json:"instanceID,omitempty"` + Archive []byte `protobuf:"bytes,2,opt,name=archive,proto3" json:"archive,omitempty"` // TODO: send this with a `stream` of bytes chunks + Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` +} + +func (x *RegisterResponse) Reset() { + *x = RegisterResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterResponse) ProtoMessage() {} + +func (x *RegisterResponse) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. +func (*RegisterResponse) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{1} +} + +func (x *RegisterResponse) GetInstanceID() uint32 { + if x != nil { + return x.InstanceID + } + return 0 +} + +func (x *RegisterResponse) GetArchive() []byte { + if x != nil { + return x.Archive + } + return nil +} + +func (x *RegisterResponse) GetOptions() []byte { + if x != nil { + return x.Options + } + return nil +} + +type AgentMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // TODO: actually use random session IDs to prevent spoofing + // + // Types that are assignable to Message: + // *AgentMessage_Error + // *AgentMessage_InitInstanceID + // *AgentMessage_SignalAndWaitOnID + // *AgentMessage_GetOrCreateDataWithID + // *AgentMessage_CreatedData + Message isAgentMessage_Message `protobuf_oneof:"Message"` +} + +func (x *AgentMessage) Reset() { + *x = AgentMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AgentMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentMessage) ProtoMessage() {} + +func (x *AgentMessage) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentMessage.ProtoReflect.Descriptor instead. +func (*AgentMessage) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{2} +} + +func (m *AgentMessage) GetMessage() isAgentMessage_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *AgentMessage) GetError() string { + if x, ok := x.GetMessage().(*AgentMessage_Error); ok { + return x.Error + } + return "" +} + +func (x *AgentMessage) GetInitInstanceID() uint32 { + if x, ok := x.GetMessage().(*AgentMessage_InitInstanceID); ok { + return x.InitInstanceID + } + return 0 +} + +func (x *AgentMessage) GetSignalAndWaitOnID() string { + if x, ok := x.GetMessage().(*AgentMessage_SignalAndWaitOnID); ok { + return x.SignalAndWaitOnID + } + return "" +} + +func (x *AgentMessage) GetGetOrCreateDataWithID() string { + if x, ok := x.GetMessage().(*AgentMessage_GetOrCreateDataWithID); ok { + return x.GetOrCreateDataWithID + } + return "" +} + +func (x *AgentMessage) GetCreatedData() *DataPacket { + if x, ok := x.GetMessage().(*AgentMessage_CreatedData); ok { + return x.CreatedData + } + return nil +} + +type isAgentMessage_Message interface { + isAgentMessage_Message() +} + +type AgentMessage_Error struct { + Error string `protobuf:"bytes,1,opt,name=error,proto3,oneof"` +} + +type AgentMessage_InitInstanceID struct { + InitInstanceID uint32 `protobuf:"varint,2,opt,name=initInstanceID,proto3,oneof"` +} + +type AgentMessage_SignalAndWaitOnID struct { + SignalAndWaitOnID string `protobuf:"bytes,3,opt,name=signalAndWaitOnID,proto3,oneof"` +} + +type AgentMessage_GetOrCreateDataWithID struct { + GetOrCreateDataWithID string `protobuf:"bytes,4,opt,name=getOrCreateDataWithID,proto3,oneof"` +} + +type AgentMessage_CreatedData struct { + CreatedData *DataPacket `protobuf:"bytes,5,opt,name=createdData,proto3,oneof"` +} + +func (*AgentMessage_Error) isAgentMessage_Message() {} + +func (*AgentMessage_InitInstanceID) isAgentMessage_Message() {} + +func (*AgentMessage_SignalAndWaitOnID) isAgentMessage_Message() {} + +func (*AgentMessage_GetOrCreateDataWithID) isAgentMessage_Message() {} + +func (*AgentMessage_CreatedData) isAgentMessage_Message() {} + +type ControllerMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InstanceID uint32 `protobuf:"varint,1,opt,name=instanceID,proto3" json:"instanceID,omitempty"` + // Types that are assignable to Message: + // *ControllerMessage_DoneWaitWithID + // *ControllerMessage_CreateDataWithID + // *ControllerMessage_DataWithID + Message isControllerMessage_Message `protobuf_oneof:"Message"` +} + +func (x *ControllerMessage) Reset() { + *x = ControllerMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ControllerMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ControllerMessage) ProtoMessage() {} + +func (x *ControllerMessage) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ControllerMessage.ProtoReflect.Descriptor instead. +func (*ControllerMessage) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{3} +} + +func (x *ControllerMessage) GetInstanceID() uint32 { + if x != nil { + return x.InstanceID + } + return 0 +} + +func (m *ControllerMessage) GetMessage() isControllerMessage_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *ControllerMessage) GetDoneWaitWithID() string { + if x, ok := x.GetMessage().(*ControllerMessage_DoneWaitWithID); ok { + return x.DoneWaitWithID + } + return "" +} + +func (x *ControllerMessage) GetCreateDataWithID() string { + if x, ok := x.GetMessage().(*ControllerMessage_CreateDataWithID); ok { + return x.CreateDataWithID + } + return "" +} + +func (x *ControllerMessage) GetDataWithID() *DataPacket { + if x, ok := x.GetMessage().(*ControllerMessage_DataWithID); ok { + return x.DataWithID + } + return nil +} + +type isControllerMessage_Message interface { + isControllerMessage_Message() +} + +type ControllerMessage_DoneWaitWithID struct { + DoneWaitWithID string `protobuf:"bytes,2,opt,name=doneWaitWithID,proto3,oneof"` +} + +type ControllerMessage_CreateDataWithID struct { + CreateDataWithID string `protobuf:"bytes,3,opt,name=createDataWithID,proto3,oneof"` +} + +type ControllerMessage_DataWithID struct { + DataWithID *DataPacket `protobuf:"bytes,4,opt,name=dataWithID,proto3,oneof"` +} + +func (*ControllerMessage_DoneWaitWithID) isControllerMessage_Message() {} + +func (*ControllerMessage_CreateDataWithID) isControllerMessage_Message() {} + +func (*ControllerMessage_DataWithID) isControllerMessage_Message() {} + +type DataPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *DataPacket) Reset() { + *x = DataPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DataPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DataPacket) ProtoMessage() {} + +func (x *DataPacket) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DataPacket.ProtoReflect.Descriptor instead. +func (*DataPacket) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{4} +} + +func (x *DataPacket) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *DataPacket) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *DataPacket) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +var File_distributed_proto protoreflect.FileDescriptor + +var file_distributed_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, + 0x22, 0x11, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x22, 0x66, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, + 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, + 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x80, 0x02, 0x0a, 0x0c, + 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x28, 0x0a, 0x0e, 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, 0x74, + 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0e, + 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x2e, + 0x0a, 0x11, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x41, 0x6e, 0x64, 0x57, 0x61, 0x69, 0x74, 0x4f, + 0x6e, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x11, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x41, 0x6e, 0x64, 0x57, 0x61, 0x69, 0x74, 0x4f, 0x6e, 0x49, 0x44, 0x12, 0x36, + 0x0a, 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, + 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, + 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, + 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x3b, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, + 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, + 0x63, 0x6b, 0x65, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x44, + 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xd1, + 0x01, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, + 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, + 0x63, 0x65, 0x49, 0x44, 0x12, 0x28, 0x0a, 0x0e, 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, 0x74, + 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, + 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x2c, + 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, + 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x10, 0x63, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0a, + 0x64, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x44, + 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x64, 0x61, 0x74, + 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x42, 0x09, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x46, 0x0a, 0x0a, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0xb2, 0x01, 0x0a, 0x0f, 0x44, + 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x54, 0x65, 0x73, 0x74, 0x12, 0x49, + 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, + 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x11, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x19, + 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x41, 0x67, 0x65, + 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x74, + 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, + 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, + 0x23, 0x5a, 0x21, 0x67, 0x6f, 0x2e, 0x6b, 0x36, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x36, 0x2f, 0x65, + 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, + 0x75, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_distributed_proto_rawDescOnce sync.Once + file_distributed_proto_rawDescData = file_distributed_proto_rawDesc +) + +func file_distributed_proto_rawDescGZIP() []byte { + file_distributed_proto_rawDescOnce.Do(func() { + file_distributed_proto_rawDescData = protoimpl.X.CompressGZIP(file_distributed_proto_rawDescData) + }) + return file_distributed_proto_rawDescData +} + +var file_distributed_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_distributed_proto_goTypes = []interface{}{ + (*RegisterRequest)(nil), // 0: distributed.RegisterRequest + (*RegisterResponse)(nil), // 1: distributed.RegisterResponse + (*AgentMessage)(nil), // 2: distributed.AgentMessage + (*ControllerMessage)(nil), // 3: distributed.ControllerMessage + (*DataPacket)(nil), // 4: distributed.DataPacket +} +var file_distributed_proto_depIdxs = []int32{ + 4, // 0: distributed.AgentMessage.createdData:type_name -> distributed.DataPacket + 4, // 1: distributed.ControllerMessage.dataWithID:type_name -> distributed.DataPacket + 0, // 2: distributed.DistributedTest.Register:input_type -> distributed.RegisterRequest + 2, // 3: distributed.DistributedTest.CommandAndControl:input_type -> distributed.AgentMessage + 1, // 4: distributed.DistributedTest.Register:output_type -> distributed.RegisterResponse + 3, // 5: distributed.DistributedTest.CommandAndControl:output_type -> distributed.ControllerMessage + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_distributed_proto_init() } +func file_distributed_proto_init() { + if File_distributed_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_distributed_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AgentMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ControllerMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DataPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_distributed_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*AgentMessage_Error)(nil), + (*AgentMessage_InitInstanceID)(nil), + (*AgentMessage_SignalAndWaitOnID)(nil), + (*AgentMessage_GetOrCreateDataWithID)(nil), + (*AgentMessage_CreatedData)(nil), + } + file_distributed_proto_msgTypes[3].OneofWrappers = []interface{}{ + (*ControllerMessage_DoneWaitWithID)(nil), + (*ControllerMessage_CreateDataWithID)(nil), + (*ControllerMessage_DataWithID)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_distributed_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_distributed_proto_goTypes, + DependencyIndexes: file_distributed_proto_depIdxs, + MessageInfos: file_distributed_proto_msgTypes, + }.Build() + File_distributed_proto = out.File + file_distributed_proto_rawDesc = nil + file_distributed_proto_goTypes = nil + file_distributed_proto_depIdxs = nil +} diff --git a/execution/distributed/distributed.proto b/execution/distributed/distributed.proto new file mode 100644 index 00000000000..8d9da559b37 --- /dev/null +++ b/execution/distributed/distributed.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package distributed; + +option go_package = "go.k6.io/k6/execution/distributed"; + +service DistributedTest { + rpc Register(RegisterRequest) returns (RegisterResponse) {}; + + rpc CommandAndControl(stream AgentMessage) + returns (stream ControllerMessage) {}; +} + +message RegisterRequest {} +message RegisterResponse { + uint32 instanceID = 1; + bytes archive = 2; // TODO: send this with a `stream` of bytes chunks + bytes options = 3; +} + +message AgentMessage { + // TODO: actually use random session IDs to prevent spoofing + oneof Message { + string error = 1; + uint32 initInstanceID = 2; + string signalAndWaitOnID = 3; + string getOrCreateDataWithID = 4; + DataPacket createdData = 5; + } +} + +message ControllerMessage { + uint32 instanceID = 1; + oneof Message { + string doneWaitWithID = 2; + string createDataWithID = 3; + DataPacket dataWithID = 4; + } +} + +message DataPacket { + string id = 1; + bytes data = 2; + string error = 3; +} \ No newline at end of file diff --git a/execution/distributed/distributed_grpc.pb.go b/execution/distributed/distributed_grpc.pb.go new file mode 100644 index 00000000000..41f6c9afc21 --- /dev/null +++ b/execution/distributed/distributed_grpc.pb.go @@ -0,0 +1,174 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: distributed.proto + +package distributed + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// DistributedTestClient is the client API for DistributedTest service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DistributedTestClient interface { + Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) + CommandAndControl(ctx context.Context, opts ...grpc.CallOption) (DistributedTest_CommandAndControlClient, error) +} + +type distributedTestClient struct { + cc grpc.ClientConnInterface +} + +func NewDistributedTestClient(cc grpc.ClientConnInterface) DistributedTestClient { + return &distributedTestClient{cc} +} + +func (c *distributedTestClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) { + out := new(RegisterResponse) + err := c.cc.Invoke(ctx, "/distributed.DistributedTest/Register", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *distributedTestClient) CommandAndControl(ctx context.Context, opts ...grpc.CallOption) (DistributedTest_CommandAndControlClient, error) { + stream, err := c.cc.NewStream(ctx, &DistributedTest_ServiceDesc.Streams[0], "/distributed.DistributedTest/CommandAndControl", opts...) + if err != nil { + return nil, err + } + x := &distributedTestCommandAndControlClient{stream} + return x, nil +} + +type DistributedTest_CommandAndControlClient interface { + Send(*AgentMessage) error + Recv() (*ControllerMessage, error) + grpc.ClientStream +} + +type distributedTestCommandAndControlClient struct { + grpc.ClientStream +} + +func (x *distributedTestCommandAndControlClient) Send(m *AgentMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *distributedTestCommandAndControlClient) Recv() (*ControllerMessage, error) { + m := new(ControllerMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// DistributedTestServer is the server API for DistributedTest service. +// All implementations must embed UnimplementedDistributedTestServer +// for forward compatibility +type DistributedTestServer interface { + Register(context.Context, *RegisterRequest) (*RegisterResponse, error) + CommandAndControl(DistributedTest_CommandAndControlServer) error + mustEmbedUnimplementedDistributedTestServer() +} + +// UnimplementedDistributedTestServer must be embedded to have forward compatible implementations. +type UnimplementedDistributedTestServer struct { +} + +func (UnimplementedDistributedTestServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Register not implemented") +} +func (UnimplementedDistributedTestServer) CommandAndControl(DistributedTest_CommandAndControlServer) error { + return status.Errorf(codes.Unimplemented, "method CommandAndControl not implemented") +} +func (UnimplementedDistributedTestServer) mustEmbedUnimplementedDistributedTestServer() {} + +// UnsafeDistributedTestServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DistributedTestServer will +// result in compilation errors. +type UnsafeDistributedTestServer interface { + mustEmbedUnimplementedDistributedTestServer() +} + +func RegisterDistributedTestServer(s grpc.ServiceRegistrar, srv DistributedTestServer) { + s.RegisterService(&DistributedTest_ServiceDesc, srv) +} + +func _DistributedTest_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributedTestServer).Register(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/distributed.DistributedTest/Register", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributedTestServer).Register(ctx, req.(*RegisterRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DistributedTest_CommandAndControl_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(DistributedTestServer).CommandAndControl(&distributedTestCommandAndControlServer{stream}) +} + +type DistributedTest_CommandAndControlServer interface { + Send(*ControllerMessage) error + Recv() (*AgentMessage, error) + grpc.ServerStream +} + +type distributedTestCommandAndControlServer struct { + grpc.ServerStream +} + +func (x *distributedTestCommandAndControlServer) Send(m *ControllerMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *distributedTestCommandAndControlServer) Recv() (*AgentMessage, error) { + m := new(AgentMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// DistributedTest_ServiceDesc is the grpc.ServiceDesc for DistributedTest service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DistributedTest_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "distributed.DistributedTest", + HandlerType: (*DistributedTestServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Register", + Handler: _DistributedTest_Register_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "CommandAndControl", + Handler: _DistributedTest_CommandAndControl_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "distributed.proto", +} diff --git a/execution/distributed/gen.go b/execution/distributed/gen.go new file mode 100644 index 00000000000..031c63aaf31 --- /dev/null +++ b/execution/distributed/gen.go @@ -0,0 +1,5 @@ +// Package distributed implements the execution.Controller interface for +// distributed (multi-instance) k6 execution. +package distributed + +//go:generate protoc --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative --go_out=./ --go-grpc_out=./ ./distributed.proto