Skip to content

Commit

Permalink
Fix: Make has_time_series faster
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-thom committed Dec 7, 2024
1 parent cd78bac commit e2b7959
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 79 deletions.
15 changes: 12 additions & 3 deletions src/time_series_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ Return true if the component or supplemental attribute has time series data.
function has_time_series(owner::TimeSeriesOwners)
mgr = get_time_series_manager(owner)
isnothing(mgr) && return false
return has_time_series(mgr, owner)
return has_metadata(mgr.metadata_store, owner)
end

"""
Expand All @@ -774,7 +774,16 @@ function has_time_series(
) where {T <: TimeSeriesData}
mgr = get_time_series_manager(val)
isnothing(mgr) && return false
return has_time_series(mgr.metadata_store, val, T)
return has_metadata(mgr.metadata_store, val, T)
end

function has_time_series(
val::TimeSeriesOwners,
name::AbstractString,
)
mgr = get_time_series_manager(val)
isnothing(mgr) && return false
return has_metadata(mgr.metadata_store, val, name)
end

function has_time_series(
Expand All @@ -785,7 +794,7 @@ function has_time_series(
) where {T <: TimeSeriesData}
mgr = get_time_series_manager(val)
isnothing(mgr) && return false
return has_time_series(mgr.metadata_store, val, T, name; features...)
return has_metadata(mgr.metadata_store, val, T, name; features...)
end

has_time_series(
Expand Down
6 changes: 1 addition & 5 deletions src/time_series_manager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ function clear_time_series!(mgr::TimeSeriesManager, component::TimeSeriesOwners)
return
end

has_time_series(mgr::TimeSeriesManager, component::TimeSeriesOwners) =
has_time_series(mgr.metadata_store, component)
has_time_series(::Nothing, component::TimeSeriesOwners) = false

get_metadata(
mgr::TimeSeriesManager,
component::TimeSeriesOwners,
Expand Down Expand Up @@ -227,7 +223,7 @@ function remove_time_series!(
end

function _remove_data_if_no_more_references(mgr::TimeSeriesManager, uuid::Base.UUID)
if !has_time_series(mgr.metadata_store, uuid)
if !has_metadata(mgr.metadata_store, uuid)
remove_time_series!(mgr.data_store, uuid)
@debug "Removed time_series data $uuid." _group = LOG_GROUP_TIME_SERIES
end
Expand Down
163 changes: 94 additions & 69 deletions src/time_series_metadata_store.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ const DB_FILENAME = "time_series_metadata.db"

mutable struct TimeSeriesMetadataStore
db::SQLite.DB
# If we don't cache SQL statements, there is a cost of 3-4 us on every query.
# DBInterface.jl does something similar with @prepare. We need this to be tied
# to our connection.
cached_statements::Dict{String, SQLite.Stmt}
# If you add any fields, ensure they are managed in deepcopy_internal below.
end

Expand All @@ -12,7 +16,7 @@ Construct a new TimeSeriesMetadataStore with an in-memory database.
function TimeSeriesMetadataStore()
# This metadata is not expected to exceed system memory, so create an in-memory
# database so that it is faster. This could be changed.
store = TimeSeriesMetadataStore(SQLite.DB())
store = TimeSeriesMetadataStore(SQLite.DB(), Dict{String, SQLite.Stmt}())
_create_metadata_table!(store)
_create_indexes!(store)
@debug "Initialized new time series metadata table" _group = LOG_GROUP_TIME_SERIES
Expand All @@ -26,7 +30,10 @@ function TimeSeriesMetadataStore(filename::AbstractString)
src = SQLite.DB(filename)
db = SQLite.DB()
backup(db, src)
store = TimeSeriesMetadataStore(db)
store = TimeSeriesMetadataStore(db, Dict{Base.UUID, TimeSeriesMetadata}())
if isempty(sql(store, "PRAGMA index_list(time_series_metadata)"))
_create_indexes!(store)

Check warning on line 35 in src/time_series_metadata_store.jl

View check run for this annotation

Codecov / codecov/patch

src/time_series_metadata_store.jl#L35

Added line #L35 was not covered by tests
end
@debug "Loaded time series metadata from file" _group = LOG_GROUP_TIME_SERIES filename
return store
end
Expand Down Expand Up @@ -108,8 +115,10 @@ function Base.deepcopy_internal(store::TimeSeriesMetadataStore, dict::IdDict)

new_db = SQLite.DB()
backup(new_db, store.db)
new_store = TimeSeriesMetadataStore(new_db)
new_cached_statements = deepcopy(store.cached_statements)
new_store = TimeSeriesMetadataStore(new_db, new_cached_statements)
dict[store] = new_store
dict[store.cached_statements] = new_cached_statements
return new_store
end

Expand All @@ -135,8 +144,8 @@ function add_metadata!(
features,
)
params = repeat("?,", length(vals) - 1) * "jsonb(?)"
SQLite.DBInterface.execute(
store.db,
_execute(
store,
"INSERT INTO $METADATA_TABLE_NAME VALUES($params)",
vals,
)
Expand Down Expand Up @@ -541,85 +550,102 @@ function get_time_series_summary_table(store::TimeSeriesMetadataStore)
return DataFrame(_execute(store, query))
end

"""
Return True if there is time series metadata matching the inputs.
"""
function has_metadata(
store::TimeSeriesMetadataStore,
owner::TimeSeriesOwners,
metadata::TimeSeriesMetadata,
)
features = Dict(Symbol(k) => v for (k, v) in get_features(metadata))
return _try_has_time_series_metadata_by_full_params(
store,
owner,
time_series_metadata_to_data(metadata),
get_name(metadata);
features...,
)
query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ?
LIMIT 1
"""
params = (string(get_uuid(owner)),)
return _has_metadata(store, query, params)
end

