Skip to content

Commit

Permalink
WIP: implement random resolver as custom options in parallel_best
Browse files Browse the repository at this point in the history
  • Loading branch information
DerRockWolf committed Nov 12, 2023
1 parent 2c1c2b4 commit 3f54720
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 558 deletions.
1 change: 1 addition & 0 deletions resolver/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ = Describe("Bootstrap", Label("bootstrap"), func() {
)

BeforeEach(func() {
config.GetConfig().Upstreams.Strategy = config.UpstreamStrategyParallelBest
sutConfig = &config.Config{
BootstrapDNS: []config.BootstrappedUpstreamConfig{
{
Expand Down
152 changes: 121 additions & 31 deletions resolver/parallel_best_resolver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package resolver

import (
"context"
"errors"
"fmt"
"math"
"strings"
Expand All @@ -18,9 +20,10 @@ import (
)

const (
upstreamDefaultCfgName = config.UpstreamDefaultCfgName
parallelResolverType = "parallel_best"
resolverCount = 2
upstreamDefaultCfgName = config.UpstreamDefaultCfgName
parallelResolverType = "parallel_best"
randomResolverType = "random"
parallelBestResolverCount = 2
)

// ParallelBestResolver delegates the DNS message to 2 upstream resolvers and returns the fastest answer
Expand All @@ -30,6 +33,9 @@ type ParallelBestResolver struct {

groupName string
resolvers []*upstreamResolverStatus

resolverCount int
retryWithDifferentResolver bool
}

type upstreamResolverStatus struct {
Expand Down Expand Up @@ -102,12 +108,23 @@ func newParallelBestResolver(
resolverStatuses = append(resolverStatuses, newUpstreamResolverStatus(r))
}

resolverCount := parallelBestResolverCount
retryWithDifferentResolver := false

if config.GetConfig().Upstreams.Strategy == config.UpstreamStrategyRandom {
resolverCount = 1
retryWithDifferentResolver = true
}

r := ParallelBestResolver{
configurable: withConfig(&cfg),
typed: withType(parallelResolverType),

groupName: cfg.Name,
resolvers: resolverStatuses,

resolverCount: resolverCount,
retryWithDifferentResolver: retryWithDifferentResolver,
}

return &r
Expand All @@ -117,6 +134,7 @@ func (r *ParallelBestResolver) Name() string {
return r.String()
}

// TODO: add resolverCount & retryWithDifferentResolver to output
func (r *ParallelBestResolver) String() string {
result := make([]string, len(r.resolvers))
for i, s := range r.resolvers {
Expand All @@ -136,57 +154,129 @@ func (r *ParallelBestResolver) Resolve(request *model.Request) (*model.Response,
return r.resolvers[0].resolver.Resolve(request)
}

r1, r2 := pickRandom(r.resolvers)
logger.Debugf("using %s and %s as resolver", r1.resolver, r2.resolver)
ctx := context.Background()

ch := make(chan requestResponse, resolverCount)
// using context with timeout for random upstream strategy
if r.resolverCount == 1 {
var cancel context.CancelFunc

var collectedErrors []error
logger = log.WithPrefix(logger, "random")
timeout := config.GetConfig().Upstreams.Timeout

ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout))
defer cancel()
}

resolvers := pickRandom(r.resolvers, r.resolverCount)
ch := make(chan requestResponse, len(resolvers))

// build usedResolver log string
var usedResolvers string
for _, resolver := range resolvers {
usedResolvers += fmt.Sprintf("%q,", resolver.resolver)
}

logger.WithField("resolver", r1.resolver).Debug("delegating to resolver")
usedResolvers = strings.TrimSuffix(usedResolvers, ",")

go r1.resolve(request, ch)
logger.Debug("using " + usedResolvers + " as resolver")

for _, resolver := range resolvers {
logger.WithField("resolver", resolver.resolver).Debug("delegating to resolver")

go resolver.resolve(request, ch)
}

logger.WithField("resolver", r2.resolver).Debug("delegating to resolver")
response, collectedErrors := evaluateResponses(ctx, logger, ch, resolvers)
if response != nil {
return response, nil
}

go r2.resolve(request, ch)
if !r.retryWithDifferentResolver {
return nil, fmt.Errorf("resolution was not successful, used resolvers: %s errors: %v",
usedResolvers, collectedErrors)
}

for len(collectedErrors) < resolverCount {
result := <-ch
return r.retryWithDifferent(logger, request, resolvers)
}

if result.err != nil {
logger.Debug("resolution failed from resolver, cause: ", result.err)
collectedErrors = append(collectedErrors, result.err)
} else {
logger.WithFields(logrus.Fields{
"resolver": *result.resolver,
"answer": util.AnswerToString(result.response.Res.Answer),
}).Debug("using response from resolver")
func evaluateResponses(
ctx context.Context, logger *logrus.Entry, ch chan requestResponse, resolvers []*upstreamResolverStatus,
) (*model.Response, []error) {
var collectedErrors []error

return result.response, nil
for len(collectedErrors) < len(resolvers) {
select {
case <-ctx.Done():
// this context is currently only set & canceled if resolverCount == 1
field := logrus.Fields{"resolver": resolvers[0].resolver}
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
logger.WithFields(field).Debug("upstream exceeded timeout, trying other upstream")
resolvers[0].lastErrorTime.Store(time.Now())
}
case result := <-ch:
if result.err != nil {
logger.Debug("resolution failed from resolver, cause: ", result.err)
collectedErrors = append(collectedErrors, result.err)
} else {
logger.WithFields(logrus.Fields{
"resolver": *result.resolver,
"answer": util.AnswerToString(result.response.Res.Answer),
}).Debug("using response from resolver")

return result.response, nil
}
}
}

return nil, fmt.Errorf("resolution was not successful, used resolvers: '%s' and '%s' errors: %v",
r1.resolver, r2.resolver, collectedErrors)
return nil, collectedErrors
}

func (r *ParallelBestResolver) retryWithDifferent(
logger *logrus.Entry, request *model.Request, resolvers []*upstreamResolverStatus,
) (*model.Response, error) {
// second try (if retryWithDifferentResolver == true)
resolver := weightedRandom(r.resolvers, resolvers)
logger.Debugf("using %s as second resolver", resolver.resolver)

ch := make(chan requestResponse, 1)

resolver.resolve(request, ch)

result := <-ch
if result.err != nil {
logger.Debug("resolution failed from resolver, cause: ", result.err)

return nil, errors.New("resolution was not successful, no resolver returned answer in time")
}

logger.WithFields(logrus.Fields{
"resolver": *result.resolver,
"answer": util.AnswerToString(result.response.Res.Answer),
}).Debug("using response from resolver")

return result.response, nil
}

// pick 2 different random resolvers from the resolver pool
func pickRandom(resolvers []*upstreamResolverStatus) (resolver1, resolver2 *upstreamResolverStatus) {
resolver1 = weightedRandom(resolvers, nil)
resolver2 = weightedRandom(resolvers, resolver1.resolver)
// pickRandom picks n (resolverCount) different random resolvers from the given resolver pool
func pickRandom(resolvers []*upstreamResolverStatus, resolverCount int) (choosenResolvers []*upstreamResolverStatus) {
for i := 0; i < resolverCount; i++ {
choosenResolvers = append(choosenResolvers, weightedRandom(resolvers, choosenResolvers))
}

return
}

func weightedRandom(in []*upstreamResolverStatus, exclude Resolver) *upstreamResolverStatus {
func weightedRandom(in, excludedResolvers []*upstreamResolverStatus) *upstreamResolverStatus {
const errorWindowInSec = 60

choices := make([]weightedrand.Choice[*upstreamResolverStatus, uint], 0, len(in))

outer:
for _, res := range in {
if exclude == res.resolver {
continue
for _, exclude := range excludedResolvers {
if exclude.resolver == res.resolver {
continue outer
}
}

var weight float64 = errorWindowInSec
Expand Down
Loading

0 comments on commit 3f54720

Please sign in to comment.