diff --git a/event.go b/event.go new file mode 100644 index 00000000..d803f77e --- /dev/null +++ b/event.go @@ -0,0 +1,9 @@ +package storage + +// EventType represents the type of event kind +type EventType uint8 + +const ( + EventPutObject EventType = iota + 1 + EventDeleteObject +) diff --git a/go.mod b/go.mod index cf2c4e99..235ac4eb 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aliyun/aliyun-oss-go-sdk v2.2.0+incompatible github.com/aws/aws-sdk-go v1.43.16 github.com/baidubce/bce-sdk-go v0.9.105 + github.com/fsnotify/fsnotify v1.5.1 github.com/gophercloud/gophercloud v0.24.0 github.com/oracle/oci-go-sdk v24.3.0+incompatible github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index 381cf8aa..b0b5d7ce 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= diff --git a/local.go b/local.go index 2171af22..2ab51a90 100644 --- a/local.go +++ b/local.go @@ -17,11 +17,15 @@ limitations under the License. package storage import ( + "fmt" "io/ioutil" "os" + "sync" pathutil "path" "path/filepath" + + "github.com/fsnotify/fsnotify" ) // LocalFilesystemBackend is a storage backend for local filesystem storage @@ -29,16 +33,81 @@ type LocalFilesystemBackend struct { RootDirectory string } +type NewLocalFilesystemBackendOption struct { + notifier map[EventType]func() +} + +var watcherOnce sync.Once +var globalWatcher *fsnotify.Watcher + +func WithEventNotifier(e EventType, fn func()) func(*NewLocalFilesystemBackendOption) { + return func(option *NewLocalFilesystemBackendOption) { + if option.notifier == nil { + option.notifier = make(map[EventType]func()) + } + option.notifier[e] = fn + } +} + // NewLocalFilesystemBackend creates a new instance of LocalFilesystemBackend -func NewLocalFilesystemBackend(rootDirectory string) *LocalFilesystemBackend { +func NewLocalFilesystemBackend(rootDirectory string, opts ...func(*NewLocalFilesystemBackendOption)) *LocalFilesystemBackend { + var option NewLocalFilesystemBackendOption + for _, opt := range opts { + opt(&option) + } absPath, err := filepath.Abs(rootDirectory) if err != nil { panic(err) } - b := &LocalFilesystemBackend{RootDirectory: absPath} + b := &LocalFilesystemBackend{ + RootDirectory: absPath, + } + + watcherOnce.Do(func() { + globalWatcher, err = fsnotify.NewWatcher() + if err != nil { + panic(err) + } + // since it is a longTerm watcher , we do not need to close it + go func() { + for { + select { + case event, ok := <-globalWatcher.Events: + if !ok { + continue + } + + switch event.Op { + case fsnotify.Write, fsnotify.Create: + if fn, ok := option.notifier[EventPutObject]; ok { + fn() + } + case fsnotify.Remove: + if fn, ok := option.notifier[EventDeleteObject]; ok { + fn() + } + } + case _, ok := <-globalWatcher.Errors: + if !ok { + continue + } + } + } + }() + + if err := globalWatcher.Add(b.RootDirectory); err != nil { + panic(err) + } + }) + return b } +func (b LocalFilesystemBackend) AddWatcherPath(path string) error { + // TODO: to ensure that we should lock here ? + return globalWatcher.Add(path) +} + // ListObjects lists all objects in root directory (depth 1) func (b LocalFilesystemBackend) ListObjects(prefix string) ([]Object, error) { var objects []Object @@ -84,6 +153,7 @@ func (b LocalFilesystemBackend) PutObject(path string, content []byte) error { _, err := os.Stat(folderPath) if err != nil { if os.IsNotExist(err) { + // NOTE: works for dynamic depth tenant err := os.MkdirAll(folderPath, 0774) if err != nil { return err @@ -95,17 +165,25 @@ func (b LocalFilesystemBackend) PutObject(path string, content []byte) error { if err != nil { return err } + // also adds the fsnotify watcher path + if err := b.AddWatcherPath(folderPath); err != nil { + return err + } } else { return err } } - err = ioutil.WriteFile(fullpath, content, 0644) - return err + if err = ioutil.WriteFile(fullpath, content, 0644); err != nil { + return err + } + return nil } // DeleteObject removes an object from root directory func (b LocalFilesystemBackend) DeleteObject(path string) error { fullpath := pathutil.Join(b.RootDirectory, path) - err := os.Remove(fullpath) - return err + if err := os.Remove(fullpath); err != nil { + return fmt.Errorf("failed to delete object %s: %w", path, err) + } + return nil }