diff --git a/Makefile b/Makefile index 26bc7ae..6259b97 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -export GO_VERSION=1.21.1 +export GO_VERSION=1.21.3 export GOLANGCI_LINT_VERSION=v1.54.0 # configuration/aliases @@ -16,7 +16,7 @@ devrun=docker run $(devrunopts) --rm \ ## run isolated tests .PHONY: test test: - go test -run="$(testcase)" ./... -timeout 10s -race -shuffle on + go test -run="$(testcase)" ./... -cover -race -shuffle on ## Run lint .PHONY: lint diff --git a/README.md b/README.md index 78d9280..528c739 100644 --- a/README.md +++ b/README.md @@ -21,3 +21,4 @@ Some resources that were useful to solve it - [Mutex](https://golangbot.com/mutex/) - [Atomic Counters](https://gobyexample.com/atomic-counters) - [Bjorn Rabenstein - Prometheus: Designing and Implementing a Modern Monitoring Solution in Go](https://www.youtube.com/watch?v=1V7eJ0jN8-E) +- [go file mutex](https://echorand.me/posts/go-file-mutex/) diff --git a/cmd/metrica/main.go b/cmd/metrica/main.go index 023b798..471d4fe 100644 --- a/cmd/metrica/main.go +++ b/cmd/metrica/main.go @@ -4,14 +4,16 @@ package main import ( "log/slog" "net/http" + "sync" "github.com/perebaj/metrica" ) func main() { c := metrica.NewAtomicCounter() + fs := metrica.NewFileStorage(&sync.Mutex{}, "metrica.txt") + mux := metrica.Handler(c, fs) - mux := metrica.Handler(c) slog.Info("Starting server", "port", 8080) err := http.ListenAndServe(":8080", mux) diff --git a/go.mod b/go.mod index 59cbd87..2c73531 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/perebaj/metrica -go 1.21.0 +go 1.21.3 require ( github.com/golangci/golangci-lint v1.54.2 diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..0810c4e --- /dev/null +++ b/handler.go @@ -0,0 +1,69 @@ +package metrica + +import ( + "encoding/json" + "net/http" + "sync" + "time" + + "log/slog" +) + +type countResponse struct { + Count int64 +} + +// Handler returns a http.Handler for new endpoints. +func Handler(c *AtomicCounter, fs *FileStorage) http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { + counter(w, r, c) + }) + mux.HandleFunc("/countfs", func(w http.ResponseWriter, r *http.Request) { + counterFs(w, r, fs) + }) + return mux +} + +func counter(w http.ResponseWriter, _ *http.Request, c *AtomicCounter) { + c.Inc(1) + send(w, http.StatusOK, countResponse{Count: c.Value()}) +} + +func counterFs(w http.ResponseWriter, _ *http.Request, fs *FileStorage) { + var wg sync.WaitGroup + var counter int64 + + c := Counter(time.Now()) + + for i := 0; i < 1; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := fs.Write(c) + if err != nil { + send(w, http.StatusInternalServerError, err) + return + } + + counters, err := fs.Read() + if err != nil { + send(w, http.StatusInternalServerError, err) + return + } + counter = counters.Count60sec() + }() + } + wg.Wait() + send(w, http.StatusOK, countResponse{Count: counter}) +} + +func send(w http.ResponseWriter, statusCode int, body interface{}) { + const jsonContentType = "application/json; charset=utf-8" + + w.Header().Set("Content-Type", jsonContentType) + w.WriteHeader(statusCode) + if err := json.NewEncoder(w).Encode(body); err != nil { + slog.Error("Unable to encode body as JSON", "error", err) + } +} diff --git a/handler_test.go b/handler_test.go new file mode 100644 index 0000000..ddae6ef --- /dev/null +++ b/handler_test.go @@ -0,0 +1,125 @@ +package metrica + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" +) + +func TestHandlerCount_Sequential(t *testing.T) { + c := NewAtomicCounter() + mux := Handler(c, nil) + + for i := 0; i < 100; i++ { + req := httptest.NewRequest("GET", "/count", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + + var gotRes countResponse + if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { + t.Errorf("unable to decode body: %v", err) + } + + wantCount := int64(i + 1) + assert(t, wantCount, gotRes.Count) + } +} + +func TestHandlerCount_Concurrent(t *testing.T) { + c := NewAtomicCounter() + mux := Handler(c, nil) + var wg sync.WaitGroup + + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + req := httptest.NewRequest("GET", "/count", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + defer wg.Done() + }() + } + wg.Wait() + + assert(t, int64(1000), c.Value()) +} + +func TestHandlerCountFS_Sequential(t *testing.T) { + f, err := os.Create("test") + if err != nil { + t.Fatalf("error creating temp file: %v", err) + } + defer func() { + _ = os.Remove(f.Name()) + }() + + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + mux := Handler(nil, fs) + + for i := 0; i < 100; i++ { + req := httptest.NewRequest("GET", "/countfs", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + + var gotRes countResponse + if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { + t.Errorf("unable to decode body: %v", err) + } + + wantCount := int64(i + 1) + assert(t, wantCount, gotRes.Count) + } +} + +func TestHandlerCountFS_Concurrent(t *testing.T) { + f, err := os.Create("test") + if err != nil { + t.Fatalf("error creating temp file: %v", err) + } + defer func() { + _ = os.Remove(f.Name()) + }() + + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + + mux := Handler(nil, fs) + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + req := httptest.NewRequest("GET", "/countfs", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("expected status OK; got %v", w.Code) + } + defer wg.Done() + }() + } + wg.Wait() + + got, err := fs.Read() + if err != nil { + t.Fatalf("error reading file: %v", err) + } + assert(t, int64(100), got.Count60sec()) +} + +func assert(t *testing.T, want interface{}, got interface{}) { + if want != got { + t.Errorf("expected %v; got %v", want, got) + } +} diff --git a/metrica.go b/metrica.go deleted file mode 100644 index 9a392e7..0000000 --- a/metrica.go +++ /dev/null @@ -1,36 +0,0 @@ -package metrica - -import ( - "encoding/json" - "net/http" - - "log/slog" -) - -type countResponse struct { - Count int -} - -// Handler returns a http.Handler for new endpoints. -func Handler(c *AtomicCounter) http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { - counter(w, r, c) - }) - return mux -} - -func counter(w http.ResponseWriter, _ *http.Request, c *AtomicCounter) { - c.Inc(1) - send(w, http.StatusOK, countResponse{Count: int(c.Value())}) -} - -func send(w http.ResponseWriter, statusCode int, body interface{}) { - const jsonContentType = "application/json; charset=utf-8" - - w.Header().Set("Content-Type", jsonContentType) - w.WriteHeader(statusCode) - if err := json.NewEncoder(w).Encode(body); err != nil { - slog.Error("Unable to encode body as JSON", "error", err) - } -} diff --git a/metrica_test.go b/metrica_test.go deleted file mode 100644 index d6ebad5..0000000 --- a/metrica_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package metrica - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "sync" - "testing" -) - -func TestHandlerCount(t *testing.T) { - req := httptest.NewRequest("GET", "/count", nil) - w := httptest.NewRecorder() - c := AtomicCounter{} - mux := Handler(&c) - mux.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Errorf("expected status OK; got %v", w.Code) - } - - var got countResponse - if err := json.NewDecoder(w.Body).Decode(&got); err != nil { - t.Errorf("unable to decode body: %v", err) - } - - assert(t, 1, got.Count) -} - -func TestHandlerCount_Multiple(t *testing.T) { - c := NewAtomicCounter() - mux := Handler(c) - - for i := 0; i < 100; i++ { - req := httptest.NewRequest("GET", "/count", nil) - w := httptest.NewRecorder() - mux.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Errorf("expected status OK; got %v", w.Code) - } - - var gotRes countResponse - if err := json.NewDecoder(w.Body).Decode(&gotRes); err != nil { - t.Errorf("unable to decode body: %v", err) - } - - wantCount := i + 1 - assert(t, wantCount, gotRes.Count) - } -} - -func TestHandlerCount_Concurrent(t *testing.T) { - c := NewAtomicCounter() - mux := Handler(c) - var wg sync.WaitGroup - - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - req := httptest.NewRequest("GET", "/count", nil) - w := httptest.NewRecorder() - mux.ServeHTTP(w, req) - if w.Code != http.StatusOK { - t.Errorf("expected status OK; got %v", w.Code) - } - defer wg.Done() - }() - } - wg.Wait() - - assert(t, int64(1000), c.Value()) -} - -func assert(t *testing.T, want interface{}, got interface{}) { - if want != got { - t.Errorf("expected %v; got %v", want, got) - } -} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..dd9a078 --- /dev/null +++ b/storage.go @@ -0,0 +1,109 @@ +// Package metrica (storage.go) implements a version of a storage using the filesystem +package metrica + +import ( + "fmt" + "io" + "os" + "strings" + "sync" + "time" +) + +// Counter is an alias for time.Time +type Counter time.Time + +// Counters is a list of Counters +type Counters []Counter + +// FileStorage is a struct that implements the Storage methods +type FileStorage struct { + mu *sync.Mutex + fileName string +} + +// NewFileStorage initializes a new FileStorage +func NewFileStorage(mu *sync.Mutex, fileName string) *FileStorage { + return &FileStorage{ + mu: mu, + fileName: fileName, + } +} + +// Write save a Counter in the file +func (m *FileStorage) Write(c Counter) error { + m.mu.Lock() + defer m.mu.Unlock() + + f, err := os.OpenFile(m.fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("error opening file: %v", err) + } + + _, err = f.WriteString(fmt.Sprintf("%v\n", c.Format())) + if err != nil { + return fmt.Errorf("error writing file: %v", err) + } + return nil +} + +// Read returns a list of Counters +func (m *FileStorage) Read() (Counters, error) { + m.mu.Lock() + defer m.mu.Unlock() + + f, err := os.Open(m.fileName) + if err != nil { + return nil, fmt.Errorf("error opening file: %v", err) + } + + defer func() { + _ = f.Close() + }() + + fileData, err := io.ReadAll(f) + if err != nil { + return nil, fmt.Errorf("error reading file: %v", err) + } + + datetimes := strings.Split(string(fileData), "\n") + if datetimes[len(datetimes)-1] == "" { + datetimes = datetimes[:len(datetimes)-1] + } + + return parseDatetimesToCounters(datetimes) +} + +// Format returns the datetime in RFC3339Nano format +func (c Counter) Format() string { + return time.Time(c).Format(time.RFC3339Nano) +} + +// Count60sec returns the number of datetimes in the last 60 seconds +func (cs Counters) Count60sec() int64 { + currentTime := time.Now() + var count int64 + + lowerLimit := currentTime.Add(-60 * time.Second) + for _, c := range cs { + t := time.Time(c) + + if t.After(lowerLimit) && t.Before(currentTime) { + count++ + } + } + + return count +} + +func parseDatetimesToCounters(datetimes []string) (Counters, error) { + var counters []Counter + for _, datetime := range datetimes { + t, err := time.Parse(time.RFC3339Nano, datetime) + if err != nil { + return nil, fmt.Errorf("error parsing datetime: %v", err) + } + counters = append(counters, Counter(t)) + } + return counters, nil +} diff --git a/storage_test.go b/storage_test.go new file mode 100644 index 0000000..c623e88 --- /dev/null +++ b/storage_test.go @@ -0,0 +1,56 @@ +package metrica + +import ( + "os" + "sync" + "testing" + "time" +) + +func TestWriteRead(t *testing.T) { + c := Counter(time.Now()) + + f, err := os.CreateTemp("", "test") + if err != nil { + t.Fatalf("error creating temp file: %v", err) + } + defer func() { + _ = os.Remove(f.Name()) + }() + + fs := NewFileStorage(&sync.Mutex{}, f.Name()) + + if err := fs.Write(c); err != nil { + t.Fatalf("error writing to file: %v", err) + } + + got, err := fs.Read() + if err != nil { + t.Fatalf("error reading file: %v", err) + } + + if len(got) == 1 { + assert(t, c.Format(), got[0].Format()) + } else { + t.Fatalf("expected 1; got %v", len(got)) + } +} + +func TestCountersCount60sec(t *testing.T) { + currentTime := time.Now() + counters := Counters{ + Counter(currentTime.Add(-time.Second * 10)), + Counter(currentTime.Add(-time.Second * 10)), + Counter(currentTime.Add(-time.Second * 20)), + Counter(currentTime.Add(-time.Second * 30)), + Counter(currentTime.Add(-time.Second * 40)), + Counter(currentTime.Add(-time.Second * 50)), + Counter(currentTime.Add(-time.Second * 60)), + } + want := int64(6) + got := counters.Count60sec() + if got != want { + t.Fatalf("expected %v; got %v", want, got) + } + +}