diff --git a/cmd/stream.go b/cmd/stream.go index 7dcef13..5afa8cc 100644 --- a/cmd/stream.go +++ b/cmd/stream.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "net/http" internalHTTP "pb/pkg/http" "strconv" "strings" @@ -200,6 +201,14 @@ var StatStreamCmd = &cobra.Command{ return err } + // Fetch stream type + streamType, err := fetchInfo(&client, name) + if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) + return err + } + // Check output format output, _ := cmd.Flags().GetString("output") if output == "json" { @@ -211,8 +220,9 @@ var StatStreamCmd = &cobra.Command{ "storage_size": humanize.Bytes(uint64(storageSize)), "compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio), }, - "retention": retention, - "alerts": alertsData.Alerts, + "retention": retention, + "alerts": alertsData.Alerts, + "stream_type": streamType, } jsonData, err := json.MarshalIndent(data, "", " ") @@ -227,13 +237,13 @@ var StatStreamCmd = &cobra.Command{ isRetentionSet := len(retention) > 0 isAlertsSet := len(alertsData.Alerts) > 0 + // Render the info section with consistent alignment fmt.Println(StyleBold.Render("\nInfo:")) - fmt.Printf(" Event Count: %d\n", ingestionCount) - fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize))) - fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize))) - fmt.Printf( - " Compression Ratio: %.2f%s\n", - compressionRatio, "%") + fmt.Printf(" %-18s %d\n", "Event Count:", ingestionCount) + fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize))) + fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize))) + fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%") + fmt.Printf(" %-18s %s\n", "Stream Type:", streamType) fmt.Println() if isRetentionSet { @@ -412,7 +422,7 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD } func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) { - req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/retention", name), nil) + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil) if err != nil { return } @@ -439,7 +449,7 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe } func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig, err error) { - req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil) + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/alert", name), nil) if err != nil { return } @@ -464,3 +474,45 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig } return } + +func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) { + // Create a new HTTP GET request + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + + // Execute the request + resp, err := client.Client.Do(req) + if err != nil { + return "", fmt.Errorf("request execution failed: %w", err) + } + defer resp.Body.Close() + + // Read the response body + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read response body: %w", err) + } + + // Check for successful status code + if resp.StatusCode == http.StatusOK { + // Define a struct to parse the response + var response struct { + StreamType string `json:"stream_type"` + } + + // Unmarshal JSON into the struct + if err := json.Unmarshal(bytes, &response); err != nil { + return "", fmt.Errorf("failed to unmarshal response: %w", err) + } + + // Return the extracted stream_type + return response.StreamType, nil + } + + // Handle non-200 responses + body := string(bytes) + errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body) + return "", errors.New(errMsg) +}