diff --git a/cmd/crowdsec-cli/clipapi/papi.go b/cmd/crowdsec-cli/clipapi/papi.go index 7ac2455d28f..c954e3ab996 100644 --- a/cmd/crowdsec-cli/clipapi/papi.go +++ b/cmd/crowdsec-cli/clipapi/papi.go @@ -134,7 +134,7 @@ func (cli *cliPapi) sync(ctx context.Context, out io.Writer, db *database.Client return fmt.Errorf("unable to initialize PAPI client: %w", err) } - t.Go(papi.SyncDecisions) + t.Go(func() error { return papi.SyncDecisions(ctx) }) err = papi.PullOnce(ctx, time.Time{}, true) if err != nil { diff --git a/cmd/crowdsec/lpmetrics.go b/cmd/crowdsec/lpmetrics.go index 24842851294..48eeaae91b9 100644 --- a/cmd/crowdsec/lpmetrics.go +++ b/cmd/crowdsec/lpmetrics.go @@ -130,6 +130,27 @@ func (m *MetricsProvider) metricsPayload() *models.AllMetrics { } } +func (m *MetricsProvider) sendMetrics(ctx context.Context, met *models.AllMetrics) { + defer trace.CatchPanic("crowdsec/MetricsProvider.sendMetrics") + + ctxTime, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + _, resp, err := m.apic.UsageMetrics.Add(ctxTime, met) + switch { + case errors.Is(err, context.DeadlineExceeded): + m.logger.Warnf("timeout sending lp metrics") + case err != nil && resp != nil && resp.Response.StatusCode == http.StatusNotFound: + m.logger.Warnf("metrics endpoint not found, older LAPI?") + case err != nil: + m.logger.Warnf("failed to send lp metrics: %s", err) + case resp.Response.StatusCode != http.StatusCreated: + m.logger.Warnf("failed to send lp metrics: %s", resp.Response.Status) + default: + m.logger.Tracef("lp usage metrics sent") + } +} + func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { defer trace.CatchPanic("crowdsec/MetricsProvider.Run") @@ -144,34 +165,8 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { for { select { case <-ticker.C: - ctxTime, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - _, resp, err := m.apic.UsageMetrics.Add(ctxTime, met) - switch { - case errors.Is(err, context.DeadlineExceeded): - m.logger.Warnf("timeout sending lp metrics") - ticker.Reset(m.interval) - continue - case err != nil && resp != nil && resp.Response.StatusCode == http.StatusNotFound: - m.logger.Warnf("metrics endpoint not found, older LAPI?") - ticker.Reset(m.interval) - continue - case err != nil: - m.logger.Warnf("failed to send lp metrics: %s", err) - ticker.Reset(m.interval) - continue - } - - if resp.Response.StatusCode != http.StatusCreated { - m.logger.Warnf("failed to send lp metrics: %s", resp.Response.Status) - ticker.Reset(m.interval) - continue - } - + m.sendMetrics(ctx, met) ticker.Reset(m.interval) - - m.logger.Tracef("lp usage metrics sent") case <-myTomb.Dying(): ticker.Stop() return nil diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index 32847f7489a..2c606dcbaee 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -35,7 +35,7 @@ import ( const ( // delta values must be smaller than the interval pullIntervalDefault = time.Hour * 2 - pullIntervalDelta = 5 * time.Minute + pullIntervalDelta = time.Minute * 5 pushIntervalDefault = time.Second * 10 pushIntervalDelta = time.Second * 7 metricsIntervalDefault = time.Minute * 30 @@ -363,6 +363,15 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig return true } +func (a *apic) sendBatch(ctx context.Context, signals []*models.AddSignalsRequestItem) error { + ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, _, err := a.apiClient.Signal.Add(ctxBatch, (*models.AddSignalsRequest)(&signals)) + + return err +} + func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) { /*we do have a problem with this : The apic.Push background routine reads from alertToPush chan. @@ -375,44 +384,21 @@ func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) { I don't know enough about gin to tell how much of an issue it can be. */ - var ( - cache []*models.AddSignalsRequestItem = *cacheOrig - send models.AddSignalsRequest - ) - - bulkSize := 50 - pageStart := 0 - pageEnd := bulkSize - - for { - if pageEnd >= len(cache) { - send = cache[pageStart:] - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + var cache []*models.AddSignalsRequestItem = *cacheOrig - defer cancel() + batchSize := 50 - _, _, err := a.apiClient.Signal.Add(ctx, &send) - if err != nil { - log.Errorf("sending signal to central API: %s", err) - return - } + for start := 0; start < len(cache); start += batchSize { + end := start + batchSize - break + if end > len(cache) { + end = len(cache) } - send = cache[pageStart:pageEnd] - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - - defer cancel() - - _, _, err := a.apiClient.Signal.Add(ctx, &send) - if err != nil { - // we log it here as well, because the return value of func might be discarded + if err := a.sendBatch(ctx, cache[start:end]); err != nil { log.Errorf("sending signal to central API: %s", err) + return } - - pageStart += bulkSize - pageEnd += bulkSize } } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a9ab45cebde..a14e656fa19 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -332,8 +332,8 @@ func (s *APIServer) papiPull(ctx context.Context) error { return nil } -func (s *APIServer) papiSync() error { - if err := s.papi.SyncDecisions(); err != nil { +func (s *APIServer) papiSync(ctx context.Context) error { + if err := s.papi.SyncDecisions(ctx); err != nil { log.Errorf("capi decisions sync: %s", err) return err } @@ -351,7 +351,7 @@ func (s *APIServer) initAPIC(ctx context.Context) { if s.papi.URL != "" { log.Info("Starting PAPI decision receiver") s.papi.pullTomb.Go(func() error { return s.papiPull(ctx) }) - s.papi.syncTomb.Go(s.papiSync) + s.papi.syncTomb.Go(func() error { return s.papiSync(ctx) }) } else { log.Warnf("papi_url is not set in online_api_credentials.yaml, can't synchronize with the console. Run cscli console enable console_management to add it.") } diff --git a/pkg/apiserver/papi.go b/pkg/apiserver/papi.go index 7f494c98bf4..cddaabb87cc 100644 --- a/pkg/apiserver/papi.go +++ b/pkg/apiserver/papi.go @@ -287,7 +287,7 @@ func (p *Papi) Pull(ctx context.Context) error { return nil } -func (p *Papi) SyncDecisions() error { +func (p *Papi) SyncDecisions(ctx context.Context) error { defer trace.CatchPanic("lapi/syncDecisionsToCAPI") var cache models.DecisionsDeleteRequest @@ -304,7 +304,7 @@ func (p *Papi) SyncDecisions() error { return nil } - go p.SendDeletedDecisions(&cache) + go p.SendDeletedDecisions(ctx, &cache) return nil case <-ticker.C: @@ -315,7 +315,7 @@ func (p *Papi) SyncDecisions() error { p.mu.Unlock() p.Logger.Infof("sync decisions: %d deleted decisions to push", len(cacheCopy)) - go p.SendDeletedDecisions(&cacheCopy) + go p.SendDeletedDecisions(ctx, &cacheCopy) } case deletedDecisions := <-p.Channels.DeleteDecisionChannel: if (p.consoleConfig.ShareManualDecisions != nil && *p.consoleConfig.ShareManualDecisions) || (p.consoleConfig.ConsoleManagement != nil && *p.consoleConfig.ConsoleManagement) { @@ -335,45 +335,34 @@ func (p *Papi) SyncDecisions() error { } } -func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) { - var ( - cache []models.DecisionsDeleteRequestItem = *cacheOrig - send models.DecisionsDeleteRequest - ) +func (p *Papi) sendDeletedDecisionsBatch(ctx context.Context, decisions []models.DecisionsDeleteRequestItem) error { + ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - bulkSize := 50 - pageStart := 0 - pageEnd := bulkSize - - for { - if pageEnd >= len(cache) { - send = cache[pageStart:] - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, _, err := p.apiClient.DecisionDelete.Add(ctxBatch, (*models.DecisionsDeleteRequest)(&decisions)) + if err != nil { + return err + } - defer cancel() + return nil +} - _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send) - if err != nil { - p.Logger.Errorf("sending deleted decisions to central API: %s", err) - return - } +func (p *Papi) SendDeletedDecisions(ctx context.Context, cacheOrig *models.DecisionsDeleteRequest) { + var cache []models.DecisionsDeleteRequestItem = *cacheOrig - break - } + batchSize := 50 - send = cache[pageStart:pageEnd] - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + for start := 0; start < len(cache); start += batchSize { + end := start + batchSize - defer cancel() + if end > len(cache) { + end = len(cache) + } - _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send) - if err != nil { - // we log it here as well, because the return value of func might be discarded + if err := p.sendDeletedDecisionsBatch(ctx, cache[start:end]); err != nil { p.Logger.Errorf("sending deleted decisions to central API: %s", err) + return } - - pageStart += bulkSize - pageEnd += bulkSize } } diff --git a/pkg/csplugin/broker.go b/pkg/csplugin/broker.go index f53c831e186..3d040459638 100644 --- a/pkg/csplugin/broker.go +++ b/pkg/csplugin/broker.go @@ -63,8 +63,7 @@ type PluginConfig struct { Format string `yaml:"format,omitempty"` // specific to notification plugins - Config map[string]interface{} `yaml:",inline"` //to keep the plugin-specific config - + Config map[string]interface{} `yaml:",inline"` // to keep the plugin-specific config } type ProfileAlert struct { @@ -82,14 +81,18 @@ func (pb *PluginBroker) Init(ctx context.Context, pluginCfg *csconfig.PluginCfg, pb.profileConfigs = profileConfigs pb.pluginProcConfig = pluginCfg pb.pluginsTypesToDispatch = make(map[string]struct{}) + if err := pb.loadConfig(configPaths.NotificationDir); err != nil { return fmt.Errorf("while loading plugin config: %w", err) } + if err := pb.loadPlugins(ctx, configPaths.PluginDir); err != nil { return fmt.Errorf("while loading plugin: %w", err) } + pb.watcher = PluginWatcher{} pb.watcher.Init(pb.pluginConfigByName, pb.alertsByPluginName) + return nil } @@ -100,8 +103,11 @@ func (pb *PluginBroker) Kill() { } func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { - //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher) + // we get signaled via the channel when notifications need to be delivered to plugin (via the watcher) + ctx := context.TODO() + pb.watcher.Start(&tomb.Tomb{}) + for { select { case profileAlert := <-pb.PluginChannel: @@ -114,14 +120,16 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { tmpAlerts := pb.alertsByPluginName[pluginName] pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) pluginMutex.Unlock() + go func() { - //Chunk alerts to respect group_threshold + // Chunk alerts to respect group_threshold threshold := pb.pluginConfigByName[pluginName].GroupThreshold if threshold == 0 { threshold = 1 } + for _, chunk := range slicetools.Chunks(tmpAlerts, threshold) { - if err := pb.pushNotificationsToPlugin(pluginName, chunk); err != nil { + if err := pb.pushNotificationsToPlugin(ctx, pluginName, chunk); err != nil { log.WithField("plugin:", pluginName).Error(err) } } @@ -130,11 +138,13 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { case <-pluginTomb.Dying(): log.Infof("pluginTomb dying") pb.watcher.tomb.Kill(errors.New("Terminating")) + for { select { case <-pb.watcher.tomb.Dead(): log.Info("killing all plugins") pb.Kill() + return case pluginName := <-pb.watcher.PluginEvents: // this can be run in goroutine, but then locks will be needed @@ -144,7 +154,7 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) pluginMutex.Unlock() - if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil { + if err := pb.pushNotificationsToPlugin(ctx, pluginName, tmpAlerts); err != nil { log.WithField("plugin:", pluginName).Error(err) } } @@ -159,6 +169,7 @@ func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) { log.Errorf("plugin %s is not configured properly.", pluginName) continue } + pluginMutex.Lock() pb.alertsByPluginName[pluginName] = append(pb.alertsByPluginName[pluginName], profileAlert.Alert) pluginMutex.Unlock() @@ -174,6 +185,7 @@ func (pb *PluginBroker) profilesContainPlugin(pluginName string) bool { } } } + return false } @@ -182,6 +194,7 @@ func (pb *PluginBroker) loadConfig(path string) error { if err != nil { return err } + for _, configFilePath := range files { if !strings.HasSuffix(configFilePath, ".yaml") && !strings.HasSuffix(configFilePath, ".yml") { continue @@ -191,19 +204,22 @@ func (pb *PluginBroker) loadConfig(path string) error { if err != nil { return err } + for _, pluginConfig := range pluginConfigs { SetRequiredFields(&pluginConfig) + if _, ok := pb.pluginConfigByName[pluginConfig.Name]; ok { log.Warningf("notification '%s' is defined multiple times", pluginConfig.Name) } + pb.pluginConfigByName[pluginConfig.Name] = pluginConfig if !pb.profilesContainPlugin(pluginConfig.Name) { continue } } } - err = pb.verifyPluginConfigsWithProfile() - return err + + return pb.verifyPluginConfigsWithProfile() } // checks whether every notification in profile has its own config file @@ -213,9 +229,11 @@ func (pb *PluginBroker) verifyPluginConfigsWithProfile() error { if _, ok := pb.pluginConfigByName[pluginName]; !ok { return fmt.Errorf("config file for plugin %s not found", pluginName) } + pb.pluginsTypesToDispatch[pb.pluginConfigByName[pluginName].Type] = struct{}{} } } + return nil } @@ -228,6 +246,7 @@ func (pb *PluginBroker) verifyPluginBinaryWithProfile() error { } } } + return nil } @@ -236,14 +255,17 @@ func (pb *PluginBroker) loadPlugins(ctx context.Context, path string) error { if err != nil { return err } + for _, binaryPath := range binaryPaths { if err := pluginIsValid(binaryPath); err != nil { return err } + pType, pSubtype, err := getPluginTypeAndSubtypeFromPath(binaryPath) // eg pType="notification" , pSubtype="slack" if err != nil { return err } + if pType != "notification" { continue } @@ -256,6 +278,7 @@ func (pb *PluginBroker) loadPlugins(ctx context.Context, path string) error { if err != nil { return err } + for _, pc := range pb.pluginConfigByName { if pc.Type != pSubtype { continue @@ -265,15 +288,20 @@ func (pb *PluginBroker) loadPlugins(ctx context.Context, path string) error { if err != nil { return err } + data = []byte(csstring.StrictExpand(string(data), os.LookupEnv)) + _, err = pluginClient.Configure(ctx, &protobufs.Config{Config: data}) if err != nil { return fmt.Errorf("while configuring %s: %w", pc.Name, err) } + log.Infof("registered plugin %s", pc.Name) + pb.notificationPluginByName[pc.Name] = pluginClient } } + return pb.verifyPluginBinaryWithProfile() } @@ -282,13 +310,17 @@ func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) ( if err != nil { return nil, err } + log.Debugf("Executing plugin %s", binaryPath) + cmd, err := pb.CreateCmd(binaryPath) if err != nil { return nil, err } + pb.pluginMap[name] = &NotifierPlugin{} l := log.New() + err = types.ConfigureLogger(l) if err != nil { return nil, err @@ -304,20 +336,44 @@ func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) ( AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, Logger: logger, }) + client, err := c.Client() if err != nil { return nil, err } + raw, err := client.Dispense(name) if err != nil { return nil, err } + pb.pluginKillMethods = append(pb.pluginKillMethods, c.Kill) + return raw.(protobufs.NotifierServer), nil } -func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error { +func (pb *PluginBroker) tryNotify(ctx context.Context, pluginName, message string) error { + timeout := pb.pluginConfigByName[pluginName].TimeOut + ctxTimeout, cancel := context.WithTimeout(ctx, timeout) + + defer cancel() + + plugin := pb.notificationPluginByName[pluginName] + + _, err := plugin.Notify( + ctxTimeout, + &protobufs.Notification{ + Text: message, + Name: pluginName, + }, + ) + + return err +} + +func (pb *PluginBroker) pushNotificationsToPlugin(ctx context.Context, pluginName string, alerts []*models.Alert) error { log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts)) + if len(alerts) == 0 { return nil } @@ -326,21 +382,14 @@ func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*m if err != nil { return err } - plugin := pb.notificationPluginByName[pluginName] + backoffDuration := time.Second + for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ { - ctx, cancel := context.WithTimeout(context.Background(), pb.pluginConfigByName[pluginName].TimeOut) - defer cancel() - _, err = plugin.Notify( - ctx, - &protobufs.Notification{ - Text: message, - Name: pluginName, - }, - ) - if err == nil { + if err = pb.tryNotify(ctx, pluginName, message); err == nil { return nil } + log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err, i) time.Sleep(backoffDuration) backoffDuration *= 2 @@ -351,27 +400,34 @@ func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*m func ParsePluginConfigFile(path string) ([]PluginConfig, error) { parsedConfigs := make([]PluginConfig, 0) + yamlFile, err := os.Open(path) if err != nil { return nil, fmt.Errorf("while opening %s: %w", path, err) } + dec := yaml.NewDecoder(yamlFile) dec.SetStrict(true) + for { pc := PluginConfig{} + err = dec.Decode(&pc) if err != nil { if errors.Is(err, io.EOF) { break } + return nil, fmt.Errorf("while decoding %s got error %s", path, err) } // if the yaml document is empty, skip if reflect.DeepEqual(pc, PluginConfig{}) { continue } + parsedConfigs = append(parsedConfigs, pc) } + return parsedConfigs, nil } @@ -390,6 +446,7 @@ func getUUID() (string, error) { if err != nil { return "", err } + return uuidv4.String(), nil } @@ -398,11 +455,13 @@ func getHandshake() (plugin.HandshakeConfig, error) { if err != nil { return plugin.HandshakeConfig{}, err } + handshake := plugin.HandshakeConfig{ ProtocolVersion: PluginProtocolVersion, MagicCookieKey: CrowdsecPluginKey, MagicCookieValue: uuid, } + return handshake, nil } @@ -411,10 +470,13 @@ func FormatAlerts(format string, alerts []*models.Alert) (string, error) { if err != nil { return "", err } + b := new(strings.Builder) + err = template.Execute(b, alerts) if err != nil { return "", err } + return b.String(), nil } diff --git a/pkg/leakybucket/manager_load.go b/pkg/leakybucket/manager_load.go index cdf8f080773..9216c7f6724 100644 --- a/pkg/leakybucket/manager_load.go +++ b/pkg/leakybucket/manager_load.go @@ -234,100 +234,109 @@ func compileScopeFilter(bucketFactory *BucketFactory) error { return nil } -func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, hub *cwhub.Hub, scenarios []*cwhub.Item, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) { - var ( - ret = []BucketFactory{} - response chan types.Event - ) +func loadBucketFactoriesFromFile(item *cwhub.Item, hub *cwhub.Hub, buckets *Buckets, tomb *tomb.Tomb, response chan types.Event, orderEvent bool, simulationConfig *csconfig.SimulationConfig) ([]BucketFactory, error) { + itemPath := item.State.LocalPath - response = make(chan types.Event, 1) + // process the yaml + bucketConfigurationFile, err := os.Open(itemPath) + if err != nil { + log.Errorf("Can't access leaky configuration file %s", itemPath) + return nil, err + } - for _, item := range scenarios { - log.Debugf("Loading '%s'", item.State.LocalPath) + defer bucketConfigurationFile.Close() + dec := yaml.NewDecoder(bucketConfigurationFile) + dec.SetStrict(true) - itemPath := item.State.LocalPath + factories := []BucketFactory{} - // process the yaml - bucketConfigurationFile, err := os.Open(itemPath) + for { + bucketFactory := BucketFactory{} + + err = dec.Decode(&bucketFactory) if err != nil { - log.Errorf("Can't access leaky configuration file %s", itemPath) - return nil, nil, err - } + if !errors.Is(err, io.EOF) { + log.Errorf("Bad yaml in %s: %v", itemPath, err) + return nil, fmt.Errorf("bad yaml in %s: %w", itemPath, err) + } - defer bucketConfigurationFile.Close() - dec := yaml.NewDecoder(bucketConfigurationFile) - dec.SetStrict(true) + log.Tracef("End of yaml file") - for { - bucketFactory := BucketFactory{} + break + } - err = dec.Decode(&bucketFactory) - if err != nil { - if !errors.Is(err, io.EOF) { - log.Errorf("Bad yaml in %s: %v", itemPath, err) - return nil, nil, fmt.Errorf("bad yaml in %s: %w", itemPath, err) - } + bucketFactory.DataDir = hub.GetDataDir() + // check empty + if bucketFactory.Name == "" { + log.Errorf("Won't load nameless bucket") + return nil, errors.New("nameless bucket") + } + // check compat + if bucketFactory.FormatVersion == "" { + log.Tracef("no version in %s : %s, assuming '1.0'", bucketFactory.Name, itemPath) + bucketFactory.FormatVersion = "1.0" + } - log.Tracef("End of yaml file") + ok, err := constraint.Satisfies(bucketFactory.FormatVersion, constraint.Scenario) + if err != nil { + return nil, fmt.Errorf("failed to check version: %w", err) + } - break - } + if !ok { + log.Errorf("can't load %s : %s doesn't satisfy scenario format %s, skip", bucketFactory.Name, bucketFactory.FormatVersion, constraint.Scenario) + continue + } - bucketFactory.DataDir = hub.GetDataDir() - // check empty - if bucketFactory.Name == "" { - log.Errorf("Won't load nameless bucket") - return nil, nil, errors.New("nameless bucket") - } - // check compat - if bucketFactory.FormatVersion == "" { - log.Tracef("no version in %s : %s, assuming '1.0'", bucketFactory.Name, itemPath) - bucketFactory.FormatVersion = "1.0" - } + bucketFactory.Filename = filepath.Clean(itemPath) + bucketFactory.BucketName = seed.Generate() + bucketFactory.ret = response - ok, err := constraint.Satisfies(bucketFactory.FormatVersion, constraint.Scenario) - if err != nil { - return nil, nil, fmt.Errorf("failed to check version: %w", err) - } + if simulationConfig != nil { + bucketFactory.Simulated = simulationConfig.IsSimulated(bucketFactory.Name) + } - if !ok { - log.Errorf("can't load %s : %s doesn't satisfy scenario format %s, skip", bucketFactory.Name, bucketFactory.FormatVersion, constraint.Scenario) - continue - } + bucketFactory.ScenarioVersion = item.State.LocalVersion + bucketFactory.hash = item.State.LocalHash - bucketFactory.Filename = filepath.Clean(itemPath) - bucketFactory.BucketName = seed.Generate() - bucketFactory.ret = response + bucketFactory.wgDumpState = buckets.wgDumpState + bucketFactory.wgPour = buckets.wgPour - if cscfg.SimulationConfig != nil { - bucketFactory.Simulated = cscfg.SimulationConfig.IsSimulated(bucketFactory.Name) - } + err = LoadBucket(&bucketFactory, tomb) + if err != nil { + log.Errorf("Failed to load bucket %s: %v", bucketFactory.Name, err) + return nil, fmt.Errorf("loading of %s failed: %w", bucketFactory.Name, err) + } + + bucketFactory.orderEvent = orderEvent - bucketFactory.ScenarioVersion = item.State.LocalVersion - bucketFactory.hash = item.State.LocalHash + factories = append(factories, bucketFactory) + } - bucketFactory.wgDumpState = buckets.wgDumpState - bucketFactory.wgPour = buckets.wgPour + return factories, nil +} - err = LoadBucket(&bucketFactory, tomb) - if err != nil { - log.Errorf("Failed to load bucket %s: %v", bucketFactory.Name, err) - return nil, nil, fmt.Errorf("loading of %s failed: %w", bucketFactory.Name, err) - } +func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, hub *cwhub.Hub, scenarios []*cwhub.Item, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) { + allFactories := []BucketFactory{} + response := make(chan types.Event, 1) - bucketFactory.orderEvent = orderEvent + for _, item := range scenarios { + log.Debugf("Loading '%s'", item.State.LocalPath) - ret = append(ret, bucketFactory) + factories, err := loadBucketFactoriesFromFile(item, hub, buckets, tomb, response, orderEvent, cscfg.SimulationConfig) + if err != nil { + return nil, nil, err } + + allFactories = append(allFactories, factories...) } if err := alertcontext.NewAlertContext(cscfg.ContextToSend, cscfg.ConsoleContextValueLength); err != nil { return nil, nil, fmt.Errorf("unable to load alert context: %w", err) } - log.Infof("Loaded %d scenarios", len(ret)) + log.Infof("Loaded %d scenarios", len(allFactories)) - return ret, response, nil + return allFactories, response, nil } /* Init recursively process yaml files from a directory and loads them as BucketFactory */