]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: Move async event handling to Device 14088/head
authorAmir Vadai <amir@vadai.me>
Wed, 8 Feb 2017 11:43:13 +0000 (13:43 +0200)
committerAdir Lev <adirl@mellanox.com>
Thu, 23 Mar 2017 10:17:58 +0000 (12:17 +0200)
issue: none

Change-Id: I5e1ee73bb2d0d751774eaeb7dd2950c704894caf
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index c629f2c9edfb6ada97d6137af5237b2931b8357d..58700db2c210e67391d40be35feedf5d0b4e3f3b 100644 (file)
@@ -108,8 +108,9 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt
 }
 
 
-Device::Device(CephContext *cct, ibv_device* d)
+Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
   : cct(cct), device(d), lock("ibdev_lock"),
+    async_handler(new C_handle_cq_async(this)), infiniband(ib),
     device_attr(new ibv_device_attr), active_port(nullptr)
 {
   if (device == NULL) {
@@ -356,8 +357,26 @@ void Device::rearm_cqs()
   assert(!ret);
 }
 
+void Device::handle_async_event()
+{
+  ibv_async_event async_event;
+
+  ldout(cct, 30) << __func__ << dendl;
 
-DeviceList::DeviceList(CephContext *cct)
+  while (!ibv_get_async_event(ctxt, &async_event)) {
+    infiniband->process_async_event(async_event);
+
+    ibv_ack_async_event(&async_event);
+  }
+
+  if (errno != EAGAIN) {
+    lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+      << " " << cpp_strerror(errno) << ")" << dendl;
+  }
+}
+
+
+DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
   : cct(cct), device_list(ibv_get_device_list(&num))
 {
   if (device_list == NULL || num == 0) {
@@ -372,7 +391,7 @@ DeviceList::DeviceList(CephContext *cct)
     struct pollfd *pfd = &poll_fds[i * 2];
     struct Device *d;
 
-    d = new Device(cct, device_list[i]);
+    d = new Device(cct, ib, device_list[i]);
     devices[i] = d;
 
     pfd[0].fd = d->tx_cc->get_fd();
@@ -470,3 +489,9 @@ void DeviceList::rearm_notify()
   for (int i = 0; i < num; i++)
     devices[i]->rearm_cqs();
 }
+
+void DeviceList::handle_async_event()
+{
+  for (int i = 0; i < num; i++)
+    devices[i]->handle_async_event();
+}
index a19640c2baf76591c4d0f62bda2c86b657b278b6..2fb46dc13b4c11514638b3c49c4a0d227714e016 100644 (file)
@@ -29,6 +29,7 @@
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
 #include "common/Mutex.h"
+#include "msg/async/Event.h"
 
 typedef Infiniband::QueuePair QueuePair;
 typedef Infiniband::CompletionChannel CompletionChannel;
@@ -57,6 +58,15 @@ class Port {
 
 
 class Device {
+  class C_handle_cq_async : public EventCallback {
+    Device *device;
+  public:
+    C_handle_cq_async(Device *d): device(d) {}
+    void do_request(int fd) {
+      device->handle_async_event();
+    }
+  };
+
   CephContext *cct;
   ibv_device *device;
   const char *name;
@@ -68,14 +78,18 @@ class Device {
 
   Mutex lock; // Protects from concurrent intialization of the device
   bool initialized = false;
+  EventCallbackRef async_handler;
+  Infiniband *infiniband;
 
  public:
-  explicit Device(CephContext *c, ibv_device* d);
+  explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
   ~Device();
 
   void init();
   void uninit();
 
+  void handle_async_event();
+
   const char* get_name() const { return name;}
   uint16_t get_lid() { return active_port->get_lid(); }
   ibv_gid get_gid() { return active_port->get_gid(); }
@@ -127,7 +141,7 @@ class DeviceList {
   struct pollfd *poll_fds;
 
  public:
-  DeviceList(CephContext *cct);
+  DeviceList(CephContext *cct, Infiniband *ib);
   ~DeviceList();
 
   Device* get_device(const char* device_name);
@@ -136,6 +150,8 @@ class DeviceList {
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
+
+  void handle_async_event();
 };
 
 #endif
index 73f36ae9587594f895f8f4e14c0c9e7ee5550028..9fe77b6742206d4843219378e8979a9b7adb8a34 100644 (file)
@@ -594,7 +594,7 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
 
 
 Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
-  : device_list(new DeviceList(cct))
+  : device_list(new DeviceList(cct, this))
 {
   device = device_list->get_device(device_name.c_str());
 
@@ -798,3 +798,13 @@ void Infiniband::rearm_notify()
 {
   device_list->rearm_notify();
 }
+
+void Infiniband::handle_async_event()
+{
+  device_list->handle_async_event();
+}
+
+void Infiniband::process_async_event(ibv_async_event &async_event)
+{
+  dispatcher->process_async_event(async_event);
+}
index 68b03cebce35b38c2fab03dc09635fe1a22a1d59..10184823d98470669bdc9da2ebce3c84bad9b521 100644 (file)
@@ -287,6 +287,8 @@ class Infiniband {
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
   void rearm_notify();
+  void handle_async_event();
+  void process_async_event(ibv_async_event &async_event);
 };
 
 #endif
index be09a75717b507f62921c5ee8cf5886ecbb3ebf8..b50b5f62f0d809ee9e16d83303faca996a893db4 100644 (file)
@@ -44,7 +44,7 @@ RDMADispatcher::~RDMADispatcher()
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+  : cct(c), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -90,40 +90,29 @@ void RDMADispatcher::polling_stop()
   t.join();
 }
 
-void RDMADispatcher::handle_async_event()
+void RDMADispatcher::process_async_event(ibv_async_event &async_event)
 {
-  ldout(cct, 30) << __func__ << dendl;
-  while (1) {
-    ibv_async_event async_event;
-    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
-      if (errno != EAGAIN)
-       lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
-                  << " " << cpp_strerror(errno) << ")" << dendl;
-      return;
-    }
-    perf_logger->inc(l_msgr_rdma_total_async_events);
-    // FIXME: Currently we must ensure no other factor make QP in ERROR state,
-    // otherwise this qp can't be deleted in current cleanup flow.
-    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
-      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
-      uint64_t qpn = async_event.element.qp->qp_num;
-      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
-                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
-      Mutex::Locker l(lock);
-      RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
-      if (!conn) {
-        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
-      } else {
-        ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
-        conn->fault();
-        erase_qpn_lockless(qpn);
-      }
+  perf_logger->inc(l_msgr_rdma_total_async_events);
+  // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+  // otherwise this qp can't be deleted in current cleanup flow.
+  if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+    perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+    uint64_t qpn = async_event.element.qp->qp_num;
+    ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+    Mutex::Locker l(lock);
+    RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+    if (!conn) {
+      ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
     } else {
-      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
-                    << " evt: " << ibv_event_type_str(async_event.event_type)
-                    << dendl;
+      ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+      conn->fault();
+      erase_qpn_lockless(qpn);
     }
-    ibv_ack_async_event(&async_event);
+  } else {
+    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+      << " evt: " << ibv_event_type_str(async_event.event_type)
+      << dendl;
   }
 }
 
@@ -217,7 +206,7 @@ void RDMADispatcher::polling()
         break;
 
       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
-        handle_async_event();
+        global_infiniband->handle_async_event();
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
index da279f0486640f8272536020b23b8a920ee14f70..2a775e4ac872aa02e211818c773433e9b3edc5e2 100644 (file)
@@ -67,7 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 
   std::thread t;
   CephContext *cct;
-  EventCallbackRef async_handler;
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
@@ -100,23 +99,13 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
 
-  class C_handle_cq_async : public EventCallback {
-    RDMADispatcher *dispatcher;
-   public:
-    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
-    void do_request(int fd) {
-      // worker->handle_tx_event();
-      dispatcher->handle_async_event();
-    }
-  };
-
  public:
   PerfCounters *perf_logger;
 
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
 
-  void handle_async_event();
+  void process_async_event(ibv_async_event &async_event);
 
   void polling_start();
   void polling_stop();