Skip to content

Commit

Permalink
feat(pingdom): add rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
rkettelerij committed Jun 20, 2024
1 parent 7f5f36e commit b6a756b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 82 deletions.
7 changes: 4 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"crypto/tls"
"flag"
"log"
"os"

"github.com/PDOK/uptime-operator/internal/service"
Expand Down Expand Up @@ -125,11 +124,13 @@ func main() {
if uptimeProvider == "pingdom" {
alertUserIDs, err := util.StringsToInts(pingdomAlertUserIDs)
if err != nil {
log.Fatalf("Unable to parse 'pingdom-alert-user-ids' flag: %v", err)
setupLog.Error(err, "Unable to parse 'pingdom-alert-user-ids' flag")
os.Exit(1)
}
alertIntegrationIDs, err := util.StringsToInts(pingdomAlertIntegrationIDs)
if err != nil {
log.Fatalf("Unable to parse 'pingdom-alert-integration-ids' flag: %v", err)
setupLog.Error(err, "Unable to parse 'pingdom-alert-integration-ids' flag: %v")
os.Exit(1)
}
uptimeProviderSettings = providers.PingdomSettings{
APIToken: pingdomAPIToken,
Expand Down
74 changes: 37 additions & 37 deletions internal/model/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,78 +15,78 @@ func TestNewUptimeCheck(t *testing.T) {
name: "All annotations present",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: false,
},
{
name: "Missing ID annotation",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: true,
},
{
name: "Missing Name annotation",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: true,
},
{
name: "Missing URL annotation",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: true,
},
{
name: "Missing tags annotation",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/request-headers": "key1:value1, key2:value2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: false,
},
{
name: "Missing request-headers annotation",
ingressName: "test-ingress",
annotations: map[string]string{
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/response-check-contains": "test string",
"uptime.pdok.nl/response-check-not-contains": "",
"uptime.pdok.nl/id": "1234567890",
"uptime.pdok.nl/name": "Test Check",
"uptime.pdok.nl/url": "https://pdok.example",
"uptime.pdok.nl/tags": "tag1, tag2",
"uptime.pdok.nl/response-check-for-string-contains": "test string",
"uptime.pdok.nl/response-check-for-string-not-contains": "",
},
wantErr: false,
},
Expand Down
6 changes: 4 additions & 2 deletions internal/service/provider.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package service

import (
"context"

"github.com/PDOK/uptime-operator/internal/model"
)

type UptimeProvider interface {
// CreateOrUpdateCheck create the given check with the uptime monitoring
// provider, or update an existing check. Needs to be idempotent!
CreateOrUpdateCheck(check model.UptimeCheck) error
CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) error

// DeleteCheck deletes the given check from the uptime monitoring provider
DeleteCheck(check model.UptimeCheck) error
DeleteCheck(ctx context.Context, check model.UptimeCheck) error
}
11 changes: 6 additions & 5 deletions internal/service/providers/mock.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package providers

import (
"context"
"encoding/json"
"log"

"github.com/PDOK/uptime-operator/internal/model"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type MockUptimeProvider struct {
Expand All @@ -17,20 +18,20 @@ func NewMockUptimeProvider() *MockUptimeProvider {
}
}

func (m *MockUptimeProvider) CreateOrUpdateCheck(check model.UptimeCheck) error {
func (m *MockUptimeProvider) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) error {
m.checks[check.ID] = check

checkJSON, _ := json.Marshal(check)
log.Printf("MOCK: created or updated check %s\n", checkJSON)
log.FromContext(ctx).Info("MOCK: created or updated check %s\n", checkJSON)

return nil
}

func (m *MockUptimeProvider) DeleteCheck(check model.UptimeCheck) error {
func (m *MockUptimeProvider) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
delete(m.checks, check.ID)

checkJSON, _ := json.Marshal(check)
log.Printf("MOCK: deleted check %s\n", checkJSON)
log.FromContext(ctx).Info("MOCK: deleted check %s\n", checkJSON)

return nil
}
79 changes: 55 additions & 24 deletions internal/service/providers/pingdom.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package providers

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
classiclog "log"
"net/http"
"net/url"
"sort"
Expand All @@ -14,6 +15,7 @@ import (
"time"

"github.com/PDOK/uptime-operator/internal/model"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const pingdomURL = "https://api.pingdom.com/api/3.1/checks"
Expand All @@ -31,46 +33,49 @@ type PingdomUptimeProvider struct {
httpClient *http.Client
}

// NewPingdomUptimeProvider creates a PingdomUptimeProvider
func NewPingdomUptimeProvider(settings PingdomSettings) *PingdomUptimeProvider {
if settings.APIToken == "" {
log.Fatal("Pingdom API token is not provided")
classiclog.Fatal("Pingdom API token is not provided")
}
return &PingdomUptimeProvider{
settings: settings,
httpClient: &http.Client{Timeout: time.Duration(5) * time.Minute},
}
}

func (m *PingdomUptimeProvider) CreateOrUpdateCheck(check model.UptimeCheck) (err error) {
existingCheckID, err := m.findCheck(check)
// CreateOrUpdateCheck create the given check with Pingdom, or update an existing check. Needs to be idempotent!
func (m *PingdomUptimeProvider) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) {
existingCheckID, err := m.findCheck(ctx, check)
if err != nil {
return err
}
if existingCheckID == checkNotFound {
err = m.createCheck(check)
err = m.createCheck(ctx, check)
} else {
err = m.updateCheck(existingCheckID, check)
err = m.updateCheck(ctx, existingCheckID, check)
}
return err
}

func (m *PingdomUptimeProvider) DeleteCheck(check model.UptimeCheck) error {
log.Printf("deleting check %v\n", check)
// DeleteCheck deletes the given check from Pingdom
func (m *PingdomUptimeProvider) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
log.FromContext(ctx).Info("deleting check", "check", check)

existingCheckID, err := m.findCheck(check)
existingCheckID, err := m.findCheck(ctx, check)
if err != nil {
return err
}
if existingCheckID == checkNotFound {
log.Printf("check with ID '%s' is already deleted", check.ID)
log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID))
return nil
}

req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%d", pingdomURL, existingCheckID), nil)
if err != nil {
return err
}
resp, err := m.execRequestWithAuth(req)
resp, err := m.execRequest(ctx, req)
if err != nil {
return err
}
Expand All @@ -82,15 +87,15 @@ func (m *PingdomUptimeProvider) DeleteCheck(check model.UptimeCheck) error {
return nil
}

func (m *PingdomUptimeProvider) findCheck(check model.UptimeCheck) (int64, error) {
func (m *PingdomUptimeProvider) findCheck(ctx context.Context, check model.UptimeCheck) (int64, error) {
result := checkNotFound

req, err := http.NewRequest(http.MethodGet, pingdomURL+"?include_tags=true", nil)
if err != nil {
return result, err
}
req.Header.Add("Accept", "application/json")
resp, err := m.execRequestWithAuth(req)
resp, err := m.execRequest(ctx, req)
if err != nil {
return result, err
}
Expand Down Expand Up @@ -129,8 +134,8 @@ func (m *PingdomUptimeProvider) findCheck(check model.UptimeCheck) (int64, error
return result, nil
}

func (m *PingdomUptimeProvider) createCheck(check model.UptimeCheck) error {
log.Printf("creating check %v\n", check)
func (m *PingdomUptimeProvider) createCheck(ctx context.Context, check model.UptimeCheck) error {
log.FromContext(ctx).Info("creating check", "check", check)

message, err := m.checkToJSON(check, true)
if err != nil {
Expand All @@ -140,15 +145,15 @@ func (m *PingdomUptimeProvider) createCheck(check model.UptimeCheck) error {
if err != nil {
return err
}
err = m.execRequestWithBody(req)
err = m.execRequestWithBody(ctx, req)
if err != nil {
return err
}
return nil
}

func (m *PingdomUptimeProvider) updateCheck(existingPingdomID int64, check model.UptimeCheck) error {
log.Printf("updating check %v\n, using pingdom ID %d", check, existingPingdomID)
func (m *PingdomUptimeProvider) updateCheck(ctx context.Context, existingPingdomID int64, check model.UptimeCheck) error {
log.FromContext(ctx).Info("updating check", "check", check, "pingdom ID", existingPingdomID)

message, err := m.checkToJSON(check, false)
if err != nil {
Expand All @@ -158,7 +163,7 @@ func (m *PingdomUptimeProvider) updateCheck(existingPingdomID int64, check model
if err != nil {
return err
}
err = m.execRequestWithBody(req)
err = m.execRequestWithBody(ctx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -186,7 +191,7 @@ func (m *PingdomUptimeProvider) checkToJSON(check model.UptimeCheck, includeType
"name": check.Name,
"host": checkURL.Hostname(),
"url": relativeURL,
"encryption": true,
"encryption": true, // assume all checks run over HTTPS
"port": port,
"resolution": 1,
"tags": check.Tags,
Expand Down Expand Up @@ -226,9 +231,9 @@ func (m *PingdomUptimeProvider) checkToJSON(check model.UptimeCheck, includeType
return json.Marshal(message)
}

func (m *PingdomUptimeProvider) execRequestWithBody(req *http.Request) error {
func (m *PingdomUptimeProvider) execRequestWithBody(ctx context.Context, req *http.Request) error {
req.Header.Add("Content-Type", "application/json")
resp, err := m.execRequestWithAuth(req)
resp, err := m.execRequest(ctx, req)
if err != nil {
return err
}
Expand All @@ -240,9 +245,35 @@ func (m *PingdomUptimeProvider) execRequestWithBody(req *http.Request) error {
return nil
}

func (m *PingdomUptimeProvider) execRequestWithAuth(req *http.Request) (*http.Response, error) {
func (m *PingdomUptimeProvider) execRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
req.Header.Add("Authorization", "Bearer "+m.settings.APIToken)
return m.httpClient.Do(req)
resp, err := m.httpClient.Do(req)
if err != nil {
return resp, err
}

// handle rate limits
remainingShort, resetTimeShort, err := parseRateLimitHeader(resp.Header.Get("Req-Limit-Short"))
if remainingShort < 10 {
log.FromContext(ctx).Info(fmt.Sprintf("Waiting for %d seconds to avoid hitting Pingdom rate limit", resetTimeShort+1),
"Req-Limit-Short", remainingShort)
time.Sleep(time.Duration(resetTimeShort+1) * time.Second)
}
remainingLong, resetTimeLong, err := parseRateLimitHeader(resp.Header.Get("Req-Limit-Long"))
if remainingLong < 10 {
log.FromContext(ctx).Info(fmt.Sprintf("Waiting for %d seconds to avoid hitting Pingdom rate limit", resetTimeLong+1),
"Req-Limit-Long", remainingLong)
time.Sleep(time.Duration(resetTimeLong+1) * time.Second)
}
return resp, err
}

func parseRateLimitHeader(header string) (remaining int, resetTime int, err error) {
if header == "" {
return 0, 0, nil
}
_, err = fmt.Sscanf(header, "Remaining: %d Time until reset: %d", &remaining, &resetTime)
return
}

func getPort(checkURL *url.URL) (int, error) {
Expand Down
Loading

0 comments on commit b6a756b

Please sign in to comment.