Skip to content

Commit

Permalink
merge(#3541): fixed EKS disabled addons
Browse files Browse the repository at this point in the history
Fixed the EKS addons, UpdateAddon not existing addon handling, UpdateCluster respecting addon feature flag & k8s version.
#3541
  • Loading branch information
pregnor authored Jul 15, 2021
2 parents 20bc379 + ceebb9a commit aba35b0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/worker/eks.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func registerEKSWorkflows(
eksworkflow2.NewWaitCloudFormationStackUpdateActivity(awsSessionFactory).Register(worker)

// New cluster update
eksworkflow2.NewUpdateClusterWorkflow().Register(worker)
eksworkflow2.NewUpdateClusterWorkflow(config.Distribution.EKS.EnableAddons).Register(worker)

eksworkflow2.NewUpdateClusterVersionActivity(awsSessionFactory, eksFactory).Register(worker)
eksworkflow2.NewWaitUpdateClusterVersionActivity(awsSessionFactory, eksFactory).Register(worker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,13 @@ func (a *UpdateAddonActivity) Execute(ctx context.Context, input UpdateAddonActi
ClusterName: aws.String(input.ClusterName),
}
addonOutput, err := eksSvc.DescribeAddon(describeAddonInput)
addonNotFoundErrMsg := fmt.Sprintf("No %s addon found in cluster: %s", input.AddonName, input.ClusterName)
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
err = errors.New(awsErr.Message())
}
if err.Error() == addonNotFoundErrMsg {
logger.Infof(addonNotFoundErrMsg)
if isAWSAddonNotFoundError(err, input.AddonName, input.ClusterName) { // Note: no update for not existing addons.
logger.Infof("%s", err.Error())

return &UpdateAddonActivityOutput{UpdateID: ""}, nil
}

return nil, errors.WrapIfWithDetails(err, "failed to retrieve addon", "cluster", input.ClusterName, "addon", input.AddonName)
}

Expand Down Expand Up @@ -174,6 +171,25 @@ func selectLatestVersion(addonVersions *eks.DescribeAddonVersionsOutput, current
return latestVersion.Original(), nil
}

// errorMessageAWSAddonNotFound is the error message returned by AWS when a
// non-existing cluster addon is queried (for example in DescribeAddon()).
func errorMessageAWSAddonNotFound(addonName, clusterName string) string {
return fmt.Sprintf("No addon: %s found in cluster: %s", addonName, clusterName)
}

// isAWSAddonNotFoundError returns a boolean indicator of whether the specified
// error is an error indicating the cluster has no such addon.
func isAWSAddonNotFoundError(err error, addonName, clusterName string) bool {
if err == nil {
return false
}

var awsErr awserr.Error

return errors.As(err, &awsErr) &&
awsErr.Message() == errorMessageAWSAddonNotFound(addonName, clusterName)
}

func versionIsCompatible(compatibilities []*eks.Compatibility, kubernetesVersion string) bool {
for _, c := range compatibilities {
if c.ClusterVersion == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package eksworkflow
import (
"time"

"emperror.dev/errors"
"github.com/Masterminds/semver/v3"
"go.uber.org/cadence"
"go.uber.org/cadence/workflow"

Expand All @@ -42,10 +44,13 @@ type UpdateClusterWorkflowInput struct {
}

type UpdateClusterWorkflow struct {
enableAddons bool
}

func NewUpdateClusterWorkflow() UpdateClusterWorkflow {
return UpdateClusterWorkflow{}
func NewUpdateClusterWorkflow(enableAddons bool) UpdateClusterWorkflow {
return UpdateClusterWorkflow{
enableAddons: enableAddons,
}
}

// Register registers the activity in the worker.
Expand Down Expand Up @@ -106,9 +111,23 @@ func (w UpdateClusterWorkflow) Execute(ctx workflow.Context, input UpdateCluster
}
}

// check add-on are enabled and K8s version is >= 1.18
clusterKubernetesVersion, err := semver.NewVersion(input.Version)
if err != nil {
_ = eksWorkflow.SetClusterStatus(ctx, input.ClusterID, pkgCluster.Warning, pkgCadence.UnwrapError(err).Error())

return errors.WrapIf(err, "parsing cluster version as semantic version failed")
}

kubernetesVersionIsAtLeast1_18, err := semver.NewConstraint(">=1.18")
if err != nil {
_ = eksWorkflow.SetClusterStatus(ctx, input.ClusterID, pkgCluster.Warning, pkgCadence.UnwrapError(err).Error())

return errors.WrapIf(err, "could not set 1.18 constraint for semver")
}

// update core-dns addon if there's a new version available for given KubernetesVersion
var coreDnsUpdateOutput UpdateAddonActivityOutput
{
if w.enableAddons && kubernetesVersionIsAtLeast1_18.Check(clusterKubernetesVersion) {
activityInput := UpdateAddonActivityInput{
OrganizationID: input.OrganizationID,
ProviderSecretID: input.ProviderSecretID,
Expand All @@ -117,30 +136,31 @@ func (w UpdateClusterWorkflow) Execute(ctx workflow.Context, input UpdateCluster
KubernetesVersion: input.Version,
AddonName: "coredns",
}
var coreDnsUpdateOutput UpdateAddonActivityOutput
err := workflow.ExecuteActivity(ctx, UpdateAddonActivityName, activityInput).Get(ctx, &coreDnsUpdateOutput)
if err != nil {
_ = eksWorkflow.SetClusterStatus(ctx, input.ClusterID, pkgCluster.Warning, pkgCadence.UnwrapError(err).Error())
return err
}
}

// wait for addon update to finish
if coreDnsUpdateOutput.UpdateID != "" {
activityInput := &WaitUpdateAddonActivityInput{
OrganizationID: input.OrganizationID,
ProviderSecretID: input.ProviderSecretID,
Region: input.Region,
ClusterName: input.ClusterName,
AddonName: "coredns",
UpdateID: coreDnsUpdateOutput.UpdateID,
}

ctx := workflow.WithStartToCloseTimeout(ctx, 2*time.Hour)

err := workflow.ExecuteActivity(ctx, WaitUpdateAddonActivityName, activityInput).Get(ctx, nil)
if err != nil {
_ = eksWorkflow.SetClusterStatus(ctx, input.ClusterID, pkgCluster.Warning, pkgCadence.UnwrapError(err).Error())
return err
// wait for addon update to finish
if coreDnsUpdateOutput.UpdateID != "" {
activityInput := &WaitUpdateAddonActivityInput{
OrganizationID: input.OrganizationID,
ProviderSecretID: input.ProviderSecretID,
Region: input.Region,
ClusterName: input.ClusterName,
AddonName: "coredns",
UpdateID: coreDnsUpdateOutput.UpdateID,
}

ctx := workflow.WithStartToCloseTimeout(ctx, 2*time.Hour)

err := workflow.ExecuteActivity(ctx, WaitUpdateAddonActivityName, activityInput).Get(ctx, nil)
if err != nil {
_ = eksWorkflow.SetClusterStatus(ctx, input.ClusterID, pkgCluster.Warning, pkgCadence.UnwrapError(err).Error())
return err
}
}
}

Expand Down

0 comments on commit aba35b0

Please sign in to comment.