diff --git a/config/config.go b/config/config.go index d2d1013..4c477b8 100644 --- a/config/config.go +++ b/config/config.go @@ -7,10 +7,10 @@ import ( // Config configuration. type Config struct { - RedisURL string `env:"REDIS_URL" envDefault:"redis://localhost:6379"` - GitHubToken string `env:"GITHUB_TOKEN"` - GitHubPageSize int `env:"GITHUB_PAGE_SIZE" envDefault:"100"` - Port string `env:"PORT" envDefault:"3000"` + RedisURL string `env:"REDIS_URL" envDefault:"redis://localhost:6379"` + GitHubTokens []string `env:"GITHUB_TOKENS"` + GitHubPageSize int `env:"GITHUB_PAGE_SIZE" envDefault:"100"` + Port string `env:"PORT" envDefault:"3000"` } // Get the current Config. diff --git a/internal/github/github.go b/internal/github/github.go index 537a620..af94016 100644 --- a/internal/github/github.go +++ b/internal/github/github.go @@ -1,10 +1,16 @@ package github import ( + "encoding/json" "errors" + "fmt" + "io" + "net/http" + "github.com/apex/log" "github.com/caarlos0/starcharts/config" "github.com/caarlos0/starcharts/internal/cache" + "github.com/caarlos0/starcharts/internal/roundrobin" "github.com/prometheus/client_golang/prometheus" ) @@ -16,11 +22,9 @@ var ErrGitHubAPI = errors.New("failed to talk with github api") // GitHub client struct. type GitHub struct { - token string + tokens roundrobin.RoundRobiner pageSize int cache *cache.Redis - - rateLimits, effectiveEtags prometheus.Counter } var rateLimits = prometheus.NewCounter(prometheus.CounterOpts{ @@ -35,18 +39,109 @@ var effectiveEtags = prometheus.NewCounter(prometheus.CounterOpts{ Name: "effective_etag_uses_total", }) +var tokensCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "starcharts", + Subsystem: "github", + Name: "available_tokens", +}) + +var invalidatedTokens = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "starcharts", + Subsystem: "github", + Name: "invalidated_tokens_total", +}) + +var rateLimiters = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "starcharts", + Subsystem: "github", + Name: "rate_limit_remaining", +}, []string{"token"}) + func init() { - prometheus.MustRegister(rateLimits, effectiveEtags) + prometheus.MustRegister(rateLimits, effectiveEtags, invalidatedTokens, tokensCount, rateLimiters) } // New github client. func New(config config.Config, cache *cache.Redis) *GitHub { - + tokensCount.Set(float64(len(config.GitHubTokens))) return &GitHub{ - token: config.GitHubToken, - pageSize: config.GitHubPageSize, - cache: cache, - rateLimits: rateLimits, - effectiveEtags: effectiveEtags, + tokens: roundrobin.New(config.GitHubTokens), + pageSize: config.GitHubPageSize, + cache: cache, + } +} + +const maxTries = 3 + +func (gh *GitHub) authorizedDo(req *http.Request, try int) (*http.Response, error) { + if try > maxTries { + return nil, fmt.Errorf("couldn't find a valid token") + } + token, err := gh.tokens.Pick() + if err != nil || token == nil { + log.WithError(err).Error("couldn't get a valid token") + return http.DefaultClient.Do(req) // try unauthorized request + } + + if err := gh.checkToken(token); err != nil { + log.WithError(err).Error("couldn't check rate limit, trying again") + return gh.authorizedDo(req, try+1) // try next token } + + // got a valid token, use it + req.Header.Add("Authorization", fmt.Sprintf("token %s", token.Key())) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return resp, err + } + return resp, err +} + +func (gh *GitHub) checkToken(token *roundrobin.Token) error { + req, err := http.NewRequest(http.MethodGet, "https://api.github.com/rate_limit", nil) + if err != nil { + return err + } + req.Header.Add("Authorization", fmt.Sprintf("token %s", token.Key())) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusUnauthorized { + token.Invalidate() + invalidatedTokens.Inc() + return fmt.Errorf("token is invalid") + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("request failed with status %d", resp.StatusCode) + } + + bts, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + var limit rateLimit + if err := json.Unmarshal(bts, &limit); err != nil { + return err + } + rate := limit.Rate + log.Debugf("%s rate %d/%d", token, rate.Remaining, rate.Limit) + rateLimiters.WithLabelValues(token.String()).Set(float64(rate.Remaining)) + if rate.Remaining > rate.Limit/2 { + return nil // allow at most 50% rate limit usage + } + return fmt.Errorf("token usage is too high: %d/%d", rate.Remaining, rate.Limit) +} + +type rateLimit struct { + Rate rate `json:"rate"` +} + +type rate struct { + Remaining int `json:"remaining"` + Limit int `json:"limit"` } diff --git a/internal/github/repo.go b/internal/github/repo.go index ea6f497..f250179 100644 --- a/internal/github/repo.go +++ b/internal/github/repo.go @@ -43,7 +43,7 @@ func (gh *GitHub) RepoDetails(ctx context.Context, name string) (Repository, err switch resp.StatusCode { case http.StatusNotModified: log.Info("not modified") - gh.effectiveEtags.Inc() + effectiveEtags.Inc() err := gh.cache.Get(name, &repo) if err != nil { log.WithError(err).Warnf("failed to get %s from cache", name) @@ -54,7 +54,7 @@ func (gh *GitHub) RepoDetails(ctx context.Context, name string) (Repository, err } return repo, err case http.StatusForbidden: - gh.rateLimits.Inc() + rateLimits.Inc() log.Warn("rate limit hit") return repo, ErrRateLimit case http.StatusOK: @@ -87,8 +87,6 @@ func (gh *GitHub) makeRepoRequest(ctx context.Context, name, etag string) (*http if etag != "" { req.Header.Add("If-None-Match", etag) } - if gh.token != "" { - req.Header.Add("Authorization", fmt.Sprintf("token %s", gh.token)) - } - return http.DefaultClient.Do(req) + + return gh.authorizedDo(req, 0) } diff --git a/internal/github/repo_test.go b/internal/github/repo_test.go index 4718ef8..4669a8c 100644 --- a/internal/github/repo_test.go +++ b/internal/github/repo_test.go @@ -7,6 +7,7 @@ import ( "github.com/alicebob/miniredis" "github.com/caarlos0/starcharts/config" "github.com/caarlos0/starcharts/internal/cache" + "github.com/caarlos0/starcharts/internal/roundrobin" "github.com/go-redis/redis" "github.com/matryer/is" "gopkg.in/h2non/gock.v1" @@ -31,6 +32,11 @@ func TestRepoDetails(t *testing.T) { defer cache.Close() gt := New(config, cache) + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + t.Run("get repo details from api", func(t *testing.T) { is := is.New(t) gock.New("https://api.github.com"). @@ -57,6 +63,11 @@ func TestRepoDetails(t *testing.T) { func TestRepoDetails_APIfailure(t *testing.T) { defer gock.Off() + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + gock.New("https://api.github.com"). Get("/repos/test/test"). Reply(404) @@ -78,18 +89,23 @@ func TestRepoDetails_APIfailure(t *testing.T) { t.Run("set error if api return 404", func(t *testing.T) { is := is.New(t) _, err := gt.RepoDetails(context.TODO(), "test/test") - is.True(err != nil) //Expected error + is.True(err != nil) // Expected error }) t.Run("set error if api return 403", func(t *testing.T) { is := is.New(t) _, err := gt.RepoDetails(context.TODO(), "private/private") - is.True(err != nil) //Expected error + is.True(err != nil) // Expected error }) } func TestRepoDetails_WithAuthToken(t *testing.T) { defer gock.Off() + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + repo := Repository{ FullName: "aasm/aasm", CreatedAt: "2008-02-28T20:40:04Z", @@ -110,7 +126,7 @@ func TestRepoDetails_WithAuthToken(t *testing.T) { cache := cache.New(rc) defer cache.Close() gt := New(config, cache) - gt.token = "12345" + gt.tokens = roundrobin.New([]string{"12345"}) t.Run("get repo with auth token", func(t *testing.T) { is := is.New(t) diff --git a/internal/github/stars.go b/internal/github/stars.go index da209a9..6c0b883 100644 --- a/internal/github/stars.go +++ b/internal/github/stars.go @@ -91,7 +91,7 @@ func (gh *GitHub) getStargazersPage(ctx context.Context, repo Repository, page i switch resp.StatusCode { case http.StatusNotModified: - gh.effectiveEtags.Inc() + effectiveEtags.Inc() log.Info("not modified") err := gh.cache.Get(key, &stars) if err != nil { @@ -103,7 +103,7 @@ func (gh *GitHub) getStargazersPage(ctx context.Context, repo Repository, page i } return stars, err case http.StatusForbidden: - gh.rateLimits.Inc() + rateLimits.Inc() log.Warn("rate limit hit") return stars, ErrRateLimit case http.StatusOK: @@ -151,9 +151,6 @@ func (gh *GitHub) makeStarPageRequest(ctx context.Context, repo Repository, page if etag != "" { req.Header.Add("If-None-Match", etag) } - if gh.token != "" { - req.Header.Add("Authorization", fmt.Sprintf("token %s", gh.token)) - } - return http.DefaultClient.Do(req) + return gh.authorizedDo(req, 0) } diff --git a/internal/github/stars_test.go b/internal/github/stars_test.go index 2f3fe1e..174b175 100644 --- a/internal/github/stars_test.go +++ b/internal/github/stars_test.go @@ -8,6 +8,7 @@ import ( "github.com/alicebob/miniredis" "github.com/caarlos0/starcharts/config" "github.com/caarlos0/starcharts/internal/cache" + "github.com/caarlos0/starcharts/internal/roundrobin" "github.com/go-redis/redis" "github.com/matryer/is" "gopkg.in/h2non/gock.v1" @@ -16,6 +17,11 @@ import ( func TestStargazers(t *testing.T) { defer gock.Off() + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + stargazers := []Stargazer{ {StarredAt: time.Now()}, {StarredAt: time.Now()}, @@ -63,6 +69,16 @@ func TestStargazers(t *testing.T) { func TestStargazers_EmptyResponseOnPagination(t *testing.T) { defer gock.Off() + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 3999}}) + stargazers := []Stargazer{ {StarredAt: time.Now()}, {StarredAt: time.Now()}, @@ -98,7 +114,7 @@ func TestStargazers_EmptyResponseOnPagination(t *testing.T) { defer cache.Close() gt := New(config, cache) gt.pageSize = 2 - gt.token = "12345" + gt.tokens = roundrobin.New([]string{"12345"}) t.Run("get stargazers from api", func(t *testing.T) { is := is.New(t) @@ -110,6 +126,11 @@ func TestStargazers_EmptyResponseOnPagination(t *testing.T) { func TestStargazers_APIFailure(t *testing.T) { defer gock.Off() + gock.New("https://api.github.com"). + Get("/rate_limit"). + Reply(200). + JSON(rateLimit{rate{Limit: 5000, Remaining: 4000}}) + repo1 := Repository{ FullName: "test/test", CreatedAt: "2008-02-28T20:40:04Z", diff --git a/internal/roundrobin/roudrobin.go b/internal/roundrobin/roudrobin.go new file mode 100644 index 0000000..70d3425 --- /dev/null +++ b/internal/roundrobin/roudrobin.go @@ -0,0 +1,96 @@ +// Package roundrobin provides round robin invalidation-aware load balancing of github tokens. +package roundrobin + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/apex/log" +) + +// RoundRobiner can pick a token from a list of tokens. +type RoundRobiner interface { + Pick() (*Token, error) +} + +// New round robin implementation with the given list of tokens. +func New(tokens []string) RoundRobiner { + log.Debugf("creating round robin with %d tokens", len(tokens)) + if len(tokens) == 0 { + return &noTokensRoundRobin{} + } + result := make([]*Token, 0, len(tokens)) + for _, item := range tokens { + result = append(result, NewToken(item)) + } + return &realRoundRobin{tokens: result} +} + +type realRoundRobin struct { + tokens []*Token + next int64 +} + +func (rr *realRoundRobin) Pick() (*Token, error) { + return rr.doPick(0) +} + +func (rr *realRoundRobin) doPick(try int) (*Token, error) { + if try > len(rr.tokens) { + return nil, fmt.Errorf("no valid tokens left") + } + idx := atomic.LoadInt64(&rr.next) + atomic.StoreInt64(&rr.next, (idx+1)%int64(len(rr.tokens))) + if pick := rr.tokens[idx]; pick.OK() { + log.Debugf("picked %s", pick.Key()) + return pick, nil + } + return rr.doPick(try + 1) +} + +type noTokensRoundRobin struct{} + +func (rr *noTokensRoundRobin) Pick() (*Token, error) { + return nil, nil +} + +// Token is a github token. +type Token struct { + token string + valid bool + lock sync.RWMutex +} + +// NewToken from its string representation. +func NewToken(token string) *Token { + return &Token{ + token: token, + valid: true, + } +} + +// String returns the last 3 chars for the token. +func (t *Token) String() string { + return t.token[len(t.token)-3:] +} + +// Key returns the actual token. +func (t *Token) Key() string { + return t.token +} + +// OK returns true if the token is valid. +func (t *Token) OK() bool { + t.lock.RLock() + defer t.lock.RUnlock() + return t.valid +} + +// Invalidate invalidates the token. +func (t *Token) Invalidate() { + log.Warnf("invalidated token '...%s'", t) + t.lock.Lock() + defer t.lock.Unlock() + t.valid = false +} diff --git a/internal/roundrobin/roundrobin_test.go b/internal/roundrobin/roundrobin_test.go new file mode 100644 index 0000000..21d4fa1 --- /dev/null +++ b/internal/roundrobin/roundrobin_test.go @@ -0,0 +1,119 @@ +package roundrobin + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/matryer/is" +) + +const ( + tokenA = "ghp_TokenA" + tokenB = "ghp_TokenB" + tokenC = "ghp_TokenC" + tokenD = "ghp_TokenD" +) + +var tokens = []string{tokenA, tokenB, tokenC, tokenD} + +func TestRoundRobin(t *testing.T) { + is := is.New(t) + rr := New(tokens) + + a, b, c, d := exercise(t, rr, 100) + + for _, n := range []int64{a, b, c, d} { + requireWithinRange(t, n, 23, 27) + } + is.Equal(int64(100), a+b+c+d) +} + +func TestRoundRobinWithInvalidatedKeys(t *testing.T) { + is := is.New(t) + rr := New(tokens) + invalidateN(t, rr, 2) + + a, b, c, d := exercise(t, rr, 100) + is.Equal(a, int64(0)) + is.Equal(b, int64(0)) + requireWithinRange(t, c, 48, 52) + requireWithinRange(t, d, 48, 52) +} + +func TestTokenString(t *testing.T) { + is := is.New(t) + is.Equal("enA", NewToken(tokenA).String()) + is.Equal("enB", NewToken(tokenB).String()) + is.Equal("enC", NewToken(tokenC).String()) + is.Equal("enD", NewToken(tokenD).String()) +} + +func TestNoTokens(t *testing.T) { + is := is.New(t) + rr := New([]string{}) + pick, err := rr.Pick() + is.True(pick == nil) // pick should not nil + is.NoErr(err) // no error should be returned +} + +func TestNoValidTokens(t *testing.T) { + is := is.New(t) + rr := New([]string{tokenA, tokenB}) + invalidateN(t, rr, 2) + + pick, err := rr.Pick() + is.True(pick == nil) // pick should be nil + is.True(err != nil) // should err +} + +func invalidateN(t *testing.T, rr RoundRobiner, n int) { + t.Helper() + is := is.New(t) + for i := 0; i < n; i++ { + pick, err := rr.Pick() + is.True(pick != nil) // pick should not be nil + is.NoErr(err) // no error should be returned + pick.Invalidate() + } +} + +func requireWithinRange(t *testing.T, n, min, max int64) { + t.Helper() + is := is.New(t) + is.True(n >= min) // n should be at least min + is.True(n <= max) // n should be at most max +} + +func exercise(t *testing.T, rr RoundRobiner, n int) (int64, int64, int64, int64) { + t.Helper() + is := is.New(t) + + var a, b, c, d int64 + var wg sync.WaitGroup + + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + pick, err := rr.Pick() + is.True(pick != nil) // pick should not be nil + is.NoErr(err) // no error should be returned + switch pick.Key() { + case tokenA: + atomic.AddInt64(&a, 1) + case tokenB: + atomic.AddInt64(&b, 1) + case tokenC: + atomic.AddInt64(&c, 1) + case tokenD: + atomic.AddInt64(&d, 1) + default: + t.Error("invalid pick:", pick) + } + wg.Done() + }() + } + wg.Wait() + + return a, b, c, d +} diff --git a/main.go b/main.go index 02249f5..932ca64 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ var static embed.FS func main() { log.SetHandler(text.New(os.Stderr)) + // log.SetLevel(log.DebugLevel) config := config.Get() ctx := log.WithField("port", config.Port) options, err := redis.ParseURL(config.RedisURL) @@ -50,7 +51,7 @@ func main() { Methods(http.MethodGet). HandlerFunc(controller.GetRepo(static, github, cache)) - // generic metrics + // generic metrics requestCounter := promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "starcharts", Subsystem: "http",