Skip to content

Commit

Permalink
Merge branch 'main' into uninstaller
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht authored Dec 17, 2024
2 parents a996941 + f86d7ef commit cdb39e2
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 151 deletions.
161 changes: 161 additions & 0 deletions cmd/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package cmd

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"pb/pkg/common"
internalHTTP "pb/pkg/http"

"github.com/spf13/cobra"
)

const (
generateStaticSchemaPath = "/logstream/schema/detect"
)

var GenerateSchemaCmd = &cobra.Command{
Use: "generate",
Short: "Generate Schema for JSON",
Example: "pb schema generate --file=test.json",
RunE: func(cmd *cobra.Command, _ []string) error {
// Get the file path from the `--file` flag
filePath, err := cmd.Flags().GetString("file")
if err != nil {
return fmt.Errorf(common.Red+"failed to read file flag: %w"+common.Reset, err)
}

if filePath == "" {
return fmt.Errorf(common.Red + "file flag is required" + common.Reset)
}

// Read the file content
fileContent, err := os.ReadFile(filePath)
if err != nil {
return fmt.Errorf(common.Red+"failed to read file %s: %w"+common.Reset, filePath, err)
}

// Initialize HTTP client
client := internalHTTP.DefaultClient(&DefaultProfile)

// Create the HTTP request
req, err := client.NewRequest(http.MethodPost, generateStaticSchemaPath, bytes.NewBuffer(fileContent))
if err != nil {
return fmt.Errorf(common.Red+"failed to create new request: %w"+common.Reset, err)
}

// Set Content-Type header
req.Header.Set("Content-Type", "application/json")

// Execute the request
resp, err := client.Client.Do(req)
if err != nil {
return fmt.Errorf(common.Red+"request execution failed: %w"+common.Reset, err)
}
defer resp.Body.Close()

// Check for non-200 status codes
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
fmt.Printf(common.Red+"Error response: %s\n"+common.Reset, string(body))
return fmt.Errorf(common.Red+"non-200 status code received: %s"+common.Reset, resp.Status)
}

// Parse and print the response
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf(common.Red+"failed to read response body: %w"+common.Reset, err)
}

var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, respBody, "", " "); err != nil {
return fmt.Errorf(common.Red+"failed to format response as JSON: %w"+common.Reset, err)
}

fmt.Println(common.Green + prettyJSON.String() + common.Reset)
return nil
},
}

var CreateSchemaCmd = &cobra.Command{
Use: "create",
Short: "Create Schema for a Parseable stream",
Example: "pb schema create --stream=my_stream --file=schema.json",
RunE: func(cmd *cobra.Command, _ []string) error {
// Get the stream name from the `--stream` flag
streamName, err := cmd.Flags().GetString("stream")
if err != nil {
return fmt.Errorf(common.Red+"failed to read stream flag: %w"+common.Reset, err)
}

if streamName == "" {
return fmt.Errorf(common.Red + "stream flag is required" + common.Reset)
}

// Get the file path from the `--file` flag
filePath, err := cmd.Flags().GetString("file")
if err != nil {
return fmt.Errorf(common.Red+"failed to read config flag: %w"+common.Reset, err)
}

if filePath == "" {
return fmt.Errorf(common.Red + "file path flag is required" + common.Reset)
}

// Read the JSON schema file
schemaContent, err := os.ReadFile(filePath)
if err != nil {
return fmt.Errorf(common.Red+"failed to read schema file %s: %w"+common.Reset, filePath, err)
}

// Initialize HTTP client
client := internalHTTP.DefaultClient(&DefaultProfile)

// Construct the API path
apiPath := fmt.Sprintf("/logstream/%s", streamName)

// Create the HTTP PUT request
req, err := client.NewRequest(http.MethodPut, apiPath, bytes.NewBuffer(schemaContent))
if err != nil {
return fmt.Errorf(common.Red+"failed to create new request: %w"+common.Reset, err)
}

// Set custom headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-P-Static-Schema-Flag", "true")

// Execute the request
resp, err := client.Client.Do(req)
if err != nil {
return fmt.Errorf(common.Red+"request execution failed: %w"+common.Reset, err)
}
defer resp.Body.Close()

// Check for non-200 status codes
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
fmt.Printf(common.Red+"Error response: %s\n"+common.Reset, string(body))
return fmt.Errorf(common.Red+"non-200 status code received: %s"+common.Reset, resp.Status)
}

// Parse and print the response
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf(common.Red+"failed to read response body: %w"+common.Reset, err)
}

