]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RDMA: Move defenitions from RDMAStack.h into .cc
authorAmir Vadai <amir@vadai.me>
Mon, 2 Jan 2017 08:36:32 +0000 (10:36 +0200)
committerAmir Vadai <amir@vadai.me>
Mon, 2 Jan 2017 10:12:04 +0000 (12:12 +0200)
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 1f97c3e21dd8b7d6a7334afbf23aec848fd65d6e..f8f6e79e46c40057891173cebe582892dfd5e022 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                                                RDMAWorker *w)
+  : cct(cct), connected(0), error(0), infiniband(ib),
+    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+    is_server(false), con_handler(new C_handle_connection(this)),
+    active(false), detached(false)
+{
+  qp = infiniband->create_queue_pair(
+                                    cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+  my_msg.qpn = qp->get_local_qp_number();
+  my_msg.psn = qp->get_initial_psn();
+  my_msg.lid = infiniband->get_lid();
+  my_msg.peer_qpn = 0;
+  my_msg.gid = infiniband->get_gid();
+  notify_fd = dispatcher->register_qp(qp, this);
+}
+
+RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
+{
+  worker->remove_pending_conn(this);
+  dispatcher->erase_qpn(my_msg.qpn);
+  cleanup();
+  if (notify_fd >= 0)
+    ::close(notify_fd);
+  if (tcp_fd >= 0)
+    ::close(tcp_fd);
+  error = ECONNRESET;
+  Mutex::Locker l(lock);
+  for (unsigned i=0; i < wc.size(); ++i)
+    infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+  for (unsigned i=0; i < buffers.size(); ++i)
+    infiniband->recall_chunk(buffers[i]);
+}
+
+void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    wc = std::move(v);
+  else
+    wc.insert(wc.end(), v.begin(), v.end());
+  notify();
+}
+
+void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    return ;
+  w.swap(wc);
+}
+
 int RDMAConnectedSocketImpl::activate()
 {
   ibv_qp_attr qpa;
@@ -467,3 +519,45 @@ void RDMAConnectedSocketImpl::cleanup() {
     con_handler = nullptr;
   }
 }
+
+void RDMAConnectedSocketImpl::notify()
+{
+  uint64_t i = 1;
+  assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+}
+
+void RDMAConnectedSocketImpl::shutdown()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::close()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::fault()
+{
+  /*if (qp) {
+    qp->to_dead();
+    qp = NULL;
+    }*/
+  error = ECONNRESET;
+  connected = 1;
+  notify();
+}
+
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+{
+  tcp_fd = sd;
+  is_server = true;
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+                          worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+                          }, true);
+}
index 03be56da75988f56c8c0c277ff0d578d7a879c7a..231865dd9fac85e1f54424f14a8401bd1a0b142d 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAServerSocketImpl "
 
+RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+  : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
+{
+}
+
 int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
 {
   int rc = 0;
@@ -106,3 +111,9 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
 
   return 0;
 }
+
+void RDMAServerSocketImpl::abort_accept()
+{
+  if (server_setup_socket >= 0)
+    ::close(server_setup_socket);
+}
index 4d5c5077d75f3105f573d211a0b1917d7c12eec7..d33ab1a0c2e191824b81043680cb250dd291f4b6 100644 (file)
 
 static Infiniband* global_infiniband;
 
-RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
-  : Worker(c, i), stack(nullptr), infiniband(NULL),
-    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
-{
-}
-
-int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
-{
-  auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
-  int r = p->listen(sa, opt);
-  if (r < 0) {
-    delete p;
-    return r;
-  }
-
-  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
-  return 0;
-}
-
-int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
-{
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
-  int r = p->try_connect(addr, opts);
-
-  if (r < 0) {
-    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
-    return r;
-  }
-  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
-  *socket = ConnectedSocket(std::move(csi));
-  return 0;
-}
-
-
-RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
-{
-  if (!global_infiniband)
-    global_infiniband = new Infiniband(
-      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
-  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
-  dispatcher = new RDMADispatcher(cct, global_infiniband, this);
-  unsigned num = get_num_worker();
-  for (unsigned i = 0; i < num; ++i) {
-    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
-    w->set_ib(global_infiniband);
-    w->set_stack(this);
-  }
-  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
-}
-
-void RDMAWorker::initialize()
-{
-  if (!dispatcher) {
-    dispatcher = stack->get_dispatcher();
-    notify_fd = dispatcher->register_worker(this);
-    center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
-    memory_manager = infiniband->get_memory_manager();
-  }
-}
-
-int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
-{
-  int r = infiniband->get_tx_buffers(c, bytes);
-  if (r > 0) {
-    stack->get_dispatcher()->inflight += c.size();
-    ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
-    return r;
-  }
-  assert(r == 0);
-
-  if (pending_sent_conns.back() != o)
-    pending_sent_conns.push_back(o);
-  dispatcher->pending_buffers(this);
-  return r;
-}
-
-/**
- * Add the given Chunks to the given free queue.
- *
- * \param[in] chunks
- *      The Chunks to enqueue.
- * \return
- *      0 if success or -1 for failure
- */
-int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
-{
-  if (chunks.empty())
-    return 0;
-
-  stack->get_dispatcher()->inflight -= chunks.size();
-  memory_manager->return_tx(chunks);
-  ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
-
-  pended = false;
-  std::set<RDMAConnectedSocketImpl*> done;
-  while (!pending_sent_conns.empty()) {
-    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
-    if (done.count(o) == 0) {
-      done.insert(o);
-    } else {
-      pending_sent_conns.pop_front();
-      continue;
-    }
-    ssize_t r = o->submit(false);
-    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
-    if (r < 0) {
-      if (r == -EAGAIN)
-        break;
-      o->fault();
-    }
-    pending_sent_conns.pop_front();
-  }
-  return 0;
-}
-
-void RDMAWorker::handle_tx_event()
-{
-  std::vector<Chunk*> tx_chunks;
-  std::vector<ibv_wc> cqe;
-  get_wc(cqe);
-
-  for (size_t i = 0; i < cqe.size(); ++i) {
-    ibv_wc* response = &cqe[i];
-    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-    ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
-
-    if (response->status != IBV_WC_SUCCESS) {
-      if (response->status == IBV_WC_RETRY_EXC_ERR) {
-        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
-      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
-        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
-                      << response->qp_num << " should be down while this WR=" << response->wr_id
-                      << " still in flight." << dendl;
-      } else {
-        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
-                      << response->wr_id << ") status(" << response->status << "): "
-                      << infiniband->wc_status_to_string(response->status) << dendl;
-      }
-      RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
-      if (conn) {
-        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
-        conn->fault();
-      } else {
-        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
-      }
-    }
-
-    //assert(memory_manager->is_tx_chunk(chunk));
-    if (memory_manager->is_tx_chunk(chunk)) {
-      tx_chunks.push_back(chunk);
-    } else {
-      ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
-    }
-  }
-
-  post_tx_buffer(tx_chunks);
-
-  ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
-  dispatcher->notify_pending_workers();
-}
-
 RDMADispatcher::~RDMADispatcher()
 {
   done = true;
@@ -208,6 +47,18 @@ RDMADispatcher::~RDMADispatcher()
   delete async_handler;
 }
 
+RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
+  : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+  w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s)
+{
+  rx_cc = ib->create_comp_channel(c);
+  assert(rx_cc);
+  rx_cq = ib->create_comp_queue(c, rx_cc);
+  assert(rx_cq);
+  t = std::thread(&RDMADispatcher::polling, this);
+  cct->register_fork_watcher(this);
+}
+
 void RDMADispatcher::handle_async_event()
 {
   ldout(cct, 20) << __func__ << dendl;
@@ -358,3 +209,302 @@ void RDMADispatcher::notify_pending_workers() {
     pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
     pending_workers.pop_front();
 }
+
+int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
+{
+  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+  assert(fd >= 0);
+  Mutex::Locker l(lock);
+  assert(!qp_conns.count(qp->get_local_qp_number()));
+  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
+  return fd;
+}
+
+int RDMADispatcher::register_worker(RDMAWorker* w)
+{
+  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+  assert(fd >= 0);
+  Mutex::Locker l(w_lock);
+  workers[w] = fd;
+  return fd;
+}
+
+void RDMADispatcher::pending_buffers(RDMAWorker* w)
+{
+  Mutex::Locker l(w_lock);
+  pending_workers.push_back(w);
+}
+
+RDMAWorker* RDMADispatcher::get_worker_from_list()
+{
+  Mutex::Locker l(w_lock);
+  if (pending_workers.empty())
+    return nullptr;
+  else {
+    RDMAWorker* w = pending_workers.front();
+    pending_workers.pop_front();
+    return w;
+  }
+}
+
+RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp)
+{
+  Mutex::Locker l(lock);
+  auto it = qp_conns.find(qp);
+  if (it == qp_conns.end())
+    return nullptr;
+  if (it->second.first->is_dead())
+    return nullptr;
+  return it->second.second;
+}
+
+RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
+{
+  auto it = qp_conns.find(qp);
+  if (it == qp_conns.end())
+    return nullptr;
+  if (it->second.first->is_dead())
+    return nullptr;
+  return it->second.second;
+}
+
+void RDMADispatcher::erase_qpn(uint32_t qpn)
+{
+  Mutex::Locker l(lock);
+  auto it = qp_conns.find(qpn);
+  if (it == qp_conns.end())
+    return ;
+  dead_queue_pairs.push_back(it->second.first);
+  qp_conns.erase(it);
+}
+
+void RDMADispatcher::handle_pre_fork()
+{
+  done = true;
+  t.join();
+  done = false;
+}
+
+void RDMADispatcher::handle_post_fork()
+{
+  t = std::thread(&RDMADispatcher::polling, this);
+}
+
+
+RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
+  : Worker(c, i), stack(nullptr), infiniband(NULL),
+    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
+{
+}
+
+RDMAWorker::~RDMAWorker()
+{
+  delete tx_handler;
+  if (notify_fd >= 0)
+    ::close(notify_fd);
+}
+
+void RDMAWorker::initialize()
+{
+  if (!dispatcher) {
+    dispatcher = stack->get_dispatcher();
+    notify_fd = dispatcher->register_worker(this);
+    center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
+    memory_manager = infiniband->get_memory_manager();
+  }
+}
+
+void RDMAWorker::notify()
+{
+  uint64_t i = 1;
+  assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+}
+
+void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    wc = std::move(v);
+  else
+    wc.insert(wc.end(), v.begin(), v.end());
+  notify();
+}
+
+void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o)
+{
+  pending_sent_conns.push_back(o);
+  if (!pended) {
+    dispatcher->pending_buffers(this);
+    pended = true;
+  }
+}
+
+void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    return ;
+  w.swap(wc);
+}
+
+int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+{
+  auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
+  int r = p->listen(sa, opt);
+  if (r < 0) {
+    delete p;
+    return r;
+  }
+
+  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
+  return 0;
+}
+
+int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
+{
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
+  int r = p->try_connect(addr, opts);
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
+    return r;
+  }
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
+  *socket = ConnectedSocket(std::move(csi));
+  return 0;
+}
+
+int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+{
+  int r = infiniband->get_tx_buffers(c, bytes);
+  if (r > 0) {
+    stack->get_dispatcher()->inflight += c.size();
+    ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+    return r;
+  }
+  assert(r == 0);
+
+  if (pending_sent_conns.back() != o)
+    pending_sent_conns.push_back(o);
+  dispatcher->pending_buffers(this);
+  return r;
+}
+
+/**
+ * Add the given Chunks to the given free queue.
+ *
+ * \param[in] chunks
+ *      The Chunks to enqueue.
+ * \return
+ *      0 if success or -1 for failure
+ */
+int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+  if (chunks.empty())
+    return 0;
+
+  stack->get_dispatcher()->inflight -= chunks.size();
+  memory_manager->return_tx(chunks);
+  ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+
+  pended = false;
+  std::set<RDMAConnectedSocketImpl*> done;
+  while (!pending_sent_conns.empty()) {
+    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+    if (done.count(o) == 0) {
+      done.insert(o);
+    } else {
+      pending_sent_conns.pop_front();
+      continue;
+    }
+    ssize_t r = o->submit(false);
+    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+    if (r < 0) {
+      if (r == -EAGAIN)
+        break;
+      o->fault();
+    }
+    pending_sent_conns.pop_front();
+  }
+  return 0;
+}
+
+void RDMAWorker::handle_tx_event()
+{
+  std::vector<Chunk*> tx_chunks;
+  std::vector<ibv_wc> cqe;
+  get_wc(cqe);
+
+  for (size_t i = 0; i < cqe.size(); ++i) {
+    ibv_wc* response = &cqe[i];
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
+
+    if (response->status != IBV_WC_SUCCESS) {
+      if (response->status == IBV_WC_RETRY_EXC_ERR) {
+        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
+        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+                      << response->qp_num << " should be down while this WR=" << response->wr_id
+                      << " still in flight." << dendl;
+      } else {
+        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+                      << response->wr_id << ") status(" << response->status << "): "
+                      << infiniband->wc_status_to_string(response->status) << dendl;
+      }
+      RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
+      if (conn) {
+        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
+        conn->fault();
+      } else {
+        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+      }
+    }
+
+    //assert(memory_manager->is_tx_chunk(chunk));
+    if (memory_manager->is_tx_chunk(chunk)) {
+      tx_chunks.push_back(chunk);
+    } else {
+      ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
+    }
+  }
+
+  post_tx_buffer(tx_chunks);
+
+  ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
+  dispatcher->notify_pending_workers();
+}
+
+
+RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+{
+  if (!global_infiniband)
+    global_infiniband = new Infiniband(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
+  dispatcher = new RDMADispatcher(cct, global_infiniband, this);
+  unsigned num = get_num_worker();
+  for (unsigned i = 0; i < num; ++i) {
+    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
+    w->set_ib(global_infiniband);
+    w->set_stack(this);
+  }
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
+}
+
+RDMAStack::~RDMAStack()
+{
+  delete dispatcher;
+}
+
+void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
+{
+  threads.resize(i+1);
+  threads[i] = std::move(std::thread(func));
+}
+
+void RDMAStack::join_worker(unsigned i)
+{
+  assert(threads.size() > i && threads[i].joinable());
+  threads[i].join();
+}
index f8574068d1edb1a5e6f4c0825536593b511e5c38..b3c726d90d689b8a9559324fc408ae31847f86ad 100644 (file)
@@ -72,6 +72,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   ceph::unordered_map<RDMAWorker*, int> workers;;
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
+
   class C_handle_cq_async : public EventCallback {
     RDMADispatcher *dispatcher;
    public:
@@ -83,87 +84,24 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   };
 
  public:
-  std::atomic<uint64_t> inflight = {0};
-  explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
-    : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
-      w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
-    rx_cc = ib->create_comp_channel(c);
-    assert(rx_cc);
-    rx_cq = ib->create_comp_queue(c, rx_cc);
-    assert(rx_cq);
-    t = std::thread(&RDMADispatcher::polling, this);
-    cct->register_fork_watcher(this);
-  }
+  explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
   virtual ~RDMADispatcher();
   void handle_async_event();
   void polling();
