From e285b5c4b2dacbe2db1f4afcc7b9c8ac1e52317f Mon Sep 17 00:00:00 2001 From: Lanre Adelowo Date: Sat, 20 Jan 2024 21:13:02 +0100 Subject: [PATCH] Implementation of requests ingestion (#4) * start working on ingesting and storing http requests * ingest and store request data * add database tests --- cmd/http.go | 3 +- datastore/postgres/ingest.go | 26 +++++++ datastore/postgres/ingest_test.go | 33 +++++++++ ...20165926_create_http_ingest_table.down.sql | 1 + ...0120165926_create_http_ingest_table.up.sql | 10 +++ datastore/postgres/url.go | 26 +++++++ datastore/postgres/url_test.go | 19 +++++ ingest.go | 37 ++++++++++ internal/util/ip.go | 35 +++++++++ server/httpd/http.go | 12 ++-- server/httpd/url.go | 71 ++++++++++++++++++- url.go | 14 ++++ 12 files changed, 279 insertions(+), 8 deletions(-) create mode 100644 datastore/postgres/ingest.go create mode 100644 datastore/postgres/ingest_test.go create mode 100644 datastore/postgres/migrations/20240120165926_create_http_ingest_table.down.sql create mode 100644 datastore/postgres/migrations/20240120165926_create_http_ingest_table.up.sql create mode 100644 ingest.go create mode 100644 internal/util/ip.go diff --git a/cmd/http.go b/cmd/http.go index 060f658..f72a29d 100644 --- a/cmd/http.go +++ b/cmd/http.go @@ -40,6 +40,7 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) { } urlStore := postgres.NewURLRepositoryTable(db) + ingestStore := postgres.NewIngestRepository(db) hostName, err := os.Hostname() if err != nil { @@ -49,7 +50,7 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) { logger := logrus.WithField("host", hostName). WithField("module", "http.server") - httpServer := httpd.New(*cfg, urlStore, logger) + httpServer := httpd.New(*cfg, urlStore, ingestStore, logger) go func() { logger.Debug("starting HTTP server") diff --git a/datastore/postgres/ingest.go b/datastore/postgres/ingest.go new file mode 100644 index 0000000..6664484 --- /dev/null +++ b/datastore/postgres/ingest.go @@ -0,0 +1,26 @@ +package postgres + +import ( + "context" + + "github.com/adelowo/sdump" + "github.com/uptrace/bun" +) + +type ingestRepository struct { + inner *bun.DB +} + +func NewIngestRepository(db *bun.DB) sdump.IngestRepository { + return &ingestRepository{ + inner: db, + } +} + +func (u *ingestRepository) Create(ctx context.Context, + model *sdump.IngestHTTPRequest, +) error { + _, err := bun.NewInsertQuery(u.inner).Model(model). + Exec(ctx) + return err +} diff --git a/datastore/postgres/ingest_test.go b/datastore/postgres/ingest_test.go new file mode 100644 index 0000000..9ae12e6 --- /dev/null +++ b/datastore/postgres/ingest_test.go @@ -0,0 +1,33 @@ +//go:build integration +// +build integration + +package postgres + +import ( + "context" + "testing" + + "github.com/adelowo/sdump" + "github.com/stretchr/testify/require" +) + +func TestIngestRepository_Create(t *testing.T) { + client, teardownFunc := setupDatabase(t) + defer teardownFunc() + + ingestStore := NewIngestRepository(client) + + urlStore := NewURLRepositoryTable(client) + + endpoint, err := urlStore.Get(context.Background(), &sdump.FindURLOptions{ + Reference: "cmltfm6g330l5l1vq110", // see fixtures/urls.yml + }) + require.NoError(t, err) + + require.NoError(t, ingestStore.Create(context.Background(), &sdump.IngestHTTPRequest{ + UrlID: endpoint.ID, + Request: sdump.RequestDefinition{ + Body: "{}", + }, + })) +} diff --git a/datastore/postgres/migrations/20240120165926_create_http_ingest_table.down.sql b/datastore/postgres/migrations/20240120165926_create_http_ingest_table.down.sql new file mode 100644 index 0000000..11fc8dc --- /dev/null +++ b/datastore/postgres/migrations/20240120165926_create_http_ingest_table.down.sql @@ -0,0 +1 @@ +DROP TABLE ingests; diff --git a/datastore/postgres/migrations/20240120165926_create_http_ingest_table.up.sql b/datastore/postgres/migrations/20240120165926_create_http_ingest_table.up.sql new file mode 100644 index 0000000..0850b1c --- /dev/null +++ b/datastore/postgres/migrations/20240120165926_create_http_ingest_table.up.sql @@ -0,0 +1,10 @@ +CREATE TABLE ingests ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + url_id uuid NOT NULL REFERENCES urls(id), + + request jsonb NOT NULL DEFAULT '{}'::jsonb, + + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at TIMESTAMP WITH TIME ZONE +); diff --git a/datastore/postgres/url.go b/datastore/postgres/url.go index 9e53bca..12c6b1b 100644 --- a/datastore/postgres/url.go +++ b/datastore/postgres/url.go @@ -2,8 +2,11 @@ package postgres import ( "context" + "database/sql" + "errors" "github.com/adelowo/sdump" + "github.com/google/uuid" "github.com/uptrace/bun" ) @@ -24,3 +27,26 @@ func (u *urlRepositoryTable) Create(ctx context.Context, Exec(ctx) return err } + +func (u *urlRepositoryTable) Get(ctx context.Context, + opts *sdump.FindURLOptions, +) (*sdump.URLEndpoint, error) { + res := new(sdump.URLEndpoint) + + query := bun.NewSelectQuery(u.inner).Model(res) + + if opts.ID != uuid.Nil { + query = query.Where("id = ?", opts.ID) + } + + if opts.Reference != "" { + query = query.Where("reference = ?", opts.Reference) + } + + err := query.Scan(ctx, res) + if errors.Is(err, sql.ErrNoRows) { + return nil, sdump.ErrURLEndpointNotFound + } + + return res, err +} diff --git a/datastore/postgres/url_test.go b/datastore/postgres/url_test.go index 9d9f07a..f51fd7e 100644 --- a/datastore/postgres/url_test.go +++ b/datastore/postgres/url_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/adelowo/sdump" + "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -19,3 +20,21 @@ func TestURLRepositoryTable_Create(t *testing.T) { require.NoError(t, urlStore.Create(context.Background(), sdump.NewURLEndpoint())) } + +func TestURLRepositoryTable_Get(t *testing.T) { + client, teardownFunc := setupDatabase(t) + defer teardownFunc() + + urlStore := NewURLRepositoryTable(client) + + _, err := urlStore.Get(context.Background(), &sdump.FindURLOptions{ + Reference: uuid.NewString(), + }) + require.Error(t, err) + require.Equal(t, err, sdump.ErrURLEndpointNotFound) + + _, err = urlStore.Get(context.Background(), &sdump.FindURLOptions{ + Reference: "cmltfm6g330l5l1vq110", // see fixtures/urls.yml + }) + require.NoError(t, err) +} diff --git a/ingest.go b/ingest.go new file mode 100644 index 0000000..2f773ac --- /dev/null +++ b/ingest.go @@ -0,0 +1,37 @@ +package sdump + +import ( + "context" + "net" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/uptrace/bun" +) + +type RequestDefinition struct { + Body string `mapstructure:"body" json:"body,omitempty"` + Query string `json:"query,omitempty"` + Headers http.Header `json:"headers,omitempty"` + IPAddress net.IP `json:"ip_address,omitempty" bson:"ip_address"` + Size int64 `json:"size,omitempty"` +} + +type IngestHTTPRequest struct { + ID uuid.UUID `bun:"type:uuid,default:uuid_generate_v4()" json:"id,omitempty" mapstructure:"id"` + UrlID uuid.UUID `json:"url_id,omitempty"` + Request RequestDefinition `json:"request,omitempty"` + + // No need to store content type, it will always be application/json + + CreatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp" json:"created_at,omitempty" bson:"created_at" mapstructure:"created_at"` + UpdatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp" json:"updated_at,omitempty" bson:"updated_at" mapstructure:"updated_at"` + DeletedAt *time.Time `bun:",soft_delete,nullzero" json:"-,omitempty" bson:"deleted_at" mapstructure:"deleted_at"` + + bun.BaseModel `bun:"table:ingests"` +} + +type IngestRepository interface { + Create(context.Context, *IngestHTTPRequest) error +} diff --git a/internal/util/ip.go b/internal/util/ip.go new file mode 100644 index 0000000..15cab71 --- /dev/null +++ b/internal/util/ip.go @@ -0,0 +1,35 @@ +package util + +import ( + "net" + "net/http" + "strings" +) + +var ( + xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For") + xRealIP = http.CanonicalHeaderKey("X-Real-IP") +) + +func GetIP(r *http.Request) net.IP { + cloudflareIP := r.Header.Get("CF-Connecting-IP") + if cloudflareIP != "" { + return net.ParseIP(cloudflareIP) + } + + if xff := r.Header.Get(xForwardedFor); xff != "" { + i := strings.Index(xff, ", ") + + if i == -1 { + i = len(xff) + } + + return net.ParseIP(xff[:i]) + } + + if ip := r.Header.Get(xRealIP); ip != "" { + return net.ParseIP(ip) + } + + return net.ParseIP(r.RemoteAddr) +} diff --git a/server/httpd/http.go b/server/httpd/http.go index 0f21809..97653f4 100644 --- a/server/httpd/http.go +++ b/server/httpd/http.go @@ -13,10 +13,11 @@ import ( func New(cfg config.Config, urlRepo sdump.URLRepository, + ingestRepo sdump.IngestRepository, logger *logrus.Entry, ) *http.Server { return &http.Server{ - Handler: buildRoutes(cfg, logger, urlRepo), + Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo), Addr: fmt.Sprintf(":%d", cfg.HTTP.Port), } } @@ -24,6 +25,7 @@ func New(cfg config.Config, func buildRoutes(cfg config.Config, logger *logrus.Entry, urlRepo sdump.URLRepository, + ingestRepo sdump.IngestRepository, ) http.Handler { router := chi.NewRouter() @@ -32,12 +34,14 @@ func buildRoutes(cfg config.Config, router.Use(writeRequestIDHeader) urlHandler := &urlHandler{ - cfg: cfg, - urlRepo: urlRepo, - logger: logger, + cfg: cfg, + urlRepo: urlRepo, + logger: logger, + ingestRepo: ingestRepo, } router.Post("/", urlHandler.create) + router.Post("/{reference}", urlHandler.ingest) return router } diff --git a/server/httpd/url.go b/server/httpd/url.go index e540d4a..198d0fa 100644 --- a/server/httpd/url.go +++ b/server/httpd/url.go @@ -1,19 +1,25 @@ package httpd import ( + "errors" "fmt" + "io" "net/http" + "strings" "github.com/adelowo/sdump" "github.com/adelowo/sdump/config" + "github.com/adelowo/sdump/internal/util" + "github.com/go-chi/chi/v5" "github.com/go-chi/render" "github.com/sirupsen/logrus" ) type urlHandler struct { - logger *logrus.Entry - urlRepo sdump.URLRepository - cfg config.Config + logger *logrus.Entry + urlRepo sdump.URLRepository + ingestRepo sdump.IngestRepository + cfg config.Config } func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) { @@ -48,3 +54,62 @@ func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) { }, }) } + +func (u *urlHandler) ingest(w http.ResponseWriter, r *http.Request) { + reference := chi.URLParam(r, "reference") + + logger := u.logger.WithField("request_id", retrieveRequestID(r)). + WithField("method", "urlHandler.ingest"). + WithField("reference", reference) + + logger.Debug("Ingesting http request") + + ctx := r.Context() + + endpoint, err := u.urlRepo.Get(ctx, &sdump.FindURLOptions{ + Reference: reference, + }) + if errors.Is(err, sdump.ErrURLEndpointNotFound) { + _ = render.Render(w, r, newAPIError(http.StatusNotFound, + "Dump url does not exist")) + return + } + + if err != nil { + logger.WithError(err).Error("could not find dump url by reference") + _ = render.Render(w, r, newAPIError(http.StatusInternalServerError, + "an error occurred while ingesting HTTP request")) + return + } + + s := &strings.Builder{} + + size, err := io.Copy(s, r.Body) + if err != nil { + logger.WithError(err).Error("could not copy request body") + _ = render.Render(w, r, newAPIError(http.StatusInternalServerError, + "could not copy request body")) + return + } + + ingestedRequest := &sdump.IngestHTTPRequest{ + UrlID: endpoint.ID, + Request: sdump.RequestDefinition{ + Body: s.String(), + Query: r.URL.Query().Encode(), + Headers: r.Header, + IPAddress: util.GetIP(r), + Size: size, + }, + } + + if err := u.ingestRepo.Create(ctx, ingestedRequest); err != nil { + logger.WithError(err).Error("could not ingest request") + _ = render.Render(w, r, newAPIError(http.StatusInternalServerError, + "an error occurred while ingesting request")) + return + } + + _ = render.Render(w, r, newAPIStatus(http.StatusAccepted, + "Request ingested")) +} diff --git a/url.go b/url.go index 3135099..8df9f23 100644 --- a/url.go +++ b/url.go @@ -9,6 +9,14 @@ import ( "github.com/uptrace/bun" ) +type appError string + +func (a appError) Error() string { return string(a) } + +const ( + ErrURLEndpointNotFound = appError("endpoint not found") +) + type URLEndpointMetadata struct{} type URLEndpoint struct { @@ -32,6 +40,12 @@ func NewURLEndpoint() *URLEndpoint { } } +type FindURLOptions struct { + Reference string + ID uuid.UUID +} + type URLRepository interface { Create(context.Context, *URLEndpoint) error + Get(context.Context, *FindURLOptions) (*URLEndpoint, error) }