Skip to content

Commit

Permalink
Improve transaction check in refresh_cagg
Browse files Browse the repository at this point in the history
Procedures that use multiple transactions cannot be run in a transaction
block (from a function, from dynamic SQL) or in a subtransaction (from a
procedure block with an EXCEPTION clause). Such procedures use
PreventInTransactionBlock function to check whether they can be run.

Though currently such checks are incompete, because
PreventInTransactionBlock requires isTopLevel argument to throw a
consistent error when the call originates from a function. This
isTopLevel flag (that is a bit poorly named - see below) is not readily
available inside C procedures. The source of truth for it -
ProcessUtilityContext parameter is passed to ProcessUtility hooks, but
is not included with the function calls. There is an undocumented
SPI_inside_nonatomic_context function, that would have been sufficient
for isTopLevel flag, but it currently returns false when SPI connection
is absent (that is a valid scenario when C procedures are called from
top-lelev SQL instead of PLPG procedures or DO blocks) so it cannot be
used.

To work around this the value of ProcessUtilityContext parameter is
saved when TS ProcessUtility hook is entered and can be accessed from
C procedures using new ts_process_utility_is_context_nonatomic function.
The result is called "non-atomic" instead of "top-level" because the way
how isTopLevel flag is determined from the ProcessUtilityContext value
in standard_ProcessUtility is insufficient for C procedures - it
excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from
PLPG procedure without an EXCEPTION clause) that is a valid use case for
C procedures with transactions. See details in the description of
ExecuteCallStmt function.

It is expected that calls to C procedures are done with CALL and always
pass though the ProcessUtility hook. The ProcessUtilityContext
parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In
unlikely case when a C procedure is called without passing through
ProcessUtility hook and the call is done in atomic context, then
PreventInTransactionBlock checks will pass, but SPI_commit will fail
when checking that all current active snapshots are portal-owned
snapshots (the same behaviour that was observed before this change).
In atomic context there will be an additional snapshot set in
_SPI_execute_plan, see the snapshot handling invariants description
in that function.

Closes #6533.
  • Loading branch information
staticlibs committed Jan 1, 2025
1 parent 27c4c02 commit 5ecfc69
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 2 deletions.
19 changes: 19 additions & 0 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void _process_utility_fini(void);
static ProcessUtility_hook_type prev_ProcessUtility_hook;

static bool expect_chunk_modification = false;
static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL;
static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht);
static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht);

Expand Down Expand Up @@ -5031,6 +5032,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool
QueryEnvironment *queryEnv, DestReceiver *dest,
QueryCompletion *completion_tag)
{
last_process_utility_context = context;

ProcessUtilityArgs args = { .query_string = query_string,
.context = context,
.params = params,
Expand Down Expand Up @@ -5062,6 +5065,7 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool
if (altering_timescaledb || !ts_extension_is_loaded_and_not_upgrading())
{
prev_ProcessUtility(&args);
ts_process_utility_context_reset();
return;
}

Expand All @@ -5086,6 +5090,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool
*/
if (result == DDL_CONTINUE)
prev_ProcessUtility(&args);

ts_process_utility_context_reset();
}

static void
Expand Down Expand Up @@ -5156,6 +5162,19 @@ ts_process_utility_set_expect_chunk_modification(bool expect)
expect_chunk_modification = expect;
}

bool
ts_process_utility_is_context_nonatomic(void)
{
ProcessUtilityContext context = last_process_utility_context;
return context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY_NONATOMIC;
}

void
ts_process_utility_context_reset(void)
{
last_process_utility_context = PROCESS_UTILITY_TOPLEVEL;
}