-  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
-    int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
-    assert(fd >= 0);
-    Mutex::Locker l(lock);
-    assert(!qp_conns.count(qp->get_local_qp_number()));
-    qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
-    return fd;
-  }
-  int register_worker(RDMAWorker* w) {
-    int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
-    assert(fd >= 0);
-    Mutex::Locker l(w_lock);
-    workers[w] = fd;
-    return fd;
-  }
-  void pending_buffers(RDMAWorker* w) {
-    Mutex::Locker l(w_lock);
-    pending_workers.push_back(w);
-  }
-  RDMAStack* get_stack() {
-    return stack;
-  }
-  RDMAWorker* get_worker_from_list() {
-    Mutex::Locker l(w_lock);
-    if (pending_workers.empty())
-      return nullptr;
-    else {
-      RDMAWorker* w = pending_workers.front();
-      pending_workers.pop_front();
-      return w;
-    }
-  }
-  RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
-    Mutex::Locker l(lock);
-    auto it = qp_conns.find(qp);
-    if (it == qp_conns.end())
-      return nullptr;
-    if (it->second.first->is_dead())
-      return nullptr;
-    return it->second.second;
-  }
-  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp) {
-    auto it = qp_conns.find(qp);
-    if (it == qp_conns.end())
-      return nullptr;
-    if (it->second.first->is_dead())
-      return nullptr;
-    return it->second.second;
-  }
-  void erase_qpn(uint32_t qpn) {
-    Mutex::Locker l(lock);
-    auto it = qp_conns.find(qpn);
-    if (it == qp_conns.end())
-      return ;
-    dead_queue_pairs.push_back(it->second.first);
-    qp_conns.erase(it);
-  }
+  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+  int register_worker(RDMAWorker* w);
+  void pending_buffers(RDMAWorker* w);
+  RDMAStack* get_stack() { return stack; }
+  RDMAWorker* get_worker_from_list();
+  RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp);
+  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+  void erase_qpn(uint32_t qpn);
   Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
