Skip to content

Commit

Permalink
sync-controller: allow source pods to shutdown gracefully
Browse files Browse the repository at this point in the history
sync-controller: set Xmx flag for java source pods
  • Loading branch information
absorbb committed Oct 25, 2024
1 parent 8faea2f commit 1ba160f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
5 changes: 3 additions & 2 deletions sync-controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ type Config struct {
// nodeSelector for sync pods in json format, e.g: {"disktype": "ssd"}
KubernetesNodeSelector string `mapstructure:"KUBERNETES_NODE_SELECTOR"`

ContainerStatusCheckSeconds int `mapstructure:"CONTAINER_STATUS_CHECK_SECONDS" default:"10"`
ContainerInitTimeoutSeconds int `mapstructure:"CONTAINER_INIT_TIMEOUT_SECONDS" default:"180"`
ContainerStatusCheckSeconds int `mapstructure:"CONTAINER_STATUS_CHECK_SECONDS" default:"10"`
ContainerGraceShutdownSeconds int `mapstructure:"CONTAINER_GRACE_SHUTDOWN_SECONDS" default:"30"`
ContainerInitTimeoutSeconds int `mapstructure:"CONTAINER_INIT_TIMEOUT_SECONDS" default:"180"`

TaskTimeoutHours int `mapstructure:"TASK_TIMEOUT_HOURS" default:"48"`

Expand Down
45 changes: 33 additions & 12 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hjson/hjson-go/v4"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/safego"
"github.com/jitsucom/bulker/jitsubase/types"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/jitsubase/uuid"
"github.com/mitchellh/mapstructure"
Expand All @@ -34,12 +35,13 @@ var nonAlphaNum = regexp.MustCompile(`[^a-zA-Z0-9-]`)

type JobRunner struct {
appbase.Service
config *Config
namespace string
clientset *kubernetes.Clientset
closeCh chan struct{}
taskStatusCh chan *TaskStatus
runningPods map[string]time.Time
config *Config
namespace string
clientset *kubernetes.Clientset
closeCh chan struct{}
taskStatusCh chan *TaskStatus
runningPods map[string]time.Time
cleanedUpPods types.Set[string]
}

func NewJobRunner(appContext *Context) (*JobRunner, error) {
Expand All @@ -49,9 +51,10 @@ func NewJobRunner(appContext *Context) (*JobRunner, error) {
return nil, err
}
j := &JobRunner{Service: base, config: appContext.config, clientset: clientset, namespace: appContext.config.KubernetesNamespace,
closeCh: make(chan struct{}),
taskStatusCh: make(chan *TaskStatus, 100),
runningPods: map[string]time.Time{},
closeCh: make(chan struct{}),
taskStatusCh: make(chan *TaskStatus, 100),
runningPods: map[string]time.Time{},
cleanedUpPods: types.NewSet[string](),
}
safego.RunWithRestart(j.watchPodStatuses)
return j, nil
Expand All @@ -70,7 +73,12 @@ func (j *JobRunner) watchPodStatuses() {
j.Errorf("failed to list pods: %v", err.Error())
continue
}
activePods := types.NewSet[string]()
for _, pod := range list.Items {
activePods.Put(pod.Name)
if j.cleanedUpPods.Contains(pod.Name) {
continue
}
taskStatus := TaskStatus{}
_ = mapstructure.Decode(pod.Annotations, &taskStatus)
taskStatus.PodName = pod.Name
Expand Down Expand Up @@ -131,6 +139,17 @@ func (j *JobRunner) watchPodStatuses() {
}
j.sendStatus(&taskStatus)
}
//clean up pods that are not active anymore
for podName := range j.runningPods {
if !activePods.Contains(podName) {
delete(j.runningPods, podName)
}
}
for podName := range j.cleanedUpPods {
if !activePods.Contains(podName) {
j.cleanedUpPods.Remove(podName)
}
}
}
}

Expand All @@ -145,11 +164,11 @@ func (j *JobRunner) sendStatus(taskStatus *TaskStatus) {
}

func (j *JobRunner) cleanupPod(name string) {
gracePeriodSeconds := int64(math.Max(1.0, float64(j.config.ContainerStatusCheckSeconds)*0.8))
j.cleanedUpPods.Put(name)
gracePeriodSeconds := int64(j.config.ContainerGraceShutdownSeconds)
_ = j.clientset.CoreV1().Pods(j.namespace).Delete(context.Background(), name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
_ = j.clientset.CoreV1().Secrets(j.namespace).Delete(context.Background(), name+"-config", metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
_ = j.clientset.CoreV1().ConfigMaps(j.namespace).Delete(context.Background(), name+"-config", metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
delete(j.runningPods, name)
}

func accumulatePodStatus(status v1.PodStatus) string {
Expand Down Expand Up @@ -631,7 +650,9 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
Image: fmt.Sprintf("%s:%s", task.Package, task.PackageVersion),
Command: []string{"sh", "-c", fmt.Sprintf("eval \"$AIRBYTE_ENTRYPOINT %s\" 2> /pipes/stderr > /pipes/stdout", command)},
Env: []v1.EnvVar{{Name: "USE_STREAM_CAPABLE_STATE", Value: "true"},
{Name: "AUTO_DETECT_SCHEMA", Value: "true"}},
{Name: "AUTO_DETECT_SCHEMA", Value: "true"},
{Name: "JAVA_OPTS", Value: "-Xmx8192m"}},

VolumeMounts: volumeMounts,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
Expand Down

0 comments on commit 1ba160f

Please sign in to comment.