fmt.Println(common.Green + string(respBody) + common.Reset)
return nil
},
}

func init() {
// Add the `--file` flag to the command
GenerateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to generate schema")
CreateSchemaCmd.Flags().StringP("stream", "s", "", "Name of the stream to associate with the schema")
CreateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to create schema")

}
36 changes: 17 additions & 19 deletions cmd/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ import (
"net"
"os"
"os/exec"
"pb/pkg/common"
"pb/pkg/helm"
"pb/pkg/installer"
"runtime"
"strings"
"sync"
"time"

"pb/pkg/common"
"pb/pkg/helm"
"pb/pkg/installer"

"github.com/briandowns/spinner"
"github.com/spf13/cobra"
)

var (
verbose bool
)
var verbose bool

var InstallOssCmd = &cobra.Command{
Use: "oss",
Expand All @@ -40,10 +39,12 @@ var InstallOssCmd = &cobra.Command{
return err
}

fmt.Println(common.Green + "You selected the following plan:" + common.Reset)
fmt.Printf(common.Cyan+"Plan: %s\n"+common.Yellow+"Ingestion Speed: %s\n"+common.Green+"Per Day Ingestion: %s\n"+
common.Blue+"Query Performance: %s\n"+common.Red+"CPU & Memory: %s\n"+common.Reset,
selectedPlan.Name, selectedPlan.IngestionSpeed, selectedPlan.PerDayIngestion,
fmt.Printf(
common.Cyan+" Ingestion Speed: %s\n"+
common.Cyan+" Per Day Ingestion: %s\n"+
common.Cyan+" Query Performance: %s\n"+
common.Cyan+" CPU & Memory: %s\n"+
common.Reset, selectedPlan.IngestionSpeed, selectedPlan.PerDayIngestion,
selectedPlan.QueryPerformance, selectedPlan.CPUAndMemorySpecs)

// Get namespace and chart values from installer
Expand Down Expand Up @@ -95,7 +96,6 @@ var InstallOssCmd = &cobra.Command{
// Stop the spinner and restore stdout
spinner.Stop()
if !verbose {
//w.Close()
os.Stdout = oldStdout
}

Expand All @@ -115,12 +115,12 @@ var InstallOssCmd = &cobra.Command{

// printSuccessBanner remains the same as in the original code
func printSuccessBanner(namespace, deployment, version, username, password string) {
var ingestionUrl, serviceName string
var ingestionURL, serviceName string
if deployment == "standalone" {
ingestionUrl = "parseable." + namespace + ".svc.cluster.local"
ingestionURL = "parseable." + namespace + ".svc.cluster.local"
serviceName = "parseable"
} else if deployment == "distributed" {
ingestionUrl = "parseable-ingestor-svc." + namespace + ".svc.cluster.local"
ingestionURL = "parseable-ingestor-svc." + namespace + ".svc.cluster.local"
serviceName = "parseable-query-svc"
}

Expand All @@ -143,7 +143,7 @@ func printSuccessBanner(namespace, deployment, version, username, password strin
fmt.Printf("%s Deployment Details:\n", common.Blue+"ℹ️ ")
fmt.Printf(" • Namespace: %s\n", common.Blue+namespace)
fmt.Printf(" • Chart Version: %s\n", common.Blue+version)
fmt.Printf(" • Ingestion URL: %s\n", ingestionUrl)
fmt.Printf(" • Ingestion URL: %s\n", ingestionURL)

fmt.Println("\n" + common.Blue + "🔗 Resources:" + common.Reset)
fmt.Println(common.Blue + " • Documentation: https://www.parseable.com/docs/server/introduction")
Expand All @@ -155,16 +155,14 @@ func printSuccessBanner(namespace, deployment, version, username, password strin
localPort := "8000"
fmt.Printf(common.Green+"Port-forwarding %s service on port %s...\n"+common.Reset, serviceName, localPort)

err = startPortForward(namespace, serviceName, "80", localPort)
if err != nil {
fmt.Errorf(common.Red+"failed to port-forward service: %w", err)
if err = startPortForward(namespace, serviceName, "80", localPort); err != nil {
fmt.Printf(common.Red+"failed to port-forward service: %s", err.Error())
}

// Redirect to UI
localURL := fmt.Sprintf("http://localhost:%s/login?q=%s", localPort, base64EncodedString)
fmt.Printf(common.Green+"Opening Parseable UI at %s\n"+common.Reset, localURL)
openBrowser(localURL)

}

func createDeploymentSpinner(namespace string) *spinner.Spinner {
Expand Down
72 changes: 62 additions & 10 deletions cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
internalHTTP "pb/pkg/http"
"strconv"
"strings"
Expand Down Expand Up @@ -200,6 +201,14 @@ var StatStreamCmd = &cobra.Command{
return err
}

// Fetch stream type
streamType, err := fetchInfo(&client, name)
if err != nil {
// Capture error
cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error())
return err
}

// Check output format
output, _ := cmd.Flags().GetString("output")
if output == "json" {
Expand All @@ -211,8 +220,9 @@ var StatStreamCmd = &cobra.Command{
"storage_size": humanize.Bytes(uint64(storageSize)),
"compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio),
},
"retention": retention,
"alerts": alertsData.Alerts,
"retention": retention,
"alerts": alertsData.Alerts,
"stream_type": streamType,
}

jsonData, err := json.MarshalIndent(data, "", " ")
Expand All @@ -227,13 +237,13 @@ var StatStreamCmd = &cobra.Command{
isRetentionSet := len(retention) > 0
isAlertsSet := len(alertsData.Alerts) > 0

// Render the info section with consistent alignment
fmt.Println(StyleBold.Render("\nInfo:"))
fmt.Printf(" Event Count: %d\n", ingestionCount)
fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize)))
fmt.Printf(
" Compression Ratio: %.2f%s\n",
compressionRatio, "%")
fmt.Printf(" %-18s %d\n", "Event Count:", ingestionCount)
fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize)))
fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%")
fmt.Printf(" %-18s %s\n", "Stream Type:", streamType)
fmt.Println()

if isRetentionSet {
Expand Down Expand Up @@ -412,7 +422,7 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD
}

func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/retention", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil)
if err != nil {
return
}
Expand All @@ -439,7 +449,7 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe
}

func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil)
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/alert", name), nil)
if err != nil {
return
}
Expand All @@ -464,3 +474,45 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig
}
return
}

func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) {
// Create a new HTTP GET request
req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

// Execute the request
resp, err := client.Client.Do(req)
if err != nil {
return "", fmt.Errorf("request execution failed: %w", err)
}
defer resp.Body.Close()

// Read the response body
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %w", err)
}

// Check for successful status code
if resp.StatusCode == http.StatusOK {
// Define a struct to parse the response
var response struct {
StreamType string `json:"stream_type"`
}

// Unmarshal JSON into the struct
if err := json.Unmarshal(bytes, &response); err != nil {
return "", fmt.Errorf("failed to unmarshal response: %w", err)
}

// Return the extracted stream_type
return response.StreamType, nil
}

// Handle non-200 responses
body := string(bytes)
errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body)
return "", errors.New(errMsg)
}
Loading

0 comments on commit cdb39e2

Please sign in to comment.