From 156a388f650eb1654a94ed3392650e0837f56a36 Mon Sep 17 00:00:00 2001 From: Ties de Kock Date: Sun, 12 Jan 2020 15:42:37 +0100 Subject: [PATCH] Add ETag/If-Not-Modified support * Send If-Not-Modified header, save ETags. * Add metrics for HTTP response code and last request timestamp. --- cmd/gortr/gortr.go | 133 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 112 insertions(+), 21 deletions(-) diff --git a/cmd/gortr/gortr.go b/cmd/gortr/gortr.go index ab36254..9f9cd8e 100644 --- a/cmd/gortr/gortr.go +++ b/cmd/gortr/gortr.go @@ -75,6 +75,7 @@ var ( PublicKey = flag.String("verify.key", "cf.pub", "Public key path (PEM file)") CacheBin = flag.String("cache", "https://rpki.cloudflare.com/rpki.json", "URL of the cached JSON data") + Etag = flag.Bool("etag", true, "Enable Etag header") UserAgent = flag.String("useragent", fmt.Sprintf("Cloudflare-%v (+https://github.com/cloudflare/gortr)", AppVersion), "User-Agent header") RefreshInterval = flag.Int("refresh", 600, "Refresh interval in seconds") MaxConn = flag.Int("maxconn", 0, "Max simultaneous connections (0 to disable limit)") @@ -96,10 +97,24 @@ var ( LastRefresh = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "rpki_refresh", - Help: "Last refresh.", + Help: "Last successfull request for the given URL.", }, []string{"path"}, ) + LastChange = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "rpki_change", + Help: "Last change.", + }, + []string{"path"}, + ) + RefreshStatusCode = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "refresh_requests_total", + Help: "Total number of HTTP requests by status code", + }, + []string{"path", "code"}, + ) ClientsMetric = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "rtr_clients", @@ -128,7 +143,9 @@ var ( func initMetrics() { prometheus.MustRegister(NumberOfROAs) + prometheus.MustRegister(LastChange) prometheus.MustRegister(LastRefresh) + prometheus.MustRegister(RefreshStatusCode) prometheus.MustRegister(ClientsMetric) prometheus.MustRegister(PDUsRecv) } @@ -138,7 +155,7 @@ func metricHTTP() { log.Fatal(http.ListenAndServe(*MetricsAddr, nil)) } -func fetchFile(file string, ua string) ([]byte, error) { +func (s *state) fetchFile(file string) ([]byte, error) { var f io.Reader var err error if len(file) > 8 && (file[0:7] == "http://" || file[0:8] == "https://") { @@ -159,13 +176,18 @@ func fetchFile(file string, ua string) ([]byte, error) { ProxyConnectHeader: map[string][]string{}, } // Keep User-Agent in proxy request - tr.ProxyConnectHeader.Set("User-Agent", ua) + tr.ProxyConnectHeader.Set("User-Agent", s.userAgent) client := &http.Client{Transport: tr} req, err := http.NewRequest("GET", file, nil) - req.Header.Set("User-Agent", ua) + req.Header.Set("User-Agent", s.userAgent) req.Header.Set("Accept", "text/json") + etag, ok := s.etags[file] + if s.enableEtags && ok { + req.Header.Set("If-None-Match", etag) + } + proxyurl, err := http.ProxyFromEnvironment(req) if err != nil { return nil, err @@ -181,16 +203,41 @@ func fetchFile(file string, ua string) ([]byte, error) { if err != nil { return nil, err } + + RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", fhttp.StatusCode)).Inc() + + if fhttp.StatusCode == 304 { + LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9)) + return nil, HttpNotModified{ + File: file, + } + } else if fhttp.StatusCode != 200 { + delete(s.etags, file) + return nil, fmt.Errorf("HTTP %s", fhttp.Status) + } + LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9)) + f = fhttp.Body + + newEtag := fhttp.Header.Get("ETag") + + if !s.enableEtags || newEtag == "" || newEtag != s.etags[file] { + s.etags[file] = newEtag + } else { + return nil, IdenticalEtag{ + File: file, + Etag: newEtag, + } + } } else { f, err = os.Open(file) if err != nil { return nil, err } } - data, err2 := ioutil.ReadAll(f) - if err2 != nil { - return nil, err2 + data, err := ioutil.ReadAll(f) + if err != nil { + return nil, err } return data, nil } @@ -236,7 +283,7 @@ func processData(roalistjson []prefixfile.ROAJson) ([]rtr.ROA, int, int, int) { countv6++ } - key := fmt.Sprintf("%v,%v,%v", prefix, asn, v.Length) + key := fmt.Sprintf("%s,%d,%d", prefix, asn, v.Length) _, exists := filterDuplicates[key] if !exists { filterDuplicates[key] = true @@ -259,14 +306,32 @@ type IdenticalFile struct { } func (e IdenticalFile) Error() string { - return fmt.Sprintf("File %v is identical to the previous version", e.File) + return fmt.Sprintf("File %s is identical to the previous version", e.File) +} + +type HttpNotModified struct { + File string +} + +func (e HttpNotModified) Error() string { + return fmt.Sprintf("HTTP 304 Not modified for %s", e.File) +} + +type IdenticalEtag struct { + File string + Etag string +} + +func (e IdenticalEtag) Error() string { + return fmt.Sprintf("File %s is identical according to Etag: %s", e.File, e.Etag) } func (s *state) updateFile(file string) error { - log.Debugf("Refreshing cache from %v", file) - data, err := fetchFile(file, s.userAgent) + log.Debugf("Refreshing cache from %s", file) + + s.lastts = time.Now().UTC() + data, err := s.fetchFile(file) if err != nil { - log.Error(err) return err } hsum, _ := checkFile(data) @@ -277,7 +342,7 @@ func (s *state) updateFile(file string) error { } } - s.lastts = time.Now().UTC() + s.lastchange = time.Now().UTC() s.lastdata = data roalistjson, err := decodeJSON(s.lastdata) @@ -291,7 +356,6 @@ func (s *state) updateFile(file string) error { return errors.New(fmt.Sprintf("File is expired: %v", validtime)) } } - if s.verify { log.Debugf("Verifying signature in %v", file) if roalistjson.Metadata.SignatureDate == "" || roalistjson.Metadata.Signature == "" { @@ -342,6 +406,7 @@ func (s *state) updateFile(file string) error { if err != nil { return err } + log.Infof("New update (%v uniques, %v total prefixes). %v bytes. Updating sha256 hash %x -> %x", len(roas), count, len(s.lastconverted), s.lasthash, hsum) s.lasthash = hsum @@ -366,16 +431,15 @@ func (s *state) updateFile(file string) error { countv6_dup++ } } - s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastts, file) + s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, file) } return nil } func (s *state) updateSlurm(file string) error { log.Debugf("Refreshing slurm from %v", file) - data, err := fetchFile(file, s.userAgent) + data, err := s.fetchFile(file) if err != nil { - log.Error(err) return err } @@ -404,12 +468,23 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) { if slurmFile != "" { err := s.updateSlurm(slurmFile) if err != nil { - log.Errorf("Slurm: %v", err) + switch err.(type) { + case HttpNotModified: + log.Info(err) + case IdenticalEtag: + log.Info(err) + default: + log.Errorf("Slurm: %v", err) + } } } err := s.updateFile(file) if err != nil { switch err.(type) { + case HttpNotModified: + log.Info(err) + case IdenticalEtag: + log.Info(err) case IdenticalFile: log.Info(err) default: @@ -431,9 +506,12 @@ type state struct { lastdata []byte lastconverted []byte lasthash []byte + lastchange time.Time lastts time.Time sendNotifs bool userAgent string + etags map[string]string + enableEtags bool server *rtr.Server @@ -471,12 +549,12 @@ func (m *metricsEvent) HandlePDU(c *rtr.Client, pdu rtr.PDU) { "_", -1))).Inc() } -func (m *metricsEvent) UpdateMetrics(numIPv4 int, numIPv6 int, numIPv4filtered int, numIPv6filtered int, refreshed time.Time, file string) { +func (m *metricsEvent) UpdateMetrics(numIPv4 int, numIPv6 int, numIPv4filtered int, numIPv6filtered int, changed time.Time, refreshed time.Time, file string) { NumberOfROAs.WithLabelValues("ipv4", "filtered", file).Set(float64(numIPv4filtered)) NumberOfROAs.WithLabelValues("ipv4", "unfiltered", file).Set(float64(numIPv4)) NumberOfROAs.WithLabelValues("ipv6", "filtered", file).Set(float64(numIPv6filtered)) NumberOfROAs.WithLabelValues("ipv6", "unfiltered", file).Set(float64(numIPv6)) - LastRefresh.WithLabelValues(file).Set(float64(refreshed.UnixNano() / 1e9)) + LastChange.WithLabelValues(file).Set(float64(changed.UnixNano() / 1e9)) } func ReadPublicKey(key []byte, isPem bool) (*ecdsa.PublicKey, error) { @@ -563,6 +641,8 @@ func main() { verify: *Verify, checktime: *TimeCheck, userAgent: *UserAgent, + etags: make(map[string]string), + enableEtags: *Etag, lockJson: &sync.RWMutex{}, } @@ -708,7 +788,14 @@ func main() { if slurmFile != "" { err := s.updateSlurm(slurmFile) if err != nil { - log.Errorf("Slurm: %v", err) + switch err.(type) { + case HttpNotModified: + log.Info(err) + case IdenticalEtag: + log.Info(err) + default: + log.Errorf("Slurm: %v", err) + } } if !*SlurmRefresh { slurmFile = "" @@ -718,8 +805,12 @@ func main() { err := s.updateFile(*CacheBin) if err != nil { switch err.(type) { + case HttpNotModified: + log.Info(err) case IdenticalFile: log.Info(err) + case IdenticalEtag: + log.Info(err) default: log.Errorf("Error updating: %v", err) }