Skip to content

Commit

Permalink
Implementation of requests ingestion (#4)
Browse files Browse the repository at this point in the history
* start working on ingesting and storing http requests

* ingest and store request data

* add database tests
  • Loading branch information
adelowo authored Jan 20, 2024
1 parent 42d5641 commit e285b5c
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 8 deletions.
3 changes: 2 additions & 1 deletion cmd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) {
}

urlStore := postgres.NewURLRepositoryTable(db)
ingestStore := postgres.NewIngestRepository(db)

hostName, err := os.Hostname()
if err != nil {
Expand All @@ -49,7 +50,7 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) {
logger := logrus.WithField("host", hostName).
WithField("module", "http.server")

httpServer := httpd.New(*cfg, urlStore, logger)
httpServer := httpd.New(*cfg, urlStore, ingestStore, logger)

go func() {
logger.Debug("starting HTTP server")
Expand Down
26 changes: 26 additions & 0 deletions datastore/postgres/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package postgres

import (
"context"

"github.com/adelowo/sdump"
"github.com/uptrace/bun"
)

type ingestRepository struct {
inner *bun.DB
}

func NewIngestRepository(db *bun.DB) sdump.IngestRepository {
return &ingestRepository{
inner: db,
}
}

func (u *ingestRepository) Create(ctx context.Context,
model *sdump.IngestHTTPRequest,
) error {
_, err := bun.NewInsertQuery(u.inner).Model(model).
Exec(ctx)
return err
}
33 changes: 33 additions & 0 deletions datastore/postgres/ingest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//go:build integration
// +build integration

package postgres

import (
"context"
"testing"

"github.com/adelowo/sdump"
"github.com/stretchr/testify/require"
)

func TestIngestRepository_Create(t *testing.T) {
client, teardownFunc := setupDatabase(t)
defer teardownFunc()

ingestStore := NewIngestRepository(client)

urlStore := NewURLRepositoryTable(client)

endpoint, err := urlStore.Get(context.Background(), &sdump.FindURLOptions{
Reference: "cmltfm6g330l5l1vq110", // see fixtures/urls.yml
})
require.NoError(t, err)

require.NoError(t, ingestStore.Create(context.Background(), &sdump.IngestHTTPRequest{
UrlID: endpoint.ID,
Request: sdump.RequestDefinition{
Body: "{}",
},
}))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE ingests;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE ingests (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
url_id uuid NOT NULL REFERENCES urls(id),

request jsonb NOT NULL DEFAULT '{}'::jsonb,

created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP WITH TIME ZONE
);
26 changes: 26 additions & 0 deletions datastore/postgres/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package postgres

import (
"context"
"database/sql"
"errors"

"github.com/adelowo/sdump"
"github.com/google/uuid"
"github.com/uptrace/bun"
)

Expand All @@ -24,3 +27,26 @@ func (u *urlRepositoryTable) Create(ctx context.Context,
Exec(ctx)
return err
}

func (u *urlRepositoryTable) Get(ctx context.Context,
opts *sdump.FindURLOptions,
) (*sdump.URLEndpoint, error) {
res := new(sdump.URLEndpoint)

query := bun.NewSelectQuery(u.inner).Model(res)

if opts.ID != uuid.Nil {
query = query.Where("id = ?", opts.ID)
}

if opts.Reference != "" {
query = query.Where("reference = ?", opts.Reference)
}

err := query.Scan(ctx, res)
if errors.Is(err, sql.ErrNoRows) {
return nil, sdump.ErrURLEndpointNotFound
}

return res, err
}
19 changes: 19 additions & 0 deletions datastore/postgres/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"github.com/adelowo/sdump"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

Expand All @@ -19,3 +20,21 @@ func TestURLRepositoryTable_Create(t *testing.T) {

require.NoError(t, urlStore.Create(context.Background(), sdump.NewURLEndpoint()))
}

func TestURLRepositoryTable_Get(t *testing.T) {
client, teardownFunc := setupDatabase(t)
defer teardownFunc()

urlStore := NewURLRepositoryTable(client)

_, err := urlStore.Get(context.Background(), &sdump.FindURLOptions{
Reference: uuid.NewString(),
})
require.Error(t, err)
require.Equal(t, err, sdump.ErrURLEndpointNotFound)

_, err = urlStore.Get(context.Background(), &sdump.FindURLOptions{
Reference: "cmltfm6g330l5l1vq110", // see fixtures/urls.yml
})
require.NoError(t, err)
}
37 changes: 37 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sdump

import (
"context"
"net"
"net/http"
"time"

"github.com/google/uuid"
"github.com/uptrace/bun"
)

type RequestDefinition struct {
Body string `mapstructure:"body" json:"body,omitempty"`
Query string `json:"query,omitempty"`
Headers http.Header `json:"headers,omitempty"`
IPAddress net.IP `json:"ip_address,omitempty" bson:"ip_address"`
Size int64 `json:"size,omitempty"`
}

type IngestHTTPRequest struct {
ID uuid.UUID `bun:"type:uuid,default:uuid_generate_v4()" json:"id,omitempty" mapstructure:"id"`
UrlID uuid.UUID `json:"url_id,omitempty"`
Request RequestDefinition `json:"request,omitempty"`

// No need to store content type, it will always be application/json

CreatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp" json:"created_at,omitempty" bson:"created_at" mapstructure:"created_at"`
UpdatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp" json:"updated_at,omitempty" bson:"updated_at" mapstructure:"updated_at"`
DeletedAt *time.Time `bun:",soft_delete,nullzero" json:"-,omitempty" bson:"deleted_at" mapstructure:"deleted_at"`

bun.BaseModel `bun:"table:ingests"`
}

type IngestRepository interface {
Create(context.Context, *IngestHTTPRequest) error
}
35 changes: 35 additions & 0 deletions internal/util/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"net"
"net/http"
"strings"
)

var (
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
)

func GetIP(r *http.Request) net.IP {
cloudflareIP := r.Header.Get("CF-Connecting-IP")
if cloudflareIP != "" {
return net.ParseIP(cloudflareIP)
}

if xff := r.Header.Get(xForwardedFor); xff != "" {
i := strings.Index(xff, ", ")

if i == -1 {
i = len(xff)
}

return net.ParseIP(xff[:i])
}

if ip := r.Header.Get(xRealIP); ip != "" {
return net.ParseIP(ip)
}

return net.ParseIP(r.RemoteAddr)
}
12 changes: 8 additions & 4 deletions server/httpd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ import (

func New(cfg config.Config,
urlRepo sdump.URLRepository,
ingestRepo sdump.IngestRepository,
logger *logrus.Entry,
) *http.Server {
return &http.Server{
Handler: buildRoutes(cfg, logger, urlRepo),
Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo),
Addr: fmt.Sprintf(":%d", cfg.HTTP.Port),
}
}

func buildRoutes(cfg config.Config,
logger *logrus.Entry,
urlRepo sdump.URLRepository,
ingestRepo sdump.IngestRepository,
) http.Handler {
router := chi.NewRouter()

Expand All @@ -32,12 +34,14 @@ func buildRoutes(cfg config.Config,
router.Use(writeRequestIDHeader)

urlHandler := &urlHandler{
cfg: cfg,
urlRepo: urlRepo,
logger: logger,
cfg: cfg,
urlRepo: urlRepo,
logger: logger,
ingestRepo: ingestRepo,
}

router.Post("/", urlHandler.create)
router.Post("/{reference}", urlHandler.ingest)

return router
}
Expand Down
71 changes: 68 additions & 3 deletions server/httpd/url.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package httpd

import (
"errors"
"fmt"
"io"
"net/http"
"strings"

"github.com/adelowo/sdump"
"github.com/adelowo/sdump/config"
"github.com/adelowo/sdump/internal/util"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/sirupsen/logrus"
)

type urlHandler struct {
logger *logrus.Entry
urlRepo sdump.URLRepository
cfg config.Config
logger *logrus.Entry
urlRepo sdump.URLRepository
ingestRepo sdump.IngestRepository
cfg config.Config
}

func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -48,3 +54,62 @@ func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) {
},
})
}

