]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: refactor RDMAStack to accelerate tx handle
authorHaomai Wang <haomai@xsky.com>
Mon, 27 Feb 2017 15:25:41 +0000 (23:25 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 1 Mar 2017 16:04:59 +0000 (00:04 +0800)
previously Dispatcher thread will poll both rx and tx events, then dispatch
these events to RDMAWorker and RDMAConnectedSocketImpl.

Actually tx event handling is a lightweight task and we make these handling
inline now. rx event dispatching is still working.

Another change is adding tx cq to make event polling separated.

removing lots of codes yet.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 0160c04cf31a5c14bb5fb96bba0bcd0ae96ec39b..739a371e47ec38ee1cef09c98ccc1c08fdc3819b 100644 (file)
@@ -864,12 +864,12 @@ int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
   }
   if (r < 0) {
     r = -errno;
-    lderr(cct) << __func__ << " got error " << errno << ": "
-               << cpp_strerror(errno) << dendl;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
   } else if (r == 0) { // valid disconnect message of length 0
     ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
   } else if ((size_t)r != sizeof(msg)) { // invalid message
-    ldout(cct, 1) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl;
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
     r = -EINVAL;
   } else { // valid message
     sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
index a9666b66382f243f1cf7b4c69707cb332583f510..c636f4c521c3dbc938c867fe98ad0b352e393037 100644 (file)
@@ -28,7 +28,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
     active(false), detached(false)
 {
   qp = infiniband->create_queue_pair(
-                                    cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+                                    cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
   my_msg.qpn = qp->get_local_qp_number();
   my_msg.psn = qp->get_initial_psn();
   my_msg.lid = infiniband->get_lid();
@@ -183,6 +183,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
   if (r < 0) {
     ::close(tcp_fd);
+    tcp_fd = -1;
     return -errno;
   }
 
@@ -419,7 +420,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
                                  std::list<bufferptr>::const_iterator &end) -> unsigned {
     assert(start != end);
     auto chunk_idx = tx_buffers.size();
-    int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
+    int ret = worker->get_reged_mem(this, tx_buffers, bytes);
     if (ret == 0) {
       ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
       worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
@@ -495,7 +496,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
     return r;
 
   ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
-  return bytes;
+  return pending_bl.length() ? -EAGAIN : 0;
 }
 
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
@@ -602,6 +603,7 @@ void RDMAConnectedSocketImpl::close()
 
 void RDMAConnectedSocketImpl::fault()
 {
+  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
   /*if (qp) {
     qp->to_dead();
     qp = NULL;
index 95eab99b75b4ecaa88a45e16d5194ddac7dec3c6..f15de84bec4bc49c063e077e4235cd7ff563b265 100644 (file)
@@ -83,7 +83,6 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
   if (sd < 0) {
     return -errno;
   }
-  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
 
   net.set_close_on_exec(sd);
   int r = net.set_nonblock(sd);
index ec01760bda247d4188aab6dbf97c59f79b51c840..40f2510df2a91271269ae73e300cc0ee0036e215 100644 (file)
@@ -31,29 +31,29 @@ RDMADispatcher::~RDMADispatcher()
   done = true;
   t.join();
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
-  auto i = qp_conns.begin();
-  while (i != qp_conns.end()) {
-    delete i->second.first;
-    ++i;
-  }
 
-  while (!dead_queue_pairs.empty()) {
-    delete dead_queue_pairs.back();
-    dead_queue_pairs.pop_back();
-  }
+  assert(qp_conns.empty());
+  assert(dead_queue_pairs.empty());
 
+  tx_cc->ack_events();
   rx_cc->ack_events();
+  delete tx_cq;
   delete rx_cq;
+  delete tx_cc;
   delete rx_cc;
   delete async_handler;
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
   : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
-  w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s)
+  w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
+  tx_cc = ib->create_comp_channel(c);
+  assert(tx_cc);
   rx_cc = ib->create_comp_channel(c);
   assert(rx_cc);
+  tx_cq = ib->create_comp_queue(c, tx_cc);
+  assert(tx_cq);
   rx_cq = ib->create_comp_queue(c, rx_cc);
   assert(rx_cq);
 
@@ -62,6 +62,11 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
   plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
   plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
 
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
   plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
   plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
   plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
@@ -125,16 +130,68 @@ void RDMADispatcher::polling()
 
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
-  RDMAWorker* worker;
-  ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl;
+  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
   RDMAConnectedSocketImpl *conn = nullptr;
   utime_t last_inactive = ceph_clock_now();
   bool rearmed = false;
-  int ret = 0;
+  int r = 0;
 
   while (true) {
-    int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
-    if (!n) {
+    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    if (tx_ret > 0) {
+      ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
+                     << " responses."<< dendl;
+      handle_tx_event(wc, tx_ret);
+    }
+
+    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    if (rx_ret > 0) {
+      ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
+                     << " responses."<< dendl;
+      perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
+
+      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+      for (int i = 0; i < rx_ret; ++i) {
+        ibv_wc* response = &wc[i];
+        Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+        ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+
+        assert(wc[i].opcode == IBV_WC_RECV);
+
+        if (response->status == IBV_WC_SUCCESS) {
+          conn = get_conn_lockless(response->qp_num);
+          if (!conn) {
+            assert(ib->is_rx_buffer(chunk->buffer));
+            r = ib->post_chunk(chunk);
+            ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
+            assert(r == 0);
+          } else {
+            polled[conn].push_back(*response);
+          }
+        } else {
+          perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+          ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+              << ") status(" << response->status << ":"
+              << ib->wc_status_to_string(response->status) << ")" << dendl;
+          assert(ib->is_rx_buffer(chunk->buffer));
+          r = ib->post_chunk(chunk);
+          if (r) {
+            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
+            assert(r == 0);
+          }
+
+          conn = get_conn_lockless(response->qp_num);
+          if (conn && conn->is_connected())
+            conn->fault();
+        }
+      }
+
+      for (auto &&i : polled)
+        i.first->pass_wc(std::move(i.second));
+      polled.clear();
+    }
+
+    if (!tx_ret && !rx_ret) {
       // NOTE: Has TX just transitioned to idle? We should do it when idle!
       // It's now safe to delete queue pairs (see comment by declaration
       // for dead_queue_pairs).
@@ -157,102 +214,55 @@ void RDMADispatcher::polling()
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
+          tx_cq->rearm_notify();
           rx_cq->rearm_notify();
           rearmed = true;
           continue;
         }
 
-        struct pollfd channel_poll;
-        channel_poll.fd = rx_cc->get_fd();
-        channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-        channel_poll.revents = 0;
-        int r = 0;
+        struct pollfd channel_poll[2];
+        channel_poll[0].fd = tx_cc->get_fd();
+        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[0].revents = 0;
+        channel_poll[1].fd = rx_cc->get_fd();
+        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[1].revents = 0;
+        r = 0;
         perf_logger->set(l_msgr_rdma_polling, 0);
         while (!done && r == 0) {
-          r = poll(&channel_poll, 1, 1);
+          r = poll(channel_poll, 2, 1);
           if (r < 0) {
             r = -errno;
             lderr(cct) << __func__ << " poll failed " << r << dendl;
             ceph_abort();
           }
         }
+        if (r > 0 && tx_cc->get_cq_event())
+          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
         if (r > 0 && rx_cc->get_cq_event())
-          ldout(cct, 20) << __func__ << " got cq event." << dendl;
+          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
         last_inactive = ceph_clock_now();
         perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
       }
-      continue;
-    }
-
-    ldout(cct, 20) << __func__ << " pool completion queue got " << n
-                   << " responses."<< dendl;
-    perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
-    Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-    for (int i = 0; i < n; ++i) {
-      ibv_wc* response = &wc[i];
-      Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
-      ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
-
-      if (wc[i].opcode == IBV_WC_SEND) {
-        tx_cqe.push_back(wc[i]);
-        ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
-        continue;
-      }
-
-      if (response->status != IBV_WC_SUCCESS) {
-        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
-        ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
-                      << ") status(" << response->status << ":"
-                      << ib->wc_status_to_string(response->status) << ")" << dendl;
-        if (ib->is_rx_buffer(chunk->buffer)) {
-          ret = ib->post_chunk(chunk);
-          if (ret) {
-            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl;
-            assert(ret == 0);
-          }
-          conn = get_conn_lockless(response->qp_num);
-          if (conn && conn->is_connected())
-            conn->fault();
-          notify_pending_workers();
-        } else if (ib->is_tx_buffer(chunk->buffer)) {
-          tx_cqe.push_back(wc[i]);
-        } else {
-          ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl;
-        }
-        continue;
-      }
-
-      conn = get_conn_lockless(response->qp_num);
-      if (!conn) {
-        ret = ib->post_chunk(chunk);
-        ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
-        assert(ret == 0);
-        continue;
-      }
-      polled[conn].push_back(*response);
-    }
-    for (auto &&i : polled)
-      i.first->pass_wc(std::move(i.second));
-    polled.clear();
-    if (!tx_cqe.empty()) {
-      worker = get_worker_from_list();
-      if (worker == nullptr)
-        worker = dynamic_cast<RDMAWorker*>(stack->get_worker());
-      worker->pass_wc(std::move(tx_cqe));
-      tx_cqe.clear();
     }
   }
 }
 
 void RDMADispatcher::notify_pending_workers() {
-  {
-    Mutex::Locker l(w_lock);
-    if (pending_workers.empty())
-      return ;
+  if (num_pending_workers) {
+    RDMAWorker *w = nullptr;
+    {
+      Mutex::Locker l(w_lock);
+      if (!pending_workers.empty()) {
+        w = pending_workers.front();
+        pending_workers.pop_front();
+        --num_pending_workers;
+      }
+    }
+    if (w)
+      w->notify_worker();
   }
-  pending_workers.front()->notify_worker();
 }
 
 int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
@@ -265,33 +275,6 @@ int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
   return fd;
 }
 
-int RDMADispatcher::register_worker(RDMAWorker* w)
-{
-  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
-  assert(fd >= 0);
-  Mutex::Locker l(w_lock);
-  workers[w] = fd;
-  return fd;
-}
-
-void RDMADispatcher::pending_buffers(RDMAWorker* w)
-{
-  Mutex::Locker l(w_lock);
-  pending_workers.push_back(w);
-}
-
-RDMAWorker* RDMADispatcher::get_worker_from_list()
-{
-  Mutex::Locker l(w_lock);
-  if (pending_workers.empty())
-    return nullptr;
-  else {
-    RDMAWorker* w = pending_workers.front();
-    pending_workers.pop_front();
-    return w;
-  }
-}
-
 RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp)
 {
   Mutex::Locker l(lock);
@@ -335,6 +318,75 @@ void RDMADispatcher::handle_post_fork()
   t = std::thread(&RDMADispatcher::polling, this);
 }
 
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
+{
+  std::vector<Chunk*> tx_chunks;
+
+  for (int i = 0; i < n; ++i) {
+    ibv_wc* response = &cqe[i];
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    ldout(cct, 25) << __func__ << " QP: " << response->qp_num
+                   << " len: " << response->byte_len << " , addr:" << chunk
+                   << " " << ib->wc_status_to_string(response->status) << dendl;
+
+    if (response->status != IBV_WC_SUCCESS) {
+      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
+      if (response->status == IBV_WC_RETRY_EXC_ERR) {
+        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
+      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
+        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+                      << response->qp_num << " should be down while this WR=" << response->wr_id
+                      << " still in flight." << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
+      } else {
+        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+                      << response->wr_id << ") status(" << response->status << "): "
+                      << ib->wc_status_to_string(response->status) << dendl;
+      }
+
+      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+      RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+
+      if (conn && conn->is_connected()) {
+        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
+        conn->fault();
+      } else {
+        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+      }
+    }
+
+    // FIXME: why not tx?
+    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer))
+      tx_chunks.push_back(chunk);
+    else
+      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
+  }
+
+  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
+  post_tx_buffer(tx_chunks);
+}
+
+/**
+ * Add the given Chunks to the given free queue.
+ *
+ * \param[in] chunks
+ *      The Chunks to enqueue.
+ * \return
+ *      0 if success or -1 for failure
+ */
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+  if (chunks.empty())
+    return ;
+
+  inflight -= chunks.size();
+  ib->get_memory_manager()->return_tx(chunks);
+  ldout(cct, 30) << __func__ << " release " << chunks.size()
+                 << " chunks, inflight " << inflight << dendl;
+  notify_pending_workers();
+}
+
 
 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr), infiniband(NULL),
@@ -345,11 +397,6 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
   PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
 
-  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
-  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
-  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
-  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
-
   plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
@@ -367,44 +414,16 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
 RDMAWorker::~RDMAWorker()
 {
   delete tx_handler;
-  if (notify_fd >= 0)
-    ::close(notify_fd);
 }
 
 void RDMAWorker::initialize()
 {
   if (!dispatcher) {
     dispatcher = stack->get_dispatcher();
-    notify_fd = dispatcher->register_worker(this);
-    center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
     memory_manager = infiniband->get_memory_manager();
   }
 }
 
-void RDMAWorker::notify_worker()
-{
-  uint64_t i = 1;
-  assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
-}
-
-void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
-{
-  Mutex::Locker l(lock);
-  if (wc.empty())
-    wc = std::move(v);
-  else
-    wc.insert(wc.end(), v.begin(), v.end());
-  notify_worker();
-}
-
-void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
-{
-  Mutex::Locker l(lock);
-  if (wc.empty())
-    return ;
-  w.swap(wc);
-}
-
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
   auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
@@ -433,43 +452,29 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
   return 0;
 }
 
-int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   assert(center.in_thread());
   int r = infiniband->get_tx_buffers(c, bytes);
-  if (r > 0) {
-    stack->get_dispatcher()->inflight += r;
-    ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl;
+  assert(r >= 0);
+  size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r;
+  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
+  stack->get_dispatcher()->inflight += r;
+  if (got == bytes)
     return r;
-  }
-  assert(r == 0);
 
   if (o) {
     if (pending_sent_conns.back() != o)
       pending_sent_conns.push_back(o);
-    dispatcher->pending_buffers(this);
+    dispatcher->make_pending_worker(this);
   }
   return r;
 }
 