-  virtual void handle_pre_fork() override {
-    done = true;
-    t.join();
-    done = false;
-  }
-  virtual void handle_post_fork() override {
-    t = std::thread(&RDMADispatcher::polling, this);
-  }
+  virtual void handle_pre_fork() override;
+  virtual void handle_post_fork() override;
+
+  std::atomic<uint64_t> inflight = {0};
 };
 
 
@@ -183,6 +121,7 @@ class RDMAWorker : public Worker {
   Mutex lock;
   std::vector<ibv_wc> wc;
   bool pended;
+
   class C_handle_cq_tx : public EventCallback {
     RDMAWorker *worker;
     public:
@@ -194,54 +133,21 @@ class RDMAWorker : public Worker {
 
  public:
   explicit RDMAWorker(CephContext *c, unsigned i);
-  virtual ~RDMAWorker() {
-    delete tx_handler;
-    if (notify_fd >= 0)
-      ::close(notify_fd);
-  }
-  void notify() {
-    uint64_t i = 1;
-    assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
-  }
-  void pass_wc(std::vector<ibv_wc> &&v) {
-    Mutex::Locker l(lock);
-    if (wc.empty())
-      wc = std::move(v);
-    else
-      wc.insert(wc.end(), v.begin(), v.end());
-    notify();
-  }
-  void get_wc(std::vector<ibv_wc> &w) {
-    Mutex::Locker l(lock);
-    if (wc.empty())
-      return ;
-    w.swap(wc);
-  }
+  virtual ~RDMAWorker();
+  void notify();
+  void pass_wc(std::vector<ibv_wc> &&v);
+  void get_wc(std::vector<ibv_wc> &w);
   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
   virtual void initialize() override;
-  RDMAStack *get_stack() {
-    return stack;
-  }
+  RDMAStack *get_stack() { return stack; }
   int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
   int post_tx_buffer(std::vector<Chunk*> &chunks);
-  void add_pending_conn(RDMAConnectedSocketImpl* o) {
-    pending_sent_conns.push_back(o);
-    if (!pended) {
-      dispatcher->pending_buffers(this);
-      pended = true;
-    }
-  }
-  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
-    pending_sent_conns.remove(o);
-  }
+  void add_pending_conn(RDMAConnectedSocketImpl* o);
+  void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); }
   void handle_tx_event();
-  void set_ib(Infiniband* ib) {
-    infiniband = ib;
-  }
-  void set_stack(RDMAStack *s) {
-    stack = s;
-  }
+  void set_ib(Infiniband* ib) { infiniband = ib; }
+  void set_stack(RDMAStack *s) { stack = s; }
 };
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
@@ -273,109 +179,35 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   bool active;// qp is active ?
   bool detached;
 
-  void notify() {
-    uint64_t i = 1;
-    assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
-  }
+  void notify();
   ssize_t read_buffers(char* buf, size_t len);
   int post_work_request(std::vector<Chunk*>&);
 
  public:
   RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
-                          RDMAWorker *w)
-    : cct(cct), connected(0), error(0), infiniband(ib),
-      dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
-      is_server(false), con_handler(new C_handle_connection(this)),
-      active(false), detached(false) {
-    qp = infiniband->create_queue_pair(
-        cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
-    my_msg.qpn = qp->get_local_qp_number();
-    my_msg.psn = qp->get_initial_psn();
-    my_msg.lid = infiniband->get_lid();
-    my_msg.peer_qpn = 0;
-    my_msg.gid = infiniband->get_gid();
-    notify_fd = dispatcher->register_qp(qp, this);
-  }
-
-  virtual ~RDMAConnectedSocketImpl() {
-    worker->remove_pending_conn(this);
-    dispatcher->erase_qpn(my_msg.qpn);
-    cleanup();
-    if (notify_fd >= 0)
-      ::close(notify_fd);
-    if (tcp_fd >= 0)
-      ::close(tcp_fd);
-    error = ECONNRESET;
-    Mutex::Locker l(lock);
-    for (unsigned i=0; i < wc.size(); ++i)
-      infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
-    for (unsigned i=0; i < buffers.size(); ++i)
-      infiniband->recall_chunk(buffers[i]);
-  }
-
-  void pass_wc(std::vector<ibv_wc> &&v) {
-    Mutex::Locker l(lock);
-    if (wc.empty())
-      wc = std::move(v);
-    else
-      wc.insert(wc.end(), v.begin(), v.end());
-    notify();
-  }
+                          RDMAWorker *w);
+  virtual ~RDMAConnectedSocketImpl();
 
