Skip to content

Commit

Permalink
add stream info
Browse files Browse the repository at this point in the history
  • Loading branch information
AdheipSingh committed Dec 16, 2024
1 parent 344cb2d commit 640b3ce
Showing 1 changed file with 62 additions and 10 deletions.
72 changes: 62 additions & 10 deletions cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
internalHTTP "pb/pkg/http"
"strconv"
"strings"
Expand Down Expand Up @@ -200,6 +201,14 @@ var StatStreamCmd = &cobra.Command{
return err
}

// Fetch stream type
streamType, err := fetchInfo(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Check output format
output, _ := cmd.Flags().GetString("output")
if output == "json" {
Expand All @@ -211,8 +220,9 @@ var StatStreamCmd = &cobra.Command{
"storage_size": humanize.Bytes(uint64(storageSize)),
"compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio),
},
"retention": retention,
"alerts": alertsData.Alerts,
"retention": retention,
"alerts": alertsData.Alerts,
"stream_type": streamType,
}

jsonData, err := json.MarshalIndent(data, "", " ")
Expand All @@ -227,13 +237,13 @@ var StatStreamCmd = &cobra.Command{
isRetentionSet := len(retention) > 0
isAlertsSet := len(alertsData.Alerts) > 0

// Render the info section with consistent alignment
fmt.Println(StyleBold.Render("\nInfo:"))
fmt.Printf(" Event Count: %d\n", ingestionCount)
fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize)))
fmt.Printf(
" Compression Ratio: %.2f%s\n",
compressionRatio, "%")
fmt.Printf(" %-18s %d\n", "Event Count:", ingestionCount)
fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize)))
fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%")
fmt.Printf(" %-18s %s\n", "Stream Type:", streamType)
fmt.Println()

if isRetentionSet {
Expand Down Expand Up @@ -412,7 +422,7 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD
}

func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/retention", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil)
if err != nil {
return
}
Expand All @@ -439,7 +449,7 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe
}

func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/alert", name), nil)
if err != nil {
return
}
Expand All @@ -464,3 +474,45 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig
}
return
}

func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) {
// Create a new HTTP GET request
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

// Execute the request
resp, err := client.Client.Do(req)
if err != nil {
return "", fmt.Errorf("request execution failed: %w", err)
}
defer resp.Body.Close()

// Read the response body
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %w", err)
}

// Check for successful status code
if resp.StatusCode == http.StatusOK {
// Define a struct to parse the response
var response struct {
StreamType string `json:"stream_type"`
}

// Unmarshal JSON into the struct
if err := json.Unmarshal(bytes, &response); err != nil {
return "", fmt.Errorf("failed to unmarshal response: %w", err)
}

// Return the extracted stream_type
return response.StreamType, nil
}

// Handle non-200 responses
body := string(bytes)
errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body)
return "", errors.New(errMsg)
}

0 comments on commit 640b3ce

Please sign in to comment.