From 39db7cac9dec888132fa01bcbe836c8fc8c58108 Mon Sep 17 00:00:00 2001 From: Chris Parkins Date: Thu, 30 Nov 2023 12:58:57 -0700 Subject: [PATCH 01/15] [azuremonitorreceiver] Add the ability to pull Resource Metrics using the Batch API. Currently a Proof-of-Concept that does not allow Resources to exist in different regions and does not filter based on dimensions similar to how it's being handled in the previous implementation (by Resource). --- ...d-batch-api-to-azure-monitor-receiver.yaml | 27 ++ cmd/configschema/go.mod | 3 +- cmd/configschema/go.sum | 6 +- cmd/otelcontribcol/go.mod | 3 +- cmd/otelcontribcol/go.sum | 6 +- go.mod | 3 +- go.sum | 6 +- receiver/azuremonitorreceiver/config.go | 8 + receiver/azuremonitorreceiver/factory.go | 29 +- receiver/azuremonitorreceiver/go.mod | 5 +- receiver/azuremonitorreceiver/go.sum | 10 +- .../azuremonitorreceiver/scraper_batch.go | 447 ++++++++++++++++++ 12 files changed, 528 insertions(+), 25 deletions(-) create mode 100644 .chloggen/add-batch-api-to-azure-monitor-receiver.yaml create mode 100644 receiver/azuremonitorreceiver/scraper_batch.go diff --git a/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml b/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml new file mode 100644 index 000000000000..9c1463bbd4e6 --- /dev/null +++ b/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azuremonitorreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow the Metrics Batch API to be used instead of the Azure Resource Manager API to run less queries and avoid being throttled. 
+ +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This is intended to be a Proof-of-Concept and includes a lot of replication. It is intended to be used to start a conversation until ideas can be integrated and issues resolved. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index a1d819156400..dde4f97e6632 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -200,6 +200,7 @@ require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 // indirect @@ -355,7 +356,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect - github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/golang-jwt/jwt/v5 v5.1.0 // 
indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index cd6ffa5516ec..7c80a797cc5d 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -101,6 +101,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -690,8 +692,8 @@ github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 
v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= +github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 6d756db73b56..1e8e94450125 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -240,6 +240,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect @@ -401,7 +402,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect - github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/golang-jwt/jwt/v5 v5.1.0 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index b319c70d4ff0..f86330afaeae 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -100,6 +100,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG 
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -687,8 +689,8 @@ github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= +github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= diff --git 
a/go.mod b/go.mod index b2c4cb966391..04d939175d6e 100644 --- a/go.mod +++ b/go.mod @@ -217,6 +217,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect @@ -380,7 +381,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect - github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/golang-jwt/jwt/v5 v5.1.0 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/go.sum b/go.sum index 1facc4d3b11f..c0dd7dd7abcd 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 
h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -695,8 +697,8 @@ github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= +github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= diff --git a/receiver/azuremonitorreceiver/config.go b/receiver/azuremonitorreceiver/config.go index 5037a08d9b29..16aabd6f1254 100644 --- a/receiver/azuremonitorreceiver/config.go +++ b/receiver/azuremonitorreceiver/config.go @@ -26,6 +26,7 @@ var ( errMissingClientSecret = errors.New(`ClientSecret" is not specified in config`) errMissingFedTokenFile = errors.New(`FederatedTokenFile is not specified in config`) errInvalidCloud = errors.New(`Cloud" is invalid`) + errInvalidRegion = errors.New("`Region` is not specifiec in config`") monitorServices = []string{ "Microsoft.EventGrid/eventSubscriptions", @@ -244,6 +245,9 @@ type Config struct { 
CacheResourcesDefinitions float64 `mapstructure:"cache_resources_definitions"` MaximumNumberOfMetricsInACall int `mapstructure:"maximum_number_of_metrics_in_a_call"` AppendTagsAsAttributes bool `mapstructure:"append_tags_as_attributes"` + UseBatchApi bool `mapstructure:"use_batch_api"` + Region string `mapstructure:"region"` + MaximumNumberOfDimensionsInACall int `mapstructure:"maximum_number_of_dimensions_in_a_call"` } const ( @@ -290,5 +294,9 @@ func (c Config) Validate() (err error) { err = multierr.Append(err, errInvalidCloud) } + if c.UseBatchApi && c.Region == "" { + err = multierr.Append(err, errInvalidRegion) + } + return } diff --git a/receiver/azuremonitorreceiver/factory.go b/receiver/azuremonitorreceiver/factory.go index 4afb337d583c..9d1de491bf21 100644 --- a/receiver/azuremonitorreceiver/factory.go +++ b/receiver/azuremonitorreceiver/factory.go @@ -36,14 +36,15 @@ func createDefaultConfig() component.Config { cfg.CollectionInterval = defaultCollectionInterval return &Config{ - ScraperControllerSettings: cfg, - MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), - CacheResources: 24 * 60 * 60, - CacheResourcesDefinitions: 24 * 60 * 60, - MaximumNumberOfMetricsInACall: 20, - Services: monitorServices, - Authentication: servicePrincipal, - Cloud: defaultCloud, + ScraperControllerSettings: cfg, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + CacheResources: 24 * 60 * 60, + CacheResourcesDefinitions: 24 * 60 * 60, + MaximumNumberOfMetricsInACall: 20, + Services: monitorServices, + Authentication: servicePrincipal, + Cloud: defaultCloud, + MaximumNumberOfDimensionsInACall: 10, } } @@ -53,8 +54,16 @@ func createMetricsReceiver(_ context.Context, params receiver.CreateSettings, rC return nil, errConfigNotAzureMonitor } - azureScraper := newScraper(cfg, params) - scraper, err := scraperhelper.NewScraper(metadata.Type, azureScraper.scrape, scraperhelper.WithStart(azureScraper.start)) + var scraper scraperhelper.Scraper + var err 
error + if cfg.UseBatchApi { + azureBatchScraper := newBatchScraper(cfg, params) + scraper, err = scraperhelper.NewScraper(metadata.Type, azureBatchScraper.scrape, scraperhelper.WithStart(azureBatchScraper.start)) + } else { + azureScraper := newScraper(cfg, params) + scraper, err = scraperhelper.NewScraper(metadata.Type, azureScraper.scrape, scraperhelper.WithStart(azureScraper.start)) + } + if err != nil { return nil, err } diff --git a/receiver/azuremonitorreceiver/go.mod b/receiver/azuremonitorreceiver/go.mod index c25a274f635f..c1a503772285 100644 --- a/receiver/azuremonitorreceiver/go.mod +++ b/receiver/azuremonitorreceiver/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 github.com/google/go-cmp v0.6.0 @@ -22,11 +23,11 @@ require ( require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/golang-jwt/jwt/v5 v5.1.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect diff --git a/receiver/azuremonitorreceiver/go.sum b/receiver/azuremonitorreceiver/go.sum index e1c3109dc661..05b1966e5eef 100644 --- a/receiver/azuremonitorreceiver/go.sum +++ b/receiver/azuremonitorreceiver/go.sum @@ -6,14 +6,16 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 
h1:BMAjVKJM0U/CYF27gA0ZM github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG+S3q8UoJcmyU6nUeunJcMDHcRYHhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0 h1:pPvTJ1dY0sA35JOeFq6TsY2xj6Z85Yo23Pj4wCCvu4o= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 h1:Ds0KRF8ggpEGg4Vo42oX1cIt/IfOhHWJBikksZbVxeg= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0/go.mod h1:jj6P8ybImR+5topJ+eH6fgcemSFBmU6/6bFF8KkwuDI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw72xHJc34BNNykqSOeEJDAWkhf0u12/Jk= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 h1:hVeq+yCyUi+MsoO/CU95yqCIcdzra5ovzk8Q2BBpV2M= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 
h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -35,8 +37,8 @@ github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= +github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go new file mode 100644 index 000000000000..b02ef40f72dc --- /dev/null +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -0,0 +1,447 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver" + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + 
"github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver/internal/metadata" +) + +type azureType struct { + name *string + attributes map[string]*string + resourceIds []*string + metricsByCompositeKey map[metricsCompositeKey]*azureResourceMetrics + metricsDefinitionsUpdated time.Time +} + +func newBatchScraper(conf *Config, settings receiver.CreateSettings) *azureBatchScraper { + return &azureBatchScraper{ + cfg: conf, + settings: settings.TelemetrySettings, + mb: metadata.NewMetricsBuilder(conf.MetricsBuilderConfig, settings), + azIDCredentialsFunc: azidentity.NewClientSecretCredential, + azIDWorkloadFunc: azidentity.NewWorkloadIdentityCredential, + armClientFunc: armresources.NewClient, + armMonitorDefinitionsClientFunc: armmonitor.NewMetricDefinitionsClient, + azQueryMetricsBatchClientFunc: azquery.NewMetricsBatchClient, + mutex: &sync.Mutex{}, + } +} + +type azureBatchScraper struct { + cred azcore.TokenCredential + + clientResources ArmClient + clientMetricsDefinitions MetricsDefinitionsClientInterface + clientMetricsBatchValues MetricBatchValuesClient + + cfg *Config + settings component.TelemetrySettings + resources map[string]*azureResource + resourceTypes map[string]*azureType + resourcesUpdated time.Time + mb *metadata.MetricsBuilder + azIDCredentialsFunc func(string, string, string, *azidentity.ClientSecretCredentialOptions) (*azidentity.ClientSecretCredential, error) + azIDWorkloadFunc func(options *azidentity.WorkloadIdentityCredentialOptions) (*azidentity.WorkloadIdentityCredential, error) + 
armClientOptions *arm.ClientOptions + armClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armresources.Client, error) + armMonitorDefinitionsClientFunc func(azcore.TokenCredential, *arm.ClientOptions) (*armmonitor.MetricDefinitionsClient, error) + azQueryMetricsBatchClientOptions *azquery.MetricsBatchClientOptions + azQueryMetricsBatchClientFunc func(string, azcore.TokenCredential, *azquery.MetricsBatchClientOptions) (*azquery.MetricsBatchClient, error) + mutex *sync.Mutex +} + +func (s *azureBatchScraper) getArmClientOptions() *arm.ClientOptions { + var cloudToUse cloud.Configuration + switch s.cfg.Cloud { + case azureGovernmentCloud: + cloudToUse = cloud.AzureGovernment + default: + cloudToUse = cloud.AzurePublic + } + options := arm.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Cloud: cloudToUse, + }, + } + + return &options +} + +func (s *azureBatchScraper) getAzQueryMetricsBatchClientOptions() *azquery.MetricsBatchClientOptions { + var cloudToUse cloud.Configuration + switch s.cfg.Cloud { + case azureGovernmentCloud: + cloudToUse = cloud.AzureGovernment + default: + cloudToUse = cloud.AzurePublic + } + + options := azquery.MetricsBatchClientOptions{ + ClientOptions: azcore.ClientOptions{ + Cloud: cloudToUse, + }, + } + + return &options +} + +func (s *azureBatchScraper) getArmClient() ArmClient { + client, _ := s.armClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) + return client +} + +func (s *azureBatchScraper) getMetricsDefinitionsClient() MetricsDefinitionsClientInterface { + client, _ := s.armMonitorDefinitionsClientFunc(s.cred, s.armClientOptions) + return client +} + +type MetricBatchValuesClient interface { + QueryBatch(ctx context.Context, subscriptionID string, metricNamespace string, metricNames []string, resourceIDs azquery.ResourceIDList, options *azquery.MetricsBatchClientQueryBatchOptions) ( + azquery.MetricsBatchClientQueryBatchResponse, error, + ) +} + +func (s *azureBatchScraper) 
GetMetricsBatchValuesClient() MetricBatchValuesClient { + endpoint := "https://" + s.cfg.Region + ".metrics.monitor.azure.com" + s.settings.Logger.Info("Batch Endpoint", zap.String("endpoint", endpoint)) + client, _ := azquery.NewMetricsBatchClient(endpoint, s.cred, s.azQueryMetricsBatchClientOptions) + return client +} + +func (s *azureBatchScraper) start(_ context.Context, _ component.Host) (err error) { + if err = s.loadCredentials(); err != nil { + return err + } + + s.armClientOptions = s.getArmClientOptions() + s.clientResources = s.getArmClient() + s.clientMetricsDefinitions = s.getMetricsDefinitionsClient() + s.azQueryMetricsBatchClientOptions = s.getAzQueryMetricsBatchClientOptions() + s.clientMetricsBatchValues = s.GetMetricsBatchValuesClient() + + s.resources = map[string]*azureResource{} + s.resourceTypes = map[string]*azureType{} + + return +} + +func (s *azureBatchScraper) loadCredentials() (err error) { + switch s.cfg.Authentication { + case servicePrincipal: + if s.cred, err = s.azIDCredentialsFunc(s.cfg.TenantID, s.cfg.ClientID, s.cfg.ClientSecret, nil); err != nil { + return err + } + case workloadIdentity: + if s.cred, err = s.azIDWorkloadFunc(nil); err != nil { + return err + } + default: + return fmt.Errorf("unknown authentication %v", s.cfg.Authentication) + } + return nil +} + +func (s *azureBatchScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { + + s.getResources(ctx) + s.settings.Logger.Info("scrape", zap.Any("s.resourceTypes", s.resourceTypes)) + + resourceTypesWithDefinitions := make(chan string) + go func() { + defer close(resourceTypesWithDefinitions) + for resourceType := range s.resourceTypes { + s.getResourceMetricsDefinitionsByType(ctx, resourceType) + resourceTypesWithDefinitions <- resourceType + } + }() + + var wg sync.WaitGroup + for resourceType := range resourceTypesWithDefinitions { + wg.Add(1) + go func(resourceType string) { + defer wg.Done() + s.settings.Logger.Info("scrape", zap.String("resourceType", 
resourceType)) + s.settings.Logger.Info("scrape", zap.Any("s.resourceTypes[resourceType]", *s.resourceTypes[resourceType])) + s.getBatchMetricsValues(ctx, resourceType) + }(resourceType) + } + wg.Wait() + + return s.mb.Emit( + metadata.WithAzureMonitorSubscriptionID(s.cfg.SubscriptionID), + metadata.WithAzureMonitorTenantID(s.cfg.TenantID), + ), nil +} + +func (s *azureBatchScraper) getResources(ctx context.Context) { + if time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources { + return + } + + existingResources := map[string]void{} + for id := range s.resources { + existingResources[id] = void{} + } + + filter := s.getResourcesFilter() + opts := &armresources.ClientListOptions{ + Filter: &filter, + } + + updatedTypes := map[string]*azureType{} + pager := s.clientResources.NewListPager(opts) + + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Resources data", zap.Error(err)) + return + } + + for _, resource := range nextResult.Value { + if _, ok := s.resources[*resource.ID]; !ok { + resourceGroup := getResourceGroupFromID(*resource.ID) + attributes := map[string]*string{ + attributeName: resource.Name, + attributeResourceGroup: &resourceGroup, + attributeResourceType: resource.Type, + } + + if resource.Location != nil { + attributes[attributeLocation] = resource.Location + } + + s.resources[*resource.ID] = &azureResource{ + attributes: attributes, + tags: resource.Tags, + } + + if updatedTypes[*resource.Type] == nil { + updatedTypes[*resource.Type] = &azureType{ + name: resource.Name, + attributes: map[string]*string{}, + resourceIds: []*string{resource.ID}, + } + } else { + updatedTypes[*resource.Type].resourceIds = append(updatedTypes[*resource.Type].resourceIds, resource.ID) + } + } + delete(existingResources, *resource.ID) + } + } + + if len(existingResources) > 0 { + for idToDelete := range existingResources { + delete(s.resources, idToDelete) + } + } + + s.resourcesUpdated 
= time.Now() + s.resourceTypes = updatedTypes +} + +func (s *azureBatchScraper) getResourcesFilter() string { + // TODO: switch to parsing services from + // https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-supported + resourcesTypeFilter := strings.Join(s.cfg.Services, "' or resourceType eq '") + + resourcesGroupFilterString := "" + if len(s.cfg.ResourceGroups) > 0 { + resourcesGroupFilterString = fmt.Sprintf(" and (resourceGroup eq '%s')", + strings.Join(s.cfg.ResourceGroups, "' or resourceGroup eq '")) + } + + return fmt.Sprintf("(resourceType eq '%s')%s", resourcesTypeFilter, resourcesGroupFilterString) +} + +func (s *azureBatchScraper) getResourceMetricsDefinitionsByType(ctx context.Context, resourceType string) { + + if time.Since(s.resourceTypes[resourceType].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { + return + } + + s.resourceTypes[resourceType].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} + + resourceIds := s.resourceTypes[resourceType].resourceIds + if len(resourceIds) == 0 && resourceIds[0] != nil { + return + } + + pager := s.clientMetricsDefinitions.NewListPager(*resourceIds[0], nil) + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Metrics definitions data", zap.Error(err)) + return + } + + for _, v := range nextResult.Value { + s.settings.Logger.Info("getResourceMetricsDefinitionsByType", zap.String("resourceType", resourceType), zap.Any("v", v)) + timeGrain := *v.MetricAvailabilities[0].TimeGrain + name := *v.Name.Value + compositeKey := metricsCompositeKey{timeGrain: timeGrain} + + if len(v.Dimensions) > 0 { + var dimensionsSlice []string + for _, dimension := range v.Dimensions { + if len(strings.TrimSpace(*dimension.Value)) > 0 { + dimensionsSlice = append(dimensionsSlice, *dimension.Value) + } + } + sort.Strings(dimensionsSlice) + compositeKey.dimensions = strings.Join(dimensionsSlice, 
",") + } + s.storeMetricsDefinitionByType(resourceType, name, compositeKey) + } + } + s.resourceTypes[resourceType].metricsDefinitionsUpdated = time.Now() +} + +func (s *azureBatchScraper) storeMetricsDefinitionByType(resourceType string, name string, compositeKey metricsCompositeKey) { + if _, ok := s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey]; ok { + s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey].metrics = append( + s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey].metrics, name, + ) + } else { + s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} + } +} + +func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceType string) { + resType := *s.resourceTypes[resourceType] + + for compositeKey, metricsByGrain := range resType.metricsByCompositeKey { + + if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { + continue + } + + now := time.Now().UTC() + metricsByGrain.metricsValuesUpdated = now + startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) + s.settings.Logger.Info("getBatchMetricsValues", zap.String("resourceType", resourceType), zap.Any("metricNames", metricsByGrain.metrics), zap.Any("startTime", startTime), zap.Any("now", now), zap.String("timeGrain", compositeKey.timeGrain)) + + start := 0 + for start < len(metricsByGrain.metrics) { + + end := start + s.cfg.MaximumNumberOfMetricsInACall + if end > len(metricsByGrain.metrics) { + end = len(metricsByGrain.metrics) + } + + response, err := s.clientMetricsBatchValues.QueryBatch( + ctx, + s.cfg.SubscriptionID, + resourceType, + metricsByGrain.metrics[start:end], + azquery.ResourceIDList{ResourceIDs: resType.resourceIds}, + &azquery.MetricsBatchClientQueryBatchOptions{ + Aggregation: to.SliceOfPtrs( + azquery.AggregationTypeAverage, + azquery.AggregationTypeMaximum, + 
azquery.AggregationTypeMinimum, + azquery.AggregationTypeTotal, + azquery.AggregationTypeCount, + ), + StartTime: to.Ptr(startTime.Format(time.RFC3339)), + EndTime: to.Ptr(now.Format(time.RFC3339)), + Interval: to.Ptr(compositeKey.timeGrain), + Top: to.Ptr(int32(10)), // Defaults to 10 (may be limiting results) + }, + ) + + if err != nil { + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.Error(err)) + return + } + + start = end + for _, metricValues := range response.Values { + for _, metric := range metricValues.Values { + for _, timeseriesElement := range metric.TimeSeries { + + if timeseriesElement.Data != nil { + res := s.resources[*metricValues.ResourceID] + attributes := map[string]*string{} + for name, value := range res.attributes { + attributes[name] = value + } + for _, value := range timeseriesElement.MetadataValues { + name := metadataPrefix + *value.Name.Value + attributes[name] = value.Value + } + if s.cfg.AppendTagsAsAttributes { + for tagName, value := range res.tags { + name := tagPrefix + tagName + attributes[name] = value + } + } + for _, metricValue := range timeseriesElement.Data { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + } + } + } + } + } + } + } +} + +func (s *azureBatchScraper) processQueryTimeseriesData( + resourceID string, + metric *azquery.Metric, + metricValue *azquery.MetricValue, + attributes map[string]*string, +) { + s.mutex.Lock() + defer s.mutex.Unlock() + + ts := pcommon.NewTimestampFromTime(time.Now()) + + aggregationsData := []struct { + name string + value *float64 + }{ + {"Average", metricValue.Average}, + {"Count", metricValue.Count}, + {"Maximum", metricValue.Maximum}, + {"Minimum", metricValue.Minimum}, + {"Total", metricValue.Total}, + } + for _, aggregation := range aggregationsData { + if aggregation.value != nil { + s.mb.AddDataPoint( + resourceID, + *metric.Name.Value, + aggregation.name, + string(*metric.Unit), + attributes, + ts, + 
*aggregation.value, + ) + } + } +} From f90d82862a3f7b21103c06596b2a4936c79345c1 Mon Sep 17 00:00:00 2001 From: Chris Parkins Date: Thu, 30 Nov 2023 13:19:42 -0700 Subject: [PATCH 02/15] Fix errors that were not ocurring during testing --- receiver/azuremonitorreceiver/scraper_batch.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index b02ef40f72dc..e3985e72c36e 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -67,7 +67,7 @@ type azureBatchScraper struct { azIDWorkloadFunc func(options *azidentity.WorkloadIdentityCredentialOptions) (*azidentity.WorkloadIdentityCredential, error) armClientOptions *arm.ClientOptions armClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armresources.Client, error) - armMonitorDefinitionsClientFunc func(azcore.TokenCredential, *arm.ClientOptions) (*armmonitor.MetricDefinitionsClient, error) + armMonitorDefinitionsClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armmonitor.MetricDefinitionsClient, error) azQueryMetricsBatchClientOptions *azquery.MetricsBatchClientOptions azQueryMetricsBatchClientFunc func(string, azcore.TokenCredential, *azquery.MetricsBatchClientOptions) (*azquery.MetricsBatchClient, error) mutex *sync.Mutex @@ -114,7 +114,7 @@ func (s *azureBatchScraper) getArmClient() ArmClient { } func (s *azureBatchScraper) getMetricsDefinitionsClient() MetricsDefinitionsClientInterface { - client, _ := s.armMonitorDefinitionsClientFunc(s.cred, s.armClientOptions) + client, _ := s.armMonitorDefinitionsClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) return client } @@ -369,7 +369,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceT StartTime: to.Ptr(startTime.Format(time.RFC3339)), EndTime: to.Ptr(now.Format(time.RFC3339)), Interval: 
to.Ptr(compositeKey.timeGrain), - Top: to.Ptr(int32(10)), // Defaults to 10 (may be limiting results) + Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) }, ) From 698548ddb00b707c22bac4dd5267f8ae5fd669de Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Fri, 26 Jan 2024 10:10:15 +0100 Subject: [PATCH 03/15] azure monitor receiver - getBatch few enhancement --- cmd/otelcontribcol/go.mod | 2 +- cmd/otelcontribcol/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- .../azuremonitorreceiver/scraper_batch.go | 43 ++++++++++--------- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index a8ded14e19f6..3b2e76ff349b 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -407,7 +407,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.1.0 // indirect - github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index dcca2d415be7..71c67f6a2bb9 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -682,8 +682,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= -github.com/golang-sql/civil 
v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/go.mod b/go.mod index 3f9921484298..fc8f0620242d 100644 --- a/go.mod +++ b/go.mod @@ -379,7 +379,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.1.0 // indirect - github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/go.sum b/go.sum index 98379a8e4ca2..a41b578dce53 100644 --- a/go.sum +++ b/go.sum @@ -690,8 +690,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 
h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index e3985e72c36e..78303b0b092b 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -6,6 +6,7 @@ package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" "fmt" + "maps" "sort" "strings" "sync" @@ -241,7 +242,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { if updatedTypes[*resource.Type] == nil { updatedTypes[*resource.Type] = &azureType{ - name: resource.Name, + name: resource.Type, attributes: map[string]*string{}, resourceIds: []*string{resource.ID}, } @@ -260,7 +261,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { } s.resourcesUpdated = time.Now() - s.resourceTypes = updatedTypes + maps.Copy(s.resourceTypes, updatedTypes) } func (s *azureBatchScraper) getResourcesFilter() string { @@ -341,7 +342,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceT now := time.Now().UTC() metricsByGrain.metricsValuesUpdated = now - startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) + startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 2) // times 2 because for some resources, data are missing for the very latest timestamp s.settings.Logger.Info("getBatchMetricsValues", zap.String("resourceType", resourceType), zap.Any("metricNames", metricsByGrain.metrics), zap.Any("startTime", 
startTime), zap.Any("now", now), zap.String("timeGrain", compositeKey.timeGrain)) start := 0 @@ -381,26 +382,28 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceT start = end for _, metricValues := range response.Values { for _, metric := range metricValues.Values { - for _, timeseriesElement := range metric.TimeSeries { + for _, timeseriesElement := range metric.TimeSeries { if timeseriesElement.Data != nil { - res := s.resources[*metricValues.ResourceID] - attributes := map[string]*string{} - for name, value := range res.attributes { - attributes[name] = value - } - for _, value := range timeseriesElement.MetadataValues { - name := metadataPrefix + *value.Name.Value - attributes[name] = value.Value - } - if s.cfg.AppendTagsAsAttributes { - for tagName, value := range res.tags { - name := tagPrefix + tagName + if metricValues.ResourceID != nil { + res := s.resources[*metricValues.ResourceID] + attributes := map[string]*string{} + for name, value := range res.attributes { attributes[name] = value } - } - for _, metricValue := range timeseriesElement.Data { - s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + for _, value := range timeseriesElement.MetadataValues { + name := metadataPrefix + *value.Name.Value + attributes[name] = value.Value + } + if s.cfg.AppendTagsAsAttributes { + for tagName, value := range res.tags { + name := tagPrefix + tagName + attributes[name] = value + } + } + for _, metricValue := range timeseriesElement.Data { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + } } } } @@ -419,7 +422,7 @@ func (s *azureBatchScraper) processQueryTimeseriesData( s.mutex.Lock() defer s.mutex.Unlock() - ts := pcommon.NewTimestampFromTime(time.Now()) + ts := pcommon.NewTimestampFromTime(*metricValue.TimeStamp) aggregationsData := []struct { name string From 1f7baef167c35e80d50d9927350962f176f90484 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: 
Mon, 29 Jan 2024 11:12:07 +0100 Subject: [PATCH 04/15] Implemented automatic discovery of subscriptions --- cmd/configschema/go.mod | 5 +- cmd/configschema/go.sum | 6 +- cmd/otelcontribcol/go.mod | 1 + cmd/otelcontribcol/go.sum | 2 + go.mod | 1 + go.sum | 2 + receiver/azuremonitorreceiver/config.go | 7 +- receiver/azuremonitorreceiver/go.mod | 1 + receiver/azuremonitorreceiver/go.sum | 2 + .../azuremonitorreceiver/scraper_batch.go | 283 +++++++++++------- 10 files changed, 188 insertions(+), 122 deletions(-) diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 57f35a3365cb..bd34e4535490 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -202,9 +202,10 @@ require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.2 // indirect @@ -357,7 +358,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.1.0 // indirect - github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache 
v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index 27c319e72a6b..dd417d2a05e9 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -109,6 +109,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= @@ -685,8 +687,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.1.0 h1:UGKbA/IPjtS6zLcdB7i5TyACMgSbOTiR8qzXgw8HWQU= github.com/golang-jwt/jwt/v5 v5.1.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod 
h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 3b2e76ff349b..41a71fbeea61 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -251,6 +251,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.2 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 71c67f6a2bb9..138060162d78 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -108,6 +108,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod 
h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= diff --git a/go.mod b/go.mod index fc8f0620242d..cb1f74f58bb5 100644 --- a/go.mod +++ b/go.mod @@ -221,6 +221,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.2 // indirect diff --git a/go.sum b/go.sum index a41b578dce53..0a7d73259811 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod 
h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= diff --git a/receiver/azuremonitorreceiver/config.go b/receiver/azuremonitorreceiver/config.go index 16aabd6f1254..781cefa7acaf 100644 --- a/receiver/azuremonitorreceiver/config.go +++ b/receiver/azuremonitorreceiver/config.go @@ -26,7 +26,7 @@ var ( errMissingClientSecret = errors.New(`ClientSecret" is not specified in config`) errMissingFedTokenFile = errors.New(`FederatedTokenFile is not specified in config`) errInvalidCloud = errors.New(`Cloud" is invalid`) - errInvalidRegion = errors.New("`Region` is not specifiec in config`") + errInvalidRegion = errors.New("`Region` is not specified in config`") monitorServices = []string{ "Microsoft.EventGrid/eventSubscriptions", @@ -246,6 +246,7 @@ type Config struct { MaximumNumberOfMetricsInACall int `mapstructure:"maximum_number_of_metrics_in_a_call"` AppendTagsAsAttributes bool `mapstructure:"append_tags_as_attributes"` UseBatchApi bool `mapstructure:"use_batch_api"` + DiscoverSubscription bool `mapstructure:"discover_subscriptions"` Region string `mapstructure:"region"` MaximumNumberOfDimensionsInACall int `mapstructure:"maximum_number_of_dimensions_in_a_call"` } @@ -257,7 +258,7 @@ const ( // Validate validates the configuration by checking for missing or invalid fields func (c Config) Validate() (err error) { - if c.SubscriptionID == 
"" { + if c.SubscriptionID == "" && !c.DiscoverSubscription { err = multierr.Append(err, errMissingSubscriptionID) } @@ -294,7 +295,7 @@ func (c Config) Validate() (err error) { err = multierr.Append(err, errInvalidCloud) } - if c.UseBatchApi && c.Region == "" { + if c.UseBatchApi && c.Region == "" && !c.DiscoverSubscription { err = multierr.Append(err, errInvalidRegion) } diff --git a/receiver/azuremonitorreceiver/go.mod b/receiver/azuremonitorreceiver/go.mod index 0b4885908bfc..b7202897f58f 100644 --- a/receiver/azuremonitorreceiver/go.mod +++ b/receiver/azuremonitorreceiver/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.92.0 diff --git a/receiver/azuremonitorreceiver/go.sum b/receiver/azuremonitorreceiver/go.sum index 72b4d7c62205..fed4631ffe6b 100644 --- a/receiver/azuremonitorreceiver/go.sum +++ b/receiver/azuremonitorreceiver/go.sum @@ -14,6 +14,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0/go.mod h1:jj6P8ybImR+5topJ+eH6fgcemSFBmU6/6bFF8KkwuDI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= 
+github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 h1:hVeq+yCyUi+MsoO/CU95yqCIcdzra5ovzk8Q2BBpV2M= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 78303b0b092b..5a3d61f34450 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -15,11 +15,13 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -44,30 +46,31 @@ func newBatchScraper(conf *Config, settings receiver.CreateSettings) *azureBatch mb: metadata.NewMetricsBuilder(conf.MetricsBuilderConfig, settings), azIDCredentialsFunc: azidentity.NewClientSecretCredential, azIDWorkloadFunc: azidentity.NewWorkloadIdentityCredential, - armClientFunc: armresources.NewClient, armMonitorDefinitionsClientFunc: armmonitor.NewMetricDefinitionsClient, azQueryMetricsBatchClientFunc: azquery.NewMetricsBatchClient, mutex: &sync.Mutex{}, } } -type 
azureBatchScraper struct { - cred azcore.TokenCredential - - clientResources ArmClient - clientMetricsDefinitions MetricsDefinitionsClientInterface - clientMetricsBatchValues MetricBatchValuesClient +type ArmsubscriptionClient interface { + NewListPager(options *armsubscriptions.ClientListOptions) *runtime.Pager[armsubscriptions.ClientListResponse] + NewListLocationsPager(subscriptionID string, options *armsubscriptions.ClientListLocationsOptions) *runtime.Pager[armsubscriptions.ClientListLocationsResponse] +} +type azureBatchScraper struct { + cred azcore.TokenCredential cfg *Config settings component.TelemetrySettings - resources map[string]*azureResource - resourceTypes map[string]*azureType + discoveredSubscriptions map[string]*armsubscriptions.Subscription + regionsFromSubscriptions map[string][]string + resources map[string]map[string]*azureResource + resourceTypes map[string]map[string]*azureType resourcesUpdated time.Time mb *metadata.MetricsBuilder azIDCredentialsFunc func(string, string, string, *azidentity.ClientSecretCredentialOptions) (*azidentity.ClientSecretCredential, error) azIDWorkloadFunc func(options *azidentity.WorkloadIdentityCredentialOptions) (*azidentity.WorkloadIdentityCredential, error) armClientOptions *arm.ClientOptions - armClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armresources.Client, error) + armSubscriptionclient ArmsubscriptionClient armMonitorDefinitionsClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armmonitor.MetricDefinitionsClient, error) azQueryMetricsBatchClientOptions *azquery.MetricsBatchClientOptions azQueryMetricsBatchClientFunc func(string, azcore.TokenCredential, *azquery.MetricsBatchClientOptions) (*azquery.MetricsBatchClient, error) @@ -109,13 +112,18 @@ func (s *azureBatchScraper) getAzQueryMetricsBatchClientOptions() *azquery.Metri return &options } -func (s *azureBatchScraper) getArmClient() ArmClient { - client, _ := s.armClientFunc(s.cfg.SubscriptionID, s.cred, 
s.armClientOptions) +func (s *azureBatchScraper) getArmsubscriptionClient() ArmsubscriptionClient { + client, _ := armsubscriptions.NewClient(s.cred, s.armClientOptions) + return client +} + +func (s *azureBatchScraper) getArmClient(subscriptionId string) ArmClient { + client, _ := armresources.NewClient(subscriptionId, s.cred, s.armClientOptions) return client } -func (s *azureBatchScraper) getMetricsDefinitionsClient() MetricsDefinitionsClientInterface { - client, _ := s.armMonitorDefinitionsClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) +func (s *azureBatchScraper) getMetricsDefinitionsClient(subscriptionId string) MetricsDefinitionsClientInterface { + client, _ := s.armMonitorDefinitionsClientFunc(subscriptionId, s.cred, s.armClientOptions) return client } @@ -125,8 +133,8 @@ type MetricBatchValuesClient interface { ) } -func (s *azureBatchScraper) GetMetricsBatchValuesClient() MetricBatchValuesClient { - endpoint := "https://" + s.cfg.Region + ".metrics.monitor.azure.com" +func (s *azureBatchScraper) GetMetricsBatchValuesClient(region string) MetricBatchValuesClient { + endpoint := "https://" + region + ".metrics.monitor.azure.com" s.settings.Logger.Info("Batch Endpoint", zap.String("endpoint", endpoint)) client, _ := azquery.NewMetricsBatchClient(endpoint, s.cred, s.azQueryMetricsBatchClientOptions) return client @@ -138,13 +146,12 @@ func (s *azureBatchScraper) start(_ context.Context, _ component.Host) (err erro } s.armClientOptions = s.getArmClientOptions() - s.clientResources = s.getArmClient() - s.clientMetricsDefinitions = s.getMetricsDefinitionsClient() s.azQueryMetricsBatchClientOptions = s.getAzQueryMetricsBatchClientOptions() - s.clientMetricsBatchValues = s.GetMetricsBatchValuesClient() - - s.resources = map[string]*azureResource{} - s.resourceTypes = map[string]*azureType{} + s.armSubscriptionclient = s.getArmsubscriptionClient() + s.resources = map[string]map[string]*azureResource{} + s.resourceTypes = 
map[string]map[string]*azureType{} + s.discoveredSubscriptions = map[string]*armsubscriptions.Subscription{} + s.regionsFromSubscriptions = map[string][]string{} return } @@ -166,44 +173,86 @@ func (s *azureBatchScraper) loadCredentials() (err error) { } func (s *azureBatchScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { + if !s.cfg.DiscoverSubscription { + s.resources[s.cfg.SubscriptionID] = make(map[string]*azureResource) + s.resourceTypes[s.cfg.SubscriptionID] = make(map[string]*azureType) + s.discoveredSubscriptions[s.cfg.SubscriptionID] = &armsubscriptions.Subscription{ + ID: &s.cfg.SubscriptionID, + DisplayName: &s.cfg.SubscriptionID, + } + } else { + s.getSubscriptionsAndRegions(ctx) + } + + for subscriptionid, subscription := range s.discoveredSubscriptions { - s.getResources(ctx) - s.settings.Logger.Info("scrape", zap.Any("s.resourceTypes", s.resourceTypes)) + s.getResources(ctx, subscriptionid) - resourceTypesWithDefinitions := make(chan string) - go func() { - defer close(resourceTypesWithDefinitions) - for resourceType := range s.resourceTypes { - s.getResourceMetricsDefinitionsByType(ctx, resourceType) - resourceTypesWithDefinitions <- resourceType + resourceTypesWithDefinitions := make(chan string) + go func() { + defer close(resourceTypesWithDefinitions) + for resourceType := range s.resourceTypes[subscriptionid] { + s.getResourceMetricsDefinitionsByType(ctx, subscription, resourceType) + resourceTypesWithDefinitions <- resourceType + } + }() + + var wg sync.WaitGroup + for resourceType := range resourceTypesWithDefinitions { + wg.Add(1) + go func(resourceType string) { + defer wg.Done() + s.getBatchMetricsValues(ctx, subscription, resourceType) + }(resourceType) } - }() - - var wg sync.WaitGroup - for resourceType := range resourceTypesWithDefinitions { - wg.Add(1) - go func(resourceType string) { - defer wg.Done() - s.settings.Logger.Info("scrape", zap.String("resourceType", resourceType)) - s.settings.Logger.Info("scrape", 
zap.Any("s.resourceTypes[resourceType]", *s.resourceTypes[resourceType])) - s.getBatchMetricsValues(ctx, resourceType) - }(resourceType) + wg.Wait() + } - wg.Wait() + return s.mb.Emit(), nil +} + +func (s *azureBatchScraper) getSubscriptionsAndRegions(ctx context.Context) { + opts := &armsubscriptions.ClientListOptions{} + pager := s.armSubscriptionclient.NewListPager(opts) + + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Subscriptions", zap.Error(err)) + return + } + + for _, subscription := range nextResult.Value { + s.resources[*subscription.SubscriptionID] = make(map[string]*azureResource) + s.resourceTypes[*subscription.SubscriptionID] = make(map[string]*azureType) + s.discoveredSubscriptions[*subscription.SubscriptionID] = subscription + + locationopts := &armsubscriptions.ClientListLocationsOptions{} + locationpager := s.armSubscriptionclient.NewListLocationsPager(*subscription.SubscriptionID, locationopts) + + for locationpager.More() { - return s.mb.Emit( - metadata.WithAzureMonitorSubscriptionID(s.cfg.SubscriptionID), - metadata.WithAzureMonitorTenantID(s.cfg.TenantID), - ), nil + nextLocationResult, err := locationpager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure locations", zap.Error(err)) + return + } + for _, location := range nextLocationResult.Value { + s.regionsFromSubscriptions[*subscription.SubscriptionID] = append(s.regionsFromSubscriptions[*subscription.SubscriptionID], *location.Name) + } + } + } + } } -func (s *azureBatchScraper) getResources(ctx context.Context) { +func (s *azureBatchScraper) getResources(ctx context.Context, subscriptionId string) { if time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources { return } + clientResources := s.getArmClient(subscriptionId) existingResources := map[string]void{} - for id := range s.resources { + for id := range s.resources[subscriptionId] { existingResources[id] = void{} } 
@@ -213,7 +262,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { } updatedTypes := map[string]*azureType{} - pager := s.clientResources.NewListPager(opts) + pager := clientResources.NewListPager(opts) for pager.More() { nextResult, err := pager.NextPage(ctx) @@ -223,7 +272,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { } for _, resource := range nextResult.Value { - if _, ok := s.resources[*resource.ID]; !ok { + if _, ok := s.resources[subscriptionId][*resource.ID]; !ok { resourceGroup := getResourceGroupFromID(*resource.ID) attributes := map[string]*string{ attributeName: resource.Name, @@ -235,7 +284,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { attributes[attributeLocation] = resource.Location } - s.resources[*resource.ID] = &azureResource{ + s.resources[subscriptionId][*resource.ID] = &azureResource{ attributes: attributes, tags: resource.Tags, } @@ -256,12 +305,12 @@ func (s *azureBatchScraper) getResources(ctx context.Context) { if len(existingResources) > 0 { for idToDelete := range existingResources { - delete(s.resources, idToDelete) + delete(s.resources[subscriptionId], idToDelete) } } s.resourcesUpdated = time.Now() - maps.Copy(s.resourceTypes, updatedTypes) + maps.Copy(s.resourceTypes[subscriptionId], updatedTypes) } func (s *azureBatchScraper) getResourcesFilter() string { @@ -278,20 +327,21 @@ func (s *azureBatchScraper) getResourcesFilter() string { return fmt.Sprintf("(resourceType eq '%s')%s", resourcesTypeFilter, resourcesGroupFilterString) } -func (s *azureBatchScraper) getResourceMetricsDefinitionsByType(ctx context.Context, resourceType string) { +func (s *azureBatchScraper) getResourceMetricsDefinitionsByType(ctx context.Context, subscription *armsubscriptions.Subscription, resourceType string) { - if time.Since(s.resourceTypes[resourceType].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { + if 
time.Since(s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { return } - s.resourceTypes[resourceType].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} + s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} - resourceIds := s.resourceTypes[resourceType].resourceIds + resourceIds := s.resourceTypes[*subscription.SubscriptionID][resourceType].resourceIds if len(resourceIds) == 0 && resourceIds[0] != nil { return } - pager := s.clientMetricsDefinitions.NewListPager(*resourceIds[0], nil) + clientMetricsDefinitions := s.getMetricsDefinitionsClient(*subscription.SubscriptionID) + pager := clientMetricsDefinitions.NewListPager(*resourceIds[0], nil) for pager.More() { nextResult, err := pager.NextPage(ctx) if err != nil { @@ -315,24 +365,24 @@ func (s *azureBatchScraper) getResourceMetricsDefinitionsByType(ctx context.Cont sort.Strings(dimensionsSlice) compositeKey.dimensions = strings.Join(dimensionsSlice, ",") } - s.storeMetricsDefinitionByType(resourceType, name, compositeKey) + s.storeMetricsDefinitionByType(*subscription.SubscriptionID, resourceType, name, compositeKey) } } - s.resourceTypes[resourceType].metricsDefinitionsUpdated = time.Now() + s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsDefinitionsUpdated = time.Now() } -func (s *azureBatchScraper) storeMetricsDefinitionByType(resourceType string, name string, compositeKey metricsCompositeKey) { - if _, ok := s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey]; ok { - s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey].metrics = append( - s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey].metrics, name, +func (s *azureBatchScraper) storeMetricsDefinitionByType(subscriptionid string, resourceType string, name string, compositeKey metricsCompositeKey) { + if _, ok := 
s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey]; ok { + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey].metrics = append( + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey].metrics, name, ) } else { - s.resourceTypes[resourceType].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} } } -func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceType string) { - resType := *s.resourceTypes[resourceType] +func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscription *armsubscriptions.Subscription, resourceType string) { + resType := *s.resourceTypes[*subscription.SubscriptionID][resourceType] for compositeKey, metricsByGrain := range resType.metricsByCompositeKey { @@ -343,7 +393,6 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceT now := time.Now().UTC() metricsByGrain.metricsValuesUpdated = now startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 2) // times 2 because for some resources, data are missing for the very latest timestamp - s.settings.Logger.Info("getBatchMetricsValues", zap.String("resourceType", resourceType), zap.Any("metricNames", metricsByGrain.metrics), zap.Any("startTime", startTime), zap.Any("now", now), zap.String("timeGrain", compositeKey.timeGrain)) start := 0 for start < len(metricsByGrain.metrics) { @@ -352,57 +401,61 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, resourceT if end > len(metricsByGrain.metrics) { end = len(metricsByGrain.metrics) } + for _, region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { + clientMetrics := s.GetMetricsBatchValuesClient(region) + s.settings.Logger.Info("scrape", 
zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType)) + response, err := clientMetrics.QueryBatch( + ctx, + *subscription.SubscriptionID, + resourceType, + metricsByGrain.metrics[start:end], + azquery.ResourceIDList{ResourceIDs: resType.resourceIds}, + &azquery.MetricsBatchClientQueryBatchOptions{ + Aggregation: to.SliceOfPtrs( + azquery.AggregationTypeAverage, + azquery.AggregationTypeMaximum, + azquery.AggregationTypeMinimum, + azquery.AggregationTypeTotal, + azquery.AggregationTypeCount, + ), + StartTime: to.Ptr(startTime.Format(time.RFC3339)), + EndTime: to.Ptr(now.Format(time.RFC3339)), + Interval: to.Ptr(compositeKey.timeGrain), + Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) + }, + ) + + if err != nil { + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Error(err)) + return + } - response, err := s.clientMetricsBatchValues.QueryBatch( - ctx, - s.cfg.SubscriptionID, - resourceType, - metricsByGrain.metrics[start:end], - azquery.ResourceIDList{ResourceIDs: resType.resourceIds}, - &azquery.MetricsBatchClientQueryBatchOptions{ - Aggregation: to.SliceOfPtrs( - azquery.AggregationTypeAverage, - azquery.AggregationTypeMaximum, - azquery.AggregationTypeMinimum, - azquery.AggregationTypeTotal, - azquery.AggregationTypeCount, - ), - StartTime: to.Ptr(startTime.Format(time.RFC3339)), - EndTime: to.Ptr(now.Format(time.RFC3339)), - Interval: to.Ptr(compositeKey.timeGrain), - Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) - }, - ) + start = end + for _, metricValues := range response.Values { + for _, metric := range metricValues.Values { - if err != nil { - s.settings.Logger.Error("failed to get Azure Metrics values data", zap.Error(err)) - return - } - - start = end 
- for _, metricValues := range response.Values { - for _, metric := range metricValues.Values { - - for _, timeseriesElement := range metric.TimeSeries { - if timeseriesElement.Data != nil { - if metricValues.ResourceID != nil { - res := s.resources[*metricValues.ResourceID] - attributes := map[string]*string{} - for name, value := range res.attributes { - attributes[name] = value - } - for _, value := range timeseriesElement.MetadataValues { - name := metadataPrefix + *value.Name.Value - attributes[name] = value.Value - } - if s.cfg.AppendTagsAsAttributes { - for tagName, value := range res.tags { - name := tagPrefix + tagName + for _, timeseriesElement := range metric.TimeSeries { + if timeseriesElement.Data != nil { + if metricValues.ResourceID != nil { + res := s.resources[*subscription.SubscriptionID][*metricValues.ResourceID] + attributes := map[string]*string{} + for name, value := range res.attributes { attributes[name] = value } - } - for _, metricValue := range timeseriesElement.Data { - s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + for _, value := range timeseriesElement.MetadataValues { + name := metadataPrefix + *value.Name.Value + attributes[name] = value.Value + } + if s.cfg.AppendTagsAsAttributes { + for tagName, value := range res.tags { + name := tagPrefix + tagName + attributes[name] = value + } + } + attributes["subscription"] = subscription.DisplayName + for _, metricValue := range timeseriesElement.Data { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + } } } } From 5b6aa848d6d7091791a91abd5efa55ab066c8b92 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Mon, 29 Jan 2024 14:58:05 +0100 Subject: [PATCH 05/15] Fix concurrent multi goroutines --- .../azuremonitorreceiver/scraper_batch.go | 102 +++++++++--------- 1 file changed, 50 insertions(+), 52 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go 
b/receiver/azuremonitorreceiver/scraper_batch.go index 5a3d61f34450..a57e2cc16375 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -62,7 +62,7 @@ type azureBatchScraper struct { cfg *Config settings component.TelemetrySettings discoveredSubscriptions map[string]*armsubscriptions.Subscription - regionsFromSubscriptions map[string][]string + regionsFromSubscriptions map[string]map[string]struct{} resources map[string]map[string]*azureResource resourceTypes map[string]map[string]*azureType resourcesUpdated time.Time @@ -140,7 +140,7 @@ func (s *azureBatchScraper) GetMetricsBatchValuesClient(region string) MetricBat return client } -func (s *azureBatchScraper) start(_ context.Context, _ component.Host) (err error) { +func (s *azureBatchScraper) start(ctx context.Context, _ component.Host) (err error) { if err = s.loadCredentials(); err != nil { return err } @@ -151,7 +151,18 @@ func (s *azureBatchScraper) start(_ context.Context, _ component.Host) (err erro s.resources = map[string]map[string]*azureResource{} s.resourceTypes = map[string]map[string]*azureType{} s.discoveredSubscriptions = map[string]*armsubscriptions.Subscription{} - s.regionsFromSubscriptions = map[string][]string{} + s.regionsFromSubscriptions = map[string]map[string]struct{}{} + + if !s.cfg.DiscoverSubscription { + s.resources[s.cfg.SubscriptionID] = make(map[string]*azureResource) + s.resourceTypes[s.cfg.SubscriptionID] = make(map[string]*azureType) + s.discoveredSubscriptions[s.cfg.SubscriptionID] = &armsubscriptions.Subscription{ + ID: &s.cfg.SubscriptionID, + DisplayName: &s.cfg.SubscriptionID, + } + } else { + s.getSubscriptions(ctx) + } return } @@ -173,45 +184,44 @@ func (s *azureBatchScraper) loadCredentials() (err error) { } func (s *azureBatchScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { - if !s.cfg.DiscoverSubscription { - s.resources[s.cfg.SubscriptionID] = make(map[string]*azureResource) - 
s.resourceTypes[s.cfg.SubscriptionID] = make(map[string]*azureType) - s.discoveredSubscriptions[s.cfg.SubscriptionID] = &armsubscriptions.Subscription{ - ID: &s.cfg.SubscriptionID, - DisplayName: &s.cfg.SubscriptionID, - } - } else { - s.getSubscriptionsAndRegions(ctx) + if !(time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources) { + s.getSubscriptions(ctx) } - - for subscriptionid, subscription := range s.discoveredSubscriptions { - - s.getResources(ctx, subscriptionid) - - resourceTypesWithDefinitions := make(chan string) - go func() { - defer close(resourceTypesWithDefinitions) - for resourceType := range s.resourceTypes[subscriptionid] { - s.getResourceMetricsDefinitionsByType(ctx, subscription, resourceType) - resourceTypesWithDefinitions <- resourceType + var wg sync.WaitGroup + for _, subscription := range s.discoveredSubscriptions { + wg.Add(1) + go func(subscription *armsubscriptions.Subscription) { + defer wg.Done() + + s.getResources(ctx, *subscription.SubscriptionID) + resourceTypesWithDefinitions := make(chan string) + go func() { + defer close(resourceTypesWithDefinitions) + for resourceType := range s.resourceTypes[*subscription.SubscriptionID] { + s.getResourceMetricsDefinitionsByType(ctx, subscription, resourceType) + resourceTypesWithDefinitions <- resourceType + } + }() + + var wg2 sync.WaitGroup + for resourceType := range resourceTypesWithDefinitions { + wg2.Add(1) + go func(subscription *armsubscriptions.Subscription, resourceType string) { + defer wg2.Done() + s.getBatchMetricsValues(ctx, subscription, resourceType) + }(subscription, resourceType) } - }() - - var wg sync.WaitGroup - for resourceType := range resourceTypesWithDefinitions { - wg.Add(1) - go func(resourceType string) { - defer wg.Done() - s.getBatchMetricsValues(ctx, subscription, resourceType) - }(resourceType) - } - wg.Wait() + + wg2.Wait() + }(subscription) } + + wg.Wait() return s.mb.Emit(), nil } -func (s *azureBatchScraper) getSubscriptionsAndRegions(ctx 
context.Context) { +func (s *azureBatchScraper) getSubscriptions(ctx context.Context) { opts := &armsubscriptions.ClientListOptions{} pager := s.armSubscriptionclient.NewListPager(opts) @@ -226,21 +236,7 @@ func (s *azureBatchScraper) getSubscriptionsAndRegions(ctx context.Context) { s.resources[*subscription.SubscriptionID] = make(map[string]*azureResource) s.resourceTypes[*subscription.SubscriptionID] = make(map[string]*azureType) s.discoveredSubscriptions[*subscription.SubscriptionID] = subscription - - locationopts := &armsubscriptions.ClientListLocationsOptions{} - locationpager := s.armSubscriptionclient.NewListLocationsPager(*subscription.SubscriptionID, locationopts) - - for locationpager.More() { - - nextLocationResult, err := locationpager.NextPage(ctx) - if err != nil { - s.settings.Logger.Error("failed to get Azure locations", zap.Error(err)) - return - } - for _, location := range nextLocationResult.Value { - s.regionsFromSubscriptions[*subscription.SubscriptionID] = append(s.regionsFromSubscriptions[*subscription.SubscriptionID], *location.Name) - } - } + s.regionsFromSubscriptions[*subscription.SubscriptionID] = make(map[string]struct{}) } } } @@ -272,6 +268,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context, subscriptionId str } for _, resource := range nextResult.Value { + if _, ok := s.resources[subscriptionId][*resource.ID]; !ok { resourceGroup := getResourceGroupFromID(*resource.ID) attributes := map[string]*string{ @@ -281,6 +278,7 @@ func (s *azureBatchScraper) getResources(ctx context.Context, subscriptionId str } if resource.Location != nil { + s.regionsFromSubscriptions[subscriptionId][*resource.Location] = struct{}{} attributes[attributeLocation] = resource.Location } @@ -401,9 +399,9 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript if end > len(metricsByGrain.metrics) { end = len(metricsByGrain.metrics) } - for _, region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { + 
for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { clientMetrics := s.GetMetricsBatchValuesClient(region) - s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType)) + s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType), zap.String("region", region)) response, err := clientMetrics.QueryBatch( ctx, *subscription.SubscriptionID, From 681d03e107dfba0fca3c6e8adb4cead3b5bd7011 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Tue, 30 Jan 2024 13:20:21 +0100 Subject: [PATCH 06/15] Add timegrain as attributes --- receiver/azuremonitorreceiver/scraper_batch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index a57e2cc16375..9da372e060d2 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -451,6 +451,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript } } attributes["subscription"] = subscription.DisplayName + attributes["timegrain"] = &compositeKey.timeGrain for _, metricValue := range timeseriesElement.Data { s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) } From 9b70a3eccf4a42800bb7365481689eaf00104a40 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Tue, 30 Jan 2024 13:56:56 +0100 Subject: [PATCH 07/15] retrieve twice timerange only for PT1M and always query (do not wait timegrain) --- receiver/azuremonitorreceiver/scraper_batch.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 9da372e060d2..c4b8886e96bc 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -383,14
+383,13 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript resType := *s.resourceTypes[*subscription.SubscriptionID][resourceType] for compositeKey, metricsByGrain := range resType.metricsByCompositeKey { - - if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { - continue - } - now := time.Now().UTC() metricsByGrain.metricsValuesUpdated = now - startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 2) // times 2 because for some resources, data are missing for the very latest timestamp + + startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) + if compositeKey.timeGrain == "PT1M" { + startTime = now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) // times 2 because for some resources, data are missing for the very latest timestamp + } start := 0 for start < len(metricsByGrain.metrics) { From d1d636a2ac4889c3056653d7f26b57a1a08bc6bb Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Wed, 31 Jan 2024 09:13:22 +0100 Subject: [PATCH 08/15] FIX timegrain PT1M get multiple time --- receiver/azuremonitorreceiver/scraper_batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index c4b8886e96bc..ce6ad25800fb 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -388,7 +388,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) if compositeKey.timeGrain == "PT1M" { - startTime = now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) // times 2 because for some resources, data are missing for the very latest timestamp + startTime = now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * 
time.Second * 2) // times 2 because for some resources, data are missing for the very latest timestamp } start := 0 From 5fc00964fbfed24b329b58627538b4227d709e3f Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Wed, 31 Jan 2024 16:20:58 +0100 Subject: [PATCH 09/15] Fix limits of 50 resources per getBatch API calls --- .../azuremonitorreceiver/scraper_batch.go | 112 ++++++++++-------- 1 file changed, 61 insertions(+), 51 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index ce6ad25800fb..61d22470dc6e 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -399,60 +399,70 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript end = len(metricsByGrain.metrics) } for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { - clientMetrics := s.GetMetricsBatchValuesClient(region) - s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType), zap.String("region", region)) - response, err := clientMetrics.QueryBatch( - ctx, - *subscription.SubscriptionID, - resourceType, - metricsByGrain.metrics[start:end], - azquery.ResourceIDList{ResourceIDs: resType.resourceIds}, - &azquery.MetricsBatchClientQueryBatchOptions{ - Aggregation: to.SliceOfPtrs( - azquery.AggregationTypeAverage, - azquery.AggregationTypeMaximum, - azquery.AggregationTypeMinimum, - azquery.AggregationTypeTotal, - azquery.AggregationTypeCount, - ), - StartTime: to.Ptr(startTime.Format(time.RFC3339)), - EndTime: to.Ptr(now.Format(time.RFC3339)), - Interval: to.Ptr(compositeKey.timeGrain), - Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) - }, - ) - - if err != nil { - s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), 
zap.String("region", region), zap.String("resourceType", resourceType), zap.Error(err)) - return - } - start = end - for _, metricValues := range response.Values { - for _, metric := range metricValues.Values { - - for _, timeseriesElement := range metric.TimeSeries { - if timeseriesElement.Data != nil { - if metricValues.ResourceID != nil { - res := s.resources[*subscription.SubscriptionID][*metricValues.ResourceID] - attributes := map[string]*string{} - for name, value := range res.attributes { - attributes[name] = value - } - for _, value := range timeseriesElement.MetadataValues { - name := metadataPrefix + *value.Name.Value - attributes[name] = value.Value - } - if s.cfg.AppendTagsAsAttributes { - for tagName, value := range res.tags { - name := tagPrefix + tagName + start_resources := 0 + for start_resources < len(resType.resourceIds) { + + end_resources := start_resources + 50 // getBatch API is limited to 50 resources max + if end_resources > len(resType.resourceIds) { + end_resources = len(resType.resourceIds) + } + + clientMetrics := s.GetMetricsBatchValuesClient(region) + s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType), zap.String("region", region)) + response, err := clientMetrics.QueryBatch( + ctx, + *subscription.SubscriptionID, + resourceType, + metricsByGrain.metrics[start:end], + azquery.ResourceIDList{ResourceIDs: resType.resourceIds[start_resources:end_resources]}, + &azquery.MetricsBatchClientQueryBatchOptions{ + Aggregation: to.SliceOfPtrs( + azquery.AggregationTypeAverage, + azquery.AggregationTypeMaximum, + azquery.AggregationTypeMinimum, + azquery.AggregationTypeTotal, + azquery.AggregationTypeCount, + ), + StartTime: to.Ptr(startTime.Format(time.RFC3339)), + EndTime: to.Ptr(now.Format(time.RFC3339)), + Interval: to.Ptr(compositeKey.timeGrain), + Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) + }, + ) + + 
if err != nil { + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Error(err)) + return + } + + start = end + for _, metricValues := range response.Values { + for _, metric := range metricValues.Values { + + for _, timeseriesElement := range metric.TimeSeries { + if timeseriesElement.Data != nil { + if metricValues.ResourceID != nil { + res := s.resources[*subscription.SubscriptionID][*metricValues.ResourceID] + attributes := map[string]*string{} + for name, value := range res.attributes { attributes[name] = value } - } - attributes["subscription"] = subscription.DisplayName - attributes["timegrain"] = &compositeKey.timeGrain - for _, metricValue := range timeseriesElement.Data { - s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + for _, value := range timeseriesElement.MetadataValues { + name := metadataPrefix + *value.Name.Value + attributes[name] = value.Value + } + if s.cfg.AppendTagsAsAttributes { + for tagName, value := range res.tags { + name := tagPrefix + tagName + attributes[name] = value + } + } + attributes["subscription"] = subscription.DisplayName + attributes["timegrain"] = &compositeKey.timeGrain + for _, metricValue := range timeseriesElement.Data { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + } } } } From 8670727fd0601e0abc494815d500fd390ad867b1 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Thu, 1 Feb 2024 10:25:28 +0100 Subject: [PATCH 10/15] Retrieve 4 timeranges for metrics and keep only latest timestamp containing metrics values --- receiver/azuremonitorreceiver/scraper_batch.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 61d22470dc6e..3f58de82cde1 100644 ---
a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -386,10 +386,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript now := time.Now().UTC() metricsByGrain.metricsValuesUpdated = now - startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second) - if compositeKey.timeGrain == "PT1M" { - startTime = now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 2) // times 2 because for some resources, data are missing for the very latest timestamp - } + startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 4) // times 4 because for some resources, data are missing for the very latest timestamp. The processing will keep only the latest timestamp with data. start := 0 for start < len(metricsByGrain.metrics) { @@ -460,8 +457,13 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript } attributes["subscription"] = subscription.DisplayName attributes["timegrain"] = &compositeKey.timeGrain - for _, metricValue := range timeseriesElement.Data { - s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + for i := len(timeseriesElement.Data) - 1; i >= 0; i-- { // reverse for loop because newest timestamp is at the end of the slice + metricValue := timeseriesElement.Data[i] + if metricValue.Average != nil { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + break + } + s.settings.Logger.Warn("No metric values found for resource.", zap.Any("metricValues", metricValues)) } } } From 83b673e5eb1516f457b283d7d4adfce55dca63f6 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Thu, 1 Feb 2024 11:22:38 +0100 Subject: [PATCH 11/15] Remove misleading warning log --- receiver/azuremonitorreceiver/scraper_batch.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go 
b/receiver/azuremonitorreceiver/scraper_batch.go index 3f58de82cde1..a3a91eba577a 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -463,7 +463,6 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) break } - s.settings.Logger.Warn("No metric values found for resource.", zap.Any("metricValues", metricValues)) } } } From 6af67a01d90d3126b7ec0f127baf1ed9f688cf19 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Thu, 1 Feb 2024 15:42:53 +0100 Subject: [PATCH 12/15] fix for loop iterations for resourceIds --- receiver/azuremonitorreceiver/scraper_batch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index a3a91eba577a..9838fdaa34e7 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -429,11 +429,12 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript ) if err != nil { - s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Error(err)) + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Error(err)) return } start = end + start_resources = end_resources for _, metricValues := range response.Values { for _, metric := range metricValues.Values { From 7c3ca41a2f8ecf1de4ae23ab9c6726ae82a5830d Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Thu, 1 Feb 2024 
15:52:40 +0100 Subject: [PATCH 13/15] fix for loop iterations for resourceIds --- .../azuremonitorreceiver/scraper_batch.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 9838fdaa34e7..91c387b84ce4 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -387,15 +387,16 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript metricsByGrain.metricsValuesUpdated = now startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 4) // times 4 because for some resources, data are missing for the very latest timestamp. The processing will keep only the latest timestamp with data. + for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { + clientMetrics := s.GetMetricsBatchValuesClient(region) - start := 0 - for start < len(metricsByGrain.metrics) { + start := 0 + for start < len(metricsByGrain.metrics) { - end := start + s.cfg.MaximumNumberOfMetricsInACall - if end > len(metricsByGrain.metrics) { - end = len(metricsByGrain.metrics) - } - for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { + end := start + s.cfg.MaximumNumberOfMetricsInACall + if end > len(metricsByGrain.metrics) { + end = len(metricsByGrain.metrics) + } start_resources := 0 for start_resources < len(resType.resourceIds) { @@ -405,7 +406,6 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript end_resources = len(resType.resourceIds) } - clientMetrics := s.GetMetricsBatchValuesClient(region) s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType), zap.String("region", region)) response, err := clientMetrics.QueryBatch( ctx, @@ -433,8 +433,6 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx 
context.Context, subscript return } - start = end - start_resources = end_resources for _, metricValues := range response.Values { for _, metric := range metricValues.Values { @@ -470,7 +468,9 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript } } } + start_resources = end_resources } + start = end } } } From 6a9091f609402b085301411a07a93b3c692afff4 Mon Sep 17 00:00:00 2001 From: Alban HURTAUD Date: Thu, 1 Feb 2024 18:06:30 +0100 Subject: [PATCH 14/15] fix - do not return when getBatch returns an error --- receiver/azuremonitorreceiver/scraper_batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 91c387b84ce4..2c20466d21d9 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -430,7 +430,7 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript if err != nil { s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Error(err)) - return + break } for _, metricValues := range response.Values { From e908a559da591ed9790818ac5562b8f037bc44c6 Mon Sep 17 00:00:00 2001 From: Chris Parkins Date: Mon, 12 Feb 2024 12:40:39 -0700 Subject: [PATCH 15/15] Azure Monitor Receiver: Update the logic for when to pull batches by calculating the nearest minute and then add a check to ensure the last update was not within the time grain. In addition add more logging around errors received from the Batch API by casting to the Azure Response Error type and logging the full error.
--- .../azuremonitorreceiver/scraper_batch.go | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go index 2c20466d21d9..bddb0f32d83c 100644 --- a/receiver/azuremonitorreceiver/scraper_batch.go +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -5,6 +5,7 @@ package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" + "errors" "fmt" "maps" "sort" @@ -135,7 +136,7 @@ type MetricBatchValuesClient interface { func (s *azureBatchScraper) GetMetricsBatchValuesClient(region string) MetricBatchValuesClient { endpoint := "https://" + region + ".metrics.monitor.azure.com" - s.settings.Logger.Info("Batch Endpoint", zap.String("endpoint", endpoint)) + s.settings.Logger.Debug("Batch Endpoint", zap.String("endpoint", endpoint)) client, _ := azquery.NewMetricsBatchClient(endpoint, s.cred, s.azQueryMetricsBatchClientOptions) return client } @@ -384,9 +385,23 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript for compositeKey, metricsByGrain := range resType.metricsByCompositeKey { now := time.Now().UTC() - metricsByGrain.metricsValuesUpdated = now - startTime := now.Add(time.Duration(-timeGrains[compositeKey.timeGrain]) * time.Second * 4) // times 4 because for some resources, data are missing for the very latest timestamp. The processing will keep only the latest timestamp with data. + // Azure Metrics only allows querying every minute? 
+ d := (60 * time.Second) + closestMinute := now.Round(d) + timeSinceClosestMinute := time.Since(closestMinute).Seconds() + if timeSinceClosestMinute < 0 { + // Skip this batch to avoid duplication (on first interval) + continue + } + + if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { + continue + } + metricsByGrain.metricsValuesUpdated = closestMinute + + timeGrain := timeGrains[compositeKey.timeGrain] + startTime := closestMinute.Add(time.Duration(-timeGrain) * time.Second) for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { clientMetrics := s.GetMetricsBatchValuesClient(region) @@ -400,13 +415,25 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript start_resources := 0 for start_resources < len(resType.resourceIds) { - end_resources := start_resources + 50 // getBatch API is limited to 50 resources max if end_resources > len(resType.resourceIds) { end_resources = len(resType.resourceIds) } - s.settings.Logger.Info("scrape", zap.String("subscription", *subscription.DisplayName), zap.String("resourceType", resourceType), zap.String("region", region)) + s.settings.Logger.Debug( + "scrape", + zap.String("subscription", *subscription.DisplayName), + zap.String("region", region), + zap.String("resourceType", resourceType), + zap.Any("resourceIds", resType.resourceIds[start_resources:end_resources]), + zap.Any("metrics", metricsByGrain.metrics[start:end]), + zap.Int("start_resources", start_resources), + zap.Int("end_resrouces", end_resources), + zap.Time("startTime", startTime), + zap.Time("end_time", closestMinute), + zap.String("interval", compositeKey.timeGrain), + ) + response, err := clientMetrics.QueryBatch( ctx, *subscription.SubscriptionID, @@ -422,17 +449,22 @@ func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscript azquery.AggregationTypeCount, ), StartTime: to.Ptr(startTime.Format(time.RFC3339)), - EndTime: 
to.Ptr(now.Format(time.RFC3339)), + EndTime: to.Ptr(closestMinute.Format(time.RFC3339)), Interval: to.Ptr(compositeKey.timeGrain), Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) }, ) if err != nil { - s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Error(err)) + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Any("response", response), zap.Error(err)) + } + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Any("response", response), zap.Any("responseError", respErr)) break } + //s.settings.Logger.Debug("scrape", zap.Any("response.Values", response.Values)) for _, metricValues := range response.Values { for _, metric := range metricValues.Values {