This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
/
client.go
151 lines (136 loc) · 3.41 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package rabbitmq
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"os"
"path/filepath"
"github.com/compose/transporter/client"
"github.com/streadway/amqp"
)
// DefaultURI is the RabbitMQ endpoint used when a Client is created without
// an explicit URI option: the guest account on localhost, port 5672.
const DefaultURI = "amqp://guest:guest@localhost:5672/"
// Compile-time assertion that *Client satisfies the client.Client interface.
var _ client.Client = (*Client)(nil)
// ClientOptionFunc is a function that configures a Client.
// It is used in NewClient, which calls each option with the Client being
// built and stops at the first option that returns a non-nil error.
type ClientOptionFunc func(*Client) error
// Client wraps the underlying connection to a RabbitMQ cluster.
type Client struct {
	// uri is the AMQP connection string handed to the dialer.
	uri string
	// tlsConfig, when non-nil, makes the client dial with TLS using this configuration.
	tlsConfig *tls.Config
	// conn is the broker connection; it is nil until a connection has been established.
	conn *amqp.Connection
	// ch is a channel opened on conn, stored so Connect can pass it to the Session.
	ch *amqp.Channel
}
// NewClient creates a new client to work with RabbitMQ.
//
// The caller can configure the new client by passing configuration options
// to the func.
//
// Example:
//
//	client, err := NewClient(
//		WithURI("amqp://guest:guest@localhost:5672/"))
//
// If no URI is configured, it uses DefaultURI. NewClient itself does not dial
// the broker; the connection is established by Connect.
//
// Options are applied in the order given and nil options are skipped. If an
// option returns an error, NewClient stops and returns a nil client together
// with that error.
func NewClient(options ...ClientOptionFunc) (*Client, error) {
	// Start from the defaults; tlsConfig stays nil (no TLS) unless an option sets it.
	c := &Client{uri: DefaultURI}

	for _, option := range options {
		if option == nil {
			// Calling a nil func would panic; treat it as "no option".
			continue
		}
		if err := option(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}
// WithURI returns an option that sets the full connection string for the
// RabbitMQ connection. The string is checked with amqp.ParseURI when the
// option is applied; if it does not parse, the option returns a
// client.InvalidURIError and the client's URI is left as it was.
func WithURI(uri string) ClientOptionFunc {
	return func(c *Client) error {
		_, parseErr := amqp.ParseURI(uri)
		if parseErr != nil {
			return client.InvalidURIError{URI: uri, Err: parseErr.Error()}
		}
		c.uri = uri
		return nil
	}
}
// WithSSL configures the broker connection to connect via TLS.
//
// When ssl is true and the client has no TLS configuration yet, a
// configuration with an empty root pool and InsecureSkipVerify enabled is
// installed. When ssl is false, or a TLS configuration is already present
// (for example one installed by WithCACerts), the option changes nothing, so
// the result does not depend on the order in which the options are applied.
//
// NOTE(security): with InsecureSkipVerify set, the server certificate is not
// verified. Combine this option with WithCACerts, which turns verification
// back on, whenever the broker is reached over an untrusted network.
func WithSSL(ssl bool) ClientOptionFunc {
	return func(c *Client) error {
		if !ssl || c.tlsConfig != nil {
			// Nothing to do: TLS was not requested, or a configuration already
			// exists and must not be replaced with a non-verifying one.
			return nil
		}
		c.tlsConfig = &tls.Config{
			InsecureSkipVerify: true,
			RootCAs:            x509.NewCertPool(),
		}
		return nil
	}
}
// WithCACerts configures the RootCAs for the underlying TLS connection.
//
// Each entry of certs is either the path of a PEM file or PEM data itself: if
// os.Stat succeeds on the entry it is read as a file, otherwise the entry is
// parsed as PEM directly. An empty certs slice leaves the client unchanged.
//
// When at least one entry is given, the collected pool replaces the RootCAs
// of the client's TLS configuration (creating the configuration if needed)
// and InsecureSkipVerify is switched off.
//
// The option returns the underlying error if a path cannot be resolved or
// read, and client.ErrInvalidCert if no certificate could be parsed from an
// entry.
func WithCACerts(certs []string) ClientOptionFunc {
	return func(c *Client) error {
		if len(certs) == 0 {
			return nil
		}

		roots := x509.NewCertPool()
		for _, cert := range certs {
			pemData := []byte(cert)
			if _, err := os.Stat(cert); err == nil {
				// The entry names an existing file: load the PEM data from it.
				path, err := filepath.Abs(cert)
				if err != nil {
					return err
				}
				if pemData, err = ioutil.ReadFile(path); err != nil {
					return err
				}
			}
			if !roots.AppendCertsFromPEM(pemData) {
				return client.ErrInvalidCert
			}
		}

		if c.tlsConfig == nil {
			c.tlsConfig = &tls.Config{}
		}
		c.tlsConfig.RootCAs = roots
		// Explicit roots were supplied, so the server certificate must be verified.
		c.tlsConfig.InsecureSkipVerify = false
		return nil
	}
}
// Connect satisfies the client.Client interface.
//
// If the client holds no connection yet, one is established through
// initConnection and any failure is returned as-is. Otherwise the stored
// connection is reused. Each call returns a new Session built from the
// client's connection and channel.
func (c *Client) Connect() (client.Session, error) {
	if c.conn != nil {
		return &Session{c.conn, c.ch}, nil
	}
	if err := c.initConnection(); err != nil {
		return nil, err
	}
	return &Session{c.conn, c.ch}, nil
}
// initConnection dials the broker at c.uri, using TLS when c.tlsConfig is
// set and a plain connection otherwise, then opens a channel on the new
// connection. The connection and channel are stored on the client only when
// both steps succeed; any failure is reported as a client.ConnectError.
func (c *Client) initConnection() error {
	var (
		conn *amqp.Connection
		err  error
	)
	if c.tlsConfig != nil {
		conn, err = amqp.DialTLS(c.uri, c.tlsConfig)
	} else {
		conn, err = amqp.Dial(c.uri)
	}
	if err != nil {
		return client.ConnectError{Reason: err.Error()}
	}

	// Both the TLS and the plain path need a channel: Connect hands it to the Session.
	ch, err := conn.Channel()
	if err != nil {
		// Do not leak the dialed connection; the channel error is the one worth
		// reporting, so a secondary close error is intentionally dropped.
		_ = conn.Close()
		return client.ConnectError{Reason: err.Error()}
	}

	c.conn, c.ch = conn, ch
	return nil
}
// Close implements necessary calls to cleanup the underlying connection.
//
// It does nothing on a client that holds no connection. After closing, the
// stored connection and channel are cleared, so a later Connect dials again
// and does not hand out the closed connection.
func (c *Client) Close() {
	if c.conn == nil {
		return
	}
	// Close has no error return, so a failure to close cleanly is deliberately dropped.
	_ = c.conn.Close()
	c.conn, c.ch = nil, nil
}