Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async reindexing #8695

Draft
wants to merge 4 commits into
base: upgrade_opensearch_2.x
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 214 additions & 75 deletions components/automate-cli/cmd/chef-automate/reindexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,48 @@ import (
"fmt"
"io"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
)

type OpensearchInfo struct {
Name string `json:"name"`
ClusterName string `json:"cluster_name"`
ClusterUUID string `json:"cluster_uuid"`
Version struct {
Distribution string `json:"distribution"`
Number string `json:"number"`
BuildType string `json:"build_type"`
BuildHash string `json:"build_hash"`
BuildSnapshot bool `json:"build_snapshot"`
LuceneVersion string `json:"lucene_version"`
MinimumWireCompatibilityVersion string `json:"minimum_wire_compatibility_version"`
MinimumIndexCompatibilityVersion string `json:"minimum_index_compatibility_version"`
} `json:"version"`
Tagline string `json:"tagline"`
}

type FailedIndex struct {
IndexName string `json:"index_name"`
ErrorMessage string `json:"error_message,omitempty"`
}

type Status struct {
Status string `json:"status"`
CompletedIndex []string `json:"completed_index"`
FailedIndex []FailedIndex `json:"failed_index"`
OpenSearchVersion string `json:"opensearch_version"`
LastUpdatedAt string `json:"last_updated_at"`
}

