Init commit for public repo
Ilya Levin committed Jul 31, 2021
0 parents commit 132fd1b
Showing 11 changed files with 1,249 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
@@ -0,0 +1,4 @@
pkg
src
bin
.vscode
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
.vscode
bin
pkg
src
26 changes: 26 additions & 0 deletions Dockerfile
@@ -0,0 +1,26 @@
FROM golang:1.13

WORKDIR /go/

ENV GOPATH /go/src
ENV GOBIN /go/bin
ENV NAMESPACE sites
ENV REDIS_HOST localhost
ENV REDIS_PORT 6379
ENV LOGLEVEL INFO
ENV AWS_ACCESS_KEY_ID SECRET
ENV AWS_SECRET_ACCESS_KEY SECRET
# By default REMOTE_REDIS_HOSTS should not be set
# ENV REMOTE_REDIS_HOSTS localhost
# By default WEBHOOKS_CONFIG should not be set
# ENV WEBHOOKS_CONFIG /config/webhook.json
COPY ./cmd ./cmd/
COPY ./go.mod ./go.mod
COPY ./go.sum ./go.sum

# Download all dependencies and generate the binary
RUN go mod download && \
go install ./cmd/cache-invalidator && \
chmod -R +x ./bin

CMD ["./bin/cache-invalidator"]
36 changes: 36 additions & 0 deletions README.md
@@ -0,0 +1,36 @@
## cache invalidator

Invalidates the cache (CloudFront) when a stage or prod upgrade occurs

## Info

This worker continuously iterates over all the deployments in a given namespace and caches their current image tags.
When a tag changes, i.e. an upgrade occurred, the worker gets the site domain from the ingress object, finds the corresponding CloudFront (CF) distribution and invalidates it.
The invalidation only happens once the new deployment has rolled out successfully; until then the worker waits for all the pods to be in a running state.
Waiting for pods is a non-blocking operation since goroutines are used, i.e. each invalidation happens concurrently.
If a deployment is unsuccessful (and is rolled back), the wait-for-pods procedure times out and the invalidation is skipped.
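
The main loop itself is not among the files shown in this commit, but based on the description above and the helpers in `cmd/cache-invalidator/aws.go` and `cmd/cache-invalidator/k8s.go` below, a minimal sketch of the tag-watching logic could look like the following. The `reconcile` function and the `lookupIngressHost` helper are illustrative, not the actual `main.go`, and the sketch assumes the package-level `ctx` used in `aws.go`:

```go
// Sketch only: compare each deployment's image tag with the tag cached in Redis and
// start an invalidation goroutine when the tag has changed (i.e. an upgrade occurred).
func reconcile(clientset *kubernetes.Clientset, rdb *redis.Client, sess *session.Session,
	namespace string, remoteRedisHosts []*redis.Client) error {
	deployments, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, d := range deployments.Items {
		image := d.Spec.Template.Spec.Containers[0].Image
		tag := strings.Split(image, ":")[1]

		cachedTag, err := rdb.Get(ctx, d.Name).Result()
		if err == redis.Nil {
			// First time this deployment is seen: just cache its current tag.
			if err := rdb.Set(ctx, d.Name, tag, 0).Err(); err != nil {
				return err
			}
			continue
		} else if err != nil {
			return err
		}

		if cachedTag != tag {
			// Tag changed: resolve the site's CloudFront distribution via the ingress host and
			// invalidate it concurrently; invalidate() waits for the rollout to finish.
			// (The real worker also appears to take a per-deployment lock before this point;
			// see release_lock in aws.go, whose counterpart is not part of this excerpt.)
			domain := lookupIngressHost(clientset, namespace, d.Name) // hypothetical helper
			distID := getDistributionID(sess, domain)
			go invalidate(sess, distID, clientset, namespace, d.Name, rdb, tag, remoteRedisHosts)
		}
	}
	return nil
}
```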

## Cluster groups

A highly available (K8S) architecture with multiple clusters hosting the same workloads behind a single routing point is called a cluster group. In this architecture the worker needs to ensure that:
1. A single rollout across all the clusters in the group triggers only one invalidation
2. Invalidation happens only after all the workloads (across all the clusters in the group) have rolled out successfully

The worker uses a Redis instance per cluster (see `REMOTE_REDIS_HOSTS` below) to coordinate this logic; the waiting part is implemented in `waitForClusterGroups` in `cmd/cache-invalidator/aws.go`.

## Env vars
* `REDIS_HOST` - Redis host used to cache all the current tags
* `REDIS_PORT` - Redis port
* `REMOTE_REDIS_HOSTS` - Comma-separated list of remote Redis hosts (one Redis per cluster in the cluster group); if empty, the worker runs in a non cluster group mode
* `LOGLEVEL` - Worker log level
* `AWS_ACCESS_KEY_ID` - AWS access key with CF invalidation privileges
* `AWS_SECRET_ACCESS_KEY` - AWS secret key
* `WEBHOOKS_CONFIG` - Path to the webhook config file
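
The configuration-loading code is not included in the files shown here, but a minimal sketch of how `REMOTE_REDIS_HOSTS` might be turned into the `[]*redis.Client` slice that `invalidate` expects could look like this (the function name is illustrative; it assumes `os`, `strings` and the same go-redis v8 package imported in `aws.go`, and that each entry already contains its port):

```go
// Sketch: build one go-redis client per remote cluster from REMOTE_REDIS_HOSTS.
// When the variable is empty, nil is returned and the worker skips cluster-group coordination.
func loadRemoteRedisClients() []*redis.Client {
	raw := os.Getenv("REMOTE_REDIS_HOSTS")
	if raw == "" {
		return nil
	}
	var clients []*redis.Client
	for _, host := range strings.Split(raw, ",") {
		clients = append(clients, redis.NewClient(&redis.Options{
			Addr: strings.TrimSpace(host), // assumes "host:port" entries; real parsing not shown
		}))
	}
	return clients
}
```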

## Webhooks
This controller also supports post-invalidation actions via webhooks.
If `WEBHOOKS_CONFIG` is set, pointing to the webhook config file (JSON), the controller triggers a webhook once the invalidation is complete.
If the invalidation takes too long to complete, the wait times out and the webhook call is skipped. An example of the config file can be found under configs/webhooks.go
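
The webhook code and the config schema (configs/webhooks.go) are not part of the excerpt shown here, so the following is only a sketch of what the `execWebhook` call in `aws.go` might do, assuming a hypothetical `webhookURLs` map loaded from `WEBHOOKS_CONFIG` and the standard `net/http` client:

```go
// Sketch only: POST a small JSON payload to the webhook configured for the deployment.
// webhookURLs and the payload format are assumptions; the real schema is in configs/webhooks.go.
func execWebhook(deploymentName string) {
	url, ok := webhookURLs[deploymentName]
	if !ok {
		return
	}
	payload := strings.NewReader(fmt.Sprintf(`{"deployment":%q,"event":"invalidation-complete"}`, deploymentName))
	resp, err := http.Post(url, "application/json", payload)
	if err != nil {
		log.Error("Webhook for " + deploymentName + " failed: " + err.Error())
		return
	}
	defer resp.Body.Close()
	log.Info("Webhook for " + deploymentName + " returned " + resp.Status)
}
```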

## How to install
TODO
243 changes: 243 additions & 0 deletions cmd/cache-invalidator/aws.go
@@ -0,0 +1,243 @@
package main

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudfront"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)

const (
InvalidationThreshold = time.Second * 600
InvalidationCheckInterval = time.Second * 30
)

// waitForClusterGroups polls a remote cluster's Redis until it reports the same tag for the
// deployment, returning an error if the remote rollout does not complete within ~2 minutes.
func waitForClusterGroups(deploymentName string, tag string, remoteRedisHost *redis.Client) error {
end := time.Now().Add(time.Second * 120)

for {
<-time.NewTimer(time.Second * 10).C

log.Debug("Waiting for " + deploymentName + ":" + tag + " rollout on remote cluster - " + remoteRedisHost.String())
latestTag, err := remoteRedisHost.Get(ctx, deploymentName).Result()
if err != nil {
return fmt.Errorf("Error while trying to query for " + deploymentName + ":" + tag + " in other clusters - " + remoteRedisHost.String() + ":" + err.Error())
}
if latestTag == tag {
return nil
}
if time.Now().After(end) {
return fmt.Errorf("Rollout of " + deploymentName + ":" + tag + " did not complete in time in other cluster groups")
}
}
}

// Get the distribution ID based on the domain
func getDistributionID(sess *session.Session, domain string) string {
svc := cloudfront.New(sess)
ID := ""
for {
var result *cloudfront.ListDistributionsOutput
// Get the CF dist list in chunks of 50
input := &cloudfront.ListDistributionsInput{
Marker: aws.String(ID),
MaxItems: aws.Int64(50),
}
result, err := svc.ListDistributions(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case cloudfront.ErrCodeInvalidArgument:
log.Error(cloudfront.ErrCodeInvalidArgument, aerr.Error())
default:
log.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Error(err.Error())
}
return ""
}

// If no more distribution chunks left exit the loop
if result.DistributionList.Items == nil {
break
}

// For every distribution
for _, d := range result.DistributionList.Items {
// For every CNAME in a distribution
for _, c := range d.Aliases.Items {
if *c == domain {
return *d.Id
}
}
// Paginating results, i.e. using ID as the marker to indicate where to begin in the list for the next call
ID = *d.Id
}
}

log.Info("Couldn't find distribution id for " + domain)
return ""
}

// Invalidate cache
func invalidate(sess *session.Session, distID string, clientset *kubernetes.Clientset, namespace string, deploymentName string, rdb *redis.Client, tag string, remoteRedisHosts []*redis.Client) {
if distID == "" {
release_lock(rdb, deploymentName)
return
}

// Confirm the rolling upgrade completed before invalidating (~2 minute timeout, see deployRunningThreshold)
err := waitForPodContainersRunning(clientset, deploymentName, namespace, tag)
if err != nil {
log.Error(err)
release_lock(rdb, deploymentName)
return
}

log.Debug("Upgrade of " + deploymentName + " completed successfully, caching new tag " + tag)
err = rdb.Set(ctx, deploymentName, tag, 0).Err()
if err != nil {
panic(err)
}
release_lock(rdb, deploymentName)

// Confirm upgrade completed on all other clusters (deploymentName-tag is a distributed workload across multiple clusters with only 1 CDN instance that needs invalidation)
log.Debug("Waiting for successful rollout on other clusters if available")
for _, remoteRedisHost := range remoteRedisHosts {
err := waitForClusterGroups(deploymentName, tag, remoteRedisHost)
if err != nil {
log.Error(err)
return
}
}

log.Info("Invalidating cache for distribution " + distID)
svc := cloudfront.New(sess)
input := &cloudfront.CreateInvalidationInput{
DistributionId: aws.String(distID),
InvalidationBatch: &cloudfront.InvalidationBatch{
CallerReference: aws.String(
fmt.Sprintf("invalidation-id-%s-%s", deploymentName, tag)),
Paths: &cloudfront.Paths{
Quantity: aws.Int64(1),
Items: []*string{
aws.String("/*"),
},
},
},
}

result, err := svc.CreateInvalidation(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case cloudfront.ErrCodeAccessDenied:
log.Error(cloudfront.ErrCodeAccessDenied, aerr.Error())
case cloudfront.ErrCodeMissingBody:
log.Error(cloudfront.ErrCodeMissingBody, aerr.Error())
case cloudfront.ErrCodeInvalidArgument:
log.Error(cloudfront.ErrCodeInvalidArgument, aerr.Error())
case cloudfront.ErrCodeNoSuchDistribution:
log.Error(cloudfront.ErrCodeNoSuchDistribution, aerr.Error())
case cloudfront.ErrCodeBatchTooLarge:
log.Error(cloudfront.ErrCodeBatchTooLarge, aerr.Error())
case cloudfront.ErrCodeTooManyInvalidationsInProgress:
log.Error(cloudfront.ErrCodeTooManyInvalidationsInProgress, aerr.Error())
case cloudfront.ErrCodeInconsistentQuantities:
log.Error(cloudfront.ErrCodeInconsistentQuantities, aerr.Error())
default:
log.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Error(err.Error())
}
return
}

log.Info(result)

// Poll the invalidation status and call the post-invalidation webhook once it completes (times out if the invalidation takes too long)
end := time.Now().Add(InvalidationThreshold)
getInvalInput := &cloudfront.GetInvalidationInput{
DistributionId: aws.String(distID),
Id: result.Invalidation.Id,
}
invalidationStatus, err := svc.GetInvalidation(getInvalInput)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case cloudfront.ErrCodeAccessDenied:
log.Error(cloudfront.ErrCodeAccessDenied, aerr.Error())
case cloudfront.ErrCodeMissingBody:
log.Error(cloudfront.ErrCodeMissingBody, aerr.Error())
case cloudfront.ErrCodeInvalidArgument:
log.Error(cloudfront.ErrCodeInvalidArgument, aerr.Error())
case cloudfront.ErrCodeNoSuchDistribution:
log.Error(cloudfront.ErrCodeNoSuchDistribution, aerr.Error())
case cloudfront.ErrCodeBatchTooLarge:
log.Error(cloudfront.ErrCodeBatchTooLarge, aerr.Error())
case cloudfront.ErrCodeTooManyInvalidationsInProgress:
log.Error(cloudfront.ErrCodeTooManyInvalidationsInProgress, aerr.Error())
case cloudfront.ErrCodeInconsistentQuantities:
log.Error(cloudfront.ErrCodeInconsistentQuantities, aerr.Error())
default:
log.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Error(err.Error())
}
return
}

for *invalidationStatus.Invalidation.Status != "Completed" {
<-time.NewTimer(InvalidationCheckInterval).C
invalidationStatus, err = svc.GetInvalidation(getInvalInput)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case cloudfront.ErrCodeAccessDenied:
log.Error(cloudfront.ErrCodeAccessDenied, aerr.Error())
case cloudfront.ErrCodeMissingBody:
log.Error(cloudfront.ErrCodeMissingBody, aerr.Error())
case cloudfront.ErrCodeInvalidArgument:
log.Error(cloudfront.ErrCodeInvalidArgument, aerr.Error())
case cloudfront.ErrCodeNoSuchDistribution:
log.Error(cloudfront.ErrCodeNoSuchDistribution, aerr.Error())
case cloudfront.ErrCodeBatchTooLarge:
log.Error(cloudfront.ErrCodeBatchTooLarge, aerr.Error())
case cloudfront.ErrCodeTooManyInvalidationsInProgress:
log.Error(cloudfront.ErrCodeTooManyInvalidationsInProgress, aerr.Error())
case cloudfront.ErrCodeInconsistentQuantities:
log.Error(cloudfront.ErrCodeInconsistentQuantities, aerr.Error())
default:
log.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Error(err.Error())
}
return
}
if time.Now().After(end) {
log.Error(distID + " took too long to invalidate, timing out and skipping webhook call")
return
}
}

execWebhook(deploymentName)
}
74 changes: 74 additions & 0 deletions cmd/cache-invalidator/k8s.go
@@ -0,0 +1,74 @@
package main

import (
"context"
"fmt"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

const (
deployRunningThreshold = time.Second * 120
deployRunningCheckInterval = time.Second * 10
podSelector = "workload.user.cattle.io/workloadselector"
)

// podContainersRunning returns true once none of the deployment's pods report a Ready condition of
// False, and an error if the deployment's tag changed or no pods match the selector.
func podContainersRunning(clientSet *kubernetes.Clientset, deploymentName string, namespace string, tag string) (bool, error) {
// Validate that the actual tag on the deployment didn't change. If it did, this goroutine is now checking running pods for an irrelevant tag and needs to exit
deploymentsClient := clientSet.AppsV1().Deployments(namespace)
deployment, err := deploymentsClient.Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
image := deployment.Spec.Template.Spec.Containers[0].Image
currentTag := strings.Split(image, ":")[1]
// Check that we still have the original new tag this goroutine started working on - or whether there is a new one (workload rolled back / new upgrade etc.)
if currentTag != tag {
return false, fmt.Errorf("Deployment " + deploymentName + " updated while waiting on pods to complete for tag: " + tag + ", skipping invalidation")
}

pods, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf(podSelector+"=%s", "deployment-"+namespace+"-"+deploymentName),
})
if err != nil {
return false, err
}
if len(pods.Items) == 0 {
return false, fmt.Errorf("No pods found for selector label: deployment-" + namespace + "-" + deploymentName + ", skipping invalidation")
}

for _, item := range pods.Items {
for _, condition := range item.Status.Conditions {
if condition.Type == "Ready" && condition.Status == "False" {
return false, nil
}
}
}
return true, nil
}

// waitForPodContainersRunning polls the deployment's pods until they are all Ready, or fails once
// the deployRunningThreshold timeout expires.
func waitForPodContainersRunning(clientSet *kubernetes.Clientset, deploymentName string, namespace string, tag string) error {
end := time.Now().Add(deployRunningThreshold)

for {
<-time.NewTimer(deployRunningCheckInterval).C

var err error
running, err := podContainersRunning(clientSet, deploymentName, namespace, tag)
if running {
return nil
}

if err != nil {
return err
}

if time.Now().After(end) {
return fmt.Errorf("Some of " + deploymentName + " pods are not starting, skipping invalidation")
}
}
}