Skip to content

Commit

Permalink
Fix controller memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Shai Katz committed Mar 29, 2021
1 parent b045f14 commit 3cbc22d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/crd-controller/HostedServices/V1Alpha2Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private IDisposable ObserveKamusSecret(CancellationToken token)
ApiVersion,
"kamussecrets",
token)
.Take(1)
.SelectMany(x =>
Observable.FromAsync(async () => await HandleEvent(x.Item1, x.Item2))
)
Expand Down
27 changes: 15 additions & 12 deletions src/crd-controller/utils/KubernetesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Reactive.Linq;
using System.Threading;
using k8s;
using Microsoft.VisualBasic;

namespace CustomResourceDescriptorController.utils
{
Expand All @@ -13,22 +14,24 @@ public static class KubernetesExtensions
string version,
string plural,
CancellationToken cancellationToken
) where TCRD : class
) where TCRD : class
{
Watcher<TCRD> watcher = null;
return Observable.FromAsync(async () =>
{
var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>();
var path = $"apis/{group}/{version}/watch/{plural}";
await kubernetes.WatchObjectAsync<TCRD>(path,
timeoutSeconds: int.MaxValue,
onEvent: (@type, @event) => subject.OnNext((@type, @event)),
onError: e => subject.OnError(e),
onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken);
return subject;
})
{
var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>();
var path = $"apis/{group}/{version}/watch/{plural}";
watcher = await kubernetes.WatchObjectAsync<TCRD>(path,
timeoutSeconds: int.MaxValue,
onEvent: (@type, @event) => subject.OnNext((@type, @event)),
onError: e => subject.OnError(e),
onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken);
return subject;
})
.SelectMany(x => x)
.Select(t => (t.Item1, t.Item2 as TCRD))
.Where(t => t.Item2 != null);
.Where(t => t.Item2 != null)
.Finally(() => watcher?.Dispose());
}
}
}

0 comments on commit 3cbc22d

Please sign in to comment.