From 4e857b889e93968ff6d57ea9cb5242ec6c33ea98 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Sun, 10 Nov 2024 00:07:46 +0530 Subject: [PATCH] streams --- cmd/stream.go | 124 +++++++++++++++++++++++++++++--------------------- main.go | 3 ++ 2 files changed, 76 insertions(+), 51 deletions(-) diff --git a/cmd/stream.go b/cmd/stream.go index bde61e6..d5d981b 100644 --- a/cmd/stream.go +++ b/cmd/stream.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "io" - "net/http" internalHTTP "pb/pkg/http" "strconv" "strings" @@ -111,24 +110,39 @@ var AddStreamCmd = &cobra.Command{ Example: " pb stream add backend_logs", Short: "Create a new stream", Args: cobra.ExactArgs(1), - RunE: func(_ *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, args []string) error { + // Capture start time + startTime := time.Now() + cmd.Annotations = make(map[string]string) + defer func() { + cmd.Annotations["executionTime"] = time.Since(startTime).String() + }() + name := args[0] client := internalHTTP.DefaultClient(&DefaultProfile) req, err := client.NewRequest("PUT", "logstream/"+name, nil) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } resp, err := client.Client.Do(req) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } + // Capture execution time + cmd.Annotations["executionTime"] = time.Since(startTime).String() + if resp.StatusCode == 200 { fmt.Printf("Created stream %s\n", StyleBold.Render(name)) } else { bytes, err := io.ReadAll(resp.Body) if err != nil { + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } body := string(bytes) @@ -147,12 +161,21 @@ var StatStreamCmd = &cobra.Command{ Short: "Get statistics for a stream", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { + // Capture start time + startTime := time.Now() + cmd.Annotations = make(map[string]string) + defer func() { + cmd.Annotations["executionTime"] = time.Since(startTime).String() + }() + name := args[0] client := internalHTTP.DefaultClient(&DefaultProfile) // Fetch stats data stats, err := fetchStats(&client, name) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } @@ -164,12 +187,16 @@ var StatStreamCmd = &cobra.Command{ // Fetch retention data retention, err := fetchRetention(&client, name) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } // Fetch alerts data alertsData, err := fetchAlerts(&client, name) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } @@ -190,6 +217,8 @@ var StatStreamCmd = &cobra.Command{ jsonData, err := json.MarshalIndent(data, "", " ") if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } fmt.Println(string(jsonData)) @@ -255,29 +284,43 @@ var RemoveStreamCmd = &cobra.Command{ Example: " pb stream remove backend_logs", Short: "Delete a stream", Args: cobra.ExactArgs(1), - RunE: func(_ *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, args []string) error { + // Capture start time + startTime := time.Now() + cmd.Annotations = make(map[string]string) + defer func() { + cmd.Annotations["executionTime"] = time.Since(startTime).String() + }() + name := args[0] client := internalHTTP.DefaultClient(&DefaultProfile) req, err := client.NewRequest("DELETE", "logstream/"+name, nil) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } resp, err := client.Client.Do(req) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } + // Capture execution time + cmd.Annotations["executionTime"] = time.Since(startTime).String() + if resp.StatusCode == 200 { - fmt.Printf("Removed stream %s\n", StyleBold.Render(name)) + fmt.Printf("Successfully deleted stream %s\n", StyleBold.Render(name)) } else { bytes, err := io.ReadAll(resp.Body) if err != nil { + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } body := string(bytes) defer resp.Body.Close() - fmt.Printf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body) } @@ -285,74 +328,53 @@ var RemoveStreamCmd = &cobra.Command{ }, } +// ListStreamCmd is the list command for streams var ListStreamCmd = &cobra.Command{ Use: "list", - Short: "List all streams", Example: " pb stream list", - RunE: func(cmd *cobra.Command, _ []string) error { + Short: "List all streams", + RunE: func(cmd *cobra.Command, args []string) error { + // Capture start time + startTime := time.Now() + cmd.Annotations = make(map[string]string) + defer func() { + cmd.Annotations["executionTime"] = time.Since(startTime).String() + }() + client := internalHTTP.DefaultClient(&DefaultProfile) req, err := client.NewRequest("GET", "logstream", nil) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } resp, err := client.Client.Do(req) if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - // Read response body for error message + var streams []StreamListItem + if resp.StatusCode == 200 { bytes, err := io.ReadAll(resp.Body) if err != nil { + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } - body := string(bytes) - fmt.Printf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body) - return nil - } - - var items []map[string]string - err = json.NewDecoder(resp.Body).Decode(&items) - if err != nil { - return err - } - - // Get output flag value - outputFormat, err := cmd.Flags().GetString("output") - if err != nil { - return err - } - - // Handle JSON output format - if outputFormat == "json" { - // Collect stream names for JSON output - streams := make([]string, len(items)) - for i, item := range items { - streams[i] = item["name"] - } - jsonOutput, err := json.MarshalIndent(streams, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal JSON output: %w", err) + if err := json.Unmarshal(bytes, &streams); err != nil { + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) + return err } - fmt.Println(string(jsonOutput)) - return nil - } - // Default to text output - if len(items) == 0 { - fmt.Println("No streams found") - return nil + for _, stream := range streams { + fmt.Println(stream.Render()) + } + } else { + fmt.Printf("Failed to fetch streams. Status Code: %s\n", resp.Status) } - fmt.Println() - for _, item := range items { - streamItem := StreamListItem{Name: item["name"]} - fmt.Print("• ") - fmt.Println(streamItem.Render()) - } - fmt.Println() return nil }, } diff --git a/main.go b/main.go index 6dbd737..aed936e 100644 --- a/main.go +++ b/main.go @@ -91,6 +91,9 @@ var stream = &cobra.Command{ Short: "Manage streams", Long: "\nstream command is used to manage streams.", PersistentPreRunE: cmd.PreRunDefaultProfile, + PersistentPostRun: func(cmd *cobra.Command, args []string) { + analytics.PostRunAnalytics(cmd, args) + }, } var query = &cobra.Command{