worker.go
package main

import (
	beanstalk "github.com/JalfResi/gobeanstalk"
)
// WorkerConfig holds the beanstalkd, memcachedb and MySQL settings, plus the
// retry and timeout tuning, that a Worker needs to do its job.
type WorkerConfig struct {
	srcTube          string
	destTube         string
	beanstalkdHost   string
	memcachedbHost   string
	maxRetryAttempts uint64
	timeout          int
	mysqlHost        string
	mysqlUsername    string
	mysqlPassword    string
}
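// An illustrative WorkerConfig might look like the following. Every value is
// a placeholder chosen for the sake of example, not the project's real tubes,
// hosts or credentials:
//
//	cfg := &WorkerConfig{
//		srcTube:          "articles",
//		destTube:         "analysed",
//		beanstalkdHost:   "127.0.0.1:11300",
//		memcachedbHost:   "127.0.0.1:21201",
//		maxRetryAttempts: 5,
//		timeout:          10,
//		mysqlHost:        "127.0.0.1:3306",
//		mysqlUsername:    "worker",
//		mysqlPassword:    "secret",
//	}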
// Worker is a signalling channel: the worker goroutine closes it (via
// DieGracefully) to tell whoever is watching that it has stopped.
type Worker chan struct{}
// DoWork does the following:
// - Pulls a URL out of the srcTube
// - Makes a GET/POST request to TextRazor
// - Stores the results in MySQL
// - Deletes the job from Beanstalk
// - Creates a new job in destTube with the URL
//
// So we will need:
// - An ArticleSupplier to read article URLs from the queue
// - An ArticleURL to represent an article URL
// - A ReportRecorder to store the returned TextRazor report
// - An ArticleAnalyser to contact TextRazor and return a TextRazor report
func (w Worker) DoWork(c *WorkerConfig) {
	// Connect to beanstalkd and build the components this worker needs: a
	// supplier to read article URLs, an analyser to call TextRazor and a
	// recorder to persist the results.
	bs, err := beanstalk.Dial(c.beanstalkdHost)
	if err != nil {
		logError.Fatalf("Beanstalk connect failed: %s\n", err)
	}
	as := NewArticleSupplier(bs, c.timeout, c.srcTube)
	aa := NewAnalyser(config.textRazorAPIKey)
	rr := NewReportRecorder(c.mysqlHost, c.mysqlUsername, c.mysqlPassword)
	for {
		article := as.GetArticleURL()
		report, err := aa.Analyse(article)
		if err != nil {
			if err == ErrRequestLimitMet {
				logError.Printf("%s: %d\n", err, config.totalRequestLimit)
				w.DieGracefully()
				// Should we instead wait until the next day, reset
				// config.currentRequestCount and start up enough workers to
				// continue for the next day?
				//
				// What do we do about article URLs that may be building up in
				// beanstalkd? Should we bin off old (i.e. yesterday's)
				// article URLs to deal with the backlog? Maybe we can
				// bury them? (but then how do we deal with the bury list?)
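				// (For reference: beanstalkd's bury command parks a job on a
				// separate buried list, and it stays there until an explicit
				// kick returns it to the ready queue, so a buried backlog
				// would need its own periodic kick or cleanup pass.)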
				return
			}
			if err.Error() == "Unauthorized" {
				as.Retry(article)
				logInfo.Printf("Got '%s' from TextRazor. Retrying\n", err)
				continue
			}
			logError.Printf("%+v %T\n", err, err)
			// Possibly bury continually failing jobs?
			as.Done(article)
			continue
		}
		as.Done(article)
		err = rr.StoreTopics(report)
		if err != nil {
			logError.Println(err)
		}
	}
}
// DieGracefully stops the worker by closing its channel, which unblocks
// anything waiting on the Worker.
func (w Worker) DieGracefully() {
	close(w)
}
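// runWorkers is an illustrative sketch, not part of the original worker.go,
// of how a supervisor might use Worker and DieGracefully together. The
// function name, the pool size n and the already-built cfg are assumptions
// made purely for the example.
func runWorkers(n int, cfg *WorkerConfig) {
	workers := make([]Worker, n)
	for i := range workers {
		workers[i] = make(Worker)
		go workers[i].DoWork(cfg)
	}
	// Receiving from a closed channel returns immediately, so this loop
	// unblocks once every worker has called DieGracefully.
	for _, w := range workers {
		<-w
	}
}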