Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: pkg/resource: output progress logs in applicators #532

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
dario.cat/mergo v1.0.0
github.com/bufbuild/buf v1.26.1
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/spf13/afero v1.9.5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/store/kubernetes/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewSecretStore(ctx context.Context, local client.Client, _ *tls.Config, cfg
return &SecretStore{
client: resource.ClientApplicator{
Client: kube,
Applicator: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(kube), resource.IsAPIErrorWrapped, nil),
Applicator: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(kube), resource.IsNonConflictAPIErrorWrapped, nil),
},
defaultNamespace: cfg.DefaultScope,
}, nil
Expand Down
43 changes: 43 additions & 0 deletions pkg/logging/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2023 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logging

import (
"fmt"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// ForResource returns logging values for a resource.
func ForResource(object client.Object) []string {
ret := make([]string, 0, 10)
gvk := object.GetObjectKind().GroupVersionKind()
if gvk.Kind == "" {
gvk.Kind = fmt.Sprintf("%T", object) // best effort for native Go types
}
ret = append(ret,
"name", object.GetName(),
"kind", gvk.Kind,
"version", gvk.Version,
"group", gvk.Group,
)
if ns := object.GetNamespace(); ns != "" {
ret = append(ret, "namespace", ns)
}

return ret
}
2 changes: 1 addition & 1 deletion pkg/reconciler/managed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewAPISecretPublisher(c client.Client, ot runtime.ObjectTyper) *APISecretPu
// backward compatibility with the original API of this function.
return &APISecretPublisher{
secret: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(c),
resource.IsAPIErrorWrapped, nil),
resource.IsNonConflictAPIErrorWrapped, nil),
typer: ot,
}
}
Expand Down
186 changes: 146 additions & 40 deletions pkg/resource/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,117 +20,223 @@ import (
"context"
"encoding/json"

jsonpatch "github.com/evanphx/json-patch"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
)

// Error strings.
const (
errUpdateObject = "cannot update object"

// taken from k8s.io/apiserver. Not crucial to match, but for uniformity it
// better should.
// TODO(sttts): import from k8s.io/apiserver/pkg/registry/generic/registry when
// kube has updated otel dependencies post-1.28.
errOptimisticLock = "the object has been modified; please apply your changes to the latest version and try again"
)

// An APIPatchingApplicator applies changes to an object by either creating or
// patching it in a Kubernetes API server.
type APIPatchingApplicator struct {
client client.Client
log logging.Logger
}

// NewAPIPatchingApplicator returns an Applicator that applies changes to an
// object by either creating or patching it in a Kubernetes API server.
func NewAPIPatchingApplicator(c client.Client) *APIPatchingApplicator {
return &APIPatchingApplicator{client: c}
return &APIPatchingApplicator{client: c, log: logging.NewNopLogger()}
}

// WithLogger sets the logger on the APIPatchingApplicator. The logger logs
// client operations including diffs of objects that are patched. Diffs of
// secrets are redacted.
func (a *APIPatchingApplicator) WithLogger(l logging.Logger) *APIPatchingApplicator {
a.log = l
return a
}

