From 5ecfc691f256ef10b52efae4f525d6413c2502b1 Mon Sep 17 00:00:00 2001 From: Alex Kasko Date: Wed, 1 Jan 2025 16:36:36 +0000 Subject: [PATCH] Improve transaction check in refresh_cagg Procedures that use multiple transactions cannot be run in a transaction block (from a function, from dynamic SQL) or in a subtransaction (from a procedure block with an EXCEPTION clause). Such procedures use PreventInTransactionBlock function to check whether they can be run. Though currently such checks are incomplete, because PreventInTransactionBlock requires isTopLevel argument to throw a consistent error when the call originates from a function. This isTopLevel flag (that is a bit poorly named - see below) is not readily available inside C procedures. The source of truth for it - ProcessUtilityContext parameter is passed to ProcessUtility hooks, but is not included with the function calls. There is an undocumented SPI_inside_nonatomic_context function, that would have been sufficient for isTopLevel flag, but it currently returns false when SPI connection is absent (that is a valid scenario when C procedures are called from top-level SQL instead of PLPG procedures or DO blocks) so it cannot be used. To work around this the value of ProcessUtilityContext parameter is saved when TS ProcessUtility hook is entered and can be accessed from C procedures using new ts_process_utility_is_context_nonatomic function. The result is called "non-atomic" instead of "top-level" because the way how isTopLevel flag is determined from the ProcessUtilityContext value in standard_ProcessUtility is insufficient for C procedures - it excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from PLPG procedure without an EXCEPTION clause) that is a valid use case for C procedures with transactions. See details in the description of ExecuteCallStmt function. It is expected that calls to C procedures are done with CALL and always pass through the ProcessUtility hook. 
The ProcessUtilityContext parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In unlikely case when a C procedure is called without passing through ProcessUtility hook and the call is done in atomic context, then PreventInTransactionBlock checks will pass, but SPI_commit will fail when checking that all current active snapshots are portal-owned snapshots (the same behaviour that was observed before this change). In atomic context there will be an additional snapshot set in _SPI_execute_plan, see the snapshot handling invariants description in that function. Closes #6533. --- src/process_utility.c | 19 +++++ src/process_utility.h | 53 +++++++++++++ tsl/src/continuous_aggs/refresh.c | 12 ++- tsl/test/expected/cagg_refresh.out | 65 ++++++++++++++++ .../expected/cagg_refresh_using_merge.out | 65 ++++++++++++++++ tsl/test/sql/include/cagg_refresh_common.sql | 74 +++++++++++++++++++ 6 files changed, 286 insertions(+), 2 deletions(-) diff --git a/src/process_utility.c b/src/process_utility.c index 1aee1d67243..1ef0e1cfffa 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -93,6 +93,7 @@ void _process_utility_fini(void); static ProcessUtility_hook_type prev_ProcessUtility_hook; static bool expect_chunk_modification = false; +static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL; static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht); static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht); @@ -5031,6 +5032,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *completion_tag) { + last_process_utility_context = context; + ProcessUtilityArgs args = { .query_string = query_string, .context = context, .params = params, @@ -5062,6 +5065,7 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool if (altering_timescaledb || 
!ts_extension_is_loaded_and_not_upgrading()) { prev_ProcessUtility(&args); + ts_process_utility_context_reset(); return; } @@ -5086,6 +5090,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool */ if (result == DDL_CONTINUE) prev_ProcessUtility(&args); + + ts_process_utility_context_reset(); } static void @@ -5156,6 +5162,19 @@ ts_process_utility_set_expect_chunk_modification(bool expect) expect_chunk_modification = expect; } +bool +ts_process_utility_is_context_nonatomic(void) +{ + ProcessUtilityContext context = last_process_utility_context; + return context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY_NONATOMIC; +} + +void +ts_process_utility_context_reset(void) +{ + last_process_utility_context = PROCESS_UTILITY_TOPLEVEL; +} + static void process_utility_xact_abort(XactEvent event, void *arg) { diff --git a/src/process_utility.h b/src/process_utility.h index 49f019abd85..3f5bad2eeb6 100644 --- a/src/process_utility.h +++ b/src/process_utility.h @@ -36,3 +36,56 @@ typedef enum typedef DDLResult (*ts_process_utility_handler_t)(ProcessUtilityArgs *args); extern void ts_process_utility_set_expect_chunk_modification(bool expect); + +/* + * Procedures that use multiple transactions cannot be run in a transaction + * block (from a function, from dynamic SQL) or in a subtransaction (from a + * procedure block with an EXCEPTION clause). Such procedures use + * PreventInTransactionBlock function to check whether they can be run. + * + * Though currently such checks are incomplete, because + * PreventInTransactionBlock requires isTopLevel argument to throw a + * consistent error when the call originates from a function. This + * isTopLevel flag (that is a bit poorly named - see below) is not readily + * available inside C procedures. The source of truth for it - + * ProcessUtilityContext parameter is passed to ProcessUtility hooks, but + * is not included with the function calls. 
There is an undocumented + * SPI_inside_nonatomic_context function, that would have been sufficient + * for isTopLevel flag, but it currently returns false when SPI connection + * is absent (that is a valid scenario when C procedures are called from + * top-level SQL instead of PLPG procedures or DO blocks) so it cannot be + * used. + * + * To work around this the value of ProcessUtilityContext parameter is + * saved when TS ProcessUtility hook is entered and can be accessed from + * C procedures using new ts_process_utility_is_context_nonatomic function. + * The result is called "non-atomic" instead of "top-level" because the way + * how isTopLevel flag is determined from the ProcessUtilityContext value + * in standard_ProcessUtility is insufficient for C procedures - it + * excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from + * PLPG procedure without an EXCEPTION clause) that is a valid use case for + * C procedures with transactions. See details in the description of + * ExecuteCallStmt function. + * + * It is expected that calls to C procedures are done with CALL and always + * pass through the ProcessUtility hook. The ProcessUtilityContext + * parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In + * unlikely case when a C procedure is called without passing through + * ProcessUtility hook and the call is done in atomic context, then + * PreventInTransactionBlock checks will pass, but SPI_commit will fail + * when checking that all current active snapshots are portal-owned + * snapshots (the same behaviour that was observed before this change). + * In atomic context there will be an additional snapshot set in + * _SPI_execute_plan, see the snapshot handling invariants description + * in that function. 
+ */ +extern TSDLLEXPORT bool ts_process_utility_is_context_nonatomic(void); + +/* + * Currently in TS ProcessUtility hook the saved ProcessUtilityContext + * value is reset back to PROCESS_UTILITY_TOPLEVEL on normal exit but + * is NOT reset in case of ereport exit. C procedures can call this + * function to reset the saved value before doing the checks that can + * result in ereport exit. + */ +extern TSDLLEXPORT void ts_process_utility_context_reset(void); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index b6097dbc8c5..778de71a066 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -31,6 +31,7 @@ #include "invalidation.h" #include "invalidation_threshold.h" #include "materialize.h" +#include "process_utility.h" #include "refresh.h" #define CAGG_REFRESH_LOG_LEVEL (callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1) @@ -764,9 +765,16 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, int32 mat_id = cagg->data.mat_hypertable_id; InternalTimeRange refresh_window = *refresh_window_arg; int64 invalidation_threshold; + bool nonatomic = ts_process_utility_is_context_nonatomic(); + + /* Reset the saved ProcessUtilityContext value promptly before + * calling PreventCommandIfReadOnly so the potential unsupported + * (atomic) value won't linger there in case of ereport exit. + */ + ts_process_utility_context_reset(); /* Connect to SPI manager due to the underlying SPI calls */ - int rc = SPI_connect_ext(SPI_OPT_NONATOMIC); + int rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0); if (rc != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); @@ -789,7 +797,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, * invalidation threshold needs no update. However, materialization might * still take a long time and it is probably best for consistency to always * prevent transaction blocks. 
*/ - PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME); + PreventInTransactionBlock(nonatomic, REFRESH_FUNCTION_NAME); /* No bucketing when open ended */ if (!(start_isnull && end_isnull)) diff --git a/tsl/test/expected/cagg_refresh.out b/tsl/test/expected/cagg_refresh.out index 291ce335272..395f363c66f 100644 --- a/tsl/test/expected/cagg_refresh.out +++ b/tsl/test/expected/cagg_refresh.out @@ -489,3 +489,68 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +psql:include/cagg_refresh_common.sql:320: NOTICE: continuous aggregate "daily_temp" is already up-to-date +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CALL refresh_cagg_proc_normal(); +psql:include/cagg_refresh_common.sql:330: NOTICE: continuous aggregate "daily_temp" is already up-to-date +\set ON_ERROR_STOP 0 +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; +CALL refresh_cagg_proc_subtransaction(); +psql:include/cagg_refresh_common.sql:347: ERROR: 
refresh_continuous_aggregate() cannot run inside a transaction block +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; +SELECT * from refresh_cagg_fun(); +psql:include/cagg_refresh_common.sql:358: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; +psql:include/cagg_refresh_common.sql:366: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Trigger +CREATE TABLE refresh_cagg_trigger_table(a int); +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); +INSERT INTO refresh_cagg_trigger_table VALUES(1); +psql:include/cagg_refresh_common.sql:380: ERROR: refresh_continuous_aggregate() cannot be executed from a function +\set ON_ERROR_STOP 1 diff --git a/tsl/test/expected/cagg_refresh_using_merge.out b/tsl/test/expected/cagg_refresh_using_merge.out index 6ad8fa3a46b..a79ea765b5d 100644 --- a/tsl/test/expected/cagg_refresh_using_merge.out +++ b/tsl/test/expected/cagg_refresh_using_merge.out @@ -490,6 +490,71 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). 
Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +psql:include/cagg_refresh_common.sql:320: NOTICE: continuous aggregate "daily_temp" is already up-to-date +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CALL refresh_cagg_proc_normal(); +psql:include/cagg_refresh_common.sql:330: NOTICE: continuous aggregate "daily_temp" is already up-to-date +\set ON_ERROR_STOP 0 +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; +CALL refresh_cagg_proc_subtransaction(); +psql:include/cagg_refresh_common.sql:347: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; +SELECT * from refresh_cagg_fun(); +psql:include/cagg_refresh_common.sql:358: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; +psql:include/cagg_refresh_common.sql:366: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Trigger +CREATE TABLE 
refresh_cagg_trigger_table(a int); +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); +INSERT INTO refresh_cagg_trigger_table VALUES(1); +psql:include/cagg_refresh_common.sql:380: ERROR: refresh_continuous_aggregate() cannot be executed from a function +\set ON_ERROR_STOP 1 -- Additional tests for MERGE refresh DROP TABLE conditions CASCADE; NOTICE: drop cascades to 10 other objects diff --git a/tsl/test/sql/include/cagg_refresh_common.sql b/tsl/test/sql/include/cagg_refresh_common.sql index 83400d9a245..b5b47928d3c 100644 --- a/tsl/test/sql/include/cagg_refresh_common.sql +++ b/tsl/test/sql/include/cagg_refresh_common.sql @@ -306,3 +306,77 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; + +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. 
+ +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +CALL refresh_cagg_proc_normal(); + +\set ON_ERROR_STOP 0 + +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; + +CALL refresh_cagg_proc_subtransaction(); + +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; + +SELECT * from refresh_cagg_fun(); + +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; + +-- Trigger +CREATE TABLE refresh_cagg_trigger_table(a int); + +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); + +INSERT INTO refresh_cagg_trigger_table VALUES(1); + +\set ON_ERROR_STOP 1