Skip to content

Commit

Permalink
Add optional force argument to refresh_continuous_aggregate
Browse files Browse the repository at this point in the history
Continuous aggregates that are built on top of tiered hypertables cannot
access and materialize tiered data when the `timescaledb.enable_tiered_reads`
GUC is disabled at the server level, and there is no way for a user to
manually force a refresh with tiered reads enabled. Here we add an
optional `force` parameter to the `refresh_continuous_aggregate`
procedure that allows the user to partially re-materialize a cagg within
a time window that has already been materialized.
  • Loading branch information
zilder committed Dec 5, 2024
1 parent 049714c commit af8c263
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 15 deletions.
3 changes: 2 additions & 1 deletion sql/ddl_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ AS '@MODULE_PATHNAME@', 'ts_tablespace_show' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

11 changes: 11 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ CREATE FUNCTION @extschema@.hypertable_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @extschema@.hypertable_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Recreate the `refresh_continuous_aggregate` procedure to add the optional
-- `force` argument. The three-argument version is dropped first so that it
-- does not remain as a separate overload next to the new four-argument one.
DROP PROCEDURE IF EXISTS @extschema@.refresh_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any");

-- `force` defaults to FALSE, so existing three-argument calls keep working.
-- NOTE(review): the procedure is bound to `ts_update_placeholder` here, while
-- sql/ddl_api.sql binds the same signature to `ts_continuous_agg_refresh` via
-- CREATE OR REPLACE; presumably that script rebinds it later in the update --
-- confirm.
CREATE PROCEDURE @extschema@.refresh_continuous_aggregate(
    continuous_aggregate REGCLASS,
    window_start "any",
    window_end "any",
    force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

8 changes: 8 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,11 @@ ALTER EXTENSION timescaledb DROP VIEW timescaledb_information.chunk_columnstore_
DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

-- Recreate the `refresh_continuous_aggregate` procedure to remove the `force`
-- argument on downgrade. The four-argument version is dropped first so that
-- only the original three-argument signature remains.
DROP PROCEDURE IF EXISTS @extschema@.refresh_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any", force BOOLEAN);

-- Restore the previous signature, bound directly to the C entry point
-- `ts_continuous_agg_refresh`.
CREATE PROCEDURE @extschema@.refresh_continuous_aggregate(
    continuous_aggregate REGCLASS,
    window_start "any",
    window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null);
policy_data.end_is_null,
false);

return true;
}
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
refresh_window.start = cagg_get_time_min(cagg);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true, false);
}

return DDL_DONE;
Expand Down
24 changes: 20 additions & 4 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder(
const CaggInvalidationState *state, const InternalTimeRange *refresh_window,
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
const InternalTimeRange *refresh_window,
const bool force);
static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
Oid dimtype, const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);
Expand Down Expand Up @@ -878,7 +879,8 @@ cut_cagg_invalidation_and_compute_remainder(const CaggInvalidationState *state,
*/
static void
clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window)
const InternalTimeRange *refresh_window,
const bool force)
{
ScanIterator iterator;
int32 cagg_hyper_id = state->mat_hypertable_id;
Expand All @@ -892,6 +894,19 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,

MemoryContextReset(state->per_tuple_mctx);

/* Force refresh within the entire window */
if (force) {
Invalidation logentry;

logentry.hyper_id = cagg_hyper_id;
logentry.lowest_modified_value = refresh_window->start;
logentry.greatest_modified_value = refresh_window->end;
logentry.is_modified = false;
ItemPointerSet(&logentry.tid, InvalidBlockNumber, 0);

save_invalidation_for_refresh(state, &logentry);
}

/* Process all invalidations for the continuous aggregate */
ts_scanner_foreach(&iterator)
{
Expand Down Expand Up @@ -981,7 +996,8 @@ InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
const CaggRefreshCallContext callctx,
const bool force)
{
CaggInvalidationState state;
InvalidationStore *store = NULL;
Expand All @@ -991,7 +1007,7 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange

invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);
clear_cagg_invalidations_for_refresh(&state, refresh_window, force);
count = tuplestore_tuple_count(state.invalidations);

if (count == 0)
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ extern InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx);
const CaggRefreshCallContext callctx, const bool force);

extern void invalidation_store_free(InvalidationStore *store);
18 changes: 12 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshC
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
int32 chunk_id,
const bool force);
static void fill_bucket_offset_origin(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
NullableDatum *offset, NullableDatum *origin);
Expand Down Expand Up @@ -628,6 +629,7 @@ Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool force = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
ContinuousAgg *cagg;
InternalTimeRange refresh_window = {
.type = InvalidOid,
Expand Down Expand Up @@ -659,7 +661,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
&refresh_window,
CAGG_REFRESH_WINDOW,
PG_ARGISNULL(1),
PG_ARGISNULL(2));
PG_ARGISNULL(2),
force);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -703,7 +706,8 @@ continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
const CaggRefreshCallContext callctx, int32 chunk_id,
const bool force)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
Expand All @@ -727,7 +731,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
ts_guc_cagg_max_individual_materializations,
&do_merged_refresh,
&merged_refresh_window,
callctx);
callctx,
force);

