From fe7a8411a6a7a871d42cda8f3948a5d0803aca99 Mon Sep 17 00:00:00 2001 From: jojo Date: Wed, 11 Oct 2023 19:40:31 -0300 Subject: [PATCH] :sparkles: feaT: concurrent fs operations --- Makefile | 2 +- cmd/metrica/main.go | 4 +- handler.go | 69 ++++++++++++++++++++++++ handler_test.go | 125 ++++++++++++++++++++++++++++++++++++++++++++ metrica.go | 36 ------------- metrica_test.go | 59 --------------------- storage.go | 75 ++++++++++++++++++++++---- storage_test.go | 32 +++++++++--- 8 files changed, 288 insertions(+), 114 deletions(-) create mode 100644 handler.go create mode 100644 handler_test.go delete mode 100644 metrica.go delete mode 100644 metrica_test.go diff --git a/Makefile b/Makefile index 374a8ca..6259b97 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ devrun=docker run $(devrunopts) --rm \ ## run isolated tests .PHONY: test test: - go test -run="$(testcase)" ./... -timeout 10s -race -shuffle on + go test -run="$(testcase)" ./... -cover -race -shuffle on ## Run lint .PHONY: lint diff --git a/cmd/metrica/main.go b/cmd/metrica/main.go index 023b798..6330521 100644 --- a/cmd/metrica/main.go +++ b/cmd/metrica/main.go @@ -4,14 +4,16 @@ package main import ( "log/slog" "net/http" + "sync" "github.com/perebaj/metrica" ) func main() { c := metrica.NewAtomicCounter() + fs := metrica.NewFileStorage(&sync.Mutex{}, "counters.txt") + mux := metrica.Handler(c, fs) - mux := metrica.Handler(c) slog.Info("Starting server", "port", 8080) err := http.ListenAndServe(":8080", mux) diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..0810c4e --- /dev/null +++ b/handler.go @@ -0,0 +1,69 @@ +package metrica + +import ( + "encoding/json" + "net/http" + "sync" + "time" + + "log/slog" +) + +type countResponse struct { + Count int64 +} + +// Handler returns a http.Handler for new endpoints. +func Handler(c *AtomicCounter, fs *FileStorage) http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { + counter(w, r, c) + }) + mux.HandleFunc("/countfs", func(w http.ResponseWriter, r *http.Request) { + counterFs(w, r, fs) + }) + return mux +} + +func counter(w http.ResponseWriter, _ *http.Request, c *AtomicCounter) { + c.Inc(1) + send(w, http.StatusOK, countResponse{Count: c.Value()}) +} + +func counterFs(w http.ResponseWriter, _ *http.Request, fs *FileStorage) { + var wg sync.WaitGroup + var counter int64 + + c := Counter(time.Now()) + + for i := 0; i < 1; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := fs.Write(c) + if err != nil { + send(w, http.StatusInternalServerError, err) + return + } + + counters, err := fs.Read() + if err != nil { + send(w, http.StatusInternalServerError, err) + return + } + counter = counters.Count60sec() + }() + } + wg.Wait() + send(w, http.StatusOK, countResponse{Count: counter}) +} + +func send(w http.ResponseWriter, statusCode int, body interface{}) { + const jsonContentType = "application/json; charset=utf-8" + + w.Header().Set("Content-Type", jsonContentType) + w.WriteHeader(statusCode) + if err := json.NewEncoder(w).Encode(body); err != nil { + slog.Error("Unable to encode body as JSON", "error", err) + } +} diff --git a/handler_test.go b/handler_test.go new file mode 100644 index 0000000..ddae6ef --- /dev/null +++ b/handler_test.go @@ -0,0 +1,125 @@ +package metrica + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" +) + +func TestHandlerCount_Sequential(t *testing.T) { + c := NewAtomicCounter() + mux := Handler(c, nil) + + for i := 0; i < 100; i++ { + req := httptest.NewRequest("GET", "/count", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + + var gotRes countResponse + if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { + t.Errorf("unable to decode body: %v", err) + } + + wantCount := int64(i + 1) + assert(t, wantCount, gotRes.Count) + } +} + +func TestHandlerCount_Concurrent(t *testing.T) { + c := NewAtomicCounter() + mux := Handler(c, nil) + var wg sync.WaitGroup + + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + req := httptest.NewRequest("GET", "/count", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + defer wg.Done() + }() + } + wg.Wait() + + assert(t, int64(1000), c.Value()) +} + +func TestHandlerCountFS_Sequential(t *testing.T) { + f, err := os.Create("test") + if err != nil { + t.Fatalf("error creating temp file: %v", err) + } + defer func() { + _ = os.Remove(f.Name()) + }() + + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + mux := Handler(nil, fs) + + for i := 0; i < 100; i++ { + req := httptest.NewRequest("GET", "/countfs", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + + var gotRes countResponse + if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { + t.Errorf("unable to decode body: %v", err) + } + + wantCount := int64(i + 1) + assert(t, wantCount, gotRes.Count) + } +} + +func TestHandlerCountFS_Concurrent(t *testing.T) { + f, err := os.Create("test") + if err != nil { + t.Fatalf("error creating temp file: %v", err) + } + defer func() { + _ = os.Remove(f.Name()) + }() + + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + + mux := Handler(nil, fs) + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + req := httptest.NewRequest("GET", "/countfs", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + defer wg.Done() + }() + } + wg.Wait() + + got, err := fs.Read() + if err != nil { + t.Fatalf("error reading file: %v", err) + } + assert(t, int64(100), got.Count60sec()) +} + +func assert(t *testing.T, want interface{}, got interface{}) { + if want != got { + t.Errorf("expected %v; got %v", want, got) + } +} diff --git a/metrica.go b/metrica.go deleted file mode 100644 index 9a392e7..0000000 --- a/metrica.go +++ /dev/null @@ -1,36 +0,0 @@ -package metrica - -import ( - "encoding/json" - "net/http" - - "log/slog" -) - -type countResponse struct { - Count int -} - -// Handler returns a http.Handler for new endpoints. -func Handler(c *AtomicCounter) http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { - counter(w, r, c) - }) - return mux -} - -func counter(w http.ResponseWriter, _ *http.Request, c *AtomicCounter) { - c.Inc(1) - send(w, http.StatusOK, countResponse{Count: int(c.Value())}) -} - -func send(w http.ResponseWriter, statusCode int, body interface{}) { - const jsonContentType = "application/json; charset=utf-8" - - w.Header().Set("Content-Type", jsonContentType) - w.WriteHeader(statusCode) - if err := json.NewEncoder(w).Encode(body); err != nil { - slog.Error("Unable to encode body as JSON", "error", err) - } -} diff --git a/metrica_test.go b/metrica_test.go deleted file mode 100644 index f763665..0000000 --- a/metrica_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package metrica - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "sync" - "testing" -) - -func TestHandlerCount_Sequential(t *testing.T) { - c := NewAtomicCounter() - mux := Handler(c) - - for i := 0; i < 100; i++ { - req := httptest.NewRequest("GET", "/count", nil) - w := httptest.NewRecorder() - mux.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Errorf("expected status OK; got %v", w.Code) - } - - var gotRes countResponse - if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { - t.Errorf("unable to decode body: %v", err) - } - - wantCount := i + 1 - assert(t, wantCount, gotRes.Count) - } -} - -func TestHandlerCount_Concurrent(t *testing.T) { - c := NewAtomicCounter() - mux := Handler(c) - var wg sync.WaitGroup - - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - req := httptest.NewRequest("GET", "/count", nil) - w := httptest.NewRecorder() - mux.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Errorf("expected status OK; got %v", w.Code) - } - defer wg.Done() - }() - } - wg.Wait() - - assert(t, int64(1000), c.Value()) -} - -func assert(t *testing.T, want interface{}, got interface{}) { - if want != got { - t.Errorf("expected %v; got %v", want, got) - } -} diff --git a/storage.go b/storage.go index ab91ad9..95277c5 100644 --- a/storage.go +++ b/storage.go @@ -1,3 +1,4 @@ +// Package metrica (storage.go) implements a version of a storage using the filesystem package metrica import ( @@ -5,17 +6,41 @@ import ( "io" "os" "strings" + "sync" "time" ) -// Counter is a struct that holds the datetime of the request -type Counter struct { - Datetime string +// Counter is an alias for time.Time +type Counter time.Time + +// Counters is a list of Counter +type Counters []Counter + +// FileStorage is a struct that implements the Storage interface +type FileStorage struct { + mu *sync.Mutex + fileName string +} + +// NewFileStorage initializes a new FileStorage +func NewFileStorage(mu *sync.Mutex, fileName string) *FileStorage { + return &FileStorage{ + mu: mu, + fileName: fileName, + } } // Write writes the datetime to the file -func Write(f *os.File, c Counter) error { - _, err := f.WriteString(fmt.Sprintf("%v\n", c.Datetime)) +func (m *FileStorage) Write(c Counter) error { + m.mu.Lock() + defer m.mu.Unlock() + + f, err := os.OpenFile(m.fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("error opening file: %v", err) + } + + _, err = f.WriteString(fmt.Sprintf("%v\n", c.Format())) if err != nil { return fmt.Errorf("error writing file: %v", err) } @@ -23,8 +48,11 @@ func Write(f *os.File, c Counter) error { } // Read reads the datetime from the file -func Read(filename string) ([]time.Time, error) { - f, err := os.Open(filename) +func (m *FileStorage) Read() (Counters, error) { + m.mu.Lock() + defer m.mu.Unlock() + + f, err := os.Open(m.fileName) if err != nil { return nil, fmt.Errorf("error opening file: %v", err) } @@ -43,14 +71,39 @@ func Read(filename string) ([]time.Time, error) { datetimes = datetimes[:len(datetimes)-1] } - var parsedDatetime []time.Time + return parseDatetimesToCounters(datetimes) +} + +// Format returns the datetime in RFC3339Nano format +func (c Counter) Format() string { + return time.Time(c).Format(time.RFC3339Nano) +} + +// Count60sec returns the number of datetimes in the last 60 seconds +func (cs Counters) Count60sec() int64 { + currentTime := time.Now() + var count int64 + + lowerLimit := currentTime.Add(-60 * time.Second) + for _, c := range cs { + t := time.Time(c) + + if t.After(lowerLimit) && t.Before(currentTime) { + count++ + } + } + + return count +} + +func parseDatetimesToCounters(datetimes []string) (Counters, error) { + var counters []Counter for _, datetime := range datetimes { t, err := time.Parse(time.RFC3339Nano, datetime) if err != nil { return nil, fmt.Errorf("error parsing datetime: %v", err) } - parsedDatetime = append(parsedDatetime, t) - + counters = append(counters, Counter(t)) } - return parsedDatetime, nil + return counters, nil } diff --git a/storage_test.go b/storage_test.go index a14e500..c623e88 100644 --- a/storage_test.go +++ b/storage_test.go @@ -2,14 +2,13 @@ package metrica import ( "os" + "sync" "testing" "time" ) func TestWriteRead(t *testing.T) { - c := Counter{ - Datetime: time.Now().Format(time.RFC3339Nano), - } + c := Counter(time.Now()) f, err := os.CreateTemp("", "test") if err != nil { @@ -19,18 +18,39 @@ func TestWriteRead(t *testing.T) { _ = os.Remove(f.Name()) }() - if err := Write(f, c); err != nil { + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + + if err := fs.Write(c); err != nil { t.Fatalf("error writing to file: %v", err) } - got, err := Read(f.Name()) + got, err := fs.Read() if err != nil { t.Fatalf("error reading file: %v", err) } if len(got) == 1 { - assert(t, got[0].Format(time.RFC3339Nano), c.Datetime) + assert(t, c.Format(), got[0].Format()) } else { t.Fatalf("expected 1; got %v", len(got)) } } + +func TestCountersCount60sec(t *testing.T) { + currentTime := time.Now() + counters := Counters{ + Counter(currentTime.Add(-time.Second * 10)), + Counter(currentTime.Add(-time.Second * 10)), + Counter(currentTime.Add(-time.Second * 20)), + Counter(currentTime.Add(-time.Second * 30)), + Counter(currentTime.Add(-time.Second * 40)), + Counter(currentTime.Add(-time.Second * 50)), + Counter(currentTime.Add(-time.Second * 60)), + } + want := int64(6) + got := counters.Count60sec() + if got != want { + t.Fatalf("expected %v; got %v", want, got) + } + +}