]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "msg/async/rdma: Move async event handling to Device"
authorAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 07:33:49 +0000 (10:33 +0300)
committerAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 14:04:26 +0000 (17:04 +0300)
This reverts commit 4b10ba7578b5d378edf715711bbf222dfe41b91d.

Change-Id: I9a294890c1cf01461356d87491bfa5e954b42444
Issue: 995322
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/Device.cc
src/msg/async/rdma/Device.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 58700db2c210e67391d40be35feedf5d0b4e3f3b..c629f2c9edfb6ada97d6137af5237b2931b8357d 100644 (file)
@@ -108,9 +108,8 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt
 }
 
 
-Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
+Device::Device(CephContext *cct, ibv_device* d)
   : cct(cct), device(d), lock("ibdev_lock"),
-    async_handler(new C_handle_cq_async(this)), infiniband(ib),
     device_attr(new ibv_device_attr), active_port(nullptr)
 {
   if (device == NULL) {
@@ -357,26 +356,8 @@ void Device::rearm_cqs()
   assert(!ret);
 }
 
-void Device::handle_async_event()
-{
-  ibv_async_event async_event;
-
-  ldout(cct, 30) << __func__ << dendl;
 
-  while (!ibv_get_async_event(ctxt, &async_event)) {
-    infiniband->process_async_event(async_event);
-
-    ibv_ack_async_event(&async_event);
-  }
-
-  if (errno != EAGAIN) {
-    lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
-      << " " << cpp_strerror(errno) << ")" << dendl;
-  }
-}
-
-
-DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
+DeviceList::DeviceList(CephContext *cct)
   : cct(cct), device_list(ibv_get_device_list(&num))
 {
   if (device_list == NULL || num == 0) {
@@ -391,7 +372,7 @@ DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
     struct pollfd *pfd = &poll_fds[i * 2];
     struct Device *d;
 
-    d = new Device(cct, ib, device_list[i]);
+    d = new Device(cct, device_list[i]);
     devices[i] = d;
 
     pfd[0].fd = d->tx_cc->get_fd();
@@ -489,9 +470,3 @@ void DeviceList::rearm_notify()
   for (int i = 0; i < num; i++)
     devices[i]->rearm_cqs();
 }
-
-void DeviceList::handle_async_event()
-{
-  for (int i = 0; i < num; i++)
-    devices[i]->handle_async_event();
-}
index 2fb46dc13b4c11514638b3c49c4a0d227714e016..a19640c2baf76591c4d0f62bda2c86b657b278b6 100644 (file)
@@ -29,7 +29,6 @@
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
 #include "common/Mutex.h"
-#include "msg/async/Event.h"
 
 typedef Infiniband::QueuePair QueuePair;
 typedef Infiniband::CompletionChannel CompletionChannel;
@@ -58,15 +57,6 @@ class Port {
 
 
 class Device {
-  class C_handle_cq_async : public EventCallback {
-    Device *device;
-  public:
-    C_handle_cq_async(Device *d): device(d) {}
-    void do_request(int fd) {
-      device->handle_async_event();
-    }
-  };
-
   CephContext *cct;
   ibv_device *device;
   const char *name;
@@ -78,18 +68,14 @@ class Device {
 
   Mutex lock; // Protects from concurrent intialization of the device
   bool initialized = false;
-  EventCallbackRef async_handler;
-  Infiniband *infiniband;
 
  public:
-  explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
+  explicit Device(CephContext *c, ibv_device* d);
   ~Device();
 
   void init();
   void uninit();
 
-  void handle_async_event();
-
   const char* get_name() const { return name;}
   uint16_t get_lid() { return active_port->get_lid(); }
   ibv_gid get_gid() { return active_port->get_gid(); }
@@ -141,7 +127,7 @@ class DeviceList {
   struct pollfd *poll_fds;
 
  public:
-  DeviceList(CephContext *cct, Infiniband *ib);
+  DeviceList(CephContext *cct);
   ~DeviceList();
 
   Device* get_device(const char* device_name);
@@ -150,8 +136,6 @@ class DeviceList {
   int poll_tx(int n, Device **d, ibv_wc *wc);
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
-
-  void handle_async_event();
 };
 
 #endif
index b3bf617864e7659b98d296846a3e8b0257a5710d..66b669b45d6fec63c2cb4fa169a4b41f1c5a8c97 100644 (file)
@@ -591,7 +591,7 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
 
 
 Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
-  : device_list(new DeviceList(cct, this))
+  : device_list(new DeviceList(cct))
 {
   device = device_list->get_device(device_name.c_str());
 
@@ -795,13 +795,3 @@ void Infiniband::rearm_notify()
 {
   device_list->rearm_notify();
 }
-
-void Infiniband::handle_async_event()
-{
-  device_list->handle_async_event();
-}
-
-void Infiniband::process_async_event(ibv_async_event &async_event)
-{
-  dispatcher->process_async_event(async_event);
-}
index c24448d824f61297c2c934106873c461e333dd21..a3da3edd4c05aa75ed4ecf7c0e92fb42d1197348 100644 (file)
@@ -287,8 +287,6 @@ class Infiniband {
   int poll_rx(int n, Device **d, ibv_wc *wc);
   int poll_blocking(bool &done);
   void rearm_notify();
-  void handle_async_event();
-  void process_async_event(ibv_async_event &async_event);
 };
 
 #endif
index 21241e28cb93ef90562e656c66323ff27ce008eb..40d9be96b60cd4992c7750312462c469040a7195 100644 (file)
@@ -44,7 +44,7 @@ RDMADispatcher::~RDMADispatcher()
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), lock("RDMADispatcher::lock"),
+  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -91,29 +91,40 @@ void RDMADispatcher::polling_stop()
   t.join();
 }
 
-void RDMADispatcher::process_async_event(ibv_async_event &async_event)
+void RDMADispatcher::handle_async_event()
 {
-  perf_logger->inc(l_msgr_rdma_total_async_events);
-  // FIXME: Currently we must ensure no other factor make QP in ERROR state,
-  // otherwise this qp can't be deleted in current cleanup flow.
-  if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
-    perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
-    uint64_t qpn = async_event.element.qp->qp_num;
-    ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
-      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
-    Mutex::Locker l(lock);
-    RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
-    if (!conn) {
-      ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+  ldout(cct, 30) << __func__ << dendl;
+  while (1) {
+    ibv_async_event async_event;
+    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+      if (errno != EAGAIN)
+       lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+                  << " " << cpp_strerror(errno) << ")" << dendl;
+      return;
+    }
+    perf_logger->inc(l_msgr_rdma_total_async_events);
+    // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+    // otherwise this qp can't be deleted in current cleanup flow.
+    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+      uint64_t qpn = async_event.element.qp->qp_num;
+      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+      Mutex::Locker l(lock);
+      RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+      if (!conn) {
+        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+      } else {
+        ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+        conn->fault();
+        erase_qpn_lockless(qpn);
+      }
     } else {
-      ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
-      conn->fault();
-      erase_qpn_lockless(qpn);
+      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+                    << " evt: " << ibv_event_type_str(async_event.event_type)
+                    << dendl;
     }
-  } else {
-    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
-      << " evt: " << ibv_event_type_str(async_event.event_type)
-      << dendl;
+    ibv_ack_async_event(&async_event);
   }
 }
 
@@ -209,7 +220,7 @@ void RDMADispatcher::polling()
         break;
 
       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
-        global_infiniband->handle_async_event();
+        handle_async_event();
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
index 4122975d5d7ea892e23ed245d6bfeb9983f75785..733650dc9727f6a72f8f0e5b907a1c5e36cb0efb 100644 (file)
@@ -68,6 +68,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 
   std::thread t;
   CephContext *cct;
+  EventCallbackRef async_handler;
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
@@ -100,13 +101,23 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
 
+  class C_handle_cq_async : public EventCallback {
+    RDMADispatcher *dispatcher;
+   public:
+    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+    void do_request(int fd) {
+      // worker->handle_tx_event();
+      dispatcher->handle_async_event();
+    }
+  };
+
  public:
   PerfCounters *perf_logger;
 
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
 
-  void process_async_event(ibv_async_event &async_event);
+  void handle_async_event();
 
   void polling_start();
   void polling_stop();