From fcb56fdd451b29de5ebcebd0fcfdb474872d9520 Mon Sep 17 00:00:00 2001 From: Bryce Palmer Date: Thu, 21 Sep 2023 10:04:30 -0400 Subject: [PATCH] fix rebase issues, address review comments Signed-off-by: Bryce Palmer --- cmd/manager/main.go | 14 +- config/manager/manager.yaml | 6 + go.mod | 2 +- go.sum | 4 +- internal/catalogmetadata/cache/cache.go | 22 +- internal/catalogmetadata/cache/cache_test.go | 4 +- .../entitysources/catalogdsource.go | 210 ------------------ .../operator_framework_test.go | 8 - 8 files changed, 33 insertions(+), 237 deletions(-) delete mode 100644 internal/resolution/entitysources/catalogdsource.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 27b923833..38a62a10f 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "net/http" "os" "github.com/spf13/pflag" @@ -35,6 +36,7 @@ import ( rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1" operatorsv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" "github.com/operator-framework/operator-controller/internal/controllers" "github.com/operator-framework/operator-controller/pkg/features" @@ -56,14 +58,18 @@ func init() { } func main() { - var metricsAddr string - var enableLeaderElection bool - var probeAddr string + var ( + metricsAddr string + enableLeaderElection bool + probeAddr string + cachePath string + ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching") opts := zap.Options{ Development: true, } @@ -100,7 +106,7 @@ func main() { } cl := mgr.GetClient() - catalogClient := catalogclient.New(cl) + catalogClient := catalogclient.New(cl, cache.NewFilesystemCache(cachePath, http.DefaultClient)) if err = (&controllers.OperatorReconciler{ Client: cl, diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 16f58c1fd..23e407afc 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -72,6 +72,9 @@ spec: image: controller:latest imagePullPolicy: IfNotPresent name: manager + volumeMounts: + - name: cache + mountPath: /var/cache securityContext: allowPrivilegeEscalation: false capabilities: @@ -97,3 +100,6 @@ spec: memory: 64Mi serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 + volumes: + - name: cache + emptyDir: {} \ No newline at end of file diff --git a/go.mod b/go.mod index 4f494d55f..1322419a8 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/onsi/ginkgo/v2 v2.12.1 github.com/onsi/gomega v1.27.10 github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 - github.com/operator-framework/deppy v0.0.1 github.com/operator-framework/catalogd v0.7.0 + github.com/operator-framework/deppy v0.0.1 github.com/operator-framework/operator-registry v1.28.0 github.com/operator-framework/rukpak v0.13.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index c881f14d9..6276e2e79 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3 github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 h1:d/Pnr19TnmIq3zQ6ebewC+5jt5zqYbRkvYd37YZENQY= github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42/go.mod h1:l/cuwtPxkVUY7fzYgdust2m9tlmb8I4pOvbsUufRb24= -github.com/operator-framework/catalogd v0.6.0 h1:dSZ54MVSHJ8hcoV7OCRxnk3x4O3ramlyPvvz0vsKYdk= -github.com/operator-framework/catalogd v0.6.0/go.mod h1:I0n086a4a+nP1YZy742IrPaWvOlWu0Mj6qA6j4K96Vg= +github.com/operator-framework/catalogd v0.7.0 h1:L0uesxq+r59rGubtxMoVtIShKn7gSSSLqxpWLfwpAaw= +github.com/operator-framework/catalogd v0.7.0/go.mod h1:tVhaenJVFTHHgdJ0Pju7U4G3epeoZfUWWM1J5nPISPQ= github.com/operator-framework/deppy v0.0.1 h1:PLTtaFGwktPhKuKZkfUruTimrWpyaO3tghbsjs0uMjc= github.com/operator-framework/deppy v0.0.1/go.mod h1:EV6vnxRodSFRn2TFztfxFhMPGh5QufOhn3tpIP1Z8cc= github.com/operator-framework/operator-registry v1.28.0 h1:vtmd2WgJxkx7vuuOxW4k5Le/oo0SfonSeJVMU3rKIfk= diff --git a/internal/catalogmetadata/cache/cache.go b/internal/catalogmetadata/cache/cache.go index 5459447a3..1a792d9f1 100644 --- a/internal/catalogmetadata/cache/cache.go +++ b/internal/catalogmetadata/cache/cache.go @@ -24,11 +24,11 @@ var _ client.Fetcher = &filesystemCache{} // - IF cached it will verify the cache is up to date. If it is up to date it will return // the cached contents, if not it will fetch the new contents from the catalogd HTTP // server and update the cached contents. -func NewFilesystemCache(cachePath string, tripper http.RoundTripper) client.Fetcher { +func NewFilesystemCache(cachePath string, client *http.Client) client.Fetcher { return &filesystemCache{ cachePath: cachePath, mutex: sync.RWMutex{}, - tripper: tripper, + client: client, cacheDataByCatalogName: map[string]cacheData{}, } } @@ -49,7 +49,7 @@ type cacheData struct { type filesystemCache struct { mutex sync.RWMutex cachePath string - tripper http.RoundTripper + client *http.Client cacheDataByCatalogName map[string]cacheData } @@ -96,7 +96,7 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c return nil, fmt.Errorf("error forming request: %s", err) } - resp, err := fsc.tripper.RoundTrip(req) + resp, err := fsc.client.Do(req) if err != nil { return nil, fmt.Errorf("error performing request: %s", err) } @@ -104,11 +104,6 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c switch resp.StatusCode { case http.StatusOK: - contents, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("error reading response body: %s", err) - } - fsc.mutex.Lock() defer fsc.mutex.Unlock() @@ -128,8 +123,13 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err) } - if err = os.WriteFile(cacheFilePath, contents, os.ModePerm); err != nil { - return nil, fmt.Errorf("error caching response: %s", err) + file, err := os.Create(cacheFilePath) + if err != nil { + return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err) + } + + if _, err := io.Copy(file, resp.Body); err != nil { + return nil, fmt.Errorf("error writing contents to cache for Catalog %q: %s", catalog.Name, err) } fsc.cacheDataByCatalogName[catalog.Name] = cacheData{ diff --git a/internal/catalogmetadata/cache/cache_test.go b/internal/catalogmetadata/cache/cache_test.go index 9b78d3f4d..0d9d834e8 100644 --- a/internal/catalogmetadata/cache/cache_test.go +++ b/internal/catalogmetadata/cache/cache_test.go @@ -220,7 +220,9 @@ func TestClient(t *testing.T) { ctx := context.Background() cacheDir := t.TempDir() tt.tripper.content = tt.contents - c := cache.NewFilesystemCache(cacheDir, tt.tripper) + httpClient := http.DefaultClient + httpClient.Transport = tt.tripper + c := cache.NewFilesystemCache(cacheDir, httpClient) rc, err := c.FetchCatalogContents(ctx, tt.catalog) if !tt.wantErr { diff --git a/internal/resolution/entitysources/catalogdsource.go b/internal/resolution/entitysources/catalogdsource.go deleted file mode 100644 index 56bf785da..000000000 --- a/internal/resolution/entitysources/catalogdsource.go +++ /dev/null @@ -1,210 +0,0 @@ -package entitysources - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - - catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" - "github.com/operator-framework/deppy/pkg/deppy" - "github.com/operator-framework/deppy/pkg/deppy/input" - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/property" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/operator-framework/operator-controller/internal/resolution/entities" -) - -// CatalogdEntitySource is a source for(/collection of) deppy defined input.Entity, built from content -// made accessible on-cluster by https://github.com/operator-framework/catalogd. -// It is an implementation of deppy defined input.EntitySource -type CatalogdEntitySource struct { - client client.Client -} - -func NewCatalogdEntitySource(client client.Client) *CatalogdEntitySource { - return &CatalogdEntitySource{ - client: client, - } -} - -func (es *CatalogdEntitySource) Get(_ context.Context, _ deppy.Identifier) (*input.Entity, error) { - panic("not implemented") -} - -func (es *CatalogdEntitySource) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { - resultSet := input.EntityList{} - entities, err := getEntities(ctx, es.client) - if err != nil { - return nil, err - } - for i := range entities { - if filter(&entities[i]) { - resultSet = append(resultSet, entities[i]) - } - } - return resultSet, nil -} - -func (es *CatalogdEntitySource) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { - entities, err := getEntities(ctx, es.client) - if err != nil { - return nil, err - } - resultSet := input.EntityListMap{} - for i := range entities { - keys := fn(&entities[i]) - for _, key := range keys { - resultSet[key] = append(resultSet[key], entities[i]) - } - } - return resultSet, nil -} - -func (es *CatalogdEntitySource) Iterate(ctx context.Context, fn input.IteratorFunction) error { - entities, err := getEntities(ctx, es.client) - if err != nil { - return err - } - for i := range entities { - if err := fn(&entities[i]); err != nil { - return err - } - } - return nil -} - -func getEntities(ctx context.Context, cl client.Client) (input.EntityList, error) { - allEntitiesList := input.EntityList{} - - var catalogList catalogd.CatalogList - if err := cl.List(ctx, &catalogList); err != nil { - return nil, err - } - for _, catalog := range catalogList.Items { - channels, bundles, err := fetchCatalogMetadata(ctx, catalog.Status.ContentURL, catalog.Name) - if err != nil { - return nil, err - } - - catalogEntitiesList, err := MetadataToEntities(catalog.Name, channels, bundles) - if err != nil { - return nil, err - } - - allEntitiesList = append(allEntitiesList, catalogEntitiesList...) - } - - return allEntitiesList, nil -} - -func MetadataToEntities(catalogName string, channels []declcfg.Channel, bundles []declcfg.Bundle) (input.EntityList, error) { - entityList := input.EntityList{} - - bundlesMap := map[string]*declcfg.Bundle{} - for i := range bundles { - bundleKey := fmt.Sprintf("%s-%s", bundles[i].Package, bundles[i].Name) - bundlesMap[bundleKey] = &bundles[i] - } - - for _, ch := range channels { - for _, chEntry := range ch.Entries { - bundleKey := fmt.Sprintf("%s-%s", ch.Package, chEntry.Name) - bundle, ok := bundlesMap[bundleKey] - if !ok { - return nil, fmt.Errorf("bundle %q not found in catalog %q (package %q, channel %q)", chEntry.Name, catalogName, ch.Package, ch.Name) - } - - props := map[string]string{} - - for _, prop := range bundle.Properties { - switch prop.Type { - case property.TypePackage: - // this is already a json marshalled object, so it doesn't need to be marshalled - // like the other ones - props[property.TypePackage] = string(prop.Value) - case entities.PropertyBundleMediaType: - props[entities.PropertyBundleMediaType] = string(prop.Value) - } - } - - imgValue, err := json.Marshal(bundle.Image) - if err != nil { - return nil, err - } - props[entities.PropertyBundlePath] = string(imgValue) - - channelValue, _ := json.Marshal(property.Channel{ChannelName: ch.Name, Priority: 0}) - props[property.TypeChannel] = string(channelValue) - replacesValue, _ := json.Marshal(entities.ChannelEntry{ - Name: bundle.Name, - Replaces: chEntry.Replaces, - }) - props[entities.PropertyBundleChannelEntry] = string(replacesValue) - - catalogScopedEntryName := fmt.Sprintf("%s-%s", catalogName, bundle.Name) - entity := input.Entity{ - ID: deppy.IdentifierFromString(fmt.Sprintf("%s%s%s", catalogScopedEntryName, bundle.Package, ch.Name)), - Properties: props, - } - entityList = append(entityList, entity) - } - } - - return entityList, nil -} - -func fetchCatalogMetadata(ctx context.Context, url, catalogName string) ([]declcfg.Channel, []declcfg.Bundle, error) { - channels, err := fetchCatalogMetadataByScheme[declcfg.Channel](ctx, url, declcfg.SchemaChannel, catalogName) - if err != nil { - return nil, nil, err - } - bundles, err := fetchCatalogMetadataByScheme[declcfg.Bundle](ctx, url, declcfg.SchemaBundle, catalogName) - if err != nil { - return nil, nil, err - } - - return channels, bundles, nil -} - -type declcfgSchema interface { - declcfg.Package | declcfg.Bundle | declcfg.Channel -} - -// TODO: Cleanup once https://github.com/golang/go/issues/45380 implemented -// We should be able to get rid of the schema arg and switch based on the type passed to this generic -func fetchCatalogMetadataByScheme[T declcfgSchema](ctx context.Context, url, schema, _ string) ([]T, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, fmt.Errorf("error forming request: %s", err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("error performing request: %s", err) - } - defer resp.Body.Close() - - contents := []T{} - err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - - if meta.Schema == schema { - var content T - if err := json.Unmarshal(meta.Blob, &content); err != nil { - return fmt.Errorf("error unmarshalling content: %s", err) - } - contents = append(contents, content) - } - - return nil - }) - if err != nil { - return nil, fmt.Errorf("error processing response: %s", err) - } - - return contents, nil -} diff --git a/test/operator-framework-e2e/operator_framework_test.go b/test/operator-framework-e2e/operator_framework_test.go index bc51117d7..51852f2ac 100644 --- a/test/operator-framework-e2e/operator_framework_test.go +++ b/test/operator-framework-e2e/operator_framework_test.go @@ -200,10 +200,6 @@ var _ = Describe("Operator Framework E2E for plain+v0 bundles", func() { Expect(err).ToNot(HaveOccurred()) By("creating a Catalog CR and verifying the creation of respective packages and bundle metadata") - bundleVersions := make([]string, len(bundleInfo.bundles)) - for i, bundle := range bundleInfo.bundles { - bundleVersions[i] = bundle.bundleVersion - } operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo) Expect(err).ToNot(HaveOccurred()) @@ -336,10 +332,6 @@ var _ = Describe("Operator Framework E2E for registry+v1 bundles", func() { Expect(err).ToNot(HaveOccurred()) By("creating a Catalog CR and verifying the creation of respective packages and bundle metadata") - bundleVersions := make([]string, len(bundleInfo.bundles)) - for i, bundle := range bundleInfo.bundles { - bundleVersions[i] = bundle.bundleVersion - } operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo) Expect(err).ToNot(HaveOccurred())