Skip to content

Commit

Permalink
wip: 🔕 temporary commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tarampampam committed Nov 23, 2024
1 parent b0502c3 commit 7bdbd19
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 113 deletions.
7 changes: 6 additions & 1 deletion internal/cli/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,18 @@ func (cmd *command) Run(parentCtx context.Context, log *zap.Logger) error { //no
case storageDriverRedis:
db = storage.NewRedis(rdc, cmd.options.storage.sessionTTL, uint32(cmd.options.storage.maxRequests))
case storageDriverSQLite:
sqliteDb, sqliteErr := sql.Open("sqlite3", fmt.Sprintf("file:%s?_journal_mode=WAL&_txlock=immediate", cmd.options.sqlite.filePath)) //nolint:lll
sqliteDb, sqliteErr := sql.Open(
"sqlite3",
fmt.Sprintf("file:%s?_journal_mode=WAL&_txlock=immediate&_auto_vacuum=full", cmd.options.sqlite.filePath),
)
if sqliteErr != nil {
return fmt.Errorf("failed to open SQLite database: %w", sqliteErr)
}

defer func() { _ = sqliteDb.Close() }()

sqliteDb.SetMaxOpenConns(1) // important for SQLite

sqlite := storage.NewSQLite(sqliteDb, cmd.options.storage.sessionTTL, uint32(cmd.options.storage.maxRequests)) //nolint:contextcheck,lll
if err := sqlite.Migrate(ctx); err != nil {
return fmt.Errorf("failed to migrate SQLite database: %w", err)
Expand Down
132 changes: 86 additions & 46 deletions internal/storage/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
)

type (
InMemory struct { // TODO: add session expiring check on every operation?
InMemory struct {
sessionTTL time.Duration
maxRequests uint32
sessions syncMap[ /* sID */ string, *sessionData]
cleanupInterval time.Duration

// this function returns the current time, it's used to mock the time in tests
timeNow func() time.Time

close chan struct{}
closed atomic.Bool
}
Expand All @@ -43,6 +46,11 @@ func WithInMemoryCleanupInterval(v time.Duration) InMemoryOption {
return func(s *InMemory) { s.cleanupInterval = v }
}

// WithInMemoryTimeNow sets the function that returns the current time.
func WithInMemoryTimeNow(fn func() time.Time) InMemoryOption {
return func(s *InMemory) { s.timeNow = fn }
}

// NewInMemory creates a new in-memory storage with the given session TTL and the maximum number of stored requests.
// Note that the cleanup goroutine is started automatically if the cleanup interval is greater than zero.
// To stop the cleanup goroutine and close the storage, call the InMemory.Close method.
Expand All @@ -52,6 +60,7 @@ func NewInMemory(sessionTTL time.Duration, maxRequests uint32, opts ...InMemoryO
maxRequests: maxRequests,
close: make(chan struct{}),
cleanupInterval: time.Second, // default cleanup interval
timeNow: time.Now, // by default use the stdlib time.Now function
}

for _, opt := range opts {
Expand Down Expand Up @@ -87,7 +96,7 @@ func (s *InMemory) cleanup() {
case <-s.close: // close signal received
return
case <-timer.C:
var now = time.Now()
var now = s.timeNow()

s.sessions.Range(func(sID string, data *sessionData) bool {
data.Lock()
Expand All @@ -106,14 +115,37 @@ func (s *InMemory) cleanup() {
}
}

func (s *InMemory) NewSession(ctx context.Context, session Session, id ...string) (sID string, _ error) {
// isSessionExists checks if the session with the specified ID exists and is not expired.
func (s *InMemory) isSessionExists(sID string) bool {
data, ok := s.sessions.Load(sID)
if !ok {
return false
}

data.Lock()
var expiresAt = data.session.ExpiresAt
data.Unlock()

return expiresAt.After(s.timeNow())
}

// isOpenAndNotDone checks if the storage is open and the context is not done.
func (s *InMemory) isOpenAndNotDone(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return "", err // context is done
return err // context is done
} else if s.closed.Load() {
return "", ErrClosed // storage is closed
return ErrClosed // storage is closed
}

var now = time.Now()
return nil
}

func (s *InMemory) NewSession(ctx context.Context, session Session, id ...string) (sID string, _ error) {
if err := s.isOpenAndNotDone(ctx); err != nil {
return "", err // context is done
}

var now = s.timeNow()

if len(id) > 0 { // use the specified ID
if len(id[0]) == 0 {
Expand All @@ -138,10 +170,8 @@ func (s *InMemory) NewSession(ctx context.Context, session Session, id ...string
}

func (s *InMemory) GetSession(ctx context.Context, sID string) (*Session, error) {
if err := ctx.Err(); err != nil {
return nil, err // context is done
} else if s.closed.Load() {
return nil, ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return nil, err
}

data, ok := s.sessions.Load(sID)
Expand All @@ -153,7 +183,7 @@ func (s *InMemory) GetSession(ctx context.Context, sID string) (*Session, error)
var expiresAt = data.session.ExpiresAt
data.Unlock()

if expiresAt.Before(time.Now()) {
if expiresAt.Before(s.timeNow()) {
s.sessions.Delete(sID)

return nil, ErrSessionNotFound // session has been expired
Expand All @@ -163,15 +193,17 @@ func (s *InMemory) GetSession(ctx context.Context, sID string) (*Session, error)
}

func (s *InMemory) AddSessionTTL(ctx context.Context, sID string, howMuch time.Duration) error {
if err := ctx.Err(); err != nil {
return err // context is done
} else if s.closed.Load() {
return ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return err
}

if !s.isSessionExists(sID) {
return ErrSessionNotFound // session not found
}

data, ok := s.sessions.Load(sID)
if !ok {
return ErrSessionNotFound // session not found
return ErrSessionNotFound // like a fuse, because we already checked it
}

data.Lock()
Expand All @@ -182,10 +214,8 @@ func (s *InMemory) AddSessionTTL(ctx context.Context, sID string, howMuch time.D
}

func (s *InMemory) DeleteSession(ctx context.Context, sID string) error {
if err := ctx.Err(); err != nil {
return err // context is done
} else if s.closed.Load() {
return ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return err
}

if data, ok := s.sessions.LoadAndDelete(sID); !ok {
Expand All @@ -202,18 +232,20 @@ func (s *InMemory) DeleteSession(ctx context.Context, sID string) error {
}

func (s *InMemory) NewRequest(ctx context.Context, sID string, r Request) (rID string, _ error) {
if err := ctx.Err(); err != nil {
return "", err // context is done
} else if s.closed.Load() {
return "", ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return "", err
}

if !s.isSessionExists(sID) {
return "", ErrSessionNotFound // session not found
}

data, ok := s.sessions.Load(sID)
if !ok {
return "", ErrSessionNotFound // session not found
return "", ErrSessionNotFound // like a fuse, because we already checked it
}

rID, r.CreatedAtUnixMilli = s.newID(), time.Now().UnixMilli()
rID, r.CreatedAtUnixMilli = s.newID(), s.timeNow().UnixMilli()

data.requests.Store(rID, r)

Expand Down Expand Up @@ -244,15 +276,17 @@ func (s *InMemory) NewRequest(ctx context.Context, sID string, r Request) (rID s
}

func (s *InMemory) GetRequest(ctx context.Context, sID, rID string) (*Request, error) {
if err := ctx.Err(); err != nil {
return nil, err // context is done
} else if s.closed.Load() {
return nil, ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return nil, err
}

if !s.isSessionExists(sID) {
return nil, ErrSessionNotFound // session not found
}

session, sessionOk := s.sessions.Load(sID)
if !sessionOk {
return nil, ErrSessionNotFound // session not found
return nil, ErrSessionNotFound // like a fuse, because we already checked it
}

if request, ok := session.requests.Load(rID); ok {
Expand All @@ -263,15 +297,17 @@ func (s *InMemory) GetRequest(ctx context.Context, sID, rID string) (*Request, e
}

func (s *InMemory) GetAllRequests(ctx context.Context, sID string) (map[string]Request, error) {
if err := ctx.Err(); err != nil {
return nil, err // context is done
} else if s.closed.Load() {
return nil, ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return nil, err
}

if !s.isSessionExists(sID) {
return nil, ErrSessionNotFound // session not found
}

session, sessionOk := s.sessions.Load(sID)
if !sessionOk {
return nil, ErrSessionNotFound // session not found
return nil, ErrSessionNotFound // like a fuse, because we already checked it
}

var all = make(map[string]Request)
Expand All @@ -286,15 +322,17 @@ func (s *InMemory) GetAllRequests(ctx context.Context, sID string) (map[string]R
}

func (s *InMemory) DeleteRequest(ctx context.Context, sID, rID string) error {
if err := ctx.Err(); err != nil {
return err // context is done
} else if s.closed.Load() {
return ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return err
}

if !s.isSessionExists(sID) {
return ErrSessionNotFound // session not found
}

session, sessionOk := s.sessions.Load(sID)
if !sessionOk {
return ErrSessionNotFound // session not found
return ErrSessionNotFound // like a fuse, because we already checked it
}

if _, ok := session.requests.LoadAndDelete(rID); ok {
Expand All @@ -305,15 +343,17 @@ func (s *InMemory) DeleteRequest(ctx context.Context, sID, rID string) error {
}

func (s *InMemory) DeleteAllRequests(ctx context.Context, sID string) error {
if err := ctx.Err(); err != nil {
return err // context is done
} else if s.closed.Load() {
return ErrClosed // storage is closed
if err := s.isOpenAndNotDone(ctx); err != nil {
return err
}

if !s.isSessionExists(sID) {
return ErrSessionNotFound // session not found
}

session, sessionOk := s.sessions.Load(sID)
if !sessionOk {
return ErrSessionNotFound // session not found
return ErrSessionNotFound // like a fuse, because we already checked it
}

// delete all session requests
Expand Down
16 changes: 12 additions & 4 deletions internal/storage/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,26 @@ import (
func TestInMemory_Session_CreateReadDelete(t *testing.T) {
t.Parallel()

var ft = newFakeTime()

testSessionCreateReadDelete(t,
func(sTTL time.Duration, maxReq uint32) storage.Storage { return storage.NewInMemory(sTTL, maxReq) },
func(t time.Duration) { <-time.After(t) },
func(sTTL time.Duration, maxReq uint32) storage.Storage {
return storage.NewInMemory(sTTL, maxReq, storage.WithInMemoryTimeNow(ft.Get))
},
func(t time.Duration) { ft.Add(t) },
)
}

func TestInMemory_Request_CreateReadDelete(t *testing.T) {
t.Parallel()

var ft = newFakeTime()

testRequestCreateReadDelete(t,
func(sTTL time.Duration, maxReq uint32) storage.Storage { return storage.NewInMemory(sTTL, maxReq) },
func(t time.Duration) { <-time.After(t) },
func(sTTL time.Duration, maxReq uint32) storage.Storage {
return storage.NewInMemory(sTTL, maxReq, storage.WithInMemoryTimeNow(ft.Get))
},
func(t time.Duration) { ft.Add(t) },
)
}

Expand Down
Loading

0 comments on commit 7bdbd19

Please sign in to comment.