diff --git a/ext/gvl_wrappers.c b/ext/gvl_wrappers.c index db935707..b3eee5df 100644 --- a/ext/gvl_wrappers.c +++ b/ext/gvl_wrappers.c @@ -11,6 +11,7 @@ PGresult *PQclosePrepared(PGconn *conn, const char *stmtName){return NULL;} PGresult *PQclosePortal(PGconn *conn, const char *portalName){return NULL;} int PQsendClosePrepared(PGconn *conn, const char *stmtName){return 0;} int PQsendClosePortal(PGconn *conn, const char *portalName){return 0;} +int PQsendPipelineSync(PGconn *conn){return 0;} #endif #ifdef ENABLE_GVL_UNLOCK diff --git a/ext/gvl_wrappers.h b/ext/gvl_wrappers.h index 0c2c5b84..97f5ab07 100644 --- a/ext/gvl_wrappers.h +++ b/ext/gvl_wrappers.h @@ -208,6 +208,10 @@ #define FOR_EACH_PARAM_OF_PQsendClosePortal(param) \ param(PGconn *, conn) +#define FOR_EACH_PARAM_OF_PQpipelineSync(param) + +#define FOR_EACH_PARAM_OF_PQsendPipelineSync(param) + #define FOR_EACH_PARAM_OF_PQsetClientEncoding(param) \ param(PGconn *, conn) @@ -252,6 +256,8 @@ function(PQsendDescribePortal, GVL_TYPE_NONVOID, int, const char *, portal) \ function(PQsendClosePrepared, GVL_TYPE_NONVOID, int, const char *, stmt) \ function(PQsendClosePortal, GVL_TYPE_NONVOID, int, const char *, portal) \ + function(PQpipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \ + function(PQsendPipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \ function(PQsetClientEncoding, GVL_TYPE_NONVOID, int, const char *, encoding) \ function(PQisBusy, GVL_TYPE_NONVOID, int, PGconn *, conn) \ function(PQencryptPasswordConn, GVL_TYPE_NONVOID, char *, const char *, algorithm) \ diff --git a/ext/pg_connection.c b/ext/pg_connection.c index 22b7d9af..5a2ff6aa 100644 --- a/ext/pg_connection.c +++ b/ext/pg_connection.c @@ -3751,6 +3751,8 @@ pgconn_pipeline_status(VALUE self) * Raises PG::Error and has no effect if the connection is not currently idle, i.e., it has a result ready, or it is waiting for more input from the server, etc. * This function does not actually send anything to the server, it just changes the libpq connection state. * + * See the {PostgreSQL documentation}[https://www.postgresql.org/docs/17/libpq-pipeline-mode.html#LIBPQ-PIPELINE-MODE]. + * * Available since PostgreSQL-14 */ static VALUE @@ -3789,29 +3791,55 @@ pgconn_exit_pipeline_mode(VALUE self) /* * call-seq: - * conn.pipeline_sync -> nil + * conn.sync_pipeline_sync -> nil * - * Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer. - * This serves as the delimiter of an implicit transaction and an error recovery point; see Section 34.5.1.3 of the PostgreSQL documentation. + * This function has the same behavior as #async_pipeline_sync, but is implemented using the synchronous command processing API of libpq. + * See #async_exec for the differences between the two API variants. + * It's not recommended to use explicit sync or async variants but #pipeline_sync instead, unless you have a good reason to do so. * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_sync_pipeline_sync(VALUE self) +{ + PGconn *conn = pg_get_pgconn(self); + int res = gvl_PQpipelineSync(conn); + if( res != 1 ) + pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn)); + + return Qnil; +} + + +#ifdef HAVE_PQSETCHUNKEDROWSMODE +/* + * call-seq: + * conn.send_pipeline_sync -> nil + * + * Marks a synchronization point in a pipeline by sending a sync message without flushing the send buffer. + * + * This serves as the delimiter of an implicit transaction and an error recovery point. * Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed. + * Note that the message is not itself flushed to the server automatically; use flush if necessary. * - * Available since PostgreSQL-14 + * Available since PostgreSQL-17 */ static VALUE -pgconn_pipeline_sync(VALUE self) +pgconn_send_pipeline_sync(VALUE self) { PGconn *conn = pg_get_pgconn(self); - int res = PQpipelineSync(conn); + int res = gvl_PQsendPipelineSync(conn); if( res != 1 ) pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn)); return Qnil; } +#endif + /* * call-seq: - * conn.pipeline_sync -> nil + * conn.send_flush_request -> nil * * Sends a request for the server to flush its output buffer. * @@ -4769,8 +4797,11 @@ init_pg_connection(void) rb_define_method(rb_cPGconn, "pipeline_status", pgconn_pipeline_status, 0); rb_define_method(rb_cPGconn, "enter_pipeline_mode", pgconn_enter_pipeline_mode, 0); rb_define_method(rb_cPGconn, "exit_pipeline_mode", pgconn_exit_pipeline_mode, 0); - rb_define_method(rb_cPGconn, "pipeline_sync", pgconn_pipeline_sync, 0); + rb_define_method(rb_cPGconn, "sync_pipeline_sync", pgconn_sync_pipeline_sync, 0); rb_define_method(rb_cPGconn, "send_flush_request", pgconn_send_flush_request, 0); +#ifdef HAVE_PQSETCHUNKEDROWSMODE + rb_define_method(rb_cPGconn, "send_pipeline_sync", pgconn_send_pipeline_sync, 0); +#endif #endif /****** PG::Connection INSTANCE METHODS: Large Object Support ******/ diff --git a/lib/pg/connection.rb b/lib/pg/connection.rb index 04613f10..cd328122 100644 --- a/lib/pg/connection.rb +++ b/lib/pg/connection.rb @@ -536,6 +536,25 @@ def put_copy_end(*args) end alias async_put_copy_end put_copy_end + if method_defined? :send_pipeline_sync + # call-seq: + # conn.pipeline_sync + # + # Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer. + # This serves as the delimiter of an implicit transaction and an error recovery point. + # + # See enter_pipeline_mode + # + # Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed. + # + # Available since PostgreSQL-14 + def pipeline_sync(*args) + send_pipeline_sync(*args) + flush + end + alias async_pipeline_sync pipeline_sync + end + if method_defined? :sync_encrypt_password # call-seq: # conn.encrypt_password( password, username, algorithm=nil ) -> String @@ -894,14 +913,29 @@ def ping(*args) private_constant :REDIRECT_CLASS_METHODS # These methods are affected by PQsetnonblocking - REDIRECT_SEND_METHODS = PG.make_shareable({ + REDIRECT_SEND_METHODS = { :isnonblocking => [:async_isnonblocking, :sync_isnonblocking], :nonblocking? => [:async_isnonblocking, :sync_isnonblocking], :put_copy_data => [:async_put_copy_data, :sync_put_copy_data], :put_copy_end => [:async_put_copy_end, :sync_put_copy_end], :flush => [:async_flush, :sync_flush], - }) + } private_constant :REDIRECT_SEND_METHODS + if PG::Connection.instance_methods.include? :sync_pipeline_sync + if PG::Connection.instance_methods.include? :send_pipeline_sync + # PostgreSQL-17+ + REDIRECT_SEND_METHODS.merge!({ + :pipeline_sync => [:async_pipeline_sync, :sync_pipeline_sync], + }) + else + # PostgreSQL-14+ + REDIRECT_SEND_METHODS.merge!({ + :pipeline_sync => [:sync_pipeline_sync, :sync_pipeline_sync], + }) + end + end + PG.make_shareable(REDIRECT_SEND_METHODS) + REDIRECT_METHODS = { :exec => [:async_exec, :sync_exec], :query => [:async_exec, :sync_exec], diff --git a/spec/pg/connection_spec.rb b/spec/pg/connection_spec.rb index bdaf597d..f8f1571a 100644 --- a/spec/pg/connection_spec.rb +++ b/spec/pg/connection_spec.rb @@ -2173,6 +2173,11 @@ def wait_check_socket(conn) @conn.pipeline_sync }.to raise_error(PG::Error){|err| expect(err).to have_attributes(connection: @conn) } end + + it "has send_pipeline_sync method", :postgresql_17 do + expect( @conn.respond_to?(:send_pipeline_sync) ).to be_truthy + expect( @conn.respond_to?(:async_pipeline_sync) ).to be_truthy + end end describe "send_flush_request" do diff --git a/spec/pg/scheduler_spec.rb b/spec/pg/scheduler_spec.rb index fac37121..637fc503 100644 --- a/spec/pg/scheduler_spec.rb +++ b/spec/pg/scheduler_spec.rb @@ -263,4 +263,20 @@ expect( ping ).to eq( PG::PQPING_OK ) end end + + it "can send a pipeline_sync message" do + run_with_scheduler(99) do |conn| + conn.enter_pipeline_mode + 1000.times do |idx| + # This doesn't fail on sync_pipeline_sync, since PQpipelineSync() tries to flush, but doesn't wait for writablility. + conn.pipeline_sync + end + 1000.times do + expect( conn.get_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC ) + end + expect( conn.get_result ).to be_nil + expect( conn.get_result ).to be_nil + conn.exit_pipeline_mode + end + end end