From abe6e7aa7197189bb0e73d80f0f5d129c00b2608 Mon Sep 17 00:00:00 2001 From: MikeAlejoBR Date: Fri, 20 Dec 2024 16:52:42 +0100 Subject: [PATCH] refactor: Sources' REST API client The REST API functions that were being used were almost a copy paste of one another, which made following the code difficult. The goal of the refactor was to simplify the code and make it easier to follow and reason about. Also, it introduces a timeout for the requests by default, which could be the cause for which the Superkey struggles so much and ends up restarting non-stop. Another goal of this refactor was to leave everything in an state where it is easier to reason about how the Superkey worker should be refactored further, like introducing proper dependency injection and untangling the resource making from the "ForgeApplication" struct. However, that's for another PR. RHCLOUD-35843 --- config/config.go | 76 ++++--- provider/forge.go | 11 +- sources/api_client.go | 346 ++++++++++++++------------------ sources/api_client_interface.go | 24 +++ superkey/create_request.go | 25 ++- superkey/forged_application.go | 50 +++-- superkey/types.go | 2 - 7 files changed, 280 insertions(+), 254 deletions(-) create mode 100644 sources/api_client_interface.go diff --git a/config/config.go b/config/config.go index 6ec62ad..39a32ea 100644 --- a/config/config.go +++ b/config/config.go @@ -11,21 +11,22 @@ import ( // SuperKeyWorkerConfig is the struct for storing runtime configuration type SuperKeyWorkerConfig struct { - Hostname string - KafkaBrokerConfig []clowder.BrokerConfig - KafkaTopics map[string]string - KafkaGroupID string - MetricsPort int - LogLevel string - LogGroup string - LogHandler string - AwsRegion string - AwsAccessKeyID string - AwsSecretAccessKey string - SourcesHost string - SourcesScheme string - SourcesPort int - SourcesPSK string + Hostname string + KafkaBrokerConfig []clowder.BrokerConfig + KafkaTopics map[string]string + KafkaGroupID string + MetricsPort int + LogLevel string + LogGroup string + LogHandler string + AwsRegion string + AwsAccessKeyID string + AwsSecretAccessKey string + SourcesHost string + SourcesScheme string + SourcesPort int + SourcesPSK string + SourcesRequestsMaxAttempts int } // Get - returns the config parsed from runtime vars @@ -81,6 +82,20 @@ func Get() *SuperKeyWorkerConfig { options.SetDefault("SourcesPort", os.Getenv("SOURCES_PORT")) options.SetDefault("SourcesPSK", os.Getenv("SOURCES_PSK")) + // Get the number of maximum request attempts we want to make to the Sources' API. + sourcesRequestsMaxAttempts, err := strconv.Atoi(os.Getenv("SOURCES_MAX_ATTEMPTS")) + if err != nil { + log.Printf(`Warning: the provided max attempts value \"%s\" is not an integer. Setting default value of 1.`, os.Getenv("SOURCES_MAX_ATTEMPTS")) + sourcesRequestsMaxAttempts = 1 + } + + if sourcesRequestsMaxAttempts < 1 { + log.Printf(`Warning: the provided max attempts value \"%s\" is lower than 1, and we need to at least make one attempt when calling Sources. Setting default value of 1.`, os.Getenv("SOURCES_MAX_ATTEMPTS")) + sourcesRequestsMaxAttempts = 1 + } + + options.SetDefault("SourcesRequestsMaxAttempts", sourcesRequestsMaxAttempts) + hostname, _ := os.Hostname() options.SetDefault("Hostname", hostname) @@ -94,21 +109,22 @@ func Get() *SuperKeyWorkerConfig { options.AutomaticEnv() return &SuperKeyWorkerConfig{ - Hostname: options.GetString("Hostname"), - KafkaBrokerConfig: brokerConfig, - KafkaTopics: options.GetStringMapString("KafkaTopics"), - KafkaGroupID: options.GetString("KafkaGroupID"), - MetricsPort: options.GetInt("MetricsPort"), - LogLevel: options.GetString("LogLevel"), - LogHandler: options.GetString("LogHandler"), - LogGroup: options.GetString("LogGroup"), - AwsRegion: options.GetString("AwsRegion"), - AwsAccessKeyID: options.GetString("AwsAccessKeyID"), - AwsSecretAccessKey: options.GetString("AwsSecretAccessKey"), - SourcesHost: options.GetString("SourcesHost"), - SourcesScheme: options.GetString("SourcesScheme"), - SourcesPort: options.GetInt("SourcesPort"), - SourcesPSK: options.GetString("SourcesPSK"), + Hostname: options.GetString("Hostname"), + KafkaBrokerConfig: brokerConfig, + KafkaTopics: options.GetStringMapString("KafkaTopics"), + KafkaGroupID: options.GetString("KafkaGroupID"), + MetricsPort: options.GetInt("MetricsPort"), + LogLevel: options.GetString("LogLevel"), + LogHandler: options.GetString("LogHandler"), + LogGroup: options.GetString("LogGroup"), + AwsRegion: options.GetString("AwsRegion"), + AwsAccessKeyID: options.GetString("AwsAccessKeyID"), + AwsSecretAccessKey: options.GetString("AwsSecretAccessKey"), + SourcesHost: options.GetString("SourcesHost"), + SourcesScheme: options.GetString("SourcesScheme"), + SourcesPort: options.GetInt("SourcesPort"), + SourcesPSK: options.GetString("SourcesPSK"), + SourcesRequestsMaxAttempts: options.GetInt("SourcesRequestsMaxAttempts"), } } diff --git a/provider/forge.go b/provider/forge.go index 50fddca..d4ec774 100644 --- a/provider/forge.go +++ b/provider/forge.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/redhatinsights/sources-superkey-worker/amazon" + "github.com/redhatinsights/sources-superkey-worker/config" "github.com/redhatinsights/sources-superkey-worker/sources" "github.com/redhatinsights/sources-superkey-worker/superkey" ) @@ -45,8 +46,14 @@ func TearDown(ctx context.Context, f *superkey.ForgedApplication) []error { // getProvider returns a provider based on create request's provider + credentials func getProvider(ctx context.Context, request *superkey.CreateRequest) (superkey.Provider, error) { - client := sources.SourcesClient{AccountNumber: request.TenantID, IdentityHeader: request.IdentityHeader, OrgId: request.OrgIdHeader} - auth, err := client.GetInternalAuthentication(ctx, request.SuperKey) + sourcesRestClient := sources.NewSourcesClient(config.Get()) + + authData := sources.AuthenticationData{ + IdentityHeader: request.IdentityHeader, + OrgId: request.OrgIdHeader, + } + + auth, err := sourcesRestClient.GetInternalAuthentication(ctx, &authData, request.SuperKey) if err != nil { return nil, fmt.Errorf(`error while fetching internal authentication "%s" from Sources: %w`, request.SuperKey, err) } diff --git a/sources/api_client.go b/sources/api_client.go index e5315e9..a99b27a 100644 --- a/sources/api_client.go +++ b/sources/api_client.go @@ -16,269 +16,225 @@ import ( "github.com/sirupsen/logrus" ) -var conf = config.Get() +// sourcesClient holds the required information to be able to send requests back to the Sources API. +type sourcesClient struct { + baseV31URL *url.URL + baseV20InternalUrl *url.URL + config *config.SuperKeyWorkerConfig +} -type SourcesClient struct { +// AuthenticationData +type AuthenticationData struct { IdentityHeader string OrgId string AccountNumber string } -func (sc *SourcesClient) CheckAvailability(ctx context.Context, sourceId string) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/sources/%v/check_availability", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, sourceId, - )) - - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } - - l.LogWithContext(ctx).WithField("request_url", reqURL).Debugf("Requesting an availability check") +// PatchApplicationRequest represents the fields that we might want to update when updating the application's details. +// +// The AvailabilityStatus field represents the current application's availability status. +// The AvailabilityStatusError field gives information about why the status might not be "available". +// The Extra field allows adding extra fields to the application, such as the Superkey key. +type PatchApplicationRequest struct { + AvailabilityStatus *string `json:"availability_status"` + AvailabilityStatusError *string `json:"availability_status_error"` + Extra map[string]interface{} `json:"extra"` +} - defer resp.Body.Close() +// PatchSourceRequest represents the availability status field that we might want to update in a Source. +// +// The AvailabilityStatus field represents the current sources' availability status. +type PatchSourceRequest struct { + AvailabilityStatus *string `json:"availability_status"` +} - if resp.StatusCode != 202 { - return fmt.Errorf(`expecting a 202 status code, got "%d"`, resp.StatusCode) +// NewSourcesClient initializes a new SourcesClient to be able to communicate with the Sources API. +func NewSourcesClient(config *config.SuperKeyWorkerConfig) *sourcesClient { + return &sourcesClient{ + baseV20InternalUrl: &url.URL{ + Host: fmt.Sprintf("%s:%d", config.SourcesHost, config.SourcesPort), + Path: "/internal/v2.0/", + Scheme: config.SourcesScheme, + }, + baseV31URL: &url.URL{ + Host: fmt.Sprintf("%s:%d", config.SourcesHost, config.SourcesPort), + Path: "/api/sources/v3.1", + Scheme: config.SourcesScheme, + }, + config: config, } - - return nil } -func (sc *SourcesClient) CreateAuthentication(ctx context.Context, auth *model.AuthenticationCreateRequest) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/authentications", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, - )) - - body, err := json.Marshal(auth) - if err != nil { - return fmt.Errorf("failed to marshal request body: %w", err) - } +func (sc *sourcesClient) TriggerSourceAvailabilityCheck(ctx context.Context, authData *AuthenticationData, sourceId string) error { + checkAvailabilityUrl := sc.baseV31URL.JoinPath("/sources/", url.PathEscape(sourceId), "/check_availability") - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Creating authentication in Sources") + return sc.sendRequest(ctx, http.MethodPost, checkAvailabilityUrl, authData, nil, nil) +} - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } +func (sc *sourcesClient) CreateAuthentication(ctx context.Context, authData *AuthenticationData, sourcesAuthentication *model.AuthenticationCreateRequest) (*model.AuthenticationResponse, error) { + createAuthenticationUrl := sc.baseV31URL.JoinPath("/authentications") - resp, err := http.DefaultClient.Do(req) + var createdAuthentication model.AuthenticationResponse + err := sc.sendRequest(ctx, http.MethodPost, createAuthenticationUrl, authData, sourcesAuthentication, createdAuthentication) if err != nil { - return fmt.Errorf("unable to send request: %w", err) + return nil, fmt.Errorf("error while creating authentication: %w", err) } - defer resp.Body.Close() - - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + return &createdAuthentication, nil +} - bytes, _ := io.ReadAll(resp.Body) - var createdAuth model.AuthenticationResponse - err = json.Unmarshal(bytes, &createdAuth) - if err != nil { - return fmt.Errorf("unable to unmarshal authentication creation response from Sources: %w", err) - } +func (sc *sourcesClient) CreateApplicationAuthentication(ctx context.Context, authData *AuthenticationData, appAuthCreateRequest *model.ApplicationAuthenticationCreateRequest) error { + createApplicationAuthenticationUrl := sc.baseV31URL.JoinPath("/application_authentications") - err = sc.createApplicationAuthentication(ctx, &model.ApplicationAuthenticationCreateRequest{ - ApplicationIDRaw: auth.ResourceIDRaw, - AuthenticationIDRaw: createdAuth.ID, - }) + err := sc.sendRequest(ctx, http.MethodPost, createApplicationAuthenticationUrl, authData, appAuthCreateRequest, nil) if err != nil { - return err + return fmt.Errorf("error while creating the application authentication: %w", err) } return nil } -func (sc *SourcesClient) PatchApplication(ctx context.Context, appID string, payload map[string]interface{}) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/applications/%v", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, appID, - )) +func (sc *sourcesClient) PatchApplication(ctx context.Context, authData *AuthenticationData, appId string, patchApplicationRequest *PatchApplicationRequest) error { + patchApplicationUrl := sc.baseV31URL.JoinPath("/applications/", url.PathEscape(appId)) - body, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("failed to marshal request body: %w", err) - } - - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Patching application in Sources") + return sc.sendRequest(ctx, http.MethodPatch, patchApplicationUrl, authData, patchApplicationRequest, nil) +} - req := &http.Request{ - Method: http.MethodPatch, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } +func (sc *sourcesClient) PatchSource(ctx context.Context, authData *AuthenticationData, sourceId string, patchSourceRequest *PatchSourceRequest) error { + patchSourceUrl := sc.baseV31URL.JoinPath("/sources/" + url.PathEscape(sourceId)) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } + return sc.sendRequest(ctx, http.MethodPatch, patchSourceUrl, authData, patchSourceRequest, nil) +} - defer resp.Body.Close() +func (sc *sourcesClient) GetInternalAuthentication(ctx context.Context, authData *AuthenticationData, authId string) (*model.AuthenticationInternalResponse, error) { + getInternalAuthUrl := sc.baseV20InternalUrl.JoinPath("/authentications/", url.PathEscape(authId), "/?expose_encrypted_attribute[]=password") - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + var authInternalResponse *model.AuthenticationInternalResponse = nil + err := sc.sendRequest(ctx, http.MethodGet, getInternalAuthUrl, authData, nil, &authInternalResponse) - return nil + return authInternalResponse, err } -func (sc *SourcesClient) PatchSource(ctx context.Context, sourceId string, payload map[string]interface{}) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/sources/%v", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, sourceId, - )) - - body, err := json.Marshal(payload) - if err != nil { - return err - } +// sendRequest sends a request with the provided method and body to the given url, performing a maximum number of +// attempts and marshaling the incoming response's body. You can leave the body and the marshalTarget arguments empty +// if you do not require them. +func (sc *sourcesClient) sendRequest(ctx context.Context, httpMethod string, url *url.URL, authData *AuthenticationData, body interface{}, marshalTarget interface{}) error { + // Set up a timeout so that the requests don't hang up forever. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // When a body is specified, attempt to marshal it as JSON. + var requestBody *bytes.Buffer = nil + if body != nil { + tmp, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("failed to marshal request body: %w", err) + } - req := &http.Request{ - Method: http.MethodPatch, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), + requestBody = bytes.NewBuffer(tmp) } - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Patching source in Sources") - - resp, err := http.DefaultClient.Do(req) + // Create the request. + request, err := http.NewRequestWithContext(ctx, httpMethod, url.String(), requestBody) if err != nil { - return fmt.Errorf("unable to send request: %w", err) + return fmt.Errorf(`failed to create request: %w`, err) } - defer resp.Body.Close() + // Include the headers in the request. + sc.addAuthenticationHeaders(request, authData) - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + // Perform the actual request. + var response *http.Response + for attempt := 0; attempt < sc.config.SourcesRequestsMaxAttempts; attempt++ { + response, err = http.DefaultClient.Do(request) - return nil -} + // The "err" check is to avoid nil dereference errors, since if we attempt checking for the status code + // directly when an error has occurred, the "response" struct might be nil. + if err == nil && sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + break + } -// GetInternalAuthentication requests an authentication via the internal sources api -// that way we can expose the password. -// returns: populated sources api Authentication object, error -func (sc *SourcesClient) GetInternalAuthentication(ctx context.Context, authID string) (*model.AuthenticationInternalResponse, error) { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/internal/v2.0/authentications/%v?expose_encrypted_attribute[]=password", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, authID, - )) - - req := &http.Request{ - Method: http.MethodGet, - URL: reqURL, - Header: sc.headers(), - } + // When there are no errors but the status code is not the expected one, we attempt to drain the body so that + // the default client can reuse the connection, and then we close the body to avoid memory leaks. + if err == nil && !sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + _, drainErr := io.Copy(io.Discard, response.Body) - var res *http.Response - var err error - for retry := 0; retry < 5; retry++ { - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "authentication_id": authID}).Debugf("Getting internal authentication from Sources") + if drainErr != nil { + l.Log.WithFields(logrus.Fields{}).Warnf("Unable to drain response body. The connection will not be reused by the default HTTP client: %s", drainErr) + } - res, err = http.DefaultClient.Do(req) + if closeErr := response.Body.Close(); closeErr != nil { + l.Log.WithFields(logrus.Fields{}).Errorf("Failed to close incoming response's body: %s", closeErr) + } - if err != nil || res.StatusCode == 200 { - defer res.Body.Close() - break - } else { - l.LogWithContext(ctx).WithField("authentication_id", authID).Warn("Unable to fetch internal authentication. Retrying...") - time.Sleep(3 * time.Second) + l.Log.WithFields(logrus.Fields{}).Debugf(`Unexpected status code received. Want "2xx", got "%d"`, response.StatusCode) + continue + } + + if err != nil { + l.Log.WithFields(logrus.Fields{}).Warn("Failed to send request. Retrying...") + l.Log.WithFields(logrus.Fields{}).Debugf("Failed to send request. Retrying... Cause: %s", err) } } - if err != nil || res.StatusCode != 200 { - return nil, fmt.Errorf(`unable to fetch internal authentication "%s" after 5 retries: %w`, authID, err) + // In the case in which we deplete all the attempts, we have to return the error and stop the execution here. + if err != nil || response == nil { + return fmt.Errorf("failed to send request: %w", err) } - data, _ := io.ReadAll(res.Body) - auth := model.AuthenticationInternalResponse{} + // Always read the response body, in case we need to return it in an error or marshal it to a struct. + responseBody, err := io.ReadAll(response.Body) + if err != nil { + return fmt.Errorf(`failed to read response body: %w`, err) + } - // unmarshaling the data from the request, the id comes back as a string which fills `err` - // we can safely ignore that as long as username/pass are there. - err = json.Unmarshal(data, &auth) - if err != nil && (auth.Username == "" || auth.Password == "") { - return nil, fmt.Errorf(`internal authentication "%s"'s username or password are empty'`, authID) + // Make sure that the status code is a "2xx" one. + if !sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + return fmt.Errorf(`unexpected status code received. Want "2xx", got "%d". Response body: %s`, response.StatusCode, string(responseBody)) } - return &auth, nil + // We might need to marshal the incoming response in the specified struct. + if marshalTarget != nil { + err = json.Unmarshal(responseBody, &marshalTarget) + if err != nil { + return fmt.Errorf(`failed to unmarshal response body: %w`, err) + } + } + + return nil } -func (sc *SourcesClient) headers() map[string][]string { - var headers = make(map[string][]string) +// isStatusCodeFamilyOf2xx returns true if the given status code is a 2xx status code. +func (sc *sourcesClient) isStatusCodeFamilyOf2xx(statusCode int) bool { + return statusCode >= 200 && statusCode < 300 +} - headers["Content-Type"] = []string{"application/json"} +func (sc *sourcesClient) addAuthenticationHeaders(request *http.Request, authData *AuthenticationData) { + request.Header.Add("Content-Type", "application/json") - if conf.SourcesPSK == "" { + if sc.config.SourcesPSK == "" { var xRhId string - if sc.IdentityHeader == "" { - xRhId = encodeIdentity(sc.AccountNumber, sc.OrgId) + if authData.IdentityHeader == "" { + xRhId = encodeIdentity(authData.AccountNumber, authData.OrgId) } else { - xRhId = sc.IdentityHeader + xRhId = authData.IdentityHeader } - headers["x-rh-identity"] = []string{xRhId} + request.Header.Add("x-rh-identity", xRhId) } else { - headers["x-rh-sources-psk"] = []string{conf.SourcesPSK} + request.Header.Add("x-rh-sources-psk", sc.config.SourcesPSK) - if sc.AccountNumber != "" { - headers["x-rh-sources-account-number"] = []string{sc.AccountNumber} + if authData.AccountNumber != "" { + request.Header.Add("x-rh-sources-account-number", authData.AccountNumber) } - if sc.IdentityHeader != "" { - headers["x-rh-identity"] = []string{sc.IdentityHeader} + if authData.IdentityHeader != "" { + request.Header.Add("x-rh-identity", authData.IdentityHeader) } - if sc.OrgId != "" { - headers["x-rh-org-id"] = []string{sc.OrgId} + if authData.OrgId != "" { + request.Header.Add("x-rh-org-id", authData.OrgId) } } - - return headers -} - -func (sc *SourcesClient) createApplicationAuthentication(ctx context.Context, appAuth *model.ApplicationAuthenticationCreateRequest) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/application_authentications", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, - )) - - body, err := json.Marshal(appAuth) - if err != nil { - return err - } - - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } - - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Creating application authentication in Sources") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } - - defer resp.Body.Close() - - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } - - return nil } diff --git a/sources/api_client_interface.go b/sources/api_client_interface.go new file mode 100644 index 0000000..17a1728 --- /dev/null +++ b/sources/api_client_interface.go @@ -0,0 +1,24 @@ +package sources + +import ( + "context" + + "github.com/RedHatInsights/sources-api-go/model" +) + +// RestClient represents the Sources' endpoints that are required for the Superkey to be able to talk to the Sources' +// API. +type RestClient interface { + // TriggerSourceAvailabilityCheck triggers an availability status check in the Sources API for the given source. + TriggerSourceAvailabilityCheck(ctx context.Context, authData *AuthenticationData, sourceId string) error + // CreateAuthentication creates an authentication in Sources. + CreateAuthentication(ctx context.Context, authData *AuthenticationData, sourcesAuthentication *model.AuthenticationCreateRequest) (*model.AuthenticationResponse, error) + // CreateApplicationAuthentication links the created authentication with an application in Sources. + CreateApplicationAuthentication(ctx context.Context, authData *AuthenticationData, appAuthCreateRequest *model.ApplicationAuthenticationCreateRequest) error + // PatchApplication modifies an application in Sources. + PatchApplication(ctx context.Context, authData *AuthenticationData, appId string, patchApplicationRequest *PatchApplicationRequest) error + // PatchSource modifies an application in Sources. + PatchSource(ctx context.Context, authData *AuthenticationData, sourceId string, patchSourceRequest *PatchSourceRequest) error + // GetInternalAuthentication fetches an authentication using the internal Sources' endpoint, which ensure that the authentication will have the password as well. + GetInternalAuthentication(ctx context.Context, authData *AuthenticationData, authId string) (*model.AuthenticationInternalResponse, error) +} diff --git a/superkey/create_request.go b/superkey/create_request.go index c3854b7..7fa727c 100644 --- a/superkey/create_request.go +++ b/superkey/create_request.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/redhatinsights/sources-superkey-worker/config" l "github.com/redhatinsights/sources-superkey-worker/logger" "github.com/redhatinsights/sources-superkey-worker/sources" ) @@ -26,24 +27,28 @@ func (req *CreateRequest) MarkSourceUnavailable(ctx context.Context, incomingErr newApplication = &ForgedApplication{} } - if newApplication.SourcesClient == nil { - newApplication.SourcesClient = &sources.SourcesClient{IdentityHeader: req.IdentityHeader, OrgId: req.OrgIdHeader, AccountNumber: req.TenantID} + sourcesClient := sources.NewSourcesClient(config.Get()) + + authData := &sources.AuthenticationData{ + IdentityHeader: req.IdentityHeader, + OrgId: req.OrgIdHeader, + } + + patchAppRequestBody := &sources.PatchApplicationRequest{ + AvailabilityStatus: &availabilityStatus, + AvailabilityStatusError: &availabilityStatusError, + Extra: extra, } - err := newApplication.SourcesClient.PatchApplication(ctx, req.ApplicationID, map[string]interface{}{ - "availability_status": availabilityStatus, - "availability_status_error": availabilityStatusError, - "extra": extra, - }) + err := sourcesClient.PatchApplication(ctx, authData, req.ApplicationID, patchAppRequestBody) if err != nil { return fmt.Errorf("error while updating the application: %w", err) } l.LogWithContext(ctx).Info(`Application marked as "unavailable"`) - err = newApplication.SourcesClient.PatchSource(ctx, req.SourceID, map[string]interface{}{ - "availability_status": availabilityStatus, - }) + patchSourceRequestBody := &sources.PatchSourceRequest{AvailabilityStatus: &availabilityStatus} + err = sourcesClient.PatchSource(ctx, authData, req.SourceID, patchSourceRequestBody) if err != nil { return fmt.Errorf("error while updating the source: %w", err) } diff --git a/superkey/forged_application.go b/superkey/forged_application.go index 00c0869..6fcc1bc 100644 --- a/superkey/forged_application.go +++ b/superkey/forged_application.go @@ -8,6 +8,7 @@ import ( "time" "github.com/RedHatInsights/sources-api-go/model" + "github.com/redhatinsights/sources-superkey-worker/config" l "github.com/redhatinsights/sources-superkey-worker/logger" "github.com/redhatinsights/sources-superkey-worker/sources" ) @@ -40,27 +41,24 @@ func (f *ForgedApplication) CreateInSourcesAPI(ctx context.Context) error { // before it's ready. time.Sleep(waitTime() * time.Second) - // create a sources client for our identity + account number - if f.SourcesClient == nil { - f.SourcesClient = &sources.SourcesClient{IdentityHeader: f.Request.IdentityHeader, OrgId: f.Request.OrgIdHeader, AccountNumber: f.Request.TenantID} - } + sourcesClient := sources.NewSourcesClient(config.Get()) l.LogWithContext(ctx).Debugf("Posting resources back to Sources API: %v", f) - err := f.storeSuperKeyData(ctx) + err := f.storeSuperKeyData(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while storing the superkey data in Sources: %w", err) } l.LogWithContext(ctx).Info("Superkey data stored in Sources") - err = f.createAuthentications(ctx) + err = f.createAuthentications(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while creating the authentications in Sources: %w", err) } l.LogWithContext(ctx).Info("Authentications created in Sources") - err = f.checkAvailability(ctx) + err = f.checkAvailability(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while triggering an availability check in Sources: %w", err) } @@ -71,13 +69,18 @@ func (f *ForgedApplication) CreateInSourcesAPI(ctx context.Context) error { return nil } -func (f *ForgedApplication) createAuthentications(ctx context.Context) error { +func (f *ForgedApplication) createAuthentications(ctx context.Context, sourcesRestClient sources.RestClient) error { extra := map[string]interface{}{} externalID, ok := f.Request.Extra["external_id"] if ok { extra["external_id"] = externalID } + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + auth := model.AuthenticationCreateRequest{ AuthType: f.Product.AuthPayload.AuthType, Username: f.Product.AuthPayload.Username, @@ -86,19 +89,31 @@ func (f *ForgedApplication) createAuthentications(ctx context.Context) error { Extra: extra, } - err := f.SourcesClient.CreateAuthentication(ctx, &auth) + createdAuthentication, err := sourcesRestClient.CreateAuthentication(ctx, &authData, &auth) if err != nil { return fmt.Errorf("error while creating the authentication in Sources: %w", err) } + appAuthBody := model.ApplicationAuthenticationCreateRequest{ + ApplicationIDRaw: f.Request.ApplicationID, + AuthenticationIDRaw: createdAuthentication.ID, + } + + err = sourcesRestClient.CreateApplicationAuthentication(ctx, &authData, &appAuthBody) + if err != nil { + return fmt.Errorf("error while associating the authentication with an application in Sources: %w", err) + } + return nil } -func (f *ForgedApplication) storeSuperKeyData(ctx context.Context) error { - err := f.SourcesClient.PatchApplication(ctx, f.Request.ApplicationID, map[string]interface{}{ - "extra": f.Product.Extra, - }) +func (f *ForgedApplication) storeSuperKeyData(ctx context.Context, sourcesRestClient sources.RestClient) error { + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + err := sourcesRestClient.PatchApplication(ctx, &authData, f.Request.ApplicationID, &sources.PatchApplicationRequest{Extra: f.Product.Extra}) if err != nil { return fmt.Errorf("failed to update application with superkey data: %w", err) } @@ -106,8 +121,13 @@ func (f *ForgedApplication) storeSuperKeyData(ctx context.Context) error { return nil } -func (f *ForgedApplication) checkAvailability(ctx context.Context) error { - err := f.SourcesClient.CheckAvailability(ctx, f.Product.SourceID) +func (f *ForgedApplication) checkAvailability(ctx context.Context, sourcesRestClient sources.RestClient) error { + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + + err := sourcesRestClient.TriggerSourceAvailabilityCheck(ctx, &authData, f.Product.SourceID) if err != nil { return err } diff --git a/superkey/types.go b/superkey/types.go index e96b46a..53cebf7 100644 --- a/superkey/types.go +++ b/superkey/types.go @@ -4,7 +4,6 @@ import ( "context" "github.com/RedHatInsights/sources-api-go/model" - "github.com/redhatinsights/sources-superkey-worker/sources" ) // CreateRequest - struct representing a request for a superkey @@ -56,7 +55,6 @@ type ForgedApplication struct { Request *CreateRequest Client Provider GUID string - SourcesClient *sources.SourcesClient } // Provider the interface for all of the superkey providers currently just a