-
Notifications
You must be signed in to change notification settings - Fork 0
/
articleSupplier.go
106 lines (91 loc) · 2.61 KB
/
articleSupplier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
beanstalk "github.com/JalfResi/gobeanstalk"
"gopkg.in/yaml.v2"
)
// ArticleURLSupplier is the source of ArticleURLs for the rest of the
// program: callers pull a URL to work on with GetArticleURL and report
// completion with Done.
type ArticleURLSupplier interface {
	// GetArticleURL yields the next ArticleURL to process.
	GetArticleURL() *ArticleURL

	// Done signals that au has been handled and need not be supplied again.
	Done(au *ArticleURL)
}
// ArticleSupplier hands out ArticleURLs built from jobs reserved on a
// beanstalk connection. *ArticleSupplier satisfies ArticleURLSupplier.
type ArticleSupplier struct {
	// bsConn is the beanstalk connection used to watch, reserve, delete,
	// release and bury jobs.
	bsConn *beanstalk.Conn

	// minTTR is the smallest job TTR GetArticleURL will accept; jobs
	// reserved with a lower TTR are re-queued with this TTR instead.
	minTTR int
}
// NewArticleSupplier returns an ArticleSupplier that reserves jobs over bs,
// enforces a minimum job TTR of minTTR, and watches srcTube.
// If srcTube cannot be watched, SetSrcTube reports it via logError.Fatalf.
func NewArticleSupplier(bs *beanstalk.Conn, minTTR int, srcTube string) *ArticleSupplier {
	supplier := new(ArticleSupplier)
	supplier.bsConn = bs
	supplier.minTTR = minTTR

	supplier.SetSrcTube(srcTube)
	return supplier
}
// SetSrcTube adds srcTube to the connection's watch list so that later
// reserves can return jobs from it. Failing to watch the tube is treated
// as unrecoverable and reported through logError.Fatalf.
//
// NOTE(review): despite the "Set" name this only calls Watch; it never
// ignores tubes that were already being watched — confirm that is intended.
func (as *ArticleSupplier) SetSrcTube(srcTube string) {
	if _, err := as.bsConn.Watch(srcTube); err != nil {
		logError.Fatalf("Could not watch tube %s: %v\n", srcTube, err)
	}
}
// Done deletes the beanstalk job backing au, marking it as finished.
func (as *ArticleSupplier) Done(au *ArticleURL) {
	jobID := au.job.ID
	// Best-effort: the ArticleURLSupplier interface gives Done no way to
	// report failure, so the Delete error is intentionally discarded.
	_ = as.bsConn.Delete(jobID)
}
// Retry releases the beanstalk job backing au so it can be reserved again,
// using priority 1 and no delay.
func (as *ArticleSupplier) Retry(au *ArticleURL) {
	const (
		retryPriority = 1
		retryDelay    = 0
	)
	// Best-effort: Retry has no error return, so a failed Release is
	// intentionally discarded.
	_ = as.bsConn.Release(au.job.ID, retryPriority, retryDelay)
}
// GetArticleURL loops until it reserves a job whose TTR is at least
// as.minTTR and whose body NewArticleURL accepts, and returns that URL.
//
// Along the way:
//   - a Reserve error is fatal (logError.Fatal);
//   - a job with TTR below as.minTTR is re-queued with TTR as.minTTR via
//     increaseJobTTR and skipped;
//   - a job NewArticleURL rejects is buried with priority 1 and skipped.
func (as *ArticleSupplier) GetArticleURL() *ArticleURL {
	for {
		job, err := as.bsConn.Reserve()
		if err != nil {
			logError.Fatal(err)
		}

		// First we check the TTR of the job: if it is lower than
		// as.minTTR we put it back in the tube with an increased TTR.
		// This ensures every job we deal with can be handled within a
		// sensible timeframe; otherwise the job would keep failing and be
		// automatically reclaimed by beanstalk.
		stats := as.getJobTTR(job)
		if stats.TTR < as.minTTR {
			// Only claim success when the re-queue actually happened;
			// increaseJobTTR logs its own failures.
			if as.increaseJobTTR(job, stats, as.minTTR) {
				logError.Printf("Increased job %d TTR to %d from %d\n", job.ID, as.minTTR, stats.TTR)
			}
			continue
		}

		au, err := NewArticleURL(job, stats)
		if err != nil {
			if buryErr := as.bsConn.Bury(job.ID, 1); buryErr != nil {
				logError.Printf("Job %d bury failed: %s\n", job.ID, buryErr)
			}
			logError.Printf("Bad Article URL format; burying: %s\n", err)
			continue
		}

		logInfo.Printf("Article URL: %s\n", au)
		return au
	}
}

// getJobTTR fetches the stats-job record for job and decodes it from YAML.
// Despite its name it returns the whole StatsJob (callers also read Pri),
// not just the TTR. Any StatsJob or decode failure is fatal (logError.Fatalf).
func (as *ArticleSupplier) getJobTTR(job *beanstalk.Job) *StatsJob {
	rawJobStats, err := as.bsConn.StatsJob(job.ID)
	if err != nil {
		logError.Fatalf("Job %d StatsJob failed: %s\n", job.ID, err)
	}
	statsJob := StatsJob{}
	err = yaml.Unmarshal(rawJobStats, &statsJob)
	if err != nil {
		logError.Fatalf("Job %d yaml: %s\n", job.ID, err)
	}
	return &statsJob
}

// increaseJobTTR re-queues job's body with its current priority and a TTR
// of newTTR, then deletes the original job. It reports whether the whole
// swap succeeded.
//
// The original job is deleted only after the new copy has been put
// successfully. On any earlier failure it is left reserved, so beanstalkd
// hands it back to the ready queue once its (short) TTR expires and it is
// retried rather than lost.
//
// NOTE: uses the global config.srcTube rather than the tube the job was
// reserved from — presumably the same tube; verify. Also changes the tube
// this connection "uses" for subsequent puts.
func (as *ArticleSupplier) increaseJobTTR(job *beanstalk.Job, stats *StatsJob, newTTR int) bool {
	if err := as.bsConn.Use(config.srcTube); err != nil {
		logError.Printf("Job %d use tube %s failed: %s\n", job.ID, config.srcTube, err)
		return false
	}
	// A delay of 1 is fine: the job's delay is already up and will be reset
	// when we crawl the feed.
	if _, err := as.bsConn.PutUnique(job.Body, stats.Pri, 1, newTTR); err != nil {
		logError.Printf("Job %d re-put with TTR %d failed: %s\n", job.ID, newTTR, err)
		return false
	}
	if err := as.bsConn.Delete(job.ID); err != nil {
		logError.Printf("Job %d delete after re-put failed: %s\n", job.ID, err)
		return false
	}
	return true
}