Skip to content

Commit

Permalink
Add taskcluster pulse support.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomprince committed Dec 6, 2019
1 parent 2447339 commit ab8dc22
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
45 changes: 45 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"go.mozilla.org/mozlog"

"github.com/urfave/cli"

"github.com/taskcluster/pulse-go/pulse"
)

func init() {
Expand Down Expand Up @@ -47,6 +49,34 @@ func main() {
Usage: "Password for authing against jenkins",
EnvVar: "JENKINS_PASSWORD",
},
cli.StringFlag{
Name: "cloudops-pulse-prefix",
Usage: "Pulse route to listen to.",
Value: "cloudops.v1.deploy",
EnvVar: "CLOUDOPS_PULSE_PREFIX",
},
cli.StringFlag{
Name: "pulse-queue",
Usage: "Pulse quue to listen to.",
Value: "deploy-proxy",
EnvVar: "PULSE_QUEUE",
},
cli.StringFlag{
Name: "pulse-username",
Usage: "Username for authing against pulse",
EnvVar: "PULSE_USERNAME",
},
cli.StringFlag{
Name: "pulse-password",
Usage: "Password for authing against pulse",
EnvVar: "PULSE_PASSWORD",
},
cli.StringFlag{
Name: "pulse-password",
Usage: "Password for authing against pulse",
Value: "",
EnvVar: "PULSE_HOST",
},
}

app.Action = func(c *cli.Context) error {
Expand All @@ -72,6 +102,21 @@ func main() {
w.Write([]byte("OK"))
})

pulse := pulse.NewConnection(
c.String("pulse-username"),
c.String("pulse-password"),
c.String("pulse-host"),
)
taskclusterPulseHandler := proxyservice.NewTaskclusterPulseHandler(
jenkins,
&pulse,
c.String("cloudops-pulse-prefix"),
)

if err := taskclusterPulseHandler.Consume(); err != nil {
return cli.NewExitError(fmt.Sprintf("Could not listen to pulse: %v", err), 1)
}

server := &http.Server{
Addr: c.String("addr"),
Handler: mux,
Expand Down
22 changes: 22 additions & 0 deletions proxyservice/jenkins.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,25 @@ func (j *Jenkins) TriggerDockerhubJob(data *DockerHubWebhookData) error {
params.Set("RawJSON", string(rawJSON))
return j.TriggerJob(path, params)
}

// triggers a jenkins job given
func (j *Jenkins) TriggerTaskclusterJob(taskID string, route string, data *TaskCompletedMessage) error {
if !regexp.MustCompile(`^[A-Za-z0-9_-]{8}[Q-T][A-Za-z0-9_-][CGKOSWaeimquy26-][A-Za-z0-9_-]{10}[AQgw]$`).MatchString(taskID) {
return fmt.Errorf("Invalid taskID: %s", taskID)
}
// FIXME: This should probably be split on .
if !regexp.MustCompile(`^[a-zA-Z0-9_\-]{2,255}$`).MatchString(route) {
return fmt.Errorf("Invalid data.Repository.Namespace: %s", route)
}

rawJSON, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("Error marshaling data: %v", err)
}
path := path.Join("/job/taskcluster/job",
route, "job", route)
params := url.Values{}
params.Set("TASK_ID", taskID)
params.Set("RawJSON", string(rawJSON))
return j.TriggerJob(path, params)
}
69 changes: 69 additions & 0 deletions proxyservice/taskcluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package proxyservice

import (
"fmt"
"strings"

"github.com/streadway/amqp"
"github.com/taskcluster/pulse-go/pulse"
"github.com/taskcluster/taskcluster/clients/client-go/v23/tcqueueevents"
)

type TaskCompletedMessage = tcqueueevents.TaskCompletedMessage

type routeTaskCompleted struct {
Route string
}

func (binding routeTaskCompleted) RoutingKey() string {
return fmt.Sprintf("route.%s", binding.Route)
}

func (binding routeTaskCompleted) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-completed"
}

func (binding routeTaskCompleted) NewPayloadObject() interface{} {
return new(tcqueueevents.TaskCompletedMessage)
}

type TaskclusterPulseHandler struct {
Jenkins *Jenkins
Pulse *pulse.Connection
PulseRoutePrefix string
}

func NewTaskclusterPulseHandler(jenkins *Jenkins, pulse *pulse.Connection, routePrefix string) *TaskclusterPulseHandler {
return &TaskclusterPulseHandler{
Jenkins: jenkins,
Pulse: pulse,
PulseRoutePrefix: routePrefix,
}
}

func (handler *TaskclusterPulseHandler) handleMessage(message interface{}, delivery amqp.Delivery) {
routingKeyPrefix := "route." + handler.PulseRoutePrefix
switch t := message.(type) {
case *tcqueueevents.TaskCompletedMessage:
if strings.HasPrefix(delivery.RoutingKey, routingKeyPrefix) {
route := strings.TrimPrefix(delivery.RoutingKey, routingKeyPrefix)
if err := handler.Jenkins.TriggerTaskclusterJob(t.Status.TaskID, route, t); err != nil {
log.Printf("Error triggering taskcluster job: %s", err)
}
}
}
delivery.Ack(false) // acknowledge message *after* processing

}

func (handler *TaskclusterPulseHandler) Consume() error {
routingKeyPrefix := "route." + handler.PulseRoutePrefix
_, err := handler.Pulse.Consume(
"", // queue name
handler.handleMessage,
1, // prefetch 1 message at a time
false, // don't autoacknowledge messages
routeTaskCompleted{Route:routingKeyPrefix+".#"},
)
return err
}

0 comments on commit ab8dc22

Please sign in to comment.