diff --git a/.github/workflows/test-and-upload-coverage.yml b/.github/workflows/test-and-upload-coverage.yml index d613f32f98..be85f5464a 100644 --- a/.github/workflows/test-and-upload-coverage.yml +++ b/.github/workflows/test-and-upload-coverage.yml @@ -34,6 +34,7 @@ jobs: client-type: [go, http, cli] database-type: [badger-file, badger-memory] mutation-type: [gql, collection-named, collection-save] + lens-type: [wasm-time] detect-changes: [false] database-encryption: [false] include: @@ -63,6 +64,20 @@ jobs: ## mutation-type: collection-save ## detect-changes: false ## database-encryption: false + - os: ubuntu-latest + client-type: go + database-type: badger-memory + mutation-type: collection-save + lens-type: wazero + detect-changes: false + database-encryption: false + - os: ubuntu-latest + client-type: go + database-type: badger-memory + mutation-type: collection-save + lens-type: wasmer + detect-changes: false + database-encryption: false runs-on: ${{ matrix.os }} @@ -80,6 +95,7 @@ jobs: DEFRA_BADGER_FILE: ${{ matrix.database-type == 'badger-file' }} DEFRA_BADGER_ENCRYPTION: ${{ matrix.database-encryption }} DEFRA_MUTATION_TYPE: ${{ matrix.mutation-type }} + DEFRA_LENS_TYPE: ${{ matrix.lens-type }} steps: - name: Checkout code into the directory diff --git a/cli/server_dump.go b/cli/server_dump.go index 1d3c68e54a..9008c81730 100644 --- a/cli/server_dump.go +++ b/cli/server_dump.go @@ -37,7 +37,7 @@ func MakeServerDumpCmd() *cobra.Command { if err != nil { return err } - db, err := db.NewDB(cmd.Context(), rootstore, acp.NoACP) + db, err := db.NewDB(cmd.Context(), rootstore, acp.NoACP, nil) if err != nil { return errors.Wrap("failed to initialize database", err) } diff --git a/cli/start.go b/cli/start.go index 9b863a1f07..9505fd7fff 100644 --- a/cli/start.go +++ b/cli/start.go @@ -126,6 +126,7 @@ func MakeStartCommand() *cobra.Command { http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...), http.WithTLSCertPath(cfg.GetString("api.pubKeyPath")), http.WithTLSKeyPath(cfg.GetString("api.privKeyPath")), + node.WithLensRuntime(node.LensRuntimeType(cfg.GetString("lens.runtime"))), } if cfg.GetString("datastore.store") != configStoreMemory { diff --git a/client/lens.go b/client/lens.go index 3f5befc604..997ddb4831 100644 --- a/client/lens.go +++ b/client/lens.go @@ -15,6 +15,8 @@ import ( "github.com/lens-vm/lens/host-go/config/model" "github.com/sourcenetwork/immutable/enumerable" + + "github.com/sourcenetwork/defradb/datastore" ) // LensConfig represents the configuration of a Lens migration in Defra. @@ -38,9 +40,18 @@ type LensConfig struct { model.Lens } +// TxnSource represents an object capable of constructing the transactions that +// implicit-transaction registries need internally. +type TxnSource interface { + NewTxn(context.Context, bool) (datastore.Txn, error) +} + // LensRegistry exposes several useful thread-safe migration related functions which may // be used to manage migrations. type LensRegistry interface { + // Init initializes the registry with the provided transaction source. + Init(TxnSource) + // SetMigration caches the migration for the given collection ID. It does not persist the migration in long // term storage, for that one should call [Store.SetMigration(ctx, cfg)]. // diff --git a/go.mod b/go.mod index 5e47eeadf2..f9bac2daa8 100644 --- a/go.mod +++ b/go.mod @@ -282,8 +282,10 @@ require ( github.com/tendermint/go-amino v0.16.0 // indirect github.com/tendermint/tm-db v0.6.7 // indirect github.com/teserakt-io/golang-ed25519 v0.0.0-20210104091850-3888c087a4c8 // indirect + github.com/tetratelabs/wazero v1.5.0 // indirect github.com/textileio/go-log/v2 v2.1.3-gke-2 // indirect github.com/ugorji/go/codec v1.2.12 // indirect + github.com/wasmerio/wasmer-go v1.0.4 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect diff --git a/go.sum b/go.sum index 64f0b7b56b..3203ff8b9c 100644 --- a/go.sum +++ b/go.sum @@ -1139,6 +1139,8 @@ github.com/tendermint/tm-db v0.6.7 h1:fE00Cbl0jayAoqlExN6oyQJ7fR/ZtoVOmvPJ//+shu github.com/tendermint/tm-db v0.6.7/go.mod h1:byQDzFkZV1syXr/ReXS808NxA2xvyuuVgXOJ/088L6I= github.com/teserakt-io/golang-ed25519 v0.0.0-20210104091850-3888c087a4c8 h1:RBkacARv7qY5laaXGlF4wFB/tk5rnthhPb8oIBGoagY= github.com/teserakt-io/golang-ed25519 v0.0.0-20210104091850-3888c087a4c8/go.mod h1:9PdLyPiZIiW3UopXyRnPYyjUXSpiQNHRLu8fOsR3o8M= +github.com/tetratelabs/wazero v1.5.0 h1:Yz3fZHivfDiZFUXnWMPUoiW7s8tC1sjdBtlJn08qYa0= +github.com/tetratelabs/wazero v1.5.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= github.com/textileio/go-datastore-extensions v1.0.1 h1:qIJGqJaigQ1wD4TdwS/hf73u0HChhXvvUSJuxBEKS+c= github.com/textileio/go-datastore-extensions v1.0.1/go.mod h1:Pzj9FDRkb55910dr/FX8M7WywvnS26gBgEDez1ZBuLE= github.com/textileio/go-ds-badger3 v0.1.0 h1:q0kBuBmAcRUR3ClMSYlyw0224XeuzjjGinU53Qz1uXI= @@ -1168,6 +1170,8 @@ github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsX github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs= +github.com/wasmerio/wasmer-go v1.0.4/go.mod h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk= github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc h1:BCPnHtcboadS0DvysUuJXZ4lWVv5Bh5i7+tbIyi+ck4= github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc/go.mod h1:r45hJU7yEoA81k6MWNhpMj/kms0n14dkzkxYHoB96UM= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= diff --git a/http/client_lens.go b/http/client_lens.go index 34945a41d6..249eb99984 100644 --- a/http/client_lens.go +++ b/http/client_lens.go @@ -35,6 +35,8 @@ type setMigrationRequest struct { Config model.Lens } +func (w *LensRegistry) Init(txnSource client.TxnSource) {} + func (c *LensRegistry) SetMigration(ctx context.Context, collectionID uint32, config model.Lens) error { methodURL := c.http.baseURL.JoinPath("lens", "registry") diff --git a/http/handler_ccip_test.go b/http/handler_ccip_test.go index ab8381565a..e17d8a882a 100644 --- a/http/handler_ccip_test.go +++ b/http/handler_ccip_test.go @@ -193,7 +193,7 @@ func TestCCIPPost_WithInvalidBody(t *testing.T) { func setupDatabase(t *testing.T) client.DB { ctx := context.Background() - cdb, err := db.NewDB(ctx, memory.NewDatastore(ctx), acp.NoACP, db.WithUpdateEvents()) + cdb, err := db.NewDB(ctx, memory.NewDatastore(ctx), acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) _, err = cdb.AddSchema(ctx, `type User { diff --git a/internal/db/config.go b/internal/db/config.go index a655647df7..1364cab09b 100644 --- a/internal/db/config.go +++ b/internal/db/config.go @@ -11,7 +11,6 @@ package db import ( - "github.com/lens-vm/lens/host-go/engine/module" "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/events" @@ -40,19 +39,3 @@ func WithMaxRetries(num int) Option { db.maxTxnRetries = immutable.Some(num) } } - -// WithLensPoolSize sets the maximum number of cached migrations instances to preserve per schema version. -// -// Will default to `5` if not set. -func WithLensPoolSize(size int) Option { - return func(db *db) { - db.lensPoolSize = immutable.Some(size) - } -} - -// WithLensRuntime returns an option that sets the lens registry runtime. -func WithLensRuntime(runtime module.Runtime) Option { - return func(db *db) { - db.lensRuntime = immutable.Some(runtime) - } -} diff --git a/internal/db/config_test.go b/internal/db/config_test.go index d4dbadaec6..f80e538b4f 100644 --- a/internal/db/config_test.go +++ b/internal/db/config_test.go @@ -13,7 +13,6 @@ package db import ( "testing" - "github.com/lens-vm/lens/host-go/runtimes/wasmtime" "github.com/stretchr/testify/assert" ) @@ -29,15 +28,3 @@ func TestWithMaxRetries(t *testing.T) { assert.True(t, d.maxTxnRetries.HasValue()) assert.Equal(t, 10, d.maxTxnRetries.Value()) } - -func TestWithLensPoolSize(t *testing.T) { - d := &db{} - WithLensPoolSize(10)(d) - assert.Equal(t, 10, d.lensPoolSize.Value()) -} - -func TestWithLensRuntime(t *testing.T) { - d := &db{} - WithLensRuntime(wasmtime.New())(d) - assert.NotNil(t, d.lensRuntime.Value()) -} diff --git a/internal/db/db.go b/internal/db/db.go index 4379928c82..979626034c 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -21,7 +21,6 @@ import ( ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" - "github.com/lens-vm/lens/host-go/engine/module" "github.com/sourcenetwork/corelog" "github.com/sourcenetwork/immutable" @@ -32,7 +31,6 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/internal/core" - "github.com/sourcenetwork/defradb/internal/lens" "github.com/sourcenetwork/defradb/internal/request/graphql" ) @@ -57,10 +55,6 @@ type db struct { parser core.Parser - // The maximum number of cached migrations instances to preserve per schema version. - lensPoolSize immutable.Option[int] - lensRuntime immutable.Option[module.Runtime] - lensRegistry client.LensRegistry // The maximum number of retries per transaction. @@ -81,15 +75,17 @@ func NewDB( ctx context.Context, rootstore datastore.RootStore, acp immutable.Option[acp.ACP], + lens client.LensRegistry, options ...Option, ) (client.DB, error) { - return newDB(ctx, rootstore, acp, options...) + return newDB(ctx, rootstore, acp, lens, options...) } func newDB( ctx context.Context, rootstore datastore.RootStore, acp immutable.Option[acp.ACP], + lens client.LensRegistry, options ...Option, ) (*db, error) { multistore := datastore.MultiStoreFrom(rootstore) @@ -100,11 +96,12 @@ func newDB( } db := &db{ - rootstore: rootstore, - multistore: multistore, - acp: acp, - parser: parser, - options: options, + rootstore: rootstore, + multistore: multistore, + acp: acp, + lensRegistry: lens, + parser: parser, + options: options, } // apply options @@ -112,9 +109,9 @@ func newDB( opt(db) } - // lens options may be set by `WithLens` funcs, and because they are funcs on db - // we have to mutate `db` here to set the registry. - db.lensRegistry = lens.NewRegistry(db, db.lensPoolSize, db.lensRuntime) + if lens != nil { + lens.Init(db) + } err = db.initialize(ctx) if err != nil { diff --git a/internal/db/db_test.go b/internal/db/db_test.go index fe60449cc2..6f5a03e809 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -26,7 +26,7 @@ func newMemoryDB(ctx context.Context) (*db, error) { if err != nil { return nil, err } - return newDB(ctx, rootstore, acp.NoACP) + return newDB(ctx, rootstore, acp.NoACP, nil) } func TestNewDB(t *testing.T) { @@ -38,7 +38,7 @@ func TestNewDB(t *testing.T) { return } - _, err = NewDB(ctx, rootstore, acp.NoACP) + _, err = NewDB(ctx, rootstore, acp.NoACP, nil) if err != nil { t.Error(err) } diff --git a/internal/lens/registry.go b/internal/lens/registry.go index 1d9c51ab46..c0fc87a14f 100644 --- a/internal/lens/registry.go +++ b/internal/lens/registry.go @@ -17,8 +17,6 @@ import ( "github.com/lens-vm/lens/host-go/config" "github.com/lens-vm/lens/host-go/config/model" "github.com/lens-vm/lens/host-go/engine/module" - "github.com/lens-vm/lens/host-go/runtimes/wasmtime" - "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/immutable/enumerable" "github.com/sourcenetwork/defradb/client" @@ -72,12 +70,6 @@ func newTxnCtx(txn datastore.Txn) *txnContext { } } -// TxnSource represents an object capable of constructing the transactions that -// implicit-transaction registries need internally. -type TxnSource interface { - NewTxn(context.Context, bool) (datastore.Txn, error) -} - // DefaultPoolSize is the default size of the lens pool for each schema version. const DefaultPoolSize int = 5 @@ -85,28 +77,19 @@ const DefaultPoolSize int = 5 // // It will be of size 5 (per schema version) if a size is not provided. func NewRegistry( - db TxnSource, - poolSize immutable.Option[int], - runtime immutable.Option[module.Runtime], + poolSize int, + runtime module.Runtime, ) client.LensRegistry { registry := &lensRegistry{ - poolSize: DefaultPoolSize, - runtime: wasmtime.New(), + poolSize: poolSize, + runtime: runtime, modulesByPath: map[string]module.Module{}, lensPoolsByCollectionID: map[uint32]*lensPool{}, reversedPoolsByCollectionID: map[uint32]*lensPool{}, txnCtxs: map[uint64]*txnContext{}, } - if poolSize.HasValue() { - registry.poolSize = poolSize.Value() - } - if runtime.HasValue() { - registry.runtime = runtime.Value() - } - return &implicitTxnLensRegistry{ - db: db, registry: registry, } } diff --git a/internal/lens/txn_registry.go b/internal/lens/txn_registry.go index 8093dedbdd..65ad12cf2b 100644 --- a/internal/lens/txn_registry.go +++ b/internal/lens/txn_registry.go @@ -22,7 +22,7 @@ import ( type implicitTxnLensRegistry struct { registry *lensRegistry - db TxnSource + db client.TxnSource } type explicitTxnLensRegistry struct { @@ -33,13 +33,12 @@ type explicitTxnLensRegistry struct { var _ client.LensRegistry = (*implicitTxnLensRegistry)(nil) var _ client.LensRegistry = (*explicitTxnLensRegistry)(nil) -func (r *implicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { - return &explicitTxnLensRegistry{ - registry: r.registry, - txn: txn, - } +func (r *implicitTxnLensRegistry) Init(txnSource client.TxnSource) { + r.db = txnSource } +func (r *explicitTxnLensRegistry) Init(txnSource client.TxnSource) {} + func (r *explicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { return &explicitTxnLensRegistry{ registry: r.registry, diff --git a/net/node_test.go b/net/node_test.go index 5e0b30570f..55b0573474 100644 --- a/net/node_test.go +++ b/net/node_test.go @@ -36,7 +36,7 @@ func FixtureNewMemoryDBWithBroadcaster(t *testing.T) client.DB { opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - database, err = db.NewDB(ctx, rootstore, acp.NoACP, db.WithUpdateEvents()) + database, err = db.NewDB(ctx, rootstore, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) return database } @@ -44,7 +44,7 @@ func FixtureNewMemoryDBWithBroadcaster(t *testing.T) client.DB { func TestNewNode_WithEnableRelay_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( context.Background(), @@ -59,7 +59,7 @@ func TestNewNode_WithDBClosed_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) db.Close() @@ -73,7 +73,7 @@ func TestNewNode_WithDBClosed_NoError(t *testing.T) { func TestNewNode_NoPubSub_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( context.Background(), @@ -88,7 +88,7 @@ func TestNewNode_NoPubSub_NoError(t *testing.T) { func TestNewNode_WithEnablePubSub_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( @@ -106,7 +106,7 @@ func TestNewNode_WithEnablePubSub_NoError(t *testing.T) { func TestNodeClose_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( context.Background(), @@ -119,7 +119,7 @@ func TestNodeClose_NoError(t *testing.T) { func TestNewNode_BootstrapWithNoPeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n1, err := NewNode( @@ -135,7 +135,7 @@ func TestNewNode_BootstrapWithNoPeer_NoError(t *testing.T) { func TestNewNode_BootstrapWithOnePeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n1, err := NewNode( @@ -162,7 +162,7 @@ func TestNewNode_BootstrapWithOnePeer_NoError(t *testing.T) { func TestNewNode_BootstrapWithOneValidPeerAndManyInvalidPeers_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n1, err := NewNode( @@ -192,7 +192,7 @@ func TestNewNode_BootstrapWithOneValidPeerAndManyInvalidPeers_NoError(t *testing func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( context.Background(), diff --git a/net/peer_test.go b/net/peer_test.go index e708ff0708..dca864a1e3 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -75,7 +75,7 @@ func newTestNode(ctx context.Context, t *testing.T) (client.DB, *Node) { store := memory.NewDatastore(ctx) acpLocal := acp.NewLocalACP() acpLocal.Init(context.Background(), "") - db, err := db.NewDB(ctx, store, immutable.Some[acp.ACP](acpLocal), db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, immutable.Some[acp.ACP](acpLocal), nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( @@ -91,7 +91,7 @@ func newTestNode(ctx context.Context, t *testing.T) (client.DB, *Node) { func TestNewPeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) h, err := libp2p.New() @@ -114,7 +114,7 @@ func TestNewPeer_NoDB_NilDBError(t *testing.T) { func TestNewPeer_WithExistingTopic_TopicAlreadyExistsError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) _, err = db.AddSchema(ctx, `type User { @@ -164,11 +164,11 @@ func TestStartAndClose_NoError(t *testing.T) { func TestStart_WithKnownPeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db1, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db1, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) store2 := memory.NewDatastore(ctx) - db2, err := db.NewDB(ctx, store2, acp.NoACP, db.WithUpdateEvents()) + db2, err := db.NewDB(ctx, store2, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n1, err := NewNode( @@ -200,11 +200,11 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { func TestStart_WithOfflineKnownPeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db1, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db1, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) store2 := memory.NewDatastore(ctx) - db2, err := db.NewDB(ctx, store2, acp.NoACP, db.WithUpdateEvents()) + db2, err := db.NewDB(ctx, store2, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n1, err := NewNode( @@ -240,7 +240,7 @@ func TestStart_WithOfflineKnownPeer_NoError(t *testing.T) { func TestStart_WithNoUpdateChannel_NilUpdateChannelError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP) + db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) n, err := NewNode( @@ -259,7 +259,7 @@ func TestStart_WithNoUpdateChannel_NilUpdateChannelError(t *testing.T) { func TestStart_WitClosedUpdateChannel_ClosedChannelError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, db.WithUpdateEvents()) + db, err := db.NewDB(ctx, store, acp.NoACP, nil, db.WithUpdateEvents()) require.NoError(t, err) n, err := NewNode( diff --git a/node/errors.go b/node/errors.go new file mode 100644 index 0000000000..d19b53359b --- /dev/null +++ b/node/errors.go @@ -0,0 +1,25 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "github.com/sourcenetwork/defradb/errors" +) + +const ( + errLensRuntimeNotSupported string = "the selected lens runtime is not supported by this build" +) + +var ErrLensRuntimeNotSupported = errors.New(errLensRuntimeNotSupported) + +func NewErrLensRuntimeNotSupported(lens LensRuntimeType) error { + return errors.New(errLensRuntimeNotSupported, errors.NewKV("Lens", lens)) +} diff --git a/node/lens.go b/node/lens.go new file mode 100644 index 0000000000..7fb2d89fbe --- /dev/null +++ b/node/lens.go @@ -0,0 +1,83 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "context" + + "github.com/lens-vm/lens/host-go/engine/module" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/lens" +) + +type LensRuntimeType string + +const ( + DefaultLens LensRuntimeType = "" +) + +var runtimeConstructors = map[LensRuntimeType]func() module.Runtime{} + +// LensOptions contains Lens configuration values. +type LensOptions struct { + lensRuntime LensRuntimeType + + // The maximum number of cached migrations instances to preserve per schema version. + lensPoolSize int +} + +// DefaultACPOptions returns new options with default values. +func DefaultLensOptions() *LensOptions { + return &LensOptions{ + lensPoolSize: lens.DefaultPoolSize, + } +} + +type LenOpt func(*LensOptions) + +// WithLensRuntime returns an option that sets the lens registry runtime. +func WithLensRuntime(runtime LensRuntimeType) Option { + return func(o *LensOptions) { + o.lensRuntime = runtime + } +} + +// WithLensPoolSize sets the maximum number of cached migrations instances to preserve per schema version. +// +// Will default to `5` if not set. +func WithLensPoolSize(size int) Option { + return func(o *LensOptions) { + o.lensPoolSize = size + } +} + +func NewLens( + ctx context.Context, + opts ...LenOpt, +) (client.LensRegistry, error) { + options := DefaultLensOptions() + for _, opt := range opts { + opt(options) + } + + var runtime module.Runtime + if runtimeConstructor, ok := runtimeConstructors[options.lensRuntime]; ok { + runtime = runtimeConstructor() + } else { + return nil, NewErrLensRuntimeNotSupported(options.lensRuntime) + } + + return lens.NewRegistry( + options.lensPoolSize, + runtime, + ), nil +} diff --git a/node/lens_wasmer.go b/node/lens_wasmer.go new file mode 100644 index 0000000000..fd99357ab7 --- /dev/null +++ b/node/lens_wasmer.go @@ -0,0 +1,24 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build !windows && !js + +package node + +import ( + "github.com/lens-vm/lens/host-go/engine/module" + "github.com/lens-vm/lens/host-go/runtimes/wasmer" +) + +const Wasmer LensRuntimeType = "wasmer" + +func init() { + runtimeConstructors[Wasmer] = func() module.Runtime { return wasmer.New() } +} diff --git a/node/lens_wasmtime.go b/node/lens_wasmtime.go new file mode 100644 index 0000000000..9f0070f3bf --- /dev/null +++ b/node/lens_wasmtime.go @@ -0,0 +1,25 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build !js + +package node + +import ( + "github.com/lens-vm/lens/host-go/engine/module" + "github.com/lens-vm/lens/host-go/runtimes/wasmtime" +) + +const WasmTime LensRuntimeType = "wasm-time" + +func init() { + runtimeConstructors[DefaultLens] = func() module.Runtime { return wasmtime.New() } + runtimeConstructors[WasmTime] = func() module.Runtime { return wasmtime.New() } +} diff --git a/node/lens_wazero.go b/node/lens_wazero.go new file mode 100644 index 0000000000..40d3f1b056 --- /dev/null +++ b/node/lens_wazero.go @@ -0,0 +1,24 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build !js + +package node + +import ( + "github.com/lens-vm/lens/host-go/engine/module" + "github.com/lens-vm/lens/host-go/runtimes/wazero" +) + +const Wazero LensRuntimeType = "wazero" + +func init() { + runtimeConstructors[Wazero] = func() module.Runtime { return wazero.New() } +} diff --git a/node/node.go b/node/node.go index bb3163834c..215cf05fc7 100644 --- a/node/node.go +++ b/node/node.go @@ -89,6 +89,7 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { netOpts []net.NodeOpt storeOpts []StoreOpt serverOpts []http.ServerOpt + lensOpts []LenOpt ) options := DefaultOptions() @@ -111,6 +112,9 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { case net.NodeOpt: netOpts = append(netOpts, t) + + case LenOpt: + lensOpts = append(lensOpts, t) } } @@ -124,7 +128,12 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { return nil, err } - db, err := db.NewDB(ctx, rootstore, acp, dbOpts...) + lens, err := NewLens(ctx, lensOpts...) + if err != nil { + return nil, err + } + + db, err := db.NewDB(ctx, rootstore, acp, lens, dbOpts...) if err != nil { return nil, err } diff --git a/tests/clients/cli/wrapper_lens.go b/tests/clients/cli/wrapper_lens.go index a9f3e20bd1..3aac1ae392 100644 --- a/tests/clients/cli/wrapper_lens.go +++ b/tests/clients/cli/wrapper_lens.go @@ -28,6 +28,8 @@ type LensRegistry struct { cmd *cliWrapper } +func (w *LensRegistry) Init(txnSource client.TxnSource) {} + func (w *LensRegistry) SetMigration(ctx context.Context, collectionID uint32, config model.Lens) error { args := []string{"client", "schema", "migration", "set-registry"} diff --git a/tests/gen/cli/util_test.go b/tests/gen/cli/util_test.go index dbfef99524..6f5535e6c6 100644 --- a/tests/gen/cli/util_test.go +++ b/tests/gen/cli/util_test.go @@ -50,7 +50,7 @@ func start(ctx context.Context) (*defraInstance, error) { return nil, errors.Wrap("failed to open datastore", err) } - db, err := db.NewDB(ctx, rootstore, acp.NoACP) + db, err := db.NewDB(ctx, rootstore, acp.NoACP, nil) if err != nil { return nil, errors.Wrap("failed to create a database", err) } diff --git a/tests/integration/db.go b/tests/integration/db.go index db2217a04d..c473e4cdd0 100644 --- a/tests/integration/db.go +++ b/tests/integration/db.go @@ -105,13 +105,14 @@ func NewBadgerFileDB(ctx context.Context, t testing.TB) (client.DB, error) { func setupDatabase(s *state) (client.DB, string, error) { opts := []node.Option{ db.WithUpdateEvents(), - db.WithLensPoolSize(lensPoolSize), + node.WithLensPoolSize(lensPoolSize), // The test framework sets this up elsewhere when required so that it may be wrapped // into a [client.DB]. node.WithDisableAPI(true), // The p2p is configured in the tests by [ConfigureNode] actions, we disable it here // to keep the tests as lightweight as possible. node.WithDisableP2P(true), + node.WithLensRuntime(lensType), } if badgerEncryption && encryptionKey == nil { diff --git a/tests/integration/lens.go b/tests/integration/lens.go index c99cc3f5b7..61ece97d73 100644 --- a/tests/integration/lens.go +++ b/tests/integration/lens.go @@ -11,12 +11,27 @@ package tests import ( + "os" + "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/internal/db" + "github.com/sourcenetwork/defradb/node" +) + +const ( + lensTypeEnvName = "DEFRA_LENS_TYPE" ) +var ( + lensType node.LensRuntimeType +) + +func init() { + lensType = node.LensRuntimeType(os.Getenv(lensTypeEnvName)) +} + // ConfigureMigration is a test action which will configure a Lens migration using the // provided configuration. type ConfigureMigration struct {