-
Notifications
You must be signed in to change notification settings - Fork 543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Compatibility] Added CLIENT UNBLOCK command #886
base: main
Are you sure you want to change the base?
Changes from all commits
63ef1a4
1368fb1
398f6d2
6e0bb98
37db209
3ecb443
1748103
95e015e
6aeb1d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,11 @@ public class CollectionItemBroker : IDisposable | |
private bool disposed = false; | ||
private bool isStarted = false; | ||
|
||
internal bool TryGetObserver(int sessionId, out CollectionItemObserver observer) | ||
{ | ||
return SessionIdToObserver.TryGetValue(sessionId, out observer); | ||
} | ||
|
||
/// <summary> | ||
/// Asynchronously wait for item from collection object | ||
/// </summary> | ||
|
@@ -118,13 +123,15 @@ private async Task<CollectionItemResult> GetCollectionItemAsync(CollectionItemOb | |
? TimeSpan.FromMilliseconds(-1) | ||
: TimeSpan.FromSeconds(timeoutInSeconds); | ||
|
||
var isError = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming: isError is confusing here, since there's no error actually occurring, should be something like isForceUnblocked. |
||
try | ||
{ | ||
// Wait for either the result found notification or the timeout to expire | ||
await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token); | ||
} | ||
catch (OperationCanceledException) | ||
catch (OperationCanceledException) when (observer.CancellationTokenSource.IsCancellationRequested) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will also be true if the session was disposed |
||
{ | ||
isError = true; | ||
} | ||
|
||
SessionIdToObserver.TryRemove(observer.Session.ObjectStoreSessionID, out _); | ||
|
@@ -136,6 +143,11 @@ private async Task<CollectionItemResult> GetCollectionItemAsync(CollectionItemOb | |
observer.HandleSetResult(CollectionItemResult.Empty); | ||
} | ||
|
||
if (isError) | ||
{ | ||
return CollectionItemResult.Error; | ||
} | ||
|
||
return observer.Result; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,11 @@ public CollectionItemResult(byte[] key, byte[][] items) | |
Items = items; | ||
} | ||
|
||
private CollectionItemResult(bool isError) | ||
{ | ||
IsError = isError; | ||
} | ||
|
||
/// <summary> | ||
/// True if item was found | ||
/// </summary> | ||
|
@@ -40,9 +45,19 @@ public CollectionItemResult(byte[] key, byte[][] items) | |
/// </summary> | ||
internal byte[][] Items { get; } | ||
|
||
/// <summary> | ||
/// Indicates whether the result represents an error. | ||
/// </summary> | ||
internal readonly bool IsError { get; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same issue with naming here, should be something like IsForceUnblocked (update the comment accordingly) |
||
|
||
/// <summary> | ||
/// Instance of empty result | ||
/// </summary> | ||
internal static readonly CollectionItemResult Empty = new(null, item: null); | ||
|
||
/// <summary> | ||
/// Instance representing an error result. | ||
/// </summary> | ||
internal static readonly CollectionItemResult Error = new(true); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -568,5 +568,77 @@ private bool NetworkCLIENTSETINFO() | |
|
||
return true; | ||
} | ||
|
||
/// <summary> | ||
/// CLIENT UNBLOCK | ||
/// </summary> | ||
private bool NetworkCLIENTUNBLOCK() | ||
{ | ||
if (parseState.Count is not (1 or 2)) | ||
{ | ||
return AbortWithWrongNumberOfArguments("client|unblock"); | ||
} | ||
|
||
if (!parseState.TryGetLong(0, out var clientId)) | ||
{ | ||
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER); | ||
} | ||
|
||
var toThrowError = false; | ||
if (parseState.Count == 2) | ||
{ | ||
var option = parseState.GetArgSliceByRef(1); | ||
if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.TIMEOUT)) | ||
{ | ||
toThrowError = false; | ||
} | ||
else if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ERROR)) | ||
{ | ||
toThrowError = true; | ||
} | ||
else | ||
{ | ||
return AbortWithErrorMessage(CmdStrings.RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON); | ||
} | ||
} | ||
|
||
if (Server is GarnetServerBase garnetServer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is false you should abort with some error, otherwise the client will hang |
||
{ | ||
var session = garnetServer.ActiveConsumers().OfType<RespServerSession>().FirstOrDefault(x => x.Id == clientId); | ||
|
||
if (session is null) | ||
{ | ||
while (!RespWriteUtils.WriteInteger(0, ref dcurr, dend)) | ||
SendAndReset(); | ||
return true; | ||
} | ||
|
||
if (session.storeWrapper?.itemBroker is not null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it is null then you should abort with an error |
||
{ | ||
var isBlocked = session.storeWrapper.itemBroker.TryGetObserver(session.ObjectStoreSessionID, out var observer); | ||
|
||
if (!isBlocked) | ||
{ | ||
while (!RespWriteUtils.WriteInteger(0, ref dcurr, dend)) | ||
SendAndReset(); | ||
return true; | ||
} | ||
|
||
if (toThrowError) | ||
{ | ||
observer.CancellationTokenSource.Cancel(); | ||
} | ||
else | ||
{ | ||
observer.ResultFoundSemaphore.Release(); | ||
} | ||
} | ||
|
||
while (!RespWriteUtils.WriteInteger(1, ref dcurr, dend)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The client may be unblocked between you checking if it's blocked and attempting to unblock it (in which case you should return 0). In other words, you should get some feedback from the observer that indicates if you initiated the unblocking or not. |
||
SendAndReset(); | ||
} | ||
|
||
return true; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add an XML comment