diff --git a/components/automate-cli/cmd/chef-automate/reindexing.go b/components/automate-cli/cmd/chef-automate/reindexing.go
index 4bbd9b04c69..0e43ce8accf 100644
--- a/components/automate-cli/cmd/chef-automate/reindexing.go
+++ b/components/automate-cli/cmd/chef-automate/reindexing.go
@@ -6,13 +6,51 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"os"
+	"os/exec"
+	"strconv"
 	"strings"
 	"sync"
+	"syscall"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
 )
 
+// OpensearchInfo mirrors the JSON document decoded from the OpenSearch root endpoint.
+type OpensearchInfo struct {
+	Name        string `json:"name"`
+	ClusterName string `json:"cluster_name"`
+	ClusterUUID string `json:"cluster_uuid"`
+	Version     struct {
+		Distribution                     string `json:"distribution"`
+		Number                           string `json:"number"`
+		BuildType                        string `json:"build_type"`
+		BuildHash                        string `json:"build_hash"`
+		BuildSnapshot                    bool   `json:"build_snapshot"`
+		LuceneVersion                    string `json:"lucene_version"`
+		MinimumWireCompatibilityVersion  string `json:"minimum_wire_compatibility_version"`
+		MinimumIndexCompatibilityVersion string `json:"minimum_index_compatibility_version"`
+	} `json:"version"`
+	Tagline string `json:"tagline"`
+}
+
+// FailedIndex records an index that could not be reindexed and the error reported for it.
+type FailedIndex struct {
+	IndexName    string `json:"index_name"`
+	ErrorMessage string `json:"error_message,omitempty"`
+}
+
+// Status is the reindexing progress report persisted as JSON in statusFile.
+type Status struct {
+	Status            string        `json:"status"`
+	CompletedIndex    []string      `json:"completed_index"`
+	FailedIndex       []FailedIndex `json:"failed_index"`
+	OpenSearchVersion string        `json:"opensearch_version"`
+	LastUpdatedAt     string        `json:"last_updated_at"`
+}
+
 type Indices []struct {
 	Health string `json:"health"`
 	Status string `json:"status"`
@@ -44,9 +82,24 @@ var infoReindexing = `
 Reindexing of Elasticsearch/OpenSearch indices if needed.
 `
 
+// lockFile holds the PID of the process that is currently reindexing.
+const lockFile = "/tmp/reindex.lock"
+
+var (
+	// statusFile is where reindexing progress is persisted for index-status.
+	statusFile = "/hab/status.json"
+	// taskMutex serialises status file access within this process.
+	taskMutex sync.Mutex
+)
+
+// reindexStatus accumulates the progress of the current reindexing run.
+var reindexStatus = &Status{}
+
 func init() {
 	reindexCmd.SetUsageTemplate(infoReindexing)
+	reindexCmd.Flags().Bool("background", false, "Run reindexing in the background")
 	RootCmd.AddCommand(reindexCmd)
+	RootCmd.AddCommand(statusCmd)
 }
 
 var reindexCmd = &cobra.Command{
@@ -55,8 +108,12 @@ var reindexCmd = &cobra.Command{
 	RunE:  runReindex,
 }
 
-var isReindexing bool
-var mu sync.Mutex
+// statusCmd prints the progress recorded in the reindexing status file.
+var statusCmd = &cobra.Command{
+	Use:   "index-status",
+	Short: "Check the reindexing status",
+	RunE:  checkStatus,
+}
 
 var skipIndices = map[string]bool{
 	"security-auditlog": true,
@@ -65,52 +122,210 @@ var skipIndices = map[string]bool{
 	".opensearch-observability": true,
 }
 
+// checkStatus prints the reindexing progress recorded in the status file.
+func checkStatus(cmd *cobra.Command, args []string) error {
+	// A distinct local name avoids shadowing the package-level reindexStatus.
+	current, err := readStatus()
+	if err != nil {
+		fmt.Printf("Error reading status: %v\n", err)
+		return err
+	}
+
+	fmt.Printf("Current Status:\n")
+	fmt.Printf("- Status: %s\n", current.Status)
+	fmt.Printf("- OpenSearch Version: %s\n", current.OpenSearchVersion)
+	fmt.Printf("- Last Updated At: %s\n", current.LastUpdatedAt)
+	fmt.Printf("- Completed Indices:\n")
+	for _, index := range current.CompletedIndex {
+		fmt.Printf(" - %s\n", index)
+	}
+
+	fmt.Printf("- Failed Indices:\n")
+	for _, task := range current.FailedIndex {
+		// Keep the index name and its error on one line; the newline is
+		// emitted once, after the optional error text.
+		fmt.Printf(" - Index: %s", task.IndexName)
+		if task.ErrorMessage != "" {
+			fmt.Printf(", Error: %s", task.ErrorMessage)
+		}
+		fmt.Println()
+	}
+
+	return nil
+}
+
+// readStatus loads the persisted reindexing status from statusFile. A missing
+// file is not an error: it yields an empty Status.
+func readStatus() (Status, error) {
+	taskMutex.Lock()
+	defer taskMutex.Unlock()
+
+	file, err := os.ReadFile(statusFile)
+	if os.IsNotExist(err) {
+		return Status{}, nil
+	}
+	if err != nil {
+		return Status{}, err
+	}
+
+	var status Status
+	if err := json.Unmarshal(file, &status); err != nil {
+		return Status{}, err
+	}
+	return status, nil
+}
+
+// updateStatus stamps status with the current time and persists it to
+// statusFile. The JSON is written to a temporary file and renamed into place
+// so that a concurrent `index-status` run never reads a half-written file.
+func updateStatus(status *Status) error {
+	taskMutex.Lock()
+	defer taskMutex.Unlock()
+
+	status.LastUpdatedAt = time.Now().Format(time.RFC3339)
+
+	file, err := json.MarshalIndent(status, "", " ")
+	if err != nil {
+		return err
+	}
+
+	tmpFile := statusFile + ".tmp"
+	if err := os.WriteFile(tmpFile, file, 0644); err != nil {
+		return err
+	}
+	return os.Rename(tmpFile, statusFile)
+}
+
+// isProcessRunning reports whether the PID stored in lockFile belongs to a
+// live process. A missing or unreadable lock file counts as "not running".
+func isProcessRunning() bool {
+	data, err := os.ReadFile(lockFile)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false // No lock file means no process is running
+		}
+		fmt.Printf("Error reading lock file: %v\n", err)
+		return false
+	}
+
+	pid := strings.TrimSpace(string(data))
+	pidInt, err := strconv.Atoi(pid)
+	if err != nil {
+		fmt.Printf("Error converting PID to integer: %v\n", err)
+		return false
+	}
+	process, err := os.FindProcess(pidInt)
+	if err != nil {
+		return false // Process not found
+	}
+
+	// Signal 0 delivers nothing; a nil error means the process exists.
+	err = process.Signal(syscall.Signal(0))
+	return err == nil
+}
+
+// createLockFile records the current process ID in lockFile.
+func createLockFile() error {
+	pid := fmt.Sprintf("%d\n", os.Getpid())
+	return os.WriteFile(lockFile, []byte(pid), 0644)
+}
+
+// removeLockFile deletes lockFile on a best-effort basis.
+func removeLockFile() {
+	os.Remove(lockFile) // Ignore errors during cleanup
+}
+
+// runInBackground re-executes the current command with --background in a new
+// session and then exits the foreground process; it never returns.
+func runInBackground() {
+	cmd := exec.Command(os.Args[0], append(os.Args[1:], "--background")...)
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} // Detach process group
+	cmd.Stdout = nil
+	cmd.Stderr = nil
+	cmd.Stdin = nil
+	if err := cmd.Start(); err != nil {
+		fmt.Println("Error starting process in background:", err)
+		os.Exit(1)
+	}
+	fmt.Println("Task is now running in the background with PID:", cmd.Process.Pid)
+	os.Exit(0)
+}
+
+// runReindex runs the reindexing workflow. Without --background it relaunches
+// itself as a detached process; with it, it reindexes every non-skipped index.
 func runReindex(cmd *cobra.Command, args []string) error {
-	fmt.Println("Reindexing Elasticsearch/OpenSearch indices.")
-	mu.Lock()
-	if isReindexing {
-		mu.Unlock()
-		fmt.Println("Reindexing is already in progress. Please wait for it to complete.")
-		return nil
-	}
-	isReindexing = true
-	mu.Unlock()
-
-	defer func() {
-		mu.Lock()
-		isReindexing = false
-		mu.Unlock()
-	}()
+	backgroundFlag, _ := cmd.Flags().GetBool("background")
+
+	if !backgroundFlag {
+		if isProcessRunning() {
+			fmt.Println("Reindexing process is already running. Run `chef-automate index-status` to get the status.")
+			return nil
+		}
+
+		// Exits the foreground process once the detached child has started.
+		runInBackground()
+	}
+
+	if err := createLockFile(); err != nil {
+		return fmt.Errorf("failed to create lock file: %w", err)
+	}
+	defer removeLockFile() // Ensure cleanup after reindexing completes
+
+	fmt.Println("Starting reindexing process...")
+
+	// The version is informational only: carry on without it when the lookup
+	// fails, and never dereference the nil result that accompanies an error.
+	osInfo, err := getOpenSearchInfo()
+	if err != nil {
+		fmt.Printf("Error fetching OpenSearch info: %v\n", err)
+	} else {
+		reindexStatus.OpenSearchVersion = osInfo.Version.Number
+	}
+
+	reindexStatus.Status = "Running"
+	// Persist right away so `index-status` can report an in-progress run.
+	if err := updateStatus(reindexStatus); err != nil {
+		fmt.Printf("Error updating status: %v\n", err)
+	}
 
 	indices, err := fetchIndices()
 	if err != nil {
+		fmt.Printf("Error fetching indices: %v\n", err)
+		reindexStatus.Status = "Failed"
+		if statusErr := updateStatus(reindexStatus); statusErr != nil {
+			fmt.Printf("Error updating status: %v\n", statusErr)
+		}
 		return err
 	}
 
-Loop1:
+OuterLoop:
 	for _, index := range indices {
 		for prefix := range skipIndices {
 			if strings.HasPrefix(index.Index, prefix) {
 				fmt.Printf("Skipping index %s\n", index.Index)
-				continue Loop1
+				continue OuterLoop
 			}
 		}
 
-		settings, err := fetchIndexSettingsVersion(index.Index)
-		if err != nil {
-			fmt.Printf("Error fetching settings for index %s: %v\n", index.Index, err)
-			continue
-		}
-		if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString {
-			fmt.Printf("Reindexing required for index: %s\n", index.Index)
-			if err := triggerReindex(index.Index); err != nil {
-				fmt.Printf("Error reindexing index %s: %v\n", index.Index, err)
-			}
+		if err := triggerReindex(index.Index); err != nil {
+			reindexStatus.FailedIndex = append(reindexStatus.FailedIndex, FailedIndex{
+				IndexName:    index.Index,
+				ErrorMessage: err.Error(),
+			})
 		} else {
-			fmt.Printf("Index %s is up to date. Skipping reindex.\n", index.Index)
+			reindexStatus.CompletedIndex = append(reindexStatus.CompletedIndex, index.Index)
 		}
+
+		// Record progress after every index so `index-status` stays current.
+		if err := updateStatus(reindexStatus); err != nil {
+			fmt.Printf("Error updating status: %v\n", err)
+		}
 	}
 
+	reindexStatus.Status = "Completed"
+	if err := updateStatus(reindexStatus); err != nil {
+		return fmt.Errorf("failed to write final reindexing status: %w", err)
+	}
 	fmt.Println("Reindexing process completed.")
 	return nil
 }
@@ -242,13 +457,12 @@ func triggerReindex(index string) error {
 		return fmt.Errorf("failed to fetch mappings for index %s: %w", index, err)
 	}
 
-	// Fetch aliases for the index
 	aliases, err := fetchAliases(index)
 	if err != nil {
 		fmt.Printf("Warning: failed to fetch aliases for index %s: %v\n", index, err)
 		aliases = []string{}
 	}
-	fmt.Printf("aliases of index %s: %v\n", index, aliases)
+	fmt.Printf("Aliases of index %s: %v\n", index, aliases)
 
 	if err := createIndex(tempIndex, settings, mappings, index); err != nil {
 		return fmt.Errorf("failed to create temporary index %s: %w", tempIndex, err)
@@ -275,12 +489,22 @@ func triggerReindex(index string) error {
 	fmt.Println("Original index deleted successfully.")
 
 	if err := cloneIndex(tempIndex, index); err != nil {
-		return fmt.Errorf("failed to clone temp index %s to %s: %w", tempIndex, index, err)
+		// If cloning fails, redirect aliases to the temporary index
+		fmt.Printf("Failed to clone temp index %s to %s: %v\n", tempIndex, index, err)
+		fmt.Println("Redirecting aliases to the temporary index to prevent data loss.")
+
+		if len(aliases) > 0 {
+			// Re-point the aliases fetched from the original index, not a placeholder name.
+			if aliasErr := updateAliases(tempIndex, aliases); aliasErr != nil {
+				return fmt.Errorf("failed to update aliases for temp index %s: %w", tempIndex, aliasErr)
+			}
+			fmt.Println("Aliases updated to point to the temporary index.")
+		}
+		return fmt.Errorf("failed to clone index %s; aliases redirected to temporary index %s: %w", index, tempIndex, err)
 	}
 
 	fmt.Println("Temporary index cloned to original index name successfully.")
 
-	// Reassign aliases to the cloned index
 	if len(aliases) > 0 {
 		if err := updateAliases(index, aliases); err != nil {
 			return fmt.Errorf("failed to update aliases for index %s: %w", index, err)
@@ -288,8 +512,8 @@ func triggerReindex(index string) error {
 		fmt.Println("Aliases updated successfully.")
 	}
 
-	if err := setIndexWriteBlock(tempIndex, false); err != nil {
-		return fmt.Errorf("failed to remove write block on temporary index %s: %w", tempIndex, err)
+	if err := setIndexWriteBlock(index, false); err != nil {
+		return fmt.Errorf("failed to remove write block on index %s: %w", index, err)
 	}
 
 	if err := deleteIndex(tempIndex); err != nil {
@@ -494,28 +718,6 @@ func deleteIndex(index string) error {
 	return nil
 }
 
-// Close an index
-func closeIndex(index string) error {
-	fmt.Printf("Closing index: %s\n", index)
-	url := fmt.Sprintf("http://127.0.0.1:10144/%s/_close", index)
-	req, err := http.NewRequest(http.MethodPost, url, nil)
-	if err != nil {
-		return fmt.Errorf("failed to create close request for index %s: %w", index, err)
-	}
-
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return fmt.Errorf("failed to close index %s: %w", index, err)
-	}
-	defer resp.Body.Close()
-
-	if resp.StatusCode != http.StatusOK {
-		body, _ := io.ReadAll(resp.Body)
-		return fmt.Errorf("failed to close index %s: %s", index, string(body))
-	}
-	return nil
-}
-
 func cloneIndex(sourceIndex, targetIndex string) error {
 	fmt.Printf("Cloning index from %s to %s\n", sourceIndex, targetIndex)
 	url := fmt.Sprintf("http://127.0.0.1:10144/%s/_clone/%s", sourceIndex, targetIndex)
@@ -541,28 +743,6 @@ func cloneIndex(sourceIndex, targetIndex string) error {
 	return nil
 }
 
-// Open an index
-func openIndex(index string) error {
-	fmt.Printf("Opening index: %s\n", index)
-	url := fmt.Sprintf("http://127.0.0.1:10144/%s/_open", index)
-	req, err := http.NewRequest(http.MethodPost, url, nil)
-	if err != nil {
-		return fmt.Errorf("failed to create open request for index %s: %w", index, err)
-	}
-
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return fmt.Errorf("failed to open index %s: %w", index, err)
-	}
-	defer resp.Body.Close()
-
-	if resp.StatusCode != http.StatusOK {
-		body, _ := io.ReadAll(resp.Body)
-		return fmt.Errorf("failed to open index %s: %s", index, string(body))
-	}
-	return nil
-}
-
 func setIndexWriteBlock(index string, readOnly bool) error {
 	fmt.Printf("Setting index.blocks.write to %v for index: %s\n", readOnly, index)
 	url := fmt.Sprintf("http://127.0.0.1:10144/%s/_settings", index)
@@ -633,3 +813,26 @@ func fetchAliases(index string) ([]string, error) {
 
 	return aliases, nil
 }
+
+// getOpenSearchInfo fetches and decodes the document served at the OpenSearch root endpoint.
+func getOpenSearchInfo() (*OpensearchInfo, error) {
+	resp, err := http.Get("http://127.0.0.1:10144/")
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch OpenSearch info: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Report a non-200 reply instead of decoding it into an empty
+	// OpensearchInfo, as the other HTTP helpers in this file do.
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("failed to fetch OpenSearch info: %s", string(body))
+	}
+
+	var info OpensearchInfo
+	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
+		return nil, fmt.Errorf("failed to decode OpenSearch info: %w", err)
+	}
+
+	return &info, nil
+}