// Apply changes to the supplied object. The object will be created if it does
// not exist, or patched if it does. If the object does exist, it will only be
// patched if the passed object has the same or an empty resource version.
func (a *APIPatchingApplicator) Apply(ctx context.Context, o client.Object, ao ...ApplyOption) error {
m, ok := o.(metav1.Object)
if !ok {
return errors.New("cannot access object metadata")
}
func (a *APIPatchingApplicator) Apply(ctx context.Context, obj client.Object, ao ...ApplyOption) error { //nolint:gocyclo // the logic here is crucial and deserves to stay in one method
log := a.log.WithValues(logging.ForResource(obj))

if m.GetName() == "" && m.GetGenerateName() != "" {
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
if obj.GetName() == "" && obj.GetGenerateName() != "" {
log.Info("creating object")
return a.client.Create(ctx, obj)
}

desired := o.DeepCopyObject()

err := a.client.Get(ctx, types.NamespacedName{Name: m.GetName(), Namespace: m.GetNamespace()}, o)
current := obj.DeepCopyObject().(client.Object)
err := a.client.Get(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, current)
if kerrors.IsNotFound(err) {
// TODO(negz): Apply ApplyOptions here too?
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
log.Info("creating object")
return a.client.Create(ctx, obj)
}
if err != nil {
return errors.Wrap(err, "cannot get object")
return err
}

for _, fn := range ao {
if err := fn(ctx, o, desired); err != nil {
// Note: this check would ideally not be necessary if the Apply signature
// had a current object that we could use for the diff. But we have no
// current and for consistency of the patch it matters that the object we
// get above is the one that was originally used.
if obj.GetResourceVersion() != "" && obj.GetResourceVersion() != current.GetResourceVersion() {
gvr, err := groupResource(a.client, obj)
if err != nil {
return err
}
return kerrors.NewConflict(gvr, current.GetName(), errors.New(errOptimisticLock))
}

for _, fn := range ao {
if err := fn(ctx, current, obj); err != nil {
return errors.Wrapf(err, "apply option failed for %s", HumanReadableReference(a.client, obj))
}
}

// TODO(negz): Allow callers to override the kind of patch used.
return errors.Wrap(a.client.Patch(ctx, o, &patch{desired}), "cannot patch object")
// log diff
patch := client.MergeFromWithOptions(current, client.MergeFromWithOptimisticLock{})
patchBytes, err := patch.Data(obj)
if err != nil {
return errors.Wrapf(err, "failed to diff %s", HumanReadableReference(a.client, obj))
}
if len(patchBytes) == 0 {
return nil
}
secretGVK := schema.GroupVersionKind{Group: "v1", Version: "Secret", Kind: "Secret"}
if obj.GetObjectKind().GroupVersionKind() == secretGVK {
// TODO(sttts): be more clever and only redact the secret data
log.WithValues("diff", "**REDACTED**").Info("patching object")
} else {
log.WithValues("diff", string(patchBytes)).Info("patching object")
}

return a.client.Patch(ctx, obj, client.RawPatch(patch.Type(), patchBytes))
}

type patch struct{ from runtime.Object }
func groupResource(c client.Client, o client.Object) (schema.GroupResource, error) {
gvk, err := c.GroupVersionKindFor(o)
if err != nil {
return schema.GroupResource{}, errors.Wrapf(err, "cannot determine group version kind of %T", o)
}
m, err := c.RESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return schema.GroupResource{}, errors.Wrapf(err, "cannot determine group resource of %v", gvk)
}
return m.Resource.GroupResource(), nil
}

func (p *patch) Type() types.PatchType { return types.MergePatchType }
func (p *patch) Data(_ client.Object) ([]byte, error) { return json.Marshal(p.from) }
// AdditiveMergePatchApplyOption returns an ApplyOption that makes
// the Apply additive in the sense of a merge patch without null values. This is
// the old behavior of the APIPatchingApplicator.
//
// This only works with a desired object of type *unstructured.Unstructured.
//
// Deprecated: replace with Server Side Apply.
func AdditiveMergePatchApplyOption(_ context.Context, current, desired runtime.Object) error {
u, ok := desired.(*unstructured.Unstructured)
if !ok {
return errors.New("desired object is not an unstructured.Unstructured")
}
currentBytes, err := json.Marshal(current)
if err != nil {
return errors.Wrapf(err, "cannot marshal current %s", HumanReadableReference(nil, current))
}
desiredBytes, err := json.Marshal(u)
if err != nil {
return errors.Wrapf(err, "cannot marshal desired %s", HumanReadableReference(nil, desired))
}
mergedBytes, err := jsonpatch.MergePatch(currentBytes, desiredBytes)
if err != nil {
return errors.Wrapf(err, "cannot merge patch to %s", HumanReadableReference(nil, desired))
}
u.Object = nil
if err = json.Unmarshal(mergedBytes, &u.Object); err != nil {
return errors.Wrapf(err, "cannot unmarshal merged patch to %s", HumanReadableReference(nil, desired))
}
return nil
}

// An APIUpdatingApplicator applies changes to an object by either creating or
// updating it in a Kubernetes API server.
type APIUpdatingApplicator struct {
client client.Client
log logging.Logger
}

// NewAPIUpdatingApplicator returns an Applicator that applies changes to an
// object by either creating or updating it in a Kubernetes API server.
//
// Deprecated: Use NewAPIPatchingApplicator instead. The updating applicator
// can lead to data-loss if the Golang types in this process are not up-to-date.
func NewAPIUpdatingApplicator(c client.Client) *APIUpdatingApplicator {
return &APIUpdatingApplicator{client: c}
return &APIUpdatingApplicator{client: c, log: logging.NewNopLogger()}
}

// WithLogger sets the logger on the APIUpdatingApplicator. The logger logs
// client operations including diffs of objects that are patched. Diffs of
// secrets are redacted.
func (a *APIUpdatingApplicator) WithLogger(l logging.Logger) *APIUpdatingApplicator {
a.log = l
return a
}

// Apply changes to the supplied object. The object will be created if it does
// not exist, or updated if it does.
func (a *APIUpdatingApplicator) Apply(ctx context.Context, o client.Object, ao ...ApplyOption) error {
m, ok := o.(Object)
if !ok {
return errors.New("cannot access object metadata")
}
func (a *APIUpdatingApplicator) Apply(ctx context.Context, obj client.Object, ao ...ApplyOption) error {
log := a.log.WithValues(logging.ForResource(obj))

if m.GetName() == "" && m.GetGenerateName() != "" {
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
if obj.GetName() == "" && obj.GetGenerateName() != "" {
log.Info("creating object")
return a.client.Create(ctx, obj)
}

current := o.DeepCopyObject().(client.Object)

err := a.client.Get(ctx, types.NamespacedName{Name: m.GetName(), Namespace: m.GetNamespace()}, current)
current := obj.DeepCopyObject().(client.Object)
err := a.client.Get(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, current)
if kerrors.IsNotFound(err) {
// TODO(negz): Apply ApplyOptions here too?
return errors.Wrap(a.client.Create(ctx, m), "cannot create object")
log.Info("creating object")
return a.client.Create(ctx, obj)
}
if err != nil {
return errors.Wrap(err, "cannot get object")
return err
}

for _, fn := range ao {
if err := fn(ctx, current, m); err != nil {
return err
if err := fn(ctx, current, obj); err != nil {
return errors.Wrapf(err, "apply option failed for %s", HumanReadableReference(a.client, obj))
}
}

// NOTE(hasheddan): we must set the resource version of the desired object
// to that of the current or the update will always fail.
m.SetResourceVersion(current.(metav1.Object).GetResourceVersion())
return errors.Wrap(a.client.Update(ctx, m), "cannot update object")
// log diff
patch := client.MergeFromWithOptions(current, client.MergeFromWithOptimisticLock{})
patchBytes, err := patch.Data(obj)
if err != nil {
return errors.Wrapf(err, "failed to diff %s", HumanReadableReference(a.client, obj))
}
if len(patchBytes) == 0 {
return nil
}
secretGVK := schema.GroupVersionKind{Group: "v1", Version: "Secret", Kind: "Secret"}
if obj.GetObjectKind().GroupVersionKind() == secretGVK {
// TODO(sttts): be more clever and only redact the secret data
log.WithValues("diff", "**REDACTED**").Info("patching object")
} else {
log.WithValues("diff", string(patchBytes)).Info("patching object")
}

return a.client.Update(ctx, obj)
}

// An APIFinalizer adds and removes finalizers to and from a resource.
Expand Down
Loading