diff --git a/pkg/patterns/declarative/pkg/watch/dynamic.go b/pkg/patterns/declarative/pkg/watch/dynamic.go index c86805dc..13d3214d 100644 --- a/pkg/patterns/declarative/pkg/watch/dynamic.go +++ b/pkg/patterns/declarative/pkg/watch/dynamic.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "atomic" "context" "fmt" "sync" @@ -35,8 +36,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -// WatchDelay is the time between a Watch being dropped and attempting to resume it -const WatchDelay = 30 * time.Second +var ( + // WatchActivityTimeout sets a timeout for a Watch activity under normal operation + WatchActivityTimeout = 300 * time.Second + // WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path + // We expect the author to set this to a lower value in environments where it makes sense. + // func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... } + WatchActivityFirstTimeout = 300 * time.Second +) + +const ( + // WatchDelay is the time between a Watch being dropped and attempting to resume it + WatchDelay = 30 * time.Second +) // NewDynamicWatch constructs a watcher for unstructured objects. // Deprecated: avoid using directly; will move to internal in future. @@ -138,13 +150,44 @@ type clientObject struct { // // [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276 func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) { + var sawActivity atomic.Bool + log := log.FromContext(ctx) options := w.FilterOptions // Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy. options.AllowWatchBookmarks = true - events, err := w.resource.Watch(context.TODO(), options) + activityTimeout := WatchActivityTimeout + if watchStarted != nil { + activityTimeout = WatchActivityFirstTimeout + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Check for events periodically + ticker := time.NewTicker(activityTimeout) + defer ticker.Stop() + sawActivity.Store(false) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !sawActivity.Load() { + log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch") + cancel() + return + } + sawActivity.Store(false) + } + } + }() + + events, err := w.resource.Watch(ctx, options) + sawActivity.Store(true) + if watchStarted != nil { watchStarted.Done() } @@ -159,6 +202,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met defer events.Stop() for clientEvent := range events.ResultChan() { + sawActivity.Store(true) switch clientEvent.Type { case watch.Bookmark: // not an object change, we ignore it