function has_metadata(
store::TimeSeriesMetadataStore,
owner::TimeSeriesOwners,
time_series_type::Type{<:TimeSeriesData},
name::String;
features...,
)
if _try_has_time_series_metadata_by_full_params(
store,
owner,
time_series_type,
name;
features...,
)
return true
end

where_clause, params = _make_where_clause(
owner;
time_series_type = time_series_type,
name = name,
features...,
)
query = "SELECT COUNT(*) AS count FROM $METADATA_TABLE_NAME $where_clause"
return _execute_count(store, query, params) > 0
end

"""
Return True if there is time series matching the UUID.
"""
function has_time_series(store::TimeSeriesMetadataStore, time_series_uuid::Base.UUID)
where_clause = "WHERE time_series_uuid = ?"
params = [string(time_series_uuid)]
return _has_time_series(store, where_clause, params)
query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ? AND time_series_type = ?
LIMIT 1
"""
params = (string(get_uuid(owner)), _convert_ts_type_to_string(time_series_type))
return _has_metadata(store, query, params)
end

function has_time_series(
function has_metadata(
store::TimeSeriesMetadataStore,
owner::TimeSeriesOwners,
name::AbstractString,
)
where_clause = "WHERE owner_uuid = ?"
params = [string(get_uuid(owner))]
return _has_time_series(store, where_clause, params)
query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ? AND name = ?
LIMIT 1
"""
params = (string(get_uuid(owner)), name)
return _has_metadata(store, query, params)
end

