Commit

update to workflow for openai
AdheipSingh committed Nov 26, 2024
1 parent 7fe0ac5 commit f30de65
Showing 2 changed files with 159 additions and 71 deletions.
216 changes: 148 additions & 68 deletions cmd/analyze.go
@@ -2,6 +2,7 @@ package cmd

import (
"archive/zip"
"encoding/json"
"fmt"
"io"
"log"
@@ -10,11 +11,14 @@ import (
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

internalHTTP "pb/pkg/http"

"github.com/briandowns/spinner"
"github.com/manifoldco/promptui"

"pb/pkg/analyze/duckdb"
"pb/pkg/analyze/k8s"
@@ -36,118 +40,107 @@ const (
func validateLLMConfig() {
provider, exists := os.LookupEnv("P_LLM_PROVIDER")
if !exists || (provider != "openai" && provider != "ollama" && provider != "claude") {
fmt.Printf(red + "Error: P_LLM_PROVIDER must be set to one of: openai, ollama, claude\n" + reset)
os.Exit(1)
log.Fatalf(red + "Error: P_LLM_PROVIDER must be set to one of: openai, ollama, claude\n" + reset)
}

_, keyExists := os.LookupEnv("P_LLM_KEY")
if !keyExists {
fmt.Printf(red + "Error: P_LLM_KEY must be set\n" + reset)
os.Exit(1)
log.Fatalf(red + "Error: P_LLM_KEY must be set\n" + reset)
}

if provider == "ollama" {
_, endpointExists := os.LookupEnv("P_LLM_ENDPOINT")
if !endpointExists {
fmt.Printf(red + "Error: P_LLM_ENDPOINT must be set when using ollama as the provider\n" + reset)
os.Exit(1)
log.Fatalf(red + "Error: P_LLM_ENDPOINT must be set when using ollama as the provider\n" + reset)
}
}

fmt.Printf(green+"Using %s for analysis.\n"+reset, provider)
}

var AnalyzeCmd = &cobra.Command{
Use: "stream", // Subcommand for "analyze"
Use: "stream",
Short: "Analyze streams in the Parseable server",
Example: "pb analyze stream <stream-name>",
Args: cobra.ExactArgs(1), // Ensure exactly one argument is passed
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {

// Check and install DuckDB if necessary
checkAndInstallDuckDB()

// Validate LLM environment variables
validateLLMConfig()

name := args[0]
fmt.Printf(yellow+"Analyzing stream: %s\n"+reset, name)

detectSchema(name)

// Initialize spinner
s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
var wg sync.WaitGroup

// Step 1: Query Data
s := spinner.New(spinner.CharSets[14], 100*time.Millisecond)
s.Suffix = fmt.Sprintf(yellow+" Querying data from Parseable server...(%s)", DefaultProfile.URL)
s.Start()
client := internalHTTP.DefaultClient(&DefaultProfile)

client := internalHTTP.DefaultClient(&DefaultProfile)
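// The query builds a CTE of objects whose events match kill/fail/backoff reasons,
// then joins back to pull every event for those objects, ordered by timestamp.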
query := `with distinct_name as (select distinct(\"involvedObject_name\") as name from \"k8s-events\" where reason ilike '%kill%' or reason ilike '%fail%' or reason ilike '%back%') select reason, message, \"involvedObject_name\", \"involvedObject_namespace\", \"reportingComponent\", timestamp from \"k8s-events\" as t1 join distinct_name t2 on t1.\"involvedObject_name\" = t2.name order by timestamp`

allData, err := duckdb.QueryPb(&client, query, "2024-11-11T00:00:00+00:00", "2024-11-21T00:00:00+00:00")
s.Stop()
if err != nil {
fmt.Printf(red+"Error querying data in Parseable: %v\n"+reset, err)
return fmt.Errorf("error querying data in Parseable, err [%w]", err)
log.Printf(red+"Error querying data in Parseable: %v\n"+reset, err)
return fmt.Errorf("error querying data: %w", err)
}
fmt.Println(green + "Data successfully queried from Parseable." + reset)

// Step 2: Store Data in DuckDB
s.Suffix = " Storing data in DuckDB..."
s.Start()
if err := duckdb.StoreInDuckDB(allData); err != nil {
s.Stop()
fmt.Printf(red+"Error storing data in DuckDB: %v\n"+reset, err)
return fmt.Errorf("error storing data in DuckDB, err [%w]", err)
}
s.Stop()
fmt.Println(green + "Data successfully stored in DuckDB." + reset)

// Step 3: Prompt Kubernetes Context
_, err = k8s.PromptK8sContext()
if err != nil {
fmt.Printf(red+"Error prompting Kubernetes context: %v\n"+reset, err)
return err
}
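// Store the queried data in DuckDB; with a single goroutine followed by an
// immediate Wait, this runs effectively synchronously.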
wg.Add(1)
go func() {
defer wg.Done()
if err := duckdb.StoreInDuckDB(allData); err != nil {
log.Fatalf(red+"Error storing data in DuckDB: %v\n"+reset, err)
}
fmt.Println(green + "Data successfully stored in DuckDB." + reset)
}()
wg.Wait()

// Main analysis loop
for {
// Kubernetes context prompt
_, err := k8s.PromptK8sContext()
if err != nil {
return fmt.Errorf(red+"Error prompting Kubernetes context: %w\n"+reset, err)
}

// Step 4: Select Namespace
namespace, err := k8s.PromptNamespace(k8s.GetKubeClient())
if err != nil {
log.Fatalf(red+"Error selecting namespace: %v\n"+reset, err)
}
fmt.Printf(yellow+"Selected Namespace: %s\n"+reset, namespace)
namespace, err := k8s.PromptNamespace(k8s.GetKubeClient())
if err != nil {
return fmt.Errorf(red+"Error selecting namespace: %w\n"+reset, err)
}
fmt.Printf(yellow+"Selected Namespace: %s\n"+reset, namespace)

// Step 5: Select Pod
pod, err := k8s.PromptPod(k8s.GetKubeClient(), namespace)
if err != nil {
log.Fatalf(red+"Error selecting pod: %v\n"+reset, err)
}
fmt.Printf(yellow+"Selected Pod: %s\n"+reset, pod)
pod, err := k8s.PromptPod(k8s.GetKubeClient(), namespace)
if err != nil {
return fmt.Errorf(red+"Error selecting pod: %w\n"+reset, err)
}
fmt.Printf(yellow+"Selected Pod: %s\n"+reset, pod)

// Step 6: Fetch Events from DuckDB
s.Suffix = " Fetching pod events from DuckDB..."
s.Start()
result, err := duckdb.FetchPodEventsfromDb(pod)
s.Stop()
if err != nil {
fmt.Printf(red+"Error fetching pod events from DuckDB: %v\n"+reset, err)
return err
}
s.Suffix = " Fetching pod events from DuckDB..."
s.Start()
result, err := duckdb.FetchPodEventsfromDb(pod)
s.Stop()
if err != nil {
return fmt.Errorf(red+"Error fetching pod events: %w\n"+reset, err)
}

// Step 7: Analyze with LLM
s.Suffix = " Analyzing events with LLM..."
s.Start()
gptResponse, err := openai.AnalyzeEventsWithGPT(pod, namespace, result)
s.Stop()
if err != nil {
fmt.Printf(red+"Failed to analyze events with LLM: %v\n"+reset, err)
return fmt.Errorf("failed to analyze events with LLM: %w", err)
}
s.Suffix = " Analyzing events with LLM..."
s.Start()
gptResponse, err := openai.AnalyzeEventsWithGPT(pod, namespace, result)
s.Stop()
if err != nil {
return fmt.Errorf(red+"Failed to analyze events: %w\n"+reset, err)
}

// Display Analysis Result
fmt.Println(green + "\nLLM Analysis:\n" + reset + gptResponse)
// Display results using pager
shouldContinue := parseAndSelectAnalysis(gptResponse)
if !shouldContinue {
break // Exit the loop if "no"
}

}
return nil
},
}
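Review note: the QueryPb call above pins the query window to fixed timestamps (2024-11-11 through 2024-11-21), which will silently return nothing once that range ages out. A rolling window is a small fix — a sketch only, assuming QueryPb accepts RFC 3339 start/end strings as in the call above:

// relativeWindow returns RFC 3339 start/end strings covering the last n days,
// matching the timestamp format QueryPb is called with above (an assumption).
func relativeWindow(days int) (string, string) {
	end := time.Now().UTC()
	start := end.AddDate(0, 0, -days)
	return start.Format(time.RFC3339), end.Format(time.RFC3339)
}

// usage: startTime, endTime := relativeWindow(10)
// allData, err := duckdb.QueryPb(&client, query, startTime, endTime)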
@@ -273,3 +266,90 @@ func unzip(src, dest string) error {
}
return nil
}

type AnalysisResponse struct {
Summary string `json:"summary"`
RootCauseAnalysis string `json:"root_cause_analysis"`
MitigationSteps []string `json:"mitigation_steps"`
}

// parseAndSelectAnalysis displays the parsed LLM analysis and returns false
// when the user declines to analyze another namespace/pod.
func parseAndSelectAnalysis(response string) bool {
var analysis AnalysisResponse
err := json.Unmarshal([]byte(response), &analysis)
if err != nil {
log.Fatalf("Failed to parse LLM response: %v", err)
}

// Display the summary by default
fmt.Println(green + "Summary:\n" + reset + analysis.Summary)

// Prompt the user to choose between "Root Cause Analysis" and "Mitigation Steps"
initialPrompt := promptui.Select{
Label: "Select Analysis to View",
Items: []string{"Root Cause Analysis", "Mitigation Steps"},
Size: 3,
}

_, choice, err := initialPrompt.Run()
if err != nil {
log.Fatalf("Prompt failed: %v", err)
}

switch choice {
case "Root Cause Analysis":
// Show Root Cause Analysis
fmt.Println(green + "Root Cause Analysis:\n" + reset + analysis.RootCauseAnalysis)

// Offer a follow-up action (currently only "Mitigation")
secondPrompt := promptui.Select{
Label: "What would you like to do next?",
Items: []string{"Mitigation"},
Size: 3,
}

_, secondChoice, err := secondPrompt.Run()
if err != nil {
log.Fatalf("Prompt failed: %v", err)
}

switch secondChoice {
case "Mitigation":
// Show Mitigation Steps
fmt.Println(green + "Mitigation Steps:\n" + reset)
for i, step := range analysis.MitigationSteps {
fmt.Printf("%d. %s\n", i+1, step)
}

// After displaying mitigation steps, ask if the user wants to analyze another pod/namespace
prompt := promptui.Prompt{
Label: "Analyze another namespace/pod (yes/no)",
Default: "no",
}
choice, _ := prompt.Run()
if strings.ToLower(choice) != "yes" {
return false // Exit the loop if "no"
}

}

case "Mitigation Steps":
// Show Mitigation Steps directly
fmt.Println(green + "Mitigation Steps:\n" + reset)
for i, step := range analysis.MitigationSteps {
fmt.Printf("%d. %s\n", i+1, step)
}

// After displaying mitigation steps, ask if the user wants to analyze another pod/namespace
prompt := promptui.Prompt{
Label: "Analyze another namespace/pod (yes/no)",
Default: "no",
}
choice, _ := prompt.Run()
if strings.ToLower(choice) != "yes" {
return false // Exit the loop if "no"
}
}

return true // Continue the loop if "yes"
}
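Review note: parseAndSelectAnalysis exits the process via log.Fatalf when json.Unmarshal fails, and models sometimes wrap their output in Markdown fences even when the prompt forbids it. Stripping fences before unmarshalling is a cheap safeguard — a minimal sketch; cleanLLMJSON is a hypothetical helper, not part of this commit:

// cleanLLMJSON strips Markdown code fences that an LLM may add around
// its JSON body despite being asked for a clean dump.
func cleanLLMJSON(response string) string {
	s := strings.TrimSpace(response)
	s = strings.TrimPrefix(s, "```json")
	s = strings.TrimPrefix(s, "```")
	s = strings.TrimSuffix(s, "```")
	return strings.TrimSpace(s)
}

// usage inside parseAndSelectAnalysis:
// err := json.Unmarshal([]byte(cleanLLMJSON(response)), &analysis)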
14 changes: 11 additions & 3 deletions pkg/analyze/openai/openai.go
@@ -48,8 +48,16 @@ func AnalyzeEventsWithGPT(podName, namespace string, data []SummaryStat) (string
prompt := fmt.Sprintf(
`You are an expert at debugging Kubernetes Events.
I have a table containing those events and want to debug what is happening in this pod (%s) / namespace (%s).
Give me a summary and overview of what happened by looking at these events.
Provide a root cause analysis and suggest steps to mitigate the error if present.
Give me a detailed summary of what happened by looking at these events.
Provide a root cause analysis and suggest steps to mitigate the error if present.
Return the response as a structured JSON body with the fields summary, root_cause_analysis, and mitigation_steps.
Don't add any JSON keywords in the response; make sure it is just a clean JSON dump. Please adhere to the following structure:
type AnalysisResponse struct {
Summary string json:"summary"
RootCauseAnalysis string json:"root_cause_analysis"
MitigationSteps []string json:"mitigation_steps"
}
In mitigation steps give a command to get logs.
In case you are unable to figure out what happened, just say "I'm unable to figure out what is happening here.".
%s`, podName, namespace, formattedData)

Expand All @@ -68,7 +76,7 @@ func AnalyzeEventsWithGPT(podName, namespace string, data []SummaryStat) (string
}

// Send the request to the OpenAI API
apiKey := os.Getenv("OPENAI_API_KEY")
apiKey := os.Getenv("P_LLM_KEY")
req, err := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", bytes.NewBuffer(payload))
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
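Review note: this hunk swaps the key lookup from OPENAI_API_KEY to P_LLM_KEY, but the request URL stays pinned to api.openai.com even though validateLLMConfig also accepts ollama and claude. If those providers are meant to work end to end, the endpoint needs the same treatment — a sketch under that assumption; only the OpenAI URL comes from the code above, and the Ollama /api/chat path is assumed from Ollama's chat API:

// resolveLLMEndpoint derives the request URL from the same environment
// contract validateLLMConfig enforces. Hypothetical helper, not in this commit.
func resolveLLMEndpoint() string {
	if os.Getenv("P_LLM_PROVIDER") == "ollama" {
		return os.Getenv("P_LLM_ENDPOINT") + "/api/chat"
	}
	return "https://api.openai.com/v1/chat/completions"
}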