-  void get_wc(std::vector<ibv_wc> &w) {
-    Mutex::Locker l(lock);
-    if (wc.empty())
-      return ;
-    w.swap(wc);
-  }
-
-  virtual int is_connected() override {
-    return connected;
-  }
+  void pass_wc(std::vector<ibv_wc> &&v);
+  void get_wc(std::vector<ibv_wc> &w);
+  virtual int is_connected() override { return connected; }
 
   virtual ssize_t read(char* buf, size_t len) override;
   virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
-  virtual void shutdown() override {
-    if (!error)
-      fin();
-    error = ECONNRESET;
-    active = false;
-  }
-  virtual void close() override {
-    if (!error)
-      fin();
-    error = ECONNRESET;
-    active = false;
-  }
-  virtual int fd() const override {
-    return notify_fd;
-  }
-  void fault() {
-    /*if (qp) {
-      qp->to_dead();
-      qp = NULL;
-    }*/
-    error = ECONNRESET;
-    connected = 1;
-    notify();
-  }
-  const char* get_qp_state() {
-    return Infiniband::qp_state_string(qp->get_state());
-  }
+  virtual void shutdown() override;
+  virtual void close() override;
+  virtual int fd() const override { return notify_fd; }
+  void fault();
+  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
   ssize_t submit(bool more);
   int activate();
   void fin();
   void handle_connection();
   void cleanup();
-  void set_accept_fd(int sd) {
-    tcp_fd = sd;
-    is_server = true;
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
-    }, true);
-  }
+  void set_accept_fd(int sd);
   int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
   class C_handle_connection : public EventCallback {
     RDMAConnectedSocketImpl *csi;
     bool active;
@@ -401,17 +233,12 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
   entity_addr_t sa;
 
  public:
-  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
-    : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
   int listen(entity_addr_t &sa, const SocketOptions &opt);
   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
-  virtual void abort_accept() override {
-    if (server_setup_socket >= 0)
-      ::close(server_setup_socket);
-  }
-  virtual int fd() const override {
-    return server_setup_socket;
-  }
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
   int get_fd() { return server_setup_socket; }
 };
 
@@ -421,20 +248,12 @@ class RDMAStack : public NetworkStack {
 
  public:
   explicit RDMAStack(CephContext *cct, const string &t);
-  virtual ~RDMAStack() {
-    delete dispatcher;
-  }
+  virtual ~RDMAStack();
   virtual bool support_zero_copy_read() const override { return false; }
   virtual bool nonblock_connect_need_writable_event() const { return false; }
 
-  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
-    threads.resize(i+1);
-    threads[i] = std::move(std::thread(func));
-  }
-  virtual void join_worker(unsigned i) override {
-    assert(threads.size() > i && threads[i].joinable());
-    threads[i].join();
-  }
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
+  virtual void join_worker(unsigned i) override;
   RDMADispatcher *get_dispatcher() { return dispatcher; }
 };