Skip to content

Commit

Permalink
✨ crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
perebaj committed Jan 19, 2024
1 parent 1f7366d commit 8ed182d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 85 deletions.
52 changes: 5 additions & 47 deletions cmd/newsletter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"

Expand All @@ -17,10 +16,9 @@ import (

// Config is the struct that contains the configuration for the service.
type Config struct {
LogLevel string
LogType string
LoopDurationMinutes time.Duration
Mongo mongodb.Config
LogLevel string
LogType string
Mongo mongodb.Config
}

func main() {
Expand All @@ -31,7 +29,6 @@ func main() {
Mongo: mongodb.Config{
URI: getEnvWithDefault("NL_MONGO_URI", ""),
},
LoopDurationMinutes: time.Duration(10) * time.Second,
}

signalCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -74,49 +71,10 @@ func main() {
signalCh <- syscall.SIGTERM
}

URLCh := make(chan string)
fetchResultCh := make(chan string)

var wg sync.WaitGroup
wg.Add(5)

for i := 0; i < 5; i++ {
go newsletter.Worker(&wg, URLCh, fetchResultCh, newsletter.Fetch)
}

go func() {
defer close(URLCh)
for range time.Tick(cfg.LoopDurationMinutes) {
slog.Info("fetching engineers")
gotURLs, err := storage.DistinctEngineerURLs(ctx)
if err != nil {
slog.Error("error getting engineers", "error", err)
signalCh <- syscall.SIGTERM
}

slog.Info("fetched engineers", "engineers", len(gotURLs))
for _, url := range gotURLs {
URLCh <- url.(string)
}
}
}()

go func() {
wg.Wait()
defer close(fetchResultCh)
}()
crawler := newsletter.NewCrawler(5, time.Duration(10)*time.Second, signalCh)

go func() {
for v := range fetchResultCh {
slog.Info("saving fetched sites response", "response", v[:10])
err := storage.SaveSite(ctx, []mongodb.Site{
{Content: v, ScrapeDatetime: time.Now().UTC()},
})
if err != nil {
slog.Error("error saving site result", "error", err)
signalCh <- syscall.SIGTERM
}
}
crawler.Run(ctx, storage, newsletter.Fetch)
}()

<-signalCh
Expand Down
Binary file modified cmd/newsletter/newsletter
Binary file not shown.
112 changes: 100 additions & 12 deletions scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,112 @@ package newsletter

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"os"
"sync"
"syscall"
"time"

"github.com/perebaj/newsletter/mongodb"
)

// PageContent gathers the important information scraped from a website.
type PageContent struct {
	// Content is the raw body returned by fetching the page.
	Content string
	// URL is the address the content was fetched from.
	URL string
}

// Storage is the interface that wraps the basic methods to save data and
// get data from the database.
type Storage interface {
	// SaveSite persists the scraped sites.
	SaveSite(ctx context.Context, site []mongodb.Site) error
	// DistinctEngineerURLs returns the distinct engineer URLs stored in the
	// database. Values come back as interface{}; callers assert to string.
	DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
}

// Crawler contains the necessary information to run the crawler.
type Crawler struct {
	// URLch carries the URLs to be fetched by the worker goroutines.
	URLch chan string
	// resultCh carries fetched page contents from the workers to the saver.
	resultCh chan PageContent
	// signalCh is notified (SIGTERM) when an unrecoverable error occurs.
	signalCh chan os.Signal
	// MaxJobs is the number of concurrent worker goroutines.
	MaxJobs int
	// wg tracks the workers so resultCh can be closed once they all exit.
	wg *sync.WaitGroup
	// scheduler is the pace time between each fetch.
	scheduler time.Duration
}

// NewCrawler constructs a Crawler with maxJobs concurrent workers and a
// fetch interval of s. Unrecoverable errors are reported on signalCh.
func NewCrawler(maxJobs int, s time.Duration, signalCh chan os.Signal) *Crawler {
	var c Crawler
	c.URLch = make(chan string)
	c.resultCh = make(chan PageContent)
	c.signalCh = signalCh
	c.wg = new(sync.WaitGroup)
	c.MaxJobs = maxJobs
	c.scheduler = s
	return &c
}

// Run starts the crawler: it launches MaxJobs workers, a producer that
// periodically loads the distinct engineer URLs from s and feeds them to the
// workers, and a saver that persists every fetched page. f is the function
// used to fetch the content of a URL. Run returns immediately; the spawned
// goroutines stop when ctx is canceled (producer) and the channels drain.
func (c *Crawler) Run(ctx context.Context, s Storage, f func(string) (string, error)) {
	c.wg.Add(c.MaxJobs)
	for i := 0; i < c.MaxJobs; i++ {
		go c.Worker(f)
	}

	go func() {
		defer close(c.URLch)
		// time.NewTicker (instead of time.Tick) so the ticker can be stopped
		// and the goroutine exits when ctx is canceled.
		ticker := time.NewTicker(c.scheduler)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			slog.Info("fetching engineers")
			gotURLs, err := s.DistinctEngineerURLs(ctx)
			if err != nil {
				slog.Error("error getting engineers", "error", err)
				c.signalCh <- syscall.SIGTERM
				continue
			}

			slog.Info("fetched engineers", "engineers", len(gotURLs))
			for _, u := range gotURLs {
				// Guarded assertion: a non-string value must not panic the
				// producer goroutine.
				if str, ok := u.(string); ok {
					c.URLch <- str
				}
			}
		}
	}()

	// Close resultCh once every worker has finished, ending the saver loop.
	go func() {
		c.wg.Wait()
		close(c.resultCh)
	}()

	go func() {
		for v := range c.resultCh {
			slog.Info("saving fetched sites response")
			err := s.SaveSite(ctx, []mongodb.Site{
				{
					URL:            v.URL,
					Content:        v.Content,
					ScrapeDatetime: time.Now().UTC(),
				},
			})
			if err != nil {
				slog.Error("error saving site result", "error", err)
				c.signalCh <- syscall.SIGTERM
			}
		}
	}()
}