type Indices []struct {
Health string `json:"health"`
Status string `json:"status"`
Expand Down Expand Up @@ -44,9 +79,20 @@ var infoReindexing = `
Reindexing of Elasticsearch/OpenSearch indices if needed.
`

const lockFile = "/tmp/reindex.lock"

var (
statusFile = "/hab/status.json"
taskMutex sync.Mutex
)

var reindexStatus = &Status{}

func init() {
reindexCmd.SetUsageTemplate(infoReindexing)
reindexCmd.Flags().Bool("background", false, "Run reindexing in the background")
RootCmd.AddCommand(reindexCmd)
RootCmd.AddCommand(statusCmd)
}

var reindexCmd = &cobra.Command{
Expand All @@ -55,8 +101,11 @@ var reindexCmd = &cobra.Command{
RunE: runReindex,
}

var isReindexing bool
var mu sync.Mutex
var statusCmd = &cobra.Command{
Use: "index-status",
Short: "Check the reindexing status",
RunE: checkStatus,
}

var skipIndices = map[string]bool{
"security-auditlog": true,
Expand All @@ -65,52 +114,170 @@ var skipIndices = map[string]bool{
".opensearch-observability": true,
}

func checkStatus(cmd *cobra.Command, args []string) error {
reindexStatus, err := readStatus()
if err != nil {
fmt.Printf("Error reading status: %v\n", err)
return err
}

fmt.Printf("Current Status:\n")
fmt.Printf("- Status: %s\n", reindexStatus.Status)
fmt.Printf("- OpenSearch Version: %s\n", reindexStatus.OpenSearchVersion)
fmt.Printf("- Last Updated At: %s\n", reindexStatus.LastUpdatedAt)
fmt.Printf("- Completed Indices:\n")
for _, index := range reindexStatus.CompletedIndex {
fmt.Printf(" - %s\n", index)
}

fmt.Printf("- Failed Indices:\n")
for _, task := range reindexStatus.FailedIndex {
fmt.Printf(" - Index: %s\n", task.IndexName)
if task.ErrorMessage != "" {
fmt.Printf(", Error: %s", task.ErrorMessage)
}
fmt.Println()
}

return nil
}

// Reads the current task status from the JSON file
func readStatus() (Status, error) {
taskMutex.Lock()
defer taskMutex.Unlock()

file, err := os.ReadFile(statusFile)
if os.IsNotExist(err) {
return Status{}, nil
} else if err != nil {
return Status{}, err
}

var status Status
err = json.Unmarshal(file, &status)
if err != nil {
return Status{}, err
}
return status, nil
}

func updateStatus(status *Status) error {
status.LastUpdatedAt = time.Now().Format(time.RFC3339)

file, err := json.MarshalIndent(status, "", " ")
if err != nil {
return err
}

return os.WriteFile(statusFile, file, 0644)
}

func isProcessRunning() bool {
data, err := os.ReadFile(lockFile)
if err != nil {
if os.IsNotExist(err) {
return false // No lock file means no process is running
}
fmt.Printf("Error reading lock file: %v\n", err)
return false
}

pid := strings.TrimSpace(string(data))
pidInt, err := strconv.Atoi(pid)
if err != nil {
fmt.Printf("Error converting PID to integer: %v\n", err)
return false
}
process, err := os.FindProcess(pidInt)
if err != nil {
return false // Process not found
}

// Check if the process is still alive
err = process.Signal(syscall.Signal(0))
return err == nil
}

func createLockFile() error {
pid := fmt.Sprintf("%d\n", os.Getpid())
return os.WriteFile(lockFile, []byte(pid), 0644)
}

func removeLockFile() {
os.Remove(lockFile) // Ignore errors during cleanup
}

func runInBackground() {
cmd := exec.Command(os.Args[0], append(os.Args[1:], "--background")...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} // Detach process group
cmd.Stdout = nil
cmd.Stderr = nil
cmd.Stdin = nil
if err := cmd.Start(); err != nil {
fmt.Println("Error starting process in background:", err)
os.Exit(1)
}
fmt.Println("Task is now running in the background with PID:", cmd.Process.Pid)
os.Exit(0)
}

func runReindex(cmd *cobra.Command, args []string) error {
fmt.Println("Reindexing Elasticsearch/OpenSearch indices.")
mu.Lock()
if isReindexing {
mu.Unlock()
fmt.Println("Reindexing is already in progress. Please wait for it to complete.")
return nil
}
isReindexing = true
mu.Unlock()

defer func() {
mu.Lock()
isReindexing = false
mu.Unlock()
}()
backgroundFlag, _ := cmd.Flags().GetBool("background")

if !backgroundFlag {
if isProcessRunning() {
fmt.Println("Reindexing process is already running. Run `chef-automate index-status` to get the status.")
return nil
}

runInBackground()
}

if err := createLockFile(); err != nil {
return fmt.Errorf("failed to create lock file: %v", err)
}
defer removeLockFile() // Ensure cleanup after reindexing completes

fmt.Println("Starting reindexing process...")

osInfo, err := getOpenSearchInfo()
if err != nil {
fmt.Printf("Error fetching OpenSearch info: %v\n", err)
}

reindexStatus.OpenSearchVersion = osInfo.Version.Number
reindexStatus.Status = "Running"

indices, err := fetchIndices()
if err != nil {
fmt.Printf("Error fetching indices: %v\n", err)
return err
}

Loop1:
OuterLoop:
for _, index := range indices {
for prefix := range skipIndices {
if strings.HasPrefix(index.Index, prefix) {
fmt.Printf("Skipping index %s\n", index.Index)
continue Loop1
continue OuterLoop
}
}
settings, err := fetchIndexSettingsVersion(index.Index)
if err != nil {
fmt.Printf("Error fetching settings for index %s: %v\n", index.Index, err)
continue
}

if settings.Settings.Index.Version.CreatedString != settings.Settings.Index.Version.UpgradedString {
fmt.Printf("Reindexing required for index: %s\n", index.Index)
if err := triggerReindex(index.Index); err != nil {
fmt.Printf("Error reindexing index %s: %v\n", index.Index, err)
}
if err := triggerReindex(index.Index); err != nil {
reindexStatus.FailedIndex = append(reindexStatus.FailedIndex, FailedIndex{
IndexName: index.Index,
ErrorMessage: err.Error(),
})
} else {
fmt.Printf("Index %s is up to date. Skipping reindex.\n", index.Index)
reindexStatus.CompletedIndex = append(reindexStatus.CompletedIndex, index.Index)
}

fmt.Printf("reindexStatus: %v\n", reindexStatus)
}

reindexStatus.Status = "Completed"
updateStatus(reindexStatus)
fmt.Println("Reindexing process completed.")
return nil
}
Expand Down Expand Up @@ -288,7 +455,7 @@ func triggerReindex(index string) error {
fmt.Println("Aliases updated successfully.")
}

if err := setIndexWriteBlock(tempIndex, false); err != nil {
if err := setIndexWriteBlock(index, false); err != nil {
return fmt.Errorf("failed to remove write block on temporary index %s: %w", tempIndex, err)
}

Expand Down Expand Up @@ -494,28 +661,6 @@ func deleteIndex(index string) error {
return nil
}

// Close an index
func closeIndex(index string) error {
fmt.Printf("Closing index: %s\n", index)
url := fmt.Sprintf("http://127.0.0.1:10144/%s/_close", index)
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return fmt.Errorf("failed to create close request for index %s: %w", index, err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to close index %s: %w", index, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to close index %s: %s", index, string(body))
}
return nil
}

func cloneIndex(sourceIndex, targetIndex string) error {
fmt.Printf("Cloning index from %s to %s\n", sourceIndex, targetIndex)
url := fmt.Sprintf("http://127.0.0.1:10144/%s/_clone/%s", sourceIndex, targetIndex)
Expand All @@ -541,28 +686,6 @@ func cloneIndex(sourceIndex, targetIndex string) error {
return nil
}

// Open an index
func openIndex(index string) error {
fmt.Printf("Opening index: %s\n", index)
url := fmt.Sprintf("http://127.0.0.1:10144/%s/_open", index)
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return fmt.Errorf("failed to create open request for index %s: %w", index, err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to open index %s: %w", index, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to open index %s: %s", index, string(body))
}
return nil
}

func setIndexWriteBlock(index string, readOnly bool) error {
fmt.Printf("Setting index.blocks.write to %v for index: %s\n", readOnly, index)
url := fmt.Sprintf("http://127.0.0.1:10144/%s/_settings", index)
Expand Down Expand Up @@ -633,3 +756,19 @@ func fetchAliases(index string) ([]string, error) {

return aliases, nil
}

func getOpenSearchInfo() (*OpensearchInfo, error) {
resp, err := http.Get("http://127.0.0.1:10144/")
if err != nil {
return nil, fmt.Errorf("failed to fetch OpenSearch info: %w", err)
}
defer resp.Body.Close()

// Parse the response
var info OpensearchInfo
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return nil, fmt.Errorf("failed to decode OpenSearch info: %w", err)
}

return &info, nil
}
Loading