From f30de65133c0d84c0afb44f0d0310ab2f0cdd996 Mon Sep 17 00:00:00 2001
From: AdheipSingh
Date: Tue, 26 Nov 2024 20:56:43 +0530
Subject: [PATCH] Update analyze workflow for OpenAI

---
 cmd/analyze.go               | 216 ++++++++++++++++++++++++-----------
 pkg/analyze/openai/openai.go |  14 ++-
 2 files changed, 159 insertions(+), 71 deletions(-)

diff --git a/cmd/analyze.go b/cmd/analyze.go
index 7981631..a3d9da9 100644
--- a/cmd/analyze.go
+++ b/cmd/analyze.go
@@ -2,6 +2,7 @@ package cmd
 
 import (
 	"archive/zip"
+	"encoding/json"
 	"fmt"
 	"io"
 	"log"
@@ -10,11 +11,14 @@ import (
 	"os/exec"
 	"path/filepath"
 	"runtime"
+	"strings"
+	"sync"
 	"time"
 
 	internalHTTP "pb/pkg/http"
 
 	"github.com/briandowns/spinner"
+	"github.com/manifoldco/promptui"
 
 	"pb/pkg/analyze/duckdb"
 	"pb/pkg/analyze/k8s"
@@ -36,21 +40,18 @@ const (
 func validateLLMConfig() {
 	provider, exists := os.LookupEnv("P_LLM_PROVIDER")
 	if !exists || (provider != "openai" && provider != "ollama" && provider != "claude") {
-		fmt.Printf(red + "Error: P_LLM_PROVIDER must be set to one of: openai, ollama, claude\n" + reset)
-		os.Exit(1)
+		log.Fatalf(red + "Error: P_LLM_PROVIDER must be set to one of: openai, ollama, claude\n" + reset)
 	}
 
 	_, keyExists := os.LookupEnv("P_LLM_KEY")
 	if !keyExists {
-		fmt.Printf(red + "Error: P_LLM_KEY must be set\n" + reset)
-		os.Exit(1)
+		log.Fatalf(red + "Error: P_LLM_KEY must be set\n" + reset)
 	}
 
 	if provider == "ollama" {
 		_, endpointExists := os.LookupEnv("P_LLM_ENDPOINT")
 		if !endpointExists {
-			fmt.Printf(red + "Error: P_LLM_ENDPOINT must be set when using ollama as the provider\n" + reset)
-			os.Exit(1)
+			log.Fatalf(red + "Error: P_LLM_ENDPOINT must be set when using ollama as the provider\n" + reset)
 		}
 	}
 
@@ -58,96 +59,88 @@ func validateLLMConfig() {
 }
 
 var AnalyzeCmd = &cobra.Command{
-	Use:     "stream", // Subcommand for "analyze"
+	Use:     "stream",
 	Short:   "Analyze streams in the Parseable server",
 	Example: "pb analyze stream ",
-	Args:    cobra.ExactArgs(1), // Ensure exactly one argument is passed
+	Args:    cobra.ExactArgs(1),
 	RunE: func(cmd *cobra.Command, args []string) error {
-
-		// Check and install DuckDB if necessary
 		checkAndInstallDuckDB()
-
-		// Validate LLM environment variables
 		validateLLMConfig()
 
 		name := args[0]
 		fmt.Printf(yellow+"Analyzing stream: %s\n"+reset, name)
 		detectSchema(name)
 
-		// Initialize spinner
-		s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
+		var wg sync.WaitGroup
 
-		// Step 1: Query Data
+		s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
 		s.Suffix = fmt.Sprintf(yellow+" Querying data from Parseable server...(%s)", DefaultProfile.URL)
 		s.Start()
-		client := internalHTTP.DefaultClient(&DefaultProfile)
 
+		client := internalHTTP.DefaultClient(&DefaultProfile)
 		query := `with distinct_name as (select distinct(\"involvedObject_name\") as name from \"k8s-events\" where reason ilike '%kill%' or reason ilike '%fail%' or reason ilike '%back%') select reason, message, \"involvedObject_name\", \"involvedObject_namespace\", \"reportingComponent\", timestamp from \"k8s-events\" as t1 join distinct_name t2 on t1.\"involvedObject_name\" = t2.name order by timestamp`
 		allData, err := duckdb.QueryPb(&client, query, "2024-11-11T00:00:00+00:00", "2024-11-21T00:00:00+00:00")
 		s.Stop()
 		if err != nil {
-			fmt.Printf(red+"Error querying data in Parseable: %v\n"+reset, err)
-			return fmt.Errorf("error querying data in Parseable, err [%w]", err)
+			log.Printf(red+"Error querying data in Parseable: %v\n"+reset, err)
+			return fmt.Errorf("error querying data: %w", err)
 		}
 		fmt.Println(green + "Data successfully queried from Parseable." + reset)
-		// Step 2: Store Data in DuckDB
-		s.Suffix = " Storing data in DuckDB..."
-		s.Start()
-		if err := duckdb.StoreInDuckDB(allData); err != nil {
-			s.Stop()
-			fmt.Printf(red+"Error storing data in DuckDB: %v\n"+reset, err)
-			return fmt.Errorf("error storing data in DuckDB, err [%w]", err)
-		}
-		s.Stop()
-		fmt.Println(green + "Data successfully stored in DuckDB." + reset)
-
-		// Step 3: Prompt Kubernetes Context
-		_, err = k8s.PromptK8sContext()
-		if err != nil {
-			fmt.Printf(red+"Error prompting Kubernetes context: %v\n"+reset, err)
-			return err
-		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := duckdb.StoreInDuckDB(allData); err != nil {
+				log.Fatalf(red+"Error storing data in DuckDB: %v\n"+reset, err)
+			}
+			fmt.Println(green + "Data successfully stored in DuckDB." + reset)
+		}()
+		wg.Wait()
+
+		// Main analysis loop
+		for {
+			// Kubernetes context prompt
+			_, err := k8s.PromptK8sContext()
+			if err != nil {
+				return fmt.Errorf(red+"Error prompting Kubernetes context: %w\n"+reset, err)
+			}
 
-		// Step 4: Select Namespace
-		namespace, err := k8s.PromptNamespace(k8s.GetKubeClient())
-		if err != nil {
-			log.Fatalf(red+"Error selecting namespace: %v\n"+reset, err)
-		}
-		fmt.Printf(yellow+"Selected Namespace: %s\n"+reset, namespace)
+			namespace, err := k8s.PromptNamespace(k8s.GetKubeClient())
+			if err != nil {
+				return fmt.Errorf(red+"Error selecting namespace: %w\n"+reset, err)
+			}
+			fmt.Printf(yellow+"Selected Namespace: %s\n"+reset, namespace)
 
-		// Step 5: Select Pod
-		pod, err := k8s.PromptPod(k8s.GetKubeClient(), namespace)
-		if err != nil {
-			log.Fatalf(red+"Error selecting pod: %v\n"+reset, err)
-		}
-		fmt.Printf(yellow+"Selected Pod: %s\n"+reset, pod)
+			pod, err := k8s.PromptPod(k8s.GetKubeClient(), namespace)
+			if err != nil {
+				return fmt.Errorf(red+"Error selecting pod: %w\n"+reset, err)
+			}
+			fmt.Printf(yellow+"Selected Pod: %s\n"+reset, pod)
 
-		// Step 6: Fetch Events from DuckDB
-		s.Suffix = " Fetching pod events from DuckDB..."
-		s.Start()
-		result, err := duckdb.FetchPodEventsfromDb(pod)
-		s.Stop()
-		if err != nil {
-			fmt.Printf(red+"Error fetching pod events from DuckDB: %v\n"+reset, err)
-			return err
-		}
+			s.Suffix = " Fetching pod events from DuckDB..."
+			s.Start()
+			result, err := duckdb.FetchPodEventsfromDb(pod)
+			s.Stop()
+			if err != nil {
+				return fmt.Errorf(red+"Error fetching pod events: %w\n"+reset, err)
+			}
 
-		// Step 7: Analyze with LLM
-		s.Suffix = " Analyzing events with LLM..."
-		s.Start()
-		gptResponse, err := openai.AnalyzeEventsWithGPT(pod, namespace, result)
-		s.Stop()
-		if err != nil {
-			fmt.Printf(red+"Failed to analyze events with LLM: %v\n"+reset, err)
-			return fmt.Errorf("failed to analyze events with LLM: %w", err)
-		}
+			s.Suffix = " Analyzing events with LLM..."
+			s.Start()
+			gptResponse, err := openai.AnalyzeEventsWithGPT(pod, namespace, result)
+			s.Stop()
+			if err != nil {
+				return fmt.Errorf(red+"Failed to analyze events: %w\n"+reset, err)
+			}
 
-		// Display Analysis Result
-		fmt.Println(green + "\nLLM Analysis:\n" + reset + gptResponse)
+			// Display the structured results and prompt for next steps
+			shouldContinue := parseAndSelectAnalysis(gptResponse)
+			if !shouldContinue {
+				break // Exit the loop if "no"
+			}
+		}
 
 		return nil
 	},
 }
@@ -273,3 +266,90 @@ func unzip(src, dest string) error {
 	}
 	return nil
 }
+
+type AnalysisResponse struct {
+	Summary           string   `json:"summary"`
+	RootCauseAnalysis string   `json:"root_cause_analysis"`
+	MitigationSteps   []string `json:"mitigation_steps"`
+}
+
+// parseAndSelectAnalysis displays the parsed LLM analysis and returns false once the user declines to continue.
+func parseAndSelectAnalysis(response string) bool {
+	var analysis AnalysisResponse
+	err := json.Unmarshal([]byte(response), &analysis)
+	if err != nil {
+		log.Fatalf("Failed to parse LLM response: %v", err)
+	}
+
+	// Display the summary by default
+	fmt.Println(green + "Summary:\n" + reset + analysis.Summary)
+
+	// Prompt the user to choose between "Root Cause Analysis" and "Mitigation Steps"
+	initialPrompt := promptui.Select{
+		Label: "Select Analysis to View",
+		Items: []string{"Root Cause Analysis", "Mitigation Steps"},
+		Size:  3,
+	}
+
+	_, choice, err := initialPrompt.Run()
+	if err != nil {
+		log.Fatalf("Prompt failed: %v", err)
+	}
+
+	switch choice {
+	case "Root Cause Analysis":
+		// Show Root Cause Analysis
+		fmt.Println(green + "Root Cause Analysis:\n" + reset + analysis.RootCauseAnalysis)
+
+		// Prompt for the next step (currently only mitigation is offered)
+		secondPrompt := promptui.Select{
+			Label: "What would you like to do next?",
+			Items: []string{"Mitigation"},
+			Size:  3,
+		}
+
+		_, secondChoice, err := secondPrompt.Run()
+		if err != nil {
+			log.Fatalf("Prompt failed: %v", err)
+		}
+
+		switch secondChoice {
+		case "Mitigation":
+			// Show Mitigation Steps
+			fmt.Println(green + "Mitigation Steps:\n" + reset)
+			for i, step := range analysis.MitigationSteps {
+				fmt.Printf("%d. %s\n", i+1, step)
+			}
+
+			// After displaying mitigation steps, ask if the user wants to analyze another pod/namespace
+			prompt := promptui.Prompt{
+				Label:   "Analyze another namespace/pod (yes/no)",
+				Default: "no",
+			}
+			choice, _ := prompt.Run()
+			if strings.ToLower(choice) != "yes" {
+				return false // Exit the loop if "no"
+			}
+
+		}
+
+	case "Mitigation Steps":
+		// Show Mitigation Steps directly
+		fmt.Println(green + "Mitigation Steps:\n" + reset)
+		for i, step := range analysis.MitigationSteps {
+			fmt.Printf("%d. %s\n", i+1, step)
+		}
+
+		// After displaying mitigation steps, ask if the user wants to analyze another pod/namespace
+		prompt := promptui.Prompt{
+			Label:   "Analyze another namespace/pod (yes/no)",
+			Default: "no",
+		}
+		choice, _ := prompt.Run()
+		if strings.ToLower(choice) != "yes" {
+			return false // Exit the loop if "no"
+		}
+	}
+
+	return true // Continue the loop if "yes"
+}
diff --git a/pkg/analyze/openai/openai.go b/pkg/analyze/openai/openai.go
index 3b6a486..681fec8 100644
--- a/pkg/analyze/openai/openai.go
+++ b/pkg/analyze/openai/openai.go
@@ -48,8 +48,16 @@ func AnalyzeEventsWithGPT(podName, namespace string, data []SummaryStat) (string
 	prompt := fmt.Sprintf(
 		`You are an expert at debugging Kubernetes Events. I have a table containing those events and want to debug
 		what is happening in this pod (%s) / namespace (%s).
-		Give me a summary and overview of what happened by looking at these events.
-		Provide a root cause analysis and suggest steps to mitigate the error if present.
+		Give me a detailed summary of what happened by looking at these events.
+		Provide a root cause analysis and suggest steps to mitigate the error if present.
+		Return the response as a structured JSON body with the fields summary, root_cause_analysis, and mitigation_steps.
+		Don't add any json keywords in the response; make sure it is just a clean JSON dump. Please adhere to the following structure:
+		type AnalysisResponse struct {
+			Summary           string   json:"summary"
+			RootCauseAnalysis string   json:"root_cause_analysis"
+			MitigationSteps   []string json:"mitigation_steps"
+		}
+		In the mitigation steps, include a command to get the pod logs.
 		In case you are unable to figure out what happened, just say "I'm unable to figure out what is happening here.".
 		%s`, podName, namespace, formattedData)
 
@@ -68,7 +76,7 @@ func AnalyzeEventsWithGPT(podName, namespace string, data []SummaryStat) (string
 	}
 
 	// Send the request to the OpenAI API
-	apiKey := os.Getenv("OPENAI_API_KEY")
+	apiKey := os.Getenv("P_LLM_KEY")
 	req, err := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", bytes.NewBuffer(payload))
 	if err != nil {
 		return "", fmt.Errorf("failed to create request: %w", err)
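
A note on configuration: this patch switches the OpenAI client to read its key from P_LLM_KEY instead of OPENAI_API_KEY, so the variable validated by validateLLMConfig is the same one used to authenticate the API call. Running `pb analyze stream` therefore needs P_LLM_PROVIDER set to one of openai, ollama, or claude, P_LLM_KEY set to the provider key, and P_LLM_ENDPOINT as well when the provider is ollama.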
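
A caveat on the structured response: parseAndSelectAnalysis passes the raw model output straight to json.Unmarshal and exits via log.Fatalf on failure, but the prompt can only request, not guarantee, a bare JSON dump. Below is a minimal sketch of a more forgiving decode step, assuming the model may still wrap its output in a markdown code fence; the stripJSONFences helper and the sample response are illustrative only, not part of this patch.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// AnalysisResponse mirrors the struct this patch adds to cmd/analyze.go.
type AnalysisResponse struct {
	Summary           string   `json:"summary"`
	RootCauseAnalysis string   `json:"root_cause_analysis"`
	MitigationSteps   []string `json:"mitigation_steps"`
}

// stripJSONFences is a hypothetical helper: it trims surrounding whitespace
// and a leading ```json / trailing ``` fence that some models emit even when
// asked for a bare JSON dump.
func stripJSONFences(s string) string {
	s = strings.TrimSpace(s)
	s = strings.TrimPrefix(s, "```json")
	s = strings.TrimPrefix(s, "```")
	s = strings.TrimSuffix(s, "```")
	return strings.TrimSpace(s)
}

func main() {
	// A fenced response shaped the way the prompt in openai.go requests it.
	raw := "```json\n{\"summary\":\"Pod was OOMKilled.\"," +
		"\"root_cause_analysis\":\"Container memory limit is too low for the workload.\"," +
		"\"mitigation_steps\":[\"kubectl logs <pod> -n <namespace>\",\"Raise the memory limit.\"]}\n```"

	var analysis AnalysisResponse
	if err := json.Unmarshal([]byte(stripJSONFences(raw)), &analysis); err != nil {
		fmt.Println("failed to parse LLM response:", err)
		return
	}
	fmt.Println("Summary:", analysis.Summary)
	for i, step := range analysis.MitigationSteps {
		fmt.Printf("%d. %s\n", i+1, step)
	}
}

Trimming the fence before unmarshaling keeps the analyze loop alive instead of letting log.Fatalf terminate the CLI on a cosmetically wrapped but otherwise valid response.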