Skip to content

Commit

Permalink
broker: replace --disable-stores with --file-only
Browse files Browse the repository at this point in the history
Remove --disable-stores behavior, which causes fragments to never persist
to their configured remote store. This flag has historically been used
in testing contexts.

Replace it with --file-only, which achieves a similar testing outcome by
re-interpreting a s3://bucket/foobar store to instead use file:///foobar

This preserves the desired behavior of keeping changes local and not
writing to remote fragment stores, while allowing for test brokers
to be restarted without data loss, as well as allowing for testing
of features that require persistence (like journal suspension).
  • Loading branch information
jgraettinger committed Jan 10, 2025
1 parent 557ca28 commit 0b9a9d8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
4 changes: 0 additions & 4 deletions broker/fragment/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ func (fi *Index) Inspect(ctx context.Context, callback func(CoverSet) error) err
func WalkAllStores(ctx context.Context, name pb.Journal, stores []pb.FragmentStore) (CoverSet, error) {
var set CoverSet

if DisableStores {
return set, nil
}

for _, store := range stores {
var err = List(ctx, store, name, func(f pb.Fragment) {
set, _ = set.Add(Fragment{Fragment: f})
Expand Down
14 changes: 8 additions & 6 deletions broker/fragment/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
pb "go.gazette.dev/core/broker/protocol"
)

// DisableStores disables the use of configured journal stores.
// If true, fragments are not persisted, and stores are not listed for existing fragments.
var DisableStores bool = false
// ForceFileStore reinterprets journal stores to use the file:// scheme, ignoring
// a configured cloud scheme and bucket such as s3:// or gcs://
var ForceFileStore bool = false

type backend interface {
Provider() string
Expand Down Expand Up @@ -48,6 +48,10 @@ var sharedStores = struct {
}

func getBackend(scheme string) backend {
if ForceFileStore {
return sharedStores.fs
}

switch scheme {
case "s3":
return sharedStores.s3
Expand Down Expand Up @@ -89,9 +93,7 @@ func Open(ctx context.Context, fragment pb.Fragment) (io.ReadCloser, error) {
// present, this is a no-op. If the Spool has not been compressed incrementally,
// it will be compressed before being persisted.
func Persist(ctx context.Context, spool Spool, spec *pb.JournalSpec) error {
if DisableStores {
return nil // No-op.
} else if len(spec.Fragment.Stores) == 0 {
if len(spec.Fragment.Stores) == 0 {
return nil // No-op.
}
spool.BackingStore = spec.Fragment.Stores[0]
Expand Down
7 changes: 5 additions & 2 deletions cmd/gazette/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -32,10 +33,10 @@ var Config = new(struct {
mbp.ServiceConfig
Limit uint32 `long:"limit" env:"LIMIT" default:"1024" description:"Maximum number of Journals the broker will allocate"`
FileRoot string `long:"file-root" env:"FILE_ROOT" description:"Local path which roots file:// fragment stores (optional)"`
FileOnly bool `long:"file-only" env:"FILE_ONLY" description:"Use the local file:// store for all journal fragments, ignoring cloud bucket storage configuration (for example, S3)"`
MaxAppendRate uint32 `long:"max-append-rate" env:"MAX_APPEND_RATE" default:"0" description:"Max rate (in bytes-per-sec) that any one journal may be appended to. If zero, there is no max rate"`
MaxReplication uint32 `long:"max-replication" env:"MAX_REPLICATION" default:"9" description:"Maximum effective replication of any one journal, which upper-bounds its stated replication."`
MinAppendRate uint32 `long:"min-append-rate" env:"MIN_APPEND_RATE" default:"65536" description:"Min rate (in bytes-per-sec) at which a client may stream Append RPC content. RPCs unable to sustain this rate are aborted"`
DisableStores bool `long:"disable-stores" env:"DISABLE_STORES" description:"Disable use of any configured journal fragment stores. The broker will neither list or persist remote fragments, and all data is discarded on broker exit."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
AuthKeys string `long:"auth-keys" env:"AUTH_KEYS" description:"Whitespace or comma separated, base64-encoded keys used to sign (first key) and verify (all keys) Authorization tokens." json:"-"`
AutoSuspend bool `long:"auto-suspend" env:"AUTO_SUSPEND" description:"Automatically suspend journals which have persisted all fragments"`
Expand Down Expand Up @@ -102,12 +103,14 @@ func (cmdServe) Execute(args []string) error {
_, err = os.Stat(Config.Broker.FileRoot)
mbp.Must(err, "configured local file:// root failed")
fragment.FileSystemStoreRoot = Config.Broker.FileRoot
} else if Config.Broker.FileOnly {
mbp.Must(fmt.Errorf("--file-root is not configured"), "a file root must be defined when using --file-only")
}

broker.AutoSuspend = Config.Broker.AutoSuspend
broker.MaxAppendRate = int64(Config.Broker.MaxAppendRate)
broker.MinAppendRate = int64(Config.Broker.MinAppendRate)
fragment.DisableStores = Config.Broker.DisableStores
fragment.ForceFileStore = Config.Broker.FileOnly
pb.MaxReplication = int32(Config.Broker.MaxReplication)

var (
Expand Down

0 comments on commit 0b9a9d8

Please sign in to comment.