From 3678e1df1459c7f4310cd9f950e98e5070162018 Mon Sep 17 00:00:00 2001 From: vijeyash Date: Tue, 14 May 2024 16:17:50 +0530 Subject: [PATCH] added nats sdk --- agent/kubviz/k8smetrics_agent.go | 41 ++------- .../plugins/events/event_metrics_utils.go | 17 ++-- .../plugins/kuberhealthy/kuberhealthy.go | 15 ++-- pkg/nats/sdk/client.go | 68 +++++++++++++++ pkg/nats/sdk/config.go | 28 ++++++ pkg/nats/sdk/utils.go | 77 +++++++++++++++++ sdk/example/main.go | 60 ------------- sdk/pkg/clickhouse/client.go | 28 ------ sdk/pkg/clickhouse/config.go | 21 ----- sdk/pkg/clickhouse/utils.go | 72 ---------------- sdk/pkg/nats/client.go | 36 -------- sdk/pkg/nats/config.go | 19 ----- sdk/pkg/nats/utils.go | 85 ------------------- sdk/pkg/sdk/clickhouse_insert.go | 10 --- sdk/pkg/sdk/listdata.go | 10 --- sdk/pkg/sdk/nats_consumer.go | 10 --- sdk/pkg/sdk/nats_publisher.go | 9 -- sdk/pkg/sdk/nats_stream.go | 9 -- sdk/pkg/sdk/sdk.go | 38 --------- 19 files changed, 197 insertions(+), 456 deletions(-) create mode 100644 pkg/nats/sdk/client.go create mode 100644 pkg/nats/sdk/config.go create mode 100644 pkg/nats/sdk/utils.go delete mode 100644 sdk/example/main.go delete mode 100644 sdk/pkg/clickhouse/client.go delete mode 100644 sdk/pkg/clickhouse/config.go delete mode 100644 sdk/pkg/clickhouse/utils.go delete mode 100644 sdk/pkg/nats/client.go delete mode 100644 sdk/pkg/nats/config.go delete mode 100644 sdk/pkg/nats/utils.go delete mode 100644 sdk/pkg/sdk/clickhouse_insert.go delete mode 100644 sdk/pkg/sdk/listdata.go delete mode 100644 sdk/pkg/sdk/nats_consumer.go delete mode 100644 sdk/pkg/sdk/nats_publisher.go delete mode 100644 sdk/pkg/sdk/nats_stream.go delete mode 100644 sdk/pkg/sdk/sdk.go diff --git a/agent/kubviz/k8smetrics_agent.go b/agent/kubviz/k8smetrics_agent.go index d0c0c7f6..747e0021 100644 --- a/agent/kubviz/k8smetrics_agent.go +++ b/agent/kubviz/k8smetrics_agent.go @@ -10,11 +10,11 @@ import ( //"github.com/go-co-op/gocron" "github.com/go-co-op/gocron" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/pkg/nats/sdk" "context" - "github.com/intelops/kubviz/pkg/mtlsnats" "github.com/intelops/kubviz/pkg/opentelemetry" "k8s.io/client-go/kubernetes" @@ -56,8 +56,6 @@ const ( // nats token, natsurl, clustername var ( ClusterName string = os.Getenv("CLUSTER_NAME") - token string = os.Getenv("NATS_TOKEN") - natsurl string = os.Getenv("NATS_ADDRESS") //for local testing provide the location of kubeconfig cluster_conf_loc string = os.Getenv("CONFIG_LOCATION") @@ -77,36 +75,13 @@ func main() { clientset *kubernetes.Clientset ) - var mtlsConfig mtlsnats.MtlsConfig - var nc *nats.Conn - - if mtlsConfig.IsEnabled { - tlsConfig, err := mtlsnats.GetTlsConfig() - if err != nil { - log.Println("error while getting tls config ", err) - time.Sleep(time.Minute * 30) - } else { - nc, err = nats.Connect( - natsurl, - nats.Name("K8s Metrics"), - nats.Token(token), - nats.Secure(tlsConfig), - ) - if err != nil { - log.Println("error while connecting with mtls ", err) - } - } + natsCli, err := sdk.NewNATSClient() + if err != nil { + log.Fatalf("error occured while creating nats client %v", err.Error()) } - if nc == nil { - nc, err = nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token)) - events.CheckErr(err) - } - js, err := nc.JetStream() - events.CheckErr(err) - err = events.CreateStream(js) - events.CheckErr(err) + natsCli.CreateStream(constants.StreamName) if env != Production { config, err = clientcmd.BuildConfigFromFlags("", cluster_conf_loc) if err != nil { @@ -131,9 +106,9 @@ func main() { } }() - go events.PublishMetrics(clientset, js, clusterMetricsChan) + go events.PublishMetrics(clientset, natsCli, clusterMetricsChan) if cfg.KuberHealthyEnable { - go kuberhealthy.StartKuberHealthy(js) + go kuberhealthy.StartKuberHealthy(natsCli) } go server.StartServer() collectAndPublishMetrics := func() { diff --git a/agent/kubviz/plugins/events/event_metrics_utils.go b/agent/kubviz/plugins/events/event_metrics_utils.go index 17ef114f..4f780fe4 100644 --- a/agent/kubviz/plugins/events/event_metrics_utils.go +++ b/agent/kubviz/plugins/events/event_metrics_utils.go @@ -13,6 +13,7 @@ import ( "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" "github.com/nats-io/nats.go" "go.opentelemetry.io/otel" @@ -29,7 +30,7 @@ var ClusterName string = os.Getenv("CLUSTER_NAME") // publishMetrics publishes stream of events // with subject "METRICS.created" -func PublishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) { +func PublishMetrics(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient, errCh chan error) { ctx := context.Background() tracer := otel.Tracer("kubviz-publish-metrics") @@ -37,11 +38,11 @@ func PublishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, e span.SetAttributes(attribute.String("kubviz-agent", "publish-metrics")) defer span.End() - watchK8sEvents(clientset, js) + watchK8sEvents(clientset, natsCli) errCh <- nil } -func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStreamContext, imageName string) (bool, error) { +func publishK8sMetrics(id string, mtype string, mdata *v1.Event, natsCli *sdk.NATSClient, imageName string) (bool, error) { ctx := context.Background() tracer := otel.Tracer("kubviz-publish-k8smetrics") @@ -57,7 +58,7 @@ func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStre ImageName: imageName, } metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.EventSubject, metricsJson) + err := natsCli.Publish(constants.EventSubject, metricsJson) if err != nil { return true, err } @@ -164,7 +165,7 @@ func LogErr(err error) { log.Println(err) } } -func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { +func watchK8sEvents(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient) { ctx := context.Background() tracer := otel.Tracer("kubviz-watch-k8sevents") @@ -191,7 +192,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, natsCli, image) } }, DeleteFunc: func(obj interface{}) { @@ -202,7 +203,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, natsCli, image) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -213,7 +214,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, natsCli, image) } }, }, diff --git a/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go b/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go index 2ae66ccd..def0299a 100644 --- a/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go +++ b/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go @@ -11,13 +11,14 @@ import ( "github.com/intelops/kubviz/agent/config" "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" "github.com/kuberhealthy/kuberhealthy/v2/pkg/health" "github.com/nats-io/nats.go" "go.opentelemetry.io/otel" ) -func StartKuberHealthy(js nats.JetStreamContext) { +func StartKuberHealthy(natsCli *sdk.NATSClient) { khConfig, err := config.GetKuberHealthyConfig() if err != nil { log.Fatalf("Error getting Kuberhealthy config: %v", err) @@ -27,12 +28,12 @@ func StartKuberHealthy(js nats.JetStreamContext) { defer ticker.Stop() for range ticker.C { - if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, js); err != nil { + if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, natsCli); err != nil { log.Printf("Error polling and publishing Kuberhealthy metrics: %v", err) } } } -func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error { +func pollAndPublishKuberhealthy(url string, natsCli *sdk.NATSClient) error { resp, err := http.Get(url) if err != nil { return fmt.Errorf("error making GET request to Kuberhealthy: %w", err) @@ -49,10 +50,10 @@ func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error { return fmt.Errorf("error unmarshaling response: %w", err) } - return PublishKuberhealthyMetrics(js, state) + return PublishKuberhealthyMetrics(natsCli, state) } -func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) error { +func PublishKuberhealthyMetrics(natsCli *sdk.NATSClient, state health.State) error { ctx := context.Background() tracer := otel.Tracer("kuberhealthy") _, span := tracer.Start(opentelemetry.BuildContext(ctx), "PublishKuberhealthyMetrics") @@ -63,12 +64,10 @@ func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) er log.Printf("Error marshaling metrics of kuberhealthy %v", err) return err } - - if _, err := js.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil { + if err := natsCli.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil { log.Printf("Error publishing metrics for kuberhealthy %v", err) return err } - log.Printf("Kuberhealthy metrics have been published") return nil } diff --git a/pkg/nats/sdk/client.go b/pkg/nats/sdk/client.go new file mode 100644 index 00000000..7ab89537 --- /dev/null +++ b/pkg/nats/sdk/client.go @@ -0,0 +1,68 @@ +package sdk + +import ( + "errors" + "fmt" + "log" + + "github.com/nats-io/nats.go" +) + +type NATSClient struct { + conn *nats.Conn + js nats.JetStreamContext + config natsConfig +} + +func NewNATSClient() (*NATSClient, error) { + config, err := loadNatsConfig() + if err != nil { + return nil, errors.New("Unable to load the nats configurations , error :" + err.Error()) + } + options := []nats.Option{} + if config.EnableToken { + options = append(options, nats.Token(config.NatsToken)) + } + if config.MtlsConfig.IsEnabled { + tlsConfig, err := createTLSConfig(config.MtlsConfig) + if err != nil { + return nil, err + } + options = append(options, nats.Secure(tlsConfig)) + } + conn, err := nats.Connect(config.NatsAddress, options...) + if err != nil { + return nil, err + } + + js, err := conn.JetStream() + if err != nil { + return nil, err + } + + return &NATSClient{conn: conn, js: js, config: *config}, nil +} + +func (natsCli *NATSClient) CreateStream(streamName string) error { + stream, err := natsCli.js.StreamInfo(streamName) + log.Printf("Retrieved stream %s", fmt.Sprintf("%v", stream)) + if err != nil { + log.Printf("Error getting stream %s", err) + } + if stream == nil { + log.Printf("creating stream %q and subjects %q", streamName, streamName+".*") + _, err = natsCli.js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{streamName + ".*"}, + }) + if err != nil { + return err + } + } + return nil +} + +func (natsCli *NATSClient) Publish(subject string, data []byte) error { + _, err := natsCli.js.Publish(subject, data) + return err +} diff --git a/pkg/nats/sdk/config.go b/pkg/nats/sdk/config.go new file mode 100644 index 00000000..13f4083e --- /dev/null +++ b/pkg/nats/sdk/config.go @@ -0,0 +1,28 @@ +package sdk + +import ( + "github.com/kelseyhightower/envconfig" + "github.com/pkg/errors" +) + +type natsConfig struct { + NatsAddress string `envconfig:"NATS_ADDRESS"` + NatsToken string `envconfig:"NATS_TOKEN"` + MtlsConfig mtlsConfig + EnableToken bool `envconfig:"ENABLE_TOKEN"` +} + +type mtlsConfig struct { + CertificateFilePath string `envconfig:"CERT_FILE" default:""` + KeyFilePath string `envconfig:"KEY_FILE" default:""` + CAFilePath string `envconfig:"CA_FILE" default:""` + IsEnabled bool `envconfig:"ENABLE_MTLS_NATS" default:"false"` +} + +func loadNatsConfig() (*natsConfig, error) { + natsConf := &natsConfig{} + if err := envconfig.Process("", natsConf); err != nil { + return nil, errors.WithStack(err) + } + return natsConf, nil +} diff --git a/pkg/nats/sdk/utils.go b/pkg/nats/sdk/utils.go new file mode 100644 index 00000000..0c0594e8 --- /dev/null +++ b/pkg/nats/sdk/utils.go @@ -0,0 +1,77 @@ +package sdk + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "os" +) + +func createTLSConfig(config mtlsConfig) (*tls.Config, error) { + certPEM, keyPEM, CACertPEM, err := readMtlsCerts(config.CertificateFilePath, config.KeyFilePath, config.CAFilePath) + if err != nil { + return nil, errors.New("unable to read the mtls certificates error:" + err.Error()) + } + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("error loading X509 key pair from PEM: %w", err) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(CACertPEM) + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + return tlsConfig, nil +} + +func readMtlsCerts(certificateFilePath, keyFilePath, CAFilePath string) (certPEM, keyPEM, CACertPEM []byte, err error) { + certPEM, err = readMtlsFileContents(certificateFilePath) + if err != nil { + err = fmt.Errorf("error while reading cert file: %w", err) + return + } + + keyPEM, err = readMtlsFileContents(keyFilePath) + if err != nil { + err = fmt.Errorf("error while reading key file: %w", err) + return + } + + CACertPEM, err = readMtlsFileContents(CAFilePath) + if err != nil { + err = fmt.Errorf("error while reading CAcert file: %w", err) + return + } + + return + +} + +func openMtlsCertFile(filepath string) (f *os.File, err error) { + f, err = os.Open(filepath) + if err != nil { + return nil, fmt.Errorf("failed to open mtls certificate file: %w", err) + } + return f, nil +} + +func readMtlsFileContents(filePath string) ([]byte, error) { + file, err := openMtlsCertFile(filePath) + if err != nil { + return nil, err + } + + defer file.Close() + + contents, err := io.ReadAll(file) + if err != nil { + return nil, fmt.Errorf("error while reading file %s:%w", filePath, err) + } + + return contents, nil +} diff --git a/sdk/example/main.go b/sdk/example/main.go deleted file mode 100644 index d8c9e6dc..00000000 --- a/sdk/example/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - - "github.com/intelops/kubviz/sdk/pkg/clickhouse" - "github.com/intelops/kubviz/sdk/pkg/nats" - "github.com/intelops/kubviz/sdk/pkg/sdk" -) - -func main() { - natsConfig, err := nats.LoadConfig() - if err != nil { - log.Fatalf("Failed to load NATS config: %v", err) - } - - chConfig, err := clickhouse.LoadConfig() - if err != nil { - log.Fatalf("Failed to load ClickHouse config: %v", err) - } - - mySDK, err := sdk.New(natsConfig, chConfig) - if err != nil { - log.Fatalf("Failed to initialize SDK: %v", err) - } - streamName := "Simple" - streamSubjects := "Simple.*" - err = mySDK.CreateNatsStream(streamName, []string{streamSubjects}) - if err != nil { - fmt.Println("Error creating NATS Stream:", err) - return - } - - time.Sleep(2 * time.Second) - - data := map[string]interface{}{ - "key": "value", - "count": 42, - } - subject := "Simple.event" - err = mySDK.PublishToNats(subject, streamName, data) - if err != nil { - fmt.Println("Error publishing message to NATS:", err) - return - } - time.Sleep(2 * time.Second) - consumerName := "myConsumer" - err = mySDK.ConsumeNatsData(subject, consumerName) - if err != nil { - fmt.Println("Error creating NATS consumer:", err) - return - } - err = mySDK.ClickHouseInsertData("mytable", data) - if err != nil { - fmt.Println("Error while inserting data into nats:", err) - return - } -} diff --git a/sdk/pkg/clickhouse/client.go b/sdk/pkg/clickhouse/client.go deleted file mode 100644 index c4edb29c..00000000 --- a/sdk/pkg/clickhouse/client.go +++ /dev/null @@ -1,28 +0,0 @@ -// /pkg/clickhouse/client.go -package clickhouse - -import ( - "database/sql" - "fmt" - - _ "github.com/ClickHouse/clickhouse-go/v2" -) - -type Client struct { - db *sql.DB -} - -func NewClient(cfg *Config) (*Client, error) { - dataSourceName := fmt.Sprintf("tcp://%s:%d", cfg.DBAddress, cfg.DBPort) - - db, err := sql.Open("clickhouse", dataSourceName) - if err != nil { - return nil, err - } - - if err := db.Ping(); err != nil { - return nil, err - } - - return &Client{db: db}, nil -} diff --git a/sdk/pkg/clickhouse/config.go b/sdk/pkg/clickhouse/config.go deleted file mode 100644 index a56baa91..00000000 --- a/sdk/pkg/clickhouse/config.go +++ /dev/null @@ -1,21 +0,0 @@ -package clickhouse - -import ( - "github.com/kelseyhightower/envconfig" -) - -type Config struct { - DBAddress string `envconfig:"DB_ADDRESS" default:"localhost"` - DBPort int `envconfig:"DB_PORT" default:"9000"` - Username string `envconfig:"CLICKHOUSE_USERNAME"` - Password string `envconfig:"CLICKHOUSE_PASSWORD"` -} - -func LoadConfig() (*Config, error) { - var cfg Config - err := envconfig.Process("", &cfg) - if err != nil { - return nil, err - } - return &cfg, nil -} diff --git a/sdk/pkg/clickhouse/utils.go b/sdk/pkg/clickhouse/utils.go deleted file mode 100644 index 2525d5e4..00000000 --- a/sdk/pkg/clickhouse/utils.go +++ /dev/null @@ -1,72 +0,0 @@ -package clickhouse - -import ( - "context" - "errors" - "strings" - "time" -) - -func (c *Client) InsertData(tableName string, data interface{}) error { - ctx := context.Background() - - tx, err := c.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - dataMap, ok := data.(map[string]interface{}) - if !ok { - return errors.New("data is not in the expected format") - } - - columns := make([]string, 0, len(dataMap)) - values := make([]interface{}, 0, len(dataMap)) - placeholders := make([]string, 0, len(dataMap)) - - for column, value := range dataMap { - columns = append(columns, column) - values = append(values, value) - placeholders = append(placeholders, "?") - } - - stmt, err := tx.PrepareContext(ctx, "INSERT INTO "+tableName+" ("+strings.Join(columns, ",")+") VALUES ("+strings.Join(placeholders, ",")+")") - if err != nil { - return err - } - defer stmt.Close() - - values = append(values, time.Now().UTC()) - - _, err = stmt.ExecContext(ctx, values...) - if err != nil { - return err - } - - return tx.Commit() -} - -func (c *Client) List(input interface{}) ([]map[string]interface{}, error) { - var dataList []map[string]interface{} - - inputMap, ok := input.(map[string]interface{}) - if !ok { - return nil, errors.New("input is not a map[string]interface{}") - } - - var traverse func(m map[string]interface{}) - traverse = func(m map[string]interface{}) { - dataList = append(dataList, m) - - for _, v := range m { - if subMap, ok := v.(map[string]interface{}); ok { - traverse(subMap) - } - } - } - - traverse(inputMap) - - return dataList, nil -} diff --git a/sdk/pkg/nats/client.go b/sdk/pkg/nats/client.go deleted file mode 100644 index b5c26a37..00000000 --- a/sdk/pkg/nats/client.go +++ /dev/null @@ -1,36 +0,0 @@ -// /pkg/nats/client.go -package nats - -import ( - "fmt" - "log" - "os" - - "github.com/nats-io/nats.go" -) - -type Client struct { - js nats.JetStreamContext - logger *log.Logger -} - -func NewClient(cfg *Config) (*Client, error) { - logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) - - opts := []nats.Option{nats.Token(cfg.Token)} - - conn, err := nats.Connect(cfg.Address, opts...) - if err != nil { - return nil, fmt.Errorf("error connecting to NATS: %v", err) - } - - js, err := conn.JetStream() - if err != nil { - return nil, fmt.Errorf("error obtaining JetStream context: %v", err) - } - - return &Client{ - js: js, - logger: logger, - }, nil -} diff --git a/sdk/pkg/nats/config.go b/sdk/pkg/nats/config.go deleted file mode 100644 index 0fddfa96..00000000 --- a/sdk/pkg/nats/config.go +++ /dev/null @@ -1,19 +0,0 @@ -package nats - -import ( - "github.com/kelseyhightower/envconfig" -) - -type Config struct { - Address string `envconfig:"NATS_ADDRESS" default:"nats://localhost:4222"` - Token string `envconfig:"NATS_TOKEN"` -} - -func LoadConfig() (*Config, error) { - var cfg Config - err := envconfig.Process("", &cfg) - if err != nil { - return nil, err - } - return &cfg, nil -} diff --git a/sdk/pkg/nats/utils.go b/sdk/pkg/nats/utils.go deleted file mode 100644 index 36e144fb..00000000 --- a/sdk/pkg/nats/utils.go +++ /dev/null @@ -1,85 +0,0 @@ -package nats - -import ( - "encoding/json" - "fmt" - "log" - - "github.com/nats-io/nats.go" - "github.com/pkg/errors" -) - -func (client *Client) CreateStream(streamName string, streamSubjects []string) error { - js := client.js - - stream, err := js.StreamInfo(streamName) - if err != nil { - if err == nats.ErrStreamNotFound { - client.logger.Printf("Stream does not exist, creating: %s", streamName) - } else { - client.logger.Printf("Error getting stream: %s", err) - return err - } - } - - if stream != nil { - client.logger.Printf("Stream already exists: %s", fmt.Sprintf("%v", stream)) - return nil - } - client.logger.Printf("Creating stream %q with subjects %q", streamName, streamSubjects) - streamInfo, err := js.AddStream(&nats.StreamConfig{ - Name: streamName, - Subjects: streamSubjects, - }) - - if err != nil { - return errors.WithMessage(err, "Error creating stream") - } - fmt.Println(streamInfo) - return nil -} - -func (client *Client) Consumer(subject, consumerName string) (interface{}, error) { - js := client.js - var data interface{} - handler := func(msg *nats.Msg) { - msg.Ack() - err := json.Unmarshal(msg.Data, &data) - if err != nil { - log.Println("Error unmarshalling message data:", err) - return - } - log.Printf("Data Received: %#v,", data) - } - _, err := js.Subscribe(subject, handler, nats.Durable(consumerName), nats.ManualAck()) - if err != nil { - return nil, fmt.Errorf("error subscribing to stream %s: %w", subject, err) - } - return data, nil -} - -func (client *Client) Publish(subject string, streamName string, data interface{}) error { - js := client.js - - resultdata, err := json.Marshal(data) - if err != nil { - return errors.WithMessage(err, "Error marshaling data to JSON") - } - stream, err := js.StreamInfo(streamName) - if err != nil { - if err == nats.ErrStreamNotFound { - client.logger.Printf("Stream does not exist %s", subject) - } else { - client.logger.Printf("Error getting stream: %s", err) - return err - } - } - if stream == nil { - return errors.New("Stream does not exist") - } - _, err = js.Publish(subject, resultdata) - if err != nil { - return errors.WithMessage(err, "Error publishing message") - } - return nil -} diff --git a/sdk/pkg/sdk/clickhouse_insert.go b/sdk/pkg/sdk/clickhouse_insert.go deleted file mode 100644 index 7fb4a65d..00000000 --- a/sdk/pkg/sdk/clickhouse_insert.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ClickHouseInsertData(tableName string, data interface{}) error { - err := sdk.clickhouseClient.InsertData(tableName, data) - if err != nil { - return err - } - sdk.logger.Printf("insert into table successfully %v", data) - return nil -} diff --git a/sdk/pkg/sdk/listdata.go b/sdk/pkg/sdk/listdata.go deleted file mode 100644 index 59580a55..00000000 --- a/sdk/pkg/sdk/listdata.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ListtData(data interface{}) error { - data, err := sdk.clickhouseClient.List(data) - if err != nil { - return err - } - sdk.logger.Printf("insert into table successfully %v", data) - return nil -} diff --git a/sdk/pkg/sdk/nats_consumer.go b/sdk/pkg/sdk/nats_consumer.go deleted file mode 100644 index 88cffa4e..00000000 --- a/sdk/pkg/sdk/nats_consumer.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ConsumeNatsData(subject, consumerName string) error { - data, err := sdk.natsClient.Consumer(subject, consumerName) - if err != nil { - return err - } - sdk.logger.Printf("Consumed successfully from stream %v", data) - return nil -} diff --git a/sdk/pkg/sdk/nats_publisher.go b/sdk/pkg/sdk/nats_publisher.go deleted file mode 100644 index 43f9476a..00000000 --- a/sdk/pkg/sdk/nats_publisher.go +++ /dev/null @@ -1,9 +0,0 @@ -package sdk - -func (sdk *SDK) PublishToNats(subject string, streamName string, data interface{}) error { - if err := sdk.natsClient.Publish(subject, streamName, data); err != nil { - return err - } - sdk.logger.Printf("Message published successfully to stream %v", streamName) - return nil -} diff --git a/sdk/pkg/sdk/nats_stream.go b/sdk/pkg/sdk/nats_stream.go deleted file mode 100644 index 006b3932..00000000 --- a/sdk/pkg/sdk/nats_stream.go +++ /dev/null @@ -1,9 +0,0 @@ -package sdk - -func (sdk *SDK) CreateNatsStream(streamName string, streamSubjects []string) error { - if err := sdk.natsClient.CreateStream(streamName, streamSubjects); err != nil { - return err - } - sdk.logger.Printf("Stream created successfully for streamName %v, streamSubjects %v", streamName, streamSubjects) - return nil -} diff --git a/sdk/pkg/sdk/sdk.go b/sdk/pkg/sdk/sdk.go deleted file mode 100644 index ade6a6c8..00000000 --- a/sdk/pkg/sdk/sdk.go +++ /dev/null @@ -1,38 +0,0 @@ -package sdk - -import ( - "log" - "os" - - "github.com/intelops/kubviz/sdk/pkg/clickhouse" - "github.com/intelops/kubviz/sdk/pkg/nats" -) - -type SDK struct { - natsClient *nats.Client - clickhouseClient *clickhouse.Client - logger *log.Logger -} - -func New(natsCfg *nats.Config, chCfg *clickhouse.Config) (*SDK, error) { - logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) - natsClient, err := nats.NewClient(natsCfg) - if err != nil { - return nil, err - } - - chClient, err := clickhouse.NewClient(chCfg) - if err != nil { - return nil, err - } - - return &SDK{ - natsClient: natsClient, - clickhouseClient: chClient, - logger: logger, - }, nil -} - -func (sdk *SDK) Start() error { - return nil -}