From: Amir Vadai
Date: Sun, 19 Mar 2017 14:36:48 +0000 (+0200)
Subject: msg/async/rdma: Move resource handling to Device
X-Git-Tag: v12.0.2~300^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9cba3b36ef90a1f3d5d474f0fefe48e9c7ec6bad;p=ceph-ci.git

msg/async/rdma: Move resource handling to Device

Move the rx/tx completion channels and completion queues from RDMAStack
into the Device class.

Move SRQ and QP handling from Infiniband into the Device class.

Adapt polling() to poll on multiple devices; this will be used in a
later commit. The consumer side of the new polling API looks roughly
like this (sketch only, condensed from RDMADispatcher::polling() below;
handle_rx_event() is a hypothetical helper, the real loop handles rx
completions inline):
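
    ibv_wc wc[MAX_COMPLETIONS];
    Device *ibdev;   // set to whichever device produced the completions

    int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
    if (tx_ret > 0)
      handle_tx_event(ibdev, wc, tx_ret);    // tx chunks are returned to ibdev

    int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
    if (rx_ret > 0)
      handle_rx_event(ibdev, wc, rx_ret);    // hypothetical helper (see above)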
" << cpp_strerror(errno) << dendl; ceph_abort(); } + + tx_cc = create_comp_channel(cct); + assert(tx_cc); + + rx_cc = create_comp_channel(cct); + assert(rx_cc); + + assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0); +} + +void Device::init() +{ + Mutex::Locker l(lock); + + if (initialized) + return; + + pd = new ProtectionDomain(cct, this); + + max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers); + ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; + + max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers); + ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; + + ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe + << " completion entries" << dendl; + + memory_manager = new MemoryManager(this, pd, + cct->_conf->ms_async_rdma_enable_hugepage); + memory_manager->register_rx_tx( + cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); + + srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); + post_channel_cluster(); + + tx_cq = create_comp_queue(cct, tx_cc); + assert(tx_cq); + + rx_cq = create_comp_queue(cct, rx_cc); + assert(rx_cq); + + initialized = true; + ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl; +} + +void Device::uninit() +{ + Mutex::Locker l(lock); + + if (!initialized) + return; + + tx_cc->ack_events(); + rx_cc->ack_events(); + + initialized = false; + + delete rx_cq; + delete tx_cq; + delete rx_cc; + delete tx_cc; + + assert(ibv_destroy_srq(srq) == 0); + delete memory_manager; + delete pd; } Device::~Device() { + uninit(); + if (active_port) { delete active_port; assert(ibv_close_device(ctxt) == 0); @@ -148,9 +224,141 @@ void Device::binding_port(CephContext *cct, int port_num) { } } +/** + * Create a new QueuePair. This factory should be used in preference to + * the QueuePair constructor directly, since this lets derivatives of + * Infiniband, e.g. MockInfiniband (if it existed), + * return mocked out QueuePair derivatives. + * + * \return + * QueuePair on success or NULL if init fails + * See QueuePair::QueuePair for parameter documentation. + */ +Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct, + ibv_qp_type type) +{ + Infiniband::QueuePair *qp = new QueuePair( + cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr); + if (qp->init()) { + delete qp; + return NULL; + } + return qp; +} + +/** + * Create a shared receive queue. This basically wraps the verbs call. + * + * \param[in] max_wr + * The max number of outstanding work requests in the SRQ. + * \param[in] max_sge + * The max number of scatter elements per WR. + * \return + * A valid ibv_srq pointer, or NULL on error. 

This patch also introduces RDMADispatcher::polling_{start,stop}, which
is used to stop the polling thread before the Device resources are
destroyed. The intended ordering (sketch, based on the RDMAStack.cc and
Infiniband.cc hunks below):
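
    dispatcher = new RDMADispatcher(cct, this);
    global_infiniband->set_dispatcher(dispatcher);
    dispatcher->polling_start();            // spawns the polling thread
    // ... on shutdown or before fork:
    dispatcher->polling_stop();             // joins it; a no-op if never started
    global_infiniband->handle_pre_fork();   // uninit() the devices afterwards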
" << cpp_strerror(errno) << dendl; @@ -158,13 +366,29 @@ DeviceList::DeviceList(CephContext *cct) } devices = new Device*[num]; - for (int i = 0;i < num; ++i) { - devices[i] = new Device(cct, device_list[i]); + poll_fds = new struct pollfd[2 * num]; + + for (int i = 0; i < num; ++i) { + struct pollfd *pfd = &poll_fds[i * 2]; + struct Device *d; + + d = new Device(cct, device_list[i]); + devices[i] = d; + + pfd[0].fd = d->tx_cc->get_fd(); + pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + pfd[0].revents = 0; + + pfd[1].fd = d->rx_cc->get_fd(); + pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; + pfd[1].revents = 0; } } DeviceList::~DeviceList() { + delete poll_fds; + for (int i=0; i < num; ++i) { delete devices[i]; } @@ -182,3 +406,67 @@ Device* DeviceList::get_device(const char* device_name) } return NULL; } + +int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc) +{ + int n = 0; + + for (int i = 0; i < num; i++) { + *d = devices[++last_poll_dev % num]; + + n = (*d)->poll_tx_cq(num_entries, wc); + if (n) + break; + } + + return n; +} + +int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc) +{ + int n = 0; + + for (int i = 0; i < num; i++) { + *d = devices[++last_poll_dev % num]; + + n = (*d)->poll_rx_cq(num_entries, wc); + if (n) + break; + } + + return n; +} + +int DeviceList::poll_blocking(bool &done) +{ + int r = 0; + while (!done && r == 0) { + r = poll(poll_fds, num * 2, 100); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " poll failed " << r << dendl; + ceph_abort(); + } + } + + if (r <= 0) + return r; + + for (int i = 0; i < num ; i++) { + Device *d = devices[i]; + + if (d->tx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl; + + if (d->rx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl; + } + + return r; +} + +void DeviceList::rearm_notify() +{ + for (int i = 0; i < num; i++) + devices[i]->rearm_cqs(); +} diff --git a/src/msg/async/rdma/Device.h b/src/msg/async/rdma/Device.h index 9bd51a4af1c..a19640c2baf 100644 --- a/src/msg/async/rdma/Device.h +++ b/src/msg/async/rdma/Device.h @@ -30,6 +30,14 @@ #include "msg/async/net_handler.h" #include "common/Mutex.h" +typedef Infiniband::QueuePair QueuePair; +typedef Infiniband::CompletionChannel CompletionChannel; +typedef Infiniband::CompletionQueue CompletionQueue; +typedef Infiniband::ProtectionDomain ProtectionDomain; +typedef Infiniband::MemoryManager::Cluster Cluster; +typedef Infiniband::MemoryManager::Chunk Chunk; +typedef Infiniband::MemoryManager MemoryManager; + class Port { struct ibv_context* ctxt; int port_num; @@ -49,33 +57,85 @@ class Port { class Device { + CephContext *cct; ibv_device *device; - const char* name; + const char *name; uint8_t port_cnt; + + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t max_sge; + + Mutex lock; // Protects from concurrent intialization of the device + bool initialized = false; + public: explicit Device(CephContext *c, ibv_device* d); ~Device(); - const char* get_name() { return name;} + void init(); + void uninit(); + + const char* get_name() const { return name;} uint16_t get_lid() { return active_port->get_lid(); } ibv_gid get_gid() { return active_port->get_gid(); } int get_gid_idx() { return active_port->get_gid_idx(); } void binding_port(CephContext *c, int port_num); + + QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type); + ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); + CompletionChannel 
diff --git a/src/msg/async/rdma/Device.h b/src/msg/async/rdma/Device.h
index 9bd51a4af1c..a19640c2baf 100644
--- a/src/msg/async/rdma/Device.h
+++ b/src/msg/async/rdma/Device.h
@@ -30,6 +30,14 @@
 #include "msg/async/net_handler.h"
 #include "common/Mutex.h"
 
+typedef Infiniband::QueuePair QueuePair;
+typedef Infiniband::CompletionChannel CompletionChannel;
+typedef Infiniband::CompletionQueue CompletionQueue;
+typedef Infiniband::ProtectionDomain ProtectionDomain;
+typedef Infiniband::MemoryManager::Cluster Cluster;
+typedef Infiniband::MemoryManager::Chunk Chunk;
+typedef Infiniband::MemoryManager MemoryManager;
+
 class Port {
   struct ibv_context* ctxt;
   int port_num;
@@ -49,33 +57,85 @@ class Port {
 
 class Device {
+  CephContext *cct;
   ibv_device *device;
-  const char* name;
+  const char *name;
   uint8_t port_cnt;
+
+  uint32_t max_send_wr;
+  uint32_t max_recv_wr;
+  uint32_t max_sge;
+
+  Mutex lock; // Protects from concurrent initialization of the device
+  bool initialized = false;
+
  public:
   explicit Device(CephContext *c, ibv_device* d);
   ~Device();
-  const char* get_name() { return name;}
+
+  void init();
+  void uninit();
+
+  const char* get_name() const { return name;}
   uint16_t get_lid() { return active_port->get_lid(); }
   ibv_gid get_gid() { return active_port->get_gid(); }
   int get_gid_idx() { return active_port->get_gid_idx(); }
   void binding_port(CephContext *c, int port_num);
+
+  QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
+  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
+  CompletionChannel *create_comp_channel(CephContext *c);
+  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
+  int post_chunk(Chunk* chunk);
+  int post_channel_cluster();
+
+  MemoryManager* get_memory_manager() { return memory_manager; }
+  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
+  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
+  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
+  int poll_tx_cq(int n, ibv_wc *wc);
+  int poll_rx_cq(int n, ibv_wc *wc);
+  void rearm_cqs();
+
   struct ibv_context *ctxt;
   ibv_device_attr *device_attr;
   Port* active_port;
+
+  MemoryManager* memory_manager = nullptr;
+  ibv_srq *srq = nullptr;
+  Infiniband::CompletionQueue *rx_cq = nullptr;
+  Infiniband::CompletionChannel *rx_cc = nullptr;
+  Infiniband::CompletionQueue *tx_cq = nullptr;
+  Infiniband::CompletionChannel *tx_cc = nullptr;
+  ProtectionDomain *pd = nullptr;
 };
 
+inline ostream& operator<<(ostream& out, const Device &d)
+{
+  return out << d.get_name();
+}
+
 class DeviceList {
+  CephContext *cct;
   struct ibv_device ** device_list;
   int num;
   Device** devices;
+
+  int last_poll_dev;
+  struct pollfd *poll_fds;
+
  public:
   DeviceList(CephContext *cct);
   ~DeviceList();
 
   Device* get_device(const char* device_name);
+
+  void rearm_notify();
+  int poll_tx(int n, Device **d, ibv_wc *wc);
+  int poll_rx(int n, Device **d, ibv_wc *wc);
+  int poll_blocking(bool &done);
 };
 
 #endif
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
index 74429af4938..73f36ae9587 100644
--- a/src/msg/async/rdma/Infiniband.cc
+++ b/src/msg/async/rdma/Infiniband.cc
@@ -15,6 +15,7 @@
  */
 
 #include "Infiniband.h"
+#include "RDMAStack.h"
 #include "Device.h"
 
 #include "common/errno.h"
@@ -24,21 +25,19 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "Infiniband "
 
-static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
 static const uint32_t MAX_INLINE_DATA = 0;
 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
-static const uint32_t CQ_DEPTH = 30000;
 
 Infiniband::QueuePair::QueuePair(
-    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+    CephContext *c, Device &device, ibv_qp_type type,
     int port, ibv_srq *srq,
     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
     uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
-: cct(c), infiniband(infiniband),
+: cct(c), ibdev(device),
   type(type),
-  ctxt(infiniband.device->ctxt),
+  ctxt(ibdev.ctxt),
   ib_physical_port(port),
-  pd(infiniband.pd->pd),
+  pd(ibdev.pd->pd),
   srq(srq),
   qp(NULL),
   txcq(txcq),
@@ -54,7 +53,6 @@ Infiniband::QueuePair::QueuePair(
     lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
     ceph_abort();
   }
-  pd = infiniband.pd->pd;
 }
 
 int Infiniband::QueuePair::init()
@@ -229,8 +227,8 @@ bool Infiniband::QueuePair::is_error() const
 }
 
 
-Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
-  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
+  : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
 {
 }
 
@@ -247,7 +245,7 @@ Infiniband::CompletionChannel::~CompletionChannel()
 int Infiniband::CompletionChannel::init()
 {
   ldout(cct, 20) << __func__ << " started." << dendl;
-  channel = ibv_create_comp_channel(infiniband.device->ctxt);
+  channel = ibv_create_comp_channel(ibdev.ctxt);
   if (!channel) {
     lderr(cct) << __func__ << " failed to create receive completion channel: "
                << cpp_strerror(errno) << dendl;
@@ -303,7 +301,7 @@ Infiniband::CompletionQueue::~CompletionQueue()
 
 int Infiniband::CompletionQueue::init()
 {
-  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+  cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
   if (!cq) {
     lderr(cct) << __func__ << " failed to create receive completion queue: "
                << cpp_strerror(errno) << dendl;
@@ -441,7 +439,7 @@ void Infiniband::MemoryManager::Chunk::clear()
 
 void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
 {
-  ib->post_chunk(this);
+  ib->device->post_chunk(this);
 }
 
 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
@@ -599,147 +597,27 @@ Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t
   : device_list(new DeviceList(cct))
 {
   device = device_list->get_device(device_name.c_str());
+
+  device->init();
+
   device->binding_port(cct, port_num);
   assert(device);
   ib_physical_port = device->active_port->get_port_num();
-  pd = new ProtectionDomain(cct, device);
-  assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
-
-  max_recv_wr = device->device_attr->max_srq_wr;
-  if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
-    max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
-    ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
-  } else {
-    ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
-  }
-
-  max_send_wr = device->device_attr->max_qp_wr;
-  if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
-    max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
-    ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
-  } else {
-    ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
-  }
-
-  ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
-                << " completion entries" << dendl;
-
-  memory_manager = new MemoryManager(device, pd,
-                                     cct->_conf->ms_async_rdma_enable_hugepage);
-  memory_manager->register_rx_tx(
-      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
-
-  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
-  post_channel_cluster();
 }
 
 Infiniband::~Infiniband()
 {
-  assert(ibv_destroy_srq(srq) == 0);
-  delete memory_manager;
-  delete pd;
-  delete device_list;
-}
-
-/**
- * Create a shared receive queue. This basically wraps the verbs call.
- *
- * \param[in] max_wr
- *      The max number of outstanding work requests in the SRQ.
- * \param[in] max_sge
- *      The max number of scatter elements per WR.
- * \return
- *      A valid ibv_srq pointer, or NULL on error.
- */
-ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
-{
-  ibv_srq_init_attr sia;
-  memset(&sia, 0, sizeof(sia));
-  sia.srq_context = device->ctxt;
-  sia.attr.max_wr = max_wr;
-  sia.attr.max_sge = max_sge;
-  return ibv_create_srq(pd->pd, &sia);
-}
-
-int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
-{
-  return memory_manager->get_send_buffers(c, bytes);
-}
-
-/**
- * Create a new QueuePair. This factory should be used in preference to
- * the QueuePair constructor directly, since this lets derivatives of
- * Infiniband, e.g. MockInfiniband (if it existed),
- * return mocked out QueuePair derivatives.
- *
- * \return
- *      QueuePair on success or NULL if init fails
- * See QueuePair::QueuePair for parameter documentation.
- */
-Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
-{
-  Infiniband::QueuePair *qp = new QueuePair(
-      cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
-  if (qp->init()) {
-    delete qp;
-    return NULL;
-  }
-  return qp;
-}
-
-int Infiniband::post_chunk(Chunk* chunk)
-{
-  ibv_sge isge;
-  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
-  isge.length = chunk->bytes;
-  isge.lkey = chunk->mr->lkey;
-  ibv_recv_wr rx_work_request;
+  if (dispatcher)
+    dispatcher->polling_stop();
 
-  memset(&rx_work_request, 0, sizeof(rx_work_request));
-  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
-  rx_work_request.next = NULL;
-  rx_work_request.sg_list = &isge;
-  rx_work_request.num_sge = 1;
-
-  ibv_recv_wr *badWorkRequest;
-  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
-  if (ret)
-    return -errno;
-  return 0;
-}
-
-int Infiniband::post_channel_cluster()
-{
-  vector<Chunk*> free_chunks;
-  int r = memory_manager->get_channel_buffers(free_chunks, 0);
-  assert(r > 0);
-  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
-    r = post_chunk(*iter);
-    assert(r == 0);
-  }
-  return 0;
+  delete device_list;
 }
 
-Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+void Infiniband::set_dispatcher(RDMADispatcher *d)
 {
-  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
-  if (cc->init()) {
-    delete cc;
-    return NULL;
-  }
-  return cc;
-}
+  assert(!d ^ !dispatcher);
 
-Infiniband::CompletionQueue* Infiniband::create_comp_queue(
-    CephContext *cct, CompletionChannel *cc)
-{
-  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
-      cct, *this, CQ_DEPTH, cc);
-  if (cq->init()) {
-    delete cq;
-    return NULL;
-  }
-  return cq;
+  dispatcher = d;
 }
 
 // 1 means no valid buffer read, 0 means got enough buffer
@@ -890,3 +768,33 @@ const char* Infiniband::qp_state_string(int status) {
     default: return " out of range.";
   }
 }
+
+void Infiniband::handle_pre_fork()
+{
+  device->uninit();
+}
+
+void Infiniband::handle_post_fork()
+{
+  device->init();
+}
+
+int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
+{
+  return device_list->poll_tx(n, d, wc);
+}
+
+int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
+{
+  return device_list->poll_rx(n, d, wc);
+}
+
+int Infiniband::poll_blocking(bool &done)
+{
+  return device_list->poll_blocking(done);
+}
+
+void Infiniband::rearm_notify()
+{
+  device_list->rearm_notify();
+}
diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h
index 25b711ac2e6..68b03cebce3 100644
--- a/src/msg/async/rdma/Infiniband.h
+++ b/src/msg/async/rdma/Infiniband.h
@@ -47,6 +47,7 @@ class CephContext;
 class Port;
 class Device;
 class DeviceList;
+class RDMADispatcher;
 
 class Infiniband {
  public:
@@ -141,15 +142,11 @@ class Infiniband {
   };
 
  private:
-  uint32_t max_send_wr;
-  uint32_t max_recv_wr;
-  uint32_t max_sge;
   uint8_t  ib_physical_port;
-  MemoryManager* memory_manager;
-  ibv_srq* srq;             // shared receive work queue
   Device *device;
-  ProtectionDomain *pd;
   DeviceList *device_list;
+  RDMADispatcher *dispatcher = nullptr;
+
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
 
@@ -157,16 +154,18 @@ class Infiniband {
   explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
   ~Infiniband();
 
+  void set_dispatcher(RDMADispatcher *d);
+
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
     CephContext *cct;
-    Infiniband& infiniband;
+    Device &ibdev;
     ibv_comp_channel *channel;
     ibv_cq *cq;
     uint32_t cq_events_that_need_ack;
   public:
-    CompletionChannel(CephContext *c, Infiniband &ib);
+    CompletionChannel(CephContext *c, Device &ibdev);
     ~CompletionChannel();
     int init();
     bool get_cq_event();
@@ -182,9 +181,9 @@ class Infiniband {
   // You need to call init and it will create a cq and associate to comp channel
   class CompletionQueue {
    public:
-    CompletionQueue(CephContext *c, Infiniband &ib,
+    CompletionQueue(CephContext *c, Device &ibdev,
                     const uint32_t qd, CompletionChannel *cc)
-      : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
+      : cct(c), ibdev(ibdev), channel(cc), cq(NULL), queue_depth(qd) {}
     ~CompletionQueue();
     int init();
     int poll_cq(int num_entries, ibv_wc *ret_wc_array);
@@ -194,7 +193,7 @@ class Infiniband {
     CompletionChannel* get_cc() const { return channel; }
    private:
     CephContext *cct;
-    Infiniband&  infiniband;     // Infiniband to which this QP belongs
+    Device &ibdev;
     CompletionChannel *channel;
     ibv_cq *cq;
     uint32_t queue_depth;
@@ -208,7 +207,7 @@ class Infiniband {
   // must call plumb() to bring the queue pair to the RTS state.
   class QueuePair {
    public:
-    QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+    QueuePair(CephContext *c, Device &device, ibv_qp_type type,
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
               Infiniband::CompletionQueue* rxcq,
@@ -255,7 +254,7 @@ class Infiniband {
 
    private:
     CephContext  *cct;
-    Infiniband&  infiniband;     // Infiniband to which this QP belongs
+    Device &ibdev;               // Infiniband to which this QP belongs
    ibv_qp_type  type;           // QP type (IBV_QPT_RC, etc.)
     ibv_context* ctxt;           // device context of the HCA to use
     int ib_physical_port;
@@ -274,23 +273,20 @@ class Infiniband {
  public:
   typedef MemoryManager::Cluster Cluster;
   typedef MemoryManager::Chunk Chunk;
-  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
-  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
-  int post_chunk(Chunk* chunk);
-  int post_channel_cluster();
-  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
-  CompletionChannel *create_comp_channel(CephContext *c);
-  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
   uint8_t get_ib_physical_port() { return ib_physical_port; }
   int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  MemoryManager* get_memory_manager() { return memory_manager; }
   Device* get_device() { return device; }
-  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
-  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
-  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
+
+  void handle_pre_fork();
+  void handle_post_fork();
+
+  int poll_tx(int n, Device **d, ibv_wc *wc);
+  int poll_rx(int n, Device **d, ibv_wc *wc);
+  int poll_blocking(bool &done);
+  void rearm_notify();
 };
 
 #endif
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
index 8269e44f00d..9a82abf3881 100644
--- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -31,8 +31,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
   ibdev = ib->get_device();
   ibport = ib->get_ib_physical_port();
 
-  qp = infiniband->create_queue_pair(
-      cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
   my_msg.qpn = qp->get_local_qp_number();
   my_msg.psn = qp->get_initial_psn();
   my_msg.lid = ibdev->get_lid();
@@ -57,11 +56,11 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
     error = ECONNRESET;
     int ret = 0;
     for (unsigned i=0; i < wc.size(); ++i) {
-      ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+      ret = ibdev->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
       assert(ret == 0);
     }
     for (unsigned i=0; i < buffers.size(); ++i) {
-      ret = infiniband->post_chunk(buffers[i]);
+      ret = ibdev->post_chunk(buffers[i]);
       assert(ret == 0);
     }
   }
@@ -282,7 +281,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       error = ECONNRESET;
       ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
     }
-    assert(infiniband->post_chunk(chunk) == 0);
+    assert(ibdev->post_chunk(chunk) == 0);
   } else {
     if (read == (ssize_t)len) {
       buffers.push_back(chunk);
@@ -293,7 +292,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
     } else {
       read += chunk->read(buf+read, response->byte_len);
-      assert(infiniband->post_chunk(chunk) == 0);
+      assert(ibdev->post_chunk(chunk) == 0);
     }
   }
 }
@@ -320,7 +319,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
     read += tmp;
     ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
     if ((*c)->over()) {
-      assert(infiniband->post_chunk(*c) == 0);
+      assert(ibdev->post_chunk(*c) == 0);
       ldout(cct, 25) << __func__ << " one chunk over." << dendl;
     }
     if (read == len) {
@@ -458,7 +457,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   unsigned total = 0;
   unsigned need_reserve_bytes = 0;
   while (it != pending_bl.buffers().end()) {
-    if (infiniband->is_tx_buffer(it->raw_c_str())) {
+    if (ibdev->is_tx_buffer(it->raw_c_str())) {
       if (need_reserve_bytes) {
         unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
         total += copied;
@@ -467,7 +466,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
         need_reserve_bytes = 0;
       }
       assert(copy_it == it);
-      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+      tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str()));
       total += it->length();
       ++copy_it;
     } else {
Chunk:" << *c << dendl; if ((*c)->over()) { - assert(infiniband->post_chunk(*c) == 0); + assert(ibdev->post_chunk(*c) == 0); ldout(cct, 25) << __func__ << " one chunk over." << dendl; } if (read == len) { @@ -458,7 +457,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) unsigned total = 0; unsigned need_reserve_bytes = 0; while (it != pending_bl.buffers().end()) { - if (infiniband->is_tx_buffer(it->raw_c_str())) { + if (ibdev->is_tx_buffer(it->raw_c_str())) { if (need_reserve_bytes) { unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); total += copied; @@ -467,7 +466,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) need_reserve_bytes = 0; } assert(copy_it == it); - tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); + tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str())); total += it->length(); ++copy_it; } else { diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 86b6c1b4a71..be09a75717b 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -14,7 +14,6 @@ * */ -#include #include #include @@ -32,37 +31,22 @@ static Tub global_infiniband; RDMADispatcher::~RDMADispatcher() { - done = true; - t.join(); + polling_stop(); + ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; + global_infiniband->set_dispatcher(nullptr); + assert(qp_conns.empty()); assert(num_qp_conn == 0); assert(dead_queue_pairs.empty()); assert(num_dead_queue_pair == 0); - - tx_cc->ack_events(); - rx_cc->ack_events(); - delete tx_cq; - delete rx_cq; - delete tx_cc; - delete rx_cc; - delete async_handler; } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), w_lock("RDMADispatcher::for worker pending list"), stack(s) { - tx_cc = global_infiniband->create_comp_channel(c); - assert(tx_cc); - rx_cc = global_infiniband->create_comp_channel(c); - assert(rx_cc); - tx_cq = global_infiniband->create_comp_queue(c, tx_cc); - assert(tx_cq); - rx_cq = global_infiniband->create_comp_queue(c, rx_cc); - assert(rx_cq); - PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); @@ -89,10 +73,23 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) perf_logger = plb.create_perf_counters(); cct->get_perfcounters_collection()->add(perf_logger); - t = std::thread(&RDMADispatcher::polling, this); cct->register_fork_watcher(this); } +void RDMADispatcher::polling_start() +{ + t = std::thread(&RDMADispatcher::polling, this); +} + +void RDMADispatcher::polling_stop() +{ + if (!t.joinable()) + return; + + done = true; + t.join(); +} + void RDMADispatcher::handle_async_event() { ldout(cct, 30) << __func__ << dendl; @@ -137,23 +134,24 @@ void RDMADispatcher::polling() std::map > polled; std::vector tx_cqe; - ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; RDMAConnectedSocketImpl *conn = nullptr; utime_t last_inactive = ceph_clock_now(); bool rearmed = false; int r = 0; while (true) { - int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); + Device *ibdev; + + int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc); if (tx_ret > 0) { ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret << " responses."<< dendl; - handle_tx_event(wc, tx_ret); + handle_tx_event(ibdev, 
@@ -137,23 +134,24 @@ void RDMADispatcher::polling()
 
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
-  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
   RDMAConnectedSocketImpl *conn = nullptr;
   utime_t last_inactive = ceph_clock_now();
   bool rearmed = false;
   int r = 0;
 
   while (true) {
-    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    Device *ibdev;
+
+    int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
     if (tx_ret > 0) {
       ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                      << " responses."<< dendl;
-      handle_tx_event(wc, tx_ret);
+      handle_tx_event(ibdev, wc, tx_ret);
     }
 
-    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
     if (rx_ret > 0) {
-      ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
+      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                      << " responses."<< dendl;
       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
@@ -168,8 +166,8 @@ void RDMADispatcher::polling()
         if (response->status == IBV_WC_SUCCESS) {
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
-            assert(global_infiniband->is_rx_buffer(chunk->buffer));
-            r = global_infiniband->post_chunk(chunk);
+            assert(ibdev->is_rx_buffer(chunk->buffer));
+            r = ibdev->post_chunk(chunk);
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
             assert(r == 0);
           } else {
@@ -179,9 +177,9 @@ void RDMADispatcher::polling()
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                         << ") status(" << response->status << ":"
-                        << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
-          assert(global_infiniband->is_rx_buffer(chunk->buffer));
-          r = global_infiniband->post_chunk(chunk);
+                        << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
+          assert(ibdev->is_rx_buffer(chunk->buffer));
+          r = ibdev->post_chunk(chunk);
           if (r) {
             ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
             assert(r == 0);
@@ -223,33 +221,17 @@ void RDMADispatcher::polling()
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
-          tx_cq->rearm_notify();
-          rx_cq->rearm_notify();
+          global_infiniband->rearm_notify();
           rearmed = true;
           continue;
         }
 
-        struct pollfd channel_poll[2];
-        channel_poll[0].fd = tx_cc->get_fd();
-        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-        channel_poll[0].revents = 0;
-        channel_poll[1].fd = rx_cc->get_fd();
-        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-        channel_poll[1].revents = 0;
-        r = 0;
         perf_logger->set(l_msgr_rdma_polling, 0);
-        while (!done && r == 0) {
-          r = poll(channel_poll, 2, 100);
-          if (r < 0) {
-            r = -errno;
-            lderr(cct) << __func__ << " poll failed " << r << dendl;
-            ceph_abort();
-          }
-        }
-        if (r > 0 && tx_cc->get_cq_event())
-          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
-        if (r > 0 && rx_cc->get_cq_event())
-          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
+
+        r = global_infiniband->poll_blocking(done);
+        if (r > 0)
+          ldout(cct, 20) << __func__ << " got a cq event." << dendl;
+
         last_inactive = ceph_clock_now();
         perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
@@ -314,39 +296,28 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
 
 void RDMADispatcher::handle_pre_fork()
 {
-  done = true;
-  t.join();
+  polling_stop();
   done = false;
 
-  tx_cc->ack_events();
-  rx_cc->ack_events();
-  delete tx_cq;
-  delete rx_cq;
-  delete tx_cc;
-  delete rx_cc;
+  global_infiniband->handle_pre_fork();
   global_infiniband.destroy();
 }
 
 void RDMADispatcher::handle_post_fork()
 {
-  if (!global_infiniband)
+  if (!global_infiniband) {
     global_infiniband.construct(
         cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+    global_infiniband->set_dispatcher(this);
+  }
 
-  tx_cc = global_infiniband->create_comp_channel(cct);
-  assert(tx_cc);
-  rx_cc = global_infiniband->create_comp_channel(cct);
-  assert(rx_cc);
-  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
-  assert(tx_cq);
-  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
-  assert(rx_cq);
+  global_infiniband->handle_post_fork();
 
-  t = std::thread(&RDMADispatcher::polling, this);
+  polling_start();
 }
 
-void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
+void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
 {
   std::vector<Chunk*> tx_chunks;
 
@@ -385,14 +356,14 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
     }
 
     // FIXME: why not tx?
-    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
+    if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
       tx_chunks.push_back(chunk);
     else
       ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
   }
 
   perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
-  post_tx_buffer(tx_chunks);
+  post_tx_buffer(ibdev, tx_chunks);
 }
 
 /**
@@ -403,13 +374,13 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
  * \return
  *   0 if success or -1 for failure
  */
-void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
+void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
 {
   if (chunks.empty())
     return ;
 
   inflight -= chunks.size();
-  global_infiniband->get_memory_manager()->return_tx(chunks);
+  ibdev->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
 }
@@ -481,10 +452,12 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
+  Device *ibdev = o->get_device();
+
   assert(center.in_thread());
-  int r = global_infiniband->get_tx_buffers(c, bytes);
+  int r = ibdev->get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher()->inflight += r;
   if (got == bytes)
@@ -539,6 +512,9 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
         cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
 
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
+  global_infiniband->set_dispatcher(dispatcher);
+  dispatcher->polling_start();
+
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index e8a0b56ac8d..da279f04866 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -67,9 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   std::thread t;
   CephContext *cct;
-  Infiniband::CompletionQueue* tx_cq;
-  Infiniband::CompletionQueue* rx_cq;
-  Infiniband::CompletionChannel *tx_cc, *rx_cc;
   EventCallbackRef async_handler;
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
@@ -118,8 +115,13 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
+
   void handle_async_event();
+
+  void polling_start();
+  void polling_stop();
   void polling();
+
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
     Mutex::Locker l(w_lock);
@@ -132,13 +134,11 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   void erase_qpn_lockless(uint32_t qpn);
   void erase_qpn(uint32_t qpn);
-  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
-  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
   virtual void handle_pre_fork() override;
   virtual void handle_post_fork() override;
-  void handle_tx_event(ibv_wc *cqe, int n);
-  void post_tx_buffer(std::vector<Chunk*> &chunks);
+  void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
+  void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
 
   std::atomic<uint64_t> inflight = {0};
 };
@@ -239,6 +239,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
                           RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
+  Device *get_device() { return ibdev; }
+
   void pass_wc(std::vector<ibv_wc> &&v);
   void get_wc(std::vector<ibv_wc> &w);
   virtual int is_connected() override { return connected; }
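
For reference, the receive-buffer path that Device::init() wires up,
condensed into one place (illustrative sketch, not code from this patch;
error handling elided):

    // Condensed from Device::init(), create_shared_receive_queue() and
    // post_channel_cluster() in the Device.cc hunks above.
    srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);

    vector<Chunk*> free_chunks;
    memory_manager->get_channel_buffers(free_chunks, 0);  // fetch every rx chunk
    for (Chunk *c : free_chunks)
      post_chunk(c);  // ibv_post_srq_recv() with the chunk pointer as wr_id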