func (u *urlHandler) ingest(w http.ResponseWriter, r *http.Request) {
reference := chi.URLParam(r, "reference")

logger := u.logger.WithField("request_id", retrieveRequestID(r)).
WithField("method", "urlHandler.ingest").
WithField("reference", reference)

logger.Debug("Ingesting http request")

ctx := r.Context()

endpoint, err := u.urlRepo.Get(ctx, &sdump.FindURLOptions{
Reference: reference,
})
if errors.Is(err, sdump.ErrURLEndpointNotFound) {
_ = render.Render(w, r, newAPIError(http.StatusNotFound,
"Dump url does not exist"))
return
}

if err != nil {
logger.WithError(err).Error("could not find dump url by reference")
_ = render.Render(w, r, newAPIError(http.StatusInternalServerError,
"an error occurred while ingesting HTTP request"))
return
}

s := &strings.Builder{}

size, err := io.Copy(s, r.Body)
if err != nil {
logger.WithError(err).Error("could not copy request body")
_ = render.Render(w, r, newAPIError(http.StatusInternalServerError,
"could not copy request body"))
return
}

ingestedRequest := &sdump.IngestHTTPRequest{
UrlID: endpoint.ID,
Request: sdump.RequestDefinition{
Body: s.String(),
Query: r.URL.Query().Encode(),
Headers: r.Header,
IPAddress: util.GetIP(r),
Size: size,
},
}

if err := u.ingestRepo.Create(ctx, ingestedRequest); err != nil {
logger.WithError(err).Error("could not ingest request")
_ = render.Render(w, r, newAPIError(http.StatusInternalServerError,
"an error occurred while ingesting request"))
return
}

_ = render.Render(w, r, newAPIStatus(http.StatusAccepted,
"Request ingested"))
}
14 changes: 14 additions & 0 deletions url.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/uptrace/bun"
)

type appError string

func (a appError) Error() string { return string(a) }

const (
ErrURLEndpointNotFound = appError("endpoint not found")
)

type URLEndpointMetadata struct{}

type URLEndpoint struct {
Expand All @@ -32,6 +40,12 @@ func NewURLEndpoint() *URLEndpoint {
}
}

type FindURLOptions struct {
Reference string
ID uuid.UUID
}

type URLRepository interface {
Create(context.Context, *URLEndpoint) error
Get(context.Context, *FindURLOptions) (*URLEndpoint, error)
}

0 comments on commit e285b5c

Please sign in to comment.