From bb31ceb313f450629ada8bdfc1d2dbb3fb330523 Mon Sep 17 00:00:00 2001 From: Tom Prince Date: Tue, 24 Mar 2020 18:46:52 -0600 Subject: [PATCH] Add hgmo pulse support. --- main.go | 23 +++++++ proxyservice/hgmo_handler.go | 122 +++++++++++++++++++++++++++++++++++ proxyservice/jenkins.go | 13 ++++ 3 files changed, 158 insertions(+) create mode 100644 proxyservice/hgmo_handler.go diff --git a/main.go b/main.go index 75ef248..7766cf2 100644 --- a/main.go +++ b/main.go @@ -65,6 +65,18 @@ func main() { Value: "", EnvVar: "PULSE_HOST", }, + cli.StringSliceFlag{ + Name: "hgmo-repo", + Usage: "hg.mozilla.org repo to listend to pushes from (can be used multiple times)", + Value: &cli.StringSlice{"ci/ci-admin", "ci/ci-configuration"}, + EnvVar: "HGMO_REPO", + }, + cli.StringFlag{ + Name: "hgmo-pulse-queue", + Usage: "Pulse quue to listen to.", + Value: "hgmo", + EnvVar: "HGMO_PULSE_QUEUE", + }, } app.Action = func(c *cli.Context) error { @@ -98,6 +110,17 @@ func main() { c.String("pulse-password"), c.String("pulse-host"), ) + + hgmoPulseHandler := proxyservice.NewHgmoPulseHandler( + jenkins, + &pulse, + c.String("hgmo-pulse-queue"), + c.StringSlice("hgmo-repo")..., + ) + + if err := hgmoPulseHandler.Consume(); err != nil { + return cli.NewExitError(fmt.Sprintf("Could not listen to hgmo pulse: %v", err), 1) + } } server := &http.Server{ diff --git a/proxyservice/hgmo_handler.go b/proxyservice/hgmo_handler.go new file mode 100644 index 0000000..5685b65 --- /dev/null +++ b/proxyservice/hgmo_handler.go @@ -0,0 +1,122 @@ +package proxyservice + +import ( + "fmt" + "log" + "encoding/json" + + "github.com/streadway/amqp" + "github.com/taskcluster/pulse-go/pulse" +) + + +// https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/notifications.html#pulse-notifications +type HgMessage struct { + Type string + Data interface{} +} + +type changegroupMessage struct { + RepoUrl string `json:"repo_url"` + Heads []string `json:"heads"` + PushlogPushes []struct{ + pushid int + time int + user string + push_json_url string + push_full_json_url string + } `json:"pushlog_pushes"` + Source string `json:"Source"` +} + + +func (msg *HgMessage) UnmarshalJSON(b []byte) error { + var raw struct { + Type string + Data json.RawMessage + } + if err := json.Unmarshal(b, &raw); err != nil { + return err + } + msg.Type = raw.Type + if raw.Type == "changegroup.1" { + var data changegroupMessage + if err := json.Unmarshal(b, &data); err != nil { + return err + } + msg.Data = data + return nil + } + return fmt.Errorf("Unknown hg message type %s", msg.Type) +} + +type hgPushBinding struct { + Repository string +} + +func (binding hgPushBinding) RoutingKey() string { + return fmt.Sprintf("route.%s", binding.Repository) +} + +func (binding hgPushBinding) ExchangeName() string { + return "exchange/hgpushes/v2" +} + +func (binding hgPushBinding) NewPayloadObject() interface{} { + return new(HgMessage) +} + +type HgmoPulseHandler struct { + Jenkins *Jenkins + Pulse *pulse.Connection + QueueName string + ValidHgRepos map[string]bool +} + +func NewHgmoPulseHandler(jenkins *Jenkins, pulse *pulse.Connection, queueName string, hgRepos ...string) *HgmoPulseHandler { + validHgRepos := make(map[string]bool) + for _, hgRepo := range hgRepos { + validHgRepos[hgRepo] = true + } + return &HgmoPulseHandler{ + Jenkins: jenkins, + Pulse: pulse, + QueueName: queueName, + ValidHgRepos: validHgRepos, + } +} + +func (handler *HgmoPulseHandler) handleMessage(message interface{}, delivery amqp.Delivery) { + switch t := message.(type) { + case *HgMessage: + switch data := t.Data.(type) { + case *changegroupMessage: + if len(data.Heads) != 1 { + log.Printf("Message %s has %d heads, only 1 supported", t, len(data.Heads)) + } + if len(data.PushlogPushes) != 1 { + log.Printf("Message %s has %d pushlog pushes, only 1 supported", t, len(data.PushlogPushes)) + } + if err := handler.Jenkins.TriggerHgJob(data.RepoUrl, data.Heads[0], t); err != nil { + log.Printf("Error triggering hg.mozilla.org job: %s", err) + } + } + } + delivery.Ack(false) // acknowledge message *after* processing +} + +func (handler *HgmoPulseHandler) Consume() error { + bindings := make([]pulse.Binding, 0) + for validHgRepo := range handler.ValidHgRepos { + bindings = append(bindings, hgPushBinding{Repository: validHgRepo}) + } + _, err := handler.Pulse.Consume( + handler.QueueName, + handler.handleMessage, + 1, // prefetch 1 message at a time + false, // don't autoacknowledge messages + bindings... + ) + return err +} + diff --git a/proxyservice/jenkins.go b/proxyservice/jenkins.go index 018c985..c12656a 100644 --- a/proxyservice/jenkins.go +++ b/proxyservice/jenkins.go @@ -116,3 +116,16 @@ func (j *Jenkins) TriggerDockerhubJob(data *DockerHubWebhookData) error { params.Set("RawJSON", string(rawJSON)) return j.TriggerJob(path, params) } + +func (j *Jenkins) TriggerHgJob(repoUrl string, head string, data *HgMessage) error { + rawJSON, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("Error marshaling data: %v", err) + } + path := path.Join("/job/hgmo/job", repoUrl) + params := url.Values{} + params.Set("HEAD_REPOSITORY", repoUrl) + params.Set("HEAD_REV", head) + params.Set("RawJSON", string(rawJSON)) + return j.TriggerJob(path, params) +}