From 44a1820da8ea8c80c8bf3d7701b3f28c9d1a8131 Mon Sep 17 00:00:00 2001 From: Changcheng Liu Date: Wed, 7 Aug 2019 15:08:15 +0800 Subject: [PATCH] msg/async/rdma: use shared_ptr to manage RDMADispatcher obj 1. Don't use bare pointer to manage RDMADispatcher obj. 2. access RDMADispatcher obj directly instead of accessing it from RDMAStack. This could avoid caching RDMAStack obj in RDMAWorker & RDMADispatcher. Signed-off-by: Changcheng Liu --- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 9 ++++--- .../rdma/RDMAIWARPConnectedSocketImpl.cc | 7 ++--- .../async/rdma/RDMAIWARPServerSocketImpl.cc | 5 ++-- src/msg/async/rdma/RDMAServerSocketImpl.cc | 7 ++--- src/msg/async/rdma/RDMAStack.cc | 10 +++---- src/msg/async/rdma/RDMAStack.h | 26 ++++++++++--------- 6 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index d4f902de7cf..0c593b0116b 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -19,15 +19,16 @@ #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " -RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr &ib, RDMADispatcher* s, - RDMAWorker *w) +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr &ib, + shared_ptr& rdma_dispatcher, + RDMAWorker *w) : cct(cct), connected(0), error(0), ib(ib), - dispatcher(s), worker(w), + dispatcher(rdma_dispatcher), worker(w), is_server(false), con_handler(new C_handle_connection(this)), active(false), pending(false) { if (!cct->_conf->ms_async_rdma_cm) { - qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL); + qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL); my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = ib->get_lid(); diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc index 6505bda9cb6..21408417e18 100644 --- a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc @@ -7,9 +7,10 @@ #define TIMEOUT_MS 3000 #define RETRY_COUNT 7 -RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr& ib, RDMADispatcher* s, - RDMAWorker *w, RDMACMInfo *info) - : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this)) +RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, + RDMAWorker *w, RDMACMInfo *info) + : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this)) { status = IDLE; notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc index dbbb5427af1..2b8291d35d3 100644 --- a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc @@ -9,8 +9,9 @@ RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl( CephContext *cct, shared_ptr& ib, - RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot) - : RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot) + shared_ptr& rdma_dispatcher, RDMAWorker *w, + entity_addr_t& a, unsigned addr_slot) + : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot) { } diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index cdbed1aa114..cc85832eddd 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -25,11 +25,12 @@ #define dout_prefix *_dout << " RDMAServerSocketImpl " RDMAServerSocketImpl::RDMAServerSocketImpl( - CephContext *cct, shared_ptr& ib, RDMADispatcher *s, RDMAWorker *w, - entity_addr_t& a, unsigned slot) + CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, + RDMAWorker *w, entity_addr_t& a, unsigned slot) : ServerSocketImpl(a.get_type(), slot), cct(cct), net(cct), server_setup_socket(-1), ib(ib), - dispatcher(s), worker(w), sa(a) + dispatcher(rdma_dispatcher), worker(w), sa(a) { } diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 3a6bda44a45..6540660092e 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -560,9 +560,7 @@ RDMAWorker::~RDMAWorker() void RDMAWorker::initialize() { - if (!dispatcher) { - dispatcher = &stack->get_dispatcher(); - } + ceph_assert(dispatcher); } int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot, @@ -655,7 +653,8 @@ void RDMAWorker::handle_pending_message() } RDMAStack::RDMAStack(CephContext *cct, const string &t) - : NetworkStack(cct, t), ib(make_shared(cct)), dispatcher(cct, ib) + : NetworkStack(cct, t), ib(make_shared(cct)), + rdma_dispatcher(make_shared(cct, ib)) { ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; @@ -663,9 +662,10 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t) for (unsigned i = 0; i < num; ++i) { RDMAWorker* w = dynamic_cast(get_worker(i)); w->set_stack(this); + w->set_dispatcher(rdma_dispatcher); w->set_ib(ib); } - ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl; + ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl; } RDMAStack::~RDMAStack() diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index a92b925efe0..fc54a7faeb4 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -135,7 +135,7 @@ class RDMAWorker : public Worker { shared_ptr ib; EventCallbackRef tx_handler; std::list pending_sent_conns; - RDMADispatcher* dispatcher = nullptr; + shared_ptr dispatcher; ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock"); class C_handle_cq_tx : public EventCallback { @@ -164,6 +164,7 @@ class RDMAWorker : public Worker { } void handle_pending_message(); void set_stack(RDMAStack *s) { stack = s; } + void set_dispatcher(shared_ptr& dispatcher) { this->dispatcher = dispatcher; } void set_ib(shared_ptr &ib) {this->ib = ib;} void notify_worker() { center.dispatch_event_external(tx_handler); @@ -192,7 +193,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { int connected; int error; shared_ptr ib; - RDMADispatcher* dispatcher; + shared_ptr dispatcher; RDMAWorker* worker; std::vector buffers; int notify_fd = -1; @@ -216,8 +217,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { const decltype(std::cbegin(pending_bl.buffers()))& end); public: - RDMAConnectedSocketImpl(CephContext *cct, shared_ptr& ib, RDMADispatcher* s - RDMAWorker *w); + RDMAConnectedSocketImpl(CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, RDMAWorker *w); virtual ~RDMAConnectedSocketImpl(); void pass_wc(std::vector &&v); @@ -273,8 +274,8 @@ enum RDMA_CM_STATUS { class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl { public: - RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr& ib, RDMADispatcher* s, - RDMAWorker *w, RDMACMInfo *info = nullptr); + RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr); ~RDMAIWARPConnectedSocketImpl(); virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; virtual void close() override; @@ -314,12 +315,13 @@ class RDMAServerSocketImpl : public ServerSocketImpl { NetHandler net; int server_setup_socket; shared_ptr ib; - RDMADispatcher *dispatcher; + shared_ptr dispatcher; RDMAWorker *worker; entity_addr_t sa; public: - RDMAServerSocketImpl(CephContext *cct, shared_ptr& ib, RDMADispatcher *s, + RDMAServerSocketImpl(CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, RDMAWorker *w, entity_addr_t& a, unsigned slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt); @@ -331,8 +333,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl { class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { public: RDMAIWARPServerSocketImpl( - CephContext *cct, shared_ptr& ib, RDMADispatcher *s, RDMAWorker *w, - entity_addr_t& addr, unsigned addr_slot); + CephContext *cct, shared_ptr& ib, + shared_ptr& rdma_dispatcher, + RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override; virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; virtual void abort_accept() override; @@ -345,7 +348,7 @@ class RDMAStack : public NetworkStack { vector threads; PerfCounters *perf_counter; shared_ptr ib; - RDMADispatcher dispatcher; + shared_ptr rdma_dispatcher; std::atomic fork_finished = {false}; @@ -357,7 +360,6 @@ class RDMAStack : public NetworkStack { virtual void spawn_worker(unsigned i, std::function &&func) override; virtual void join_worker(unsigned i) override; - RDMADispatcher &get_dispatcher() { return dispatcher; } virtual bool is_ready() override { return fork_finished.load(); }; virtual void ready() override { fork_finished = true; }; }; -- 2.39.5