Skip to content

Commit

Permalink
Add hgmo pulse support.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomprince committed Mar 25, 2020
1 parent ddec9c0 commit bb31ceb
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
23 changes: 23 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ func main() {
Value: "",
EnvVar: "PULSE_HOST",
},
cli.StringSliceFlag{
Name: "hgmo-repo",
Usage: "hg.mozilla.org repo to listend to pushes from (can be used multiple times)",
Value: &cli.StringSlice{"ci/ci-admin", "ci/ci-configuration"},
EnvVar: "HGMO_REPO",
},
cli.StringFlag{
Name: "hgmo-pulse-queue",
Usage: "Pulse quue to listen to.",
Value: "hgmo",
EnvVar: "HGMO_PULSE_QUEUE",
},
}

app.Action = func(c *cli.Context) error {
Expand Down Expand Up @@ -98,6 +110,17 @@ func main() {
c.String("pulse-password"),
c.String("pulse-host"),
)

hgmoPulseHandler := proxyservice.NewHgmoPulseHandler(
jenkins,
&pulse,
c.String("hgmo-pulse-queue"),
c.StringSlice("hgmo-repo")...,
)

if err := hgmoPulseHandler.Consume(); err != nil {
return cli.NewExitError(fmt.Sprintf("Could not listen to hgmo pulse: %v", err), 1)
}
}

server := &http.Server{
Expand Down
122 changes: 122 additions & 0 deletions proxyservice/hgmo_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package proxyservice

import (
"fmt"
"log"
"encoding/json"

"github.com/streadway/amqp"
"github.com/taskcluster/pulse-go/pulse"
)


// https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/notifications.html#pulse-notifications
type HgMessage struct {
Type string
Data interface{}
}

type changegroupMessage struct {
RepoUrl string `json:"repo_url"`
Heads []string `json:"heads"`
PushlogPushes []struct{
pushid int
time int
user string
push_json_url string
push_full_json_url string
} `json:"pushlog_pushes"`
Source string `json:"Source"`
}


func (msg *HgMessage) UnmarshalJSON(b []byte) error {
var raw struct {
Type string
Data json.RawMessage
}
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
msg.Type = raw.Type
if raw.Type == "changegroup.1" {
var data changegroupMessage
if err := json.Unmarshal(b, &data); err != nil {
return err
}
msg.Data = data
return nil
}
return fmt.Errorf("Unknown hg message type %s", msg.Type)
}

type hgPushBinding struct {
Repository string
}

func (binding hgPushBinding) RoutingKey() string {
return fmt.Sprintf("route.%s", binding.Repository)
}

func (binding hgPushBinding) ExchangeName() string {
return "exchange/hgpushes/v2"
}

func (binding hgPushBinding) NewPayloadObject() interface{} {
return new(HgMessage)
}

type HgmoPulseHandler struct {
Jenkins *Jenkins
Pulse *pulse.Connection
QueueName string
ValidHgRepos map[string]bool
}

func NewHgmoPulseHandler(jenkins *Jenkins, pulse *pulse.Connection, queueName string, hgRepos ...string) *HgmoPulseHandler {
validHgRepos := make(map[string]bool)
for _, hgRepo := range hgRepos {
validHgRepos[hgRepo] = true
}
return &HgmoPulseHandler{
Jenkins: jenkins,
Pulse: pulse,
QueueName: queueName,
ValidHgRepos: validHgRepos,
}
}

func (handler *HgmoPulseHandler) handleMessage(message interface{}, delivery amqp.Delivery) {
switch t := message.(type) {
case *HgMessage:
switch data := t.Data.(type) {
case *changegroupMessage:
if len(data.Heads) != 1 {
log.Printf("Message %s has %d heads, only 1 supported", t, len(data.Heads))
}
if len(data.PushlogPushes) != 1 {
log.Printf("Message %s has %d pushlog pushes, only 1 supported", t, len(data.PushlogPushes))
}
if err := handler.Jenkins.TriggerHgJob(data.RepoUrl, data.Heads[0], t); err != nil {
log.Printf("Error triggering hg.mozilla.org job: %s", err)
}
}
}
delivery.Ack(false) // acknowledge message *after* processing
}

func (handler *HgmoPulseHandler) Consume() error {
bindings := make([]pulse.Binding, 0)
for validHgRepo := range handler.ValidHgRepos {
bindings = append(bindings, hgPushBinding{Repository: validHgRepo})
}
_, err := handler.Pulse.Consume(
handler.QueueName,
handler.handleMessage,
1, // prefetch 1 message at a time
false, // don't autoacknowledge messages
bindings...
)
return err
}

13 changes: 13 additions & 0 deletions proxyservice/jenkins.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,16 @@ func (j *Jenkins) TriggerDockerhubJob(data *DockerHubWebhookData) error {
params.Set("RawJSON", string(rawJSON))
return j.TriggerJob(path, params)
}

func (j *Jenkins) TriggerHgJob(repoUrl string, head string, data *HgMessage) error {
rawJSON, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("Error marshaling data: %v", err)
}
path := path.Join("/job/hgmo/job", repoUrl)
params := url.Values{}
params.Set("HEAD_REPOSITORY", repoUrl)
params.Set("HEAD_REV", head)
params.Set("RawJSON", string(rawJSON))
return j.TriggerJob(path, params)
}

0 comments on commit bb31ceb

Please sign in to comment.