// Worker consumes URLs from the crawler's URL channel, fetches each one with
// f, and forwards successful results on the result channel. It exits when the
// URL channel is closed, marking the shared WaitGroup done.
func (c *Crawler) Worker(f func(string) (string, error)) {
	defer c.wg.Done()
	for url := range c.URLch {
		content, err := f(url)
		if err != nil {
			slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
			// Skip failed fetches so an empty Content is never forwarded and
			// persisted to the database.
			continue
		}
		c.resultCh <- PageContent{Content: content, URL: url}
	}
}

// Fetch returns the content of a url as a string
func Fetch(url string) (string, error) {
resp, err := http.Get(url)
Expand All @@ -35,15 +135,3 @@ func Fetch(url string) (string, error) {

return bodyString, nil
}

// Worker use a worker pool to process jobs and send the restuls through a channel
func Worker(wg *sync.WaitGroup, urls <-chan string, result chan<- string, f func(string) (string, error)) {
defer wg.Done()
for url := range urls {
content, err := f(url)
if err != nil {
slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
}
result <- content
}
}
65 changes: 39 additions & 26 deletions scrape_test.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
package newsletter

import (
"fmt"
"context"
"net/http"
"net/http/httptest"
"sync"
"os"
"testing"
"time"

"github.com/perebaj/newsletter/mongodb"
)

func TestWorker(_ *testing.T) {
urls := make(chan string)
results := make(chan string)
const fakeURL = "http://fakeurl.test"

f := func(s string) (string, error) {
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("job %s done", s), nil
}
func TestCrawlerRun(t *testing.T) {
timeoutCh := time.After(time.Duration(150) * time.Millisecond)
ctx := context.Background()
s := NewStorageMock()

var wg sync.WaitGroup
wg.Add(2)
go Worker(&wg, urls, results, f)
go Worker(&wg, urls, results, f)
f := func(string) (string, error) {
return "Hello, World!", nil
}

go func() {
urls <- "job1"
urls <- "job2"
urls <- "job3"
urls <- "job4"
urls <- "job5"
urls <- "job6"
defer close(urls)
}()
signalCh := make(chan os.Signal, 1)

c := NewCrawler(1, time.Duration(1000)*time.Millisecond, signalCh)
go func() {
wg.Wait()
defer close(results)
c.Run(ctx, s, f)
}()

for i := 0; i < 6; i++ {
<-results
select {
case <-signalCh:
t.Error("unexpected signal error")
case <-timeoutCh:
}
}

Expand Down Expand Up @@ -79,3 +72,23 @@ func TestGetReferences_Status500(t *testing.T) {
t.Errorf("expected empty body, got %s", got)
}
}

// StorageMock mirrors the Storage methods the crawler needs, so tests can
// substitute a stub for the real database.
type StorageMock interface {
	SaveSite(ctx context.Context, site []mongodb.Site) error
	DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
}

// StorageMockImpl is a stateless stub implementation of StorageMock.
type StorageMockImpl struct {
}

// NewStorageMock returns a stub Storage implementation for tests.
func NewStorageMock() StorageMock {
	var m StorageMockImpl
	return m
}

// SaveSite is a no-op stub that always reports success.
func (s StorageMockImpl) SaveSite(_ context.Context, _ []mongodb.Site) error {
	return nil
}

// DistinctEngineerURLs is a stub that always yields the single fake URL.
func (s StorageMockImpl) DistinctEngineerURLs(_ context.Context) ([]interface{}, error) {
	urls := []interface{}{fakeURL}
	return urls, nil
}

0 comments on commit 8ed182d

Please sign in to comment.