From acbf96e375b9016b98c75623e72c38aadc6ac7ee Mon Sep 17 00:00:00 2001 From: Ties de Kock Date: Sun, 1 Aug 2021 16:26:30 +0200 Subject: [PATCH] Refactor to update state when cache or SLURM changes Before this commit the new stayrtr state was only recalculated when the validated cache changed. This commit refactors the update loop and separates the actual update from pulling the updated cache. The update is triggered when the SLURM file changes _or_ when the validated cache changes. Should fix https://github.com/cloudflare/gortr/issues/95 and make further changes that trigger an update when an update in the cache has expired easier. --- cmd/stayrtr/stayrtr.go | 117 +++++++++++++++++++++++------------------ 1 file changed, 66 insertions(+), 51 deletions(-) diff --git a/cmd/stayrtr/stayrtr.go b/cmd/stayrtr/stayrtr.go index 2d032a7..ea63ac7 100644 --- a/cmd/stayrtr/stayrtr.go +++ b/cmd/stayrtr/stayrtr.go @@ -248,41 +248,12 @@ func (e IdenticalFile) Error() string { return fmt.Sprintf("File %s is identical to the previous version", e.File) } -func (s *state) updateFile(file string) error { +// Update the state based on the current slurm file and data. +func (s *state) updateFromNewState() error { sessid, _ := s.server.GetSessionId(nil) - log.Debugf("Refreshing cache from %s", file) - - s.lastts = time.Now().UTC() - data, code, lastrefresh, err := s.fetchConfig.FetchFile(file) - if err != nil { - return err - } - if lastrefresh { - LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9)) - } - if code != -1 { - RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc() - } - - hsum, _ := checkFile(data) - if s.lasthash != nil { - cres := bytes.Compare(s.lasthash, hsum) - if cres == 0 { - return IdenticalFile{File: file} - } - } - - s.lastchange = time.Now().UTC() - s.lastdata = data - - vrplistjson, err := decodeJSON(s.lastdata) - if err != nil { - return err - } - if s.checktime { - buildtime, err := time.Parse(time.RFC3339, vrplistjson.Metadata.Buildtime) + buildtime, err := time.Parse(time.RFC3339, s.lastdata.Metadata.Buildtime) if err != nil { return err } @@ -292,7 +263,7 @@ func (s *state) updateFile(file string) error { } } - vrpsjson := vrplistjson.Data + vrpsjson := s.lastdata.Data if s.slurm != nil { kept, removed := s.slurm.FilterOnVRPs(vrpsjson) asserted := s.slurm.AssertVRPs() @@ -301,13 +272,9 @@ func (s *state) updateFile(file string) error { } vrps, count, countv4, countv6 := processData(vrpsjson) - if err != nil { - return err - } - log.Infof("New update (%v uniques, %v total prefixes). %v bytes. Updating sha256 hash %x -> %x", - len(vrps), count, len(s.lastconverted), s.lasthash, hsum) - s.lasthash = hsum + log.Infof("New update (%v uniques, %v total prefixes). %v bytes.", + len(vrps), count) s.server.AddVRPs(vrps) @@ -322,7 +289,7 @@ func (s *state) updateFile(file string) error { s.exported = prefixfile.VRPList{ Metadata: prefixfile.MetaData{ Counts: len(vrpsjson), - Buildtime: vrplistjson.Metadata.Buildtime, + Buildtime: s.lastdata.Metadata.Buildtime, }, Data: vrpsjson, } @@ -339,16 +306,54 @@ func (s *state) updateFile(file string) error { countv6_dup++ } } - s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, file) + s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, *CacheBin) } + return nil } -func (s *state) updateSlurm(file string) error { +func (s *state) updateFile(file string) (bool, error) { + log.Debugf("Refreshing cache from %s", file) + + s.lastts = time.Now().UTC() + data, code, lastrefresh, err := s.fetchConfig.FetchFile(file) + if err != nil { + return false, err + } + if lastrefresh { + LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9)) + } + if code != -1 { + RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc() + } + + hsum, _ := checkFile(data) + if s.lasthash != nil { + cres := bytes.Compare(s.lasthash, hsum) + if cres == 0 { + return false, IdenticalFile{File: file} + } + } + + log.Infof("new cache file: Updating sha256 hash %x -> %x", s.lasthash, hsum) + + vrplistjson, err := decodeJSON(data) + if err != nil { + return false, err + } + + s.lasthash = hsum + s.lastchange = time.Now().UTC() + s.lastdata = vrplistjson + + return true, nil +} + +func (s *state) updateSlurm(file string) (bool, error) { log.Debugf("Refreshing slurm from %v", file) data, code, lastrefresh, err := s.fetchConfig.FetchFile(file) if err != nil { - return err + return false, err } if lastrefresh { LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9)) @@ -361,10 +366,10 @@ func (s *state) updateSlurm(file string) error { slurm, err := prefixfile.DecodeJSONSlurm(buf) if err != nil { - return err + return false, err } s.slurm = slurm - return nil + return true, nil } func (s *state) routineUpdate(file string, interval int, slurmFile string) { @@ -379,8 +384,10 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) { log.Debug("Received HUP signal") } delay.Stop() + slurmNotPresentOrUpdated := false if slurmFile != "" { - err := s.updateSlurm(slurmFile) + var err error + slurmNotPresentOrUpdated, err = s.updateSlurm(slurmFile) if err != nil { switch err.(type) { case utils.HttpNotModified: @@ -392,7 +399,7 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) { } } } - err := s.updateFile(file) + cacheUpdated, err := s.updateFile(file) if err != nil { switch err.(type) { case utils.HttpNotModified: @@ -405,6 +412,15 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) { log.Errorf("Error updating: %v", err) } } + + // Only process the first time after there is either a cache or SLURM + // update. + if (cacheUpdated || slurmNotPresentOrUpdated) { + err := s.updateFromNewState() + if err != nil { + log.Errorf("Error updating from new state: %v", err) + } + } } } @@ -417,8 +433,7 @@ func (s *state) exporter(wr http.ResponseWriter, r *http.Request) { } type state struct { - lastdata []byte - lastconverted []byte + lastdata *prefixfile.VRPList lasthash []byte lastchange time.Time lastts time.Time @@ -532,7 +547,7 @@ func main() { log.Fatalf("Specify at least a bind address") } - err := s.updateFile(*CacheBin) + _, err := s.updateFile(*CacheBin) if err != nil { switch err.(type) { case utils.HttpNotModified: @@ -548,7 +563,7 @@ func main() { slurmFile := *Slurm if slurmFile != "" { - err := s.updateSlurm(slurmFile) + _, err := s.updateSlurm(slurmFile) if err != nil { switch err.(type) { case utils.HttpNotModified: