From 26ae19300daa82c60c843c81995205da7cf5e02b Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Tue, 17 Dec 2024 06:20:56 +0530 Subject: [PATCH 1/3] update install command (#77) Co-authored-by: AdheipSingh <34169002+AdheipSingh@users.noreply.github.com> --- cmd/installer.go | 36 ++++----- pkg/installer/installer.go | 158 +++++++++++-------------------------- pkg/installer/plans.go | 17 ++-- 3 files changed, 70 insertions(+), 141 deletions(-) diff --git a/cmd/installer.go b/cmd/installer.go index 5963289..24dbe49 100644 --- a/cmd/installer.go +++ b/cmd/installer.go @@ -7,21 +7,20 @@ import ( "net" "os" "os/exec" - "pb/pkg/common" - "pb/pkg/helm" - "pb/pkg/installer" "runtime" "strings" "sync" "time" + "pb/pkg/common" + "pb/pkg/helm" + "pb/pkg/installer" + "github.com/briandowns/spinner" "github.com/spf13/cobra" ) -var ( - verbose bool -) +var verbose bool var InstallOssCmd = &cobra.Command{ Use: "oss", @@ -40,10 +39,12 @@ var InstallOssCmd = &cobra.Command{ return err } - fmt.Println(common.Green + "You selected the following plan:" + common.Reset) - fmt.Printf(common.Cyan+"Plan: %s\n"+common.Yellow+"Ingestion Speed: %s\n"+common.Green+"Per Day Ingestion: %s\n"+ - common.Blue+"Query Performance: %s\n"+common.Red+"CPU & Memory: %s\n"+common.Reset, - selectedPlan.Name, selectedPlan.IngestionSpeed, selectedPlan.PerDayIngestion, + fmt.Printf( + common.Cyan+" Ingestion Speed: %s\n"+ + common.Cyan+" Per Day Ingestion: %s\n"+ + common.Cyan+" Query Performance: %s\n"+ + common.Cyan+" CPU & Memory: %s\n"+ + common.Reset, selectedPlan.IngestionSpeed, selectedPlan.PerDayIngestion, selectedPlan.QueryPerformance, selectedPlan.CPUAndMemorySpecs) // Get namespace and chart values from installer @@ -95,7 +96,6 @@ var InstallOssCmd = &cobra.Command{ // Stop the spinner and restore stdout spinner.Stop() if !verbose { - //w.Close() os.Stdout = oldStdout } @@ -115,12 +115,12 @@ var InstallOssCmd = &cobra.Command{ // printSuccessBanner remains the same as in the original code func printSuccessBanner(namespace, deployment, version, username, password string) { - var ingestionUrl, serviceName string + var ingestionURL, serviceName string if deployment == "standalone" { - ingestionUrl = "parseable." + namespace + ".svc.cluster.local" + ingestionURL = "parseable." + namespace + ".svc.cluster.local" serviceName = "parseable" } else if deployment == "distributed" { - ingestionUrl = "parseable-ingestor-svc." + namespace + ".svc.cluster.local" + ingestionURL = "parseable-ingestor-svc." + namespace + ".svc.cluster.local" serviceName = "parseable-query-svc" } @@ -143,7 +143,7 @@ func printSuccessBanner(namespace, deployment, version, username, password strin fmt.Printf("%s Deployment Details:\n", common.Blue+"ℹ️ ") fmt.Printf(" • Namespace: %s\n", common.Blue+namespace) fmt.Printf(" • Chart Version: %s\n", common.Blue+version) - fmt.Printf(" • Ingestion URL: %s\n", ingestionUrl) + fmt.Printf(" • Ingestion URL: %s\n", ingestionURL) fmt.Println("\n" + common.Blue + "🔗 Resources:" + common.Reset) fmt.Println(common.Blue + " • Documentation: https://www.parseable.com/docs/server/introduction") @@ -155,16 +155,14 @@ func printSuccessBanner(namespace, deployment, version, username, password strin localPort := "8000" fmt.Printf(common.Green+"Port-forwarding %s service on port %s...\n"+common.Reset, serviceName, localPort) - err = startPortForward(namespace, serviceName, "80", localPort) - if err != nil { - fmt.Errorf(common.Red+"failed to port-forward service: %w", err) + if err = startPortForward(namespace, serviceName, "80", localPort); err != nil { + fmt.Printf(common.Red+"failed to port-forward service: %s", err.Error()) } // Redirect to UI localURL := fmt.Sprintf("http://localhost:%s/login?q=%s", localPort, base64EncodedString) fmt.Printf(common.Green+"Opening Parseable UI at %s\n"+common.Reset, localURL) openBrowser(localURL) - } func createDeploymentSpinner(namespace string) *spinner.Spinner { diff --git a/pkg/installer/installer.go b/pkg/installer/installer.go index 0141666..cc83a07 100644 --- a/pkg/installer/installer.go +++ b/pkg/installer/installer.go @@ -9,9 +9,10 @@ import ( "log" "os" "path/filepath" - "pb/pkg/common" "strings" + "pb/pkg/common" + "github.com/manifoldco/promptui" yamlv2 "gopkg.in/yaml.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,19 +28,12 @@ import ( // Installer orchestrates the installation process func Installer(_ Plan) (values *ValuesHolder, chartValues []string) { - - clusterName, err := promptK8sContext() - if err != nil { + if _, err := promptK8sContext(); err != nil { log.Fatalf("Failed to prompt for kubernetes context: %v", err) } - fmt.Printf(common.Yellow+"Kubernetes context set to cluster name %s: "+common.Reset+"\n", clusterName) - - // Prompt for deployment options - deployment, deployValues, err := promptDeploymentType(chartValues) - if err != nil { - log.Fatalf("Failed to prompt for deployment options: %v", err) - } + // pb supports only distributed deployments + chartValues = append(chartValues, "parseable.highAvailability.enabled=true") // Prompt for namespace and credentials pbSecret, err := promptNamespaceAndCredentials() @@ -48,7 +42,7 @@ func Installer(_ Plan) (values *ValuesHolder, chartValues []string) { } // Prompt for agent deployment - agent, agentValues, err := promptAgentDeployment(deployValues, deployment, pbSecret.Namespace) + agent, agentValues, err := promptAgentDeployment(chartValues, distributed, pbSecret.Namespace) if err != nil { log.Fatalf("Failed to prompt for agent deployment: %v", err) } @@ -65,12 +59,12 @@ func Installer(_ Plan) (values *ValuesHolder, chartValues []string) { log.Fatalf("Failed to prompt for object store configuration: %v", err) } - if err := applyParseableSecret(*&pbSecret, store, objectStoreConfig); err != nil { + if err := applyParseableSecret(pbSecret, store, objectStoreConfig); err != nil { log.Fatalf("Failed to apply secret object store configuration: %v", err) } valuesHolder := ValuesHolder{ - DeploymentType: deploymentType(deployment), + DeploymentType: distributed, ObjectStoreConfig: objectStoreConfig, LoggingAgent: loggingAgent(agent), ParseableSecret: *pbSecret, @@ -103,26 +97,6 @@ func promptStorageClass() (string, error) { return storageClass, nil } -// promptIngestorCount prompts the user to enter a ingestor counts -// func promptIngestorCount() (string, error) { -// // Prompt user for storage class -// fmt.Print(common.Yellow + "Enter the kubernetes ingestor count: " + common.Reset) -// reader := bufio.NewReader(os.Stdin) -// ingestorCount, err := reader.ReadString('\n') -// if err != nil { -// return "", fmt.Errorf("failed to read ingestor count class: %w", err) -// } - -// ingestorCount = strings.TrimSpace(ingestorCount) - -// // Validate that the ingestorCount is not empty -// if ingestorCount == "" { -// return "", fmt.Errorf("ingestor count cannot be empty") -// } - -// return ingestorCount, nil -// } - // promptNamespaceAndCredentials prompts the user for namespace and credentials func promptNamespaceAndCredentials() (*ParseableSecret, error) { // Prompt user for namespace @@ -159,7 +133,6 @@ func promptNamespaceAndCredentials() (*ParseableSecret, error) { // applyParseableSecret creates and applies the Kubernetes secret func applyParseableSecret(ps *ParseableSecret, store ObjectStore, objectStoreConfig ObjectStoreConfig) error { - var secretManifest string if store == LocalStore { secretManifest = getParseableSecretLocal(ps) @@ -181,7 +154,6 @@ func applyParseableSecret(ps *ParseableSecret, store ObjectStore, objectStoreCon } func getParseableSecretBlob(ps *ParseableSecret, objectStore ObjectStoreConfig) string { - // Create the Secret manifest secretManifest := fmt.Sprintf(` apiVersion: v1 @@ -217,7 +189,6 @@ data: } func getParseableSecretS3(ps *ParseableSecret, objectStore ObjectStoreConfig) string { - // Create the Secret manifest secretManifest := fmt.Sprintf(` apiVersion: v1 @@ -254,7 +225,6 @@ data: } func getParseableSecretGcs(ps *ParseableSecret, objectStore ObjectStoreConfig) string { - // Create the Secret manifest secretManifest := fmt.Sprintf(` apiVersion: v1 @@ -291,7 +261,6 @@ data: } func getParseableSecretLocal(ps *ParseableSecret) string { - // Create the Secret manifest secretManifest := fmt.Sprintf(` apiVersion: v1 @@ -319,11 +288,16 @@ data: } // promptAgentDeployment prompts the user for agent deployment options -func promptAgentDeployment(chartValues []string, deployment, namespace string) (string, []string, error) { +func promptAgentDeployment(chartValues []string, deployment deploymentType, namespace string) (string, []string, error) { // Prompt for Agent Deployment type promptAgentSelect := promptui.Select{ - Label: fmt.Sprintf(common.Yellow + "Deploy logging agent"), Items: []string{string(fluentbit), string(vector), "I have my agent running / I'll set up later"}, + Templates: &promptui.SelectTemplates{ + Label: "{{ `Logging agent` | yellow }}", + Active: "▸ {{ . | yellow }} ", // Yellow arrow and context name for active selection + Inactive: " {{ . | yellow }}", // Default color for inactive items + Selected: "{{ `Selected option:` | green }} '{{ . | green }}' ✔ ", + }, } _, agentDeploymentType, err := promptAgentSelect.Run() if err != nil { @@ -333,9 +307,9 @@ func promptAgentDeployment(chartValues []string, deployment, namespace string) ( if agentDeploymentType == string(vector) { chartValues = append(chartValues, "vector.enabled=true") } else if agentDeploymentType == string(fluentbit) { - if deployment == string(standalone) { + if deployment == standalone { chartValues = append(chartValues, "fluent-bit.serverHost=parseable."+namespace+".svc.cluster.local") - } else if deployment == string(deployment) { + } else if deployment == distributed { chartValues = append(chartValues, "fluent-bit.serverHost=parseable-ingestor-service."+namespace+".svc.cluster.local") } chartValues = append(chartValues, "fluent-bit.enabled=true") @@ -344,45 +318,17 @@ func promptAgentDeployment(chartValues []string, deployment, namespace string) ( return agentDeploymentType, chartValues, nil } -// promptDeploy prompts the user for deployment options -func promptDeploymentType(chartValues []string) (string, []string, error) { - // Prompt for Deployment Type - promptDeploy := promptui.Select{ - Label: fmt.Sprintf(common.Yellow + "Select deployment type"), - Items: []string{string(standalone), string(distributed)}, - } - _, deploymentType, err := promptDeploy.Run() - if err != nil { - return "", nil, fmt.Errorf("failed to prompt for deployment type: %w", err) - } - - var newChartValues []string - switch deploymentType { - case string(standalone): - newChartValues = []string{} - case string(distributed): - // ingestorCount, err := promptIngestorCount() - // if err != nil { - // return "", nil, fmt.Errorf("failed get ingestor count, err %s", err) - // } - newChartValues = []string{ - "parseable.highAvailability.enabled=true", - // "parseable.highAvailability.ingestor=" + ingestorCount, - } - default: - return "", nil, fmt.Errorf("invalid deployment type selected: %s", deploymentType) - } - - chartValues = append(chartValues, newChartValues...) - return deploymentType, chartValues, nil -} - // promptStore prompts the user for object store options func promptStore(chartValues []string) (ObjectStore, []string, error) { // Prompt for store type promptStore := promptui.Select{ - Label: "Select Object Store", - Items: []string{string(S3Store), string(LocalStore), string(BlobStore), string(GcsStore)}, + Templates: &promptui.SelectTemplates{ + Label: "{{ `Object store` | yellow }}", + Active: "▸ {{ . | yellow }} ", // Yellow arrow and context name for active selection + Inactive: " {{ . | yellow }}", // Default color for inactive items + Selected: "{{ `Selected object store:` | green }} '{{ . | green }}' ✔ ", + }, + Items: []string{string(S3Store), string(BlobStore), string(GcsStore)}, // local store not supported } _, promptStoreType, err := promptStore.Run() if err != nil { @@ -402,17 +348,17 @@ func promptStoreConfigs(store ObjectStore, chartValues []string) (ObjectStoreCon // Initialize a struct to hold store values var storeValues ObjectStoreConfig - // Store selected store type in chart values + fmt.Println(common.Green + "Configuring:" + common.Reset + " " + store) + // Store selected store type in chart values switch store { case S3Store: - fmt.Println(common.Green + "Configuring s3 store..." + common.Reset) storeValues.S3Store = S3{ - URL: promptForInput(common.Yellow + "Enter S3 URL: " + common.Reset), - AccessKey: promptForInput(common.Yellow + "Enter S3 Access Key: " + common.Reset), - SecretKey: promptForInput(common.Yellow + "Enter S3 Secret Key: " + common.Reset), - Bucket: promptForInput(common.Yellow + "Enter S3 Bucket: " + common.Reset), - Region: promptForInput(common.Yellow + "Enter S3 Region: " + common.Reset), + URL: promptForInput(common.Yellow + " Enter S3 URL: " + common.Reset), + AccessKey: promptForInput(common.Yellow + " Enter S3 Access Key: " + common.Reset), + SecretKey: promptForInput(common.Yellow + " Enter S3 Secret Key: " + common.Reset), + Bucket: promptForInput(common.Yellow + " Enter S3 Bucket: " + common.Reset), + Region: promptForInput(common.Yellow + " Enter S3 Region: " + common.Reset), } sc, err := promptStorageClass() if err != nil { @@ -424,30 +370,15 @@ func promptStoreConfigs(store ObjectStore, chartValues []string) (ObjectStoreCon chartValues = append(chartValues, "parseable.s3ModeSecret.enabled=true") chartValues = append(chartValues, "parseable.persistence.staging.enabled=true") chartValues = append(chartValues, "parseable.persistence.staging.storageClass="+sc) - return storeValues, chartValues, nil - case LocalStore: - fmt.Println(common.Green + "Configuring local store..." + common.Reset) - sc, err := promptStorageClass() - if err != nil { - log.Fatalf("Failed to prompt for storage class: %v", err) - } - storeValues.StorageClass = sc - storeValues.ObjectStore = LocalStore - chartValues = append(chartValues, "parseable.store="+string(LocalStore)) - chartValues = append(chartValues, "parseable.localModeSecret.enabled=true") - chartValues = append(chartValues, "parseable.persistence.staging.enabled=true") - chartValues = append(chartValues, "parseable.persistence.staging.storageClass="+sc) - return storeValues, chartValues, nil case BlobStore: - fmt.Println(common.Green + "Configuring blob store..." + common.Reset) sc, err := promptStorageClass() if err != nil { log.Fatalf("Failed to prompt for storage class: %v", err) } storeValues.BlobStore = Blob{ - URL: promptForInput(common.Yellow + "Enter Blob URL: " + common.Reset), - Container: promptForInput(common.Yellow + "Enter Blob Container: " + common.Reset), + URL: promptForInput(common.Yellow + " Enter Blob URL: " + common.Reset), + Container: promptForInput(common.Yellow + " Enter Blob Container: " + common.Reset), } storeValues.StorageClass = sc storeValues.ObjectStore = BlobStore @@ -457,17 +388,16 @@ func promptStoreConfigs(store ObjectStore, chartValues []string) (ObjectStoreCon chartValues = append(chartValues, "parseable.persistence.staging.storageClass="+sc) return storeValues, chartValues, nil case GcsStore: - fmt.Println(common.Green + "Configuring gcs store..." + common.Reset) sc, err := promptStorageClass() if err != nil { log.Fatalf("Failed to prompt for storage class: %v", err) } storeValues.GCSStore = GCS{ - URL: promptForInput(common.Yellow + "Enter GCS URL: " + common.Reset), - AccessKey: promptForInput(common.Yellow + "Enter GCS Access Key: " + common.Reset), - SecretKey: promptForInput(common.Yellow + "Enter GCS Secret Key: " + common.Reset), - Bucket: promptForInput(common.Yellow + "Enter GCS Bucket: " + common.Reset), - Region: promptForInput(common.Yellow + "Enter GCS Region: " + common.Reset), + URL: promptForInput(common.Yellow + " Enter GCS URL: " + common.Reset), + AccessKey: promptForInput(common.Yellow + " Enter GCS Access Key: " + common.Reset), + SecretKey: promptForInput(common.Yellow + " Enter GCS Secret Key: " + common.Reset), + Bucket: promptForInput(common.Yellow + " Enter GCS Bucket: " + common.Reset), + Region: promptForInput(common.Yellow + " Enter GCS Region: " + common.Reset), } storeValues.StorageClass = sc storeValues.ObjectStore = GcsStore @@ -585,7 +515,7 @@ func promptForInput(label string) string { func writeParseableConfig(valuesHolder *ValuesHolder) error { // Create config directory configDir := filepath.Join(os.Getenv("HOME"), ".parseable") - if err := os.MkdirAll(configDir, 0755); err != nil { + if err := os.MkdirAll(configDir, 0o755); err != nil { return fmt.Errorf("failed to create config directory: %w", err) } @@ -599,7 +529,7 @@ func writeParseableConfig(valuesHolder *ValuesHolder) error { } // Write config file - if err := os.WriteFile(configPath, configBytes, 0644); err != nil { + if err := os.WriteFile(configPath, configBytes, 0o644); err != nil { return fmt.Errorf("failed to write config file: %w", err) } @@ -629,12 +559,12 @@ func promptK8sContext() (clusterName string, err error) { // Prompt user to select Kubernetes context promptK8s := promptui.Select{ - Label: "\033[32mSelect your Kubernetes context:\033[0m", Items: contexts, Templates: &promptui.SelectTemplates{ - Active: "\033[33m▸ {{ . }}\033[0m", // Yellow arrow and context name for active selection - Inactive: "{{ . }}", // Default color for inactive items - Selected: "\033[32mKubernetes context '{{ . }}' selected successfully.\033[0m", + Label: "{{ `Select your Kubernetes context` | yellow }}", + Active: "▸ {{ . | yellow }} ", // Yellow arrow and context name for active selection + Inactive: " {{ . | yellow }}", // Default color for inactive items + Selected: "{{ `Selected Kubernetes context:` | green }} '{{ . | green }}' ✔", }, } diff --git a/pkg/installer/plans.go b/pkg/installer/plans.go index 652862a..e8c6dd6 100644 --- a/pkg/installer/plans.go +++ b/pkg/installer/plans.go @@ -2,6 +2,7 @@ package installer import ( "fmt" + "pb/pkg/common" "github.com/manifoldco/promptui" @@ -21,7 +22,7 @@ type Plan struct { var Plans = map[string]Plan{ "Small": { Name: "Small", - IngestionSpeed: "1,000 events/sec", + IngestionSpeed: "1000 events/sec", PerDayIngestion: "~10GB", QueryPerformance: "Basic performance", CPUAndMemorySpecs: "2 CPUs, 4GB RAM", @@ -60,25 +61,25 @@ func PromptUserPlanSelection() (Plan, error) { Label: "{{ . }}", Active: "▶ {{ .Name | yellow }} ({{ .IngestionSpeed | cyan }})", Inactive: " {{ .Name | yellow }} ({{ .IngestionSpeed | cyan }})", - Selected: "✔ {{ .Name | green }}", + Selected: "{{ `Selected plan:` | green }} '{{ .Name | green }}' ✔ ", Details: ` --------- Plan Details ---------- {{ "Plan:" | faint }} {{ .Name }} -{{ "Ingestion Speed:" | faint }} {{ .IngestionSpeed }} -{{ "Per Day Ingestion:" | faint }} {{ .PerDayIngestion }} -{{ "Query Performance:" | faint }} {{ .QueryPerformance }} -{{ "CPU & Memory:" | faint }} {{ .CPUAndMemorySpecs }}`, + {{ "Ingestion Speed:" | faint }} {{ .IngestionSpeed }} + {{ "Per Day Ingestion:" | faint }} {{ .PerDayIngestion }} + {{ "Query Performance:" | faint }} {{ .QueryPerformance }} + {{ "CPU & Memory:" | faint }} {{ .CPUAndMemorySpecs }}`, } prompt := promptui.Select{ - Label: fmt.Sprintf(common.Yellow + "Select Deployment Plan"), + Label: fmt.Sprintf(common.Yellow + "Select deployment type"), Items: planList, Templates: templates, } index, _, err := prompt.Run() if err != nil { - return Plan{}, fmt.Errorf("failed to select deployment plan: %w", err) + return Plan{}, fmt.Errorf("failed to select deployment type: %w", err) } return planList[index], nil From c75a391f7256574ceb95bffc563c0cce78f20b15 Mon Sep 17 00:00:00 2001 From: AdheipSingh <34169002+AdheipSingh@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:00:46 +0530 Subject: [PATCH 2/3] add stream info to the output (#78) --- cmd/stream.go | 72 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 10 deletions(-) diff --git a/cmd/stream.go b/cmd/stream.go index 7dcef13..5afa8cc 100644 --- a/cmd/stream.go +++ b/cmd/stream.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "net/http" internalHTTP "pb/pkg/http" "strconv" "strings" @@ -200,6 +201,14 @@ var StatStreamCmd = &cobra.Command{ return err } + // Fetch stream type + streamType, err := fetchInfo(&client, name) + if err != nil { + // Capture error + cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) + return err + } + // Check output format output, _ := cmd.Flags().GetString("output") if output == "json" { @@ -211,8 +220,9 @@ var StatStreamCmd = &cobra.Command{ "storage_size": humanize.Bytes(uint64(storageSize)), "compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio), }, - "retention": retention, - "alerts": alertsData.Alerts, + "retention": retention, + "alerts": alertsData.Alerts, + "stream_type": streamType, } jsonData, err := json.MarshalIndent(data, "", " ") @@ -227,13 +237,13 @@ var StatStreamCmd = &cobra.Command{ isRetentionSet := len(retention) > 0 isAlertsSet := len(alertsData.Alerts) > 0 + // Render the info section with consistent alignment fmt.Println(StyleBold.Render("\nInfo:")) - fmt.Printf(" Event Count: %d\n", ingestionCount) - fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize))) - fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize))) - fmt.Printf( - " Compression Ratio: %.2f%s\n", - compressionRatio, "%") + fmt.Printf(" %-18s %d\n", "Event Count:", ingestionCount) + fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize))) + fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize))) + fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%") + fmt.Printf(" %-18s %s\n", "Stream Type:", streamType) fmt.Println() if isRetentionSet { @@ -412,7 +422,7 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD } func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) { - req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/retention", name), nil) + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil) if err != nil { return } @@ -439,7 +449,7 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe } func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig, err error) { - req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil) + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/alert", name), nil) if err != nil { return } @@ -464,3 +474,45 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig } return } + +func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) { + // Create a new HTTP GET request + req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + + // Execute the request + resp, err := client.Client.Do(req) + if err != nil { + return "", fmt.Errorf("request execution failed: %w", err) + } + defer resp.Body.Close() + + // Read the response body + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read response body: %w", err) + } + + // Check for successful status code + if resp.StatusCode == http.StatusOK { + // Define a struct to parse the response + var response struct { + StreamType string `json:"stream_type"` + } + + // Unmarshal JSON into the struct + if err := json.Unmarshal(bytes, &response); err != nil { + return "", fmt.Errorf("failed to unmarshal response: %w", err) + } + + // Return the extracted stream_type + return response.StreamType, nil + } + + // Handle non-200 responses + body := string(bytes) + errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body) + return "", errors.New(errMsg) +} From f86d7ef44e8283f751f256baef7f3232e12c188a Mon Sep 17 00:00:00 2001 From: AdheipSingh <34169002+AdheipSingh@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:01:39 +0530 Subject: [PATCH 3/3] generate static schema (#76) add a command to generate static schema --- cmd/generate.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 30 +++++++++ 2 files changed, 191 insertions(+) create mode 100644 cmd/generate.go diff --git a/cmd/generate.go b/cmd/generate.go new file mode 100644 index 0000000..87f1ed2 --- /dev/null +++ b/cmd/generate.go @@ -0,0 +1,161 @@ +package cmd + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "pb/pkg/common" + internalHTTP "pb/pkg/http" + + "github.com/spf13/cobra" +) + +const ( + generateStaticSchemaPath = "/logstream/schema/detect" +) + +var GenerateSchemaCmd = &cobra.Command{ + Use: "generate", + Short: "Generate Schema for JSON", + Example: "pb schema generate --file=test.json", + RunE: func(cmd *cobra.Command, _ []string) error { + // Get the file path from the `--file` flag + filePath, err := cmd.Flags().GetString("file") + if err != nil { + return fmt.Errorf(common.Red+"failed to read file flag: %w"+common.Reset, err) + } + + if filePath == "" { + return fmt.Errorf(common.Red + "file flag is required" + common.Reset) + } + + // Read the file content + fileContent, err := os.ReadFile(filePath) + if err != nil { + return fmt.Errorf(common.Red+"failed to read file %s: %w"+common.Reset, filePath, err) + } + + // Initialize HTTP client + client := internalHTTP.DefaultClient(&DefaultProfile) + + // Create the HTTP request + req, err := client.NewRequest(http.MethodPost, generateStaticSchemaPath, bytes.NewBuffer(fileContent)) + if err != nil { + return fmt.Errorf(common.Red+"failed to create new request: %w"+common.Reset, err) + } + + // Set Content-Type header + req.Header.Set("Content-Type", "application/json") + + // Execute the request + resp, err := client.Client.Do(req) + if err != nil { + return fmt.Errorf(common.Red+"request execution failed: %w"+common.Reset, err) + } + defer resp.Body.Close() + + // Check for non-200 status codes + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + fmt.Printf(common.Red+"Error response: %s\n"+common.Reset, string(body)) + return fmt.Errorf(common.Red+"non-200 status code received: %s"+common.Reset, resp.Status) + } + + // Parse and print the response + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf(common.Red+"failed to read response body: %w"+common.Reset, err) + } + + var prettyJSON bytes.Buffer + if err := json.Indent(&prettyJSON, respBody, "", " "); err != nil { + return fmt.Errorf(common.Red+"failed to format response as JSON: %w"+common.Reset, err) + } + + fmt.Println(common.Green + prettyJSON.String() + common.Reset) + return nil + }, +} + +var CreateSchemaCmd = &cobra.Command{ + Use: "create", + Short: "Create Schema for a Parseable stream", + Example: "pb schema create --stream=my_stream --file=schema.json", + RunE: func(cmd *cobra.Command, _ []string) error { + // Get the stream name from the `--stream` flag + streamName, err := cmd.Flags().GetString("stream") + if err != nil { + return fmt.Errorf(common.Red+"failed to read stream flag: %w"+common.Reset, err) + } + + if streamName == "" { + return fmt.Errorf(common.Red + "stream flag is required" + common.Reset) + } + + // Get the file path from the `--file` flag + filePath, err := cmd.Flags().GetString("file") + if err != nil { + return fmt.Errorf(common.Red+"failed to read config flag: %w"+common.Reset, err) + } + + if filePath == "" { + return fmt.Errorf(common.Red + "file path flag is required" + common.Reset) + } + + // Read the JSON schema file + schemaContent, err := os.ReadFile(filePath) + if err != nil { + return fmt.Errorf(common.Red+"failed to read schema file %s: %w"+common.Reset, filePath, err) + } + + // Initialize HTTP client + client := internalHTTP.DefaultClient(&DefaultProfile) + + // Construct the API path + apiPath := fmt.Sprintf("/logstream/%s", streamName) + + // Create the HTTP PUT request + req, err := client.NewRequest(http.MethodPut, apiPath, bytes.NewBuffer(schemaContent)) + if err != nil { + return fmt.Errorf(common.Red+"failed to create new request: %w"+common.Reset, err) + } + + // Set custom headers + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-P-Static-Schema-Flag", "true") + + // Execute the request + resp, err := client.Client.Do(req) + if err != nil { + return fmt.Errorf(common.Red+"request execution failed: %w"+common.Reset, err) + } + defer resp.Body.Close() + + // Check for non-200 status codes + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + fmt.Printf(common.Red+"Error response: %s\n"+common.Reset, string(body)) + return fmt.Errorf(common.Red+"non-200 status code received: %s"+common.Reset, resp.Status) + } + + // Parse and print the response + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf(common.Red+"failed to read response body: %w"+common.Reset, err) + } + + fmt.Println(common.Green + string(respBody) + common.Reset) + return nil + }, +} + +func init() { + // Add the `--file` flag to the command + GenerateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to generate schema") + CreateSchemaCmd.Flags().StringP("stream", "s", "", "Name of the stream to associate with the schema") + CreateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to create schema") + +} diff --git a/main.go b/main.go index c6c528d..6ef86c4 100644 --- a/main.go +++ b/main.go @@ -92,6 +92,32 @@ var profile = &cobra.Command{ }, } +var schema = &cobra.Command{ + Use: "schema", + Short: "Generate or create schemas for JSON data or Parseable streams", + Long: `The "schema" command allows you to either: + - Generate a schema automatically from a JSON file for analysis or integration. + - Create a custom schema for Parseable streams (PB streams) to structure and process your data. + +Examples: + - To generate a schema from a JSON file: + pb schema generate --file=data.json + - To create a schema for a PB stream: + pb schema create --stream-name=my_stream --config=data.json +`, + PersistentPreRunE: combinedPreRun, + PersistentPostRun: func(cmd *cobra.Command, args []string) { + if os.Getenv("PB_ANALYTICS") == "disable" { + return + } + wg.Add(1) + go func() { + defer wg.Done() + analytics.PostRunAnalytics(cmd, "generate", args) + }() + }, +} + var user = &cobra.Command{ Use: "user", Short: "Manage users", @@ -200,6 +226,9 @@ func main() { query.AddCommand(pb.QueryCmd) query.AddCommand(pb.SavedQueryList) + schema.AddCommand(pb.GenerateSchemaCmd) + schema.AddCommand(pb.CreateSchemaCmd) + install.AddCommand(pb.InstallOssCmd) cli.AddCommand(profile) @@ -211,6 +240,7 @@ func main() { cli.AddCommand(pb.AutocompleteCmd) cli.AddCommand(install) + cli.AddCommand(schema) // Set as command pb.VersionCmd.Run = func(_ *cobra.Command, _ []string) {