static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");\r
static const uint32_t CQ_DEPTH = 30000;\r
\r
-Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
{\r
if (device == NULL) {\r
lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
}\r
}\r
\r
-void Device::binding_port(uint8_t port_num) {\r
+void Device::binding_port(CephContext *cct, uint8_t port_num) {\r
port_cnt = device_attr->phys_port_cnt;\r
ports = new Port*[port_cnt];\r
for (uint8_t i = 0; i < port_cnt; ++i) {\r
}\r
}\r
\r
-Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t port_num): cct(c), device_list(c), net(c)\r
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct), net(cct)\r
{\r
device = device_list.get_device(device_name.c_str());\r
- device->binding_port(port_num);\r
+ device->binding_port(cct, port_num);\r
assert(device);\r
ib_physical_port = device->active_port->get_port_num();\r
pd = new ProtectionDomain(cct, device);\r
ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe\r
<< " completion entries" << dendl;\r
\r
- memory_manager = new MemoryManager(cct, device, pd);\r
+ memory_manager = new MemoryManager(device, pd,\r
+ cct->_conf->ms_async_rdma_enable_hugepage);\r
memory_manager->register_rx_tx(\r
cct->_conf->ms_async_rdma_buffer_size,\r
cct->_conf->ms_async_rdma_receive_buffers,\r
*/\r
ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)\r
{\r
- ldout(cct, 20) << __func__ << " max_wr=" << max_wr << " max_sge=" << max_sge << dendl;\r
ibv_srq_init_attr sia;\r
memset(&sia, 0, sizeof(sia));\r
sia.srq_context = device->ctxt;\r
* QueuePair on success or NULL if init fails\r
* See QueuePair::QueuePair for parameter documentation.\r
*/\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
{\r
- Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
+ Infiniband::QueuePair *qp = new QueuePair(\r
+ cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
if (qp->init()) {\r
delete qp;\r
return NULL;\r
\r
int Infiniband::QueuePair::init()\r
{\r
- ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+ ldout(cct, 20) << __func__ << " started." << dendl;\r
ibv_qp_init_attr qpia;\r
memset(&qpia, 0, sizeof(qpia));\r
qpia.send_cq = txcq->get_cq();\r
\r
qp = ibv_create_qp(pd, &qpia);\r
if (qp == NULL) {\r
- lderr(infiniband.cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
- ldout(infiniband.cct, 20) << __func__ << " successfully create queue pair: "\r
- << "qp=" << qp << dendl;\r
+ ldout(cct, 20) << __func__ << " successfully create queue pair: "\r
+ << "qp=" << qp << dendl;\r
\r
// move from RESET to INIT state\r
ibv_qp_attr qpa;\r
int ret = ibv_modify_qp(qp, &qpa, mask);\r
if (ret) {\r
ibv_destroy_qp(qp);\r
- lderr(infiniband.cct) << __func__ << " failed to transition to INIT state: "\r
- << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to transition to INIT state: "\r
+ << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
- ldout(infiniband.cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
- << " qp=" << qp << dendl;\r
+ ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
+ << " qp=" << qp << dendl;\r
return 0;\r
}\r
\r
int mask = IBV_QP_STATE;\r
int ret = ibv_modify_qp(qp, &qpa, mask);\r
if (ret) {\r
- lderr(infiniband.cct) << __func__ << " failed to transition to ERROR state: "\r
- << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to transition to ERROR state: "\r
+ << cpp_strerror(errno) << dendl;\r
return -errno;\r
}\r
dead = true;\r
\r
ibv_recv_wr *badWorkRequest;\r
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
- if (ret) {\r
- lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
- << cpp_strerror(errno) << dendl;\r
+ if (ret)\r
return -1;\r
- }\r
return 0;\r
}\r
\r
r = post_chunk(*iter);\r
assert(r == 0);\r
}\r
- ldout(cct, 20) << __func__ << " posted buffers to srq. "<< dendl;\r
return 0;\r
}\r
\r
-Infiniband::CompletionChannel* Infiniband::create_comp_channel()\r
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)\r
{\r
- ldout(cct, 20) << __func__ << " started." << dendl;\r
- Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(*this);\r
+ Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);\r
if (cc->init()) {\r
delete cc;\r
return NULL;\r
return cc;\r
}\r
\r
-Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(\r
+ CephContext *cct, CompletionChannel *cc)\r
{\r
- Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, CQ_DEPTH, cc);\r
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(\r
+ cct, *this, CQ_DEPTH, cc);\r
if (cq->init()) {\r
delete cq;\r
return NULL;\r
\r
\r
Infiniband::QueuePair::QueuePair(\r
- Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq,\r
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
+ int port, ibv_srq *srq,\r
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,\r
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)\r
-: infiniband(infiniband),\r
+: cct(c), infiniband(infiniband),\r
type(type),\r
ctxt(infiniband.device->ctxt),\r
ib_physical_port(port),\r
{\r
initial_psn = lrand48() & 0xffffff;\r
if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
- lderr(infiniband.cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
assert(0);\r
}\r
pd = infiniband.pd->pd;\r
\r
// 1 means no valid buffer read, 0 means got enough buffer\r
// else return < 0 means error\r
-int Infiniband::recv_msg(int sd, IBSYNMsg& im)\r
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
{\r
char msg[TCP_MSG_LEN];\r
char gid[33];\r
return r;\r
}\r
\r
-int Infiniband::send_msg(int sd, IBSYNMsg& im)\r
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
{\r
int retry = 0;\r
ssize_t r;\r
if (channel) {\r
int r = ibv_destroy_comp_channel(channel);\r
if (r < 0)\r
- lderr(infiniband.cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
assert(r == 0);\r
}\r
}\r
if (cq) {\r
int r = ibv_destroy_cq(cq);\r
if (r < 0)\r
- lderr(infiniband.cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
assert(r == 0);\r
}\r
}\r
\r
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
{\r
- ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+ ldout(cct, 20) << __func__ << " started." << dendl;\r
int r = ibv_req_notify_cq(cq, 0);\r
if (r < 0)\r
- lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
return r;\r
}\r
\r
int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {\r
int r = ibv_poll_cq(cq, num_entries, ret_wc_array);\r
if (r < 0) {\r
- lderr(infiniband.cct) << __func__ << " poll_completion_queue occur met error: "\r
+ lderr(cct) << __func__ << " poll_completion_queue occur met error: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
bool Infiniband::CompletionChannel::get_cq_event()\r
{\r
- // ldout(infiniband.cct, 21) << __func__ << " started." << dendl;\r
ibv_cq *cq = NULL;\r
void *ev_ctx;\r
if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {\r
if (errno != EAGAIN && errno != EINTR)\r
- lderr(infiniband.cct) << __func__ << "failed to retrieve CQ event: "\r
- << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to retrieve CQ event: "\r
+ << cpp_strerror(errno) << dendl;\r
return false;\r
}\r
\r
* * be acked, and periodically ack them\r
* */\r
if (++cq_events_that_need_ack == MAX_ACK_EVENT) {\r
- ldout(infiniband.cct, 20) << __func__ << " ack aq events." << dendl;\r
+ ldout(cct, 20) << __func__ << " ack aq events." << dendl;\r
ibv_ack_cq_events(cq, MAX_ACK_EVENT);\r
cq_events_that_need_ack = 0;\r
}\r
{\r
cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);\r
if (!cq) {\r
- lderr(infiniband.cct) << __func__ << " failed to create receive completion queue: "\r
+ lderr(cct) << __func__ << " failed to create receive completion queue: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
if (ibv_req_notify_cq(cq, 0)) {\r
- lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
ibv_destroy_cq(cq);\r
cq = nullptr;\r
return -1;\r
}\r
\r
channel->bind_cq(cq);\r
- ldout(infiniband.cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
+ ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
return 0;\r
}\r
\r
int Infiniband::CompletionChannel::init()\r
{\r
- ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+ ldout(cct, 20) << __func__ << " started." << dendl;\r
channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
if (!channel) {\r
- lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
+ lderr(cct) << __func__ << " failed to create receive completion channel: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
};\r
\r
if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)\r
- return "<status out of range!>";\r
+ return "<status out of range!>";\r
return lookup[status];\r
}\r
\r
class CephContext;\r
\r
class Port {\r
- CephContext *cct;\r
struct ibv_context* ctxt;\r
uint8_t port_num;\r
struct ibv_port_attr* port_attr;\r
union ibv_gid gid;\r
\r
public:\r
- explicit Port(CephContext *c, struct ibv_context* ictxt, uint8_t ipn): cct(c), ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
+ explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
int r = ibv_query_port(ctxt, port_num, port_attr);\r
if (r == -1) {\r
lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;\r
\r
\r
class Device {\r
- CephContext *cct;\r
ibv_device *device;\r
const char* name;\r
uint8_t port_cnt;\r
const char* get_name() { return name;}\r
uint16_t get_lid() { return active_port->get_lid(); }\r
ibv_gid get_gid() { return active_port->get_gid(); }\r
- void binding_port(uint8_t port_num);\r
+ void binding_port(CephContext *c, uint8_t port_num);\r
struct ibv_context *ctxt;\r
ibv_device_attr *device_attr;\r
Port* active_port;\r
\r
\r
class DeviceList {\r
- CephContext *cct;\r
struct ibv_device ** device_list;\r
int num;\r
Device** devices;\r
public:\r
- DeviceList(CephContext *c): cct(c), device_list(ibv_get_device_list(&num)) {\r
+ DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {\r
if (device_list == NULL || num == 0) {\r
lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;\r
assert(0);\r
public:\r
class ProtectionDomain {\r
public:\r
- explicit ProtectionDomain(CephContext *c, Device *device)\r
- : cct(c), pd(ibv_alloc_pd(device->ctxt))\r
+ explicit ProtectionDomain(CephContext *cct, Device *device)\r
+ : pd(ibv_alloc_pd(device->ctxt))\r
{\r
if (pd == NULL) {\r
lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;\r
}\r
~ProtectionDomain() {\r
int rc = ibv_dealloc_pd(pd);\r
- if (rc != 0) {\r
- lderr(cct) << __func__ << " ibv_dealloc_pd failed: "\r
- << cpp_strerror(errno) << dendl;\r
- }\r
+ assert(rc == 0);\r
}\r
- CephContext *cct;\r
ibv_pd* const pd;\r
};\r
\r
char* base;\r
};\r
\r
- MemoryManager(CephContext *cct, Device *d, ProtectionDomain *p) : cct(cct), device(d), pd(p) {\r
- enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
+ MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {\r
+ enabled_huge_page = hugepage;\r
}\r
~MemoryManager() {\r
if (channel)\r
size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);\r
char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);\r
if (ptr == MAP_FAILED) {\r
- lderr(cct) << __func__ << " MAP_FAILED" << dendl;\r
ptr = (char *)malloc(real_size);\r
if (ptr == NULL) return NULL;\r
real_size = 0;\r
}\r
*((size_t *)ptr) = real_size;\r
- lderr(cct) << __func__ << " bingo!" << dendl;\r
return ptr + HUGE_PAGE_SIZE;\r
}\r
void free_huge_pages(void *ptr) {\r
private:\r
Cluster* channel;//RECV\r
Cluster* send;// SEND\r
- CephContext *cct;\r
Device *device;\r
ProtectionDomain *pd;\r
};\r
ibv_srq* srq; // shared receive work queue\r
Device *device;\r
ProtectionDomain *pd;\r
- CephContext* cct;\r
DeviceList device_list;\r
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);\r
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);\r
}\r
\r
class CompletionChannel {\r
- static const uint32_t MAX_ACK_EVENT = 5000;\r
- Infiniband& infiniband;\r
- ibv_comp_channel *channel;\r
- ibv_cq *cq;\r
- uint32_t cq_events_that_need_ack;\r
+ static const uint32_t MAX_ACK_EVENT = 5000;\r
+ CephContext *cct;\r
+ Infiniband& infiniband;\r
+ ibv_comp_channel *channel;\r
+ ibv_cq *cq;\r
+ uint32_t cq_events_that_need_ack;\r
\r
public:\r
- CompletionChannel(Infiniband &ib): infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
+ CompletionChannel(CephContext *c, Infiniband &ib)\r
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
~CompletionChannel();\r
int init();\r
bool get_cq_event();\r
// You need to call init and it will create a cq and associate to comp channel\r
class CompletionQueue {\r
public:\r
- CompletionQueue(Infiniband &ib, const uint32_t qd, CompletionChannel *cc):infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
+ CompletionQueue(CephContext *c, Infiniband &ib,\r
+ const uint32_t qd, CompletionChannel *cc)\r
+ : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
~CompletionQueue();\r
int init();\r
int poll_cq(int num_entries, ibv_wc *ret_wc_array);\r
int rearm_notify(bool solicited_only=true);\r
CompletionChannel* get_cc() const { return channel; }\r
private:\r
+ CephContext *cct;\r
Infiniband& infiniband; // Infiniband to which this QP belongs\r
CompletionChannel *channel;\r
ibv_cq *cq;\r
// must call plumb() to bring the queue pair to the RTS state.\r
class QueuePair {\r
public:\r
- QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
+ QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
+ int ib_physical_port, ibv_srq *srq,\r
+ Infiniband::CompletionQueue* txcq,\r
+ Infiniband::CompletionQueue* rxcq,\r
+ uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
~QueuePair();\r
\r
int init();\r
\r
int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);\r
if (r) {\r
- lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+ lderr(cct) << __func__ << " failed to query qp: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);\r
if (r) {\r
- lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+ lderr(cct) << __func__ << " failed to query qp: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);\r
if (r) {\r
- lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+ lderr(cct) << __func__ << " failed to get state: "\r
<< cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
int r = ibv_query_qp(qp, &qpa, -1, &qpia);\r
if (r) {\r
- lderr(infiniband.cct) << __func__ << " failed to get state: "\r
- << cpp_strerror(errno) << dendl;\r
+ lderr(cct) << __func__ << " failed to get state: "\r
+ << cpp_strerror(errno) << dendl;\r
return true;\r
}\r
return qpa.cur_qp_state == IBV_QPS_ERR;\r
bool is_dead() const { return dead; }\r
\r
private:\r
+ CephContext *cct;\r
Infiniband& infiniband; // Infiniband to which this QP belongs\r
ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)\r
ibv_context* ctxt; // device context of the HCA to use\r
public:\r
typedef MemoryManager::Cluster Cluster;\r
typedef MemoryManager::Chunk Chunk;\r
- QueuePair* create_queue_pair(CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
+ QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
int post_chunk(Chunk* chunk);\r
int post_channel_cluster();\r
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
return memory_manager->get_send_buffers(c, bytes);\r
}\r
- CompletionChannel *create_comp_channel();\r
- CompletionQueue *create_comp_queue(CompletionChannel *cc=NULL);\r
+ CompletionChannel *create_comp_channel(CephContext *c);\r
+ CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);\r
uint8_t get_ib_physical_port() {\r
return ib_physical_port;\r
}\r
- int send_msg(int sd, IBSYNMsg& msg);\r
- int recv_msg(int sd, IBSYNMsg& msg);\r
+ int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
+ int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
uint16_t get_lid() { return device->get_lid(); }\r
ibv_gid get_gid() { return device->get_gid(); }\r
MemoryManager* get_memory_manager() { return memory_manager; }\r
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;\r
infiniband->net.set_priority(tcp_fd, opts.priority);\r
my_msg.peer_qpn = 0;\r
- r = infiniband->send_msg(tcp_fd, my_msg);\r
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
if (r < 0)\r
return r;\r
\r
\r
void RDMAConnectedSocketImpl::handle_connection() {\r
ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;\r
- int r = infiniband->recv_msg(tcp_fd, peer_msg);\r
+ int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);\r
if (r < 0) {\r
if (r != -EAGAIN)\r
fault();\r
assert(!r);\r
}\r
notify();\r
- r = infiniband->send_msg(tcp_fd, my_msg);\r
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
if (r < 0) {\r
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;\r
fault();\r
ldout(cct, 10) << __func__ << " server is already active." << dendl;\r
return ;\r
}\r
- r = infiniband->send_msg(tcp_fd, my_msg);\r
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
if (r < 0) {\r
ldout(cct, 1) << __func__ << " server ack failed." << dendl;\r
fault();\r
explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
: cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
- rx_cc = ib->create_comp_channel();
+ rx_cc = ib->create_comp_channel(c);
assert(rx_cc);
- rx_cq = ib->create_comp_queue(rx_cc);
+ rx_cq = ib->create_comp_queue(c, rx_cc);
assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
cct->register_fork_watcher(this);
dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
is_server(false), con_handler(new C_handle_connection(this)),
active(false), detached(false) {
- qp = infiniband->create_queue_pair(s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+ qp = infiniband->create_queue_pair(
+ cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = infiniband->get_lid();