Skip to content

Commit

Permalink
Add Connection#send_pipeline_sync, async_pipeline_sync and release GV…
Browse files Browse the repository at this point in the history
…L at PQ(sendP|P)ipelineSync

Also make pipeline_sync an alias for sync_pipeline_sync vs. async_pipeline_sync.

Now send_pipeline_sync and flush is used to notify IO waiting to the scheduler.
  • Loading branch information
larskanis committed Nov 25, 2024
1 parent df2ef1d commit 6dbb3e6
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 10 deletions.
4 changes: 4 additions & 0 deletions ext/gvl_wrappers.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ PGresult *PQclosePrepared(PGconn *conn, const char *stmtName){return NULL;}
PGresult *PQclosePortal(PGconn *conn, const char *portalName){return NULL;}
int PQsendClosePrepared(PGconn *conn, const char *stmtName){return 0;}
int PQsendClosePortal(PGconn *conn, const char *portalName){return 0;}
int PQsendPipelineSync(PGconn *conn){return 0;}
#endif
#ifndef HAVE_PQENTERPIPELINEMODE
int PQpipelineSync(PGconn *conn){return 0;}
#endif

#ifdef ENABLE_GVL_UNLOCK
Expand Down
6 changes: 6 additions & 0 deletions ext/gvl_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@
#define FOR_EACH_PARAM_OF_PQsendClosePortal(param) \
param(PGconn *, conn)

#define FOR_EACH_PARAM_OF_PQpipelineSync(param)

#define FOR_EACH_PARAM_OF_PQsendPipelineSync(param)

#define FOR_EACH_PARAM_OF_PQsetClientEncoding(param) \
param(PGconn *, conn)

