]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/rdma: use shared_ptr to manage RDMADispatcher obj
authorChangcheng Liu <changcheng.liu@aliyun.com>
Wed, 7 Aug 2019 07:08:15 +0000 (15:08 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 23 Aug 2019 06:36:05 +0000 (14:36 +0800)
1. Don't use bare pointer to manage RDMADispatcher obj.

2. access RDMADispatcher obj directly instead of accessing it
from RDMAStack. This could avoid caching RDMAStack obj in
RDMAWorker & RDMADispatcher.

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index d4f902de7cf77d4a31e1b108e46cd9e1eb460d49..0c593b0116bbc16a7731adfa3343fde4167010db 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, RDMADispatcher* s,
-                                                RDMAWorker *w)
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
+                                                 shared_ptr<RDMADispatcher>& rdma_dispatcher,
+                                                 RDMAWorker *w)
   : cct(cct), connected(0), error(0), ib(ib),
-    dispatcher(s), worker(w),
+    dispatcher(rdma_dispatcher), worker(w),
     is_server(false), con_handler(new C_handle_connection(this)),
     active(false), pending(false)
 {
   if (!cct->_conf->ms_async_rdma_cm) {
-    qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+    qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
     my_msg.qpn = qp->get_local_qp_number();
     my_msg.psn = qp->get_initial_psn();
     my_msg.lid = ib->get_lid();
index 6505bda9cb66ae1da0fba5955feaa75c8b025a82..21408417e18696a82aa7565be7174d8339c73ddb 100644 (file)
@@ -7,9 +7,10 @@
 #define TIMEOUT_MS 3000
 #define RETRY_COUNT 7
 
-RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
-                                                RDMAWorker *w, RDMACMInfo *info)
-  : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+                                                           shared_ptr<RDMADispatcher>& rdma_dispatcher,
+                                                           RDMAWorker *w, RDMACMInfo *info)
+  : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
 {
   status = IDLE;
   notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
index dbbb5427af12be659e86ae198d951a5ff90d166b..2b8291d35d322d8737dfd3bd9b46a0a5d9fb552e 100644 (file)
@@ -9,8 +9,9 @@
 
 RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
   CephContext *cct, shared_ptr<Infiniband>& ib,
-  RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
-  : RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot)
+  shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
+  entity_addr_t& a, unsigned addr_slot)
+  : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot)
 {
 }
 
index cdbed1aa114c98caa37438e22568eddaa3cdab33..cc85832eddda0fb3f21a57e7d69f92a1a972cfe4 100644 (file)
 #define dout_prefix *_dout << " RDMAServerSocketImpl "
 
 RDMAServerSocketImpl::RDMAServerSocketImpl(
-  CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
-  entity_addr_t& a, unsigned slot)
+  CephContext *cct, shared_ptr<Infiniband>& ib,
+  shared_ptr<RDMADispatcher>& rdma_dispatcher,
+  RDMAWorker *w, entity_addr_t& a, unsigned slot)
   : ServerSocketImpl(a.get_type(), slot),
     cct(cct), net(cct), server_setup_socket(-1), ib(ib),
-    dispatcher(s), worker(w), sa(a)
+    dispatcher(rdma_dispatcher), worker(w), sa(a)
 {
 }
 
index 3a6bda44a4528c34eddc8f9f257a72fa6381f149..6540660092e4d26cb66d496428615926e6c6d1f2 100644 (file)
@@ -560,9 +560,7 @@ RDMAWorker::~RDMAWorker()
 
 void RDMAWorker::initialize()
 {
-  if (!dispatcher) {
-    dispatcher = &stack->get_dispatcher();
-  }
+  ceph_assert(dispatcher);
 }
 
 int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
@@ -655,7 +653,8 @@ void RDMAWorker::handle_pending_message()
 }
 
 RDMAStack::RDMAStack(CephContext *cct, const string &t)
-  : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), dispatcher(cct, ib)
+  : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
+    rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
 {
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
 
@@ -663,9 +662,10 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t)
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
     w->set_stack(this);
+    w->set_dispatcher(rdma_dispatcher);
     w->set_ib(ib);
   }
-  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
 }
 
 RDMAStack::~RDMAStack()
index a92b925efe0f9305b1ce6075772645d998e6ff10..fc54a7faeb489dd26aec689394572e7cbfe6e780 100644 (file)
@@ -135,7 +135,7 @@ class RDMAWorker : public Worker {
   shared_ptr<Infiniband> ib;
   EventCallbackRef tx_handler;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
-  RDMADispatcher* dispatcher = nullptr;
+  shared_ptr<RDMADispatcher> dispatcher;
   ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
 
   class C_handle_cq_tx : public EventCallback {
@@ -164,6 +164,7 @@ class RDMAWorker : public Worker {
   }
   void handle_pending_message();
   void set_stack(RDMAStack *s) { stack = s; }
+  void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
   void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
   void notify_worker() {
     center.dispatch_event_external(tx_handler);
@@ -192,7 +193,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   int connected;
   int error;
   shared_ptr<Infiniband> ib;
-  RDMADispatcher* dispatcher;
+  shared_ptr<RDMADispatcher> dispatcher;
   RDMAWorker* worker;
   std::vector<Chunk*> buffers;
   int notify_fd = -1;
@@ -216,8 +217,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
       const decltype(std::cbegin(pending_bl.buffers()))& end);
 
  public:
-  RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s
-                          RDMAWorker *w);
+  RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+      shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
   void pass_wc(std::vector<ibv_wc> &&v);
@@ -273,8 +274,8 @@ enum RDMA_CM_STATUS {
 
 class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
   public:
-    RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
-                          RDMAWorker *w, RDMACMInfo *info = nullptr);
+    RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+        shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
     ~RDMAIWARPConnectedSocketImpl();
     virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
     virtual void close() override;
@@ -314,12 +315,13 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
     NetHandler net;
     int server_setup_socket;
     shared_ptr<Infiniband> ib;
-    RDMADispatcher *dispatcher;
+    shared_ptr<RDMADispatcher> dispatcher;
     RDMAWorker *worker;
     entity_addr_t sa;
 
  public:
-  RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s,
+  RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+                       shared_ptr<RDMADispatcher>& rdma_dispatcher,
                       RDMAWorker *w, entity_addr_t& a, unsigned slot);
 
   virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
@@ -331,8 +333,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
   public:
     RDMAIWARPServerSocketImpl(
-      CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
-      entity_addr_t& addr, unsigned addr_slot);
+      CephContext *cct, shared_ptr<Infiniband>& ib,
+      shared_ptr<RDMADispatcher>& rdma_dispatcher,
+      RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
     virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
     virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
     virtual void abort_accept() override;
@@ -345,7 +348,7 @@ class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   PerfCounters *perf_counter;
   shared_ptr<Infiniband> ib;
-  RDMADispatcher dispatcher;
+  shared_ptr<RDMADispatcher> rdma_dispatcher;
 
   std::atomic<bool> fork_finished = {false};
 
@@ -357,7 +360,6 @@ class RDMAStack : public NetworkStack {
 
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
-  RDMADispatcher &get_dispatcher() { return dispatcher; }
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };
 };