From 3cbc22d5f12a3d85c1f6105eda18992670155a78 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 29 Mar 2021 16:25:07 +0300 Subject: [PATCH] Fix controller memory leak --- .../HostedServices/V1Alpha2Controller.cs | 1 + .../utils/KubernetesExtensions.cs | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/crd-controller/HostedServices/V1Alpha2Controller.cs b/src/crd-controller/HostedServices/V1Alpha2Controller.cs index 3107217d..da1de6a2 100644 --- a/src/crd-controller/HostedServices/V1Alpha2Controller.cs +++ b/src/crd-controller/HostedServices/V1Alpha2Controller.cs @@ -53,6 +53,7 @@ private IDisposable ObserveKamusSecret(CancellationToken token) ApiVersion, "kamussecrets", token) + .Take(1) .SelectMany(x => Observable.FromAsync(async () => await HandleEvent(x.Item1, x.Item2)) ) diff --git a/src/crd-controller/utils/KubernetesExtensions.cs b/src/crd-controller/utils/KubernetesExtensions.cs index 10647dbc..d829392c 100644 --- a/src/crd-controller/utils/KubernetesExtensions.cs +++ b/src/crd-controller/utils/KubernetesExtensions.cs @@ -2,6 +2,7 @@ using System.Reactive.Linq; using System.Threading; using k8s; +using Microsoft.VisualBasic; namespace CustomResourceDescriptorController.utils { @@ -13,22 +14,24 @@ public static class KubernetesExtensions string version, string plural, CancellationToken cancellationToken - ) where TCRD : class + ) where TCRD : class { + Watcher watcher = null; return Observable.FromAsync(async () => - { - var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); - var path = $"apis/{group}/{version}/watch/{plural}"; - await kubernetes.WatchObjectAsync(path, - timeoutSeconds: int.MaxValue, - onEvent: (@type, @event) => subject.OnNext((@type, @event)), - onError: e => subject.OnError(e), - onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); - return subject; - }) + { + var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); + var path = $"apis/{group}/{version}/watch/{plural}"; + watcher = await kubernetes.WatchObjectAsync(path, + timeoutSeconds: int.MaxValue, + onEvent: (@type, @event) => subject.OnNext((@type, @event)), + onError: e => subject.OnError(e), + onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); + return subject; + }) .SelectMany(x => x) .Select(t => (t.Item1, t.Item2 as TCRD)) - .Where(t => t.Item2 != null); + .Where(t => t.Item2 != null) + .Finally(() => watcher?.Dispose()); } } }