if (invalidations != NULL || do_merged_refresh)
{
Expand Down Expand Up @@ -759,7 +764,8 @@ void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
const bool end_isnull,
const bool force)
{
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
Expand Down Expand Up @@ -881,7 +887,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID, force))
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ extern void continuous_agg_calculate_merged_refresh_window(
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
const bool start_isnull, const bool end_isnull);
const bool start_isnull, const bool end_isnull,
const bool force);
38 changes: 38 additions & 0 deletions tsl/test/expected/cagg_refresh.out
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,41 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH NO DATA;
COMMIT;
-- Test forced refresh. Here we simulate the situation that we've seen
-- with tiered data when `timescaledb.enable_tiered_reads` was disabled at the
-- server level. In that case we would not see materialized tiered data and
-- we wouldn't be able to re-materialize the data using a normal refresh call
-- because it would skip previously materialized ranges, but it should be
-- possible with the `force=>true` parameter. To simulate this use case we clear
-- the materialization hypertable and forcefully re-materialize it.
SELECT ht.schema_name || '.' || ht.table_name AS mat_ht, mat_hypertable_id FROM _timescaledb_catalog.continuous_agg cagg
JOIN _timescaledb_catalog.hypertable ht ON cagg.mat_hypertable_id = ht.id
WHERE user_view_name = 'daily_temp' \gset
-- Delete the data from the materialization hypertable
DELETE FROM :mat_ht;
SET timezone TO UTC;
-- Run regular refresh, it should not touch previously materialized range
CALL refresh_continuous_aggregate('daily_temp', '2020-05-04 00:00 UTC', '2020-05-05 00:00 UTC');
NOTICE: continuous aggregate "daily_temp" is already up-to-date
SELECT * FROM daily_temp
ORDER BY day, device;
day | device | avg_temp
-----+--------+----------
(0 rows)

-- Run it again with force=>true, the data should be rematerialized
CALL refresh_continuous_aggregate('daily_temp', '2020-05-04 00:00 UTC', '2020-05-05 00:00 UTC', force=>true);
SELECT * FROM daily_temp
ORDER BY day, device;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 00:00:00 2020 UTC | 0 | 15.7647058823529
Mon May 04 00:00:00 2020 UTC | 1 | 24.3142857142857
Mon May 04 00:00:00 2020 UTC | 2 | 14.8205128205128
Mon May 04 00:00:00 2020 UTC | 3 | 18.1111111111111
Tue May 05 00:00:00 2020 UTC | 0 | 19.3846153846154
Tue May 05 00:00:00 2020 UTC | 1 | 16.5555555555556
Tue May 05 00:00:00 2020 UTC | 2 | 18.5714285714286
Tue May 05 00:00:00 2020 UTC | 3 | 23.5714285714286
(8 rows)

25 changes: 25 additions & 0 deletions tsl/test/sql/cagg_refresh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,28 @@
SET timezone TO PST8PDT;

\ir include/cagg_refresh_common.sql

-- Test forced refresh. Here we simulate the situation that we've seen
-- with tiered data when `timescaledb.enable_tiered_reads` was disabled at the
-- server level. In that case we would not see materialized tiered data and
-- we wouldn't be able to re-materialize the data using a normal refresh call
-- because it would skip previously materialized ranges, but it should be
-- possible with the `force=>true` parameter. To simulate this use case we clear
-- the materialization hypertable and forcefully re-materialize it.
SELECT ht.schema_name || '.' || ht.table_name AS mat_ht, mat_hypertable_id FROM _timescaledb_catalog.continuous_agg cagg
JOIN _timescaledb_catalog.hypertable ht ON cagg.mat_hypertable_id = ht.id
WHERE user_view_name = 'daily_temp' \gset

-- Delete the data from the materialization hypertable
DELETE FROM :mat_ht;

SET timezone TO UTC;
-- Run regular refresh, it should not touch previously materialized range
CALL refresh_continuous_aggregate('daily_temp', '2020-05-04 00:00 UTC', '2020-05-05 00:00 UTC');
SELECT * FROM daily_temp
ORDER BY day, device;
-- Run it again with force=>true, the data should be rematerialized
CALL refresh_continuous_aggregate('daily_temp', '2020-05-04 00:00 UTC', '2020-05-05 00:00 UTC', force=>true);
SELECT * FROM daily_temp
ORDER BY day, device;

0 comments on commit af8c263

Please sign in to comment.