Commit 155fe0c

Merge pull request #229 from practo/redshift-table-use-metric

Collect Redshift Metrics

alok87 authored May 24, 2021
2 parents adaea07 + 3db990f

Showing 10 changed files with 283 additions and 51 deletions.
36 changes: 33 additions & 3 deletions cmd/redshiftsink/main.go
@@ -15,6 +15,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"math/rand"
"os"
@@ -24,13 +25,15 @@ import (

"github.com/practo/klog/v2"
prometheus "github.com/practo/tipoca-stream/redshiftsink/pkg/prometheus"
redshift "github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
pflag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/klog/klogr"
ctrl "sigs.k8s.io/controller-runtime"
client "sigs.k8s.io/controller-runtime/pkg/client"
metrics "sigs.k8s.io/controller-runtime/pkg/metrics"

tipocav1 "github.com/practo/tipoca-stream/redshiftsink/api/v1"
"github.com/practo/tipoca-stream/redshiftsink/controllers"
@@ -49,20 +52,23 @@ func init() {

_ = tipocav1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme

metrics.Registry.MustRegister(redshift.RedshiftQueryTotalMetric)
}

func main() {
rand.Seed(time.Now().UnixNano())

var enableLeaderElection bool
var enableLeaderElection, collectRedshiftMetrics bool
var batcherImage, loaderImage, secretRefName, secretRefNamespace, kafkaVersion, metricsAddr, allowedRsks, prometheusURL string
var redshiftMaxOpenConns, redshiftMaxIdleConns int
flag.StringVar(&batcherImage, "default-batcher-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftbatcher:latest", "image to use for the redshiftbatcher")
flag.StringVar(&loaderImage, "default-loader-image", "746161288457.dkr.ecr.ap-south-1.amazonaws.com/redshiftloader:latest", "image to use for the redshiftloader")
flag.StringVar(&secretRefName, "default-secret-ref-name", "redshiftsink-secret", "default secret name for all redshiftsink secret")
flag.StringVar(&secretRefNamespace, "default-secret-ref-namespace", "ts-redshiftsink-latest", "default namespace where redshiftsink secret is there")
flag.BoolVar(&collectRedshiftMetrics, "collect-redshift-metrics", false, "when enabled, collects Redshift metrics for better calculations; at present used to compute the throttling seconds value for each table")
flag.StringVar(&kafkaVersion, "default-kafka-version", "2.6.0", "default kafka version")
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&metricsAddr, "metrics-addr", ":8443", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&redshiftMaxOpenConns, "default-redshift-max-open-conns", 10, "the maximum number of open connections allowed to redshift per redshiftsink resource")
flag.IntVar(&redshiftMaxIdleConns, "default-redshift-max-idle-conns", 2, "the maximum number of idle connections allowed to redshift per redshiftsink resource")
@@ -132,9 +138,33 @@ func main() {
}
// +kubebuilder:scaffold:builder

if !collectRedshiftMetrics {
setupLog.Info("Starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
return
}

ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
defer cancel()

setupLog.Info("Staring Redshift metrics")
collector, err := controllers.NewRedshiftCollector(uncachedClient, secretRefName, secretRefNamespace)
if err != nil {
setupLog.Error(err, "problem initializing collector")
os.Exit(1)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go collector.Collect(ctx, wg)

setupLog.Info("Starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}

wg.Wait()
}
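
The init() change registers pkg/redshift's new gauge with controller-runtime's metrics.Registry, so it is exported from the same endpoint the ServiceMonitor below scrapes (now :8443). The following is a standalone sketch of that path, not project code: a plain prometheus.NewRegistry() stands in for controller-runtime's metrics.Registry, and the label values are placeholders standing in for one collector pass.

// Standalone sketch (not part of this commit) of the metrics path wired up
// above: a GaugeVec like redshift.RedshiftQueryTotalMetric is registered once
// and the collector keeps setting per-table values, which Prometheus scrapes
// from the metrics endpoint on :8443.
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var queryTotal = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "redshift",
		Subsystem: "scan",
		Name:      "query_total",
		Help:      "total number of queries executed",
	},
	[]string{"database", "schema", "tablename", "tableid"},
)

func main() {
	// In the operator, sigs.k8s.io/controller-runtime's metrics.Registry
	// plays the role of this registry.
	reg := prometheus.NewRegistry()
	reg.MustRegister(queryTotal)

	// Placeholder values standing in for one collector pass.
	queryTotal.WithLabelValues("dev", "inventory", "orders", "104032").Set(42)

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8443", nil))
}

The exported series would appear as redshift_scan_query_total{database="dev",schema="inventory",...}, matching the Namespace/Subsystem/Name used in pkg/redshift/redshift.go below.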
2 changes: 1 addition & 1 deletion config/default/kustomization.yaml
@@ -22,7 +22,7 @@ bases:
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
#- ../certmanager
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
#- ../prometheus
- ../prometheus

patchesStrategicMerge:
# Protect the /metrics endpoint by putting it behind auth.
1 change: 1 addition & 0 deletions config/operator/redshiftsink_operator.yaml
@@ -50,6 +50,7 @@ spec:
- --default-redshift-max-idle-conns=2
- --allowed-rsks=
- --promethus-url=
- --collect-redshift-metrics=false
resources:
limits:
cpu: 300m
13 changes: 8 additions & 5 deletions config/prometheus/monitor.yaml
@@ -4,13 +4,16 @@ apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
control-plane: controller-manager
name: controller-manager-metrics-monitor
namespace: system
app: redshiftsink-operator
name: redshiftsink-operator
namespace: monitoring
spec:
namespaceSelector:
matchNames:
- kube-system
endpoints:
- path: /metrics
port: https
port: redshift-metrics
selector:
matchLabels:
control-plane: controller-manager
app: redshiftsink-operator
16 changes: 10 additions & 6 deletions config/rbac/auth_proxy_service.yaml
@@ -2,13 +2,17 @@ apiVersion: v1
kind: Service
metadata:
labels:
control-plane: controller-manager
name: controller-manager-metrics-service
namespace: system
app: redshiftsink-operator
name: redshiftsink-operator
namespace: kube-system
spec:
clusterIP: None
ports:
- name: https
- name: redshift-metrics
port: 8443
targetPort: https
protocol: TCP
targetPort: 8443
selector:
control-plane: controller-manager
app: redshiftsink-operator
sessionAffinity: None
type: ClusterIP
106 changes: 106 additions & 0 deletions controllers/redshift.go
@@ -0,0 +1,106 @@
package controllers

import (
"context"
"fmt"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"
)

type RedshiftCollector struct {
redshifter *redshift.Redshift
}

func NewRedshiftCollector(client client.Client, secretName, secretNamespace string) (*RedshiftCollector, error) {
secret := make(map[string]string)
k8sSecret, err := getSecret(context.Background(), client, secretName, secretNamespace)
if err != nil {
return nil, fmt.Errorf("Error getting secret, %v", err)
}
for key, value := range k8sSecret.Data {
secret[key] = string(value)
}

redshifter, err := NewRedshiftConnection(secret, "")
if err != nil {
return nil, err
}

return &RedshiftCollector{
redshifter: redshifter,
}, nil
}

func (r *RedshiftCollector) Collect(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
// TODO: create the view during setup so this requirement can be dropped
// expects the view redshiftsink_operator.scan_query_total to be present
err := r.redshifter.CollectQueryTotal(ctx)
if err != nil {
klog.Errorf("Redshift Collector shutdown due to error: %v", err)
return err
}
for {
select {
case <-ctx.Done():
klog.V(2).Infof("ctx cancelled, bye collector")
return nil
case <-time.After(time.Second * 1800):
err := r.redshifter.CollectQueryTotal(ctx)
if err != nil {
klog.Errorf("Redshift Collector shutdown due to error: %v", err)
return err
}
}
}

return nil
}

func NewRedshiftConnection(
secret map[string]string,
schema string,
) (
*redshift.Redshift,
error,
) {
redshiftSecret := make(map[string]string)
redshiftSecretKeys := []string{
"redshiftHost",
"redshiftPort",
"redshiftDatabase",
"redshiftUser",
"redshiftPassword",
}
for _, key := range redshiftSecretKeys {
value, err := secretByKey(secret, key)
if err != nil {
return nil, err
}
redshiftSecret[key] = value
}
config := redshift.RedshiftConfig{
Schema: schema,
Host: redshiftSecret["redshiftHost"],
Port: redshiftSecret["redshiftPort"],
Database: redshiftSecret["redshiftDatabase"],
User: redshiftSecret["redshiftUser"],
Password: redshiftSecret["redshiftPassword"],
Timeout: 10,
Stats: true,
MaxOpenConns: 3,
MaxIdleConns: 3,
}

conn, err := redshift.NewRedshift(config)
if err != nil {
return nil, fmt.Errorf(
"Error creating redshift connecton, config: %+v, err: %v",
config, err)
}

return conn, nil
}
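
Collect runs in its own goroutine for the life of the manager: main.go hands it the signal-handler context and a WaitGroup, and waits on the group after mgr.Start returns. Below is a minimal, self-contained sketch of that coordination only; runCollector and the short interval are illustrative placeholders, not the project's code.

// Minimal sketch of the shutdown coordination used by the operator: the
// collector loops until the shared context is cancelled, and main waits on
// the WaitGroup before exiting. signal.NotifyContext stands in for
// ctrl.SetupSignalHandler(), and the blocking <-ctx.Done() stands in for
// mgr.Start(ctx).
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func runCollector(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("ctx cancelled, bye collector")
			return
		case <-time.After(2 * time.Second):
			fmt.Println("collecting...")
		}
	}
}

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go runCollector(ctx, wg)

	<-ctx.Done() // the real code blocks in mgr.Start(ctx) here
	wg.Wait()
}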
4 changes: 3 additions & 1 deletion controllers/redshiftsink_controller.go
@@ -534,6 +534,9 @@ func (r *RedshiftSinkReconciler) reconcile(

if len(status.realtime) == 0 {
klog.V(2).Infof("rsk/%s nothing done in reconcile", rsk.Name)
if len(status.reloading) > 0 || len(status.realtime) > 0 {
return resultRequeueMilliSeconds(15000), events, nil
}
return resultRequeueMilliSeconds(900000), events, nil
}

@@ -550,7 +553,6 @@
var releaser *releaser
if len(releaseCandidates) > 0 {
releaser, err = newReleaser(
rsk.Spec.Loader.RedshiftSchema,
repo,
filePath,
currentMaskVersion,
38 changes: 3 additions & 35 deletions controllers/release.go
@@ -25,7 +25,6 @@ type releaser struct {
}

func newReleaser(
schema string,
repo string,
filePath string,
currentVersion string,
@@ -36,42 +35,11 @@
*releaser,
error,
) {
redshiftSecret := make(map[string]string)
redshiftSecretKeys := []string{
"redshiftHost",
"redshiftPort",
"redshiftDatabase",
"redshiftUser",
"redshiftPassword",
}
for _, key := range redshiftSecretKeys {
value, err := secretByKey(secret, key)
if err != nil {
return nil, err
}
redshiftSecret[key] = value
}

config := redshift.RedshiftConfig{
Schema: schema,
Host: redshiftSecret["redshiftHost"],
Port: redshiftSecret["redshiftPort"],
Database: redshiftSecret["redshiftDatabase"],
User: redshiftSecret["redshiftUser"],
Password: redshiftSecret["redshiftPassword"],
Timeout: 10,
Stats: true,
MaxOpenConns: 3,
MaxIdleConns: 3,
}

redshifter, err := redshift.NewRedshift(config)
schema := rsk.Spec.Loader.RedshiftSchema
redshifter, err := NewRedshiftConnection(secret, schema)
if err != nil {
return nil, fmt.Errorf(
"Error creating redshift connecton, config: %+v, err: %v",
config, err)
return nil, err
}

return &releaser{
schema: schema,
redshifter: redshifter,
51 changes: 51 additions & 0 deletions pkg/redshift/redshift.go
@@ -6,6 +6,7 @@ import (
"fmt"
multierror "github.com/hashicorp/go-multierror"
"github.com/practo/klog/v2"
"github.com/prometheus/client_golang/prometheus"
"math"
"strconv"

@@ -18,6 +19,18 @@
"strings"
)

var (
RedshiftQueryTotalMetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "redshift",
Subsystem: "scan",
Name: "query_total",
Help: "total number of queries executed",
},
[]string{"database", "schema", "tablename", "tableid"},
)
)

const (
RedshiftBoolean = "boolean"

@@ -930,6 +943,44 @@ func (r *Redshift) GetTableMetadata(ctx context.Context, schema, tableName strin
return &retTable, nil
}

type queryTotalRow struct {
schema string
tableID string
tableName string
queryTotal float64
}

// CollectQueryTotal collects total queries for each table
// expects the view redshiftsink_operator.scan_query_total to be present
func (r *Redshift) CollectQueryTotal(ctx context.Context) error {
klog.V(2).Info("collecting redshift.scan.query_total")

query := `select schemaname,tableid,tablename,query_total from redshiftsink_operator.scan_queries_total`
rows, err := r.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("error running query: %s, err: %s", query, err)
}
defer rows.Close()

for rows.Next() {
var row queryTotalRow
err := rows.Scan(&row.schema, &row.tableID, &row.tableName, &row.queryTotal)
if err != nil {
return fmt.Errorf("error scanning query view, err: %s", err)
}
RedshiftQueryTotalMetric.WithLabelValues(
r.conf.Database,
row.schema,
row.tableName, // label order must match the GaugeVec: database, schema, tablename, tableid
row.tableID,
).Set(row.queryTotal)
}

klog.V(2).Info("collected redshift.scan.query_total")

return nil
}

// CheckSchemas takes in two tables and compares their column schemas
// to make sure they're compatible. If they have any mismatched columns
// they are returned in the errors array. Covers most of the schema migration
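
CollectQueryTotal above reads from a view that the commit itself never creates (see the TODO in controllers/redshift.go). Purely as an illustration, here is one way such a view could be defined from Redshift's stl_scan and svv_table_info system tables, kept as a Go constant to stay in the project's language. The column aliases mirror the SELECT in CollectQueryTotal, but the body, and even the exact view name (the comments say scan_query_total while the query reads scan_queries_total), are assumptions, not taken from the repository.

// Hypothetical DDL for the view read by CollectQueryTotal. Not part of this
// commit; the column aliases match the query above, the body is a guess.
// The redshiftsink_operator schema must exist first, and depending on the
// cluster this may need to be a late-binding view (WITH NO SCHEMA BINDING)
// because it references system tables.
const scanQueriesTotalViewDDL = `
CREATE OR REPLACE VIEW redshiftsink_operator.scan_queries_total AS
SELECT ti.schema                AS schemaname,
       s.tbl                    AS tableid,
       ti."table"               AS tablename,
       COUNT(DISTINCT s.query)  AS query_total
FROM stl_scan s
JOIN svv_table_info ti ON ti.table_id = s.tbl
GROUP BY ti.schema, s.tbl, ti."table";
`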