From: Amir Vadai Date: Tue, 23 May 2017 07:33:51 +0000 (+0300) Subject: Revert "msg/async/rdma: Move resource handling to Device" X-Git-Tag: ses5-milestone6~9^2~21^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8a83ccd8340aa0a9fb0497e48f49824ce76d3fa2;p=ceph.git Revert "msg/async/rdma: Move resource handling to Device" This reverts commit 9cba3b36ef90a1f3d5d474f0fefe48e9c7ec6bad. Change-Id: I5953a9380ecd5a07e8ea107cf1fc35d0772e8f61 Issue: 995322 Signed-off-by: Amir Vadai --- diff --git a/src/msg/async/rdma/Device.cc b/src/msg/async/rdma/Device.cc index c629f2c9edfb..41b2d8982327 100644 --- a/src/msg/async/rdma/Device.cc +++ b/src/msg/async/rdma/Device.cc @@ -15,20 +15,14 @@ */ #include "Infiniband.h" -#include "RDMAStack.h" #include "Device.h" #include "common/errno.h" #include "common/debug.h" -#include - #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << "IBDevice " -static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; -static const uint32_t CQ_DEPTH = 30000; - Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) { #ifdef HAVE_IBV_EXP @@ -108,9 +102,7 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt } -Device::Device(CephContext *cct, ibv_device* d) - : cct(cct), device(d), lock("ibdev_lock"), - device_attr(new ibv_device_attr), active_port(nullptr) +Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr) { if (device == NULL) { lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl; @@ -127,78 +119,10 @@ Device::Device(CephContext *cct, ibv_device* d) lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; ceph_abort(); } - - tx_cc = create_comp_channel(cct); - assert(tx_cc); - - rx_cc = create_comp_channel(cct); - assert(rx_cc); - - assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0); -} - -void Device::init() -{ - Mutex::Locker l(lock); - - if (initialized) - return; - - pd = new ProtectionDomain(cct, this); - - max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers); - ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; - - max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers); - ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; - - ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe - << " completion entries" << dendl; - - memory_manager = new MemoryManager(this, pd, - cct->_conf->ms_async_rdma_enable_hugepage); - memory_manager->register_rx_tx( - cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); - - srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); - post_channel_cluster(); - - tx_cq = create_comp_queue(cct, tx_cc); - assert(tx_cq); - - rx_cq = create_comp_queue(cct, rx_cc); - assert(rx_cq); - - initialized = true; - ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl; -} - -void Device::uninit() -{ - Mutex::Locker l(lock); - - if (!initialized) - return; - - tx_cc->ack_events(); - rx_cc->ack_events(); - - initialized = false; - - delete rx_cq; - delete tx_cq; - delete rx_cc; - delete tx_cc; - - assert(ibv_destroy_srq(srq) == 0); - delete memory_manager; - delete pd; } Device::~Device() { - uninit(); - if (active_port) { delete active_port; assert(ibv_close_device(ctxt) == 0); @@ -224,141 +148,9 @@ void Device::binding_port(CephContext *cct, int port_num) { } } -/** - * Create a new QueuePair. This factory should be used in preference to - * the QueuePair constructor directly, since this lets derivatives of - * Infiniband, e.g. MockInfiniband (if it existed), - * return mocked out QueuePair derivatives. - * - * \return - * QueuePair on success or NULL if init fails - * See QueuePair::QueuePair for parameter documentation. - */ -Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct, - ibv_qp_type type) -{ - Infiniband::QueuePair *qp = new QueuePair( - cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr); - if (qp->init()) { - delete qp; - return NULL; - } - return qp; -} - -/** - * Create a shared receive queue. This basically wraps the verbs call. - * - * \param[in] max_wr - * The max number of outstanding work requests in the SRQ. - * \param[in] max_sge - * The max number of scatter elements per WR. - * \return - * A valid ibv_srq pointer, or NULL on error. - */ -ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) -{ - ibv_srq_init_attr sia; - memset(&sia, 0, sizeof(sia)); - sia.srq_context = ctxt; - sia.attr.max_wr = max_wr; - sia.attr.max_sge = max_sge; - return ibv_create_srq(pd->pd, &sia); -} - -Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c) -{ - Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); - if (cc->init()) { - delete cc; - return NULL; - } - return cc; -} - -Infiniband::CompletionQueue* Device::create_comp_queue( - CephContext *cct, CompletionChannel *cc) -{ - Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( - cct, *this, CQ_DEPTH, cc); - if (cq->init()) { - delete cq; - return NULL; - } - return cq; -} - -int Device::post_chunk(Chunk* chunk) -{ - ibv_sge isge; - isge.addr = reinterpret_cast(chunk->buffer); - isge.length = chunk->bytes; - isge.lkey = chunk->mr->lkey; - ibv_recv_wr rx_work_request; - - memset(&rx_work_request, 0, sizeof(rx_work_request)); - rx_work_request.wr_id = reinterpret_cast(chunk);// stash descriptor ptr - rx_work_request.next = NULL; - rx_work_request.sg_list = &isge; - rx_work_request.num_sge = 1; - - ibv_recv_wr *badWorkRequest; - int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); - if (ret) - return -errno; - return 0; -} - -int Device::post_channel_cluster() -{ - vector free_chunks; - int r = memory_manager->get_channel_buffers(free_chunks, 0); - assert(r > 0); - for (vector::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { - r = post_chunk(*iter); - assert(r == 0); - } - return 0; -} - -int Device::get_tx_buffers(std::vector &c, size_t bytes) -{ - return memory_manager->get_send_buffers(c, bytes); -} - -int Device::poll_tx_cq(int n, ibv_wc *wc) -{ - if (!initialized) - return 0; - - return tx_cq->poll_cq(n, wc); -} - -int Device::poll_rx_cq(int n, ibv_wc *wc) -{ - if (!initialized) - return 0; - - return rx_cq->poll_cq(n, wc); -} - -void Device::rearm_cqs() -{ - int ret; - - if (!initialized) - return; - - ret = tx_cq->rearm_notify(); - assert(!ret); - - ret = rx_cq->rearm_notify(); - assert(!ret); -} - DeviceList::DeviceList(CephContext *cct) - : cct(cct), device_list(ibv_get_device_list(&num)) + : device_list(ibv_get_device_list(&num)) { if (device_list == NULL || num == 0) { lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl; @@ -366,29 +158,13 @@ DeviceList::DeviceList(CephContext *cct) } devices = new Device*[num]; - poll_fds = new struct pollfd[2 * num]; - - for (int i = 0; i < num; ++i) { - struct pollfd *pfd = &poll_fds[i * 2]; - struct Device *d; - - d = new Device(cct, device_list[i]); - devices[i] = d; - - pfd[0].fd = d->tx_cc->get_fd(); - pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - pfd[0].revents = 0; - - pfd[1].fd = d->rx_cc->get_fd(); - pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - pfd[1].revents = 0; + for (int i = 0;i < num; ++i) { + devices[i] = new Device(cct, device_list[i]); } } DeviceList::~DeviceList() { - delete poll_fds; - for (int i=0; i < num; ++i) { delete devices[i]; } @@ -406,67 +182,3 @@ Device* DeviceList::get_device(const char* device_name) } return NULL; } - -int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc) -{ - int n = 0; - - for (int i = 0; i < num; i++) { - *d = devices[++last_poll_dev % num]; - - n = (*d)->poll_tx_cq(num_entries, wc); - if (n) - break; - } - - return n; -} - -int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc) -{ - int n = 0; - - for (int i = 0; i < num; i++) { - *d = devices[++last_poll_dev % num]; - - n = (*d)->poll_rx_cq(num_entries, wc); - if (n) - break; - } - - return n; -} - -int DeviceList::poll_blocking(bool &done) -{ - int r = 0; - while (!done && r == 0) { - r = poll(poll_fds, num * 2, 100); - if (r < 0) { - r = -errno; - lderr(cct) << __func__ << " poll failed " << r << dendl; - ceph_abort(); - } - } - - if (r <= 0) - return r; - - for (int i = 0; i < num ; i++) { - Device *d = devices[i]; - - if (d->tx_cc->get_cq_event()) - ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl; - - if (d->rx_cc->get_cq_event()) - ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl; - } - - return r; -} - -void DeviceList::rearm_notify() -{ - for (int i = 0; i < num; i++) - devices[i]->rearm_cqs(); -} diff --git a/src/msg/async/rdma/Device.h b/src/msg/async/rdma/Device.h index a19640c2baf7..9bd51a4af1cd 100644 --- a/src/msg/async/rdma/Device.h +++ b/src/msg/async/rdma/Device.h @@ -30,14 +30,6 @@ #include "msg/async/net_handler.h" #include "common/Mutex.h" -typedef Infiniband::QueuePair QueuePair; -typedef Infiniband::CompletionChannel CompletionChannel; -typedef Infiniband::CompletionQueue CompletionQueue; -typedef Infiniband::ProtectionDomain ProtectionDomain; -typedef Infiniband::MemoryManager::Cluster Cluster; -typedef Infiniband::MemoryManager::Chunk Chunk; -typedef Infiniband::MemoryManager MemoryManager; - class Port { struct ibv_context* ctxt; int port_num; @@ -57,85 +49,33 @@ class Port { class Device { - CephContext *cct; ibv_device *device; - const char *name; + const char* name; uint8_t port_cnt; - - uint32_t max_send_wr; - uint32_t max_recv_wr; - uint32_t max_sge; - - Mutex lock; // Protects from concurrent intialization of the device - bool initialized = false; - public: explicit Device(CephContext *c, ibv_device* d); ~Device(); - void init(); - void uninit(); - - const char* get_name() const { return name;} + const char* get_name() { return name;} uint16_t get_lid() { return active_port->get_lid(); } ibv_gid get_gid() { return active_port->get_gid(); } int get_gid_idx() { return active_port->get_gid_idx(); } void binding_port(CephContext *c, int port_num); - - QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type); - ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); - CompletionChannel *create_comp_channel(CephContext *c); - CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); - int post_chunk(Chunk* chunk); - int post_channel_cluster(); - - MemoryManager* get_memory_manager() { return memory_manager; } - bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);} - bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);} - Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); } - int get_tx_buffers(std::vector &c, size_t bytes); - int poll_tx_cq(int n, ibv_wc *wc); - int poll_rx_cq(int n, ibv_wc *wc); - void rearm_cqs(); - struct ibv_context *ctxt; ibv_device_attr *device_attr; Port* active_port; - - MemoryManager* memory_manager = nullptr; - ibv_srq *srq = nullptr; - Infiniband::CompletionQueue *rx_cq = nullptr; - Infiniband::CompletionChannel *rx_cc = nullptr; - Infiniband::CompletionQueue *tx_cq = nullptr; - Infiniband::CompletionChannel *tx_cc = nullptr; - ProtectionDomain *pd = nullptr; }; -inline ostream& operator<<(ostream& out, const Device &d) -{ - return out << d.get_name(); -} - class DeviceList { - CephContext *cct; struct ibv_device ** device_list; int num; Device** devices; - - int last_poll_dev; - struct pollfd *poll_fds; - public: DeviceList(CephContext *cct); ~DeviceList(); Device* get_device(const char* device_name); - - void rearm_notify(); - int poll_tx(int n, Device **d, ibv_wc *wc); - int poll_rx(int n, Device **d, ibv_wc *wc); - int poll_blocking(bool &done); }; #endif diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 66b669b45d6f..83b75b2b08b5 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -15,7 +15,6 @@ */ #include "Infiniband.h" -#include "RDMAStack.h" #include "Device.h" #include "common/errno.h" @@ -25,19 +24,21 @@ #undef dout_prefix #define dout_prefix *_dout << "Infiniband " +static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; static const uint32_t MAX_INLINE_DATA = 0; static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); +static const uint32_t CQ_DEPTH = 30000; Infiniband::QueuePair::QueuePair( - CephContext *c, Device &device, ibv_qp_type type, + CephContext *c, Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key) -: cct(c), ibdev(device), +: cct(c), infiniband(infiniband), type(type), - ctxt(ibdev.ctxt), + ctxt(infiniband.device->ctxt), ib_physical_port(port), - pd(ibdev.pd->pd), + pd(infiniband.pd->pd), srq(srq), qp(NULL), txcq(txcq), @@ -53,6 +54,7 @@ Infiniband::QueuePair::QueuePair( lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl; ceph_abort(); } + pd = infiniband.pd->pd; } int Infiniband::QueuePair::init() @@ -227,8 +229,8 @@ bool Infiniband::QueuePair::is_error() const } -Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev) - : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0) +Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib) + : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) { } @@ -245,7 +247,7 @@ Infiniband::CompletionChannel::~CompletionChannel() int Infiniband::CompletionChannel::init() { ldout(cct, 20) << __func__ << " started." << dendl; - channel = ibv_create_comp_channel(ibdev.ctxt); + channel = ibv_create_comp_channel(infiniband.device->ctxt); if (!channel) { lderr(cct) << __func__ << " failed to create receive completion channel: " << cpp_strerror(errno) << dendl; @@ -301,7 +303,7 @@ Infiniband::CompletionQueue::~CompletionQueue() int Infiniband::CompletionQueue::init() { - cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0); + cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); if (!cq) { lderr(cct) << __func__ << " failed to create receive completion queue: " << cpp_strerror(errno) << dendl; @@ -439,7 +441,7 @@ void Infiniband::MemoryManager::Chunk::clear() void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib) { - ib->device->post_chunk(this); + ib->post_chunk(this); } Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) @@ -594,27 +596,147 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t : device_list(new DeviceList(cct)) { device = device_list->get_device(device_name.c_str()); - - device->init(); - device->binding_port(cct, port_num); assert(device); ib_physical_port = device->active_port->get_port_num(); + pd = new ProtectionDomain(cct, device); + assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); + + max_recv_wr = device->device_attr->max_srq_wr; + if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) { + max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers; + ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; + } else { + ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl; + } + + max_send_wr = device->device_attr->max_qp_wr; + if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) { + max_send_wr = cct->_conf->ms_async_rdma_send_buffers; + ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; + } else { + ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl; + } + + ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe + << " completion entries" << dendl; + + memory_manager = new MemoryManager(device, pd, + cct->_conf->ms_async_rdma_enable_hugepage); + memory_manager->register_rx_tx( + cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); + + srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); + post_channel_cluster(); } Infiniband::~Infiniband() { - if (dispatcher) - dispatcher->polling_stop(); - + assert(ibv_destroy_srq(srq) == 0); + delete memory_manager; + delete pd; delete device_list; } -void Infiniband::set_dispatcher(RDMADispatcher *d) +/** + * Create a shared receive queue. This basically wraps the verbs call. + * + * \param[in] max_wr + * The max number of outstanding work requests in the SRQ. + * \param[in] max_sge + * The max number of scatter elements per WR. + * \return + * A valid ibv_srq pointer, or NULL on error. + */ +ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) { - assert(!d ^ !dispatcher); + ibv_srq_init_attr sia; + memset(&sia, 0, sizeof(sia)); + sia.srq_context = device->ctxt; + sia.attr.max_wr = max_wr; + sia.attr.max_sge = max_sge; + return ibv_create_srq(pd->pd, &sia); +} - dispatcher = d; +int Infiniband::get_tx_buffers(std::vector &c, size_t bytes) +{ + return memory_manager->get_send_buffers(c, bytes); +} + +/** + * Create a new QueuePair. This factory should be used in preference to + * the QueuePair constructor directly, since this lets derivatives of + * Infiniband, e.g. MockInfiniband (if it existed), + * return mocked out QueuePair derivatives. + * + * \return + * QueuePair on success or NULL if init fails + * See QueuePair::QueuePair for parameter documentation. + */ +Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type) +{ + Infiniband::QueuePair *qp = new QueuePair( + cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr); + if (qp->init()) { + delete qp; + return NULL; + } + return qp; +} + +int Infiniband::post_chunk(Chunk* chunk) +{ + ibv_sge isge; + isge.addr = reinterpret_cast(chunk->buffer); + isge.length = chunk->bytes; + isge.lkey = chunk->mr->lkey; + ibv_recv_wr rx_work_request; + + memset(&rx_work_request, 0, sizeof(rx_work_request)); + rx_work_request.wr_id = reinterpret_cast(chunk);// stash descriptor ptr + rx_work_request.next = NULL; + rx_work_request.sg_list = &isge; + rx_work_request.num_sge = 1; + + ibv_recv_wr *badWorkRequest; + int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); + if (ret) + return -errno; + return 0; +} + +int Infiniband::post_channel_cluster() +{ + vector free_chunks; + int r = memory_manager->get_channel_buffers(free_chunks, 0); + assert(r > 0); + for (vector::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { + r = post_chunk(*iter); + assert(r == 0); + } + return 0; +} + +Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) +{ + Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); + if (cc->init()) { + delete cc; + return NULL; + } + return cc; +} + +Infiniband::CompletionQueue* Infiniband::create_comp_queue( + CephContext *cct, CompletionChannel *cc) +{ + Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( + cct, *this, CQ_DEPTH, cc); + if (cq->init()) { + delete cq; + return NULL; + } + return cq; } // 1 means no valid buffer read, 0 means got enough buffer @@ -765,33 +887,3 @@ const char* Infiniband::qp_state_string(int status) { default: return " out of range."; } } - -void Infiniband::handle_pre_fork() -{ - device->uninit(); -} - -void Infiniband::handle_post_fork() -{ - device->init(); -} - -int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc) -{ - return device_list->poll_tx(n, d, wc); -} - -int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc) -{ - return device_list->poll_rx(n, d, wc); -} - -int Infiniband::poll_blocking(bool &done) -{ - return device_list->poll_blocking(done); -} - -void Infiniband::rearm_notify() -{ - device_list->rearm_notify(); -} diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index a3da3edd4c05..875b72c3c7b1 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -47,7 +47,6 @@ class CephContext; class Port; class Device; class DeviceList; -class RDMADispatcher; class Infiniband { public: @@ -142,11 +141,15 @@ class Infiniband { }; private: + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t max_sge; uint8_t ib_physical_port; + MemoryManager* memory_manager; + ibv_srq* srq; // shared receive work queue Device *device; + ProtectionDomain *pd; DeviceList *device_list; - RDMADispatcher *dispatcher = nullptr; - void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); @@ -154,18 +157,16 @@ class Infiniband { explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); ~Infiniband(); - void set_dispatcher(RDMADispatcher *d); - class CompletionChannel { static const uint32_t MAX_ACK_EVENT = 5000; CephContext *cct; - Device &ibdev; + Infiniband& infiniband; ibv_comp_channel *channel; ibv_cq *cq; uint32_t cq_events_that_need_ack; public: - CompletionChannel(CephContext *c, Device &ibdev); + CompletionChannel(CephContext *c, Infiniband &ib); ~CompletionChannel(); int init(); bool get_cq_event(); @@ -181,9 +182,9 @@ class Infiniband { // You need to call init and it will create a cq and associate to comp channel class CompletionQueue { public: - CompletionQueue(CephContext *c, Device &ibdev, + CompletionQueue(CephContext *c, Infiniband &ib, const uint32_t qd, CompletionChannel *cc) - : cct(c), ibdev(ibdev), channel(cc), cq(NULL), queue_depth(qd) {} + : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {} ~CompletionQueue(); int init(); int poll_cq(int num_entries, ibv_wc *ret_wc_array); @@ -193,7 +194,7 @@ class Infiniband { CompletionChannel* get_cc() const { return channel; } private: CephContext *cct; - Device &ibdev; + Infiniband& infiniband; // Infiniband to which this QP belongs CompletionChannel *channel; ibv_cq *cq; uint32_t queue_depth; @@ -207,7 +208,7 @@ class Infiniband { // must call plumb() to bring the queue pair to the RTS state. class QueuePair { public: - QueuePair(CephContext *c, Device &device, ibv_qp_type type, + QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type, int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, @@ -254,7 +255,7 @@ class Infiniband { private: CephContext *cct; - Device &ibdev; // Infiniband to which this QP belongs + Infiniband& infiniband; // Infiniband to which this QP belongs ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) ibv_context* ctxt; // device context of the HCA to use int ib_physical_port; @@ -273,20 +274,23 @@ class Infiniband { public: typedef MemoryManager::Cluster Cluster; typedef MemoryManager::Chunk Chunk; + QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type); + ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); + int post_chunk(Chunk* chunk); + int post_channel_cluster(); + int get_tx_buffers(std::vector &c, size_t bytes); + CompletionChannel *create_comp_channel(CephContext *c); + CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); uint8_t get_ib_physical_port() { return ib_physical_port; } int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); + MemoryManager* get_memory_manager() { return memory_manager; } Device* get_device() { return device; } + bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);} + bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);} + Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); } static const char* wc_status_to_string(int status); static const char* qp_state_string(int status); - - void handle_pre_fork(); - void handle_post_fork(); - - int poll_tx(int n, Device **d, ibv_wc *wc); - int poll_rx(int n, Device **d, ibv_wc *wc); - int poll_blocking(bool &done); - void rearm_notify(); }; #endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 154df978e92e..8df72d9f6421 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -31,7 +31,8 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i ibdev = ib->get_device(); ibport = ib->get_ib_physical_port(); - qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); + qp = infiniband->create_queue_pair( + cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = ibdev->get_lid(); @@ -56,12 +57,12 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() error = ECONNRESET; int ret = 0; for (unsigned i=0; i < wc.size(); ++i) { - ret = ibdev->post_chunk(reinterpret_cast(wc[i].wr_id)); + ret = infiniband->post_chunk(reinterpret_cast(wc[i].wr_id)); assert(ret == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } for (unsigned i=0; i < buffers.size(); ++i) { - ret = ibdev->post_chunk(buffers[i]); + ret = infiniband->post_chunk(buffers[i]); assert(ret == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } @@ -283,7 +284,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) error = ECONNRESET; ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; } - assert(ibdev->post_chunk(chunk) == 0); + assert(infiniband->post_chunk(chunk) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } else { if (read == (ssize_t)len) { @@ -295,7 +296,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; } else { read += chunk->read(buf+read, response->byte_len); - assert(ibdev->post_chunk(chunk) == 0); + assert(infiniband->post_chunk(chunk) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } } @@ -323,7 +324,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) read += tmp; ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; if ((*c)->over()) { - assert(ibdev->post_chunk(*c) == 0); + assert(infiniband->post_chunk(*c) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); ldout(cct, 25) << __func__ << " one chunk over." << dendl; } @@ -462,7 +463,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) unsigned total = 0; unsigned need_reserve_bytes = 0; while (it != pending_bl.buffers().end()) { - if (ibdev->is_tx_buffer(it->raw_c_str())) { + if (infiniband->is_tx_buffer(it->raw_c_str())) { if (need_reserve_bytes) { unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); total += copied; @@ -471,7 +472,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) need_reserve_bytes = 0; } assert(copy_it == it); - tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str())); + tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); total += it->length(); ++copy_it; } else { diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 40d9be96b60c..3c4bad0c4d09 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -14,6 +14,7 @@ * */ +#include #include #include @@ -31,22 +32,37 @@ static Tub global_infiniband; RDMADispatcher::~RDMADispatcher() { - polling_stop(); - + done = true; + t.join(); ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; - global_infiniband->set_dispatcher(nullptr); - assert(qp_conns.empty()); assert(num_qp_conn == 0); assert(dead_queue_pairs.empty()); assert(num_dead_queue_pair == 0); + + tx_cc->ack_events(); + rx_cc->ack_events(); + delete tx_cq; + delete rx_cq; + delete tx_cc; + delete rx_cc; + delete async_handler; } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { + tx_cc = global_infiniband->create_comp_channel(c); + assert(tx_cc); + rx_cc = global_infiniband->create_comp_channel(c); + assert(rx_cc); + tx_cq = global_infiniband->create_comp_queue(c, tx_cc); + assert(tx_cq); + rx_cq = global_infiniband->create_comp_queue(c, rx_cc); + assert(rx_cq); + PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); @@ -74,21 +90,8 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) perf_logger = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(perf_logger); - cct->register_fork_watcher(this); -} - -void RDMADispatcher::polling_start() -{ t = std::thread(&RDMADispatcher::polling, this); -} - -void RDMADispatcher::polling_stop() -{ - if (!t.joinable()) - return; - - done = true; - t.join(); + cct->register_fork_watcher(this); } void RDMADispatcher::handle_async_event() @@ -135,24 +138,23 @@ void RDMADispatcher::polling() std::map > polled; std::vector tx_cqe; + ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; RDMAConnectedSocketImpl *conn = nullptr; utime_t last_inactive = ceph_clock_now(); bool rearmed = false; int r = 0; while (true) { - Device *ibdev; - - int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc); + int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); if (tx_ret > 0) { ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret << " responses."<< dendl; - handle_tx_event(ibdev, wc, tx_ret); + handle_tx_event(wc, tx_ret); } - int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc); + int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc); if (rx_ret > 0) { - ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret + ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret << " responses."<< dendl; perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret); @@ -167,8 +169,8 @@ void RDMADispatcher::polling() if (response->status == IBV_WC_SUCCESS) { conn = get_conn_lockless(response->qp_num); if (!conn) { - assert(ibdev->is_rx_buffer(chunk->buffer)); - r = ibdev->post_chunk(chunk); + assert(global_infiniband->is_rx_buffer(chunk->buffer)); + r = global_infiniband->post_chunk(chunk); ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; assert(r == 0); } else { @@ -178,9 +180,9 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << Infiniband::wc_status_to_string(response->status) << ")" << dendl; - assert(ibdev->is_rx_buffer(chunk->buffer)); - r = ibdev->post_chunk(chunk); + << global_infiniband->wc_status_to_string(response->status) << ")" << dendl; + assert(global_infiniband->is_rx_buffer(chunk->buffer)); + r = global_infiniband->post_chunk(chunk); if (r) { ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl; assert(r == 0); @@ -224,17 +226,33 @@ void RDMADispatcher::polling() if (!rearmed) { // Clean up cq events after rearm notify ensure no new incoming event // arrived between polling and rearm - global_infiniband->rearm_notify(); + tx_cq->rearm_notify(); + rx_cq->rearm_notify(); rearmed = true; continue; } + struct pollfd channel_poll[2]; + channel_poll[0].fd = tx_cc->get_fd(); + channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + channel_poll[0].revents = 0; + channel_poll[1].fd = rx_cc->get_fd(); + channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + channel_poll[1].revents = 0; + r = 0; perf_logger->set(l_msgr_rdma_polling, 0); - - r = global_infiniband->poll_blocking(done); - if (r > 0) - ldout(cct, 20) << __func__ << " got a cq event." << dendl; - + while (!done && r == 0) { + r = poll(channel_poll, 2, 100); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " poll failed " << r << dendl; + ceph_abort(); + } + } + if (r > 0 && tx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " got tx cq event." << dendl; + if (r > 0 && rx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " got rx cq event." << dendl; last_inactive = ceph_clock_now(); perf_logger->set(l_msgr_rdma_polling, 1); rearmed = false; @@ -299,28 +317,39 @@ void RDMADispatcher::erase_qpn(uint32_t qpn) void RDMADispatcher::handle_pre_fork() { - polling_stop(); + done = true; + t.join(); done = false; - global_infiniband->handle_pre_fork(); + tx_cc->ack_events(); + rx_cc->ack_events(); + delete tx_cq; + delete rx_cq; + delete tx_cc; + delete rx_cc; global_infiniband.destroy(); } void RDMADispatcher::handle_post_fork() { - if (!global_infiniband) { + if (!global_infiniband) global_infiniband.construct( cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); - global_infiniband->set_dispatcher(this); - } - global_infiniband->handle_post_fork(); + tx_cc = global_infiniband->create_comp_channel(cct); + assert(tx_cc); + rx_cc = global_infiniband->create_comp_channel(cct); + assert(rx_cc); + tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); + assert(tx_cq); + rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); + assert(rx_cq); - polling_start(); + t = std::thread(&RDMADispatcher::polling, this); } -void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n) +void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) { std::vector tx_chunks; @@ -359,14 +388,14 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n) } // FIXME: why not tx? - if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer)) + if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) tx_chunks.push_back(chunk); else ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl; } perf_logger->inc(l_msgr_rdma_tx_total_wc, n); - post_tx_buffer(ibdev, tx_chunks); + post_tx_buffer(tx_chunks); } /** @@ -377,13 +406,13 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n) * \return * 0 if success or -1 for failure */ -void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector &chunks) +void RDMADispatcher::post_tx_buffer(std::vector &chunks) { if (chunks.empty()) return ; inflight -= chunks.size(); - ibdev->get_memory_manager()->return_tx(chunks); + global_infiniband->get_memory_manager()->return_tx(chunks); ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl; notify_pending_workers(); @@ -455,12 +484,10 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { - Device *ibdev = o->get_device(); - assert(center.in_thread()); - int r = ibdev->get_tx_buffers(c, bytes); + int r = global_infiniband->get_tx_buffers(c, bytes); assert(r >= 0); - size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r; + size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r; ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; stack->get_dispatcher()->inflight += r; if (got == bytes) @@ -524,9 +551,6 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; dispatcher = new RDMADispatcher(cct, this); - global_infiniband->set_dispatcher(dispatcher); - dispatcher->polling_start(); - unsigned num = get_num_worker(); for (unsigned i = 0; i < num; ++i) { RDMAWorker* w = dynamic_cast(get_worker(i)); diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 733650dc9727..154056cf4072 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -68,6 +68,9 @@ class RDMADispatcher : public CephContext::ForkWatcher { std::thread t; CephContext *cct; + Infiniband::CompletionQueue* tx_cq; + Infiniband::CompletionQueue* rx_cq; + Infiniband::CompletionChannel *tx_cc, *rx_cc; EventCallbackRef async_handler; bool done = false; std::atomic num_dead_queue_pair = {0}; @@ -116,13 +119,8 @@ class RDMADispatcher : public CephContext::ForkWatcher { explicit RDMADispatcher(CephContext* c, RDMAStack* s); virtual ~RDMADispatcher(); - void handle_async_event(); - - void polling_start(); - void polling_stop(); void polling(); - int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); void make_pending_worker(RDMAWorker* w) { Mutex::Locker l(w_lock); @@ -135,11 +133,13 @@ class RDMADispatcher : public CephContext::ForkWatcher { RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); void erase_qpn_lockless(uint32_t qpn); void erase_qpn(uint32_t qpn); + Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } + Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } void notify_pending_workers(); virtual void handle_pre_fork() override; virtual void handle_post_fork() override; - void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n); - void post_tx_buffer(Device *ibdev, std::vector &chunks); + void handle_tx_event(ibv_wc *cqe, int n); + void post_tx_buffer(std::vector &chunks); std::atomic inflight = {0}; }; @@ -240,8 +240,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { RDMAWorker *w); virtual ~RDMAConnectedSocketImpl(); - Device *get_device() { return ibdev; } - void pass_wc(std::vector &&v); void get_wc(std::vector &w); virtual int is_connected() override { return connected; }