Skip to content

Commit

Permalink
use catalogd HTTP server instead of CatalogMetadata API
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Palmer <[email protected]>
  • Loading branch information
everettraven committed Sep 21, 2023
1 parent 52a1e28 commit 038505c
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 338 deletions.
14 changes: 10 additions & 4 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"net/http"
"os"

"github.com/spf13/pflag"
Expand All @@ -35,6 +36,7 @@ import (
rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1"

operatorsv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
"github.com/operator-framework/operator-controller/internal/controllers"
"github.com/operator-framework/operator-controller/pkg/features"
Expand All @@ -56,14 +58,18 @@ func init() {
}

func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var (
metricsAddr string
enableLeaderElection bool
probeAddr string
cachePath string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -100,7 +106,7 @@ func main() {
}

cl := mgr.GetClient()
catalogClient := catalogclient.New(cl)
catalogClient := catalogclient.New(cl, cache.NewFilesystemCache(cachePath, http.DefaultClient))

if err = (&controllers.OperatorReconciler{
Client: cl,
Expand Down
6 changes: 6 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ spec:
image: controller:latest
imagePullPolicy: IfNotPresent
name: manager
volumeMounts:
- name: cache
mountPath: /var/cache
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand All @@ -97,3 +100,6 @@ spec:
memory: 64Mi
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
volumes:
- name: cache
emptyDir: {}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/onsi/ginkgo/v2 v2.12.1
github.com/onsi/gomega v1.27.10
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42
github.com/operator-framework/catalogd v0.6.0
github.com/operator-framework/catalogd v0.7.0
github.com/operator-framework/deppy v0.0.1
github.com/operator-framework/operator-registry v1.28.0
github.com/operator-framework/rukpak v0.13.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3
github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8=
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 h1:d/Pnr19TnmIq3zQ6ebewC+5jt5zqYbRkvYd37YZENQY=
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42/go.mod h1:l/cuwtPxkVUY7fzYgdust2m9tlmb8I4pOvbsUufRb24=
github.com/operator-framework/catalogd v0.6.0 h1:dSZ54MVSHJ8hcoV7OCRxnk3x4O3ramlyPvvz0vsKYdk=
github.com/operator-framework/catalogd v0.6.0/go.mod h1:I0n086a4a+nP1YZy742IrPaWvOlWu0Mj6qA6j4K96Vg=
github.com/operator-framework/catalogd v0.7.0 h1:L0uesxq+r59rGubtxMoVtIShKn7gSSSLqxpWLfwpAaw=
github.com/operator-framework/catalogd v0.7.0/go.mod h1:tVhaenJVFTHHgdJ0Pju7U4G3epeoZfUWWM1J5nPISPQ=
github.com/operator-framework/deppy v0.0.1 h1:PLTtaFGwktPhKuKZkfUruTimrWpyaO3tghbsjs0uMjc=
github.com/operator-framework/deppy v0.0.1/go.mod h1:EV6vnxRodSFRn2TFztfxFhMPGh5QufOhn3tpIP1Z8cc=
github.com/operator-framework/operator-registry v1.28.0 h1:vtmd2WgJxkx7vuuOxW4k5Le/oo0SfonSeJVMU3rKIfk=
Expand Down
143 changes: 143 additions & 0 deletions internal/catalogmetadata/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package cache

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sync"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"

"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
)

var _ client.Fetcher = &filesystemCache{}

// NewFilesystemCache returns a client.Fetcher implementation that uses a
// local filesystem to cache Catalog contents. When fetching the Catalog contents
// it will:
// - Check if the Catalog is cached
// - IF !cached it will fetch from the catalogd HTTP server and cache the response
// - IF cached it will verify the cache is up to date. If it is up to date it will return
// the cached contents, if not it will fetch the new contents from the catalogd HTTP
// server and update the cached contents.
func NewFilesystemCache(cachePath string, client *http.Client) client.Fetcher {
return &filesystemCache{
cachePath: cachePath,
mutex: sync.RWMutex{},
client: client,
cacheDataByCatalogName: map[string]cacheData{},
}
}

// cacheData holds information about a catalog
// other than it's contents that is used for
// making decisions on when to attempt to refresh
// the cache.
type cacheData struct {
ResolvedRef string
}

// FilesystemCache is a cache that
// uses the local filesystem for caching
// catalog contents. It will fetch catalog
// contents if the catalog does not already
// exist in the cache.
type filesystemCache struct {
mutex sync.RWMutex
cachePath string
client *http.Client
cacheDataByCatalogName map[string]cacheData
}

// FetchCatalogContents implements the client.Fetcher interface and
// will fetch the contents for the provided Catalog from the filesystem.
// If the provided Catalog has not yet been cached, it will make a GET
// request to the Catalogd HTTP server to get the Catalog contents and cache
// them. The cache will be updated automatically if a Catalog is noticed to
// have a different resolved image reference.
// The Catalog provided to this function is expected to:
// - Be non-nil
// - Have a non-nil Catalog.Status.ResolvedSource.Image
// This ensures that we are only attempting to fetch catalog contents for Catalog
// resources that have been successfully reconciled, unpacked, and are being served.
// These requirements help ensure that we can rely on status conditions to determine
// when to issue a request to update the cached Catalog contents.
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) {
if catalog == nil {
return nil, fmt.Errorf("error: provided catalog must be non-nil")
}

if catalog.Status.ResolvedSource == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name)
}

if catalog.Status.ResolvedSource.Image == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name)
}

cacheDir := filepath.Join(fsc.cachePath, catalog.Name)
cacheFilePath := filepath.Join(cacheDir, "data.json")

fsc.mutex.RLock()
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if catalog.Status.ResolvedSource.Image.Ref == data.ResolvedRef {
fsc.mutex.RUnlock()
return os.Open(cacheFilePath)
}
}
fsc.mutex.RUnlock()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)
}

resp, err := fsc.client.Do(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)
}
defer resp.Body.Close()

switch resp.StatusCode {
case http.StatusOK:
fsc.mutex.Lock()
defer fsc.mutex.Unlock()

// make sure we only write if this info hasn't been updated
// by another thread. The check here, if multiple threads are
// updating this, has no way to tell if the current ref is the
// newest possible ref. If another thread has already updated
// this to be the same value, skip the write logic and return
// the cached contents
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref {
break
}
}

if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err)
}

file, err := os.Create(cacheFilePath)
if err != nil {
return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err)
}

if _, err := io.Copy(file, resp.Body); err != nil {
return nil, fmt.Errorf("error writing contents to cache for Catalog %q: %s", catalog.Name, err)
}

fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
ResolvedRef: catalog.Status.ResolvedSource.Image.Ref,
}
default:
return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
}

return os.Open(cacheFilePath)
}
Loading

0 comments on commit 038505c

Please sign in to comment.