Skip to content

Commit

Permalink
Louis/remove db coalescer
Browse files Browse the repository at this point in the history
  • Loading branch information
Louis DeLosSantos authored Feb 10, 2020
1 parent ce92046 commit 66c3fba
Show file tree
Hide file tree
Showing 27 changed files with 256 additions and 229 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
vendor
*.swp
book
4 changes: 2 additions & 2 deletions alpine/ecosystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func NewEcosystem(ctx context.Context) *indexer.Ecosystem {
RepositoryScanners: func(ctx context.Context) ([]indexer.RepositoryScanner, error) {
return []indexer.RepositoryScanner{}, nil
},
Coalescer: func(ctx context.Context, store indexer.Store) (indexer.Coalescer, error) {
return linux.NewCoalescer(store, &Scanner{}), nil
Coalescer: func(ctx context.Context) (indexer.Coalescer, error) {
return linux.NewCoalescer(), nil
},
}
}
5 changes: 5 additions & 0 deletions digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"hash"
)

// Hash algorithm name strings.
// NOTE(review): presumably the algorithm identifiers accepted by Digest — confirm against callers.
const (
SHA256 = "sha256"
SHA512 = "sha512"
)

// Digest is a type representing the hash of some data.
//
// It's used throughout claircore packages as an attempt to remain independent
Expand Down
4 changes: 2 additions & 2 deletions dpkg/ecosystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func NewEcosystem(ctx context.Context) *indexer.Ecosystem {
RepositoryScanners: func(ctx context.Context) ([]indexer.RepositoryScanner, error) {
return []indexer.RepositoryScanner{}, nil
},
Coalescer: func(ctx context.Context, store indexer.Store) (indexer.Coalescer, error) {
return linux.NewCoalescer(store, &Scanner{}), nil
Coalescer: func(ctx context.Context) (indexer.Coalescer, error) {
return linux.NewCoalescer(), nil
},
}
}
10 changes: 9 additions & 1 deletion internal/indexer/coalescer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ import (
"github.com/quay/claircore"
)

// LayerArtifacts aggregates any artifacts found within a single layer.
type LayerArtifacts struct {
// Hash is the digest of the layer these artifacts belong to.
Hash claircore.Digest
// Pkgs are the packages found in the layer.
Pkgs []*claircore.Package
Dist []*claircore.Distribution // each layer can only have a single distribution
// Repos are the repositories found in the layer.
Repos []*claircore.Repository
}

// Coalescer takes a set of layers and creates a coalesced IndexReport.
//
// A coalesced IndexReport should provide only the packages present in the
// final container image once all layers were applied.
type Coalescer interface {
Coalesce(ctx context.Context, layers []*claircore.Layer) (*claircore.IndexReport, error)
Coalesce(ctx context.Context, artifacts []*LayerArtifacts) (*claircore.IndexReport, error)
}
59 changes: 45 additions & 14 deletions internal/indexer/controller/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,55 @@ func coalesce(ctx context.Context, s *Controller) (State, error) {
Str("state", s.getState().String()).
Logger()
ctx = log.WithContext(ctx)
coalescers := []indexer.Coalescer{}
cctx, cancel := context.WithCancel(ctx)
defer cancel()
mu := sync.Mutex{}
reports := []*claircore.IndexReport{}
g := errgroup.Group{}
// dispatch a coalescer go routine for each ecosystem
for _, ecosystem := range s.Ecosystems {
c, err := ecosystem.Coalescer(ctx, s.Store)
artifacts := []*indexer.LayerArtifacts{}
pkgScanners, _ := ecosystem.PackageScanners(cctx)
distScanners, _ := ecosystem.DistributionScanners(cctx)
repoScanners, _ := ecosystem.RepositoryScanners(cctx)
// pack artifacts var
for _, layer := range s.manifest.Layers {
la := &indexer.LayerArtifacts{
Hash: layer.Hash,
}
var vscnrs indexer.VersionedScanners
vscnrs.PStoVS(pkgScanners)
// get packages from layer
pkgs, err := s.Store.PackagesByLayer(cctx, layer.Hash, vscnrs)
if err != nil {
// on an early return cctx is canceled, and all inflight coalescers are canceled as well
return Terminal, fmt.Errorf("failed to retrieve packages for %v: %v", layer.Hash, err)
}
la.Pkgs = append(la.Pkgs, pkgs...)
// get distributions from layer
vscnrs.DStoVS(distScanners) // method allocates new vscnr underlying array, clearing old contents
dists, err := s.Store.DistributionsByLayer(cctx, layer.Hash, vscnrs)
if err != nil {
return Terminal, fmt.Errorf("failed to retrieve distributions for %v: %v", layer.Hash, err)
}
la.Dist = append(la.Dist, dists...)
// get repositories from layer
vscnrs.RStoVS(repoScanners)
repos, err := s.Store.RepositoriesByLayer(cctx, layer.Hash, vscnrs)
if err != nil {
return Terminal, fmt.Errorf("failed to retrieve repositories for %v: %v", layer.Hash, err)
}
la.Repos = append(la.Repos, repos...)
// pack artifacts array in layer order
artifacts = append(artifacts, la)
}
coalescer, err := ecosystem.Coalescer(cctx)
if err != nil {
return Terminal, fmt.Errorf("failed to create coalescer: %v", err)
return Terminal, fmt.Errorf("failed to get coalescer from ecosystem: %v", err)
}
coalescers = append(coalescers, c)
}

mu := sync.Mutex{}
reports := []*claircore.IndexReport{}
g, gctx := errgroup.WithContext(ctx)
for _, c := range coalescers {
cc := c
// dispatch coalescer
g.Go(func() error {
sr, err := cc.Coalesce(gctx, s.manifest.Layers)
sr, err := coalescer.Coalesce(cctx, artifacts)
if err != nil {
return err
}
Expand All @@ -47,9 +80,7 @@ func coalesce(ctx context.Context, s *Controller) (State, error) {
if err := g.Wait(); err != nil {
return Terminal, err
}

s.report = MergeSR(s.report, reports)

return IndexFinished, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/indexer/ecosystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Ecosystem struct {
PackageScanners func(ctx context.Context) ([]PackageScanner, error)
DistributionScanners func(ctx context.Context) ([]DistributionScanner, error)
RepositoryScanners func(ctx context.Context) ([]RepositoryScanner, error)
Coalescer func(ctx context.Context, store Store) (Coalescer, error)
Coalescer func(ctx context.Context) (Coalescer, error)
}

// EcosystemsToScanners extracts and dedupes multiple ecosystems and returns their discrete scanners
Expand Down
69 changes: 17 additions & 52 deletions internal/indexer/linux/coalescer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/quay/claircore"
"github.com/quay/claircore/internal/indexer"
"github.com/quay/claircore/osrelease"
)

// layerArtifacts aggregates any artifacts found within a layer
Expand All @@ -22,19 +21,13 @@ type layerArtifacts struct {
// It is expected to run a coalescer per "ecosystem". For example it would make sense to coalesce results
// for dpkg, os-release, and apt scanners
type Coalescer struct {
// a store to access scanartifacts
store indexer.Store
ps indexer.PackageScanner
ds indexer.DistributionScanner
ir *claircore.IndexReport
// the IndexReport this Coalescer is working on
ir *claircore.IndexReport
}

// NewCoalescer is a constructor for a Coalescer
func NewCoalescer(store indexer.Store, ps indexer.PackageScanner) *Coalescer {
func NewCoalescer() *Coalescer {
return &Coalescer{
store: store,
ps: ps,
ds: &osrelease.Scanner{},
ir: &claircore.IndexReport{
// we will only fill these fields
Environments: map[string][]*claircore.Environment{},
Expand All @@ -48,37 +41,9 @@ func NewCoalescer(store indexer.Store, ps indexer.PackageScanner) *Coalescer {
// Coalesce coalesces artifacts found in layers and creates a final IndexReport with
// the final package details found in the image. This method blocks and when it's finished
// the c.ir field will hold the final IndexReport
func (c *Coalescer) Coalesce(ctx context.Context, layers []*claircore.Layer) (*claircore.IndexReport, error) {
var err error
// populate layer artifacts
artifacts := []layerArtifacts{}
for _, layer := range layers {
a := layerArtifacts{
hash: layer.Hash,
}

a.pkgs, err = c.store.PackagesByLayer(ctx, layer.Hash, indexer.VersionedScanners{c.ps})
if err != nil {
return nil, err
}

a.dist, err = c.store.DistributionsByLayer(ctx, layer.Hash, indexer.VersionedScanners{c.ds})
if err != nil {
return nil, err
}

artifacts = append(artifacts, a)
}
err = c.coalesce(ctx, artifacts)
return c.ir, err
}

// coalesce performs the business logic of coalescing context free scanned artifacts
// into a penultimate IndexReport. this method is heavily commented to express
// the reasoning and assumptions.
func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) error {
func (c *Coalescer) Coalesce(ctx context.Context, artifacts []*indexer.LayerArtifacts) (*claircore.IndexReport, error) {
if ctx.Err() != nil {
return ctx.Err()
return nil, ctx.Err()
}
// In our coalescing logic if a Distribution is found in layer (n) all packages found
// in layers 0-(n) will be associated with this layer. This is a heuristic.
Expand All @@ -89,8 +54,8 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
// This is a requirement for handling dist upgrades where a layer may have its operating system updated
var currDist *claircore.Distribution
for _, a := range artifacts {
if len(a.dist) != 0 {
currDist = a.dist[0]
if len(a.Dist) != 0 {
currDist = a.Dist[0]
c.ir.Distributions[currDist.ID] = currDist
break
}
Expand All @@ -108,13 +73,13 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
// creating the environments we discover packages in.
for _, layerArtifacts := range artifacts {
// check if we need to update our currDist
if len(layerArtifacts.dist) != 0 {
currDist = layerArtifacts.dist[0]
if len(layerArtifacts.Dist) != 0 {
currDist = layerArtifacts.Dist[0]
c.ir.Distributions[currDist.ID] = currDist
}
// associate packages with their environments
if len(layerArtifacts.pkgs) != 0 {
for _, pkg := range layerArtifacts.pkgs {
if len(layerArtifacts.Pkgs) != 0 {
for _, pkg := range layerArtifacts.Pkgs {
// if we encounter a package where we haven't recorded a package database,
// initialize the package database
var distID string
Expand All @@ -125,7 +90,7 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
packages := map[string]*claircore.Package{pkg.ID: pkg}
environment := &claircore.Environment{
PackageDB: pkg.PackageDB,
IntroducedIn: layerArtifacts.hash,
IntroducedIn: layerArtifacts.Hash,
DistributionID: distID,
}
environments := map[string]*claircore.Environment{pkg.ID: environment}
Expand All @@ -135,7 +100,7 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
if _, ok := dbs[pkg.PackageDB].packages[pkg.ID]; !ok {
environment := &claircore.Environment{
PackageDB: pkg.PackageDB,
IntroducedIn: layerArtifacts.hash,
IntroducedIn: layerArtifacts.Hash,
DistributionID: distID,
}
dbs[pkg.PackageDB].packages[pkg.ID] = pkg
Expand All @@ -145,7 +110,7 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
}
}
if ctx.Err() != nil {
return ctx.Err()
return nil, ctx.Err()
}
// we now have all the packages associated with their introduced in layers and environments.
// we must now prune any packages removed between layers. this coalescer works on the assumption
Expand All @@ -163,12 +128,12 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
var packagesToKeep = map[string][]string{}
for i := len(artifacts) - 1; i >= 0; i-- {
layerArtifacts := artifacts[i]
if len(layerArtifacts.pkgs) == 0 {
if len(layerArtifacts.Pkgs) == 0 {
continue
}
// used as a temporary accumulator of package ids in this layer
var tmpPackagesToKeep = map[string][]string{}
for _, pkg := range layerArtifacts.pkgs {
for _, pkg := range layerArtifacts.Pkgs {
// have we already inventoried packages from this database ?
if _, ok := packagesToKeep[pkg.PackageDB]; !ok {
// ... we haven't so add to our temporary accumulator
Expand Down Expand Up @@ -203,5 +168,5 @@ func (c *Coalescer) coalesce(ctx context.Context, artifacts []layerArtifacts) er
c.ir.Environments[pkg.ID] = append(c.ir.Environments[pkg.ID], db.environments[pkg.ID])
}
}
return nil
return c.ir, nil
}
Loading

0 comments on commit 66c3fba

Please sign in to comment.