Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement: add stream info #78

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 62 additions & 10 deletions cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
internalHTTP "pb/pkg/http"
"strconv"
"strings"
Expand Down Expand Up @@ -200,6 +201,14 @@ var StatStreamCmd = &cobra.Command{
return err
}

// Fetch stream type
streamType, err := fetchInfo(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Check output format
output, _ := cmd.Flags().GetString("output")
if output == "json" {
Expand All @@ -211,8 +220,9 @@ var StatStreamCmd = &cobra.Command{
"storage_size": humanize.Bytes(uint64(storageSize)),
"compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio),
},
"retention": retention,
"alerts": alertsData.Alerts,
"retention": retention,
"alerts": alertsData.Alerts,
"stream_type": streamType,
}

jsonData, err := json.MarshalIndent(data, "", " ")
Expand All @@ -227,13 +237,13 @@ var StatStreamCmd = &cobra.Command{
isRetentionSet := len(retention) > 0
isAlertsSet := len(alertsData.Alerts) > 0

// Render the info section with consistent alignment
fmt.Println(StyleBold.Render("\nInfo:"))
fmt.Printf(" Event Count: %d\n", ingestionCount)
fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize)))
fmt.Printf(
" Compression Ratio: %.2f%s\n",
compressionRatio, "%")
fmt.Printf(" %-18s %d\n", "Event Count:", ingestionCount)
fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize)))
fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%")
fmt.Printf(" %-18s %s\n", "Stream Type:", streamType)
fmt.Println()

if isRetentionSet {
Expand Down Expand Up @@ -412,7 +422,7 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD
}

func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/retention", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil)
if err != nil {
return
}
Expand All @@ -439,7 +449,7 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe
}

func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/alert", name), nil)
if err != nil {
return
}
Expand All @@ -464,3 +474,45 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig
}
return
}

func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) {
// Create a new HTTP GET request
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

// Execute the request
resp, err := client.Client.Do(req)
if err != nil {
return "", fmt.Errorf("request execution failed: %w", err)
}
defer resp.Body.Close()

// Read the response body
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %w", err)
}

// Check for successful status code
if resp.StatusCode == http.StatusOK {
// Define a struct to parse the response
var response struct {
StreamType string `json:"stream_type"`
}

// Unmarshal JSON into the struct
if err := json.Unmarshal(bytes, &response); err != nil {
return "", fmt.Errorf("failed to unmarshal response: %w", err)
}

// Return the extracted stream_type
return response.StreamType, nil
}

// Handle non-200 responses
body := string(bytes)
errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body)
return "", errors.New(errMsg)
}
Loading