diff --git a/cmd/redshiftsink/main.go b/cmd/redshiftsink/main.go
index 0bb5c8caf..91ab43919 100644
--- a/cmd/redshiftsink/main.go
+++ b/cmd/redshiftsink/main.go
@@ -15,6 +15,7 @@ limitations under the License.
 package main
 
 import (
+	"context"
 	"flag"
 	"math/rand"
 	"os"
@@ -24,6 +25,7 @@ import (
 	"github.com/practo/klog/v2"
 	prometheus "github.com/practo/tipoca-stream/redshiftsink/pkg/prometheus"
+	redshift "github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
 	pflag "github.com/spf13/pflag"
 	"k8s.io/apimachinery/pkg/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -31,6 +33,7 @@ import (
 	"k8s.io/klog/klogr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	client "sigs.k8s.io/controller-runtime/pkg/client"
+	metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 
 	tipocav1 "github.com/practo/tipoca-stream/redshiftsink/api/v1"
 	"github.com/practo/tipoca-stream/redshiftsink/controllers"
@@ -49,20 +52,23 @@ func init() {
 	_ = tipocav1.AddToScheme(scheme)
 	// +kubebuilder:scaffold:scheme
+
+	metrics.Registry.MustRegister(redshift.RedshiftQueryTotalMetric)
 }
 
 func main() {
 	rand.Seed(time.Now().UnixNano())
 
-	var enableLeaderElection bool
+	var enableLeaderElection, collectRedshiftMetrics bool
 	var batcherImage, loaderImage, secretRefName, secretRefNamespace, kafkaVersion, metricsAddr, allowedRsks, prometheusURL string
 	var redshiftMaxOpenConns, redshiftMaxIdleConns int
 	flag.StringVar(&batcherImage, "default-batcher-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftbatcher:latest", "image to use for the redshiftbatcher")
 	flag.StringVar(&loaderImage, "default-loader-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftloader:latest", "image to use for the redshiftloader")
 	flag.StringVar(&secretRefName, "default-secret-ref-name", "redshiftsink-secret", "default secret name for all redshiftsink secret")
 	flag.StringVar(&secretRefNamespace, "default-secret-ref-namespace", "ts-redshiftsink-latest", "default namespace where redshiftsink secret is there")
+	flag.BoolVar(&collectRedshiftMetrics, "collect-redshift-metrics", false, "when enabled, collects Redshift metrics for better calculations; at present used to calculate the per-table throttling seconds")
 	flag.StringVar(&kafkaVersion, "default-kafka-version", "2.6.0", "default kafka version")
-	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
+	flag.StringVar(&metricsAddr, "metrics-addr", ":8443", "The address the metric endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
 	flag.IntVar(&redshiftMaxOpenConns, "default-redshift-max-open-conns", 10, "the maximum number of open connections allowed to redshift per redshiftsink resource")
 	flag.IntVar(&redshiftMaxIdleConns, "default-redshift-max-idle-conns", 2, "the maximum number of idle connections allowed to redshift per redshiftsink resource")
@@ -132,9 +138,33 @@ func main() {
 	}
 	// +kubebuilder:scaffold:builder
+
+	if !collectRedshiftMetrics {
+		setupLog.Info("Starting manager")
+		if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+			setupLog.Error(err, "problem running manager")
+			os.Exit(1)
+		}
+		return
+	}
+
+	ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
+	defer cancel()
+
+	setupLog.Info("Starting Redshift metrics collector")
+	collector, err := controllers.NewRedshiftCollector(uncachedClient, secretRefName, secretRefNamespace)
+	if err != nil {
+		setupLog.Error(err, "problem initializing collector")
+		os.Exit(1)
+	}
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	go collector.Collect(ctx, wg)
+
 	setupLog.Info("Starting manager")
-	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+	if err := mgr.Start(ctx); err != nil {
 		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
+
+	wg.Wait()
 }
diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml
index c64ed86ae..6084dad17 100644
--- a/config/default/kustomization.yaml
+++ b/config/default/kustomization.yaml
@@ -22,7 +22,7 @@ bases:
 # [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
 #- ../certmanager
 # [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
-#- ../prometheus
+- ../prometheus
 
 patchesStrategicMerge:
 # Protect the /metrics endpoint by putting it behind auth.
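Everything registered with controller-runtime's metrics.Registry, including the RedshiftQueryTotalMetric registered in main.go above, is served by the manager on the --metrics-addr endpoint (:8443 here), which the ServiceMonitor changes below then scrape. A minimal, self-contained sketch of that registration pattern; the gauge mirrors the shape of RedshiftQueryTotalMetric, and the label values are made up for illustration:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// exampleGauge mirrors the shape of RedshiftQueryTotalMetric in this patch.
var exampleGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "redshift",
		Subsystem: "scan",
		Name:      "query_total",
		Help:      "total number of queries executed",
	},
	[]string{"database", "schema", "tablename", "tableid"},
)

func main() {
	// Anything added to this registry is exposed by the controller
	// manager on --metrics-addr as redshift_scan_query_total{...}.
	metrics.Registry.MustRegister(exampleGauge)

	// The collector sets one labeled value per table row it reads.
	exampleGauge.WithLabelValues("mydb", "public", "orders", "12345").Set(42)
}
```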
diff --git a/config/operator/redshiftsink_operator.yaml b/config/operator/redshiftsink_operator.yaml
index 2584f8589..a701f74ae 100644
--- a/config/operator/redshiftsink_operator.yaml
+++ b/config/operator/redshiftsink_operator.yaml
@@ -50,6 +50,7 @@ spec:
         - --default-redshift-max-idle-conns=2
         - --allowed-rsks=
         - --promethus-url=
+        - --collect-redshift-metrics=false
         resources:
           limits:
             cpu: 300m
diff --git a/config/prometheus/monitor.yaml b/config/prometheus/monitor.yaml
index 9b8047b76..0950ea1f8 100644
--- a/config/prometheus/monitor.yaml
+++ b/config/prometheus/monitor.yaml
@@ -4,13 +4,16 @@ apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
 metadata:
   labels:
-    control-plane: controller-manager
-  name: controller-manager-metrics-monitor
-  namespace: system
+    app: redshiftsink-operator
+  name: redshiftsink-operator
+  namespace: monitoring
 spec:
+  namespaceSelector:
+    matchNames:
+    - kube-system
   endpoints:
     - path: /metrics
-      port: https
+      port: redshift-metrics
   selector:
     matchLabels:
-      control-plane: controller-manager
+      app: redshiftsink-operator
diff --git a/config/rbac/auth_proxy_service.yaml b/config/rbac/auth_proxy_service.yaml
index 6cf656be1..550299b78 100644
--- a/config/rbac/auth_proxy_service.yaml
+++ b/config/rbac/auth_proxy_service.yaml
@@ -2,13 +2,17 @@ apiVersion: v1
 kind: Service
 metadata:
   labels:
-    control-plane: controller-manager
-  name: controller-manager-metrics-service
-  namespace: system
+    app: redshiftsink-operator
+  name: redshiftsink-operator
+  namespace: kube-system
 spec:
+  clusterIP: None
   ports:
-  - name: https
+  - name: redshift-metrics
     port: 8443
-    targetPort: https
+    protocol: TCP
+    targetPort: 8443
   selector:
-    control-plane: controller-manager
+    app: redshiftsink-operator
+  sessionAffinity: None
+  type: ClusterIP
diff --git a/controllers/redshift.go b/controllers/redshift.go
new file mode 100644
index 000000000..71ab516cc
--- /dev/null
+++ b/controllers/redshift.go
@@ -0,0 +1,110 @@
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"github.com/practo/klog/v2"
+	"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sync"
+	"time"
+)
+
+// RedshiftCollector periodically exports Redshift table scan
+// statistics as Prometheus metrics.
+type RedshiftCollector struct {
+	redshifter *redshift.Redshift
+}
+
+func NewRedshiftCollector(client client.Client, secretName, secretNamespace string) (*RedshiftCollector, error) {
+	secret := make(map[string]string)
+	k8sSecret, err := getSecret(context.Background(), client, secretName, secretNamespace)
+	if err != nil {
+		return nil, fmt.Errorf("Error getting secret, %v", err)
+	}
+	for key, value := range k8sSecret.Data {
+		secret[key] = string(value)
+	}
+
+	redshifter, err := NewRedshiftConnection(secret, "")
+	if err != nil {
+		return nil, err
+	}
+
+	return &RedshiftCollector{
+		redshifter: redshifter,
+	}, nil
+}
+
+// Collect refreshes the query_total metric every 30 minutes until the
+// context is cancelled or a collection fails.
+func (r *RedshiftCollector) Collect(ctx context.Context, wg *sync.WaitGroup) error {
+	defer wg.Done()
+	// TODO: create the view from the operator instead of requiring it;
+	// the view redshiftsink_operator.scan_query_total must be present
+	err := r.redshifter.CollectQueryTotal(ctx)
+	if err != nil {
+		klog.Errorf("Redshift Collector shutdown due to error: %v", err)
+		return err
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			klog.V(2).Infof("ctx cancelled, bye collector")
+			return nil
+		case <-time.After(time.Second * 1800):
+			err := r.redshifter.CollectQueryTotal(ctx)
+			if err != nil {
+				klog.Errorf("Redshift Collector shutdown due to error: %v", err)
+				return err
+			}
+		}
+	}
+}
+
+// NewRedshiftConnection builds a Redshift connection using the
+// credentials in the redshiftsink secret, scoped to the given schema.
+func NewRedshiftConnection(
+	secret map[string]string,
+	schema string,
+) (
+	*redshift.Redshift,
+	error,
+) {
+	redshiftSecret := make(map[string]string)
+	redshiftSecretKeys := []string{
+		"redshiftHost",
+		"redshiftPort",
+		"redshiftDatabase",
+		"redshiftUser",
+		"redshiftPassword",
+	}
+	for _, key := range redshiftSecretKeys {
+		value, err := secretByKey(secret, key)
+		if err != nil {
+			return nil, err
+		}
+		redshiftSecret[key] = value
+	}
+	config := redshift.RedshiftConfig{
+		Schema:       schema,
+		Host:         redshiftSecret["redshiftHost"],
+		Port:         redshiftSecret["redshiftPort"],
+		Database:     redshiftSecret["redshiftDatabase"],
+		User:         redshiftSecret["redshiftUser"],
+		Password:     redshiftSecret["redshiftPassword"],
+		Timeout:      10,
+		Stats:        true,
+		MaxOpenConns: 3,
+		MaxIdleConns: 3,
+	}
+
+	conn, err := redshift.NewRedshift(config)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"Error creating redshift connection, config: %+v, err: %v",
+			config, err)
+	}
+
+	return conn, nil
+}
diff --git a/controllers/redshiftsink_controller.go b/controllers/redshiftsink_controller.go
index e6556ad89..8de5e4ae7 100644
--- a/controllers/redshiftsink_controller.go
+++ b/controllers/redshiftsink_controller.go
@@ -534,6 +534,10 @@ func (r *RedshiftSinkReconciler) reconcile(
 
 	if len(status.realtime) == 0 {
 		klog.V(2).Infof("rsk/%s nothing done in reconcile", rsk.Name)
+		// requeue sooner while tables are still reloading
+		if len(status.reloading) > 0 {
+			return resultRequeueMilliSeconds(15000), events, nil
+		}
 		return resultRequeueMilliSeconds(900000), events, nil
 	}
 
@@ -550,7 +554,6 @@ func (r *RedshiftSinkReconciler) reconcile(
 	var releaser *releaser
 	if len(releaseCandidates) > 0 {
 		releaser, err = newReleaser(
-			rsk.Spec.Loader.RedshiftSchema,
 			repo,
 			filePath,
 			currentMaskVersion,
diff --git a/controllers/release.go b/controllers/release.go
index ea9a963ff..195c4bee3 100644
--- a/controllers/release.go
+++ b/controllers/release.go
@@ -25,7 +25,6 @@ type releaser struct {
 }
 
 func newReleaser(
-	schema string,
 	repo string,
 	filePath string,
 	currentVersion string,
@@ -36,42 +35,11 @@ func newReleaser(
 	*releaser,
 	error,
 ) {
-	redshiftSecret := make(map[string]string)
-	redshiftSecretKeys := []string{
-		"redshiftHost",
-		"redshiftPort",
-		"redshiftDatabase",
-		"redshiftUser",
-		"redshiftPassword",
-	}
-	for _, key := range redshiftSecretKeys {
-		value, err := secretByKey(secret, key)
-		if err != nil {
-			return nil, err
-		}
-		redshiftSecret[key] = value
-	}
-
-	config := redshift.RedshiftConfig{
-		Schema:       schema,
-		Host:         redshiftSecret["redshiftHost"],
-		Port:         redshiftSecret["redshiftPort"],
-		Database:     redshiftSecret["redshiftDatabase"],
-		User:         redshiftSecret["redshiftUser"],
-		Password:     redshiftSecret["redshiftPassword"],
-		Timeout:      10,
-		Stats:        true,
-		MaxOpenConns: 3,
-		MaxIdleConns: 3,
-	}
-
-	redshifter, err := redshift.NewRedshift(config)
+	schema := rsk.Spec.Loader.RedshiftSchema
+	redshifter, err := NewRedshiftConnection(secret, schema)
 	if err != nil {
-		return nil, fmt.Errorf(
-			"Error creating redshift connecton, config: %+v, err: %v",
-			config, err)
+		return nil, err
 	}
-
 	return &releaser{
 		schema:     schema,
 		redshifter: redshifter,
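Context for the reconcile change above: `resultRequeueMilliSeconds` is defined elsewhere in the repo and not shown in this patch. Its assumed shape, based on controller-runtime's `ctrl.Result`, would be:

```go
package controllers

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// Assumed shape (not part of this patch): requeue the resource after
// the given number of milliseconds. The reconcile change above uses
// 15000 (15s) while tables are reloading and 900000 (15min) otherwise.
func resultRequeueMilliSeconds(ms int) ctrl.Result {
	return ctrl.Result{
		RequeueAfter: time.Duration(ms) * time.Millisecond,
	}
}
```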
diff --git a/pkg/redshift/redshift.go b/pkg/redshift/redshift.go
index b2e8cf945..9fb8bfa9b 100644
--- a/pkg/redshift/redshift.go
+++ b/pkg/redshift/redshift.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	multierror "github.com/hashicorp/go-multierror"
 	"github.com/practo/klog/v2"
+	"github.com/prometheus/client_golang/prometheus"
 	"math"
 	"strconv"
@@ -18,6 +19,20 @@ import (
 	"strings"
 )
 
+var (
+	// RedshiftQueryTotalMetric reports the total scan queries per table;
+	// registered with the controller manager's metrics registry in main.go
+	RedshiftQueryTotalMetric = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "redshift",
+			Subsystem: "scan",
+			Name:      "query_total",
+			Help:      "total number of queries executed",
+		},
+		[]string{"database", "schema", "tablename", "tableid"},
+	)
+)
+
 const (
 	RedshiftBoolean = "boolean"
@@ -930,6 +945,46 @@ func (r *Redshift) GetTableMetadata(ctx context.Context, schema, tableName strin
 	return &retTable, nil
 }
 
+type queryTotalRow struct {
+	schema     string
+	tableID    string
+	tableName  string
+	queryTotal float64
+}
+
+// CollectQueryTotal collects the total queries executed for each table;
+// expects the view redshiftsink_operator.scan_query_total to be present
+func (r *Redshift) CollectQueryTotal(ctx context.Context) error {
+	klog.V(2).Info("collecting redshift.scan.query_total")
+
+	query := `select schemaname,tableid,tablename,query_total from redshiftsink_operator.scan_query_total`
+	rows, err := r.QueryContext(ctx, query)
+	if err != nil {
+		return fmt.Errorf("error running query: %s, err: %s", query, err)
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var row queryTotalRow
+		err := rows.Scan(&row.schema, &row.tableID, &row.tableName, &row.queryTotal)
+		if err != nil {
+			return fmt.Errorf("error scanning query view, err: %s", err)
+		}
+		// label values must follow the GaugeVec declaration order:
+		// database, schema, tablename, tableid
+		RedshiftQueryTotalMetric.WithLabelValues(
+			r.conf.Database,
+			row.schema,
+			row.tableName,
+			row.tableID,
+		).Set(row.queryTotal)
+	}
+
+	klog.V(2).Info("collected redshift.scan.query_total")
+
+	return nil
+}
+
 // CheckSchemas takes in two tables and compares their column schemas
 // to make sure they're compatible. If they have any mismatched columns
 // they are returned in the errors array. Covers most of the schema migration
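Once the gauge is exported, the throttling side (the `--prometheus-url` flag above) can read the per-table totals back from Prometheus. A minimal sketch using the standard Prometheus Go client; the Prometheus address and the `database` label value are illustrative, not taken from this patch:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	client, err := api.NewClient(api.Config{Address: "http://prometheus:9090"})
	if err != nil {
		panic(err)
	}
	promAPI := promv1.NewAPI(client)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Fetch the exported gauge for every table in one database.
	result, warnings, err := promAPI.Query(
		ctx, `redshift_scan_query_total{database="mydb"}`, time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result) // instant vector of per-table query totals
}
```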
diff --git a/redshiftsink/README.md b/redshiftsink/README.md
index 7f041d8df..4250bbd97 100644
--- a/redshiftsink/README.md
+++ b/redshiftsink/README.md
@@ -219,3 +219,100 @@ binary: bin/darwin_amd64/redshiftsink
 ```bash
 make run
 ```
+
+### Enable Throttling (optional)
+By default, throttling is disabled.
+
+#### Why throttle?
+By default there is no limit on the number of concurrent loads to Redshift. If you have a huge number of tables to load, the concurrent loads can slow down READ queries in Redshift. When this feature is enabled, no more than 10 table loads run at a time; every load above 10 is throttled for 15 seconds. If the Redshift Exporter is also enabled, the throttle duration for each table is instead determined by how frequently the table is used in Redshift.
+
+#### How to enable?
+- Export RedshiftLoader metrics to Prometheus.
+```
+TODO for adding the manifests here
+
+kubectl create -f config/redshiftloader/service.yaml
+kubectl create -f config/redshiftloader/servicemonitor.yaml
+```
+- Set `--prometheus-url` in the RedshiftSink Operator Deployment.
+```
+kubectl edit deploy -n kube-system redshiftsink-operator
+```
+
+### Enable RedshiftSink Exporter (optional)
+By default the exporter is disabled. This feature works only when Prometheus is also enabled.
+
+#### Why export Redshift metrics to Prometheus?
+We throttle the loads to keep READs fast. By default the throttling logic treats all tables the same. With the Redshift exporter enabled, only the less frequently used tables are throttled. This helps reduce the load on Redshift and keeps queries fast.
+
+#### How to enable?
+- Prerequisite: Enable Throttling. Please see above.
+- Install the table scan view, as shown below.
+
+#### Schema
+```sql
+CREATE SCHEMA redshiftsink_operator;
+```
+
+#### View
+Note: Please substitute `AND s.userid != 100` with the user id(s) of the redshiftsink users. We need to ignore queries from the redshiftsink users to keep the calculation usable.
+```sql
+CREATE OR REPLACE VIEW redshiftsink_operator.scan_query_total AS
+SELECT DATABASE,
+       SCHEMA AS schemaname,
+       table_id AS tableid,
+       "table" AS tablename,
+       SIZE,
+       sortkey1,
+       NVL(s.num_qs,0) query_total
+FROM svv_table_info t
+LEFT JOIN
+  (SELECT tbl,
+          perm_table_name,
+          COUNT(DISTINCT query) num_qs
+   FROM stl_scan s
+   WHERE s.userid > 1
+     AND s.userid != 100
+     AND s.perm_table_name NOT IN ('Internal Worktable',
+                                   'S3')
+     AND s.perm_table_name NOT LIKE '%staged%'
+   GROUP BY tbl,
+            perm_table_name) s ON s.tbl = t.table_id
+AND t."schema" NOT IN ('pg_internal')
+ORDER BY 7 DESC;
+```
+
+- Set `--collect-redshift-metrics` to true in the RedshiftSink Operator Deployment.
+```
+kubectl edit deploy -n kube-system redshiftsink-operator
+```
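+
+#### Example: throttling behaviour (illustrative)
+A minimal Go sketch of the throttling policy described above, assuming a fixed cap of 10 concurrent loads and a 15-second wait; this illustrates the documented behaviour and is not the operator's actual implementation:
+```go
+package main
+
+import "time"
+
+// loadSlots caps the number of concurrent Redshift loads at 10.
+var loadSlots = make(chan struct{}, 10)
+
+// throttledLoad runs load once a slot is free; while all slots are
+// busy it waits 15 seconds between attempts, as described above.
+func throttledLoad(load func()) {
+	for {
+		select {
+		case loadSlots <- struct{}{}: // acquired a slot
+			load()
+			<-loadSlots // release the slot
+			return
+		default:
+			time.Sleep(15 * time.Second) // throttled
+		}
+	}
+}
+
+func main() {
+	throttledLoad(func() { /* one table load */ })
+}
+```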