1. Don't use bare pointer to manage RDMADispatcher obj.
2. access RDMADispatcher obj directly instead of accessing it
from RDMAStack. This could avoid caching RDMAStack obj in
RDMAWorker & RDMADispatcher.
Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, RDMADispatcher* s,
- RDMAWorker *w)
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w)
: cct(cct), connected(0), error(0), ib(ib),
- dispatcher(s), worker(w),
+ dispatcher(rdma_dispatcher), worker(w),
is_server(false), con_handler(new C_handle_connection(this)),
active(false), pending(false)
{
if (!cct->_conf->ms_async_rdma_cm) {
- qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+ qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ib->get_lid();
#define TIMEOUT_MS 3000
#define RETRY_COUNT 7
-RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
- RDMAWorker *w, RDMACMInfo *info)
- : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, RDMACMInfo *info)
+ : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
{
status = IDLE;
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
CephContext *cct, shared_ptr<Infiniband>& ib,
- RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
- : RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot)
+ shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
+ entity_addr_t& a, unsigned addr_slot)
+ : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot)
{
}
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(
- CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
- entity_addr_t& a, unsigned slot)
+ CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, entity_addr_t& a, unsigned slot)
: ServerSocketImpl(a.get_type(), slot),
cct(cct), net(cct), server_setup_socket(-1), ib(ib),
- dispatcher(s), worker(w), sa(a)
+ dispatcher(rdma_dispatcher), worker(w), sa(a)
{
}
void RDMAWorker::initialize()
{
- if (!dispatcher) {
- dispatcher = &stack->get_dispatcher();
- }
+ ceph_assert(dispatcher);
}
int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
}
RDMAStack::RDMAStack(CephContext *cct, const string &t)
- : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), dispatcher(cct, ib)
+ : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
+ rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
{
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
w->set_stack(this);
+ w->set_dispatcher(rdma_dispatcher);
w->set_ib(ib);
}
- ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
+ ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
}
RDMAStack::~RDMAStack()
shared_ptr<Infiniband> ib;
EventCallbackRef tx_handler;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
- RDMADispatcher* dispatcher = nullptr;
+ shared_ptr<RDMADispatcher> dispatcher;
ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
class C_handle_cq_tx : public EventCallback {
}
void handle_pending_message();
void set_stack(RDMAStack *s) { stack = s; }
+ void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
void notify_worker() {
center.dispatch_event_external(tx_handler);
int connected;
int error;
shared_ptr<Infiniband> ib;
- RDMADispatcher* dispatcher;
+ shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
const decltype(std::cbegin(pending_bl.buffers()))& end);
public:
- RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s
- RDMAWorker *w);
+ RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
void pass_wc(std::vector<ibv_wc> &&v);
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
public:
- RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
- RDMAWorker *w, RDMACMInfo *info = nullptr);
+ RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
~RDMAIWARPConnectedSocketImpl();
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
virtual void close() override;
NetHandler net;
int server_setup_socket;
shared_ptr<Infiniband> ib;
- RDMADispatcher *dispatcher;
+ shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker *worker;
entity_addr_t sa;
public:
- RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s,
+ RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, entity_addr_t& a, unsigned slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
public:
RDMAIWARPServerSocketImpl(
- CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
- entity_addr_t& addr, unsigned addr_slot);
+ CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
vector<std::thread> threads;
PerfCounters *perf_counter;
shared_ptr<Infiniband> ib;
- RDMADispatcher dispatcher;
+ shared_ptr<RDMADispatcher> rdma_dispatcher;
std::atomic<bool> fork_finished = {false};
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
- RDMADispatcher &get_dispatcher() { return dispatcher; }
virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};