-/**
- * Add the given Chunks to the given free queue.
- *
- * \param[in] chunks
- *      The Chunks to enqueue.
- * \return
- *      0 if success or -1 for failure
- */
-void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
-{
-  assert(center.in_thread());
-  if (chunks.empty())
-    return ;
-
-  dispatcher->inflight -= chunks.size();
-  memory_manager->return_tx(chunks);
-  ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
 
+void RDMAWorker::handle_pending_message()
+{
+  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
   std::set<RDMAConnectedSocketImpl*> done;
   while (!pending_sent_conns.empty()) {
     RDMAConnectedSocketImpl *o = pending_sent_conns.front();
@@ -481,63 +486,17 @@ void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
       if (r < 0) {
         if (r == -EAGAIN) {
           pending_sent_conns.push_front(o);
-          break;
+          dispatcher->make_pending_worker(this);
+          return ;
         }
         o->fault();
       }
     }
   }
-}
-
-void RDMAWorker::handle_tx_event()
-{
-  std::vector<Chunk*> tx_chunks;
-  std::vector<ibv_wc> cqe;
-  get_wc(cqe);
-
-  for (size_t i = 0; i < cqe.size(); ++i) {
-    ibv_wc* response = &cqe[i];
-    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-    ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
-
-    if (response->status != IBV_WC_SUCCESS) {
-      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
-      if (response->status == IBV_WC_RETRY_EXC_ERR) {
-        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
-        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
-      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
-        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
-                      << response->qp_num << " should be down while this WR=" << response->wr_id
-                      << " still in flight." << dendl;
-        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
-      } else {
-        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
-                      << response->wr_id << ") status(" << response->status << "): "
-                      << infiniband->wc_status_to_string(response->status) << dendl;
-      }
-      RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
-
-      if (conn && conn->is_connected()) {
-        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
-        conn->fault();
-      } else {
-        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
-      }
-    }
-
-    // FIXME: why not tx?
-    if (memory_manager->is_tx_buffer(chunk->buffer))
-      tx_chunks.push_back(chunk);
-  }
 
-  perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
-  post_tx_buffer(tx_chunks);
-
-  ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
   dispatcher->notify_pending_workers();
 }
 