Expand Down Expand Up @@ -252,6 +256,8 @@
function(PQsendDescribePortal, GVL_TYPE_NONVOID, int, const char *, portal) \
function(PQsendClosePrepared, GVL_TYPE_NONVOID, int, const char *, stmt) \
function(PQsendClosePortal, GVL_TYPE_NONVOID, int, const char *, portal) \
function(PQpipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \
function(PQsendPipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \
function(PQsetClientEncoding, GVL_TYPE_NONVOID, int, const char *, encoding) \
function(PQisBusy, GVL_TYPE_NONVOID, int, PGconn *, conn) \
function(PQencryptPasswordConn, GVL_TYPE_NONVOID, char *, const char *, algorithm) \
Expand Down
47 changes: 39 additions & 8 deletions ext/pg_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -3751,6 +3751,8 @@ pgconn_pipeline_status(VALUE self)
* Raises PG::Error and has no effect if the connection is not currently idle, i.e., it has a result ready, or it is waiting for more input from the server, etc.
* This function does not actually send anything to the server, it just changes the libpq connection state.
*
* See the {PostgreSQL documentation}[https://www.postgresql.org/docs/17/libpq-pipeline-mode.html#LIBPQ-PIPELINE-MODE].
*
* Available since PostgreSQL-14
*/
static VALUE
Expand Down Expand Up @@ -3789,29 +3791,55 @@ pgconn_exit_pipeline_mode(VALUE self)

/*
* call-seq:
* conn.pipeline_sync -> nil
* conn.sync_pipeline_sync -> nil
*
* Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer.
* This serves as the delimiter of an implicit transaction and an error recovery point; see Section 34.5.1.3 of the PostgreSQL documentation.
* This function has the same behavior as #async_pipeline_sync, but is implemented using the synchronous command processing API of libpq.
* See #async_exec for the differences between the two API variants.
* It's not recommended to use explicit sync or async variants but #pipeline_sync instead, unless you have a good reason to do so.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_sync_pipeline_sync(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = gvl_PQpipelineSync(conn);
if( res != 1 )
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));

return Qnil;
}


#ifdef HAVE_PQSETCHUNKEDROWSMODE
/*
* call-seq:
* conn.send_pipeline_sync -> nil
*
* Marks a synchronization point in a pipeline by sending a sync message without flushing the send buffer.
*
* This serves as the delimiter of an implicit transaction and an error recovery point.
* Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed.
* Note that the message is not itself flushed to the server automatically; use flush if necessary.
*
* Available since PostgreSQL-14
* Available since PostgreSQL-17
*/
static VALUE
pgconn_pipeline_sync(VALUE self)
pgconn_send_pipeline_sync(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = PQpipelineSync(conn);
int res = gvl_PQsendPipelineSync(conn);
if( res != 1 )
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));

return Qnil;
}
#endif


/*
* call-seq:
* conn.pipeline_sync -> nil
* conn.send_flush_request -> nil
*
* Sends a request for the server to flush its output buffer.
*
Expand Down Expand Up @@ -4769,8 +4797,11 @@ init_pg_connection(void)
rb_define_method(rb_cPGconn, "pipeline_status", pgconn_pipeline_status, 0);
rb_define_method(rb_cPGconn, "enter_pipeline_mode", pgconn_enter_pipeline_mode, 0);
rb_define_method(rb_cPGconn, "exit_pipeline_mode", pgconn_exit_pipeline_mode, 0);
rb_define_method(rb_cPGconn, "pipeline_sync", pgconn_pipeline_sync, 0);
rb_define_method(rb_cPGconn, "sync_pipeline_sync", pgconn_sync_pipeline_sync, 0);
rb_define_method(rb_cPGconn, "send_flush_request", pgconn_send_flush_request, 0);
#ifdef HAVE_PQSETCHUNKEDROWSMODE
rb_define_method(rb_cPGconn, "send_pipeline_sync", pgconn_send_pipeline_sync, 0);
#endif
#endif

/****** PG::Connection INSTANCE METHODS: Large Object Support ******/
Expand Down
38 changes: 36 additions & 2 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,25 @@ def put_copy_end(*args)
end
alias async_put_copy_end put_copy_end

if method_defined? :send_pipeline_sync
# call-seq:
# conn.pipeline_sync
#
# Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer.
# This serves as the delimiter of an implicit transaction and an error recovery point.
#
# See enter_pipeline_mode
#
# Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed.
#
# Available since PostgreSQL-14
def pipeline_sync(*args)
send_pipeline_sync(*args)
flush
end
alias async_pipeline_sync pipeline_sync
end

if method_defined? :sync_encrypt_password
# call-seq:
# conn.encrypt_password( password, username, algorithm=nil ) -> String
Expand Down Expand Up @@ -894,14 +913,29 @@ def ping(*args)
private_constant :REDIRECT_CLASS_METHODS

# These methods are affected by PQsetnonblocking
REDIRECT_SEND_METHODS = PG.make_shareable({
REDIRECT_SEND_METHODS = {
:isnonblocking => [:async_isnonblocking, :sync_isnonblocking],
:nonblocking? => [:async_isnonblocking, :sync_isnonblocking],
:put_copy_data => [:async_put_copy_data, :sync_put_copy_data],
:put_copy_end => [:async_put_copy_end, :sync_put_copy_end],
:flush => [:async_flush, :sync_flush],
})
}
private_constant :REDIRECT_SEND_METHODS
if PG::Connection.instance_methods.include? :sync_pipeline_sync
if PG::Connection.instance_methods.include? :send_pipeline_sync
# PostgreSQL-17+
REDIRECT_SEND_METHODS.merge!({
:pipeline_sync => [:async_pipeline_sync, :sync_pipeline_sync],
})
else
# PostgreSQL-14+
REDIRECT_SEND_METHODS.merge!({
:pipeline_sync => [:sync_pipeline_sync, :sync_pipeline_sync],
})
end
end
PG.make_shareable(REDIRECT_SEND_METHODS)

REDIRECT_METHODS = {
:exec => [:async_exec, :sync_exec],
:query => [:async_exec, :sync_exec],
Expand Down
5 changes: 5 additions & 0 deletions spec/pg/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,11 @@ def wait_check_socket(conn)
@conn.pipeline_sync
}.to raise_error(PG::Error){|err| expect(err).to have_attributes(connection: @conn) }
end

it "has send_pipeline_sync method", :postgresql_17 do
expect( @conn.respond_to?(:send_pipeline_sync) ).to be_truthy
expect( @conn.respond_to?(:async_pipeline_sync) ).to be_truthy
end
end

describe "send_flush_request" do
Expand Down
16 changes: 16 additions & 0 deletions spec/pg/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,20 @@
expect( ping ).to eq( PG::PQPING_OK )
end
end

it "can send a pipeline_sync message", :postgresql_14 do
run_with_scheduler(99) do |conn|
conn.enter_pipeline_mode
1000.times do |idx|
# This doesn't fail on sync_pipeline_sync, since PQpipelineSync() tries to flush, but doesn't wait for writablility.
conn.pipeline_sync
end
1000.times do
expect( conn.get_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC )
end
expect( conn.get_result ).to be_nil
expect( conn.get_result ).to be_nil
conn.exit_pipeline_mode
end
end
end

0 comments on commit 6dbb3e6

Please sign in to comment.