Skip to content
This repository has been archived by the owner on Feb 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #49 from ties/feature/etags
Browse files Browse the repository at this point in the history
Use ETags + if-modified-since to reduce number of full requests
  • Loading branch information
lspgn authored Mar 3, 2020
2 parents e407106 + 156a388 commit b3e1f0f
Showing 1 changed file with 112 additions and 21 deletions.
133 changes: 112 additions & 21 deletions cmd/gortr/gortr.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
PublicKey = flag.String("verify.key", "cf.pub", "Public key path (PEM file)")

CacheBin = flag.String("cache", "https://rpki.cloudflare.com/rpki.json", "URL of the cached JSON data")
Etag = flag.Bool("etag", true, "Enable Etag header")
UserAgent = flag.String("useragent", fmt.Sprintf("Cloudflare-%v (+https://github.com/cloudflare/gortr)", AppVersion), "User-Agent header")
RefreshInterval = flag.Int("refresh", 600, "Refresh interval in seconds")
MaxConn = flag.Int("maxconn", 0, "Max simultaneous connections (0 to disable limit)")
Expand All @@ -96,10 +97,24 @@ var (
LastRefresh = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rpki_refresh",
Help: "Last refresh.",
Help: "Last successfull request for the given URL.",
},
[]string{"path"},
)
LastChange = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rpki_change",
Help: "Last change.",
},
[]string{"path"},
)
RefreshStatusCode = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "refresh_requests_total",
Help: "Total number of HTTP requests by status code",
},
[]string{"path", "code"},
)
ClientsMetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rtr_clients",
Expand Down Expand Up @@ -128,7 +143,9 @@ var (

func initMetrics() {
prometheus.MustRegister(NumberOfROAs)
prometheus.MustRegister(LastChange)
prometheus.MustRegister(LastRefresh)
prometheus.MustRegister(RefreshStatusCode)
prometheus.MustRegister(ClientsMetric)
prometheus.MustRegister(PDUsRecv)
}
Expand All @@ -138,7 +155,7 @@ func metricHTTP() {
log.Fatal(http.ListenAndServe(*MetricsAddr, nil))
}

func fetchFile(file string, ua string) ([]byte, error) {
func (s *state) fetchFile(file string) ([]byte, error) {
var f io.Reader
var err error
if len(file) > 8 && (file[0:7] == "http://" || file[0:8] == "https://") {
Expand All @@ -159,13 +176,18 @@ func fetchFile(file string, ua string) ([]byte, error) {
ProxyConnectHeader: map[string][]string{},
}
// Keep User-Agent in proxy request
tr.ProxyConnectHeader.Set("User-Agent", ua)
tr.ProxyConnectHeader.Set("User-Agent", s.userAgent)

client := &http.Client{Transport: tr}
req, err := http.NewRequest("GET", file, nil)
req.Header.Set("User-Agent", ua)
req.Header.Set("User-Agent", s.userAgent)
req.Header.Set("Accept", "text/json")

etag, ok := s.etags[file]
if s.enableEtags && ok {
req.Header.Set("If-None-Match", etag)
}

proxyurl, err := http.ProxyFromEnvironment(req)
if err != nil {
return nil, err
Expand All @@ -181,16 +203,41 @@ func fetchFile(file string, ua string) ([]byte, error) {
if err != nil {
return nil, err
}

RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", fhttp.StatusCode)).Inc()

if fhttp.StatusCode == 304 {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
return nil, HttpNotModified{
File: file,
}
} else if fhttp.StatusCode != 200 {
delete(s.etags, file)
return nil, fmt.Errorf("HTTP %s", fhttp.Status)
}
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))

f = fhttp.Body

newEtag := fhttp.Header.Get("ETag")

if !s.enableEtags || newEtag == "" || newEtag != s.etags[file] {
s.etags[file] = newEtag
} else {
return nil, IdenticalEtag{
File: file,
Etag: newEtag,
}
}
} else {
f, err = os.Open(file)
if err != nil {
return nil, err
}
}
data, err2 := ioutil.ReadAll(f)
if err2 != nil {
return nil, err2
data, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
return data, nil
}
Expand Down Expand Up @@ -236,7 +283,7 @@ func processData(roalistjson []prefixfile.ROAJson) ([]rtr.ROA, int, int, int) {
countv6++
}

key := fmt.Sprintf("%v,%v,%v", prefix, asn, v.Length)
key := fmt.Sprintf("%s,%d,%d", prefix, asn, v.Length)
_, exists := filterDuplicates[key]
if !exists {
filterDuplicates[key] = true
Expand All @@ -259,14 +306,32 @@ type IdenticalFile struct {
}

func (e IdenticalFile) Error() string {
return fmt.Sprintf("File %v is identical to the previous version", e.File)
return fmt.Sprintf("File %s is identical to the previous version", e.File)
}

type HttpNotModified struct {
File string
}

func (e HttpNotModified) Error() string {
return fmt.Sprintf("HTTP 304 Not modified for %s", e.File)
}

type IdenticalEtag struct {
File string
Etag string
}

func (e IdenticalEtag) Error() string {
return fmt.Sprintf("File %s is identical according to Etag: %s", e.File, e.Etag)
}

func (s *state) updateFile(file string) error {
log.Debugf("Refreshing cache from %v", file)
data, err := fetchFile(file, s.userAgent)
log.Debugf("Refreshing cache from %s", file)

s.lastts = time.Now().UTC()
data, err := s.fetchFile(file)
if err != nil {
log.Error(err)
return err
}
hsum, _ := checkFile(data)
Expand All @@ -277,7 +342,7 @@ func (s *state) updateFile(file string) error {
}
}

s.lastts = time.Now().UTC()
s.lastchange = time.Now().UTC()
s.lastdata = data

roalistjson, err := decodeJSON(s.lastdata)
Expand All @@ -291,7 +356,6 @@ func (s *state) updateFile(file string) error {
return errors.New(fmt.Sprintf("File is expired: %v", validtime))
}
}

if s.verify {
log.Debugf("Verifying signature in %v", file)
if roalistjson.Metadata.SignatureDate == "" || roalistjson.Metadata.Signature == "" {
Expand Down Expand Up @@ -342,6 +406,7 @@ func (s *state) updateFile(file string) error {
if err != nil {
return err
}

log.Infof("New update (%v uniques, %v total prefixes). %v bytes. Updating sha256 hash %x -> %x",
len(roas), count, len(s.lastconverted), s.lasthash, hsum)
s.lasthash = hsum
Expand All @@ -366,16 +431,15 @@ func (s *state) updateFile(file string) error {
countv6_dup++
}
}
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastts, file)
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, file)
}
return nil
}

func (s *state) updateSlurm(file string) error {
log.Debugf("Refreshing slurm from %v", file)
data, err := fetchFile(file, s.userAgent)
data, err := s.fetchFile(file)
if err != nil {
log.Error(err)
return err
}

Expand Down Expand Up @@ -404,12 +468,23 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
if err != nil {
log.Errorf("Slurm: %v", err)
switch err.(type) {
case HttpNotModified:
log.Info(err)
case IdenticalEtag:
log.Info(err)
default:
log.Errorf("Slurm: %v", err)
}
}
}
err := s.updateFile(file)
if err != nil {
switch err.(type) {
case HttpNotModified:
log.Info(err)
case IdenticalEtag:
log.Info(err)
case IdenticalFile:
log.Info(err)
default:
Expand All @@ -431,9 +506,12 @@ type state struct {
lastdata []byte
lastconverted []byte
lasthash []byte
lastchange time.Time
lastts time.Time
sendNotifs bool
userAgent string
etags map[string]string
enableEtags bool

server *rtr.Server

Expand Down Expand Up @@ -471,12 +549,12 @@ func (m *metricsEvent) HandlePDU(c *rtr.Client, pdu rtr.PDU) {
"_", -1))).Inc()
}

func (m *metricsEvent) UpdateMetrics(numIPv4 int, numIPv6 int, numIPv4filtered int, numIPv6filtered int, refreshed time.Time, file string) {
func (m *metricsEvent) UpdateMetrics(numIPv4 int, numIPv6 int, numIPv4filtered int, numIPv6filtered int, changed time.Time, refreshed time.Time, file string) {
NumberOfROAs.WithLabelValues("ipv4", "filtered", file).Set(float64(numIPv4filtered))
NumberOfROAs.WithLabelValues("ipv4", "unfiltered", file).Set(float64(numIPv4))
NumberOfROAs.WithLabelValues("ipv6", "filtered", file).Set(float64(numIPv6filtered))
NumberOfROAs.WithLabelValues("ipv6", "unfiltered", file).Set(float64(numIPv6))
LastRefresh.WithLabelValues(file).Set(float64(refreshed.UnixNano() / 1e9))
LastChange.WithLabelValues(file).Set(float64(changed.UnixNano() / 1e9))
}

func ReadPublicKey(key []byte, isPem bool) (*ecdsa.PublicKey, error) {
Expand Down Expand Up @@ -563,6 +641,8 @@ func main() {
verify: *Verify,
checktime: *TimeCheck,
userAgent: *UserAgent,
etags: make(map[string]string),
enableEtags: *Etag,
lockJson: &sync.RWMutex{},
}

Expand Down Expand Up @@ -708,7 +788,14 @@ func main() {
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
if err != nil {
log.Errorf("Slurm: %v", err)
switch err.(type) {
case HttpNotModified:
log.Info(err)
case IdenticalEtag:
log.Info(err)
default:
log.Errorf("Slurm: %v", err)
}
}
if !*SlurmRefresh {
slurmFile = ""
Expand All @@ -718,8 +805,12 @@ func main() {
err := s.updateFile(*CacheBin)
if err != nil {
switch err.(type) {
case HttpNotModified:
log.Info(err)
case IdenticalFile:
log.Info(err)
case IdenticalEtag:
log.Info(err)
default:
log.Errorf("Error updating: %v", err)
}
Expand Down

0 comments on commit b3e1f0f

Please sign in to comment.