forked from apache/cassandra-gocql-driver
-
Notifications
You must be signed in to change notification settings - Fork 59
/
cluster.go
528 lines (433 loc) · 19.3 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
// Copyright (c) 2012 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gocql
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
"sync/atomic"
"time"
)
// defaultDriverName is the DriverName that NewCluster assigns to new configs.
const defaultDriverName = "ScyllaDB GoCQL Driver"
// PoolConfig configures the connection pool used by the driver. It defaults to
// using a round-robin host selection policy and a round-robin connection selection
// policy for each host.
type PoolConfig struct {
// HostSelectionPolicy sets the policy for selecting which host to use for a
// given query (default: RoundRobinHostPolicy()).
// Sharing a single HostSelectionPolicy between multiple sessions is not
// supported, even if the old session is closed before the policy is used in
// a new session.
HostSelectionPolicy HostSelectionPolicy
}
// buildPool creates the connection pool for the given session by delegating to
// newPolicyConnPool. The receiver's own fields are not read here; only the
// session is passed on.
func (p PoolConfig) buildPool(session *Session) *policyConnPool {
return newPolicyConnPool(session)
}
// ClusterConfig is a struct to configure the default cluster implementation
// of gocql. It has a variety of attributes that can be used to modify the
// behavior to fit the most common use cases. Applications that require a
// different setup must implement their own cluster.
//
// Use NewCluster to obtain a config pre-populated with the documented defaults;
// Validate checks the ranges of the numeric and duration fields.
type ClusterConfig struct {
// Hosts holds the addresses for the initial connections. It is recommended to use the value set in
// the Cassandra config for broadcast_address or listen_address, an IP address not
// a domain name. This is because events from Cassandra will use the configured IP
// address, which is used to index connected hosts. If the domain name specified
// resolves to more than 1 IP address then the driver may connect multiple times to
// the same host, and will not mark the node being down or up from events.
// Validate returns ErrNoHosts when this slice is empty.
Hosts []string
// CQL version (default: 3.0.0)
CQLVersion string
// ProtoVersion sets the version of the native protocol to use, this will
// enable features in the driver for specific protocol versions, generally this
// should be set to a known version (2,3,4) for the cluster being connected to.
//
// If it is 0 or unset (the default) then the driver will attempt to discover the
// highest supported protocol for the cluster. In clusters with nodes of different
// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
ProtoVersion int
// Timeout limits the time spent on the client side while executing a query.
// Specifically, query or batch execution will return an error if the client does not receive a response
// from the server within the Timeout period.
// Timeout is also used to configure the read timeout on the underlying network connection.
// Client Timeout should always be higher than the request timeouts configured on the server,
// so that retries don't overload the server.
// Timeout has a default value of 11 seconds, which is higher than default server timeout for most query types.
// Timeout is not applied to requests during initial connection setup, see ConnectTimeout.
Timeout time.Duration
// ConnectTimeout limits the time spent during connection setup.
// During initial connection setup, internal queries, AUTH requests will return an error if the client
// does not receive a response within the ConnectTimeout period.
// ConnectTimeout is applied to the connection setup queries independently.
// ConnectTimeout also limits the duration of dialing a new TCP connection
// in case there is no Dialer nor HostDialer configured.
// ConnectTimeout has a default value of 11 seconds.
ConnectTimeout time.Duration
// WriteTimeout limits the time the driver waits to write a request to a network connection.
// WriteTimeout should be lower than or equal to Timeout.
// WriteTimeout defaults to the value of Timeout.
WriteTimeout time.Duration
// Port used when dialing.
// Validate requires a value between 1 and 65535.
// Default: 9042
Port int
// Initial keyspace. Optional.
Keyspace string
// Number of connections per host.
// Default: 2
NumConns int
// Maximum number of inflight requests allowed per connection.
// Default: 32768 for CQL v3 and newer
// Default: 128 for older CQL versions
MaxRequestsPerConn int
// Default consistency level.
// Default: Quorum
Consistency Consistency
// Compression algorithm.
// Default: nil
Compressor Compressor
// Authenticator to use. It must not be combined with AuthProvider:
// Validate rejects a config that sets both.
// Default: nil
Authenticator Authenticator
// An Authenticator factory. Can be used to create alternative authenticators.
// It must not be combined with Authenticator: Validate rejects a config that sets both.
// Default: nil
AuthProvider func(h *HostInfo) (Authenticator, error)
// Default retry policy to use for queries.
// Default: no retries.
RetryPolicy RetryPolicy
// ConvictionPolicy decides whether to mark host as down based on the error and host info.
// Default: SimpleConvictionPolicy
ConvictionPolicy ConvictionPolicy
// Default reconnection policy to use for reconnecting before trying to mark host as down.
// Must not be nil (Validate rejects a nil policy).
// Default: ConstantReconnectionPolicy with MaxRetries 3 and an Interval of 1 second.
ReconnectionPolicy ReconnectionPolicy
// A reconnection policy to use for reconnecting when connecting to the cluster first time.
// Must not be nil, and its GetMaxRetries must return a value greater than zero
// (Validate rejects the config otherwise).
// Default: NoReconnectionPolicy
InitialReconnectionPolicy ReconnectionPolicy
// The keepalive period to use, enabled if > 0 (default: 15 seconds)
// SocketKeepalive is used to set up the default dialer and is ignored if Dialer or HostDialer is provided.
SocketKeepalive time.Duration
// Maximum cache size for prepared statements globally for gocql.
// Default: 1000
MaxPreparedStmts int
// Maximum cache size for query info about statements for each session.
// Default: 1000
MaxRoutingKeyInfo int
// Default page size to use for created sessions.
// Default: 5000
PageSize int
// Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL.
// Default: unset
SerialConsistency SerialConsistency
// SslOpts configures TLS use when HostDialer is not set.
// SslOpts is ignored if HostDialer is set.
SslOpts *SslOptions
// actualSslOpts holds the *tls.Config that ValidateAndInitSSL builds from SslOpts;
// getActualTLSConfig hands out clones of it.
actualSslOpts atomic.Value
// Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server.
// Default: true, only enabled for protocol 3 and above.
DefaultTimestamp bool
// The name of the driver that is going to be reported to the server.
// Default: "ScyllaDB GoCQL Driver"
DriverName string
// The version of the driver that is going to be reported to the server.
// Defaults to the current library version.
DriverVersion string
// PoolConfig configures the underlying connection pool, allowing the
// configuration of host selection and connection selection policies.
PoolConfig PoolConfig
// If not zero, gocql attempts to reconnect known DOWN nodes every ReconnectInterval.
// Default: 60 seconds
ReconnectInterval time.Duration
// The maximum amount of time to wait for schema agreement in a cluster after
// receiving a schema change frame. (default: 60s)
MaxWaitSchemaAgreement time.Duration
// HostFilter will filter all incoming events for host, any which don't pass
// the filter will be ignored. If set will take precedence over any options set
// via Discovery
HostFilter HostFilter
// AddressTranslator will translate addresses found on peer discovery and/or
// node change events.
AddressTranslator AddressTranslator
// If IgnorePeerAddr is true and the address in system.peers does not match
// the supplied host by either initial hosts or discovered via events then the
// host will be replaced with the supplied address.
//
// For example if an event comes in with host=10.0.0.1 but when looking up that
// address in system.local or system.peers returns 127.0.0.1, the peer will be
// set to 10.0.0.1 which is what will be used to connect to.
IgnorePeerAddr bool
// If DisableInitialHostLookup then the driver will not attempt to get host info
// from the system.peers table, this will mean that the driver will connect to
// hosts supplied and will not attempt to lookup the hosts information, this will
// mean that data_centre, rack and token information will not be available and as
// such host filtering and token aware query routing will not be available.
DisableInitialHostLookup bool
// Configure events the driver will register for
Events struct {
// disable registering for status events (node up/down)
DisableNodeStatusEvents bool
// disable registering for topology events (node added/removed/moved)
DisableTopologyEvents bool
// disable registering for schema events (keyspace/table/function removed/created/updated)
DisableSchemaEvents bool
}
// DisableSkipMetadata will override the internal result metadata cache so that the driver does not
// send skip_metadata for queries, this means that the result will always contain
// the metadata to parse the rows and will not reuse the metadata from the prepared
// statement.
//
// See https://issues.apache.org/jira/browse/CASSANDRA-10786
// See https://github.com/scylladb/scylladb/issues/20860
//
// Validate logs a warning when this is set to false.
//
// Default: true
DisableSkipMetadata bool
// QueryObserver will set the provided query observer on all queries created from this session.
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
QueryObserver QueryObserver
// BatchObserver will set the provided batch observer on all queries created from this session.
// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
BatchObserver BatchObserver
// ConnectObserver will set the provided connect observer on all queries
// created from this session.
ConnectObserver ConnectObserver
// FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
FrameHeaderObserver FrameHeaderObserver
// StreamObserver will be notified of stream state changes.
// This can be used to track in-flight protocol requests and responses.
StreamObserver StreamObserver
// Default idempotence for queries
DefaultIdempotence bool
// The time to wait for frames before flushing the frames connection to Cassandra.
// Can help reduce syscall overhead by making less calls to write. Set to 0 to
// disable.
//
// (default: 200 microseconds)
WriteCoalesceWaitTime time.Duration
// Dialer will be used to establish all connections created for this Cluster.
// If not provided, a default dialer configured with ConnectTimeout will be used.
// Dialer is ignored if HostDialer is provided.
Dialer Dialer
// HostDialer will be used to establish all connections for this Cluster.
// Unlike Dialer, HostDialer is responsible for setting up the entire connection, including the TLS session.
// To support shard-aware port, HostDialer should implement ShardDialer.
// If not provided, Dialer will be used instead.
HostDialer HostDialer
// DisableShardAwarePort will prevent the driver from connecting to Scylla's shard-aware port,
// even if there are nodes in the cluster that support it.
//
// It is generally recommended to leave this option turned off because gocql can use
// the shard-aware port to make the process of establishing connections more robust.
// However, if you have a cluster with nodes which expose shard-aware port
// but the port is unreachable due to network configuration issues, you can use
// this option to work around the issue. Set it to true only if you neither can fix
// your network nor disable shard-aware port on your nodes.
DisableShardAwarePort bool
// Logger for this ClusterConfig.
// If not specified, defaults to the global gocql.Logger.
Logger StdLogger
// The timeout for the requests to the schema tables. (default: 60s)
MetadataSchemaRequestTimeout time.Duration
// internal config for testing
disableControlConn bool
disableInit bool
}
// Dialer establishes the network connections used by the driver; it is the
// type of the ClusterConfig.Dialer field.
//
// DialContext has the same signature as (*net.Dialer).DialContext, so a
// *net.Dialer satisfies this interface.
type Dialer interface {
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}
// NewCluster returns a ClusterConfig for the default cluster implementation,
// pre-populated with the driver defaults and the supplied contact points.
//
// The hosts are only used for the initial connection; the remainder of the ring
// is discovered automatically afterwards. Prefer the IP address configured as
// broadcast_address or listen_address in the Cassandra config over a domain
// name: events from Cassandra carry the configured IP address, and that address
// is the key under which connected hosts are indexed. A domain name resolving
// to more than one IP address may cause the driver to connect to the same host
// several times and to miss up/down events for that node.
func NewCluster(hosts ...string) *ClusterConfig {
	return &ClusterConfig{
		// Contact points and protocol basics.
		Hosts:       hosts,
		Port:        9042,
		CQLVersion:  "3.0.0",
		Consistency: Quorum,

		// Identification reported to the server.
		DriverName:    defaultDriverName,
		DriverVersion: defaultDriverVersion,

		// Timeouts and intervals.
		Timeout:                      11 * time.Second,
		ConnectTimeout:               11 * time.Second,
		SocketKeepalive:              15 * time.Second,
		ReconnectInterval:            60 * time.Second,
		MaxWaitSchemaAgreement:       60 * time.Second,
		MetadataSchemaRequestTimeout: 60 * time.Second,
		WriteCoalesceWaitTime:        200 * time.Microsecond,

		// Sizing.
		NumConns:          2,
		PageSize:          5000,
		MaxPreparedStmts:  defaultMaxPreparedStmts,
		MaxRoutingKeyInfo: 1000,

		// Policies.
		ConvictionPolicy: &SimpleConvictionPolicy{},
		ReconnectionPolicy: &ConstantReconnectionPolicy{
			MaxRetries: 3,
			Interval:   time.Second,
		},
		InitialReconnectionPolicy: &NoReconnectionPolicy{},

		// Behaviour switches.
		DefaultTimestamp:    true,
		DisableSkipMetadata: true,
	}
}
// logger returns the logger configured on this ClusterConfig, falling back to
// the package-level Logger when the Logger field is nil.
func (cfg *ClusterConfig) logger() StdLogger {
	if configured := cfg.Logger; configured != nil {
		return configured
	}
	return Logger
}
// CreateSession initializes the cluster based on this config and returns a
// session object that can be used to interact with the database.
//
// The config is dereferenced and passed to NewSession by value, so the session
// is created from a shallow copy of cfg taken at call time. cfg must not be nil.
func (cfg *ClusterConfig) CreateSession() (*Session, error) {
return NewSession(*cfg)
}
// translateAddressPort runs addr and port through the configured
// AddressTranslator and returns whatever the translator produces.
//
// The inputs are returned unchanged when no AddressTranslator is configured or
// when addr is empty. With gocqlDebug enabled, every translation is logged
// through the config's logger.
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) {
	translator := cfg.AddressTranslator
	if translator == nil {
		return addr, port
	}
	if len(addr) == 0 {
		return addr, port
	}

	translatedAddr, translatedPort := translator.Translate(addr, port)
	if gocqlDebug {
		cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, translatedAddr, translatedPort)
	}
	return translatedAddr, translatedPort
}
// filterHost reports whether host is rejected by the configured HostFilter.
// It returns false when no HostFilter is set or when the filter accepts the
// host, and true only when a filter is set and does not accept it.
func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
	return cfg.HostFilter != nil && !cfg.HostFilter.Accept(host)
}
// ValidateAndInitSSL builds a TLS configuration from SslOpts via setupTLSConfig
// and stores the result for later retrieval by getActualTLSConfig.
//
// It is a no-op returning nil when SslOpts is nil. Any error from
// setupTLSConfig is returned with an explanatory prefix, and nothing is stored
// in that case.
func (cfg *ClusterConfig) ValidateAndInitSSL() error {
	if opts := cfg.SslOpts; opts != nil {
		builtTLSConfig, err := setupTLSConfig(opts)
		if err != nil {
			return fmt.Errorf("failed to initialize ssl configuration: %s", err.Error())
		}
		cfg.actualSslOpts.Store(builtTLSConfig)
	}
	return nil
}
// getActualTLSConfig returns a clone of the TLS configuration previously stored
// by ValidateAndInitSSL. It returns nil when the stored value is not a
// *tls.Config, which includes the case where nothing has been stored yet.
func (cfg *ClusterConfig) getActualTLSConfig() *tls.Config {
	if stored, isTLSConfig := cfg.actualSslOpts.Load().(*tls.Config); isTLSConfig {
		return stored.Clone()
	}
	return nil
}
// Validate checks the configuration for values the driver cannot work with and
// returns an error describing the first problem found, or nil if the config is
// usable.
//
// The checks run in a fixed order: hosts, authenticator settings, reconnection
// policies, then the numeric and duration fields. If DisableSkipMetadata is
// false a warning is written to the config's logger. As its final step Validate
// calls ValidateAndInitSSL, so a successful call also builds and stores the TLS
// configuration derived from SslOpts.
func (cfg *ClusterConfig) Validate() error {
	if len(cfg.Hosts) == 0 {
		return ErrNoHosts
	}

	if cfg.Authenticator != nil && cfg.AuthProvider != nil {
		return errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
	}

	// Both reconnection policies are mandatory and must allow at least one
	// attempt: zero is rejected as well as negative values.
	if cfg.InitialReconnectionPolicy == nil {
		return errors.New("InitialReconnectionPolicy is nil")
	}

	if cfg.InitialReconnectionPolicy.GetMaxRetries() <= 0 {
		return errors.New("InitialReconnectionPolicy.GetMaxRetries returns negative number")
	}

	if cfg.ReconnectionPolicy == nil {
		return errors.New("ReconnectionPolicy is nil")
	}

	// This must inspect ReconnectionPolicy itself; checking
	// InitialReconnectionPolicy a second time would leave it unvalidated.
	if cfg.ReconnectionPolicy.GetMaxRetries() <= 0 {
		return errors.New("ReconnectionPolicy.GetMaxRetries returns negative number")
	}

	if cfg.PageSize < 0 {
		return errors.New("PageSize should be positive number or zero")
	}

	if cfg.MaxRoutingKeyInfo < 0 {
		return errors.New("MaxRoutingKeyInfo should be positive number or zero")
	}

	if cfg.MaxPreparedStmts < 0 {
		return errors.New("MaxPreparedStmts should be positive number or zero")
	}

	if cfg.SocketKeepalive < 0 {
		return errors.New("SocketKeepalive should be positive time.Duration or zero")
	}

	if cfg.MaxRequestsPerConn < 0 {
		return errors.New("MaxRequestsPerConn should be positive number or zero")
	}

	if cfg.NumConns < 0 {
		return errors.New("NumConns should be positive non-zero number or zero")
	}

	if cfg.Port <= 0 || cfg.Port > 65535 {
		return errors.New("Port should be a valid port number: a number between 1 and 65535")
	}

	if cfg.WriteTimeout < 0 {
		return errors.New("WriteTimeout should be positive time.Duration or zero")
	}

	if cfg.Timeout < 0 {
		return errors.New("Timeout should be positive time.Duration or zero")
	}

	if cfg.ConnectTimeout < 0 {
		return errors.New("ConnectTimeout should be positive time.Duration or zero")
	}

	if cfg.MetadataSchemaRequestTimeout < 0 {
		return errors.New("MetadataSchemaRequestTimeout should be positive time.Duration or zero")
	}

	if cfg.WriteCoalesceWaitTime < 0 {
		return errors.New("WriteCoalesceWaitTime should be positive time.Duration or zero")
	}

	if cfg.ReconnectInterval < 0 {
		return errors.New("ReconnectInterval should be positive time.Duration or zero")
	}

	if cfg.MaxWaitSchemaAgreement < 0 {
		return errors.New("MaxWaitSchemaAgreement should be positive time.Duration or zero")
	}

	if cfg.ProtoVersion < 0 {
		return errors.New("ProtoVersion should be positive number or zero")
	}

	if !cfg.DisableSkipMetadata {
		// Use the per-cluster logger so the warning honours cfg.Logger and
		// only falls back to the global Logger when none is configured.
		cfg.logger().Println("warning: enabling skipping metadata can lead to unpredictible results when executing query and altering columns involved in the query.")
	}

	return cfg.ValidateAndInitSSL()
}
// Sentinel errors defined by this file.
var (
// ErrNoHosts is returned by ClusterConfig.Validate when Hosts is empty.
ErrNoHosts = errors.New("no hosts provided")
// ErrNoConnectionsStarted reports that no connections were made when creating the session.
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
// ErrHostQueryFailed reports that the Hosts could not be populated.
ErrHostQueryFailed = errors.New("unable to populate Hosts")
)
// setupTLSConfig builds the *tls.Config described by sslOpts.
//
// The starting point is a clone of sslOpts.Config, or a fresh config when that
// is nil. Host verification is then resolved as follows:
//
//	Config.InsecureSkipVerify | EnableHostVerification | Result
//	Config is nil             | true                   | verify host
//	Config is nil             | false                  | do not verify host
//	false                     | false                  | verify host
//	true                      | false                  | do not verify host
//	false                     | true                   | verify host
//	true                      | true                   | verify host
//
// If CaPath is set, the PEM certificates in that file are appended to RootCAs
// (a new pool is created when RootCAs is nil). If CertPath or KeyPath is set,
// the key pair is loaded and added to Certificates.
//
// sslOpts must not be nil. An error is returned when the CA file cannot be
// read or parsed, or when the key pair cannot be loaded.
func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
	var tlsConfig *tls.Config
	if sslOpts.Config == nil {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: !sslOpts.EnableHostVerification,
		}
	} else {
		// use clone to avoid race.
		tlsConfig = sslOpts.Config.Clone()
	}

	// EnableHostVerification always wins over InsecureSkipVerify.
	if sslOpts.EnableHostVerification {
		tlsConfig.InsecureSkipVerify = false
	}

	// ca cert is optional
	if sslOpts.CaPath != "" {
		if tlsConfig.RootCAs == nil {
			tlsConfig.RootCAs = x509.NewCertPool()
		}

		pem, err := ioutil.ReadFile(sslOpts.CaPath)
		if err != nil {
			return nil, fmt.Errorf("unable to open CA certs: %v", err)
		}

		// NOTE(review): Clone copies the RootCAs pointer, so when
		// sslOpts.Config supplied a pool the certificates are appended to
		// that caller-owned pool as well.
		if !tlsConfig.RootCAs.AppendCertsFromPEM(pem) {
			return nil, errors.New("failed parsing or CA certs")
		}
	}

	if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
		mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
		if err != nil {
			return nil, fmt.Errorf("unable to load X509 key pair: %v", err)
		}

		// Clone shares the Certificates backing array with sslOpts.Config.
		// Appending in place could write into the caller's spare capacity
		// (and race with concurrent calls), so copy into a fresh slice first.
		certs := make([]tls.Certificate, 0, len(tlsConfig.Certificates)+1)
		certs = append(certs, tlsConfig.Certificates...)
		tlsConfig.Certificates = append(certs, mycert)
	}

	return tlsConfig, nil
}