-
 RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
 {
   if (!global_infiniband)
index c92af60746a13ba42bf550abc9c74877aaf00f35..be9e70a826503d1e5d7d43ffc1a98af203d5a432 100644 (file)
@@ -40,6 +40,11 @@ enum {
   l_msgr_rdma_polling,
   l_msgr_rdma_inflight_tx_chunks,
 
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
   l_msgr_rdma_rx_total_wc,
   l_msgr_rdma_rx_total_wc_errors,
   l_msgr_rdma_rx_fin,
@@ -63,12 +68,12 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   std::thread t;
   CephContext *cct;
   Infiniband* ib;
-  Infiniband::CompletionQueue* rx_cq;           // common completion queue for all transmits
-  Infiniband::CompletionChannel* rx_cc;
+  Infiniband::CompletionQueue* tx_cq;
+  Infiniband::CompletionQueue* rx_cq;
+  Infiniband::CompletionChannel *tx_cc, *rx_cc;
   EventCallbackRef async_handler;
   bool done = false;
-  Mutex lock; // protect `qp_conns
-  Mutex w_lock; // protect pending workers
+  Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
   // qp_num -> InfRcConnection
   // The main usage of `qp_conns` is looking up connection by qp_num,
   // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
@@ -90,8 +95,10 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   /// save them in this vector and delete them at a safe time, when there are
   /// no outstanding transmit buffers to be lost.
   std::vector<QueuePair*> dead_queue_pairs;
-  Mutex qp_lock;//for csi reuse qp
-  ceph::unordered_map<RDMAWorker*, int> workers;;
+
+  std::atomic<uint64_t> num_pending_workers = {0};
+  Mutex w_lock; // protect pending workers
+  // fixme: lockfree
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
 
@@ -113,17 +120,24 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   void handle_async_event();
   void polling();
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
-  int register_worker(RDMAWorker* w);
-  void pending_buffers(RDMAWorker* w);
+  void make_pending_worker(RDMAWorker* w) {
+    Mutex::Locker l(w_lock);
+    if (pending_workers.back() != w) {
+      pending_workers.push_back(w);
+      ++num_pending_workers;
+    }
+  }
   RDMAStack* get_stack() { return stack; }
-  RDMAWorker* get_worker_from_list();
   RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp);
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   void erase_qpn(uint32_t qpn);
+  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
   Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
   virtual void handle_pre_fork() override;
   virtual void handle_post_fork() override;
+  void handle_tx_event(ibv_wc *cqe, int n);
+  void post_tx_buffer(std::vector<Chunk*> &chunks);
 
   std::atomic<uint64_t> inflight = {0};
 };
