From 9eb6fd472ca0df11d1e75beb63cebf0ac5871ffc Mon Sep 17 00:00:00 2001 From: daveaugustus Date: Mon, 16 Dec 2024 13:21:01 +0530 Subject: [PATCH 1/5] Async reindexing Signed-off-by: daveaugustus --- .../cmd/chef-automate/reindexing.go | 156 +++++++++++++++--- 1 file changed, 135 insertions(+), 21 deletions(-) diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go index 4bbd9b04c69..191c8aac1ee 100644 --- a/components/automate-cli/cmd/chef-automate/reindexing.go +++ b/components/automate-cli/cmd/chef-automate/reindexing.go @@ -6,8 +6,10 @@ import ( "fmt" "io" "net/http" + "os" "strings" "sync" + "time" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -44,9 +46,22 @@ var infoReindexing = ` Reindexing of Elasticsearch/OpenSearch indices if needed. ` +type ReindexStatus struct { + IsRunning bool `json:"is_running"` + CurrentIndex string `json:"current_index,omitempty"` + Completed []string `json:"completed,omitempty"` + LastUpdatedAt string `json:"last_updated_at"` +} + +var ( + reindexStatusFile = "/hab/reindex_status.json" + statusMu sync.Mutex +) + func init() { reindexCmd.SetUsageTemplate(infoReindexing) RootCmd.AddCommand(reindexCmd) + RootCmd.AddCommand(statusCmd) } var reindexCmd = &cobra.Command{ @@ -55,6 +70,12 @@ var reindexCmd = &cobra.Command{ RunE: runReindex, } +var statusCmd = &cobra.Command{ + Use: "status", + Short: "Check the reindexing status", + RunE: checkReindexingStatus, +} + var isReindexing bool var mu sync.Mutex @@ -66,27 +87,32 @@ var skipIndices = map[string]bool{ } func runReindex(cmd *cobra.Command, args []string) error { - fmt.Println("Reindexing Elasticsearch/OpenSearch indices.") - mu.Lock() - if isReindexing { - mu.Unlock() - fmt.Println("Reindexing is already in progress. Please wait for it to complete.") + fmt.Println("Starting reindexing process...") + + statusMu.Lock() + status := loadReindexStatus() + if status.IsRunning { + statusMu.Unlock() + fmt.Println("Reindexing is already in progress. Current status:") + printReindexStatus(status) return nil } - isReindexing = true - mu.Unlock() - defer func() { - mu.Lock() - isReindexing = false - mu.Unlock() - }() + // Mark reindexing as started + status.IsRunning = true + status.LastUpdatedAt = time.Now().Format(time.RFC3339) + saveReindexStatus(status) + statusMu.Unlock() + fmt.Println("Reindexing process started.") indices, err := fetchIndices() if err != nil { + fmt.Printf("Error fetching indices: %v\n", err) return err } + // Create a channel to handle reindexing concurrently + done := make(chan string) Loop1: for _, index := range indices { for prefix := range skipIndices { @@ -95,26 +121,114 @@ Loop1: continue Loop1 } } - settings, err := fetchIndexSettingsVersion(index.Index) + + // settings, err := fetchIndexSettingsVersion(index.Index) if err != nil { fmt.Printf("Error fetching settings for index %s: %v\n", index.Index, err) continue } - if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString { - fmt.Printf("Reindexing required for index: %s\n", index.Index) - if err := triggerReindex(index.Index); err != nil { - fmt.Printf("Error reindexing index %s: %v\n", index.Index, err) + // if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString { + fmt.Printf("Reindexing required for index: %s\n", index.Index) + go func(idx string) { + if err := reindexIndex(idx); err != nil { + fmt.Printf("Failed to reindex %s: %v\n", idx, err) } - } else { - fmt.Printf("Index %s is up to date. Skipping reindex.\n", index.Index) - } - } + done <- idx + }(index.Index) + // } else { + // fmt.Printf("Index %s is up to date. Skipping reindex.\n", index.Index) + // } + } + + // Wait for reindexing to complete + for range indices { + completedIndex := <-done + statusMu.Lock() + status.Completed = append(status.Completed, completedIndex) + status.LastUpdatedAt = time.Now().Format(time.RFC3339) + saveReindexStatus(status) + statusMu.Unlock() + } + + // Mark reindexing as finished + statusMu.Lock() + status.IsRunning = false + status.CurrentIndex = "" + status.LastUpdatedAt = time.Now().Format(time.RFC3339) + saveReindexStatus(status) + statusMu.Unlock() fmt.Println("Reindexing process completed.") return nil } +func reindexIndex(index string) error { + fmt.Printf("Reindexing index: %s\n", index) + + // Trigger the actual reindexing logic (placeholder) + err := triggerReindex(index) + if err != nil { + return fmt.Errorf("reindexing failed for index %s: %v", index, err) + } + + fmt.Printf("Reindexing completed for index: %s\n", index) + return nil +} + +func checkReindexingStatus(cmd *cobra.Command, args []string) error { + status := loadReindexStatus() + printReindexStatus(status) + return nil +} + +func printReindexStatus(status ReindexStatus) { + fmt.Println("Reindexing Status:") + fmt.Printf(" Is Running: %v\n", status.IsRunning) + if status.IsRunning { + fmt.Printf(" Current Index: %s\n", status.CurrentIndex) + } + fmt.Printf(" Completed Indices: %v\n", status.Completed) + fmt.Printf(" Last Updated At: %s\n", status.LastUpdatedAt) +} + +func loadReindexStatus() ReindexStatus { + statusMu.Lock() + defer statusMu.Unlock() + + var status ReindexStatus + data, err := os.ReadFile(reindexStatusFile) + if err != nil { + if os.IsNotExist(err) { + return status // Return default status if file doesn't exist + } + fmt.Printf("Failed to read reindex status: %v\n", err) + return status + } + + if err := json.Unmarshal(data, &status); err != nil { + fmt.Printf("Failed to parse reindex status: %v\n", err) + return status + } + + return status +} + +func saveReindexStatus(status ReindexStatus) { + statusMu.Lock() + defer statusMu.Unlock() + + data, err := json.MarshalIndent(status, "", " ") + if err != nil { + fmt.Printf("Failed to serialize reindex status: %v\n", err) + return + } + + if err := os.WriteFile(reindexStatusFile, data, 0644); err != nil { + fmt.Printf("Failed to save reindex status: %v\n", err) + } +} + func fetchIndices() (Indices, error) { fmt.Println("Fetching indices from Elasticsearch/OpenSearch.") resp, err := http.Get("http://127.0.0.1:10144/_cat/indices?format=json") From 657099cd448ac62d5d4123487797b06b61c23c7f Mon Sep 17 00:00:00 2001 From: daveaugustus Date: Wed, 18 Dec 2024 13:42:58 +0530 Subject: [PATCH 2/5] Detach process group Signed-off-by: daveaugustus --- .../cmd/chef-automate/reindexing.go | 385 ++++++++++-------- 1 file changed, 222 insertions(+), 163 deletions(-) diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go index 191c8aac1ee..27158db46d6 100644 --- a/components/automate-cli/cmd/chef-automate/reindexing.go +++ b/components/automate-cli/cmd/chef-automate/reindexing.go @@ -7,14 +7,46 @@ import ( "io" "net/http" "os" + "os/exec" "strings" "sync" + "syscall" "time" "github.com/pkg/errors" "github.com/spf13/cobra" ) +type OpensearchInfo struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version struct { + Distribution string `json:"distribution"` + Number string `json:"number"` + BuildType string `json:"build_type"` + BuildHash string `json:"build_hash"` + BuildSnapshot bool `json:"build_snapshot"` + LuceneVersion string `json:"lucene_version"` + MinimumWireCompatibilityVersion string `json:"minimum_wire_compatibility_version"` + MinimumIndexCompatibilityVersion string `json:"minimum_index_compatibility_version"` + } `json:"version"` + Tagline string `json:"tagline"` +} + +type TaskStatus struct { + IndexName string `json:"index_name"` + Status string `json:"status"` + Error string `json:"error,omitempty"` +} + +type Status struct { + Status string `json:"status"` + TaskStatus []TaskStatus `json:"task_status"` + OpenSearchVersion string `json:"opensearch_version"` + LastUpdatedAt string `json:"last_updated_at"` +} + type Indices []struct { Health string `json:"health"` Status string `json:"status"` @@ -46,20 +78,16 @@ var infoReindexing = ` Reindexing of Elasticsearch/OpenSearch indices if needed. ` -type ReindexStatus struct { - IsRunning bool `json:"is_running"` - CurrentIndex string `json:"current_index,omitempty"` - Completed []string `json:"completed,omitempty"` - LastUpdatedAt string `json:"last_updated_at"` -} - var ( - reindexStatusFile = "/hab/reindex_status.json" - statusMu sync.Mutex + statusFile = "status.json" + taskMutex sync.Mutex + taskQueue = make(chan string, 10) // Buffered channel to queue indices + wg sync.WaitGroup // WaitGroup to ensure reindexing complete ) func init() { reindexCmd.SetUsageTemplate(infoReindexing) + reindexCmd.Flags().Bool("background", false, "Run reindexing in the background") RootCmd.AddCommand(reindexCmd) RootCmd.AddCommand(statusCmd) } @@ -71,14 +99,11 @@ var reindexCmd = &cobra.Command{ } var statusCmd = &cobra.Command{ - Use: "status", + Use: "indexing-status", Short: "Check the reindexing status", - RunE: checkReindexingStatus, + RunE: checkStatus, } -var isReindexing bool -var mu sync.Mutex - var skipIndices = map[string]bool{ "security-auditlog": true, ".opendistro": true, @@ -86,147 +111,209 @@ var skipIndices = map[string]bool{ ".opensearch-observability": true, } +func checkStatus(cmd *cobra.Command, args []string) error { + status, err := readStatus() + if err != nil { + fmt.Printf("Error reading status: %v\n", err) + return err + } + + fmt.Printf("Current Status:\n") + fmt.Printf("- Status: %s\n", status.Status) + fmt.Printf("- OpenSearch Version: %s\n", status.OpenSearchVersion) + fmt.Printf("- Last Updated At: %s\n", status.LastUpdatedAt) + for _, task := range status.TaskStatus { + fmt.Printf(" - Index: %s, Status: %s", task.IndexName, task.Status) + if task.Error != "" { + fmt.Printf(", Error: %s", task.Error) + } + fmt.Println() + } + + return nil +} + +// Reads the current task status from the JSON file +func readStatus() (Status, error) { + taskMutex.Lock() + defer taskMutex.Unlock() + + file, err := os.ReadFile(statusFile) + if os.IsNotExist(err) { + return Status{}, nil + } else if err != nil { + return Status{}, err + } + + var status Status + err = json.Unmarshal(file, &status) + if err != nil { + return Status{}, err + } + return status, nil +} + +func updateStatus(status Status) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + status.LastUpdatedAt = time.Now().Format(time.RFC3339) + + file, err := json.MarshalIndent(status, "", " ") + if err != nil { + return err + } + return os.WriteFile(statusFile, file, 0644) +} + +func runInBackground() { + cmd := exec.Command(os.Args[0], append(os.Args[1:], "--background")...) + cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} // Detach process group + cmd.Stdout = nil + cmd.Stderr = nil + cmd.Stdin = nil + if err := cmd.Start(); err != nil { + fmt.Println("Error starting process in background:", err) + os.Exit(1) + } + fmt.Println("Task is now running in the background with PID:", cmd.Process.Pid) + os.Exit(0) +} + func runReindex(cmd *cobra.Command, args []string) error { + backgroundFlag, _ := cmd.Flags().GetBool("background") + + if !backgroundFlag { + // Check if reindexing is already running + status, err := readStatus() + if err == nil { + for _, task := range status.TaskStatus { + if task.Status == "Running" { + fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") + return nil + } + } + } + runInBackground() + } + fmt.Println("Starting reindexing process...") - statusMu.Lock() - status := loadReindexStatus() - if status.IsRunning { - statusMu.Unlock() - fmt.Println("Reindexing is already in progress. Current status:") - printReindexStatus(status) + // Fetch current indexing status + status, err := readStatus() + if err != nil { + fmt.Printf("Error reading status: %v\n", err) + return err + } + + if status.Status == "Running" { + fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") return nil } - // Mark reindexing as started - status.IsRunning = true - status.LastUpdatedAt = time.Now().Format(time.RFC3339) - saveReindexStatus(status) - statusMu.Unlock() + osInfo, err := getOpenSearchInfo() + if err != nil { + fmt.Printf("Error fetching OpenSearch info: %v\n", err) + } - fmt.Println("Reindexing process started.") + status.OpenSearchVersion = osInfo.Version.Number + status.Status = "Running" + fmt.Println("Fetching indices from Elasticsearch/OpenSearch.") indices, err := fetchIndices() if err != nil { fmt.Printf("Error fetching indices: %v\n", err) return err } - // Create a channel to handle reindexing concurrently - done := make(chan string) -Loop1: + // Add indices to the queue for reindexing for _, index := range indices { - for prefix := range skipIndices { - if strings.HasPrefix(index.Index, prefix) { - fmt.Printf("Skipping index %s\n", index.Index) - continue Loop1 + alreadyRunning := false + for _, task := range status.TaskStatus { + if task.IndexName == index.Index && task.Status == "Running" { + fmt.Printf("Task '%s' is already running. Skipping.\n", index.Index) + alreadyRunning = true + break } } - - // settings, err := fetchIndexSettingsVersion(index.Index) - if err != nil { - fmt.Printf("Error fetching settings for index %s: %v\n", index.Index, err) + if alreadyRunning { continue } - // if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString { - fmt.Printf("Reindexing required for index: %s\n", index.Index) - go func(idx string) { - if err := reindexIndex(idx); err != nil { - fmt.Printf("Failed to reindex %s: %v\n", idx, err) - } - done <- idx - }(index.Index) - // } else { - // fmt.Printf("Index %s is up to date. Skipping reindex.\n", index.Index) - // } - } - - // Wait for reindexing to complete - for range indices { - completedIndex := <-done - statusMu.Lock() - status.Completed = append(status.Completed, completedIndex) - status.LastUpdatedAt = time.Now().Format(time.RFC3339) - saveReindexStatus(status) - statusMu.Unlock() - } - - // Mark reindexing as finished - statusMu.Lock() - status.IsRunning = false - status.CurrentIndex = "" - status.LastUpdatedAt = time.Now().Format(time.RFC3339) - saveReindexStatus(status) - statusMu.Unlock() - - fmt.Println("Reindexing process completed.") - return nil -} - -func reindexIndex(index string) error { - fmt.Printf("Reindexing index: %s\n", index) + // Mark reindexing as running + status.TaskStatus = append(status.TaskStatus, TaskStatus{ + IndexName: index.Index, + Status: "Running", + }) + if err := updateStatus(status); err != nil { + fmt.Printf("Error updating status: %v\n", err) + return err + } - // Trigger the actual reindexing logic (placeholder) - err := triggerReindex(index) - if err != nil { - return fmt.Errorf("reindexing failed for index %s: %v", index, err) + wg.Add(1) + taskQueue <- index.Index + fmt.Printf("Task '%s' added to the queue.\n", index.Index) } - fmt.Printf("Reindexing completed for index: %s\n", index) - return nil -} + go processIndices() // Start task index processing + wg.Wait() // Wait for all tasks to complete -func checkReindexingStatus(cmd *cobra.Command, args []string) error { - status := loadReindexStatus() - printReindexStatus(status) + status.Status = "Completed" return nil } -func printReindexStatus(status ReindexStatus) { - fmt.Println("Reindexing Status:") - fmt.Printf(" Is Running: %v\n", status.IsRunning) - if status.IsRunning { - fmt.Printf(" Current Index: %s\n", status.CurrentIndex) +func processIndices() { + for jobName := range taskQueue { + fmt.Printf("Processing task: %s\n", jobName) + err := processReindexing(jobName) + status, readErr := readStatus() + if readErr != nil { + fmt.Printf("Error reading status: %v\n", readErr) + wg.Done() + continue + } + for i := range status.TaskStatus { + if status.TaskStatus[i].IndexName == jobName { + if err != nil { + status.TaskStatus[i].Status = "Error" + status.TaskStatus[i].Error = err.Error() + } else { + status.TaskStatus[i].Status = "Completed" + } + break + } + } + updateErr := updateStatus(status) + if updateErr != nil { + fmt.Printf("Error updating status: %v\n", updateErr) + } + wg.Done() } - fmt.Printf(" Completed Indices: %v\n", status.Completed) - fmt.Printf(" Last Updated At: %s\n", status.LastUpdatedAt) } -func loadReindexStatus() ReindexStatus { - statusMu.Lock() - defer statusMu.Unlock() - - var status ReindexStatus - data, err := os.ReadFile(reindexStatusFile) - if err != nil { - if os.IsNotExist(err) { - return status // Return default status if file doesn't exist +func processReindexing(index string) error { + for prefix := range skipIndices { + if strings.HasPrefix(index, prefix) { + fmt.Printf("Skipping index %s\n", index) + return nil } - fmt.Printf("Failed to read reindex status: %v\n", err) - return status } - if err := json.Unmarshal(data, &status); err != nil { - fmt.Printf("Failed to parse reindex status: %v\n", err) - return status - } - - return status -} - -func saveReindexStatus(status ReindexStatus) { - statusMu.Lock() - defer statusMu.Unlock() - - data, err := json.MarshalIndent(status, "", " ") + settings, err := fetchIndexSettingsVersion(index) if err != nil { - fmt.Printf("Failed to serialize reindex status: %v\n", err) - return + fmt.Printf("Error fetching settings for index %s: %v\n", index, err) + return err } - if err := os.WriteFile(reindexStatusFile, data, 0644); err != nil { - fmt.Printf("Failed to save reindex status: %v\n", err) + if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString { + fmt.Printf("Reindexing required for index: %s\n", index) + if err := triggerReindex(index); err != nil { + fmt.Printf("Failed to reindex %s: %v\n", index, err) + } + } else { + fmt.Printf("Index %s is up to date. Skipping reindex.\n", index) } + + return nil } func fetchIndices() (Indices, error) { @@ -608,28 +695,6 @@ func deleteIndex(index string) error { return nil } -// Close an index -func closeIndex(index string) error { - fmt.Printf("Closing index: %s\n", index) - url := fmt.Sprintf("http://127.0.0.1:10144/%s/_close", index) - req, err := http.NewRequest(http.MethodPost, url, nil) - if err != nil { - return fmt.Errorf("failed to create close request for index %s: %w", index, err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to close index %s: %w", index, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to close index %s: %s", index, string(body)) - } - return nil -} - func cloneIndex(sourceIndex, targetIndex string) error { fmt.Printf("Cloning index from %s to %s\n", sourceIndex, targetIndex) url := fmt.Sprintf("http://127.0.0.1:10144/%s/_clone/%s", sourceIndex, targetIndex) @@ -655,28 +720,6 @@ func cloneIndex(sourceIndex, targetIndex string) error { return nil } -// Open an index -func openIndex(index string) error { - fmt.Printf("Opening index: %s\n", index) - url := fmt.Sprintf("http://127.0.0.1:10144/%s/_open", index) - req, err := http.NewRequest(http.MethodPost, url, nil) - if err != nil { - return fmt.Errorf("failed to create open request for index %s: %w", index, err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to open index %s: %w", index, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to open index %s: %s", index, string(body)) - } - return nil -} - func setIndexWriteBlock(index string, readOnly bool) error { fmt.Printf("Setting index.blocks.write to %v for index: %s\n", readOnly, index) url := fmt.Sprintf("http://127.0.0.1:10144/%s/_settings", index) @@ -747,3 +790,19 @@ func fetchAliases(index string) ([]string, error) { return aliases, nil } + +func getOpenSearchInfo() (*OpensearchInfo, error) { + resp, err := http.Get("http://127.0.0.1:10144/") + if err != nil { + return nil, fmt.Errorf("failed to fetch OpenSearch info: %w", err) + } + defer resp.Body.Close() + + // Parse the response + var info OpensearchInfo + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + return nil, fmt.Errorf("failed to decode OpenSearch info: %w", err) + } + + return &info, nil +} From eb0ccb45478d16f520d5b5453a9fe738be3ff9ff Mon Sep 17 00:00:00 2001 From: daveaugustus Date: Wed, 18 Dec 2024 21:08:02 +0530 Subject: [PATCH 3/5] - Data format corrected - process deattachment - reindex status command --- .../cmd/chef-automate/reindexing.go | 169 ++++++------------ 1 file changed, 50 insertions(+), 119 deletions(-) diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go index 27158db46d6..ae45e960213 100644 --- a/components/automate-cli/cmd/chef-automate/reindexing.go +++ b/components/automate-cli/cmd/chef-automate/reindexing.go @@ -34,17 +34,17 @@ type OpensearchInfo struct { Tagline string `json:"tagline"` } -type TaskStatus struct { - IndexName string `json:"index_name"` - Status string `json:"status"` - Error string `json:"error,omitempty"` +type FailedIndex struct { + IndexName string `json:"index_name"` + ErrorMessage string `json:"error_message,omitempty"` } type Status struct { - Status string `json:"status"` - TaskStatus []TaskStatus `json:"task_status"` - OpenSearchVersion string `json:"opensearch_version"` - LastUpdatedAt string `json:"last_updated_at"` + Status string `json:"status"` + CompletedIndex []string `json:"completed_index"` + FailedIndex []FailedIndex `json:"failed_index"` + OpenSearchVersion string `json:"opensearch_version"` + LastUpdatedAt string `json:"last_updated_at"` } type Indices []struct { @@ -79,12 +79,12 @@ Reindexing of Elasticsearch/OpenSearch indices if needed. ` var ( - statusFile = "status.json" + statusFile = "/hab/status.json" taskMutex sync.Mutex - taskQueue = make(chan string, 10) // Buffered channel to queue indices - wg sync.WaitGroup // WaitGroup to ensure reindexing complete ) +var reindexStatus = &Status{} + func init() { reindexCmd.SetUsageTemplate(infoReindexing) reindexCmd.Flags().Bool("background", false, "Run reindexing in the background") @@ -112,20 +112,26 @@ var skipIndices = map[string]bool{ } func checkStatus(cmd *cobra.Command, args []string) error { - status, err := readStatus() + reindexStatus, err := readStatus() if err != nil { fmt.Printf("Error reading status: %v\n", err) return err } fmt.Printf("Current Status:\n") - fmt.Printf("- Status: %s\n", status.Status) - fmt.Printf("- OpenSearch Version: %s\n", status.OpenSearchVersion) - fmt.Printf("- Last Updated At: %s\n", status.LastUpdatedAt) - for _, task := range status.TaskStatus { - fmt.Printf(" - Index: %s, Status: %s", task.IndexName, task.Status) - if task.Error != "" { - fmt.Printf(", Error: %s", task.Error) + fmt.Printf("- Status: %s\n", reindexStatus.Status) + fmt.Printf("- OpenSearch Version: %s\n", reindexStatus.OpenSearchVersion) + fmt.Printf("- Last Updated At: %s\n", reindexStatus.LastUpdatedAt) + fmt.Printf("- Completed Indices:\n") + for _, index := range reindexStatus.CompletedIndex { + fmt.Printf(" - %s\n", index) + } + + fmt.Printf("- Failed Indices:\n") + for _, task := range reindexStatus.FailedIndex { + fmt.Printf(" - Index: %s\n", task.IndexName) + if task.ErrorMessage != "" { + fmt.Printf(", Error: %s", task.ErrorMessage) } fmt.Println() } @@ -153,9 +159,9 @@ func readStatus() (Status, error) { return status, nil } -func updateStatus(status Status) error { - taskMutex.Lock() - defer taskMutex.Unlock() +func updateStatus(status *Status) error { + // taskMutex.Lock() + // defer taskMutex.Unlock() status.LastUpdatedAt = time.Now().Format(time.RFC3339) @@ -163,6 +169,7 @@ func updateStatus(status Status) error { if err != nil { return err } + fmt.Printf("file: %v\n", string(file)) return os.WriteFile(statusFile, file, 0644) } @@ -185,28 +192,16 @@ func runReindex(cmd *cobra.Command, args []string) error { if !backgroundFlag { // Check if reindexing is already running - status, err := readStatus() - if err == nil { - for _, task := range status.TaskStatus { - if task.Status == "Running" { - fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") - return nil - } - } + if reindexStatus.Status == "Running" { + fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") + return nil } runInBackground() } fmt.Println("Starting reindexing process...") - // Fetch current indexing status - status, err := readStatus() - if err != nil { - fmt.Printf("Error reading status: %v\n", err) - return err - } - - if status.Status == "Running" { + if reindexStatus.Status == "Running" { fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") return nil } @@ -216,8 +211,8 @@ func runReindex(cmd *cobra.Command, args []string) error { fmt.Printf("Error fetching OpenSearch info: %v\n", err) } - status.OpenSearchVersion = osInfo.Version.Number - status.Status = "Running" + reindexStatus.OpenSearchVersion = osInfo.Version.Number + reindexStatus.Status = "Running" fmt.Println("Fetching indices from Elasticsearch/OpenSearch.") indices, err := fetchIndices() if err != nil { @@ -227,92 +222,28 @@ func runReindex(cmd *cobra.Command, args []string) error { // Add indices to the queue for reindexing for _, index := range indices { - alreadyRunning := false - for _, task := range status.TaskStatus { - if task.IndexName == index.Index && task.Status == "Running" { - fmt.Printf("Task '%s' is already running. Skipping.\n", index.Index) - alreadyRunning = true - break + for prefix := range skipIndices { + if strings.HasPrefix(index.Index, prefix) { + fmt.Printf("Skipping index %s\n", index.Index) + continue } } - if alreadyRunning { - continue - } - // Mark reindexing as running - status.TaskStatus = append(status.TaskStatus, TaskStatus{ - IndexName: index.Index, - Status: "Running", - }) - if err := updateStatus(status); err != nil { - fmt.Printf("Error updating status: %v\n", err) - return err + if err := triggerReindex(index.Index); err != nil { + reindexStatus.FailedIndex = append(reindexStatus.FailedIndex, FailedIndex{ + IndexName: index.Index, + ErrorMessage: err.Error(), + }) + } else { + reindexStatus.CompletedIndex = append(reindexStatus.CompletedIndex, index.Index) } - wg.Add(1) - taskQueue <- index.Index - fmt.Printf("Task '%s' added to the queue.\n", index.Index) } - go processIndices() // Start task index processing - wg.Wait() // Wait for all tasks to complete - - status.Status = "Completed" - return nil -} - -func processIndices() { - for jobName := range taskQueue { - fmt.Printf("Processing task: %s\n", jobName) - err := processReindexing(jobName) - status, readErr := readStatus() - if readErr != nil { - fmt.Printf("Error reading status: %v\n", readErr) - wg.Done() - continue - } - for i := range status.TaskStatus { - if status.TaskStatus[i].IndexName == jobName { - if err != nil { - status.TaskStatus[i].Status = "Error" - status.TaskStatus[i].Error = err.Error() - } else { - status.TaskStatus[i].Status = "Completed" - } - break - } - } - updateErr := updateStatus(status) - if updateErr != nil { - fmt.Printf("Error updating status: %v\n", updateErr) - } - wg.Done() + reindexStatus.Status = "Completed" + if err := updateStatus(reindexStatus); err != nil { + fmt.Printf("Error updating status: %v\n", err) } -} - -func processReindexing(index string) error { - for prefix := range skipIndices { - if strings.HasPrefix(index, prefix) { - fmt.Printf("Skipping index %s\n", index) - return nil - } - } - - settings, err := fetchIndexSettingsVersion(index) - if err != nil { - fmt.Printf("Error fetching settings for index %s: %v\n", index, err) - return err - } - - if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString { - fmt.Printf("Reindexing required for index: %s\n", index) - if err := triggerReindex(index); err != nil { - fmt.Printf("Failed to reindex %s: %v\n", index, err) - } - } else { - fmt.Printf("Index %s is up to date. Skipping reindex.\n", index) - } - return nil } @@ -489,7 +420,7 @@ func triggerReindex(index string) error { fmt.Println("Aliases updated successfully.") } - if err := setIndexWriteBlock(tempIndex, false); err != nil { + if err := setIndexWriteBlock(index, false); err != nil { return fmt.Errorf("failed to remove write block on temporary index %s: %w", tempIndex, err) } From 65cafb9902c47d9a57821c1565bac2db8c479ce7 Mon Sep 17 00:00:00 2001 From: daveaugustus Date: Thu, 19 Dec 2024 12:49:38 +0530 Subject: [PATCH 4/5] process lock to check if the pid is running in the background Signed-off-by: daveaugustus --- .../cmd/chef-automate/reindexing.go | 73 ++++++++++++++----- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go index ae45e960213..edd36734e29 100644 --- a/components/automate-cli/cmd/chef-automate/reindexing.go +++ b/components/automate-cli/cmd/chef-automate/reindexing.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "os/exec" + "strconv" "strings" "sync" "syscall" @@ -78,6 +79,8 @@ var infoReindexing = ` Reindexing of Elasticsearch/OpenSearch indices if needed. ` +const lockFile = "/tmp/reindex.lock" + var ( statusFile = "/hab/status.json" taskMutex sync.Mutex @@ -99,7 +102,7 @@ var reindexCmd = &cobra.Command{ } var statusCmd = &cobra.Command{ - Use: "indexing-status", + Use: "index-status", Short: "Check the reindexing status", RunE: checkStatus, } @@ -160,19 +163,51 @@ func readStatus() (Status, error) { } func updateStatus(status *Status) error { - // taskMutex.Lock() - // defer taskMutex.Unlock() - status.LastUpdatedAt = time.Now().Format(time.RFC3339) file, err := json.MarshalIndent(status, "", " ") if err != nil { return err } - fmt.Printf("file: %v\n", string(file)) + return os.WriteFile(statusFile, file, 0644) } +func isProcessRunning() bool { + data, err := os.ReadFile(lockFile) + if err != nil { + if os.IsNotExist(err) { + return false // No lock file means no process is running + } + fmt.Printf("Error reading lock file: %v\n", err) + return false + } + + pid := strings.TrimSpace(string(data)) + pidInt, err := strconv.Atoi(pid) + if err != nil { + fmt.Printf("Error converting PID to integer: %v\n", err) + return false + } + process, err := os.FindProcess(pidInt) + if err != nil { + return false // Process not found + } + + // Check if the process is still alive + err = process.Signal(syscall.Signal(0)) + return err == nil +} + +func createLockFile() error { + pid := fmt.Sprintf("%d\n", os.Getpid()) + return os.WriteFile(lockFile, []byte(pid), 0644) +} + +func removeLockFile() { + os.Remove(lockFile) // Ignore errors during cleanup +} + func runInBackground() { cmd := exec.Command(os.Args[0], append(os.Args[1:], "--background")...) cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} // Detach process group @@ -191,20 +226,20 @@ func runReindex(cmd *cobra.Command, args []string) error { backgroundFlag, _ := cmd.Flags().GetBool("background") if !backgroundFlag { - // Check if reindexing is already running - if reindexStatus.Status == "Running" { - fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") + if isProcessRunning() { + fmt.Println("Reindexing process is already running. Run `chef-automate index-status` to get the status.") return nil } + runInBackground() } - fmt.Println("Starting reindexing process...") - - if reindexStatus.Status == "Running" { - fmt.Println("Reindexing process is already running. Run `chef-automate indexing-status` to get the status.") - return nil + if err := createLockFile(); err != nil { + return fmt.Errorf("failed to create lock file: %v", err) } + defer removeLockFile() // Ensure cleanup after reindexing completes + + fmt.Println("Starting reindexing process...") osInfo, err := getOpenSearchInfo() if err != nil { @@ -213,19 +248,19 @@ func runReindex(cmd *cobra.Command, args []string) error { reindexStatus.OpenSearchVersion = osInfo.Version.Number reindexStatus.Status = "Running" - fmt.Println("Fetching indices from Elasticsearch/OpenSearch.") + indices, err := fetchIndices() if err != nil { fmt.Printf("Error fetching indices: %v\n", err) return err } - // Add indices to the queue for reindexing +OuterLoop: for _, index := range indices { for prefix := range skipIndices { if strings.HasPrefix(index.Index, prefix) { fmt.Printf("Skipping index %s\n", index.Index) - continue + continue OuterLoop } } @@ -238,12 +273,12 @@ func runReindex(cmd *cobra.Command, args []string) error { reindexStatus.CompletedIndex = append(reindexStatus.CompletedIndex, index.Index) } + fmt.Printf("reindexStatus: %v\n", reindexStatus) } reindexStatus.Status = "Completed" - if err := updateStatus(reindexStatus); err != nil { - fmt.Printf("Error updating status: %v\n", err) - } + updateStatus(reindexStatus) + fmt.Println("Reindexing process completed.") return nil } From 1514799456baa2e7fa071ca89b3df42f6394000b Mon Sep 17 00:00:00 2001 From: daveaugustus Date: Mon, 23 Dec 2024 15:21:51 +0530 Subject: [PATCH 5/5] update aliases when cloning fails Signed-off-by: daveaugustus --- .../cmd/chef-automate/reindexing.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go index edd36734e29..0e43ce8accf 100644 --- a/components/automate-cli/cmd/chef-automate/reindexing.go +++ b/components/automate-cli/cmd/chef-automate/reindexing.go @@ -409,13 +409,12 @@ func triggerReindex(index string) error { return fmt.Errorf("failed to fetch mappings for index %s: %w", index, err) } - // Fetch aliases for the index aliases, err := fetchAliases(index) if err != nil { fmt.Printf("Warning: failed to fetch aliases for index %s: %v\n", index, err) aliases = []string{} } - fmt.Printf("aliases of index %s: %v\n", index, aliases) + fmt.Printf("Aliases of index %s: %v\n", index, aliases) if err := createIndex(tempIndex, settings, mappings, index); err != nil { return fmt.Errorf("failed to create temporary index %s: %w", tempIndex, err) @@ -442,12 +441,21 @@ func triggerReindex(index string) error { fmt.Println("Original index deleted successfully.") if err := cloneIndex(tempIndex, index); err != nil { - return fmt.Errorf("failed to clone temp index %s to %s: %w", tempIndex, index, err) + // If cloning fails, redirect aliases to the temporary index + fmt.Printf("Failed to clone temp index %s to %s: %v\n", tempIndex, index, err) + fmt.Println("Redirecting aliases to the temporary index to prevent data loss.") + + if len(aliases) > 0 { + if aliasErr := updateAliases(tempIndex, []string{"index"}); aliasErr != nil { + return fmt.Errorf("failed to update aliases for temp index %s: %w", tempIndex, aliasErr) + } + fmt.Println("Aliases updated to point to the temporary index.") + } + return fmt.Errorf("failed to clone index; aliases redirected to temporary index") } fmt.Println("Temporary index cloned to original index name successfully.") - // Reassign aliases to the cloned index if len(aliases) > 0 { if err := updateAliases(index, aliases); err != nil { return fmt.Errorf("failed to update aliases for index %s: %w", index, err) @@ -456,7 +464,7 @@ func triggerReindex(index string) error { } if err := setIndexWriteBlock(index, false); err != nil { - return fmt.Errorf("failed to remove write block on temporary index %s: %w", tempIndex, err) + return fmt.Errorf("failed to remove write block on index %s: %w", index, err) } if err := deleteIndex(tempIndex); err != nil {