Skip to content

Commit

Permalink
Redshift metrics collector
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed May 18, 2021
1 parent bf4b07a commit e012d31
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 38 deletions.
34 changes: 32 additions & 2 deletions cmd/redshiftsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"math/rand"
"os"
Expand All @@ -24,13 +25,15 @@ import (

"github.com/practo/klog/v2"
prometheus "github.com/practo/tipoca-stream/redshiftsink/pkg/prometheus"
redshift "github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
pflag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/klog/klogr"
ctrl "sigs.k8s.io/controller-runtime"
client "sigs.k8s.io/controller-runtime/pkg/client"
metrics "sigs.k8s.io/controller-runtime/pkg/metrics"

tipocav1 "github.com/practo/tipoca-stream/redshiftsink/api/v1"
"github.com/practo/tipoca-stream/redshiftsink/controllers"
Expand All @@ -49,18 +52,21 @@ func init() {

_ = tipocav1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme

metrics.Registry.MustRegister(redshift.RedshiftQueryTotalMetric)
}

func main() {
rand.Seed(time.Now().UnixNano())

var enableLeaderElection bool
var enableLeaderElection, collectRedshiftMetrics bool
var batcherImage, loaderImage, secretRefName, secretRefNamespace, kafkaVersion, metricsAddr, allowedRsks, prometheusURL string
var redshiftMaxOpenConns, redshiftMaxIdleConns int
flag.StringVar(&batcherImage, "default-batcher-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftbatcher:latest", "image to use for the redshiftbatcher")
flag.StringVar(&loaderImage, "default-loader-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftloader:latest", "image to use for the redshiftloader")
flag.StringVar(&secretRefName, "default-secret-ref-name", "redshiftsink-secret", "default secret name for all redshiftsink secret")
flag.StringVar(&secretRefNamespace, "default-secret-ref-namespace", "ts-redshiftsink-latest", "default namespace where redshiftsink secret is there")
flag.BoolVar(&collectRedshiftMetrics, "collect-redshift-metrics", false, "collectRedshiftMetrics when enabled collects redshift metrics for better calculations, used for calculating throttling seconds value at present for each table")
flag.StringVar(&kafkaVersion, "default-kafka-version", "2.6.0", "default kafka version")
flag.StringVar(&metricsAddr, "metrics-addr", ":8443", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
Expand Down Expand Up @@ -132,9 +138,33 @@ func main() {
}
// +kubebuilder:scaffold:builder

if !collectRedshiftMetrics {
setupLog.Info("Starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
return
}

ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
defer cancel()

setupLog.Info("Staring Redshift metrics")
collector, err := controllers.NewRedshiftCollector(uncachedClient, secretRefName, secretRefNamespace)
if err != nil {
setupLog.Error(err, "problem initializing collector")
os.Exit(1)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go collector.Collect(ctx, wg)

setupLog.Info("Starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}

wg.Wait()
}
101 changes: 101 additions & 0 deletions controllers/redshift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package controllers

import (
"context"
"fmt"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"
)

// RedshiftCollector periodically exports Redshift table-scan metrics
// to Prometheus. Construct it with NewRedshiftCollector and run
// Collect in a goroutine.
type RedshiftCollector struct {
	// redshifter is the open Redshift connection used to run the
	// metric-collection queries (no default schema is set).
	redshifter *redshift.Redshift
}

// NewRedshiftCollector reads the Redshift credentials from the Kubernetes
// secret secretName in namespace secretNamespace and returns a collector
// holding an open Redshift connection. No default schema is selected for
// the connection (empty schema is passed to NewRedshiftConnection).
func NewRedshiftCollector(client client.Client, secretName, secretNamespace string) (*RedshiftCollector, error) {
	k8sSecret, err := getSecret(context.Background(), client, secretName, secretNamespace)
	if err != nil {
		// lowercase, wrapped error per Go convention
		return nil, fmt.Errorf("getting secret %s/%s: %w", secretNamespace, secretName, err)
	}

	// flatten the secret's []byte values into strings for the
	// connection helper
	secret := make(map[string]string, len(k8sSecret.Data))
	for key, value := range k8sSecret.Data {
		secret[key] = string(value)
	}

	redshifter, err := NewRedshiftConnection(secret, "")
	if err != nil {
		return nil, err
	}

	return &RedshiftCollector{
		redshifter: redshifter,
	}, nil
}

// Collect periodically (every 30 minutes) queries Redshift for the
// per-table query totals and exports them as Prometheus metrics.
// It blocks until ctx is cancelled (returns nil) or a collection
// query fails (returns the error); wg.Done is called on return so
// callers can wait for shutdown.
//
// TODO: make it possible to remove below comment, create the view.
// Expects the view redshiftsink_operator.scan_query_total to be present.
func (r *RedshiftCollector) Collect(ctx context.Context, wg *sync.WaitGroup) error {
	defer wg.Done()

	// a single reusable ticker instead of allocating a new timer with
	// time.After on every iteration; the first collection still happens
	// only after the first full interval, as before
	ticker := time.NewTicker(time.Second * 1800)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			klog.V(2).Infof("ctx cancelled, bye collector")
			return nil
		case <-ticker.C:
			if err := r.redshifter.CollectQueryTotal(ctx); err != nil {
				klog.Errorf("Redshift Collector shutdown due to error: %v", err)
				return err
			}
		}
	}
	// note: the previous version had an unreachable `return nil` after
	// this loop, which `go vet` flags; it has been removed
}

// NewRedshiftConnection builds a Redshift connection from the credential
// secret. The secret must contain the keys redshiftHost, redshiftPort,
// redshiftDatabase, redshiftUser and redshiftPassword; a missing key is
// an error. schema may be empty when no default schema is needed.
func NewRedshiftConnection(
	secret map[string]string,
	schema string,
) (
	*redshift.Redshift,
	error,
) {
	redshiftSecret := make(map[string]string)
	redshiftSecretKeys := []string{
		"redshiftHost",
		"redshiftPort",
		"redshiftDatabase",
		"redshiftUser",
		"redshiftPassword",
	}
	for _, key := range redshiftSecretKeys {
		value, err := secretByKey(secret, key)
		if err != nil {
			return nil, err
		}
		redshiftSecret[key] = value
	}
	config := redshift.RedshiftConfig{
		Schema:       schema,
		Host:         redshiftSecret["redshiftHost"],
		Port:         redshiftSecret["redshiftPort"],
		Database:     redshiftSecret["redshiftDatabase"],
		User:         redshiftSecret["redshiftUser"],
		Password:     redshiftSecret["redshiftPassword"],
		Timeout:      10,
		Stats:        true,
		MaxOpenConns: 3,
		MaxIdleConns: 3,
	}

	conn, err := redshift.NewRedshift(config)
	if err != nil {
		// lowercase, wrapped error; also fixes the "connecton" typo
		return nil, fmt.Errorf(
			"creating redshift connection, config: %+v, err: %w",
			config, err)
	}

	return conn, nil
}
1 change: 0 additions & 1 deletion controllers/redshiftsink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ func (r *RedshiftSinkReconciler) reconcile(
var releaser *releaser
if len(releaseCandidates) > 0 {
releaser, err = newReleaser(
rsk.Spec.Loader.RedshiftSchema,
repo,
filePath,
currentMaskVersion,
Expand Down
38 changes: 3 additions & 35 deletions controllers/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type releaser struct {
}

func newReleaser(
schema string,
repo string,
filePath string,
currentVersion string,
Expand All @@ -36,42 +35,11 @@ func newReleaser(
*releaser,
error,
) {
redshiftSecret := make(map[string]string)
redshiftSecretKeys := []string{
"redshiftHost",
"redshiftPort",
"redshiftDatabase",
"redshiftUser",
"redshiftPassword",
}
for _, key := range redshiftSecretKeys {
value, err := secretByKey(secret, key)
if err != nil {
return nil, err
}
redshiftSecret[key] = value
}

config := redshift.RedshiftConfig{
Schema: schema,
Host: redshiftSecret["redshiftHost"],
Port: redshiftSecret["redshiftPort"],
Database: redshiftSecret["redshiftDatabase"],
User: redshiftSecret["redshiftUser"],
Password: redshiftSecret["redshiftPassword"],
Timeout: 10,
Stats: true,
MaxOpenConns: 3,
MaxIdleConns: 3,
}

redshifter, err := redshift.NewRedshift(config)
schema := rsk.Spec.Loader.RedshiftSchema
redshifter, err := NewRedshiftConnection(secret, schema)
if err != nil {
return nil, fmt.Errorf(
"Error creating redshift connecton, config: %+v, err: %v",
config, err)
return nil, err
}

return &releaser{
schema: schema,
redshifter: redshifter,
Expand Down
51 changes: 51 additions & 0 deletions pkg/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
multierror "github.com/hashicorp/go-multierror"
"github.com/practo/klog/v2"
"github.com/prometheus/client_golang/prometheus"
"math"
"strconv"

Expand All @@ -18,6 +19,18 @@ import (
"strings"
)

var (
	// RedshiftQueryTotalMetric exports, per Redshift table, the total
	// number of queries that scanned it. Registered by the operator's
	// main and refreshed by CollectQueryTotal. Label order is
	// database, schema, tablename, tableid — callers of
	// WithLabelValues must pass values in this order.
	RedshiftQueryTotalMetric = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "redshift",
			Subsystem: "scan",
			Name:      "query_total",
			Help:      "total number of queries executed",
		},
		[]string{"database", "schema", "tablename", "tableid"},
	)
)

const (
RedshiftString = "character varying"
RedshiftStringMax = "character varying(65535)"
Expand Down Expand Up @@ -928,6 +941,44 @@ func (r *Redshift) GetTableMetadata(ctx context.Context, schema, tableName strin
return &retTable, nil
}

// queryTotalRow is one row of the redshiftsink_operator scan-total view:
// the query count for a single table, used to populate
// RedshiftQueryTotalMetric.
type queryTotalRow struct {
	schema     string  // schemaname column
	tableID    string  // tableid column (svv_table_info table_id)
	tableName  string  // tablename column
	queryTotal float64 // query_total column (distinct query count)
}

// CollectQueryTotal collects the total number of queries executed for
// each table and sets RedshiftQueryTotalMetric for each one.
// It expects the view redshiftsink_operator.scan_query_total to be present.
func (r *Redshift) CollectQueryTotal(ctx context.Context) error {
	klog.V(2).Info("collecting redshift.scan.query_total")

	// NOTE(review): the view documented in the README is named
	// scan_query_total; the previous query referenced
	// scan_queries_total and could never find it.
	query := `select schemaname,tableid,tablename,query_total from redshiftsink_operator.scan_query_total`
	rows, err := r.QueryContext(ctx, query)
	if err != nil {
		return fmt.Errorf("error running query: %s, err: %w", query, err)
	}
	defer rows.Close()

	for rows.Next() {
		var row queryTotalRow
		if err := rows.Scan(&row.schema, &row.tableID, &row.tableName, &row.queryTotal); err != nil {
			return fmt.Errorf("error scanning query view, err: %w", err)
		}
		// values must follow the declared label order:
		// database, schema, tablename, tableid — the previous version
		// passed tableID and tableName swapped, mislabeling the series
		RedshiftQueryTotalMetric.WithLabelValues(
			r.conf.Database,
			row.schema,
			row.tableName,
			row.tableID,
		).Set(row.queryTotal)
	}
	// surface any error encountered during iteration (rows.Next returns
	// false on error as well as on end-of-rows)
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating query view, err: %w", err)
	}

	klog.V(2).Info("collected redshift.scan.query_total")

	return nil
}

// CheckSchemas takes in two tables and compares their column schemas
// to make sure they're compatible. If they have any mismatched columns
// they are returned in the errors array. Covers most of the schema migration
Expand Down
61 changes: 61 additions & 0 deletions redshiftsink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,64 @@ binary: bin/darwin_amd64/redshiftsink
```bash
make run
```

### Enable Throttling (optional)
By default the throttling is disabled.

#### Why throttle?
By default there is no limit on the number of concurrent loads to Redshift. If you have a huge number of tables to load and the concurrent loads are impacting READ performance in Redshift, enable this feature to run no more than 10 table loads at a time. When enabled, all loads beyond the first 10 are throttled for 15 seconds. If the Redshift Exporter is also enabled, the throttling value is instead determined by the frequency of table use in Redshift.

#### How to enable?
- Export RedshiftLoader metrics to Prometheus.
```
TODO for adding the manifests here
kubectl create -f config/redshiftloader/service.yaml
kubectl create -f config/redshiftloader/servicemonitor.yaml
```
- Set `--prometheus-url` in the RedshiftSink Operator Deployment.
```
kubectl edit deploy -n kube-system redshiftsink-operator
```

### Enable RedshiftSink Exporter (optional)
By default the exporter is disabled. This feature works only when Prometheus is also enabled.

#### Why export Redshift metrics to Prometheus?
We throttle the loads to keep the READ fast. Throttling logic by default treats all tables as same. But if the redshift exporter is enabled the less frequently used tables are throttled and not all tables are treated as the same. This is helpful in reducing the load in Redshift and keeping the queries fast.

#### How to enable?
- Prerequisite: Enable Throttling. Please see above.
- Install the table scan view.
```sql
CREATE SCHEMA redshiftsink_operator;
```
```sql
CREATE OR REPLACE VIEW redshiftsink_operator.scan_query_total AS
SELECT DATABASE,
SCHEMA AS schemaname,
table_id as tableid,
"table" AS tablename,
SIZE,
sortkey1,
NVL(s.num_qs,0) query_total
FROM svv_table_info t
LEFT JOIN
(SELECT tbl,
perm_table_name,
COUNT(DISTINCT query) num_qs
FROM stl_scan s
WHERE s.userid > 1
AND s.perm_table_name NOT IN ('Internal Worktable',
'S3')
AND s.perm_table_name NOT LIKE '%staged%'
GROUP BY tbl,
perm_table_name) s ON s.tbl = t.table_id
AND t."schema" NOT IN ('pg_internal')
ORDER BY 7 DESC;
```

- Set `--collect-redshift-metrics` as true in the RedshiftSink Operator Deployment.
```
kubectl edit deploy -n kube-system redshiftsink-operator
```

0 comments on commit e012d31

Please sign in to comment.