Skip to content

Commit

Permalink
streams
Browse files Browse the repository at this point in the history
  • Loading branch information
AdheipSingh committed Nov 9, 2024
1 parent 7a4220a commit 4e857b8
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 51 deletions.
124 changes: 73 additions & 51 deletions cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"io"
"net/http"
internalHTTP "pb/pkg/http"
"strconv"
"strings"
Expand Down Expand Up @@ -111,24 +110,39 @@ var AddStreamCmd = &cobra.Command{
Example: " pb stream add backend_logs",
Short: "Create a new stream",
Args: cobra.ExactArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
startTime := time.Now()
cmd.Annotations = make(map[string]string)
defer func() {
cmd.Annotations["executionTime"] = time.Since(startTime).String()
}()

name := args[0]
client := internalHTTP.DefaultClient(&DefaultProfile)
req, err := client.NewRequest("PUT", "logstream/"+name, nil)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

resp, err := client.Client.Do(req)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Capture execution time
cmd.Annotations["executionTime"] = time.Since(startTime).String()

if resp.StatusCode == 200 {
fmt.Printf("Created stream %s\n", StyleBold.Render(name))
} else {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
body := string(bytes)
Expand All @@ -147,12 +161,21 @@ var StatStreamCmd = &cobra.Command{
Short: "Get statistics for a stream",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
startTime := time.Now()
cmd.Annotations = make(map[string]string)
defer func() {
cmd.Annotations["executionTime"] = time.Since(startTime).String()
}()

name := args[0]
client := internalHTTP.DefaultClient(&DefaultProfile)

// Fetch stats data
stats, err := fetchStats(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

Expand All @@ -164,12 +187,16 @@ var StatStreamCmd = &cobra.Command{
// Fetch retention data
retention, err := fetchRetention(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Fetch alerts data
alertsData, err := fetchAlerts(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

Expand All @@ -190,6 +217,8 @@ var StatStreamCmd = &cobra.Command{

jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
fmt.Println(string(jsonData))
Expand Down Expand Up @@ -255,104 +284,97 @@ var RemoveStreamCmd = &cobra.Command{
Example: " pb stream remove backend_logs",
Short: "Delete a stream",
Args: cobra.ExactArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time
startTime := time.Now()
cmd.Annotations = make(map[string]string)
defer func() {
cmd.Annotations["executionTime"] = time.Since(startTime).String()
}()

name := args[0]
client := internalHTTP.DefaultClient(&DefaultProfile)
req, err := client.NewRequest("DELETE", "logstream/"+name, nil)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

resp, err := client.Client.Do(req)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Capture execution time
cmd.Annotations["executionTime"] = time.Since(startTime).String()

if resp.StatusCode == 200 {
fmt.Printf("Removed stream %s\n", StyleBold.Render(name))
fmt.Printf("Successfully deleted stream %s\n", StyleBold.Render(name))
} else {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
body := string(bytes)
defer resp.Body.Close()

fmt.Printf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body)
}

return nil
},
}

// ListStreamCmd is the list command for streams
var ListStreamCmd = &cobra.Command{
Use: "list",
Short: "List all streams",
Example: " pb stream list",
RunE: func(cmd *cobra.Command, _ []string) error {
Short: "List all streams",
RunE: func(cmd *cobra.Command, args []string) error {
// Capture start time

Check failure on line 337 in cmd/stream.go

View workflow job for this annotation

GitHub Actions / Build and Test the Go code

unused-parameter: parameter 'args' seems to be unused, consider removing or renaming it as _ (revive)
startTime := time.Now()
cmd.Annotations = make(map[string]string)
defer func() {
cmd.Annotations["executionTime"] = time.Since(startTime).String()
}()

client := internalHTTP.DefaultClient(&DefaultProfile)
req, err := client.NewRequest("GET", "logstream", nil)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

resp, err := client.Client.Do(req)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
// Read response body for error message
var streams []StreamListItem
if resp.StatusCode == 200 {
bytes, err := io.ReadAll(resp.Body)
if err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
body := string(bytes)
fmt.Printf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body)
return nil
}

var items []map[string]string
err = json.NewDecoder(resp.Body).Decode(&items)
if err != nil {
return err
}

// Get output flag value
outputFormat, err := cmd.Flags().GetString("output")
if err != nil {
return err
}

// Handle JSON output format
if outputFormat == "json" {
// Collect stream names for JSON output
streams := make([]string, len(items))
for i, item := range items {
streams[i] = item["name"]
}
jsonOutput, err := json.MarshalIndent(streams, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal JSON output: %w", err)
if err := json.Unmarshal(bytes, &streams); err != nil {
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}
fmt.Println(string(jsonOutput))
return nil
}

// Default to text output
if len(items) == 0 {
fmt.Println("No streams found")
return nil
for _, stream := range streams {
fmt.Println(stream.Render())
}
} else {
fmt.Printf("Failed to fetch streams. Status Code: %s\n", resp.Status)
}

fmt.Println()
for _, item := range items {
streamItem := StreamListItem{Name: item["name"]}
fmt.Print("• ")
fmt.Println(streamItem.Render())
}
fmt.Println()
return nil
},
}
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ var stream = &cobra.Command{
Short: "Manage streams",
Long: "\nstream command is used to manage streams.",
PersistentPreRunE: cmd.PreRunDefaultProfile,
PersistentPostRun: func(cmd *cobra.Command, args []string) {
analytics.PostRunAnalytics(cmd, args)
},
}

var query = &cobra.Command{
Expand Down

0 comments on commit 4e857b8

Please sign in to comment.