From ab8dc22c57167d06678e5f40ae914e2112d5526d Mon Sep 17 00:00:00 2001 From: Tom Prince Date: Thu, 5 Dec 2019 21:00:07 -0700 Subject: [PATCH] Add taskcluster pulse support. --- main.go | 45 ++++++++++++++++++++++++ proxyservice/jenkins.go | 22 ++++++++++++ proxyservice/taskcluster.go | 69 +++++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 proxyservice/taskcluster.go diff --git a/main.go b/main.go index 52ad8e4..e38afb6 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,8 @@ import ( "go.mozilla.org/mozlog" "github.com/urfave/cli" + + "github.com/taskcluster/pulse-go/pulse" ) func init() { @@ -47,6 +49,34 @@ func main() { Usage: "Password for authing against jenkins", EnvVar: "JENKINS_PASSWORD", }, + cli.StringFlag{ + Name: "cloudops-pulse-prefix", + Usage: "Pulse route to listen to.", + Value: "cloudops.v1.deploy", + EnvVar: "CLOUDOPS_PULSE_PREFIX", + }, + cli.StringFlag{ + Name: "pulse-queue", + Usage: "Pulse quue to listen to.", + Value: "deploy-proxy", + EnvVar: "PULSE_QUEUE", + }, + cli.StringFlag{ + Name: "pulse-username", + Usage: "Username for authing against pulse", + EnvVar: "PULSE_USERNAME", + }, + cli.StringFlag{ + Name: "pulse-password", + Usage: "Password for authing against pulse", + EnvVar: "PULSE_PASSWORD", + }, + cli.StringFlag{ + Name: "pulse-password", + Usage: "Password for authing against pulse", + Value: "", + EnvVar: "PULSE_HOST", + }, } app.Action = func(c *cli.Context) error { @@ -72,6 +102,21 @@ func main() { w.Write([]byte("OK")) }) + pulse := pulse.NewConnection( + c.String("pulse-username"), + c.String("pulse-password"), + c.String("pulse-host"), + ) + taskclusterPulseHandler := proxyservice.NewTaskclusterPulseHandler( + jenkins, + &pulse, + c.String("cloudops-pulse-prefix"), + ) + + if err := taskclusterPulseHandler.Consume(); err != nil { + return cli.NewExitError(fmt.Sprintf("Could not listen to pulse: %v", err), 1) + } + server := &http.Server{ Addr: c.String("addr"), Handler: mux, diff --git a/proxyservice/jenkins.go b/proxyservice/jenkins.go index 018c985..94a71ed 100644 --- a/proxyservice/jenkins.go +++ b/proxyservice/jenkins.go @@ -116,3 +116,25 @@ func (j *Jenkins) TriggerDockerhubJob(data *DockerHubWebhookData) error { params.Set("RawJSON", string(rawJSON)) return j.TriggerJob(path, params) } + +// triggers a jenkins job given +func (j *Jenkins) TriggerTaskclusterJob(taskID string, route string, data *TaskCompletedMessage) error { + if !regexp.MustCompile(`^[A-Za-z0-9_-]{8}[Q-T][A-Za-z0-9_-][CGKOSWaeimquy26-][A-Za-z0-9_-]{10}[AQgw]$`).MatchString(taskID) { + return fmt.Errorf("Invalid taskID: %s", taskID) + } + // FIXME: This should probably be split on . + if !regexp.MustCompile(`^[a-zA-Z0-9_\-]{2,255}$`).MatchString(route) { + return fmt.Errorf("Invalid data.Repository.Namespace: %s", route) + } + + rawJSON, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("Error marshaling data: %v", err) + } + path := path.Join("/job/taskcluster/job", + route, "job", route) + params := url.Values{} + params.Set("TASK_ID", taskID) + params.Set("RawJSON", string(rawJSON)) + return j.TriggerJob(path, params) +} diff --git a/proxyservice/taskcluster.go b/proxyservice/taskcluster.go new file mode 100644 index 0000000..8a68670 --- /dev/null +++ b/proxyservice/taskcluster.go @@ -0,0 +1,69 @@ +package proxyservice + +import ( + "fmt" + "strings" + + "github.com/streadway/amqp" + "github.com/taskcluster/pulse-go/pulse" + "github.com/taskcluster/taskcluster/clients/client-go/v23/tcqueueevents" +) + +type TaskCompletedMessage = tcqueueevents.TaskCompletedMessage + +type routeTaskCompleted struct { + Route string +} + +func (binding routeTaskCompleted) RoutingKey() string { + return fmt.Sprintf("route.%s", binding.Route) +} + +func (binding routeTaskCompleted) ExchangeName() string { + return "exchange/taskcluster-queue/v1/task-completed" +} + +func (binding routeTaskCompleted) NewPayloadObject() interface{} { + return new(tcqueueevents.TaskCompletedMessage) +} + +type TaskclusterPulseHandler struct { + Jenkins *Jenkins + Pulse *pulse.Connection + PulseRoutePrefix string +} + +func NewTaskclusterPulseHandler(jenkins *Jenkins, pulse *pulse.Connection, routePrefix string) *TaskclusterPulseHandler { + return &TaskclusterPulseHandler{ + Jenkins: jenkins, + Pulse: pulse, + PulseRoutePrefix: routePrefix, + } +} + +func (handler *TaskclusterPulseHandler) handleMessage(message interface{}, delivery amqp.Delivery) { + routingKeyPrefix := "route." + handler.PulseRoutePrefix + switch t := message.(type) { + case *tcqueueevents.TaskCompletedMessage: + if strings.HasPrefix(delivery.RoutingKey, routingKeyPrefix) { + route := strings.TrimPrefix(delivery.RoutingKey, routingKeyPrefix) + if err := handler.Jenkins.TriggerTaskclusterJob(t.Status.TaskID, route, t); err != nil { + log.Printf("Error triggering taskcluster job: %s", err) + } + } + } + delivery.Ack(false) // acknowledge message *after* processing + +} + +func (handler *TaskclusterPulseHandler) Consume() error { + routingKeyPrefix := "route." + handler.PulseRoutePrefix + _, err := handler.Pulse.Consume( + "", // queue name + handler.handleMessage, + 1, // prefetch 1 message at a time + false, // don't autoacknowledge messages + routeTaskCompleted{Route:routingKeyPrefix+".#"}, + ) + return err +}