Skip to content

Commit

Permalink
fix: messaging and references
Browse files Browse the repository at this point in the history
  • Loading branch information
shreddedbacon committed Jan 16, 2025
1 parent e2d6bee commit b436c1f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
35 changes: 30 additions & 5 deletions internal/controllers/v1beta2/podmonitor_taskhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *LagoonMonitorReconciler) taskLogsToLagoonLogs(opLog logr.Logger,
Environment: lagoonTask.Spec.Environment.Name,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand All @@ -152,6 +152,15 @@ Logs on pod %s, assigned to cluster %s
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published event %s for %s to lagoon-logs exchange",
fmt.Sprintf("task-logs:job-kubernetes:%s", jobPod.ObjectMeta.Name),
jobPod.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand Down Expand Up @@ -186,7 +195,7 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
ProjectID: lagoonTask.Spec.Project.ID,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand Down Expand Up @@ -217,6 +226,14 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published task update message for %s to lagoon-tasks:controller queue",
jobPod.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand All @@ -226,7 +243,6 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
// taskStatusLogsToLagoonLogs sends the logs to lagoon-logs message queue, used for general messaging
func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
lagoonTask *lagooncrd.LagoonTask,
jobPod *corev1.Pod,
condition string,
) error {
if r.EnableMQ && lagoonTask != nil {
Expand All @@ -242,7 +258,7 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
ProjectID: lagoonTask.Spec.Project.ID,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand All @@ -263,6 +279,15 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published event %s for %s to lagoon-logs exchange",
fmt.Sprintf("task:job-kubernetes:%s", condition),
lagoonTask.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand Down Expand Up @@ -347,7 +372,7 @@ Task %s

// send any messages to lagoon message queues
// update the deployment with the status
if err = r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil {
if err = r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, taskCondition.ToLower()); err != nil {
opLog.Error(err, "unable to publish task status logs")
}
if err = r.updateLagoonTask(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/messenger/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
})
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err))
log.Fatalf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)
}

// Handle any tasks that go to the `remove` queue
Expand Down Expand Up @@ -228,7 +228,7 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
})
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "remove-queue", err))
log.Fatalf("Failed to set handler to consumer `%s`: %v", "remove-queue", err)
}

// Handle any tasks that go to the `jobs` queue
Expand Down Expand Up @@ -286,7 +286,7 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
})
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "jobs-queue", err))
log.Fatalf("Failed to set handler to consumer `%s`: %v", "jobs-queue", err)
}

// Handle any tasks that go to the `misc` queue
Expand Down Expand Up @@ -499,7 +499,7 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
})
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "misc-queue", err))
log.Fatalf("Failed to set handler to consumer `%s`: %v", "misc-queue", err)
}
<-forever
}
1 change: 0 additions & 1 deletion internal/messenger/tasks_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.Lagoo
// just log the error then return
return nil
}

// check if k8up crds exist in the cluster
k8upv1alpha1Exists := false
k8upv1Exists := false
Expand Down

0 comments on commit b436c1f

Please sign in to comment.