*/
#include "Infiniband.h"
-#include "RDMAStack.h"
#include "Device.h"
#include "common/errno.h"
#include "common/debug.h"
-#include <poll.h>
-
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "IBDevice "
-static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
-static const uint32_t CQ_DEPTH = 30000;
-
Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
{
#ifdef HAVE_IBV_EXP
}
-Device::Device(CephContext *cct, ibv_device* d)
- : cct(cct), device(d), lock("ibdev_lock"),
- device_attr(new ibv_device_attr), active_port(nullptr)
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
ceph_abort();
}
-
- tx_cc = create_comp_channel(cct);
- assert(tx_cc);
-
- rx_cc = create_comp_channel(cct);
- assert(rx_cc);
-
- assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
-}
-
-void Device::init()
-{
- Mutex::Locker l(lock);
-
- if (initialized)
- return;
-
- pd = new ProtectionDomain(cct, this);
-
- max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers);
- ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
-
- max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers);
- ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
-
- ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe
- << " completion entries" << dendl;
-
- memory_manager = new MemoryManager(this, pd,
- cct->_conf->ms_async_rdma_enable_hugepage);
- memory_manager->register_rx_tx(
- cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
-
- srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
- post_channel_cluster();
-
- tx_cq = create_comp_queue(cct, tx_cc);
- assert(tx_cq);
-
- rx_cq = create_comp_queue(cct, rx_cc);
- assert(rx_cq);
-
- initialized = true;
- ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
-}
-
-void Device::uninit()
-{
- Mutex::Locker l(lock);
-
- if (!initialized)
- return;
-
- tx_cc->ack_events();
- rx_cc->ack_events();
-
- initialized = false;
-
- delete rx_cq;
- delete tx_cq;
- delete rx_cc;
- delete tx_cc;
-
- assert(ibv_destroy_srq(srq) == 0);
- delete memory_manager;
- delete pd;
}
Device::~Device()
{
- uninit();
-
if (active_port) {
delete active_port;
assert(ibv_close_device(ctxt) == 0);
}
}
-/**
- * Create a new QueuePair. This factory should be used in preference to
- * the QueuePair constructor directly, since this lets derivatives of
- * Infiniband, e.g. MockInfiniband (if it existed),
- * return mocked out QueuePair derivatives.
- *
- * \return
- * QueuePair on success or NULL if init fails
- * See QueuePair::QueuePair for parameter documentation.
- */
-Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
- ibv_qp_type type)
-{
- Infiniband::QueuePair *qp = new QueuePair(
- cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
- if (qp->init()) {
- delete qp;
- return NULL;
- }
- return qp;
-}
-
-/**
- * Create a shared receive queue. This basically wraps the verbs call.
- *
- * \param[in] max_wr
- * The max number of outstanding work requests in the SRQ.
- * \param[in] max_sge
- * The max number of scatter elements per WR.
- * \return
- * A valid ibv_srq pointer, or NULL on error.
- */
-ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
-{
- ibv_srq_init_attr sia;
- memset(&sia, 0, sizeof(sia));
- sia.srq_context = ctxt;
- sia.attr.max_wr = max_wr;
- sia.attr.max_sge = max_sge;
- return ibv_create_srq(pd->pd, &sia);
-}
-
-Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c)
-{
- Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
- if (cc->init()) {
- delete cc;
- return NULL;
- }
- return cc;
-}
-
-Infiniband::CompletionQueue* Device::create_comp_queue(
- CephContext *cct, CompletionChannel *cc)
-{
- Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
- cct, *this, CQ_DEPTH, cc);
- if (cq->init()) {
- delete cq;
- return NULL;
- }
- return cq;
-}
-
-int Device::post_chunk(Chunk* chunk)
-{
- ibv_sge isge;
- isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
- isge.length = chunk->bytes;
- isge.lkey = chunk->mr->lkey;
- ibv_recv_wr rx_work_request;
-
- memset(&rx_work_request, 0, sizeof(rx_work_request));
- rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
- rx_work_request.next = NULL;
- rx_work_request.sg_list = &isge;
- rx_work_request.num_sge = 1;
-
- ibv_recv_wr *badWorkRequest;
- int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
- if (ret)
- return -errno;
- return 0;
-}
-
-int Device::post_channel_cluster()
-{
- vector<Chunk*> free_chunks;
- int r = memory_manager->get_channel_buffers(free_chunks, 0);
- assert(r > 0);
- for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
- r = post_chunk(*iter);
- assert(r == 0);
- }
- return 0;
-}
-
-int Device::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
-{
- return memory_manager->get_send_buffers(c, bytes);
-}
-
-int Device::poll_tx_cq(int n, ibv_wc *wc)
-{
- if (!initialized)
- return 0;
-
- return tx_cq->poll_cq(n, wc);
-}
-
-int Device::poll_rx_cq(int n, ibv_wc *wc)
-{
- if (!initialized)
- return 0;
-
- return rx_cq->poll_cq(n, wc);
-}
-
-void Device::rearm_cqs()
-{
- int ret;
-
- if (!initialized)
- return;
-
- ret = tx_cq->rearm_notify();
- assert(!ret);
-
- ret = rx_cq->rearm_notify();
- assert(!ret);
-}
-
DeviceList::DeviceList(CephContext *cct)
- : cct(cct), device_list(ibv_get_device_list(&num))
+ : device_list(ibv_get_device_list(&num))
{
if (device_list == NULL || num == 0) {
lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
}
devices = new Device*[num];
- poll_fds = new struct pollfd[2 * num];
-
- for (int i = 0; i < num; ++i) {
- struct pollfd *pfd = &poll_fds[i * 2];
- struct Device *d;
-
- d = new Device(cct, device_list[i]);
- devices[i] = d;
-
- pfd[0].fd = d->tx_cc->get_fd();
- pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
- pfd[0].revents = 0;
-
- pfd[1].fd = d->rx_cc->get_fd();
- pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
- pfd[1].revents = 0;
+  for (int i = 0; i < num; ++i) {
+ devices[i] = new Device(cct, device_list[i]);
}
}
DeviceList::~DeviceList()
{
- delete poll_fds;
-
for (int i=0; i < num; ++i) {
delete devices[i];
}
}
return NULL;
}
-
-int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc)
-{
- int n = 0;
-
- for (int i = 0; i < num; i++) {
- *d = devices[++last_poll_dev % num];
-
- n = (*d)->poll_tx_cq(num_entries, wc);
- if (n)
- break;
- }
-
- return n;
-}
-
-int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc)
-{
- int n = 0;
-
- for (int i = 0; i < num; i++) {
- *d = devices[++last_poll_dev % num];
-
- n = (*d)->poll_rx_cq(num_entries, wc);
- if (n)
- break;
- }
-
- return n;
-}
-
-int DeviceList::poll_blocking(bool &done)
-{
- int r = 0;
- while (!done && r == 0) {
- r = poll(poll_fds, num * 2, 100);
- if (r < 0) {
- r = -errno;
- lderr(cct) << __func__ << " poll failed " << r << dendl;
- ceph_abort();
- }
- }
-
- if (r <= 0)
- return r;
-
- for (int i = 0; i < num ; i++) {
- Device *d = devices[i];
-
- if (d->tx_cc->get_cq_event())
- ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl;
-
- if (d->rx_cc->get_cq_event())
- ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl;
- }
-
- return r;
-}
-
-void DeviceList::rearm_notify()
-{
- for (int i = 0; i < num; i++)
- devices[i]->rearm_cqs();
-}
#include "msg/async/net_handler.h"
#include "common/Mutex.h"
-typedef Infiniband::QueuePair QueuePair;
-typedef Infiniband::CompletionChannel CompletionChannel;
-typedef Infiniband::CompletionQueue CompletionQueue;
-typedef Infiniband::ProtectionDomain ProtectionDomain;
-typedef Infiniband::MemoryManager::Cluster Cluster;
-typedef Infiniband::MemoryManager::Chunk Chunk;
-typedef Infiniband::MemoryManager MemoryManager;
-
class Port {
struct ibv_context* ctxt;
int port_num;
class Device {
- CephContext *cct;
ibv_device *device;
- const char *name;
+ const char* name;
uint8_t port_cnt;
-
- uint32_t max_send_wr;
- uint32_t max_recv_wr;
- uint32_t max_sge;
-
- Mutex lock; // Protects from concurrent intialization of the device
- bool initialized = false;
-
public:
explicit Device(CephContext *c, ibv_device* d);
~Device();
- void init();
- void uninit();
-
- const char* get_name() const { return name;}
+ const char* get_name() { return name;}
uint16_t get_lid() { return active_port->get_lid(); }
ibv_gid get_gid() { return active_port->get_gid(); }
int get_gid_idx() { return active_port->get_gid_idx(); }
void binding_port(CephContext *c, int port_num);
-
- QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
- ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
- CompletionChannel *create_comp_channel(CephContext *c);
- CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
- int post_chunk(Chunk* chunk);
- int post_channel_cluster();
-
- MemoryManager* get_memory_manager() { return memory_manager; }
- bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
- bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
- Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
- int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
- int poll_tx_cq(int n, ibv_wc *wc);
- int poll_rx_cq(int n, ibv_wc *wc);
- void rearm_cqs();
-
struct ibv_context *ctxt;
ibv_device_attr *device_attr;
Port* active_port;
-
- MemoryManager* memory_manager = nullptr;
- ibv_srq *srq = nullptr;
- Infiniband::CompletionQueue *rx_cq = nullptr;
- Infiniband::CompletionChannel *rx_cc = nullptr;
- Infiniband::CompletionQueue *tx_cq = nullptr;
- Infiniband::CompletionChannel *tx_cc = nullptr;
- ProtectionDomain *pd = nullptr;
};
-inline ostream& operator<<(ostream& out, const Device &d)
-{
- return out << d.get_name();
-}
-
class DeviceList {
- CephContext *cct;
struct ibv_device ** device_list;
int num;
Device** devices;
-
- int last_poll_dev;
- struct pollfd *poll_fds;
-
public:
DeviceList(CephContext *cct);
~DeviceList();
Device* get_device(const char* device_name);
-
- void rearm_notify();
- int poll_tx(int n, Device **d, ibv_wc *wc);
- int poll_rx(int n, Device **d, ibv_wc *wc);
- int poll_blocking(bool &done);
};
#endif
*/
#include "Infiniband.h"
-#include "RDMAStack.h"
#include "Device.h"
#include "common/errno.h"
#undef dout_prefix
#define dout_prefix *_dout << "Infiniband "
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
static const uint32_t MAX_INLINE_DATA = 0;
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+static const uint32_t CQ_DEPTH = 30000;
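+// CQ_DEPTH is the number of completion entries requested from ibv_create_cq()
+// for each CQ built by create_comp_queue() below.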
Infiniband::QueuePair::QueuePair(
- CephContext *c, Device &device, ibv_qp_type type,
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
-: cct(c), ibdev(device),
+: cct(c), infiniband(infiniband),
type(type),
- ctxt(ibdev.ctxt),
+ ctxt(infiniband.device->ctxt),
ib_physical_port(port),
- pd(ibdev.pd->pd),
+ pd(infiniband.pd->pd),
srq(srq),
qp(NULL),
txcq(txcq),
lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
ceph_abort();
}
}
int Infiniband::QueuePair::init()
}
-Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
- : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}
int Infiniband::CompletionChannel::init()
{
ldout(cct, 20) << __func__ << " started." << dendl;
- channel = ibv_create_comp_channel(ibdev.ctxt);
+ channel = ibv_create_comp_channel(infiniband.device->ctxt);
if (!channel) {
lderr(cct) << __func__ << " failed to create receive completion channel: "
<< cpp_strerror(errno) << dendl;
int Infiniband::CompletionQueue::init()
{
- cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
+ cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
if (!cq) {
lderr(cct) << __func__ << " failed to create receive completion queue: "
<< cpp_strerror(errno) << dendl;
void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
- ib->device->post_chunk(this);
+ ib->post_chunk(this);
}
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: device_list(new DeviceList(cct))
{
device = device_list->get_device(device_name.c_str());
-
- device->init();
-
device->binding_port(cct, port_num);
assert(device);
ib_physical_port = device->active_port->get_port_num();
+ pd = new ProtectionDomain(cct, device);
+ assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+
+ max_recv_wr = device->device_attr->max_srq_wr;
+ if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
+ max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
+ }
+
+ max_send_wr = device->device_attr->max_qp_wr;
+ if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
+ max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
+ }
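+  // In both cases the configured ms_async_rdma_receive_buffers /
+  // ms_async_rdma_send_buffers value only takes effect when it is below the
+  // device limit (max_srq_wr / max_qp_wr); otherwise the device maximum is used.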
+
+ ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
+ << " completion entries" << dendl;
+
+ memory_manager = new MemoryManager(device, pd,
+ cct->_conf->ms_async_rdma_enable_hugepage);
+ memory_manager->register_rx_tx(
+ cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
+
+ srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
+ post_channel_cluster();
}
Infiniband::~Infiniband()
{
- if (dispatcher)
- dispatcher->polling_stop();
-
+ assert(ibv_destroy_srq(srq) == 0);
+ delete memory_manager;
+ delete pd;
delete device_list;
}
-void Infiniband::set_dispatcher(RDMADispatcher *d)
+/**
+ * Create a shared receive queue. This basically wraps the verbs call.
+ *
+ * \param[in] max_wr
+ * The max number of outstanding work requests in the SRQ.
+ * \param[in] max_sge
+ * The max number of scatter elements per WR.
+ * \return
+ * A valid ibv_srq pointer, or NULL on error.
+ */
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
{
- assert(!d ^ !dispatcher);
+ ibv_srq_init_attr sia;
+ memset(&sia, 0, sizeof(sia));
+ sia.srq_context = device->ctxt;
+ sia.attr.max_wr = max_wr;
+ sia.attr.max_sge = max_sge;
+ return ibv_create_srq(pd->pd, &sia);
+}
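+// Note: every QueuePair built by create_queue_pair() below is handed this
+// single SRQ, so all connections draw receive buffers from one shared pool.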
- dispatcher = d;
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return memory_manager->get_send_buffers(c, bytes);
+}
+
+/**
+ * Create a new QueuePair. This factory should be used in preference to
+ * the QueuePair constructor directly, since this lets derivatives of
+ * Infiniband, e.g. MockInfiniband (if it existed),
+ * return mocked out QueuePair derivatives.
+ *
+ * \return
+ * QueuePair on success or NULL if init fails
+ * See QueuePair::QueuePair for parameter documentation.
+ */
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
+{
+ Infiniband::QueuePair *qp = new QueuePair(
+ cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
+ if (qp->init()) {
+ delete qp;
+ return NULL;
+ }
+ return qp;
+}
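+// Example call site (from RDMAConnectedSocketImpl later in this patch), with
+// the tx/rx CQs taken from the dispatcher:
+//   qp = infiniband->create_queue_pair(
+//     cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);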
+
+int Infiniband::post_chunk(Chunk* chunk)
+{
+ ibv_sge isge;
+ isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
+ isge.length = chunk->bytes;
+ isge.lkey = chunk->mr->lkey;
+ ibv_recv_wr rx_work_request;
+
+ memset(&rx_work_request, 0, sizeof(rx_work_request));
+ rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+ rx_work_request.next = NULL;
+ rx_work_request.sg_list = &isge;
+ rx_work_request.num_sge = 1;
+
+ ibv_recv_wr *badWorkRequest;
+ int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
+ if (ret)
+ return -errno;
+ return 0;
+}
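+// The wr_id stashed in post_chunk() round-trips through the work completion:
+// the polling loop recovers the chunk via reinterpret_cast<Chunk*>(wc.wr_id)
+// and re-posts it here once the buffer has been consumed.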
+
+int Infiniband::post_channel_cluster()
+{
+ vector<Chunk*> free_chunks;
+ int r = memory_manager->get_channel_buffers(free_chunks, 0);
+ assert(r > 0);
+ for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
+ r = post_chunk(*iter);
+ assert(r == 0);
+ }
+ return 0;
+}
+
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+{
+ Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
+ if (cc->init()) {
+ delete cc;
+ return NULL;
+ }
+ return cc;
+}
+
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(
+ CephContext *cct, CompletionChannel *cc)
+{
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
+ cct, *this, CQ_DEPTH, cc);
+ if (cq->init()) {
+ delete cq;
+ return NULL;
+ }
+ return cq;
}
// 1 means no valid buffer read, 0 means got enough buffer
default: return " out of range.";
}
}
-
-void Infiniband::handle_pre_fork()
-{
- device->uninit();
-}
-
-void Infiniband::handle_post_fork()
-{
- device->init();
-}
-
-int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
-{
- return device_list->poll_tx(n, d, wc);
-}
-
-int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
-{
- return device_list->poll_rx(n, d, wc);
-}
-
-int Infiniband::poll_blocking(bool &done)
-{
- return device_list->poll_blocking(done);
-}
-
-void Infiniband::rearm_notify()
-{
- device_list->rearm_notify();
-}
class Port;
class Device;
class DeviceList;
-class RDMADispatcher;
class Infiniband {
public:
};
private:
+ uint32_t max_send_wr;
+ uint32_t max_recv_wr;
+ uint32_t max_sge;
uint8_t ib_physical_port;
+ MemoryManager* memory_manager;
+ ibv_srq* srq; // shared receive work queue
Device *device;
+ ProtectionDomain *pd;
DeviceList *device_list;
- RDMADispatcher *dispatcher = nullptr;
-
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
~Infiniband();
- void set_dispatcher(RDMADispatcher *d);
-
class CompletionChannel {
static const uint32_t MAX_ACK_EVENT = 5000;
CephContext *cct;
- Device &ibdev;
+ Infiniband& infiniband;
ibv_comp_channel *channel;
ibv_cq *cq;
uint32_t cq_events_that_need_ack;
public:
- CompletionChannel(CephContext *c, Device &ibdev);
+ CompletionChannel(CephContext *c, Infiniband &ib);
~CompletionChannel();
int init();
bool get_cq_event();
// You need to call init and it will create a cq and associate to comp channel
class CompletionQueue {
public:
- CompletionQueue(CephContext *c, Device &ibdev,
+ CompletionQueue(CephContext *c, Infiniband &ib,
const uint32_t qd, CompletionChannel *cc)
- : cct(c), ibdev(ibdev), channel(cc), cq(NULL), queue_depth(qd) {}
+ : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
~CompletionQueue();
int init();
int poll_cq(int num_entries, ibv_wc *ret_wc_array);
CompletionChannel* get_cc() const { return channel; }
private:
CephContext *cct;
- Device &ibdev;
+    Infiniband& infiniband; // Infiniband to which this CQ belongs
CompletionChannel *channel;
ibv_cq *cq;
uint32_t queue_depth;
// must call plumb() to bring the queue pair to the RTS state.
class QueuePair {
public:
- QueuePair(CephContext *c, Device &device, ibv_qp_type type,
+ QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
Infiniband::CompletionQueue* rxcq,
private:
CephContext *cct;
- Device &ibdev; // Infiniband to which this QP belongs
+ Infiniband& infiniband; // Infiniband to which this QP belongs
ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)
ibv_context* ctxt; // device context of the HCA to use
int ib_physical_port;
public:
typedef MemoryManager::Cluster Cluster;
typedef MemoryManager::Chunk Chunk;
+ QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+ ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
+ int post_chunk(Chunk* chunk);
+ int post_channel_cluster();
+ int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
+ CompletionChannel *create_comp_channel(CephContext *c);
+ CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+ MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
+ bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+ bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
+ Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
-
- void handle_pre_fork();
- void handle_post_fork();
-
- int poll_tx(int n, Device **d, ibv_wc *wc);
- int poll_rx(int n, Device **d, ibv_wc *wc);
- int poll_blocking(bool &done);
- void rearm_notify();
};
#endif
ibdev = ib->get_device();
ibport = ib->get_ib_physical_port();
- qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+ qp = infiniband->create_queue_pair(
+ cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ibdev->get_lid();
error = ECONNRESET;
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
- ret = ibdev->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+ ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
assert(ret == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
for (unsigned i=0; i < buffers.size(); ++i) {
- ret = ibdev->post_chunk(buffers[i]);
+ ret = infiniband->post_chunk(buffers[i]);
assert(ret == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
- assert(ibdev->post_chunk(chunk) == 0);
+ assert(infiniband->post_chunk(chunk) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
} else {
if (read == (ssize_t)len) {
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
- assert(ibdev->post_chunk(chunk) == 0);
+ assert(infiniband->post_chunk(chunk) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
}
read += tmp;
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
- assert(ibdev->post_chunk(*c) == 0);
+ assert(infiniband->post_chunk(*c) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
unsigned total = 0;
unsigned need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {
- if (ibdev->is_tx_buffer(it->raw_c_str())) {
+ if (infiniband->is_tx_buffer(it->raw_c_str())) {
if (need_reserve_bytes) {
unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
total += copied;
need_reserve_bytes = 0;
}
assert(copy_it == it);
- tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str()));
+ tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
total += it->length();
++copy_it;
} else {
*
*/
+#include <poll.h>
#include <sys/time.h>
#include <sys/resource.h>
RDMADispatcher::~RDMADispatcher()
{
- polling_stop();
-
+ done = true;
+ t.join();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
- global_infiniband->set_dispatcher(nullptr);
-
assert(qp_conns.empty());
assert(num_qp_conn == 0);
assert(dead_queue_pairs.empty());
assert(num_dead_queue_pair == 0);
+
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
+ delete async_handler;
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
: cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
+ tx_cc = global_infiniband->create_comp_channel(c);
+ assert(tx_cc);
+ rx_cc = global_infiniband->create_comp_channel(c);
+ assert(rx_cc);
+ tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
+ assert(tx_cq);
+ rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
+ assert(rx_cq);
+
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
- cct->register_fork_watcher(this);
-}
-
-void RDMADispatcher::polling_start()
-{
t = std::thread(&RDMADispatcher::polling, this);
-}
-
-void RDMADispatcher::polling_stop()
-{
- if (!t.joinable())
- return;
-
- done = true;
- t.join();
+ cct->register_fork_watcher(this);
}
void RDMADispatcher::handle_async_event()
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
+ ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
int r = 0;
while (true) {
- Device *ibdev;
-
- int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
+ int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
if (tx_ret > 0) {
ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
<< " responses."<< dendl;
- handle_tx_event(ibdev, wc, tx_ret);
+ handle_tx_event(wc, tx_ret);
}
- int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
+ int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
if (rx_ret > 0) {
- ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
+ ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
- assert(ibdev->is_rx_buffer(chunk->buffer));
- r = ibdev->post_chunk(chunk);
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
assert(r == 0);
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
- assert(ibdev->is_rx_buffer(chunk->buffer));
- r = ibdev->post_chunk(chunk);
+ << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
if (r) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
assert(r == 0);
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
- global_infiniband->rearm_notify();
+ tx_cq->rearm_notify();
+ rx_cq->rearm_notify();
rearmed = true;
continue;
}
+ struct pollfd channel_poll[2];
+ channel_poll[0].fd = tx_cc->get_fd();
+ channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[0].revents = 0;
+ channel_poll[1].fd = rx_cc->get_fd();
+ channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[1].revents = 0;
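+  // Poll both completion-channel fds with a 100ms timeout so the loop can
+  // promptly notice 'done' (set by the destructor and handle_pre_fork()).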
+ r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
-
- r = global_infiniband->poll_blocking(done);
- if (r > 0)
- ldout(cct, 20) << __func__ << " got a cq event." << dendl;
-
+ while (!done && r == 0) {
+ r = poll(channel_poll, 2, 100);
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " poll failed " << r << dendl;
+ ceph_abort();
+ }
+ }
+ if (r > 0 && tx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
+ if (r > 0 && rx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
void RDMADispatcher::handle_pre_fork()
{
- polling_stop();
+ done = true;
+ t.join();
done = false;
- global_infiniband->handle_pre_fork();
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
global_infiniband.destroy();
}
void RDMADispatcher::handle_post_fork()
{
- if (!global_infiniband) {
+ if (!global_infiniband)
global_infiniband.construct(
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
- global_infiniband->set_dispatcher(this);
- }
- global_infiniband->handle_post_fork();
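+  // Recreate the completion channels and queues torn down in handle_pre_fork().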
+ tx_cc = global_infiniband->create_comp_channel(cct);
+ assert(tx_cc);
+ rx_cc = global_infiniband->create_comp_channel(cct);
+ assert(rx_cc);
+ tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+ assert(tx_cq);
+ rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+ assert(rx_cq);
- polling_start();
+ t = std::thread(&RDMADispatcher::polling, this);
}
-void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
}
// FIXME: why not tx?
- if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
+ if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
tx_chunks.push_back(chunk);
else
ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
}
perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
- post_tx_buffer(ibdev, tx_chunks);
+ post_tx_buffer(tx_chunks);
}
/**
* \return
* 0 if success or -1 for failure
*/
-void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
if (chunks.empty())
return ;
inflight -= chunks.size();
- ibdev->get_memory_manager()->return_tx(chunks);
+ global_infiniband->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
- Device *ibdev = o->get_device();
-
assert(center.in_thread());
- int r = ibdev->get_tx_buffers(c, bytes);
+ int r = global_infiniband->get_tx_buffers(c, bytes);
assert(r >= 0);
- size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
+ size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
if (got == bytes)
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
- global_infiniband->set_dispatcher(dispatcher);
- dispatcher->polling_start();
-
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
std::thread t;
CephContext *cct;
+ Infiniband::CompletionQueue* tx_cq;
+ Infiniband::CompletionQueue* rx_cq;
+ Infiniband::CompletionChannel *tx_cc, *rx_cc;
EventCallbackRef async_handler;
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
-
void handle_async_event();
-
- void polling_start();
- void polling_stop();
void polling();
-
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Mutex::Locker l(w_lock);
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
+ Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+ Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
virtual void handle_pre_fork() override;
virtual void handle_post_fork() override;
- void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
- void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
+ void handle_tx_event(ibv_wc *cqe, int n);
+ void post_tx_buffer(std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
};
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
- Device *get_device() { return ibdev; }
-
void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
virtual int is_connected() override { return connected; }