Commit

✨ scrape comparison
perebaj committed Jan 21, 2024
1 parent cb825f4 commit 6d8cc42
Showing 5 changed files with 197 additions and 90 deletions.
7 changes: 6 additions & 1 deletion Makefile
@@ -24,9 +24,14 @@ coverage:
go test -coverprofile=c.out
go tool cover -html=c.out

## Run all tests, including the integration tests (requires Docker up and running). Usage: `make integration-test`, or `make integration-test testcase="TestFunctionName"` to run an isolated test
.PHONY: integration-test
integration-test:
go test -timeout 5s -tags=integration ./... -v
if [ -n "$(testcase)" ]; then \
go test ./... -timeout 5s -tags integration -v -run="^$(testcase)$$" ; \
else \
go test ./... -timeout 5s -tags integration; \
fi

## builds the service
.PHONY: service
59 changes: 35 additions & 24 deletions mongodb/newsletter.go
@@ -6,7 +6,6 @@ import (
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)

// Newsletter is the struct that gathers which websites to scrape for a user email
@@ -22,12 +21,13 @@ type Engineer struct {
URL string `bson:"url"`
}

// Site is the struct that gathers the scraped content of a website
type Site struct {
UserEmail string `bson:"user_email"`
// Page is the struct that gathers the scraped content of a website
type Page struct {
URL string `bson:"url"`
Content string `bson:"content"`
ScrapeDatetime time.Time `bson:"scrape_date"`
HashMD5 [16]byte `bson:"hash_md5"`
IsMostRecent bool `bson:"is_most_recent"`
}
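
A minimal construction sketch (not part of this commit) showing how the new HashMD5 and IsMostRecent fields fit together; it assumes crypto/md5 and time are imported, as they are elsewhere in this changeset, and the helper name and values are placeholders:

// newPage is a hypothetical helper illustrating how a Page document could be built before insertion.
func newPage(url, content string) Page {
	return Page{
		URL:            url,
		Content:        content,
		ScrapeDatetime: time.Now().UTC(),
		HashMD5:        md5.Sum([]byte(content)), // MD5 of the raw content, used later to detect changes
		IsMostRecent:   true,                     // pageComparation in scrape.go decides the real value
	}
}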

// SaveNewsletter saves a newsletter in the database
@@ -59,7 +59,7 @@ func (m *NLStorage) DistinctEngineerURLs(ctx context.Context) ([]interface{}, er

resp, err := collection.Distinct(ctx, "url", bson.M{})
if err != nil {
return nil, fmt.Errorf("error getting engineers: %w", err)
return nil, fmt.Errorf("error getting engineers: %v", err)
}

return resp, nil
@@ -82,13 +82,13 @@ func (m *NLStorage) Newsletter() ([]Newsletter, error) {
return newsletters, nil
}

// SaveSite saves a site in the database
func (m *NLStorage) SaveSite(ctx context.Context, sites []Site) error {
// SavePage saves a batch of pages in the database
func (m *NLStorage) SavePage(ctx context.Context, pages []Page) error {
database := m.client.Database(m.DBName)
collection := database.Collection("sites")
collection := database.Collection("pages")

var docs []interface{}
for _, site := range sites {
for _, site := range pages {
docs = append(docs, site)
}
_, err := collection.InsertMany(ctx, docs)
@@ -98,25 +98,36 @@ func (m *NLStorage) SaveSite(ctx context.Context, sites []Site) error {
return nil
}

// Sites returns, given a user email and a URL, the last scraped content of that URL
func (m *NLStorage) Sites(usrEmail, URL string) ([]Site, error) {
// Page returns the last scraped content of a given URL
func (m *NLStorage) Page(ctx context.Context, url string) ([]Page, error) {
var page []Page
database := m.client.Database(m.DBName)
collection := database.Collection("sites")
max := int64(2)

filter := bson.M{"user_email": usrEmail, "url": URL}
sort := bson.D{{Key: "scrape_date", Value: -1}}
opts := options.Find().SetSort(sort)
opts.Limit = &max
collection := database.Collection("pages")

pipeline := []bson.M{
{
"$match": bson.M{
"url": url,
},
},
{
"$sort": bson.M{
"scrape_date": -1,
},
},
{
"$limit": 1,
},
}

cursor, err := collection.Find(context.Background(), filter, opts)
cursor, err := collection.Aggregate(ctx, pipeline)
if err != nil {
return nil, err
return page, fmt.Errorf("error getting page: %v", err)
}

var sites []Site
if err = cursor.All(context.Background(), &sites); err != nil {
return nil, err
if err = cursor.All(ctx, &page); err != nil {
return page, fmt.Errorf("error decoding page: %v", err)
}
return sites, nil

return page, nil
}
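
The pipeline above mirrors the older Find + sort + limit approach it replaces, but drops the user_email filter and returns only the single most recent document. A hypothetical caller sketch (not part of this commit), assuming an initialized NLStorage named storage:

pages, err := storage.Page(ctx, "https://example.com")
if err != nil {
	return err
}
if len(pages) == 0 {
	// the URL has never been scraped
} else {
	// pages[0] is the most recently scraped version, per the $sort/$limit stages
}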
98 changes: 59 additions & 39 deletions mongodb/newsletter_test.go
@@ -5,6 +5,7 @@ package mongodb

import (
"context"
"crypto/md5"
"fmt"
"os"
"reflect"
@@ -90,80 +91,99 @@ func TestNLStorageNewsletter(t *testing.T) {
t.Cleanup(teardown(ctx, client, DBName))
}

func TestNLStorageSaveSite(t *testing.T) {
func TestNLStorageSavePage(t *testing.T) {
ctx := context.Background()
client, DBName := setup(ctx, t)

database := client.Database(DBName)
collection := database.Collection("sites")
collection := database.Collection("pages")

want := []Site{
{UserEmail: "[email protected]", URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 14, 15, 30, 0, 0, time.UTC)},
{UserEmail: "[email protected]", URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 14, 15, 30, 0, 0, time.UTC)},
{UserEmail: "[email protected]", URL: "https://www.jj.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 14, 15, 30, 0, 0, time.UTC)},
want := []Page{
{IsMostRecent: true, URL: "https://www.google.com", Content: "HTML", HashMD5: md5.Sum([]byte("HTML")), ScrapeDatetime: time.Date(2023, time.August, 13, 15, 30, 0, 0, time.UTC)},
}

NLStorage := NewNLStorage(client, DBName)
err := NLStorage.SaveSite(ctx, want)

storage := NewNLStorage(client, DBName)
err := storage.SavePage(ctx, want)
if err != nil {
t.Fatal("error saving site", err)
t.Fatal("error saving page", err)
}

var got []Site
var got []Page
cursor, err := collection.Find(context.Background(), bson.M{})
if err != nil {
t.Fatal("error finding site", err)
t.Fatal("error finding page", err)
}

if err := cursor.All(ctx, &got); err != nil {
t.Fatal("error decoding site", err)
t.Fatal("error decoding page", err)
}

if len(got) == 3 {
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %v, want %v", got, want)
if len(got) == 1 {
if !reflect.DeepEqual(got[0], want[0]) {
t.Fatalf("got %v, want %v", got[0], want[0])
}
} else {
t.Fatal("expected 2 sites, got", len(got))
}
t.Fatal("expected 1 page, got", len(got))
}

// if len(page) == 0 {
// want[0].Updated = true
// err := NLStorage.SavePage(ctx, want)
// if err != nil {
// t.Fatal("error saving page", err)
// }
// } else {
// // Verifying if the new content is newer than the last one
// lastScrapedPage, err := NLStorage.Page(ctx, want[0].URL)
// if err != nil {
// t.Fatal("error getting page", err)
// }

// if lastScrapedPage[0].HashMD5 != want[0].HashMD5 {
// want[0].Updated = true
// err = NLStorage.SavePage(ctx, want)
// if err != nil {
// t.Fatal("error saving page", err)
// }
// } else {
// t.Fatal("page already scraped")
// want[0].Updated = false
// err = NLStorage.SavePage(ctx, want)
// if err != nil {
// t.Fatal("error saving page", err)
// }
// }

t.Cleanup(teardown(ctx, client, DBName))
}

func TestNLStorageSites(t *testing.T) {
func TestNLStoragePage(t *testing.T) {
ctx := context.Background()
client, DBName := setup(ctx, t)

want := []Site{
{UserEmail: "[email protected]", URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 13, 15, 30, 0, 0, time.UTC)},
{UserEmail: "[email protected]", URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 12, 15, 30, 0, 0, time.UTC)},
{UserEmail: "[email protected]", URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 11, 15, 30, 0, 0, time.UTC)},
want := []Page{
{URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 13, 15, 30, 0, 0, time.UTC)},
{URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 11, 15, 30, 0, 0, time.UTC)},
{URL: "https://www.google.com", Content: "HTML", ScrapeDatetime: time.Date(2023, time.August, 12, 15, 30, 0, 0, time.UTC)},
}

NLStorage := NewNLStorage(client, DBName)
err := NLStorage.SaveSite(ctx, want)
storage := NewNLStorage(client, DBName)
err := storage.SavePage(ctx, want)
if err != nil {
t.Fatal("error saving site", err)
t.Fatal("error saving page", err)
}

got, err := NLStorage.Sites("[email protected]", "https://www.google.com")
got, err := storage.Page(ctx, "https://www.google.com")
if err != nil {
t.Fatal("error getting site", err)
t.Fatal("error getting page", err)
}

if len(got) == 2 {
assert(t, got[0].UserEmail, want[0].UserEmail)
assert(t, got[0].URL, want[0].URL)
assert(t, got[0].Content, want[0].Content)
assert(t, got[0].ScrapeDatetime, want[0].ScrapeDatetime)

assert(t, got[1].UserEmail, want[1].UserEmail)
assert(t, got[1].URL, want[1].URL)
assert(t, got[1].Content, want[1].Content)
assert(t, got[1].ScrapeDatetime, want[1].ScrapeDatetime)
if len(got) == 1 {
if !reflect.DeepEqual(got[0], want[0]) {
t.Fatalf("got %v, want %v", got[0], want[0])
}
} else {
t.Fatal("expected 2 sites, got", len(got))
t.Fatal("expected 1 page, got", len(got))
}

t.Cleanup(teardown(ctx, client, DBName))
68 changes: 50 additions & 18 deletions scrape.go
@@ -4,6 +4,7 @@ package newsletter
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"log/slog"
"net/http"
@@ -15,22 +16,24 @@ import (
"github.com/perebaj/newsletter/mongodb"
)

// PageContent is the struct that gathers important information about a website
type PageContent struct {
Content string
URL string
// Page is the struct that gathers important information about a website
type Page struct {
Content string
URL string
ScrapeDateTime time.Time
}

// Storage is the interface that wraps the basic methods to save and get data from the database
type Storage interface {
SaveSite(ctx context.Context, site []mongodb.Site) error
SavePage(ctx context.Context, site []mongodb.Page) error
DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
Page(ctx context.Context, url string) ([]mongodb.Page, error)
}
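
A minimal in-memory fake of Storage (a sketch, not part of this commit, e.g. for a test file) that could back unit tests of Crawler.Run without MongoDB; every name except the three interface methods is an assumption:

type fakeStorage struct {
	mu    sync.Mutex
	pages map[string][]mongodb.Page
	urls  []interface{}
}

func newFakeStorage(urls ...interface{}) *fakeStorage {
	return &fakeStorage{pages: map[string][]mongodb.Page{}, urls: urls}
}

func (f *fakeStorage) SavePage(_ context.Context, pages []mongodb.Page) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, p := range pages {
		// prepend so Page can return the newest document first
		f.pages[p.URL] = append([]mongodb.Page{p}, f.pages[p.URL]...)
	}
	return nil
}

func (f *fakeStorage) DistinctEngineerURLs(_ context.Context) ([]interface{}, error) {
	return f.urls, nil
}

func (f *fakeStorage) Page(_ context.Context, url string) ([]mongodb.Page, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if ps := f.pages[url]; len(ps) > 0 {
		return ps[:1], nil
	}
	return nil, nil
}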

// Crawler contains the necessary information to run the crawler
type Crawler struct {
URLch chan string
resultCh chan PageContent
resultCh chan Page
signalCh chan os.Signal
MaxJobs int
wg *sync.WaitGroup
@@ -42,7 +45,7 @@ type Crawler struct {
func NewCrawler(maxJobs int, s time.Duration, signalCh chan os.Signal) *Crawler {
return &Crawler{
URLch: make(chan string),
resultCh: make(chan PageContent),
resultCh: make(chan Page),
signalCh: signalCh,
wg: &sync.WaitGroup{},
MaxJobs: maxJobs,
@@ -63,7 +66,7 @@ func (c *Crawler) Run(ctx context.Context, s Storage, f func(string) (string, er
slog.Debug("fetching engineers")
gotURLs, err := s.DistinctEngineerURLs(ctx)
if err != nil {
slog.Error("error getting engineers", "error", err)
slog.Error("error getting engineers: %v", err)
c.signalCh <- syscall.SIGTERM
}

@@ -80,23 +83,52 @@
}()

go func() {
for v := range c.resultCh {
for r := range c.resultCh {
slog.Debug("saving fetched sites response")
err := s.SaveSite(ctx, []mongodb.Site{
{
URL: v.URL,
Content: v.Content,
ScrapeDatetime: time.Now().UTC(),
},
})

lastScrapedPage, err := s.Page(ctx, r.URL)
if err != nil {
slog.Error("error saving site result", "error", err)
slog.Error("error gerrting Page: %v", err)
c.signalCh <- syscall.SIGTERM
}

newPage := pageComparation(lastScrapedPage, r)

err = s.SavePage(ctx, newPage)
if err != nil {
slog.Error("error saving site result: %v", err)
c.signalCh <- syscall.SIGTERM
}
}
}()
}

// pageComparation verifies whether the content of a website has changed and sets the IsMostRecent flag to true if it has changed, or false otherwise.
func pageComparation(lastScrapedPage []mongodb.Page, recentScrapedPage Page) []mongodb.Page {
hashMD5 := md5.Sum([]byte(recentScrapedPage.Content))
newPage := []mongodb.Page{
{
URL: recentScrapedPage.URL,
Content: recentScrapedPage.Content,
ScrapeDatetime: recentScrapedPage.ScrapeDateTime,
HashMD5: hashMD5,
},
}

// If the page does not exist, it is the first time that the page is being scraped
// so it is considered the most recent version.
if len(lastScrapedPage) == 0 {
newPage[0].IsMostRecent = true
} else {
if lastScrapedPage[0].HashMD5 != hashMD5 {
newPage[0].IsMostRecent = true
} else {
newPage[0].IsMostRecent = false
}
}
return newPage
}
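
A small behavior sketch (assumed example values, not part of this commit) of how pageComparation assigns IsMostRecent:

fresh := Page{URL: "https://example.com", Content: "<html>v1</html>", ScrapeDateTime: time.Now().UTC()}

first := pageComparation(nil, fresh)     // no previous scrape stored
// first[0].IsMostRecent == true

same := pageComparation(first, fresh)    // same content, MD5 hashes match
// same[0].IsMostRecent == false

fresh.Content = "<html>v2</html>"
changed := pageComparation(first, fresh) // content changed, hashes differ
// changed[0].IsMostRecent == true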

// Worker uses a worker pool to process jobs and sends the results through a channel
func (c *Crawler) Worker(f func(string) (string, error)) {
defer c.wg.Done()
@@ -105,7 +137,7 @@ func (c *Crawler) Worker(f func(string) (string, error)) {
if err != nil {
slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
}
c.resultCh <- PageContent{Content: content, URL: url}
c.resultCh <- Page{Content: content, URL: url, ScrapeDateTime: time.Now().UTC()}
}
}

(1 more changed file not shown)