function has_time_series(
function has_metadata(
store::TimeSeriesMetadataStore,
owner::TimeSeriesOwners,
time_series_type::Type{<:TimeSeriesData},
name::String;
features...,
)
where_clause, params = _make_where_clause(owner; time_series_type = time_series_type)
return _has_time_series(store, where_clause, params)
if isempty(features)
params =
(string(get_uuid(owner)), _convert_ts_type_to_string(time_series_type), name)
query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ? AND time_series_type = ? AND name = ?
LIMIT 1
"""
return _has_metadata(store, query, params)
end

query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ? AND time_series_type = ? AND name = ? AND features = ?
LIMIT 1
"""
params = Vector{Any}([
string(get_uuid(owner)),
_convert_ts_type_to_string(time_series_type),
name,
])
feature_str = _make_feature_filter!(params; features...)
query = """
SELECT id FROM
$METADATA_TABLE_NAME
WHERE owner_uuid = ? AND time_series_type = ? AND name = ? AND ($feature_str)
LIMIT 1
"""
return _has_metadata(store, query, params)
end

has_time_series(
store::TimeSeriesMetadataStore,
owner::TimeSeriesOwners,
time_series_type::Type{<:TimeSeriesData},
name::String;
features...,
) = has_metadata(store, owner, time_series_type, name; features...)
"""
Return True if there is time series matching the UUID.
"""
function has_metadata(store::TimeSeriesMetadataStore, time_series_uuid::Base.UUID)
query = "SELECT id FROM $METADATA_TABLE_NAME WHERE time_series_uuid = ?"
params = (string(time_series_uuid),)
return _has_metadata(store, query, params)
end

function _has_metadata(store::TimeSeriesMetadataStore, query::String, params)
return !isempty(_execute(store, query, params))
end

"""
Return a sorted Vector of distinct resolutions for all time series of the given type
Expand Down Expand Up @@ -678,7 +704,7 @@ function list_existing_metadata(
end
where_clause = join(where_clauses, " OR ")
query = """
SELECT
SELECT
time_series_type
,name
,owner_uuid
Expand All @@ -702,7 +728,7 @@ function list_existing_time_series_uuids(store::TimeSeriesMetadataStore, uuids)
query = """
SELECT
DISTINCT time_series_uuid
FROM $METADATA_TABLE_NAME
FROM $METADATA_TABLE_NAME
WHERE time_series_uuid IN ($placeholder)
"""
table = Tables.columntable(_execute(store, query, uuids_str))
Expand Down Expand Up @@ -751,7 +777,7 @@ function list_metadata(
end

"""
Return a Vector of NamedTuple of owner UUID and time series metadata matching the inputs.
Return a Vector of NamedTuple of owner UUID and time series metadata matching the inputs.
"""
function list_metadata_with_owner_uuid(
store::TimeSeriesMetadataStore,
Expand Down Expand Up @@ -954,16 +980,15 @@ function _create_row(
)
end

function make_stmt(store::TimeSeriesMetadataStore, query::String)
return get!(() -> SQLite.Stmt(store.db, query), store.cached_statements, query)
end

_execute(s::TimeSeriesMetadataStore, q, p = nothing) =
execute(s.db, q, p, LOG_GROUP_TIME_SERIES)
execute(make_stmt(s, q), p, LOG_GROUP_TIME_SERIES)
_execute_count(s::TimeSeriesMetadataStore, q, p = nothing) =
execute_count(s.db, q, p, LOG_GROUP_TIME_SERIES)

function _has_time_series(store::TimeSeriesMetadataStore, where_clause::String, params)
query = "SELECT COUNT(*) AS count FROM $METADATA_TABLE_NAME $where_clause"
return _execute_count(store, query, params) > 0
end

function _remove_metadata!(
store::TimeSeriesMetadataStore,
where_clause::AbstractString,
Expand Down
27 changes: 26 additions & 1 deletion src/utils/sqlite.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ function backup(
end
end

const STATEMENT_CACHE = Dict{String, SQLite.Stmt}()

"""
Wrapper around SQLite.DBInterface.execute to provide log messages.
Wrapper around SQLite.DBInterface.execute to provide caching of compiled statements
as well as log messages.
"""
function execute(
db::SQLite.DB,
Expand All @@ -48,6 +51,8 @@ function execute(
log_group::Symbol,
)
@debug "Execute SQL" _group = log_group query params
# If we don't cache these statements, there is a cost of 3-4 us on every query.
#stmt = get!(() -> SQLite.Stmt(db, query), STATEMENT_CACHE, query)
try
return if isnothing(params)
SQLite.DBInterface.execute(db, query)
Expand All @@ -60,6 +65,26 @@ function execute(
end
end

function execute(
stmt::SQLite.Stmt,
params::Union{Nothing, Vector, Tuple},
log_group::Symbol,
)
@debug "Execute SQL" _group = log_group params
# If we don't cache these statements, there is a cost of 3-4 us on every query.
#stmt = get!(() -> SQLite.Stmt(db, query), STATEMENT_CACHE, query)
try
return if isnothing(params)
SQLite.DBInterface.execute(stmt)
else
SQLite.DBInterface.execute(stmt, params)
end
catch
@error "Failed to send SQL query" params
rethrow()

Check warning on line 84 in src/utils/sqlite.jl

View check run for this annotation

Codecov / codecov/patch

src/utils/sqlite.jl#L83-L84

Added lines #L83 - L84 were not covered by tests
end
end

"""
Run a query to find a count. The query must produce a column called count with one row.
"""
Expand Down
3 changes: 2 additions & 1 deletion test/test_time_series.jl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ end
component = IS.TestComponent(component_name, 5)
IS.add_component!(sys, component)
forecast = IS.Deterministic(name, data, resolution)
@test IS.get_initial_timestamp(forecast) == initial_time
#@test IS.get_initial_timestamp(forecast) == initial_time
IS.add_time_series!(sys, component, forecast)
@test IS.has_time_series(component)

Expand Down Expand Up @@ -497,6 +497,7 @@ end
) isa IS.SingleTimeSeries
@test IS.has_time_series(component, IS.SingleTimeSeries)
@test IS.has_time_series(component, IS.SingleTimeSeries, ts_name)
@test IS.has_time_series(component, ts_name)
@test IS.has_time_series(component, IS.SingleTimeSeries, ts_name, scenario = "low")
@test IS.has_time_series(
component,
Expand Down

0 comments on commit e2b7959

Please sign in to comment.