Skip to content

Commit

Permalink
Add functions to count object in use and objects in pool (#29)
Browse files Browse the repository at this point in the history
* Add functions to count object in use and objects in pool

* Define `keytype` and `valtype` for `Pool`

* fixup! Define `keytype` and `valtype` for `Pool`

* fixup! Add functions to count object in use and objects in pool

* Bump version

* Rename in terms of "use"

* Rename `max` -> `limit`

* Docs for keytype/valtype
  • Loading branch information
nickrobinson251 authored Nov 1, 2023
1 parent c5cb574 commit 4bf9b3e
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ConcurrentUtilities"
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
authors = ["Jacob Quinn <[email protected]>"]
version = "2.2.1"
version = "2.3.0"

[deps]
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand Down
89 changes: 66 additions & 23 deletions src/pools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,96 @@ export Pool, acquire, release, drain!
import Base: acquire, release

"""
Pool{T}(max::Int=4096)
Pool{K, T}(max::Int=4096)
Pool{T}(limit::Int=4096)
Pool{K, T}(limit::Int=4096)
A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
of type `K`.
Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
function that returns a new object of type `T`.
The `key` argument is optional and can be used to lookup objects that match a certain criteria
(a Dict is used internally, so matching is `isequal`).
(a `Dict` is used internally, so matching is `isequal`).
The `max` argument will limit the number of objects
that can be acquired at any given time. If the limit has been reached, `acquire` will
block until an object is returned to the pool via `release`.
The `limit` argument will limit the number of objects that can be in use at any given time.
If the limit has been reached, `acquire` will block until an object is released
via `release`.
By default, `release(pool, obj)` will return the object to the pool for reuse.
`release(pool)` will return the "permit" to the pool while not returning
any object for reuse.
- `release(pool, obj)` will return the object to the pool for reuse.
- `release(pool)` will decrement the number in use but not return any object for reuse.
- `drain!` can be used to remove objects that have been returned to the pool for reuse;
it does *not* release any objects that are in use.
`drain!` can be used to remove any cached objects for reuse, but it does *not* release
any active acquires.
See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`.
The key and object types can be inspected with `keytype` and `valtype` respectively.
"""
mutable struct Pool{K, T}
lock::Threads.Condition
max::Int
limit::Int
cur::Int
keyedvalues::Dict{K, Vector{T}}
values::Vector{T}

function Pool{K, T}(max::Int=4096) where {K, T}
function Pool{K, T}(limit::Int=4096) where {K, T}
T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`"))
x = new(Threads.Condition(), max, 0)
x = new(Threads.Condition(), limit, 0)
if K === Nothing
x.values = T[]
safesizehint!(x.values, max)
safesizehint!(x.values, limit)
else
x.keyedvalues = Dict{K, Vector{T}}()
end
return x
end
end

Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max)
Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit)

safesizehint!(x, n) = sizehint!(x, min(4096, n))

# determines whether we'll look up object caches in .keyedvalues or .values
iskeyed(::Pool{K}) where {K} = K !== Nothing

"""
keytype(::Pool)
Return the type of the keys for the pool.
If the pool is not keyed, this will return `Nothing`.
"""
Base.keytype(::Type{<:Pool{K}}) where {K} = K
Base.keytype(p::Pool) = keytype(typeof(p))

"""
valtype(::Pool)
Return the type of the objects that can be stored in the pool.
"""
Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T
Base.valtype(p::Pool) = valtype(typeof(p))

"""
Pools.limit(pool::Pool) -> Int
Return the maximum number of objects permitted to be in use at the same time.
See `Pools.in_use(pool)` for the number of objects currently in use.
"""
limit(pool::Pool) = Base.@lock pool.lock pool.limit

"""
Pools.in_use(pool::Pool) -> Int
Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`.
"""
in_use(pool::Pool) = Base.@lock pool.lock pool.cur

"""
Pools.in_pool(pool::Pool) -> Int
Return the number of objects in the pool available for reuse.
"""
in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)

"""
drain!(pool)
Expand All @@ -72,7 +115,7 @@ end
TRUE(x) = true

@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty"))
@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use"))

# NOTE: assumes you have the lock!
function releasepermit(pool::Pool)
Expand All @@ -92,19 +135,19 @@ The `forcenew` keyword argument can be used to force the creation of a new objec
The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid
for reuse. By default, all objects are considered valid.
If there are no objects available for reuse, `f` will be called to create a new object.
If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`.
If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`.
"""
function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T}
key isa K || keyerror(key, K)
Base.@lock pool.lock begin
# first get a permit
while pool.cur >= pool.max
while pool.cur >= pool.limit
wait(pool.lock)
end
pool.cur += 1
# now see if we can get an object from the pool for reuse
if !forcenew
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values
while !isempty(objs)
obj = pop!(objs)
isvalid(obj) && return obj
Expand All @@ -126,10 +169,10 @@ end
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})
Return an object to a `pool`, optionally keyed by the provided `key`.
Release an object from usage by a `pool`, optionally keyed by the provided `key`.
If `obj` is provided, it will be returned to the pool for reuse.
Otherwise, if `nothing` is returned, or `release(pool)` is called,
just the "permit" will be returned to the pool.
the usage count will be decremented without an object being returned to the pool for reuse.
"""
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
key isa K || keyerror(key, K)
Expand Down
99 changes: 94 additions & 5 deletions test/pools.jl
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
using ConcurrentUtilities, Test
using ConcurrentUtilities.Pools, Test

@testset "Pools" begin
pool_size = lengthPools.values
@testset "nonkeyed and pool basics" begin
pool = Pool{Int}(3)
@test keytype(pool) === Nothing
@test valtype(pool) === Int

@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool)
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, x1)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool)
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# but now there are no objects to reuse again, so the next acquire will call our function
x2 = acquire(() -> 2, pool)
@test x2 == 2
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool)
@test x3 == 3
# the pool is now at capacity, so the next acquire will block until an object is released
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at `Pools.limit`, so the next acquire will block until an object is released
@test Pools.in_use(pool) == Pools.limit(pool)
tsk = @async acquire(() -> 4, pool; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -28,60 +53,110 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide a key to a non-keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)

