From 589d876e4311adda8d8c6b2bed1540c72439a103 Mon Sep 17 00:00:00 2001 From: ochom Date: Wed, 23 Oct 2024 18:54:44 +0300 Subject: [PATCH] publish with correct routing key --- pubsub/publisher.go | 8 ++-- quickmq/client.go | 94 ---------------------------------------- quickmq/client_test.go | 97 ------------------------------------------ quickmq/quickmq.go | 33 -------------- 4 files changed, 4 insertions(+), 228 deletions(-) delete mode 100644 quickmq/client.go delete mode 100644 quickmq/client_test.go delete mode 100644 quickmq/quickmq.go diff --git a/pubsub/publisher.go b/pubsub/publisher.go index 17df93b..0b7e4df 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -125,10 +125,10 @@ func (p *publisher) publish(body []byte, delay time.Duration) error { // publish message to exchange err = channel.Publish( - p.exchange, // exchange - "", // routing key - true, // mandatory - false, // immediate + p.exchange, // exchange + p.routingKey, // routing key + true, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, diff --git a/quickmq/client.go b/quickmq/client.go deleted file mode 100644 index c7be3c6..0000000 --- a/quickmq/client.go +++ /dev/null @@ -1,94 +0,0 @@ -package quickmq - -import ( - "bytes" - "encoding/base64" - "fmt" - "time" - - "github.com/ochom/gutils/gttp" - "github.com/ochom/gutils/helpers" - "github.com/r3labs/sse/v2" -) - -// Client ... -type Client struct { - url string - username string - password string - queue string -} - -// NewClient creates a new quickmq client -func NewClient(quickUrl, queue string) *Client { - url, username, password, err := parseUrl(quickUrl) - if err != nil { - panic(err) - } - - return &Client{url: url, username: username, password: password, queue: queue} -} - -// publish ... -func (p *Client) publish(body []byte, delay time.Duration) error { - message := map[string]any{ - "body": string(body), - "delay": delay, - "queue": p.queue, - } - - headers := map[string]string{ - "Content-Type": "application/json", - "Authorization": "Basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", p.username, p.password))), - } - - url := fmt.Sprintf("%s/publish", p.url) - res, err := gttp.Post(url, headers, helpers.ToBytes(message), time.Minute) - if err != nil { - return fmt.Errorf("failed to publish message: %s", err.Error()) - } - - if res.Status != 200 { - return fmt.Errorf("failed to publish message: %s", string(res.Body)) - } - - return nil -} - -// PublishWithDelay ... -func (p *Client) PublishWithDelay(body []byte, delay time.Duration) error { - return p.publish(body, delay) -} - -// Publish ... -func (p *Client) Publish(body []byte) error { - return p.publish(body, 0) -} - -// Consume consume messages from the channels -func (c *Client) Consume(stop chan bool, workerFunc func([]byte)) error { - events := make(chan *sse.Event) - url := fmt.Sprintf("%s/subscribe?queue=%s", c.url, c.queue) - - client := sse.NewClient(url, func(sseClient *sse.Client) { - headers := map[string]string{ - "Authorization": "Basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", c.username, c.password))), - } - sseClient.Headers = headers - }) - if err := client.SubscribeChanRaw(events); err != nil { - return err - } - - for { - select { - case <-stop: - return fmt.Errorf("stop signal received") - case message := <-events: - if bytes.Equal(message.Data, []byte(`{}`)) { - continue - } - workerFunc(message.Data) - } - } -} diff --git a/quickmq/client_test.go b/quickmq/client_test.go deleted file mode 100644 index 76a137a..0000000 --- a/quickmq/client_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package quickmq - -import ( - "testing" - "time" -) - -func Test_publisher_Publish(t *testing.T) { - type fields struct { - url string - queue string - } - type args struct { - body []byte - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test 1", - fields: fields{ - url: "quick://admin:admin@localhost:16321", - queue: "TEST", - }, - args: args{ - body: []byte("test"), - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - client := NewClient(tt.fields.url, tt.fields.queue) - if err := client.Publish(tt.args.body); (err != nil) != tt.wantErr { - t.Errorf("publisher.Publish() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestClient_Consume(t *testing.T) { - type fields struct { - url string - queue string - } - type args struct { - stop chan bool - workerFunc func([]byte) - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test 1", - fields: fields{ - url: "quick://admin:admin@localhost:16321", - queue: "TEST", - }, - args: args{ - stop: make(chan bool), - workerFunc: func(body []byte) { - t.Logf("received message: %v", string(body)) - }, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // wait 10 seconds and stop consuming - go func() { - time.Sleep(10 * time.Second) - tt.args.stop <- true - }() - c := NewClient(tt.fields.url, tt.fields.queue) - if err := c.Publish([]byte("test")); err != nil { - t.Errorf("Client.Publish() error = %v", err) - } - if err := c.Publish([]byte("test")); err != nil { - t.Errorf("Client.Publish() error = %v", err) - } - if err := c.Publish([]byte("test")); err != nil { - t.Errorf("Client.Publish() error = %v", err) - } - - if err := c.Consume(tt.args.stop, tt.args.workerFunc); (err != nil) != tt.wantErr { - t.Errorf("Client.Consume() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/quickmq/quickmq.go b/quickmq/quickmq.go deleted file mode 100644 index a8551e9..0000000 --- a/quickmq/quickmq.go +++ /dev/null @@ -1,33 +0,0 @@ -package quickmq - -import ( - "fmt" - "net" - "net/url" -) - -// quickUrl should be of the format quick://username:password@host:port -func parseUrl(quickUrl string) (newUrl, username, password string, err error) { - u, err := url.Parse(quickUrl) - if err != nil { - return "", "", "", err - } - - host, port, err := net.SplitHostPort(u.Host) - if err != nil { - return "", "", "", err - } - - username = u.User.Username() - if username == "" { - username = "admin" - } - - password, ok := u.User.Password() - if !ok { - password = "admin" - } - - newUrl = fmt.Sprintf("http://%s:%s", host, port) - return newUrl, username, password, nil -}