Skip to content

Commit

Permalink
input/sqlserver: Add service and save connection pools (influxdata#8596)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjiderhamn authored Mar 29, 2021
1 parent e6165ec commit 871447b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 54 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ following works:
- github.com/gofrs/uuid [MIT License](https://github.com/gofrs/uuid/blob/master/LICENSE)
- github.com/gogo/googleapis [Apache License 2.0](https://github.com/gogo/googleapis/blob/master/LICENSE)
- github.com/gogo/protobuf [BSD 3-Clause Clear License](https://github.com/gogo/protobuf/blob/master/LICENSE)
- github.com/golang-sql/civil [Apache License 2.0](https://github.com/golang-sql/civil/blob/master/LICENSE)
- github.com/golang/geo [Apache License 2.0](https://github.com/golang/geo/blob/master/LICENSE)
- github.com/golang/groupcache [Apache License 2.0](https://github.com/golang/groupcache/blob/master/LICENSE)
- github.com/golang/protobuf [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/protobuf/blob/master/LICENSE)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/couchbase/go-couchbase v0.0.0-20180501122049-16db1f1fe037
github.com/couchbase/gomemcached v0.0.0-20180502221210-0da75df14530 // indirect
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4
github.com/denisenkom/go-mssqldb v0.9.0
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dimchansky/utfbom v1.1.1
github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
Expand Down Expand Up @@ -260,8 +259,8 @@ github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADGYw5LqMnHqSkyIELsHCGF6PkrmM31V8rF7o=
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/denisenkom/go-mssqldb v0.9.0 h1:RSohk2RsiZqLZ0zCjtfn3S4Gp4exhpBWHyQ7D0yGjAk=
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
Expand Down Expand Up @@ -483,6 +482,8 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec h1:lJwO/92dFXWeXOZdoGXgptLmNLwynMSHUmU6besqtiw=
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
Expand Down Expand Up @@ -1317,6 +1318,7 @@ golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1345,6 +1347,7 @@ golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -1462,7 +1465,6 @@ google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuh
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
Expand Down
73 changes: 44 additions & 29 deletions plugins/inputs/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

// SQLServer struct
type SQLServer struct {
Servers []string `toml:"servers"`
QueryVersion int `toml:"query_version"`
AzureDB bool `toml:"azuredb"`
DatabaseType string `toml:"database_type"`
IncludeQuery []string `toml:"include_query"`
ExcludeQuery []string `toml:"exclude_query"`
HealthMetric bool `toml:"health_metric"`
queries MapQuery
isInitialized bool
Servers []string `toml:"servers"`
QueryVersion int `toml:"query_version"`
AzureDB bool `toml:"azuredb"`
DatabaseType string `toml:"database_type"`
IncludeQuery []string `toml:"include_query"`
ExcludeQuery []string `toml:"exclude_query"`
HealthMetric bool `toml:"health_metric"`
pools []*sql.DB
queries MapQuery
}

// Query struct
Expand Down Expand Up @@ -223,8 +223,6 @@ func initQueries(s *SQLServer) error {
}
}

// Set a flag so we know that queries have already been initialized
s.isInitialized = true
var querylist []string
for query := range queries {
querylist = append(querylist, query)
Expand All @@ -236,32 +234,25 @@ func initQueries(s *SQLServer) error {

// Gather collect data from SQL Server
func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
if !s.isInitialized {
if err := initQueries(s); err != nil {
acc.AddError(err)
return err
}
}

var wg sync.WaitGroup
var mutex sync.Mutex
var healthMetrics = make(map[string]*HealthMetric)

for _, serv := range s.Servers {
for i, pool := range s.pools {
for _, query := range s.queries {
wg.Add(1)
go func(serv string, query Query) {
go func(pool *sql.DB, query Query, serverIndex int) {
defer wg.Done()
queryError := s.gatherServer(serv, query, acc)
queryError := s.gatherServer(pool, query, acc)

if s.HealthMetric {
mutex.Lock()
s.gatherHealth(healthMetrics, serv, queryError)
s.gatherHealth(healthMetrics, s.Servers[serverIndex], queryError)
mutex.Unlock()
}

acc.AddError(queryError)
}(serv, query)
}(pool, query, i)
}
}

Expand All @@ -274,16 +265,40 @@ func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
return nil
}

func (s *SQLServer) gatherServer(server string, query Query, acc telegraf.Accumulator) error {
// deferred opening
conn, err := sql.Open("mssql", server)
if err != nil {
// Start initialize a list of connection pools
func (s *SQLServer) Start(acc telegraf.Accumulator) error {
if err := initQueries(s); err != nil {
acc.AddError(err)
return err
}
defer conn.Close()

if len(s.Servers) == 0 {
s.Servers = append(s.Servers, defaultServer)
}

for _, serv := range s.Servers {
pool, err := sql.Open("mssql", serv)
if err != nil {
acc.AddError(err)
return err
}

s.pools = append(s.pools, pool)
}

return nil
}

// Stop cleanup server connection pools
func (s *SQLServer) Stop() {
for _, pool := range s.pools {
_ = pool.Close()
}
}

func (s *SQLServer) gatherServer(pool *sql.DB, query Query, acc telegraf.Accumulator) error {
// execute query
rows, err := conn.Query(query.Script)
rows, err := pool.Query(query.Script)
if err != nil {
return fmt.Errorf("Script %s failed: %w", query.ScriptName, err)
//return err
Expand Down
48 changes: 28 additions & 20 deletions plugins/inputs/sqlserver/sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,13 @@ func TestSqlServer_MultipleInstanceIntegration(t *testing.T) {
}

var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)

require.NoError(t, s2.Start(&acc2))
err = s2.Gather(&acc2)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)

// acc includes size metrics, and excludes memory metrics
assert.False(t, acc.HasMeasurement("Memory breakdown (%)"))
Expand All @@ -141,6 +139,9 @@ func TestSqlServer_MultipleInstanceIntegration(t *testing.T) {
// acc2 includes memory metrics, and excludes size metrics
assert.True(t, acc2.HasMeasurement("Memory breakdown (%)"))
assert.False(t, acc2.HasMeasurement("Log size (bytes)"))

s.Stop()
s2.Stop()
}

func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
Expand All @@ -162,15 +163,13 @@ func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
}

var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)

require.NoError(t, s2.Start(&acc))
err = s2.Gather(&acc2)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)

// acc includes size metrics, and excludes memory metrics and the health metric
assert.False(t, acc.HasMeasurement(healthMetricName))
Expand All @@ -186,6 +185,9 @@ func TestSqlServer_MultipleInstanceWithHealthMetricIntegration(t *testing.T) {
tags := map[string]string{healthMetricInstanceTag: sqlInstance, healthMetricDatabaseTag: database}
assert.True(t, acc2.HasPoint(healthMetricName, tags, healthMetricAttemptedQueries, 9))
assert.True(t, acc2.HasPoint(healthMetricName, tags, healthMetricSuccessfulQueries, 9))

s.Stop()
s2.Stop()
}

func TestSqlServer_HealthMetric(t *testing.T) {
Expand All @@ -205,6 +207,7 @@ func TestSqlServer_HealthMetric(t *testing.T) {

// acc1 should have the health metric because it is specified in the config
var acc1 testutil.Accumulator
require.NoError(t, s1.Start(&acc1))
s1.Gather(&acc1)
assert.True(t, acc1.HasMeasurement(healthMetricName))

Expand All @@ -222,8 +225,12 @@ func TestSqlServer_HealthMetric(t *testing.T) {

// acc2 should not have the health metric because it is not specified in the config
var acc2 testutil.Accumulator
require.NoError(t, s2.Start(&acc2))
s2.Gather(&acc2)
assert.False(t, acc2.HasMeasurement(healthMetricName))

s1.Stop()
s2.Stop()
}

func TestSqlServer_MultipleInit(t *testing.T) {
Expand All @@ -236,15 +243,14 @@ func TestSqlServer_MultipleInit(t *testing.T) {
_, ok := s.queries["DatabaseSize"]
// acc includes size metrics
assert.True(t, ok)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)

initQueries(s2)
_, ok = s2.queries["DatabaseSize"]
// acc2 excludes size metrics
assert.False(t, ok)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)

s.Stop()
s2.Stop()
}

func TestSqlServer_ConnectionString(t *testing.T) {
Expand Down Expand Up @@ -349,15 +355,13 @@ func TestSqlServer_AGQueriesApplicableForDatabaseTypeSQLServer(t *testing.T) {
}

var acc, acc2 testutil.Accumulator
require.NoError(t, s.Start(&acc))
err := s.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, false)

err = s2.Gather(&acc2)
require.NoError(t, s2.Start(&acc))
require.NoError(t, err)
assert.Equal(t, s.isInitialized, true)
assert.Equal(t, s2.isInitialized, true)

// acc includes size metrics, and excludes memory metrics
assert.True(t, acc.HasMeasurement("sqlserver_hadr_replica_states"))
Expand All @@ -366,6 +370,9 @@ func TestSqlServer_AGQueriesApplicableForDatabaseTypeSQLServer(t *testing.T) {
// acc2 includes memory metrics, and excludes size metrics
assert.False(t, acc2.HasMeasurement("sqlserver_hadr_replica_states"))
assert.False(t, acc2.HasMeasurement("sqlserver_hadr_dbreplica_states"))

s.Stop()
s2.Stop()
}

func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
Expand All @@ -390,15 +397,13 @@ func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
}

var acc2019, acc2012 testutil.Accumulator
require.NoError(t, s2019.Start(&acc2019))
err := s2019.Gather(&acc2019)
require.NoError(t, err)
assert.Equal(t, s2019.isInitialized, true)
assert.Equal(t, s2012.isInitialized, false)

err = s2012.Gather(&acc2012)
require.NoError(t, s2012.Start(&acc2012))
require.NoError(t, err)
assert.Equal(t, s2019.isInitialized, true)
assert.Equal(t, s2012.isInitialized, true)

// acc2019 includes new HADR query fields
assert.True(t, acc2019.HasField("sqlserver_hadr_replica_states", "basic_features"))
Expand All @@ -415,6 +420,9 @@ func TestSqlServer_AGQueryFieldsOutputBasedOnSQLServerVersion(t *testing.T) {
assert.False(t, acc2012.HasTag("sqlserver_hadr_replica_states", "seeding_mode_desc"))
assert.False(t, acc2012.HasField("sqlserver_hadr_dbreplica_states", "is_primary_replica"))
assert.False(t, acc2012.HasField("sqlserver_hadr_dbreplica_states", "secondary_lag_seconds"))

s2019.Stop()
s2012.Stop()
}

const mockPerformanceMetrics = `measurement;servername;type;Point In Time Recovery;Available physical memory (bytes);Average pending disk IO;Average runnable tasks;Average tasks;Buffer pool rate (bytes/sec);Connection memory per connection (bytes);Memory grant pending;Page File Usage (%);Page lookup per batch request;Page split per batch request;Readahead per page read;Signal wait (%);Sql compilation per batch request;Sql recompilation per batch request;Total target memory ratio
Expand Down

0 comments on commit 871447b

Please sign in to comment.