# release objects back to the pool
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 4

# acquire an object, but checking isvalid
x1 = acquire(() -> 5, pool; isvalid=x -> x == 1)
@test x1 == 1
@test Pools.in_use(pool) == 1

# no valid objects, so our function was called to create a new one
x2 = acquire(() -> 6, pool; isvalid=x -> x == 1)
@test x2 == 6
# we have one slot left in the pool, we now throw while creating new
@test Pools.in_use(pool) == 2

# we have one permit left, we now throw while creating a new object
# and we want to test that the permit isn't permanently lost for the pool
@test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true)
@test Pools.in_use(pool) == 2

# we can still acquire a new object
x3 = acquire(() -> 7, pool; forcenew=true)
@test x3 == 7
@test Pools.in_use(pool) == 3

# release objects back to the pool
drain!(pool)
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 3

# try to do an invalid release
@test_throws ArgumentError release(pool, 10)

# test that the invalid release didn't push the object to our pool for reuse
x1 = acquire(() -> 8, pool)
@test x1 == 7
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 2
# calling drain! removes all objects for reuse
drain!(pool)
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

x2 = acquire(() -> 9, pool)
@test x2 == 9
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0
end

@testset "keyed pool" begin
# now test a keyed pool
pool = Pool{String, Int}(3)
@test keytype(pool) === String
@test valtype(pool) === Int

@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool, "a")
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, "a", x1)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# test for a different key
x2 = acquire(() -> 2, pool, "b")
# there's an existing object, but for a different key, so we don't reuse
@test x2 == 2
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool, "a")
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool, "a")
@test x3 == 3
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at capacity, so the next acquire will block until an object is released
# even though we've acquired using different keys, the capacity is shared across the pool
@test Pools.in_use(pool) == Pools.limit(pool)
tsk = @async acquire(() -> 4, pool, "c"; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -91,13 +166,27 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide an invalid key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)
# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key when releasing to a keyed pool
@test_throws ArgumentError release(pool)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test_broken Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1
end
end

2 comments on commit 4bf9b3e

@nickrobinson251
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/94576

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v2.3.0 -m "<description of version>" 4bf9b3e74944f9122fe2796015250d22bf106f3d
git push origin v2.3.0

Please sign in to comment.