diff --git a/src/uct/ib/rc/accel/rc_mlx5.h b/src/uct/ib/rc/accel/rc_mlx5.h index c4c963d94e3..388ee2f2148 100644 --- a/src/uct/ib/rc/accel/rc_mlx5.h +++ b/src/uct/ib/rc/accel/rc_mlx5.h @@ -34,6 +34,7 @@ typedef struct uct_rc_mlx5_ep { uct_ib_mlx5_qp_t tm_qp; uct_rc_mlx5_mp_context_t mp; uint16_t atomic_mr_offset; + uint8_t connected; } uct_rc_mlx5_ep_t; typedef struct uct_rc_mlx5_ep_address { diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c index f1f3c168721..96180054ed6 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.c +++ b/src/uct/ib/rc/accel/rc_mlx5_common.c @@ -1148,6 +1148,8 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface, { uct_rc_mlx5_common_clean_tx_cq_ctx_t *ctx = arg; + ucs_assert(iface->super.tx.cq_available >= -1); + if (cqe != NULL) { uct_rc_mlx5_common_free_tx_res(&iface->super, ctx->txwq, ctx->txqp, htons(cqe->wqe_counter)); @@ -1160,9 +1162,11 @@ static int uct_rc_mlx5_common_clean_tx_cq_cb(uct_rc_mlx5_iface_common_t *iface, /* If not posted NOP already, and have the resources, post it to flush * any unsignaled sends */ - if (ctx->post_nop && (iface->super.tx.cq_available > 0) && - (uct_rc_txqp_available(ctx->txqp) > 0)) - { + if (ctx->post_nop && (uct_rc_txqp_available(ctx->txqp) > 0)) { + /* We reserved one CQ credit for NOP operation, so the lowest value + * cq_available can reach is -1 (after posting the nop) + */ + ucs_assert(iface->super.tx.cq_available >= 0); ucs_trace("qp 0x%x: posted NOP", ctx->txwq->super.qp_num); uct_rc_mlx5_common_post_nop(iface, ctx->txqp, ctx->txwq); ucs_assert(uct_rc_txqp_unsignaled(ctx->txqp) == 0); @@ -1183,6 +1187,11 @@ void uct_rc_mlx5_iface_commom_cq_clean_tx(uct_rc_mlx5_iface_common_t *iface, }; uct_rc_mlx5_iface_commom_cq_clean(iface, UCT_IB_DIR_TX, txwq->super.qp_num, uct_rc_mlx5_common_clean_tx_cq_cb, &ctx); + + /* If NOP was posted and cq_available became -1, it must also be completed + * and restore cq_available to 0 + */ + ucs_assert(iface->super.tx.cq_available >= 0); } void uct_rc_mlx5_iface_print(uct_rc_mlx5_iface_common_t *mlx5_iface, diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index 5278df2b3ff..e7b37b7725e 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -579,9 +579,8 @@ ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op, * messages are bundled with AM. */ ucs_assert(op == UCT_RC_EP_FC_PURE_GRANT); - if (ucs_unlikely(ep->tx.wq.super.verbs.qp->state != IBV_QPS_RTS)) { - return (ep->tx.wq.super.verbs.qp->state == IBV_QPS_INIT) ? UCS_OK : - UCS_ERR_CONNECTION_RESET; + if (!ep->connected) { + return UCS_OK; } UCT_RC_CHECK_RES(&iface->super, &ep->super); @@ -703,6 +702,7 @@ ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep, } ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id); + ep->connected = 1; return UCS_OK; } @@ -927,6 +927,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, const uct_ep_params_t *params) self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max); self->mp.free = 1; + self->connected = 0; + uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max); return UCS_OK; diff --git a/src/uct/ib/rc/base/rc_iface.c b/src/uct/ib/rc/base/rc_iface.c index 19a3a47c100..682b016ba40 100644 --- a/src/uct/ib/rc/base/rc_iface.c +++ b/src/uct/ib/rc/base/rc_iface.c @@ -553,7 +553,7 @@ UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md, UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker, params, &config->super, init_attr); - self->tx.cq_available = init_attr->tx_cq_len - 1; + self->tx.cq_available = init_attr->tx_cq_len - 2; /* reserve for nop */ self->tx.cq_free = 0; self->rx.srq.available = 0; self->rx.srq.quota = 0; diff --git a/test/apps/iodemo/io_demo.cc b/test/apps/iodemo/io_demo.cc index 2af9c50cf21..e2d2c9fc62f 100644 --- a/test/apps/iodemo/io_demo.cc +++ b/test/apps/iodemo/io_demo.cc @@ -74,8 +74,8 @@ typedef struct { template class MemoryPool { public: - MemoryPool(size_t buffer_size, size_t offcache = 0) : - _num_allocated(0), _buffer_size(buffer_size) { + MemoryPool(size_t buffer_size, const std::string& name, size_t offcache = 0) : + _num_allocated(0), _buffer_size(buffer_size), _name(name) { for (size_t i = 0; i < offcache; ++i) { _offcache_queue.push(get_free()); @@ -89,8 +89,8 @@ class MemoryPool { } if (_num_allocated != _free_stack.size()) { - LOG << "Some items were not freed. Total:" << _num_allocated - << ", current:" << _free_stack.size() << "."; + LOG << (_num_allocated - _free_stack.size()) + << " buffers were not released from " << _name; } for (size_t i = 0; i < _free_stack.size(); i++) { @@ -137,6 +137,7 @@ class MemoryPool { std::queue _offcache_queue; uint32_t _num_allocated; size_t _buffer_size; + std::string _name; }; /** @@ -438,11 +439,12 @@ class P2pDemoCommon : public UcxContext { P2pDemoCommon(const options_t& test_opts) : UcxContext(test_opts.iomsg_size), _test_opts(test_opts), - _io_msg_pool(test_opts.iomsg_size), - _send_callback_pool(0), + _io_msg_pool(test_opts.iomsg_size, "io messages"), + _send_callback_pool(0, "send callbacks"), _data_buffers_pool(get_chunk_cnt(test_opts.max_data_size, - test_opts.chunk_size)), - _data_chunks_pool(test_opts.chunk_size, test_opts.num_offcache_buffers) { + test_opts.chunk_size), "data iovs"), + _data_chunks_pool(test_opts.chunk_size, "data chunks", + test_opts.num_offcache_buffers) { } const options_t& opts() const { @@ -604,7 +606,7 @@ class DemoServer : public P2pDemoCommon { } state_t; DemoServer(const options_t& test_opts) : - P2pDemoCommon(test_opts), _callback_pool(0) { + P2pDemoCommon(test_opts), _callback_pool(0, "callbacks") { _curr_state.read_count = 0; _curr_state.write_count = 0; _curr_state.active_conns = 0; @@ -671,9 +673,14 @@ class DemoServer : public P2pDemoCommon { recv_data(conn, *iov, msg->tr.sn, w); } + virtual void dispatch_connection_accepted(UcxConnection* conn) { + ++_curr_state.active_conns; + } + virtual void dispatch_connection_error(UcxConnection *conn) { LOG << "deleting connection with status " << ucs_status_string(conn->ucx_status()); + --_curr_state.active_conns; delete conn; } @@ -699,24 +706,6 @@ class DemoServer : public P2pDemoCommon { } private: - virtual bool add_connection(UcxConnection *conn) { - bool added = P2pDemoCommon::add_connection(conn); - if (added) { - ++_curr_state.active_conns; - } - - return added; - } - - virtual bool remove_connection(UcxConnection *conn) { - bool removed = P2pDemoCommon::remove_connection(conn); - if (removed) { - --_curr_state.active_conns; - } - - return removed; - } - void save_prev_state() { _prev_state = _curr_state; } @@ -812,7 +801,7 @@ class DemoClient : public P2pDemoCommon { P2pDemoCommon(test_opts), _prev_connect_time(0), _num_sent(0), _num_completed(0), _status(OK), _start_time(get_time()), - _retry(0), _read_callback_pool(opts().iomsg_size) { + _retry(0), _read_callback_pool(opts().iomsg_size, "read callbacks") { } typedef enum { @@ -927,13 +916,15 @@ class DemoClient : public P2pDemoCommon { size_t server_index = get_server_index(conn); server_info_t& server_info = _server_info[server_index]; - // Don't wait from completions on this connection - _num_sent -= get_num_uncompleted(server_index); - // Remove connection pointer _server_index_lookup.erase(conn); + + // Destroying the connection will complete its outstanding operations delete conn; + // Don't wait for any more completions on this connection + _num_sent -= get_num_uncompleted(server_index); + // Replace in _active_servers by the last element in the vector size_t active_index = server_info.active_index; std::swap(_active_servers[active_index], _active_servers.back()); diff --git a/test/apps/iodemo/ucx_wrapper.cc b/test/apps/iodemo/ucx_wrapper.cc index 5500b3d333f..64d4a0022dd 100644 --- a/test/apps/iodemo/ucx_wrapper.cc +++ b/test/apps/iodemo/ucx_wrapper.cc @@ -256,6 +256,7 @@ void UcxContext::progress_conn_requests() UcxConnection *conn = new UcxConnection(*this, get_next_conn_id()); if (conn->accept(_conn_requests.front())) { add_connection(conn); + dispatch_connection_accepted(conn); } else { delete conn; } @@ -342,27 +343,24 @@ void UcxContext::recv_io_message() _iomsg_recv_request = reinterpret_cast(status_ptr); } -bool UcxContext::add_connection(UcxConnection *conn) +void UcxContext::add_connection(UcxConnection *conn) { - if (_conns.find(conn->id()) == _conns.end()) { - _conns[conn->id()] = conn; - return true; - } else { - return false; - } + assert(_conns.find(conn->id()) == _conns.end()); + _conns[conn->id()] = conn; } -bool UcxContext::remove_connection(UcxConnection *conn) +void UcxContext::remove_connection(UcxConnection *conn) { conn_map_t::iterator i = _conns.find(conn->id()); - if (i == _conns.end()) { - return false; - } else { + if (i != _conns.end()) { _conns.erase(i); - return true; } } +void UcxContext::dispatch_connection_accepted(UcxConnection* conn) +{ +} + void UcxContext::handle_connection_error(UcxConnection *conn) { remove_connection(conn); @@ -718,7 +716,7 @@ bool UcxConnection::process_request(const char *what, return true; } else if (UCS_PTR_IS_ERR(ptr_status)) { status = UCS_PTR_STATUS(ptr_status); - UCX_CONN_LOG << what << "failed with status: " + UCX_CONN_LOG << what << " failed with status: " << ucs_status_string(status); (*callback)(status); return false; diff --git a/test/apps/iodemo/ucx_wrapper.h b/test/apps/iodemo/ucx_wrapper.h index ee40360fc5f..943a5bbfbde 100644 --- a/test/apps/iodemo/ucx_wrapper.h +++ b/test/apps/iodemo/ucx_wrapper.h @@ -94,9 +94,8 @@ class UcxContext { // Called when there is a fatal failure on the connection virtual void dispatch_connection_error(UcxConnection* conn) = 0; - virtual bool add_connection(UcxConnection *conn); - - virtual bool remove_connection(UcxConnection *conn); + // Called when new server connection is accepted + virtual void dispatch_connection_accepted(UcxConnection* conn); private: typedef enum { @@ -138,6 +137,10 @@ class UcxContext { void recv_io_message(); + void add_connection(UcxConnection *conn); + + void remove_connection(UcxConnection *conn); + void handle_connection_error(UcxConnection *conn); void destroy_connections();