@@ -132,11 +146,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 enum {
   l_msgr_rdma_first = 95000,
 
-  l_msgr_rdma_tx_total_wc,
-  l_msgr_rdma_tx_total_wc_errors,
-  l_msgr_rdma_tx_wc_retry_errors,
-  l_msgr_rdma_tx_wc_wr_flush_errors,
-
   l_msgr_rdma_tx_no_mem,
   l_msgr_rdma_tx_parital_mem,
   l_msgr_rdma_tx_failed,
@@ -162,16 +171,14 @@ class RDMAWorker : public Worker {
   MemoryManager *memory_manager;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
   RDMADispatcher* dispatcher = nullptr;
-  int notify_fd = -1;
   Mutex lock;
-  std::vector<ibv_wc> wc;
 
   class C_handle_cq_tx : public EventCallback {
     RDMAWorker *worker;
     public:
     C_handle_cq_tx(RDMAWorker *w): worker(w) {}
     void do_request(int fd) {
-      worker->handle_tx_event();
+      worker->handle_pending_message();
     }
   };
 
@@ -179,22 +186,21 @@ class RDMAWorker : public Worker {
   PerfCounters *perf_logger;
   explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker();
-  void notify_worker();
-  void pass_wc(std::vector<ibv_wc> &&v);
-  void get_wc(std::vector<ibv_wc> &w);
   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
   virtual void initialize() override;
   RDMAStack *get_stack() { return stack; }
-  int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
-  void post_tx_buffer(std::vector<Chunk*> &chunks);
+  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
   void remove_pending_conn(RDMAConnectedSocketImpl *o) {
     assert(center.in_thread());
     pending_sent_conns.remove(o);
   }
-  void handle_tx_event();
+  void handle_pending_message();
   void set_ib(Infiniband* ib) { infiniband = ib; }
   void set_stack(RDMAStack *s) { stack = s; }
+  void notify_worker() {
+    center.dispatch_event_external(tx_handler);
+  }
 };
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {