Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object storage support #1570

Merged
merged 38 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0b605bd
tsparser: initial object storage implementation
eandre Sep 6, 2024
5f8e5f7
runtimes/js: initial interface sketch
eandre Sep 6, 2024
23c4ce9
runtimes/core: initial bucket impl
eandre Sep 6, 2024
36d62c5
WIP on object-storage
eandre Oct 18, 2024
6550162
Add basic demo impl with `exists`
eandre Oct 20, 2024
6d550eb
Add more object storage operations
eandre Oct 21, 2024
8428eef
Add delete, usage parsing
eandre Oct 24, 2024
53bd618
Add support for infra namespaces
eandre Oct 24, 2024
e1c9d37
Add key prefix support
eandre Oct 29, 2024
84aa985
Add s3 implementation
eandre Oct 29, 2024
264ea67
runtimes/js: improve errors, fix upload return type
eandre Oct 30, 2024
932a646
Fix s3 exists
eandre Oct 30, 2024
0b6b6a9
Vendor gcsemu with minor tweaks
eandre Oct 30, 2024
44029ab
Add options to most apis
eandre Oct 30, 2024
1663b00
runtimes/core: better error handling, api
eandre Oct 31, 2024
b0318e3
Add object storage tracing
eandre Oct 31, 2024
dfea583
Add bucket versioned attribute
eandre Oct 31, 2024
16bfbf6
Fix bucket config parsing
eandre Oct 31, 2024
4b76f00
More tracing tweaks
eandre Nov 4, 2024
cb1f66a
Tracing protocol improvements
eandre Nov 5, 2024
580d55d
v2/parser: add bucket parsing
eandre Nov 6, 2024
89d24cd
wip object storage impl
eandre Nov 6, 2024
13761a5
Work on cloud storage for Go
eandre Nov 6, 2024
8afef3f
object storage tracing
eandre Nov 7, 2024
0d687a2
More work on Go implementation
eandre Nov 12, 2024
bafd1f4
Object config stuff (#8)
ekerfelt Nov 12, 2024
4596263
Add s3 implementation
eandre Nov 12, 2024
9d39a46
Make object errors type-safe
eandre Nov 12, 2024
a30d716
s3: handle preconditions
eandre Nov 12, 2024
f803917
s3: resolve default credentials
eandre Nov 14, 2024
735e367
Fix s3 listing
eandre Nov 14, 2024
4732093
s3: fix listing
eandre Nov 14, 2024
3e11a2b
Add object storage docs
eandre Nov 14, 2024
1179cce
Parse simple config for GO runtime
ekerfelt Nov 14, 2024
d4649aa
Merge branch 'main' into simple-cfg
ekerfelt Nov 15, 2024
eeb8d4f
Improve docs
eandre Nov 14, 2024
5e3f0fe
Fix tests
eandre Nov 15, 2024
aed216e
Fix lints
eandre Nov 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
398 changes: 364 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions cli/cmd/encore/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ import (
daemonpb "encr.dev/proto/encore/daemon"
)

var (
targetOS = cmdutil.Oneof{
Value: "linux",
Allowed: []string{"linux"},
Flag: "os",
Desc: "the target operating system",
}
targetArch = cmdutil.Oneof{
Value: "amd64",
Allowed: []string{"amd64", "arm64"},
Flag: "arch",
Desc: "the target architecture",
}
)

func init() {
buildCmd := &cobra.Command{
Use: "build",
Expand All @@ -23,14 +38,14 @@ func init() {

p := buildParams{
CgoEnabled: os.Getenv("CGO_ENABLED") == "1",
Goos: or(os.Getenv("GOOS"), "linux"),
Goarch: or(os.Getenv("GOARCH"), "amd64"),
}
dockerBuildCmd := &cobra.Command{
Use: "docker IMAGE_TAG",
Short: "docker builds a portable docker image of your Encore application",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
p.Goarch = targetArch.Value
p.Goos = targetOS.Value
p.AppRoot, _ = determineAppRoot()
file, err := appfile.ParseFile(filepath.Join(p.AppRoot, appfile.Name))
if err == nil {
Expand All @@ -48,13 +63,13 @@ func init() {

dockerBuildCmd.Flags().BoolVarP(&p.Push, "push", "p", false, "push image to remote repository")
dockerBuildCmd.Flags().StringVar(&p.BaseImg, "base", "scratch", "base image to build from")
dockerBuildCmd.Flags().StringVar(&p.Goos, "os", p.Goos, "target operating system. One of (linux, darwin, windows)")
dockerBuildCmd.Flags().StringVar(&p.Goarch, "arch", p.Goarch, "target architecture. One of (amd64, arm64)")
dockerBuildCmd.Flags().BoolVar(&p.CgoEnabled, "cgo", false, "enable cgo")
dockerBuildCmd.Flags().BoolVar(&p.SkipInfraConf, "skip-config", false, "do not read or generate a infra configuration file")
dockerBuildCmd.Flags().StringVar(&p.InfraConfPath, "config", "", "infra configuration file path")
p.Services = dockerBuildCmd.Flags().StringSlice("services", nil, "services to include in the image")
p.Gateways = dockerBuildCmd.Flags().StringSlice("gateways", nil, "gateways to include in the image")
targetOS.AddFlag(dockerBuildCmd)
targetArch.AddFlag(dockerBuildCmd)
rootCmd.AddCommand(buildCmd)
buildCmd.AddCommand(dockerBuildCmd)
}
Expand Down
5 changes: 5 additions & 0 deletions cli/cmd/encore/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"encr.dev/cli/daemon/engine/trace2"
"encr.dev/cli/daemon/engine/trace2/sqlite"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/objects"
"encr.dev/cli/daemon/run"
"encr.dev/cli/daemon/secret"
"encr.dev/cli/daemon/sqldb"
Expand Down Expand Up @@ -105,6 +106,7 @@ type Daemon struct {
RunMgr *run.Manager
NS *namespace.Manager
ClusterMgr *sqldb.ClusterManager
ObjectsMgr *objects.ClusterManager
Trace trace2.Store
Server *daemon.Server

Expand Down Expand Up @@ -144,6 +146,7 @@ func (d *Daemon) init(ctx context.Context) {

d.NS = namespace.NewManager(d.EncoreDB)
d.ClusterMgr = sqldb.NewClusterManager(sqldbDriver, d.Apps, d.NS)
d.ObjectsMgr = objects.NewClusterManager(d.NS)

d.Trace = sqlite.New(ctx, d.EncoreDB)
d.Secret = secret.New()
Expand All @@ -153,11 +156,13 @@ func (d *Daemon) init(ctx context.Context) {
DashBaseURL: fmt.Sprintf("http://%s", d.Dash.ClientAddr()),
Secret: d.Secret,
ClusterMgr: d.ClusterMgr,
ObjectsMgr: d.ObjectsMgr,
}

// Register namespace deletion handlers.
d.NS.RegisterDeletionHandler(d.ClusterMgr)
d.NS.RegisterDeletionHandler(d.RunMgr)
d.NS.RegisterDeletionHandler(d.ObjectsMgr)

d.Server = daemon.New(d.Apps, d.RunMgr, d.ClusterMgr, d.Secret, d.NS)
}
Expand Down
19 changes: 19 additions & 0 deletions cli/daemon/export/infra_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,25 @@ func buildAndValidateInfraConfig(params EmbeddedInfraConfigParams) (*infra.Infra
missing["Topics"] = topics
}

// Validate bucket config
buckets := fns.FlatMap(maps.Values(hostedSvcs), func(svc *meta.Service) []string {
return fns.Map(svc.Buckets, (*meta.BucketUsage).GetBucket)
})
slices.Sort(buckets)
buckets = slices.Compact(buckets)

for _, storage := range infraCfg.ObjectStorage {
for name, _ := range storage.GetBuckets() {
buckets, ok = fns.Delete(buckets, name)
if !ok {
storage.DeleteBucket(name)
}
}
}
if len(buckets) > 0 {
missing["Buckets"] = buckets
}

// Copy CORS config
cors := infra.CORS(params.GlobalCORS)
infraCfg.CORS = &cors
Expand Down
2 changes: 2 additions & 0 deletions cli/daemon/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type (
Name string
)

func (id ID) String() string { return string(id) }

func NewManager(db *sql.DB) *Manager {
return &Manager{db, nil}
}
Expand Down
44 changes: 44 additions & 0 deletions cli/daemon/objects/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package objects

import (
"context"
"os"
"path/filepath"

"encr.dev/cli/daemon/apps"
"encr.dev/cli/daemon/namespace"
)

// NewClusterManager creates a new ClusterManager.
func NewClusterManager(ns *namespace.Manager) *ClusterManager {
return &ClusterManager{
ns: ns,
}
}

type ClusterManager struct {
ns *namespace.Manager
}

func (cm *ClusterManager) BaseDir(ctx context.Context, ns *namespace.Namespace) (string, error) {
cache, err := os.UserCacheDir()
if err != nil {
return "", err
}

return filepath.Join(cache, "encore", "objects", ns.ID.String()), nil
}

// CanDeleteNamespace implements namespace.DeletionHandler.
func (cm *ClusterManager) CanDeleteNamespace(ctx context.Context, app *apps.Instance, ns *namespace.Namespace) error {
return nil
}

// DeleteNamespace implements namespace.DeletionHandler.
func (cm *ClusterManager) DeleteNamespace(ctx context.Context, app *apps.Instance, ns *namespace.Namespace) error {
baseDir, err := cm.BaseDir(ctx, ns)
if err == nil {
err = os.RemoveAll(baseDir)
}
return err
}
90 changes: 90 additions & 0 deletions cli/daemon/objects/objects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package objects

import (
// nosemgrep

"fmt"
"net"
"net/http"

"encr.dev/pkg/emulators/storage/gcsemu"
"github.com/cockroachdb/errors"
"github.com/rs/zerolog/log"
"go4.org/syncutil"

meta "encr.dev/proto/encore/parser/meta/v1"
)

type Server struct {
cm *ClusterManager
startOnce syncutil.Once
cancel func() // set by Start
emu *gcsemu.GcsEmu
ln net.Listener
srv *http.Server
}

func NewInMemoryServer() *Server {
return &Server{
// TODO set up dir storage
emu: gcsemu.NewGcsEmu(gcsemu.Options{
Store: gcsemu.NewMemStore(),
}),
}
}

func NewDirServer(baseDir string) *Server {
return &Server{
emu: gcsemu.NewGcsEmu(gcsemu.Options{
Store: gcsemu.NewFileStore(baseDir),
}),
}
}

func (s *Server) Initialize(md *meta.Data) error {
for _, bucket := range md.Buckets {
if err := s.emu.InitBucket(bucket.Name); err != nil {
return errors.Wrap(err, "initialize object storage bucket")
}
}
return nil
}

func (s *Server) Start() error {
return s.startOnce.Do(func() error {
mux := http.NewServeMux()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return errors.Wrap(err, "listen tcp")
}
s.emu.Register(mux)
s.ln = ln
s.srv = &http.Server{Handler: mux}

go func() {
if err := s.srv.Serve(ln); !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("unable to listen to gcs server")
}
}()

return nil
})
}

func (s *Server) Stop() {
_ = s.srv.Close()
}

func (s *Server) Endpoint() string {
// Ensure the server has been started
if err := s.Start(); err != nil {
panic(err)
}
port := s.ln.Addr().(*net.TCPAddr).Port
return fmt.Sprintf("http://localhost:%d", port)
}

// IsUsed reports whether the application uses object storage at all.
func IsUsed(md *meta.Data) bool {
return len(md.Buckets) > 0
}
2 changes: 1 addition & 1 deletion cli/daemon/run/exec_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (mgr *Manager) ExecScript(ctx context.Context, p ExecScriptParams) (err err
return err
}

rm := infra.NewResourceManager(p.App, mgr.ClusterMgr, p.NS, p.Environ, mgr.DBProxyPort, false)
rm := infra.NewResourceManager(p.App, mgr.ClusterMgr, mgr.ObjectsMgr, p.NS, p.Environ, mgr.DBProxyPort, false)
defer rm.StopAll()

tracker := p.OpTracker
Expand Down
71 changes: 67 additions & 4 deletions cli/daemon/run/infra/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"encore.dev/appruntime/exported/config"
"encr.dev/cli/daemon/apps"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/objects"
"encr.dev/cli/daemon/pubsub"
"encr.dev/cli/daemon/redis"
"encr.dev/cli/daemon/sqldb"
Expand All @@ -25,9 +26,10 @@ import (
type Type string

const (
PubSub Type = "pubsub"
Cache Type = "cache"
SQLDB Type = "sqldb"
PubSub Type = "pubsub"
Cache Type = "cache"
SQLDB Type = "sqldb"
Objects Type = "objects"
)

const (
Expand All @@ -46,6 +48,7 @@ type ResourceManager struct {
app *apps.Instance
dbProxyPort int
sqlMgr *sqldb.ClusterManager
objectsMgr *objects.ClusterManager
ns *namespace.Namespace
environ environ.Environ
log zerolog.Logger
Expand All @@ -55,11 +58,12 @@ type ResourceManager struct {
servers map[Type]Resource
}

func NewResourceManager(app *apps.Instance, sqlMgr *sqldb.ClusterManager, ns *namespace.Namespace, environ environ.Environ, dbProxyPort int, forTests bool) *ResourceManager {
func NewResourceManager(app *apps.Instance, sqlMgr *sqldb.ClusterManager, objectsMgr *objects.ClusterManager, ns *namespace.Namespace, environ environ.Environ, dbProxyPort int, forTests bool) *ResourceManager {
return &ResourceManager{
app: app,
dbProxyPort: dbProxyPort,
sqlMgr: sqlMgr,
objectsMgr: objectsMgr,
ns: ns,
environ: environ,
forTests: forTests,
Expand Down Expand Up @@ -99,6 +103,10 @@ func (rm *ResourceManager) StartRequiredServices(a *optracker.AsyncBuildJobs, md
if redis.IsUsed(md) && rm.GetRedis() == nil {
a.Go("Starting Redis server", true, 250*time.Millisecond, rm.StartRedis)
}

if objects.IsUsed(md) && rm.GetObjects() == nil {
a.Go("Starting Object Storage server", true, 250*time.Millisecond, rm.StartObjects(md))
}
}

// StartPubSub starts a PubSub daemon.
Expand Down Expand Up @@ -151,6 +159,47 @@ func (rm *ResourceManager) GetRedis() *redis.Server {
return nil
}

// StartObjects starts an Object Storage server.
func (rm *ResourceManager) StartObjects(md *meta.Data) func(context.Context) error {
return func(ctx context.Context) error {
var srv *objects.Server
if rm.forTests {
srv = objects.NewInMemoryServer()
} else {
if rm.sqlMgr == nil {
return fmt.Errorf("StartObjects: no Object Storage cluster manager provided")
}
baseDir, err := rm.objectsMgr.BaseDir(ctx, rm.ns)
if err != nil {
return err
}
srv = objects.NewDirServer(baseDir)
}

if err := srv.Initialize(md); err != nil {
return err
} else if err := srv.Start(); err != nil {
return err
}

rm.mutex.Lock()
rm.servers[Objects] = srv
rm.mutex.Unlock()
return nil
}
}

// GetObjects returns the Object Storage server if it is running otherwise it returns nil
func (rm *ResourceManager) GetObjects() *objects.Server {
rm.mutex.Lock()
defer rm.mutex.Unlock()

if srv, found := rm.servers[Objects]; found {
return srv.(*objects.Server)
}
return nil
}

func (rm *ResourceManager) StartSQLCluster(a *optracker.AsyncBuildJobs, md *meta.Data) func(ctx context.Context) error {
return func(ctx context.Context) error {
// This can be the case in tests.
Expand Down Expand Up @@ -408,3 +457,17 @@ func (rm *ResourceManager) RedisConfig(redis *meta.CacheCluster) (config.RedisSe

return srvCfg, dbCfg, nil
}

// BucketProviderConfig returns the bucket provider configuration.
func (rm *ResourceManager) BucketProviderConfig() (config.BucketProvider, error) {
obj := rm.GetObjects()
if obj == nil {
return config.BucketProvider{}, errors.New("no object storage found")
}

return config.BucketProvider{
GCS: &config.GCSBucketProvider{
Endpoint: obj.Endpoint(),
},
}, nil
}
Loading
Loading