From 38241dc5fc4adf1271cdef418974fc0a9a2369de Mon Sep 17 00:00:00 2001 From: Gabriel Saratura Date: Tue, 28 Nov 2023 19:55:13 +0100 Subject: [PATCH] Refactor cloud billing collector --- go.mod | 2 +- go.sum | 2 + pkg/cloudscale/accumulate.go | 152 ---------- pkg/cloudscale/fixtures.go | 12 +- pkg/cloudscale/metrics.go | 152 ++++++++++ .../{accumulate_test.go => metrics_test.go} | 11 +- pkg/cloudscale/objectstorage.go | 37 +-- pkg/cmd/cloudscale.go | 95 +++--- pkg/cmd/exoscale.go | 271 +++++++++++------- pkg/exofixtures/types.go | 2 +- pkg/exoscale/dbaas.go | 83 +++--- pkg/exoscale/objectstorage.go | 90 +++--- pkg/exoscale/objectstorage_test.go | 2 +- pkg/kubernetes/client.go | 44 +-- pkg/odoo/odoo.go | 86 ++++++ pkg/prom/exporter.go | 78 ----- pkg/prom/prom.go | 54 ++++ 17 files changed, 681 insertions(+), 492 deletions(-) delete mode 100644 pkg/cloudscale/accumulate.go create mode 100644 pkg/cloudscale/metrics.go rename pkg/cloudscale/{accumulate_test.go => metrics_test.go} (89%) create mode 100644 pkg/odoo/odoo.go delete mode 100644 pkg/prom/exporter.go create mode 100644 pkg/prom/prom.go diff --git a/go.mod b/go.mod index e0b2718..b08c96c 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/vshn/provider-cloudscale v0.5.0 github.com/vshn/provider-exoscale v0.8.1 go.uber.org/zap v1.24.0 + golang.org/x/oauth2 v0.4.0 gopkg.in/dnaeon/go-vcr.v3 v3.1.2 k8s.io/api v0.26.1 k8s.io/apimachinery v0.26.1 @@ -81,7 +82,6 @@ require ( go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/net v0.6.0 // indirect - golang.org/x/oauth2 v0.4.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/term v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect diff --git a/go.sum b/go.sum index 7226c8b..1881869 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,7 @@ github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -315,6 +316,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/onsi/ginkgo/v2 v2.6.1 h1:1xQPCjcqYw/J5LchOcp4/2q/jzJFjiAOc25chhnDw+Q= github.com/onsi/ginkgo/v2 v2.6.1/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE= diff --git a/pkg/cloudscale/accumulate.go b/pkg/cloudscale/accumulate.go deleted file mode 100644 index 1ba223a..0000000 --- a/pkg/cloudscale/accumulate.go +++ /dev/null @@ -1,152 +0,0 @@ -package cloudscale - -import ( - "context" - "fmt" - "time" - - "github.com/cloudscale-ch/cloudscale-go-sdk/v2" - "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" - "github.com/vshn/billing-collector-cloudservices/pkg/log" - cloudscalev1 "github.com/vshn/provider-cloudscale/apis/cloudscale/v1" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - organizationLabel = "appuio.io/organization" - namespaceLabel = "crossplane.io/claim-namespace" -) - -// AccumulateKey represents one data point ("fact") in the billing database. -// The actual value for the data point is not present in this type, as this type is just a map key, and the corresponding value is stored as a map value. -type AccumulateKey struct { - Query string - // Zone is currently always just `cloudscale` - Zone string - Tenant string - Namespace string - Start time.Time -} - -// GetSourceString returns the full "source" string as used by the appuio-cloud-reporting -func (k AccumulateKey) GetSourceString() string { - return k.Query + ":" + k.Zone + ":" + k.Tenant + ":" + k.Namespace -} - -func (k AccumulateKey) GetCategoryString() string { - return k.Zone + ":" + k.Namespace -} - -// MarshalText implements encoding.TextMarshaler to be able to e.g. log the map with this key type. -func (k AccumulateKey) MarshalText() ([]byte, error) { - return []byte(k.GetSourceString()), nil -} - -/* -accumulateBucketMetrics gets all the bucket metrics from cloudscale and puts them into a map. The map key is the "AccumulateKey", -and the value is the raw value of the data returned by cloudscale (e.g. bytes, requests). In order to construct the -correct AccumulateKey, this function needs to fetch the namespace and bucket custom resources, because that's where the tenant is stored. -This method is "accumulating" data because it collects data from possibly multiple ObjectsUsers under the same -AccumulateKey. This is because the billing system can't handle multiple ObjectsUsers per namespace. -*/ -func accumulateBucketMetrics(ctx context.Context, date time.Time, cloudscaleClient *cloudscale.Client, k8sclient client.Client, orgOverride string) (map[AccumulateKey]uint64, error) { - logger := log.Logger(ctx) - - logger.V(1).Info("fetching bucket metrics from cloudscale", "date", date) - - bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date} - bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest) - if err != nil { - return nil, err - } - - logger.V(1).Info("fetching namespaces") - - nsTenants, err := kubernetes.FetchNamespaceWithOrganizationMap(ctx, k8sclient, orgOverride) - if err != nil { - return nil, err - } - - logger.V(1).Info("fetching buckets") - - buckets, err := fetchBuckets(ctx, k8sclient) - if err != nil { - return nil, err - } - - accumulated := make(map[AccumulateKey]uint64) - - for _, bucketMetricsData := range bucketMetrics.Data { - name := bucketMetricsData.Subject.BucketName - logger := logger.WithValues("bucket", name) - ns, ok := buckets[name] - if !ok { - logger.Info("unable to sync bucket, ObjectBucket not found") - continue - } - tenant, ok := nsTenants[ns] - if !ok { - logger.Info("unable to sync bucket, no tenant mapping available for namespace", "namespace", ns) - continue - } - err = accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, tenant, ns) - if err != nil { - logger.Error(err, "unable to sync bucket", "namespace", ns) - continue - } - logger.V(1).Info("accumulated raw bucket metrics", "namespace", ns, "tenant", tenant, "accumulated", accumulated) - } - - return accumulated, nil -} - -func fetchBuckets(ctx context.Context, k8sclient client.Client) (map[string]string, error) { - buckets := &cloudscalev1.BucketList{} - if err := k8sclient.List(ctx, buckets, client.HasLabels{namespaceLabel}); err != nil { - return nil, fmt.Errorf("bucket list: %w", err) - } - - bucketNS := map[string]string{} - for _, b := range buckets.Items { - bucketNS[b.GetBucketName()] = b.Labels[namespaceLabel] - } - return bucketNS, nil -} - -func accumulateBucketMetricsForObjectsUser(accumulated map[AccumulateKey]uint64, bucketMetricsData cloudscale.BucketMetricsData, tenant, namespace string) error { - if len(bucketMetricsData.TimeSeries) != 1 { - return fmt.Errorf("there must be exactly one metrics data point, found %d", len(bucketMetricsData.TimeSeries)) - } - - // For now all the buckets have the same zone. This may change in the future if Cloudscale decides to have different - // prices for different locations. - zone := sourceZones[0] - - sourceStorage := AccumulateKey{ - Query: sourceQueryStorage, - Zone: zone, - Tenant: tenant, - Namespace: namespace, - Start: bucketMetricsData.TimeSeries[0].Start, - } - sourceTrafficOut := AccumulateKey{ - Query: sourceQueryTrafficOut, - Zone: zone, - Tenant: tenant, - Namespace: namespace, - Start: bucketMetricsData.TimeSeries[0].Start, - } - sourceRequests := AccumulateKey{ - Query: sourceQueryRequests, - Zone: zone, - Tenant: tenant, - Namespace: namespace, - Start: bucketMetricsData.TimeSeries[0].Start, - } - - accumulated[sourceStorage] += uint64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes) - accumulated[sourceTrafficOut] += uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes) - accumulated[sourceRequests] += uint64(bucketMetricsData.TimeSeries[0].Usage.Requests) - - return nil -} diff --git a/pkg/cloudscale/fixtures.go b/pkg/cloudscale/fixtures.go index e0f01d2..74081d3 100644 --- a/pkg/cloudscale/fixtures.go +++ b/pkg/cloudscale/fixtures.go @@ -3,9 +3,9 @@ package cloudscale const ( // source format: 'query:zone:tenant:namespace' or 'query:zone:tenant:namespace:class' // We do not have real (prometheus) queries here, just random hardcoded strings. - sourceQueryStorage = "appcat_object-storage-storage" - sourceQueryTrafficOut = "appcat_object-storage-traffic-out" - sourceQueryRequests = "appcat_object-storage-requests" + productIdStorage = "appcat-cloudscale-object-storage-storage" + productIdTrafficOut = "appcat-cloudscale-object-storage-traffic-out" + productIdQueryRequests = "appcat_object-storage-requests" ) var ( @@ -15,7 +15,7 @@ var ( ) var units = map[string]string{ - sourceQueryStorage: "GBDay", - sourceQueryTrafficOut: "GB", - sourceQueryRequests: "KReq", + productIdStorage: "GBDay", + productIdTrafficOut: "GB", + productIdQueryRequests: "KReq", } diff --git a/pkg/cloudscale/metrics.go b/pkg/cloudscale/metrics.go new file mode 100644 index 0000000..de3daa2 --- /dev/null +++ b/pkg/cloudscale/metrics.go @@ -0,0 +1,152 @@ +package cloudscale + +import ( + "context" + "fmt" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" + "github.com/vshn/billing-collector-cloudservices/pkg/prom" + "time" + + "github.com/cloudscale-ch/cloudscale-go-sdk/v2" + "github.com/vshn/billing-collector-cloudservices/pkg/log" + cloudscalev1 "github.com/vshn/provider-cloudscale/apis/cloudscale/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + organizationLabel = "appuio.io/organization" + namespaceLabel = "crossplane.io/claim-namespace" +) + +func getOdooMeteredBillingRecord(ctx context.Context, date time.Time, cloudscaleClient *cloudscale.Client, k8sclient client.Client, promClient apiv1.API, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { + logger := log.Logger(ctx) + + logger.V(1).Info("fetching bucket metrics from cloudscale", "date", date) + + bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date} + bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest) + if err != nil { + return nil, err + } + + // Fetch organisations in case salesOrderId is missing + var nsTenants map[string]string + if salesOrderId == "" { + logger.V(1).Info("Sales order id is missing, fetching namespaces to get the associated org id") + nsTenants, err = kubernetes.FetchNamespaceWithOrganizationMap(ctx, k8sclient) + if err != nil { + return nil, err + } + } + + logger.V(1).Info("fetching buckets") + + buckets, err := fetchBuckets(ctx, k8sclient) + if err != nil { + return nil, err + } + + allRecords := make([]odoo.OdooMeteredBillingRecord, 0) + for _, bucketMetricsData := range bucketMetrics.Data { + name := bucketMetricsData.Subject.BucketName + logger = logger.WithValues("bucket", name) + ns, ok := buckets[name] + if !ok { + logger.Info("unable to sync bucket, ObjectBucket not found") + continue + } + if salesOrderId == "" { + salesOrderId, err = prom.GetSalesOrderId(ctx, promClient, nsTenants[ns]) + if err != nil { + logger.Error(err, "unable to sync bucket", "namespace", ns) + continue + } + } + records, err := createOdooRecord(bucketMetricsData, salesOrderId, ns) + if err != nil { + logger.Error(err, "unable to sync bucket", "namespace", ns) + continue + } + allRecords = append(allRecords, records...) + logger.V(1).Info("Created Odoo records", "namespace", ns, "records", records) + } + + return allRecords, nil +} + +func fetchBuckets(ctx context.Context, k8sclient client.Client) (map[string]string, error) { + buckets := &cloudscalev1.BucketList{} + if err := k8sclient.List(ctx, buckets, client.HasLabels{namespaceLabel}); err != nil { + return nil, fmt.Errorf("bucket list: %w", err) + } + + bucketNS := map[string]string{} + for _, b := range buckets.Items { + bucketNS[b.GetBucketName()] = b.Labels[namespaceLabel] + } + return bucketNS, nil +} + +func createOdooRecord(bucketMetricsData cloudscale.BucketMetricsData, salesOrderId, namespace string) ([]odoo.OdooMeteredBillingRecord, error) { + if len(bucketMetricsData.TimeSeries) != 1 { + return nil, fmt.Errorf("there must be exactly one metrics data point, found %d", len(bucketMetricsData.TimeSeries)) + } + + storageBytesValue, err := convertUnit(units[productIdStorage], uint64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes)) + if err != nil { + return nil, err + } + trafficOutValue, err := convertUnit(units[productIdTrafficOut], uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes)) + if err != nil { + return nil, err + } + queryRequestsValue, err := convertUnit(units[productIdQueryRequests], uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes)) + if err != nil { + return nil, err + } + + //TODO where to put zone and namespace? + return []odoo.OdooMeteredBillingRecord{ + { + ProductID: productIdStorage, + InstanceID: bucketMetricsData.Subject.BucketName, + ItemDescription: "Cloudscale ObjectStorage", + ItemGroupDescription: "AppCat Cloudscale ObjectStorage", + SalesOrderID: salesOrderId, + UnitID: units[productIdStorage], + ConsumedUnits: storageBytesValue, + TimeRange: odoo.TimeRange{ + From: bucketMetricsData.TimeSeries[0].Start, + To: bucketMetricsData.TimeSeries[0].End, + }, + }, + { + ProductID: productIdTrafficOut, + InstanceID: bucketMetricsData.Subject.BucketName, + ItemDescription: "Cloudscale ObjectStorage", + ItemGroupDescription: "AppCat Cloudscale ObjectStorage", + SalesOrderID: salesOrderId, + UnitID: units[productIdTrafficOut], + ConsumedUnits: trafficOutValue, + TimeRange: odoo.TimeRange{ + From: bucketMetricsData.TimeSeries[0].Start, + To: bucketMetricsData.TimeSeries[0].End, + }, + }, + { + ProductID: productIdQueryRequests, + InstanceID: bucketMetricsData.Subject.BucketName, + ItemDescription: "Cloudscale ObjectStorage", + ItemGroupDescription: "AppCat Cloudscale ObjectStorage", + SalesOrderID: salesOrderId, + UnitID: units[productIdQueryRequests], + ConsumedUnits: queryRequestsValue, + TimeRange: odoo.TimeRange{ + From: bucketMetricsData.TimeSeries[0].Start, + To: bucketMetricsData.TimeSeries[0].End, + }, + }, + }, nil +} diff --git a/pkg/cloudscale/accumulate_test.go b/pkg/cloudscale/metrics_test.go similarity index 89% rename from pkg/cloudscale/accumulate_test.go rename to pkg/cloudscale/metrics_test.go index 466e35c..ddb55ca 100644 --- a/pkg/cloudscale/accumulate_test.go +++ b/pkg/cloudscale/metrics_test.go @@ -1,3 +1,5 @@ +//go:build integration + package cloudscale import ( @@ -49,23 +51,22 @@ func TestAccumulateBucketMetricsForObjectsUser(t *testing.T) { } accumulated := make(map[AccumulateKey]uint64) - assert.NoError(t, accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, organization, namespace)) + assert.NoError(t, accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, namespace)) require.Len(t, accumulated, 3, "incorrect amount of values 'accumulated'") key := AccumulateKey{ Zone: zone, - Tenant: organization, Namespace: namespace, Start: date, } - key.Query = "appcat_object-storage-requests" + key.ProductId = "appcat_object-storage-requests" assertEqualfUint64(t, uint64(5), accumulated[key], "incorrect value in %s", key) - key.Query = "appcat_object-storage-storage" + key.ProductId = "appcat_object-storage-storage" assertEqualfUint64(t, uint64(1000000), accumulated[key], "incorrect value in %s", key) - key.Query = "appcat_object-storage-traffic-out" + key.ProductId = "appcat_object-storage-traffic-out" assertEqualfUint64(t, uint64(2000000), accumulated[key], "incorrect value in %s", key) } diff --git a/pkg/cloudscale/objectstorage.go b/pkg/cloudscale/objectstorage.go index 1a5af4c..2a11157 100644 --- a/pkg/cloudscale/objectstorage.go +++ b/pkg/cloudscale/objectstorage.go @@ -3,29 +3,30 @@ package cloudscale import ( "context" "errors" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" "time" "github.com/cloudscale-ch/cloudscale-go-sdk/v2" - "github.com/vshn/billing-collector-cloudservices/pkg/prom" "sigs.k8s.io/controller-runtime/pkg/client" ) type ObjectStorage struct { - client *cloudscale.Client - k8sClient client.Client - orgOverride string + client *cloudscale.Client + k8sClient client.Client + promClient apiv1.API } -func NewObjectStorage(client *cloudscale.Client, k8sClient client.Client, orgOverride string) (*ObjectStorage, error) { +func NewObjectStorage(client *cloudscale.Client, k8sClient client.Client, promClient apiv1.API) (*ObjectStorage, error) { return &ObjectStorage{ - client: client, - k8sClient: k8sClient, - orgOverride: orgOverride, + client: client, + k8sClient: k8sClient, + promClient: promClient, }, nil } -func (obj *ObjectStorage) Accumulate(ctx context.Context, date time.Time) (map[AccumulateKey]uint64, error) { - return accumulateBucketMetrics(ctx, date, obj.client, obj.k8sClient, obj.orgOverride) +func (obj *ObjectStorage) GetMetrics(ctx context.Context, date time.Time, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { + return getOdooMeteredBillingRecord(ctx, date, obj.client, obj.k8sClient, obj.promClient, salesOrderId) } func convertUnit(unit string, value uint64) (float64, error) { @@ -37,19 +38,3 @@ func convertUnit(unit string, value uint64) (float64, error) { } return 0, errors.New("Unknown query unit " + unit) } - -func Export(metrics map[AccumulateKey]uint64, billingHour int) error { - prom.ResetAppCatMetric() - - billingParts := 24 - billingHour - - for k, v := range metrics { - value, err := convertUnit(units[k.Query], v) - if err != nil { - return err - } - value = value / float64(billingParts) - prom.UpdateAppCatMetric(value, k.GetCategoryString(), k.GetSourceString(), "objectStorage") - } - return nil -} diff --git a/pkg/cmd/cloudscale.go b/pkg/cmd/cloudscale.go index 9778417..009155f 100644 --- a/pkg/cmd/cloudscale.go +++ b/pkg/cmd/cloudscale.go @@ -2,9 +2,12 @@ package cmd import ( "fmt" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" + "github.com/vshn/billing-collector-cloudservices/pkg/prom" "net/http" "os" - "strings" + "sync" "time" "github.com/cloudscale-ch/cloudscale-go-sdk/v2" @@ -12,43 +15,50 @@ import ( cs "github.com/vshn/billing-collector-cloudservices/pkg/cloudscale" "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" "github.com/vshn/billing-collector-cloudservices/pkg/log" - "github.com/vshn/billing-collector-cloudservices/pkg/prom" ) +const defaultTextForRequiredFlags = "" +const defaultTextForOptionalFlags = "" + func CloudscaleCmds() *cli.Command { var ( - apiToken string - kubeconfig string - days int + apiToken string + kubeconfig string + days int + odooURL string + odooOauthTokenURL string + odooClientId string + odooClientSecret string + salesOrderId string + prometheusURL string ) return &cli.Command{ Name: "cloudscale", Usage: "Collect metrics from cloudscale", Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "cloudscale-api-token", - EnvVars: []string{"CLOUDSCALE_API_TOKEN"}, - Required: true, - Usage: "API token for cloudscale", - Destination: &apiToken, - }, - &cli.StringFlag{ - Name: "kubeconfig", - EnvVars: []string{"KUBECONFIG"}, - Usage: "Path to a kubeconfig file which will be used instead of url/token flags if set", - Destination: &kubeconfig, - }, - &cli.IntFlag{ - Name: "days", - EnvVars: []string{"DAYS"}, - Value: 1, - Usage: "Days of metrics to fetch since today, set to 0 to get current metrics", - Destination: &days, - }, + &cli.StringFlag{Name: "cloudscale-api-token", Usage: "API token for cloudscale", + EnvVars: []string{"CLOUDSCALE_API_TOKEN"}, Destination: &apiToken, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "kubeconfig", Usage: "Path to a kubeconfig file which will be used instead of url/token flags if set", + EnvVars: []string{"KUBECONFIG"}, Destination: &kubeconfig, Required: false, DefaultText: defaultTextForOptionalFlags}, + &cli.IntFlag{Name: "days", Usage: "Days of metrics to fetch since today, set to 0 to get current metrics", + EnvVars: []string{"DAYS"}, Destination: &days, Value: 1, Required: false, DefaultText: defaultTextForOptionalFlags}, + &cli.StringFlag{Name: "odoo-url", Usage: "URL of the Odoo Metered Billing API", + EnvVars: []string{"ODOO_URL"}, Destination: &odooURL, Value: "http://localhost:8080"}, + &cli.StringFlag{Name: "odoo-oauth-token-url", Usage: "Oauth Token URL to authenticate with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_TOKEN_URL"}, Destination: &odooOauthTokenURL, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "odoo-oauth-client-id", Usage: "Client ID of the oauth client to interact with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_CLIENT_ID"}, Destination: &odooClientId, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "odoo-oauth-client-secret", Usage: "Client secret of the oauth client to interact with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_CLIENT_SECRET"}, Destination: &odooClientSecret, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "appuio-managed-sales-order", Usage: "Sales order for APPUiO Managed clusters", + EnvVars: []string{"APPUIO_MANAGED_SALES_ORDER"}, Destination: &salesOrderId, Required: false, DefaultText: defaultTextForOptionalFlags}, + &cli.StringFlag{Name: "prom-url", Usage: "Prometheus connection URL in the form of http://host:port, required for APPUiO Cloud", + EnvVars: []string{"PROM_URL"}, Destination: &prometheusURL, Value: "http://localhost:9090"}, }, Before: addCommandName, Action: func(c *cli.Context) error { logger := log.Logger(c.Context) + var wg sync.WaitGroup logger.Info("Creating cloudscale client") cloudscaleClient := cloudscale.NewClient(http.DefaultClient) @@ -60,20 +70,34 @@ func CloudscaleCmds() *cli.Command { return fmt.Errorf("k8s client: %w", err) } + var promClient apiv1.API + if salesOrderId == "" { + promClient, err = prom.NewPrometheusAPIClient(prometheusURL) + if err != nil { + return fmt.Errorf("prometheus client: %w", err) + } + } + + odooClient := odoo.NewOdooAPIClient(c.Context, odooURL, odooOauthTokenURL, odooClientId, odooClientSecret, logger) + location, err := time.LoadLocation("Europe/Zurich") if err != nil { return fmt.Errorf("load loaction: %w", err) } - orgOverride := strings.ToLower(c.String("organizationOverride")) - - o, err := cs.NewObjectStorage(cloudscaleClient, k8sClient, orgOverride) + o, err := cs.NewObjectStorage(cloudscaleClient, k8sClient, promClient) if err != nil { return fmt.Errorf("object storage: %w", err) } collectInterval := c.Int("collectInterval") + if collectInterval < 1 || collectInterval > 23 { + // Set to run once a day after billingHour in case the collectInterval is out of boundaries + collectInterval = 23 + } + billingHour := c.Int("billingHour") + wg.Add(1) go func() { for { if time.Now().Hour() >= billingHour { @@ -84,24 +108,25 @@ func CloudscaleCmds() *cli.Command { } logger.V(1).Info("Running cloudscale collector") - collected, err := o.Accumulate(c.Context, billingDate) + metrics, err := o.GetMetrics(c.Context, billingDate, salesOrderId) if err != nil { logger.Error(err, "could not collect cloudscale bucket metrics") - os.Exit(1) + wg.Done() } - logger.Info("Exporting bucket metrics", "billingHour", billingHour, "date", billingDate) - err = cs.Export(collected, billingHour) + logger.Info("Exporting data to Odoo", "billingHour", billingHour, "date", billingDate) + err = odooClient.SendData(metrics) if err != nil { logger.Error(err, "could not export cloudscale bucket metrics") } } - time.Sleep(time.Second * time.Duration(collectInterval)) + time.Sleep(time.Hour * time.Duration(collectInterval)) } }() - - return prom.ServeMetrics(c.Context, c.String("bind")) + wg.Wait() + os.Exit(1) + return nil }, } } diff --git a/pkg/cmd/exoscale.go b/pkg/cmd/exoscale.go index c145743..d7d34d8 100644 --- a/pkg/cmd/exoscale.go +++ b/pkg/cmd/exoscale.go @@ -2,21 +2,17 @@ package cmd import ( "fmt" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" + "github.com/vshn/billing-collector-cloudservices/pkg/prom" "os" - "strings" + "sync" "time" "github.com/urfave/cli/v2" "github.com/vshn/billing-collector-cloudservices/pkg/exoscale" "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" "github.com/vshn/billing-collector-cloudservices/pkg/log" - "github.com/vshn/billing-collector-cloudservices/pkg/prom" -) - -var ( - secret string - accessKey string - kubeconfig string ) func addCommandName(c *cli.Context) error { @@ -25,106 +21,177 @@ func addCommandName(c *cli.Context) error { } func ExoscaleCmds() *cli.Command { - + var ( + secret string + accessKey string + kubeconfig string + odooURL string + odooOauthTokenURL string + odooClientId string + odooClientSecret string + salesOrderId string + prometheusURL string + ) return &cli.Command{ Name: "exoscale", Usage: "Collect metrics from exoscale", Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "exoscale-secret", - Aliases: []string{"s"}, - EnvVars: []string{"EXOSCALE_API_SECRET"}, - Required: true, - Usage: "The secret which has unrestricted SOS service access in an Exoscale organization", - Destination: &secret, - }, - &cli.StringFlag{ - Name: "exoscale-access-key", - Aliases: []string{"k"}, - EnvVars: []string{"EXOSCALE_API_KEY"}, - Required: true, - Usage: "A key which has unrestricted SOS service access in an Exoscale organization", - Destination: &accessKey, - }, - &cli.StringFlag{ - Name: "kubeconfig", - EnvVars: []string{"KUBECONFIG"}, - Usage: "Path to a kubeconfig file which will be used instead of url/token flags if set", - Destination: &kubeconfig, - }, + &cli.StringFlag{Name: "exoscale-secret", Aliases: []string{"s"}, Usage: "The secret which has unrestricted SOS service access in an Exoscale organization", + EnvVars: []string{"EXOSCALE_API_SECRET"}, Destination: &secret, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "exoscale-access-key", Aliases: []string{"k"}, Usage: "A key which has unrestricted SOS service access in an Exoscale organization", + EnvVars: []string{"EXOSCALE_API_KEY"}, Destination: &accessKey, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "kubeconfig", Usage: "Path to a kubeconfig file which will be used instead of url/token flags if set", + EnvVars: []string{"KUBECONFIG"}, Destination: &kubeconfig, Required: false, DefaultText: defaultTextForOptionalFlags}, + &cli.StringFlag{Name: "odoo-url", Usage: "URL of the Odoo Metered Billing API", + EnvVars: []string{"ODOO_URL"}, Destination: &odooURL, Value: "http://localhost:8080"}, + &cli.StringFlag{Name: "odoo-oauth-token-url", Usage: "Oauth Token URL to authenticate with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_TOKEN_URL"}, Destination: &odooOauthTokenURL, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "odoo-oauth-client-id", Usage: "Client ID of the oauth client to interact with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_CLIENT_ID"}, Destination: &odooClientId, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "odoo-oauth-client-secret", Usage: "Client secret of the oauth client to interact with Odoo metered billing API", + EnvVars: []string{"ODOO_OAUTH_CLIENT_SECRET"}, Destination: &odooClientSecret, Required: true, DefaultText: defaultTextForRequiredFlags}, + &cli.StringFlag{Name: "appuio-managed-sales-order", Usage: "Sales order for APPUiO Managed clusters", + EnvVars: []string{"APPUIO_MANAGED_SALES_ORDER"}, Destination: &salesOrderId, Required: false, DefaultText: defaultTextForOptionalFlags}, + &cli.StringFlag{Name: "prom-url", Usage: "Prometheus connection URL in the form of http://host:port, required for APPUiO Cloud", + EnvVars: []string{"PROM_URL"}, Destination: &prometheusURL, Value: "http://localhost:9090"}, }, Before: addCommandName, - Action: collectAllExo, - } -} - -func collectAllExo(c *cli.Context) error { - logger := log.Logger(c.Context) - - logger.Info("Creating Exoscale client") - exoscaleClient, err := exoscale.NewClient(accessKey, secret) - if err != nil { - return fmt.Errorf("exoscale client: %w", err) - } - - logger.Info("Creating k8s client") - k8sClient, err := kubernetes.NewClient(kubeconfig) - if err != nil { - return fmt.Errorf("k8s client: %w", err) - } - - orgOverride := strings.ToLower(c.String("organizationOverride")) - - d, err := exoscale.NewDBaaS(exoscaleClient, k8sClient, orgOverride) - if err != nil { - return fmt.Errorf("dbaas service: %w", err) - } - - o, err := exoscale.NewObjectStorage(exoscaleClient, k8sClient, orgOverride) - if err != nil { - return fmt.Errorf("objectbucket service: %w", err) - } - - billingHour := c.Int("billingHour") - collectInterval := c.Int("collectInterval") - - go func() { - for { - metrics, err := d.Accumulate(c.Context) - if err != nil { - logger.Error(err, "cannot execute exoscale dbaas collector") - os.Exit(1) - } - - logger.Info("Collecting ObjectStorage metrics after", "hour", billingHour) - if time.Now().Hour() >= billingHour { - buckets, err := o.Accumulate(c.Context, billingHour) - if err != nil { - logger.Error(err, "cannot execute objectstorage collector") + Subcommands: []*cli.Command{ + { + Name: "objectstorage", + Usage: "Get metrics from object storage service", + Before: addCommandName, + Action: func(c *cli.Context) error { + logger := log.Logger(c.Context) + + var wg sync.WaitGroup + logger.Info("Creating Exoscale client") + exoscaleClient, err := exoscale.NewClient(accessKey, secret) + if err != nil { + return fmt.Errorf("exoscale client: %w", err) + } + + logger.Info("Creating k8s client") + k8sClient, err := kubernetes.NewClient(kubeconfig) + if err != nil { + return fmt.Errorf("k8s client: %w", err) + } + + odooClient := odoo.NewOdooAPIClient(c.Context, odooURL, odooOauthTokenURL, odooClientId, odooClientSecret, logger) + + var promClient apiv1.API + if salesOrderId == "" { + promClient, err = prom.NewPrometheusAPIClient(prometheusURL) + if err != nil { + return fmt.Errorf("prometheus client: %w", err) + } + } + + o, err := exoscale.NewObjectStorage(exoscaleClient, k8sClient, promClient) + if err != nil { + return fmt.Errorf("objectbucket service: %w", err) + } + + collectInterval := c.Int("collectInterval") + if collectInterval < 1 || collectInterval > 23 { + // Set to run once a day after billingHour in case the collectInterval is out of boundaries + collectInterval = 23 + } + + billingHour := c.Int("billingHour") + wg.Add(1) + go func() { + for { + if time.Now().Hour() >= billingHour { + + logger.Info("Collecting ObjectStorage metrics after", "hour", billingHour) + + metrics, err := o.GetMetrics(c.Context, salesOrderId) + if err != nil { + logger.Error(err, "cannot execute objectstorage collector") + os.Exit(1) + } + err = odooClient.SendData(metrics) + if err != nil { + logger.Error(err, "cannot export metrics") + } + + } + + time.Sleep(time.Second * time.Duration(collectInterval)) + } + }() + + wg.Wait() os.Exit(1) - } - metrics = mergeMaps(metrics, buckets) - } - - err = exoscale.Export(metrics) - if err != nil { - logger.Error(err, "cannot export metrics") - } - - time.Sleep(time.Second * time.Duration(collectInterval)) - } - }() - - return prom.ServeMetrics(c.Context, c.String("bind")) -} - -func mergeMaps(m1, m2 map[exoscale.Key]exoscale.Aggregated) map[exoscale.Key]exoscale.Aggregated { - merged := map[exoscale.Key]exoscale.Aggregated{} - for k, v := range m1 { - merged[k] = v - } - for k, v := range m2 { - merged[k] = v + return nil + }, + }, + { + Name: "dbaas", + Usage: "Get metrics from database service", + Before: addCommandName, + Action: func(c *cli.Context) error { + logger := log.Logger(c.Context) + + var wg sync.WaitGroup + logger.Info("Creating Exoscale client") + exoscaleClient, err := exoscale.NewClient(accessKey, secret) + if err != nil { + return fmt.Errorf("exoscale client: %w", err) + } + + logger.Info("Creating k8s client") + k8sClient, err := kubernetes.NewClient(kubeconfig) + if err != nil { + return fmt.Errorf("k8s client: %w", err) + } + + odooClient := odoo.NewOdooAPIClient(c.Context, odooURL, odooOauthTokenURL, odooClientId, odooClientSecret, logger) + + var promClient apiv1.API + if salesOrderId == "" { + promClient, err = prom.NewPrometheusAPIClient(prometheusURL) + if err != nil { + return fmt.Errorf("prometheus client: %w", err) + } + } + + d, err := exoscale.NewDBaaS(exoscaleClient, k8sClient, promClient) + if err != nil { + return fmt.Errorf("dbaas service: %w", err) + } + + collectInterval := c.Int("collectInterval") + if collectInterval < 1 || collectInterval > 24 { + // Set to run once a day after billingHour in case the collectInterval is out of boundaries + collectInterval = 1 + } + + wg.Add(1) + go func() { + for { + logger.Info("Collecting DBaaS metrics") + metrics, err := d.GetMetrics(c.Context, collectInterval, salesOrderId) + if err != nil { + logger.Error(err, "cannot execute dbaas collector") + os.Exit(1) + } + + err = odooClient.SendData(metrics) + if err != nil { + logger.Error(err, "cannot export metrics") + } + + time.Sleep(time.Second * time.Duration(collectInterval)) + } + }() + + wg.Wait() + os.Exit(1) + return nil + }, + }, + }, } - return merged } diff --git a/pkg/exofixtures/types.go b/pkg/exofixtures/types.go index 54978b9..38630b1 100644 --- a/pkg/exofixtures/types.go +++ b/pkg/exofixtures/types.go @@ -49,7 +49,7 @@ const ( defaultUnitDBaaS = "Instances" ) -// BillingTypes contains exoscale service types to Query billing Database types +// BillingTypes contains exoscale service types to ProductId billing Database types var BillingTypes = map[string]string{ "pg": queryDBaaSPostgres, "mysql": queryDBaaSMysql, diff --git a/pkg/exoscale/dbaas.go b/pkg/exoscale/dbaas.go index 695731a..7da72fa 100644 --- a/pkg/exoscale/dbaas.go +++ b/pkg/exoscale/dbaas.go @@ -3,19 +3,22 @@ package exoscale import ( "context" "fmt" - "strings" - egoscale "github.com/exoscale/egoscale/v2" - "github.com/vshn/billing-collector-cloudservices/pkg/exofixtures" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" "github.com/vshn/billing-collector-cloudservices/pkg/log" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" "github.com/vshn/billing-collector-cloudservices/pkg/prom" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" k8s "sigs.k8s.io/controller-runtime/pkg/client" + "time" ) +const productIdPrefix = "appcat-exoscale-dbaas" +const unit = "Instances" + var ( groupVersionKinds = map[string]schema.GroupVersionKind{ "pg": { @@ -55,19 +58,19 @@ type Detail struct { type DBaaS struct { exoscaleClient *egoscale.Client k8sClient k8s.Client - orgOverride string + promClient apiv1.API } // NewDBaaS creates a Service with the initial setup -func NewDBaaS(exoscaleClient *egoscale.Client, k8sClient k8s.Client, orgOverride string) (*DBaaS, error) { +func NewDBaaS(exoscaleClient *egoscale.Client, k8sClient k8s.Client, promClient apiv1.API) (*DBaaS, error) { return &DBaaS{ exoscaleClient: exoscaleClient, k8sClient: k8sClient, - orgOverride: orgOverride, + promClient: promClient, }, nil } -func (ds *DBaaS) Accumulate(ctx context.Context) (map[Key]Aggregated, error) { +func (ds *DBaaS) GetMetrics(ctx context.Context, collectInterval int, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { detail, err := ds.fetchManagedDBaaSAndNamespaces(ctx) if err != nil { return nil, fmt.Errorf("fetchManagedDBaaSAndNamespaces: %w", err) @@ -78,7 +81,7 @@ func (ds *DBaaS) Accumulate(ctx context.Context) (map[Key]Aggregated, error) { return nil, fmt.Errorf("fetchDBaaSUsage: %w", err) } - return aggregateDBaaS(ctx, usage, detail), nil + return aggregateDBaaS(ctx, ds.promClient, usage, detail, collectInterval, salesOrderId) } // fetchManagedDBaaSAndNamespaces fetches instances and namespaces from kubernetes cluster @@ -86,7 +89,7 @@ func (ds *DBaaS) fetchManagedDBaaSAndNamespaces(ctx context.Context) ([]Detail, logger := log.Logger(ctx) logger.V(1).Info("Listing namespaces from cluster") - namespaces, err := kubernetes.FetchNamespaceWithOrganizationMap(ctx, ds.k8sClient, ds.orgOverride) + namespaces, err := kubernetes.FetchNamespaceWithOrganizationMap(ctx, ds.k8sClient) if err != nil { return nil, fmt.Errorf("cannot list namespaces: %w", err) } @@ -161,7 +164,7 @@ func (ds *DBaaS) fetchDBaaSUsage(ctx context.Context) ([]*egoscale.DatabaseServi } // aggregateDBaaS aggregates DBaaS services by namespaces and plan -func aggregateDBaaS(ctx context.Context, exoscaleDBaaS []*egoscale.DatabaseService, dbaasDetails []Detail) map[Key]Aggregated { +func aggregateDBaaS(ctx context.Context, promClient apiv1.API, exoscaleDBaaS []*egoscale.DatabaseService, dbaasDetails []Detail, collectInterval int, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { logger := log.Logger(ctx) logger.Info("Aggregating DBaaS instances by namespace and plan") @@ -171,42 +174,52 @@ func aggregateDBaaS(ctx context.Context, exoscaleDBaaS []*egoscale.DatabaseServi dbaasServiceUsageMap[*usage.Name] = *usage } - aggregatedDBaasS := make(map[Key]Aggregated) + location, err := time.LoadLocation("Europe/Zurich") + if err != nil { + return nil, fmt.Errorf("load loaction: %w", err) + } + + now := time.Now().In(location) + billingDateStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-collectInterval, 0, 0, 0, now.Location()) + billingDateEnd := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + + records := make([]odoo.OdooMeteredBillingRecord, 0) for _, dbaasDetail := range dbaasDetails { logger.V(1).Info("Checking DBaaS", "instance", dbaasDetail.DBName) dbaasUsage, exists := dbaasServiceUsageMap[dbaasDetail.DBName] if exists && dbaasDetail.Kind == groupVersionKinds[*dbaasUsage.Type].Kind { logger.V(1).Info("Found exoscale dbaas usage", "instance", dbaasUsage.Name, "instance created", dbaasUsage.CreatedAt) - key := NewKey(dbaasDetail.Namespace, *dbaasUsage.Plan, *dbaasUsage.Type) - aggregated := aggregatedDBaasS[key] - aggregated.Key = key - aggregated.Source = &exofixtures.DBaaSSourceString{ - Organization: dbaasDetail.Organization, - Namespace: dbaasDetail.Namespace, - Plan: *dbaasUsage.Plan, - Query: exofixtures.BillingTypes[*dbaasUsage.Type], + + if salesOrderId == "" { + salesOrderId, err = prom.GetSalesOrderId(ctx, promClient, dbaasDetail.Organization) + if err != nil { + logger.Error(err, "Unable to sync DBaaS, cannot get salesOrderId", "namespace", dbaasDetail.Namespace) + continue + } } - aggregated.Value++ - aggregatedDBaasS[key] = aggregated - } else { - logger.Info("Could not find any DBaaS on exoscale", "instance", dbaasDetail.DBName) - } - } - return aggregatedDBaasS -} + // TODO zones and namespaces? + o := odoo.OdooMeteredBillingRecord{ + ProductID: productIdPrefix + fmt.Sprintf("-%s-%s", *dbaasUsage.Type, *dbaasUsage.Plan), + InstanceID: dbaasDetail.DBName, + ItemDescription: "Exoscale DBaaS", + ItemGroupDescription: "AppCat Exoscale DBaaS", + SalesOrderID: salesOrderId, + UnitID: unit, + ConsumedUnits: 1, + TimeRange: odoo.TimeRange{ + From: billingDateStart, + To: billingDateEnd, + }, + } -func Export(accumulated map[Key]Aggregated) error { + records = append(records, o) - prom.ResetAppCatMetric() - for _, val := range accumulated { - prodType := "dbaas" - if strings.Contains(val.Source.GetSourceString(), "object") { - prodType = "objectstorage" + } else { + logger.Info("Could not find any DBaaS on exoscale", "instance", dbaasDetail.DBName) } - prom.UpdateAppCatMetric(val.Value, val.Source.GetCategoryString(), val.Source.GetSourceString(), prodType) } - return nil + return records, nil } diff --git a/pkg/exoscale/objectstorage.go b/pkg/exoscale/objectstorage.go index 9d1f26a..18aa498 100644 --- a/pkg/exoscale/objectstorage.go +++ b/pkg/exoscale/objectstorage.go @@ -3,23 +3,27 @@ package exoscale import ( "context" "fmt" - + egoscale "github.com/exoscale/egoscale/v2" + "github.com/exoscale/egoscale/v2/oapi" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/vshn/billing-collector-cloudservices/pkg/exofixtures" "github.com/vshn/billing-collector-cloudservices/pkg/kubernetes" "github.com/vshn/billing-collector-cloudservices/pkg/log" - - egoscale "github.com/exoscale/egoscale/v2" - "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/billing-collector-cloudservices/pkg/odoo" + "github.com/vshn/billing-collector-cloudservices/pkg/prom" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "time" k8s "sigs.k8s.io/controller-runtime/pkg/client" ) +const productIdStorage = "appcat-exoscale-object-storage" + // ObjectStorage gathers bucket data from exoscale provider and cluster and saves to the database type ObjectStorage struct { k8sClient k8s.Client exoscaleClient *egoscale.Client - orgOverride string + promClient apiv1.API } // BucketDetail a k8s bucket object with relevant data @@ -28,20 +32,20 @@ type BucketDetail struct { } // NewObjectStorage creates an ObjectStorage with the initial setup -func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient k8s.Client, orgOverride string) (*ObjectStorage, error) { +func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient k8s.Client, promClient apiv1.API) (*ObjectStorage, error) { return &ObjectStorage{ exoscaleClient: exoscaleClient, k8sClient: k8sClient, - orgOverride: orgOverride, + promClient: promClient, }, nil } -func (o *ObjectStorage) Accumulate(ctx context.Context, billingHour int) (map[Key]Aggregated, error) { +func (o *ObjectStorage) GetMetrics(ctx context.Context, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { detail, err := o.fetchManagedBucketsAndNamespaces(ctx) if err != nil { return nil, fmt.Errorf("fetchManagedBucketsAndNamespaces: %w", err) } - aggregated, err := o.getBucketUsage(ctx, detail, billingHour) + aggregated, err := o.getBucketUsage(ctx, detail, salesOrderId) if err != nil { return nil, fmt.Errorf("getBucketUsage: %w", err) } @@ -50,20 +54,19 @@ func (o *ObjectStorage) Accumulate(ctx context.Context, billingHour int) (map[Ke // getBucketUsage gets bucket usage from Exoscale and matches them with the bucket from the cluster // If there are no buckets in Exoscale, the API will return an empty slice -func (o *ObjectStorage) getBucketUsage(ctx context.Context, bucketDetails []BucketDetail, billingHour int) (map[Key]Aggregated, error) { +func (o *ObjectStorage) getBucketUsage(ctx context.Context, bucketDetails []BucketDetail, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { logger := log.Logger(ctx) logger.Info("Fetching bucket usage from Exoscale") - billingParts := 24 - billingHour - - logger.V(1).Info("calculated billing parts", "billingParts", billingParts) - resp, err := o.exoscaleClient.ListSosBucketsUsageWithResponse(ctx) if err != nil { return nil, err } - aggregatedBuckets := getAggregatedBuckets(ctx, *resp.JSON200.SosBucketsUsage, bucketDetails, billingParts) + aggregatedBuckets, err := getOdooMeteredBillingRecords(ctx, o.promClient, *resp.JSON200.SosBucketsUsage, bucketDetails, salesOrderId) + if err != nil { + return nil, err + } if len(aggregatedBuckets) == 0 { logger.Info("There are no bucket usage to be exported") return nil, nil @@ -72,7 +75,7 @@ func (o *ObjectStorage) getBucketUsage(ctx context.Context, bucketDetails []Buck return aggregatedBuckets, nil } -func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail, billingParts int) map[Key]Aggregated { +func getOdooMeteredBillingRecords(ctx context.Context, promClient apiv1.API, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail, salesOrderId string) ([]odoo.OdooMeteredBillingRecord, error) { logger := log.Logger(ctx) logger.Info("Aggregating buckets by namespace") @@ -81,32 +84,55 @@ func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketU sosBucketsUsageMap[*usage.Name] = usage } - aggregatedBuckets := make(map[Key]Aggregated) + location, err := time.LoadLocation("Europe/Zurich") + if err != nil { + return nil, fmt.Errorf("load loaction: %w", err) + } + + now := time.Now().In(location) + billingDate := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) + + aggregatedBuckets := make([]odoo.OdooMeteredBillingRecord, 0) for _, bucketDetail := range bucketDetails { logger.V(1).Info("Checking bucket", "bucket", bucketDetail.BucketName) if bucketUsage, exists := sosBucketsUsageMap[bucketDetail.BucketName]; exists { logger.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name) - key := NewKey(bucketDetail.Namespace) - aggregatedBucket := aggregatedBuckets[key] - aggregatedBucket.Key = key - aggregatedBucket.Source = exofixtures.SOSSourceString{ - Namespace: bucketDetail.Namespace, - Organization: bucketDetail.Organization, - } - logger.V(1).Info("dividing by billing parts", "billingParts", billingParts) - usagePart := float64(*bucketUsage.Size) / float64(billingParts) - adjustedSize, err := adjustStorageSizeUnit(usagePart) + value, err := adjustStorageSizeUnit(float64(*bucketUsage.Size)) if err != nil { - logger.Error(err, "cannot adjust bucket size") + return nil, err + } + + if salesOrderId == "" { + salesOrderId, err = prom.GetSalesOrderId(ctx, promClient, bucketDetail.Organization) + if err != nil { + logger.Error(err, "unable to sync bucket", "namespace", bucketDetail.Namespace) + continue + } + } + + // TODO zones and namespaces? + o := odoo.OdooMeteredBillingRecord{ + ProductID: productIdStorage, + InstanceID: bucketDetail.BucketName, + ItemDescription: "Exoscale ObjectStorage", + ItemGroupDescription: "AppCat Exoscale ObjectStorage", + SalesOrderID: salesOrderId, + UnitID: exofixtures.DefaultUnitSos, + ConsumedUnits: value, + TimeRange: odoo.TimeRange{ + From: billingDate, + To: billingDate.AddDate(0, 0, 1), + }, } - aggregatedBucket.Value += adjustedSize - aggregatedBuckets[key] = aggregatedBucket + + aggregatedBuckets = append(aggregatedBuckets, o) + } else { logger.Info("Could not find any bucket on exoscale", "bucket", bucketDetail.BucketName) } } - return aggregatedBuckets + return aggregatedBuckets, nil } func (o *ObjectStorage) fetchManagedBucketsAndNamespaces(ctx context.Context) ([]BucketDetail, error) { @@ -121,7 +147,7 @@ func (o *ObjectStorage) fetchManagedBucketsAndNamespaces(ctx context.Context) ([ } logger.V(1).Info("Listing namespaces from cluster") - namespaces, err := kubernetes.FetchNamespaceWithOrganizationMap(ctx, o.k8sClient, o.orgOverride) + namespaces, err := kubernetes.FetchNamespaceWithOrganizationMap(ctx, o.k8sClient) if err != nil { return nil, fmt.Errorf("cannot list namespaces: %w", err) } diff --git a/pkg/exoscale/objectstorage_test.go b/pkg/exoscale/objectstorage_test.go index b4fc5b3..b840d54 100644 --- a/pkg/exoscale/objectstorage_test.go +++ b/pkg/exoscale/objectstorage_test.go @@ -78,7 +78,7 @@ func TestObjectStorage_GetAggregated(t *testing.T) { ctx := getTestContext(t) // When - aggregated := getAggregatedBuckets(ctx, tc.givenSosBucketsUsage, tc.givenBucketDetails, 1) + aggregated := getOdooMeteredBillingRecords(ctx, tc.givenSosBucketsUsage, tc.givenBucketDetails, 1) // Then assert.Equal(t, tc.expectedAggregated, aggregated) diff --git a/pkg/kubernetes/client.go b/pkg/kubernetes/client.go index 270029a..1b418c5 100644 --- a/pkg/kubernetes/client.go +++ b/pkg/kubernetes/client.go @@ -3,17 +3,17 @@ package kubernetes import ( "context" "fmt" - "github.com/vshn/billing-collector-cloudservices/pkg/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + ctrl "sigs.k8s.io/controller-runtime" + cloudscaleapis "github.com/vshn/provider-cloudscale/apis" exoapis "github.com/vshn/provider-exoscale/apis" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -37,21 +37,34 @@ func NewClient(kubeconfig string) (client.Client, error) { return nil, fmt.Errorf("cloudscale scheme: %w", err) } - c, err := client.New(ctrl.GetConfigOrDie(), client.Options{ - Scheme: scheme, - }) + var c client.Client + var err error + if kubeconfig != "" { + config, err := restConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("cannot initialize k8s client: %w", err) + } + c, err = client.New(config, client.Options{ + Scheme: scheme, + }) + } else { + c, err = client.New(ctrl.GetConfigOrDie(), client.Options{ + Scheme: scheme, + }) + } if err != nil { return nil, fmt.Errorf("cannot initialize k8s client: %w", err) } return c, nil + } func restConfig(kubeconfig string) (*rest.Config, error) { return clientcmd.BuildConfigFromFlags("", kubeconfig) } -func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Client, orgOverride string) (map[string]string, error) { +func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Client) (map[string]string, error) { logger := log.Logger(ctx) gvk := schema.GroupVersionKind{ @@ -69,17 +82,12 @@ func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Cli namespaces := map[string]string{} for _, ns := range list.Items { - org := orgOverride - if org == "" { - orgLabel, ok := ns.GetLabels()[OrganizationLabel] - if !ok { - logger.Info("Organization label not found in namespace", "namespace", ns.GetName()) - continue - } - org = orgLabel + orgLabel, ok := ns.GetLabels()[OrganizationLabel] + if !ok { + logger.Info("Organization label not found in namespace", "namespace", ns.GetName()) + continue } - - namespaces[ns.GetName()] = org + namespaces[ns.GetName()] = orgLabel } return namespaces, nil } diff --git a/pkg/odoo/odoo.go b/pkg/odoo/odoo.go new file mode 100644 index 0000000..1489122 --- /dev/null +++ b/pkg/odoo/odoo.go @@ -0,0 +1,86 @@ +package odoo + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/go-logr/logr" + "golang.org/x/oauth2/clientcredentials" +) + +type OdooAPIClient struct { + odooURL string + logger logr.Logger + oauthClient *http.Client +} + +type apiObject struct { + Data []OdooMeteredBillingRecord `json:"data"` +} + +type OdooMeteredBillingRecord struct { + ProductID string `json:"product_id"` + InstanceID string `json:"instance_id"` + ItemDescription string `json:"item_description,omitempty"` + ItemGroupDescription string `json:"item_group_description,omitempty"` + SalesOrderID string `json:"sales_order_id"` + UnitID string `json:"unit_id"` + ConsumedUnits float64 `json:"consumed_units"` + TimeRange TimeRange `json:"timerange"` +} + +type TimeRange struct { + From time.Time + To time.Time +} + +func (t TimeRange) MarshalJSON() ([]byte, error) { + return []byte(`"` + t.From.Format(time.RFC3339) + "/" + t.To.Format(time.RFC3339) + `"`), nil +} + +func (t *TimeRange) UnmarshalJSON([]byte) error { + return errors.New("Not implemented") +} + +func NewOdooAPIClient(ctx context.Context, odooURL string, oauthTokenURL string, oauthClientId string, oauthClientSecret string, logger logr.Logger) *OdooAPIClient { + oauthConfig := clientcredentials.Config{ + ClientID: oauthClientId, + ClientSecret: oauthClientSecret, + TokenURL: oauthTokenURL, + } + oauthClient := oauthConfig.Client(ctx) + return &OdooAPIClient{ + odooURL: odooURL, + logger: logger, + oauthClient: oauthClient, + } +} + +func (c OdooAPIClient) SendData(data []OdooMeteredBillingRecord) error { + apiObject := apiObject{ + Data: data, + } + str, err := json.Marshal(apiObject) + if err != nil { + return err + } + resp, err := c.oauthClient.Post(c.odooURL, "application/json", bytes.NewBuffer(str)) + if err != nil { + return err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + c.logger.Info("Records sent to Odoo API", "status", resp.Status, "body", string(body), "numberOfRecords", len(data)) + + if resp.StatusCode != 200 { + return errors.New(fmt.Sprintf("API error when sending records to Odoo:\n%s", body)) + } + + return nil +} diff --git a/pkg/prom/exporter.go b/pkg/prom/exporter.go deleted file mode 100644 index 3018b8f..0000000 --- a/pkg/prom/exporter.go +++ /dev/null @@ -1,78 +0,0 @@ -package prom - -import ( - "context" - "errors" - "net/http" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/vshn/billing-collector-cloudservices/pkg/log" -) - -var ( - gauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "appcat:raw:billing", - Help: "AppCat raw billing metrics, not intended for direct use in billing", - }, []string{ - "category", - "product", - "type", - }) -) - -const defaultBind = ":9123" - -// ResetAppCatMetric makes sure that old metrics are removed from the metrics list. -func ResetAppCatMetric() { - gauge.Reset() -} - -// UpdateAppCatMetric resets the metrics and exports new values. -func UpdateAppCatMetric(metric float64, category, product, prodType string) { - gauge.WithLabelValues(category, product, prodType).Set(metric) -} - -func ServeMetrics(ctx context.Context, bindString string) error { - - if bindString == "" { - bindString = defaultBind - } - - log := log.Logger(ctx) - - log.Info("Starting metric http server") - - err := prometheus.Register(gauge) - if err != nil { - are := &prometheus.AlreadyRegisteredError{} - if !errors.As(err, are) { - return err - } - } - - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - - server := &http.Server{ - Addr: bindString, - Handler: mux, - } - - go func() { - <-ctx.Done() - log.Info("Received shutdown signal for http server") - err := server.Shutdown(ctx) - if err != nil { - log.Error(err, "error stopping http server") - } - }() - - if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - return err - } - - log.Info("Shutting down metrics exporter") - - return nil -} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go new file mode 100644 index 0000000..b9f6613 --- /dev/null +++ b/pkg/prom/prom.go @@ -0,0 +1,54 @@ +package prom + +import ( + "context" + "fmt" + "github.com/appuio/appuio-cloud-reporting/pkg/thanos" + "github.com/prometheus/client_golang/api" + apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "time" +) + +const ( + salesOrderPromMetrics = "control_api_organization_info" + salesOrderIDLabel = "sales_order_id" +) + +func NewPrometheusAPIClient(promURL string) (apiv1.API, error) { + rt := api.DefaultRoundTripper + rt = &thanos.PartialResponseRoundTripper{ + RoundTripper: rt, + } + + client, err := api.NewClient(api.Config{ + Address: promURL, + RoundTripper: rt, + }) + + return apiv1.NewAPI(client), err +} + +// GetSalesOrderId retrieves from prometheus the sales order id associated to orgId +func GetSalesOrderId(ctx context.Context, client apiv1.API, orgId string) (string, error) { + query := salesOrderPromMetrics + fmt.Sprintf("{name=\"%s\"}", orgId) + + res, _, err := client.Query(ctx, query, time.Now()) + if err != nil { + return "", fmt.Errorf("cannot query '%s' for organisation %s, err: %v", salesOrderPromMetrics, orgId, err) + } + samples := res.(model.Vector) + if samples.Len() > 1 { + return "", fmt.Errorf("prometheus metric '%s' has multiple results for organisation %s ", salesOrderPromMetrics, orgId) + } + + return getMetricLabel(samples[0].Metric) +} + +func getMetricLabel(m model.Metric) (string, error) { + value, ok := m[model.LabelName(salesOrderIDLabel)] + if !ok { + return "", fmt.Errorf("no '%s' label in metrics '%s'", salesOrderIDLabel, salesOrderPromMetrics) + } + return string(value), nil +}