static void
process_utility_xact_abort(XactEvent event, void *arg)
{
Expand Down
53 changes: 53 additions & 0 deletions src/process_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,56 @@ typedef enum
typedef DDLResult (*ts_process_utility_handler_t)(ProcessUtilityArgs *args);

extern void ts_process_utility_set_expect_chunk_modification(bool expect);

/*
* Procedures that use multiple transactions cannot be run in a transaction
* block (from a function, from dynamic SQL) or in a subtransaction (from a
* procedure block with an EXCEPTION clause). Such procedures use
* PreventInTransactionBlock function to check whether they can be run.
*
* Though currently such checks are incompete, because
* PreventInTransactionBlock requires isTopLevel argument to throw a
* consistent error when the call originates from a function. This
* isTopLevel flag (that is a bit poorly named - see below) is not readily
* available inside C procedures. The source of truth for it -
* ProcessUtilityContext parameter is passed to ProcessUtility hooks, but
* is not included with the function calls. There is an undocumented
* SPI_inside_nonatomic_context function, that would have been sufficient
* for isTopLevel flag, but it currently returns false when SPI connection
* is absent (that is a valid scenario when C procedures are called from
* top-lelev SQL instead of PLPG procedures or DO blocks) so it cannot be
* used.
*
* To work around this the value of ProcessUtilityContext parameter is
* saved when TS ProcessUtility hook is entered and can be accessed from
* C procedures using new ts_process_utility_is_context_nonatomic function.
* The result is called "non-atomic" instead of "top-level" because the way
* how isTopLevel flag is determined from the ProcessUtilityContext value
* in standard_ProcessUtility is insufficient for C procedures - it
* excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from
* PLPG procedure without an EXCEPTION clause) that is a valid use case for
* C procedures with transactions. See details in the description of
* ExecuteCallStmt function.
*
* It is expected that calls to C procedures are done with CALL and always
* pass though the ProcessUtility hook. The ProcessUtilityContext
* parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In
* unlikely case when a C procedure is called without passing through
* ProcessUtility hook and the call is done in atomic context, then
* PreventInTransactionBlock checks will pass, but SPI_commit will fail
* when checking that all current active snapshots are portal-owned
* snapshots (the same behaviour that was observed before this change).
* In atomic context there will be an additional snapshot set in
* _SPI_execute_plan, see the snapshot handling invariants description
* in that function.
*/
extern TSDLLEXPORT bool ts_process_utility_is_context_nonatomic(void);

/*
* Curently in TS ProcessUtility hook the saved ProcessUtilityContext
* value is reset back to PROCESS_UTILITY_TOPLEVEL on normal exit but
* is NOT reset in case of ereport exit. C procedures can call this
* function to reset the saved value before doing the checks that can
* result in ereport exit.
*/
extern TSDLLEXPORT void ts_process_utility_context_reset(void);
12 changes: 10 additions & 2 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "invalidation.h"
#include "invalidation_threshold.h"
#include "materialize.h"
#include "process_utility.h"
#include "refresh.h"

#define CAGG_REFRESH_LOG_LEVEL (callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1)
Expand Down Expand Up @@ -764,9 +765,16 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
int64 invalidation_threshold;
bool nonatomic = ts_process_utility_is_context_nonatomic();

/* Reset the saved ProcessUtilityContext value promptly before
* calling PreventCommandIfReadOnly so the potential unsupported
* (atomic) value won't linger there in case of ereport exit.
*/
ts_process_utility_context_reset();

/* Connect to SPI manager due to the underlying SPI calls */
int rc = SPI_connect_ext(SPI_OPT_NONATOMIC);
int rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0);
if (rc != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc));

Expand All @@ -789,7 +797,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* invalidation threshold needs no update. However, materialization might
* still take a long time and it is probably best for consistency to always
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
PreventInTransactionBlock(nonatomic, REFRESH_FUNCTION_NAME);

