Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added BZMPOP, BZPOPMAX and BZPOPMIN commands #884

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,110 @@
}
]
},
{
"Command": "BZMPOP",
"Name": "BZMPOP",
"Summary": "Removes and returns a member by score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(K) \u002B O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "NUMKEYS",
"DisplayText": "numkeys",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "WHERE",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "PureToken",
"Token": "MIN"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "PureToken",
"Token": "MAX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer",
"Token": "COUNT",
"ArgumentFlags": "Optional"
}
]
},
{
"Command": "BZPOPMAX",
"Name": "BZPOPMAX",
"Summary": "Removes and returns the member with the highest score from one or more sorted sets. Blocks until a member available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
}
]
},
{
"Command": "BZPOPMIN",
"Name": "BZPOPMIN",
"Summary": "Removes and returns the member with the lowest score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
}
]
},
{
"Command": "CLIENT",
"Name": "CLIENT",
Expand Down
72 changes: 72 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,78 @@
}
]
},
{
"Command": "BZMPOP",
"Name": "BZMPOP",
"Arity": -5,
"Flags": "Blocking, MovableKeys, Write",
"AclCategories": "Blocking, SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysKeyNum",
"KeyNumIdx": 0,
"FirstKey": 1,
"KeyStep": 1
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "BZPOPMAX",
"Name": "BZPOPMAX",
"Arity": -3,
"Flags": "Blocking, Fast, Write",
"FirstKey": 1,
"LastKey": -2,
"Step": 1,
"AclCategories": "Blocking, Fast, SortedSet, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -2,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "BZPOPMIN",
"Name": "BZPOPMIN",
"Arity": -3,
"Flags": "Blocking, Fast, Write",
"FirstKey": 1,
"LastKey": -2,
"Step": 1,
"AclCategories": "Blocking, Fast, SortedSet, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -2,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "CLIENT",
"Name": "CLIENT",
Expand Down
56 changes: 33 additions & 23 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -383,37 +383,44 @@ private static bool TryMoveNextListItem(ListObject srcListObj, ListObject dstLis
}

/// <summary>
/// Try to get next available item from sorted set object
/// Try to get next available item from sorted set object based on command type
/// BZPOPMIN and BZPOPMAX share same implementation since Dictionary.First() and Last()
/// handle the ordering automatically based on sorted set scores
/// </summary>
/// <param name="sortedSetObj">Sorted set object</param>
/// <param name="command">RESP command</param>
/// <param name="nextItem">Item retrieved</param>
/// <returns>True if found available item</returns>
private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespCommand command, out byte[] nextItem)
private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sortedSetObj, RespCommand command, ArgSlice[] cmdArgs, out CollectionItemResult result)
{
nextItem = default;
result = default;

// If object has no items, return
if (sortedSetObj.Dictionary.Count == 0) return false;

// Get the next object according to operation type
switch (command)
{
case RespCommand.BZPOPMIN:
case RespCommand.BZPOPMAX:
var element = sortedSetObj.Pop(command == RespCommand.BZPOPMAX);
result = new CollectionItemResult(key, [element]);
return true;

case RespCommand.BZMPOP:
var lowScoresFirst = *(bool*)cmdArgs[0].ptr;
var popCount = *(int*)cmdArgs[1].ptr;
popCount = Math.Min(popCount, sortedSetObj.Dictionary.Count);

var scoredItems = new (double Score, byte[] Element)[popCount];

for (int i = 0; i < popCount; i++)
{
scoredItems[i] = sortedSetObj.Pop(!lowScoresFirst);
}

result = new CollectionItemResult(key, scoredItems);
return true;

default:
return false;
}
}

/// <summary>
/// Try to get next available item from object
/// </summary>
/// <param name="key">Key of object</param>
/// <param name="storageSession">Current storage session</param>
/// <param name="command">RESP command</param>
/// <param name="cmdArgs">Additional command arguments</param>
/// <param name="currCount">Collection size</param>
/// <param name="result">Result of command</param>
/// <returns>True if found available item</returns>
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
{
currCount = default;
Expand All @@ -423,6 +430,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
var objectType = command switch
{
RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List,
RespCommand.BZPOPMIN or RespCommand.BZPOPMAX or RespCommand.BZMPOP => GarnetObjectType.SortedSet,
_ => throw new NotSupportedException()
};

Expand Down Expand Up @@ -524,11 +532,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
}
case SortedSetObject setObj:
currCount = setObj.Dictionary.Count;
if (objectType != GarnetObjectType.SortedSet) return false;
if (objectType != GarnetObjectType.SortedSet)
return false;
if (currCount == 0)
return false;

return TryGetNextSetObjects(key, setObj, command, cmdArgs, out result);

var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem);
result = new CollectionItemResult(key, sortedSetNextItem);
return hasValue;
default:
return false;
}
Expand Down
11 changes: 11 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public CollectionItemResult(byte[] key, byte[][] items)
Items = items;
}

public CollectionItemResult(byte[] key, (double Score, byte[] Element)[] scoredItems)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to create a separate PR in future to remove CollectionItemResult class and replace it with SpanByteAndMemory, Let me know if there is any issue with that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? It'd make the code less readable. I'd leave it as-is.

{
Key = key;
ScoredItems = scoredItems;
}

/// <summary>
/// True if item was found
/// </summary>
Expand All @@ -40,6 +46,11 @@ public CollectionItemResult(byte[] key, byte[][] items)
/// </summary>
internal byte[][] Items { get; }

/// <summary>
/// Scored items retrieved from collection, where each item has an associated score.
/// </summary>
internal (double Score, byte[] Element)[] ScoredItems { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of doing this we should re-use the Item & Items properties to store the elements & have matching double Score and double[] Scores properties.


/// <summary>
/// Instance of empty result
/// </summary>
Expand Down
14 changes: 14 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Intrinsics.X86;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mistake?

using System.Text;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -886,6 +887,19 @@ private void SortedSetRank(ref ObjectInput input, ref SpanByteAndMemory output,
}
}

public (double Score, byte[] Element) Pop(bool popMaxScoreElement = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an XML comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename this to PopMinOrMax for clarity

{
if (sortedSet.Count == 0)
return default;

var element = popMaxScoreElement ? sortedSet.Max : sortedSet.Min;
sortedSet.Remove(element);
sortedSetDict.Remove(element.Element);
this.UpdateSize(element.Element, false);

return element;
}

/// <summary>
/// Removes and returns up to COUNT members with the low or high score
/// </summary>
Expand Down
Loading
Loading