From bc580b0a6100637ecbfeeecefc84e2b81ff25c34 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 27 Feb 2017 23:25:41 +0800 Subject: [PATCH] msg/async/rdma: refactor RDMAStack to accelerate tx handle previously Dispatcher thread will poll both rx and tx events, then dispatch these events to RDMAWorker and RDMAConnectedSocketImpl. Actually tx event handling is a lightweight task and we make these handling inline now. rx event dispatching is still working. Another change is adding tx cq to make event polling separated. removing lots of codes yet. Signed-off-by: Haomai Wang --- src/msg/async/rdma/Infiniband.cc | 6 +- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 8 +- src/msg/async/rdma/RDMAServerSocketImpl.cc | 1 - src/msg/async/rdma/RDMAStack.cc | 397 ++++++++---------- src/msg/async/rdma/RDMAStack.h | 52 ++- 5 files changed, 215 insertions(+), 249 deletions(-) diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 0160c04cf31a..739a371e47ec 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -864,12 +864,12 @@ int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) } if (r < 0) { r = -errno; - lderr(cct) << __func__ << " got error " << errno << ": " - << cpp_strerror(errno) << dendl; + lderr(cct) << __func__ << " got error " << r << ": " + << cpp_strerror(r) << dendl; } else if (r == 0) { // valid disconnect message of length 0 ldout(cct, 10) << __func__ << " got disconnect message " << dendl; } else if ((size_t)r != sizeof(msg)) { // invalid message - ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl; + ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; r = -EINVAL; } else { // valid message sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index a9666b66382f..c636f4c521c3 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -28,7 +28,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i active(false), detached(false) { qp = infiniband->create_queue_pair( - cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC); + cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = infiniband->get_lid(); @@ -183,6 +183,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); if (r < 0) { ::close(tcp_fd); + tcp_fd = -1; return -errno; } @@ -419,7 +420,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) std::list::const_iterator &end) -> unsigned { assert(start != end); auto chunk_idx = tx_buffers.size(); - int ret = worker->reserve_message_buffer(this, tx_buffers, bytes); + int ret = worker->get_reged_mem(this, tx_buffers, bytes); if (ret == 0) { ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); @@ -495,7 +496,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) return r; ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; - return bytes; + return pending_bl.length() ? -EAGAIN : 0; } int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) @@ -602,6 +603,7 @@ void RDMAConnectedSocketImpl::close() void RDMAConnectedSocketImpl::fault() { + ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; /*if (qp) { qp->to_dead(); qp = NULL; diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index 95eab99b75b4..f15de84bec4b 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -83,7 +83,6 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt if (sd < 0) { return -errno; } - ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; net.set_close_on_exec(sd); int r = net.set_nonblock(sd); diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index ec01760bda24..40f2510df2a9 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -31,29 +31,29 @@ RDMADispatcher::~RDMADispatcher() done = true; t.join(); ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; - auto i = qp_conns.begin(); - while (i != qp_conns.end()) { - delete i->second.first; - ++i; - } - while (!dead_queue_pairs.empty()) { - delete dead_queue_pairs.back(); - dead_queue_pairs.pop_back(); - } + assert(qp_conns.empty()); + assert(dead_queue_pairs.empty()); + tx_cc->ack_events(); rx_cc->ack_events(); + delete tx_cq; delete rx_cq; + delete tx_cc; delete rx_cc; delete async_handler; } RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s) : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), - w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) + w_lock("RDMADispatcher::for worker pending list"), stack(s) { + tx_cc = ib->create_comp_channel(c); + assert(tx_cc); rx_cc = ib->create_comp_channel(c); assert(rx_cc); + tx_cq = ib->create_comp_queue(c, tx_cc); + assert(tx_cq); rx_cq = ib->create_comp_queue(c, rx_cc); assert(rx_cq); @@ -62,6 +62,11 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s) plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks"); + plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions"); + plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors"); + plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors"); + plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors"); + plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion"); plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion"); plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request"); @@ -125,16 +130,68 @@ void RDMADispatcher::polling() std::map > polled; std::vector tx_cqe; - RDMAWorker* worker; - ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl; + ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; RDMAConnectedSocketImpl *conn = nullptr; utime_t last_inactive = ceph_clock_now(); bool rearmed = false; - int ret = 0; + int r = 0; while (true) { - int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); - if (!n) { + int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); + if (tx_ret > 0) { + ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret + << " responses."<< dendl; + handle_tx_event(wc, tx_ret); + } + + int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc); + if (rx_ret > 0) { + ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret + << " responses."<< dendl; + perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret); + + Mutex::Locker l(lock);//make sure connected socket alive when pass wc + for (int i = 0; i < rx_ret; ++i) { + ibv_wc* response = &wc[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl; + + assert(wc[i].opcode == IBV_WC_RECV); + + if (response->status == IBV_WC_SUCCESS) { + conn = get_conn_lockless(response->qp_num); + if (!conn) { + assert(ib->is_rx_buffer(chunk->buffer)); + r = ib->post_chunk(chunk); + ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; + assert(r == 0); + } else { + polled[conn].push_back(*response); + } + } else { + perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); + ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk + << ") status(" << response->status << ":" + << ib->wc_status_to_string(response->status) << ")" << dendl; + assert(ib->is_rx_buffer(chunk->buffer)); + r = ib->post_chunk(chunk); + if (r) { + ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl; + assert(r == 0); + } + + conn = get_conn_lockless(response->qp_num); + if (conn && conn->is_connected()) + conn->fault(); + } + } + + for (auto &&i : polled) + i.first->pass_wc(std::move(i.second)); + polled.clear(); + } + + if (!tx_ret && !rx_ret) { // NOTE: Has TX just transitioned to idle? We should do it when idle! // It's now safe to delete queue pairs (see comment by declaration // for dead_queue_pairs). @@ -157,102 +214,55 @@ void RDMADispatcher::polling() if (!rearmed) { // Clean up cq events after rearm notify ensure no new incoming event // arrived between polling and rearm + tx_cq->rearm_notify(); rx_cq->rearm_notify(); rearmed = true; continue; } - struct pollfd channel_poll; - channel_poll.fd = rx_cc->get_fd(); - channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - channel_poll.revents = 0; - int r = 0; + struct pollfd channel_poll[2]; + channel_poll[0].fd = tx_cc->get_fd(); + channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + channel_poll[0].revents = 0; + channel_poll[1].fd = rx_cc->get_fd(); + channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + channel_poll[1].revents = 0; + r = 0; perf_logger->set(l_msgr_rdma_polling, 0); while (!done && r == 0) { - r = poll(&channel_poll, 1, 1); + r = poll(channel_poll, 2, 1); if (r < 0) { r = -errno; lderr(cct) << __func__ << " poll failed " << r << dendl; ceph_abort(); } } + if (r > 0 && tx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " got tx cq event." << dendl; if (r > 0 && rx_cc->get_cq_event()) - ldout(cct, 20) << __func__ << " got cq event." << dendl; + ldout(cct, 20) << __func__ << " got rx cq event." << dendl; last_inactive = ceph_clock_now(); perf_logger->set(l_msgr_rdma_polling, 1); rearmed = false; } - continue; - } - - ldout(cct, 20) << __func__ << " pool completion queue got " << n - << " responses."<< dendl; - perf_logger->inc(l_msgr_rdma_rx_total_wc, n); - Mutex::Locker l(lock);//make sure connected socket alive when pass wc - for (int i = 0; i < n; ++i) { - ibv_wc* response = &wc[i]; - Chunk* chunk = reinterpret_cast(response->wr_id); - - ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl; - - if (wc[i].opcode == IBV_WC_SEND) { - tx_cqe.push_back(wc[i]); - ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl; - continue; - } - - if (response->status != IBV_WC_SUCCESS) { - perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); - ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk - << ") status(" << response->status << ":" - << ib->wc_status_to_string(response->status) << ")" << dendl; - if (ib->is_rx_buffer(chunk->buffer)) { - ret = ib->post_chunk(chunk); - if (ret) { - ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl; - assert(ret == 0); - } - conn = get_conn_lockless(response->qp_num); - if (conn && conn->is_connected()) - conn->fault(); - notify_pending_workers(); - } else if (ib->is_tx_buffer(chunk->buffer)) { - tx_cqe.push_back(wc[i]); - } else { - ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl; - } - continue; - } - - conn = get_conn_lockless(response->qp_num); - if (!conn) { - ret = ib->post_chunk(chunk); - ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl; - assert(ret == 0); - continue; - } - polled[conn].push_back(*response); - } - for (auto &&i : polled) - i.first->pass_wc(std::move(i.second)); - polled.clear(); - if (!tx_cqe.empty()) { - worker = get_worker_from_list(); - if (worker == nullptr) - worker = dynamic_cast(stack->get_worker()); - worker->pass_wc(std::move(tx_cqe)); - tx_cqe.clear(); } } } void RDMADispatcher::notify_pending_workers() { - { - Mutex::Locker l(w_lock); - if (pending_workers.empty()) - return ; + if (num_pending_workers) { + RDMAWorker *w = nullptr; + { + Mutex::Locker l(w_lock); + if (!pending_workers.empty()) { + w = pending_workers.front(); + pending_workers.pop_front(); + --num_pending_workers; + } + } + if (w) + w->notify_worker(); } - pending_workers.front()->notify_worker(); } int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) @@ -265,33 +275,6 @@ int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) return fd; } -int RDMADispatcher::register_worker(RDMAWorker* w) -{ - int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); - assert(fd >= 0); - Mutex::Locker l(w_lock); - workers[w] = fd; - return fd; -} - -void RDMADispatcher::pending_buffers(RDMAWorker* w) -{ - Mutex::Locker l(w_lock); - pending_workers.push_back(w); -} - -RDMAWorker* RDMADispatcher::get_worker_from_list() -{ - Mutex::Locker l(w_lock); - if (pending_workers.empty()) - return nullptr; - else { - RDMAWorker* w = pending_workers.front(); - pending_workers.pop_front(); - return w; - } -} - RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp) { Mutex::Locker l(lock); @@ -335,6 +318,75 @@ void RDMADispatcher::handle_post_fork() t = std::thread(&RDMADispatcher::polling, this); } +void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) +{ + std::vector tx_chunks; + + for (int i = 0; i < n; ++i) { + ibv_wc* response = &cqe[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " QP: " << response->qp_num + << " len: " << response->byte_len << " , addr:" << chunk + << " " << ib->wc_status_to_string(response->status) << dendl; + + if (response->status != IBV_WC_SUCCESS) { + perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); + if (response->status == IBV_WC_RETRY_EXC_ERR) { + ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl; + perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors); + } else if (response->status == IBV_WC_WR_FLUSH_ERR) { + ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp=" + << response->qp_num << " should be down while this WR=" << response->wr_id + << " still in flight." << dendl; + perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors); + } else { + ldout(cct, 1) << __func__ << " send work request returned error for buffer(" + << response->wr_id << ") status(" << response->status << "): " + << ib->wc_status_to_string(response->status) << dendl; + } + + Mutex::Locker l(lock);//make sure connected socket alive when pass wc + RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); + + if (conn && conn->is_connected()) { + ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi + conn->fault(); + } else { + ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl; + } + } + + // FIXME: why not tx? + if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) + tx_chunks.push_back(chunk); + else + ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl; + } + + perf_logger->inc(l_msgr_rdma_tx_total_wc, n); + post_tx_buffer(tx_chunks); +} + +/** + * Add the given Chunks to the given free queue. + * + * \param[in] chunks + * The Chunks to enqueue. + * \return + * 0 if success or -1 for failure + */ +void RDMADispatcher::post_tx_buffer(std::vector &chunks) +{ + if (chunks.empty()) + return ; + + inflight -= chunks.size(); + ib->get_memory_manager()->return_tx(chunks); + ldout(cct, 30) << __func__ << " release " << chunks.size() + << " chunks, inflight " << inflight << dendl; + notify_pending_workers(); +} + RDMAWorker::RDMAWorker(CephContext *c, unsigned i) : Worker(c, i), stack(nullptr), infiniband(NULL), @@ -345,11 +397,6 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i) sprintf(name, "AsyncMessenger::RDMAWorker-%u", id); PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last); - plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions"); - plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors"); - plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors"); - plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors"); - plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer"); plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer"); plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted"); @@ -367,44 +414,16 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i) RDMAWorker::~RDMAWorker() { delete tx_handler; - if (notify_fd >= 0) - ::close(notify_fd); } void RDMAWorker::initialize() { if (!dispatcher) { dispatcher = stack->get_dispatcher(); - notify_fd = dispatcher->register_worker(this); - center.create_file_event(notify_fd, EVENT_READABLE, tx_handler); memory_manager = infiniband->get_memory_manager(); } } -void RDMAWorker::notify_worker() -{ - uint64_t i = 1; - assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); -} - -void RDMAWorker::pass_wc(std::vector &&v) -{ - Mutex::Locker l(lock); - if (wc.empty()) - wc = std::move(v); - else - wc.insert(wc.end(), v.begin(), v.end()); - notify_worker(); -} - -void RDMAWorker::get_wc(std::vector &w) -{ - Mutex::Locker l(lock); - if (wc.empty()) - return ; - w.swap(wc); -} - int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa); @@ -433,43 +452,29 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co return 0; } -int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) +int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { assert(center.in_thread()); int r = infiniband->get_tx_buffers(c, bytes); - if (r > 0) { - stack->get_dispatcher()->inflight += r; - ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl; + assert(r >= 0); + size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r; + ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; + stack->get_dispatcher()->inflight += r; + if (got == bytes) return r; - } - assert(r == 0); if (o) { if (pending_sent_conns.back() != o) pending_sent_conns.push_back(o); - dispatcher->pending_buffers(this); + dispatcher->make_pending_worker(this); } return r; } -/** - * Add the given Chunks to the given free queue. - * - * \param[in] chunks - * The Chunks to enqueue. - * \return - * 0 if success or -1 for failure - */ -void RDMAWorker::post_tx_buffer(std::vector &chunks) -{ - assert(center.in_thread()); - if (chunks.empty()) - return ; - - dispatcher->inflight -= chunks.size(); - memory_manager->return_tx(chunks); - ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; +void RDMAWorker::handle_pending_message() +{ + ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl; std::set done; while (!pending_sent_conns.empty()) { RDMAConnectedSocketImpl *o = pending_sent_conns.front(); @@ -481,63 +486,17 @@ void RDMAWorker::post_tx_buffer(std::vector &chunks) if (r < 0) { if (r == -EAGAIN) { pending_sent_conns.push_front(o); - break; + dispatcher->make_pending_worker(this); + return ; } o->fault(); } } } -} - -void RDMAWorker::handle_tx_event() -{ - std::vector tx_chunks; - std::vector cqe; - get_wc(cqe); - - for (size_t i = 0; i < cqe.size(); ++i) { - ibv_wc* response = &cqe[i]; - Chunk* chunk = reinterpret_cast(response->wr_id); - ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl; - - if (response->status != IBV_WC_SUCCESS) { - perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); - if (response->status == IBV_WC_RETRY_EXC_ERR) { - ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl; - perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors); - } else if (response->status == IBV_WC_WR_FLUSH_ERR) { - ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp=" - << response->qp_num << " should be down while this WR=" << response->wr_id - << " still in flight." << dendl; - perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors); - } else { - ldout(cct, 1) << __func__ << " send work request returned error for buffer(" - << response->wr_id << ") status(" << response->status << "): " - << infiniband->wc_status_to_string(response->status) << dendl; - } - RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); - - if (conn && conn->is_connected()) { - ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi - conn->fault(); - } else { - ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl; - } - } - - // FIXME: why not tx? - if (memory_manager->is_tx_buffer(chunk->buffer)) - tx_chunks.push_back(chunk); - } - perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size()); - post_tx_buffer(tx_chunks); - - ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl; dispatcher->notify_pending_workers(); } - RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) { if (!global_infiniband) diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index c92af60746a1..be9e70a82650 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -40,6 +40,11 @@ enum { l_msgr_rdma_polling, l_msgr_rdma_inflight_tx_chunks, + l_msgr_rdma_tx_total_wc, + l_msgr_rdma_tx_total_wc_errors, + l_msgr_rdma_tx_wc_retry_errors, + l_msgr_rdma_tx_wc_wr_flush_errors, + l_msgr_rdma_rx_total_wc, l_msgr_rdma_rx_total_wc_errors, l_msgr_rdma_rx_fin, @@ -63,12 +68,12 @@ class RDMADispatcher : public CephContext::ForkWatcher { std::thread t; CephContext *cct; Infiniband* ib; - Infiniband::CompletionQueue* rx_cq; // common completion queue for all transmits - Infiniband::CompletionChannel* rx_cc; + Infiniband::CompletionQueue* tx_cq; + Infiniband::CompletionQueue* rx_cq; + Infiniband::CompletionChannel *tx_cc, *rx_cc; EventCallbackRef async_handler; bool done = false; - Mutex lock; // protect `qp_conns - Mutex w_lock; // protect pending workers + Mutex lock; // protect `qp_conns`, `dead_queue_pairs` // qp_num -> InfRcConnection // The main usage of `qp_conns` is looking up connection by qp_num, // so the lifecycle of element in `qp_conns` is the lifecycle of qp. @@ -90,8 +95,10 @@ class RDMADispatcher : public CephContext::ForkWatcher { /// save them in this vector and delete them at a safe time, when there are /// no outstanding transmit buffers to be lost. std::vector dead_queue_pairs; - Mutex qp_lock;//for csi reuse qp - ceph::unordered_map workers;; + + std::atomic num_pending_workers = {0}; + Mutex w_lock; // protect pending workers + // fixme: lockfree std::list pending_workers; RDMAStack* stack; @@ -113,17 +120,24 @@ class RDMADispatcher : public CephContext::ForkWatcher { void handle_async_event(); void polling(); int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); - int register_worker(RDMAWorker* w); - void pending_buffers(RDMAWorker* w); + void make_pending_worker(RDMAWorker* w) { + Mutex::Locker l(w_lock); + if (pending_workers.back() != w) { + pending_workers.push_back(w); + ++num_pending_workers; + } + } RDMAStack* get_stack() { return stack; } - RDMAWorker* get_worker_from_list(); RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp); RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); void erase_qpn(uint32_t qpn); + Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } void notify_pending_workers(); virtual void handle_pre_fork() override; virtual void handle_post_fork() override; + void handle_tx_event(ibv_wc *cqe, int n); + void post_tx_buffer(std::vector &chunks); std::atomic inflight = {0}; }; @@ -132,11 +146,6 @@ class RDMADispatcher : public CephContext::ForkWatcher { enum { l_msgr_rdma_first = 95000, - l_msgr_rdma_tx_total_wc, - l_msgr_rdma_tx_total_wc_errors, - l_msgr_rdma_tx_wc_retry_errors, - l_msgr_rdma_tx_wc_wr_flush_errors, - l_msgr_rdma_tx_no_mem, l_msgr_rdma_tx_parital_mem, l_msgr_rdma_tx_failed, @@ -162,16 +171,14 @@ class RDMAWorker : public Worker { MemoryManager *memory_manager; std::list pending_sent_conns; RDMADispatcher* dispatcher = nullptr; - int notify_fd = -1; Mutex lock; - std::vector wc; class C_handle_cq_tx : public EventCallback { RDMAWorker *worker; public: C_handle_cq_tx(RDMAWorker *w): worker(w) {} void do_request(int fd) { - worker->handle_tx_event(); + worker->handle_pending_message(); } }; @@ -179,22 +186,21 @@ class RDMAWorker : public Worker { PerfCounters *perf_logger; explicit RDMAWorker(CephContext *c, unsigned i); virtual ~RDMAWorker(); - void notify_worker(); - void pass_wc(std::vector &&v); - void get_wc(std::vector &w); virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; virtual void initialize() override; RDMAStack *get_stack() { return stack; } - int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); - void post_tx_buffer(std::vector &chunks); + int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); void remove_pending_conn(RDMAConnectedSocketImpl *o) { assert(center.in_thread()); pending_sent_conns.remove(o); } - void handle_tx_event(); + void handle_pending_message(); void set_ib(Infiniband* ib) { infiniband = ib; } void set_stack(RDMAStack *s) { stack = s; } + void notify_worker() { + center.dispatch_event_external(tx_handler); + } }; class RDMAConnectedSocketImpl : public ConnectedSocketImpl { -- 2.47.3