/* No bucketing when open ended */
if (!(start_isnull && end_isnull))
Expand Down
65 changes: 65 additions & 0 deletions tsl/test/expected/cagg_refresh.out
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,68 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH NO DATA;
COMMIT;
-- refresh_continuous_aggregate can run two transactions, thus it cannot be
-- called in a transaction block (from a function, from dynamic SQL) or in a
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
-- does NOT require a top level context and can be called from a procedure
-- block without an EXCEPTION clause.
-- DO block
DO $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
psql:include/cagg_refresh_common.sql:320: NOTICE: continuous aggregate "daily_temp" is already up-to-date
-- Procedure without subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
CALL refresh_cagg_proc_normal();
psql:include/cagg_refresh_common.sql:330: NOTICE: continuous aggregate "daily_temp" is already up-to-date
\set ON_ERROR_STOP 0
-- Procedure with subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
LANGUAGE PLPGSQL AS
$$
DECLARE
errmsg TEXT;
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
RAISE EXCEPTION '%', errmsg;
END; $$;
CALL refresh_cagg_proc_subtransaction();
psql:include/cagg_refresh_common.sql:347: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block
-- Function
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
RETURNS INT LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
RETURN 1;
END; $$;
SELECT * from refresh_cagg_fun();
psql:include/cagg_refresh_common.sql:358: ERROR: refresh_continuous_aggregate() cannot be executed from a function
-- Dynamic SQL
DO $$
BEGIN
EXECUTE $inner$
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
$inner$;
END; $$;
psql:include/cagg_refresh_common.sql:366: ERROR: refresh_continuous_aggregate() cannot be executed from a function
-- Trigger
CREATE TABLE refresh_cagg_trigger_table(a int);
CREATE FUNCTION refresh_cagg_trigger_fun()
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
EXECUTE FUNCTION refresh_cagg_trigger_fun();
INSERT INTO refresh_cagg_trigger_table VALUES(1);
psql:include/cagg_refresh_common.sql:380: ERROR: refresh_continuous_aggregate() cannot be executed from a function
\set ON_ERROR_STOP 1
65 changes: 65 additions & 0 deletions tsl/test/expected/cagg_refresh_using_merge.out
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,71 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH NO DATA;
COMMIT;
-- refresh_continuous_aggregate can run two transactions, thus it cannot be
-- called in a transaction block (from a function, from dynamic SQL) or in a
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
-- does NOT require a top level context and can be called from a procedure
-- block without an EXCEPTION clause.
-- DO block
DO $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
psql:include/cagg_refresh_common.sql:320: NOTICE: continuous aggregate "daily_temp" is already up-to-date
-- Procedure without subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
CALL refresh_cagg_proc_normal();
psql:include/cagg_refresh_common.sql:330: NOTICE: continuous aggregate "daily_temp" is already up-to-date
\set ON_ERROR_STOP 0
-- Procedure with subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
LANGUAGE PLPGSQL AS
$$
DECLARE
errmsg TEXT;
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
RAISE EXCEPTION '%', errmsg;
END; $$;
CALL refresh_cagg_proc_subtransaction();
psql:include/cagg_refresh_common.sql:347: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block
-- Function
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
RETURNS INT LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
RETURN 1;
END; $$;
SELECT * from refresh_cagg_fun();
psql:include/cagg_refresh_common.sql:358: ERROR: refresh_continuous_aggregate() cannot be executed from a function
-- Dynamic SQL
DO $$
BEGIN
EXECUTE $inner$
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
$inner$;
END; $$;
psql:include/cagg_refresh_common.sql:366: ERROR: refresh_continuous_aggregate() cannot be executed from a function
-- Trigger
CREATE TABLE refresh_cagg_trigger_table(a int);
CREATE FUNCTION refresh_cagg_trigger_fun()
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;
CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
EXECUTE FUNCTION refresh_cagg_trigger_fun();
INSERT INTO refresh_cagg_trigger_table VALUES(1);
psql:include/cagg_refresh_common.sql:380: ERROR: refresh_continuous_aggregate() cannot be executed from a function
\set ON_ERROR_STOP 1
-- Additional tests for MERGE refresh
DROP TABLE conditions CASCADE;
NOTICE: drop cascades to 10 other objects
Expand Down
74 changes: 74 additions & 0 deletions tsl/test/sql/include/cagg_refresh_common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,77 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH NO DATA;
COMMIT;

-- refresh_continuous_aggregate can run two transactions, thus it cannot be
-- called in a transaction block (from a function, from dynamic SQL) or in a
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
-- does NOT require a top level context and can be called from a procedure
-- block without an EXCEPTION clause.

-- DO block
DO $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;

-- Procedure without subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;

CALL refresh_cagg_proc_normal();

\set ON_ERROR_STOP 0

-- Procedure with subtransaction
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
LANGUAGE PLPGSQL AS
$$
DECLARE
errmsg TEXT;
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
RAISE EXCEPTION '%', errmsg;
END; $$;

CALL refresh_cagg_proc_subtransaction();

-- Function
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
RETURNS INT LANGUAGE PLPGSQL AS
$$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
RETURN 1;
END; $$;

SELECT * from refresh_cagg_fun();

-- Dynamic SQL
DO $$
BEGIN
EXECUTE $inner$
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
$inner$;
END; $$;

-- Trigger
CREATE TABLE refresh_cagg_trigger_table(a int);

CREATE FUNCTION refresh_cagg_trigger_fun()
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
BEGIN
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
END; $$;

CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
EXECUTE FUNCTION refresh_cagg_trigger_fun();

INSERT INTO refresh_cagg_trigger_table VALUES(1);

\set ON_ERROR_STOP 1

0 comments on commit 5ecfc69

Please sign in to comment.