]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: ensure CephContext existed
authorHaomai Wang <haomai@xsky.com>
Fri, 18 Nov 2016 10:58:26 +0000 (18:58 +0800)
committerHaomai Wang <haomai@xsky.com>
Fri, 18 Nov 2016 11:20:10 +0000 (19:20 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.h

index ba9e7799a2b22baab561620e9f330bb56abc25f9..707814e3ffa53da6f41ae9e7bd23187fa79f3fa2 100644 (file)
@@ -27,7 +27,7 @@ static const uint32_t MAX_INLINE_DATA = 128;
 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");\r
 static const uint32_t CQ_DEPTH = 30000;\r
 \r
-Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
 {\r
   if (device == NULL) {\r
     lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
@@ -46,7 +46,7 @@ Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(ne
   }\r
 }\r
 \r
-void Device::binding_port(uint8_t port_num) {\r
+void Device::binding_port(CephContext *cct, uint8_t port_num) {\r
   port_cnt = device_attr->phys_port_cnt;\r
   ports = new Port*[port_cnt];\r
   for (uint8_t i = 0; i < port_cnt; ++i) {\r
@@ -65,10 +65,10 @@ void Device::binding_port(uint8_t port_num) {
   }\r
 }\r
 \r
-Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t port_num): cct(c), device_list(c), net(c)\r
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct), net(cct)\r
 {\r
   device = device_list.get_device(device_name.c_str());\r
-  device->binding_port(port_num);\r
+  device->binding_port(cct, port_num);\r
   assert(device);\r
   ib_physical_port = device->active_port->get_port_num();\r
   pd = new ProtectionDomain(cct, device);\r
@@ -88,7 +88,8 @@ Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t p
   ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe\r
                 << " completion entries" << dendl;\r
 \r
-  memory_manager = new MemoryManager(cct, device, pd);\r
+  memory_manager = new MemoryManager(device, pd,\r
+                                     cct->_conf->ms_async_rdma_enable_hugepage);\r
   memory_manager->register_rx_tx(\r
       cct->_conf->ms_async_rdma_buffer_size,\r
       cct->_conf->ms_async_rdma_receive_buffers,\r
@@ -110,7 +111,6 @@ Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t p
  */\r
 ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)\r
 {\r
-  ldout(cct, 20) << __func__ << " max_wr=" << max_wr << " max_sge=" << max_sge << dendl;\r
   ibv_srq_init_attr sia;\r
   memset(&sia, 0, sizeof(sia));\r
   sia.srq_context = device->ctxt;\r
@@ -129,9 +129,10 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s
  *      QueuePair on success or NULL if init fails\r
  * See QueuePair::QueuePair for parameter documentation.\r
  */\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
 {\r
-  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
+  Infiniband::QueuePair *qp = new QueuePair(\r
+      cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
   if (qp->init()) {\r
     delete qp;\r
     return NULL;\r
@@ -141,7 +142,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *tx, Comple
 \r
 int Infiniband::QueuePair::init()\r
 {\r
-  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  ldout(cct, 20) << __func__ << " started." << dendl;\r
   ibv_qp_init_attr qpia;\r
   memset(&qpia, 0, sizeof(qpia));\r
   qpia.send_cq = txcq->get_cq();\r
@@ -155,12 +156,12 @@ int Infiniband::QueuePair::init()
 \r
   qp = ibv_create_qp(pd, &qpia);\r
   if (qp == NULL) {\r
-    lderr(infiniband.cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
 \r
-  ldout(infiniband.cct, 20) << __func__ << " successfully create queue pair: "\r
-    << "qp=" << qp << dendl;\r
+  ldout(cct, 20) << __func__ << " successfully create queue pair: "\r
+                 << "qp=" << qp << dendl;\r
 \r
   // move from RESET to INIT state\r
   ibv_qp_attr qpa;\r
@@ -190,12 +191,12 @@ int Infiniband::QueuePair::init()
   int ret = ibv_modify_qp(qp, &qpa, mask);\r
   if (ret) {\r
     ibv_destroy_qp(qp);\r
-    lderr(infiniband.cct) << __func__ << " failed to transition to INIT state: "\r
-      << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << " failed to transition to INIT state: "\r
+               << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
-  ldout(infiniband.cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
-    << " qp=" << qp << dendl;\r
+  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"\r
+                 << " qp=" << qp << dendl;\r
   return 0;\r
 }\r
 \r
@@ -224,8 +225,8 @@ int Infiniband::QueuePair::to_dead()
   int mask = IBV_QP_STATE;\r
   int ret = ibv_modify_qp(qp, &qpa, mask);\r
   if (ret) {\r
-    lderr(infiniband.cct) << __func__ << " failed to transition to ERROR state: "\r
-                           << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << " failed to transition to ERROR state: "\r
+               << cpp_strerror(errno) << dendl;\r
     return -errno;\r
   }\r
   dead = true;\r
@@ -248,11 +249,8 @@ int Infiniband::post_chunk(Chunk* chunk)
 \r
   ibv_recv_wr *badWorkRequest;\r
   int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
-  if (ret) {\r
-    lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
-               << cpp_strerror(errno) << dendl;\r
+  if (ret)\r
     return -1;\r
-  }\r
   return 0;\r
 }\r
 \r
@@ -265,14 +263,12 @@ int Infiniband::post_channel_cluster()
     r = post_chunk(*iter);\r
     assert(r == 0);\r
   }\r
-  ldout(cct, 20) << __func__ << " posted buffers to srq. "<< dendl;\r
   return 0;\r
 }\r
 \r
-Infiniband::CompletionChannel* Infiniband::create_comp_channel()\r
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)\r
 {\r
-  ldout(cct, 20) << __func__ << " started." << dendl;\r
-  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(*this);\r
+  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);\r
   if (cc->init()) {\r
     delete cc;\r
     return NULL;\r
@@ -280,9 +276,11 @@ Infiniband::CompletionChannel* Infiniband::create_comp_channel()
   return cc;\r
 }\r
 \r
-Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(\r
+    CephContext *cct, CompletionChannel *cc)\r
 {\r
-  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, CQ_DEPTH, cc);\r
+  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(\r
+      cct, *this, CQ_DEPTH, cc);\r
   if (cq->init()) {\r
     delete cq;\r
     return NULL;\r
@@ -292,10 +290,11 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc
 \r
 \r
 Infiniband::QueuePair::QueuePair(\r
-    Infiniband& infiniband, ibv_qp_type type, int port, ibv_srq *srq,\r
+    CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
+    int port, ibv_srq *srq,\r
     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,\r
     uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)\r
-: infiniband(infiniband),\r
+: cct(c), infiniband(infiniband),\r
   type(type),\r
   ctxt(infiniband.device->ctxt),\r
   ib_physical_port(port),\r
@@ -312,7 +311,7 @@ Infiniband::QueuePair::QueuePair(
 {\r
   initial_psn = lrand48() & 0xffffff;\r
   if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
-    lderr(infiniband.cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << "invalid queue pair type" << cpp_strerror(errno) << dendl;\r
     assert(0);\r
   }\r
   pd = infiniband.pd->pd;\r
@@ -320,7 +319,7 @@ Infiniband::QueuePair::QueuePair(
 \r
 // 1 means no valid buffer read, 0 means got enough buffer\r
 // else return < 0 means error\r
-int Infiniband::recv_msg(int sd, IBSYNMsg& im)\r
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
 {\r
   char msg[TCP_MSG_LEN];\r
   char gid[33];\r
@@ -347,7 +346,7 @@ int Infiniband::recv_msg(int sd, IBSYNMsg& im)
   return r;\r
 }\r
 \r
-int Infiniband::send_msg(int sd, IBSYNMsg& im)\r
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)\r
 {\r
   int retry = 0;\r
   ssize_t r;\r
@@ -414,7 +413,7 @@ Infiniband::CompletionChannel::~CompletionChannel()
   if (channel) {\r
     int r = ibv_destroy_comp_channel(channel);\r
     if (r < 0)\r
-      lderr(infiniband.cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
+      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
     assert(r == 0);\r
   }\r
 }\r
@@ -424,24 +423,24 @@ Infiniband::CompletionQueue::~CompletionQueue()
   if (cq) {\r
     int r = ibv_destroy_cq(cq);\r
     if (r < 0)\r
-      lderr(infiniband.cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
+      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
     assert(r == 0);\r
   }\r
 }\r
 \r
 int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
 {\r
-  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  ldout(cct, 20) << __func__ << " started." << dendl;\r
   int r = ibv_req_notify_cq(cq, 0);\r
   if (r < 0)\r
-    lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
   return r;\r
 }\r
 \r
 int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {\r
   int r = ibv_poll_cq(cq, num_entries, ret_wc_array);\r
   if (r < 0) {\r
-    lderr(infiniband.cct) << __func__ << " poll_completion_queue occur met error: "\r
+    lderr(cct) << __func__ << " poll_completion_queue occur met error: "\r
       << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
@@ -450,13 +449,12 @@ int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array)
 \r
 bool Infiniband::CompletionChannel::get_cq_event()\r
 {\r
-  //  ldout(infiniband.cct, 21) << __func__ << " started." << dendl;\r
   ibv_cq *cq = NULL;\r
   void *ev_ctx;\r
   if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {\r
     if (errno != EAGAIN && errno != EINTR)\r
-      lderr(infiniband.cct) << __func__ << "failed to retrieve CQ event: "\r
-        << cpp_strerror(errno) << dendl;\r
+      lderr(cct) << __func__ << " failed to retrieve CQ event: "\r
+                 << cpp_strerror(errno) << dendl;\r
     return false;\r
   }\r
 \r
@@ -464,7 +462,7 @@ bool Infiniband::CompletionChannel::get_cq_event()
    *    * be acked, and periodically ack them\r
    *       */\r
   if (++cq_events_that_need_ack == MAX_ACK_EVENT) {\r
-    ldout(infiniband.cct, 20) << __func__ << " ack aq events." << dendl;\r
+    ldout(cct, 20) << __func__ << " ack aq events." << dendl;\r
     ibv_ack_cq_events(cq, MAX_ACK_EVENT);\r
     cq_events_that_need_ack = 0;\r
   }\r
@@ -476,29 +474,29 @@ int Infiniband::CompletionQueue::init()
 {\r
   cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);\r
   if (!cq) {\r
-    lderr(infiniband.cct) << __func__ << " failed to create receive completion queue: "\r
+    lderr(cct) << __func__ << " failed to create receive completion queue: "\r
       << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
 \r
   if (ibv_req_notify_cq(cq, 0)) {\r
-    lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
+    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
     ibv_destroy_cq(cq);\r
     cq = nullptr;\r
     return -1;\r
   }\r
 \r
   channel->bind_cq(cq);\r
-  ldout(infiniband.cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
+  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;\r
   return 0;\r
 }\r
 \r
 int Infiniband::CompletionChannel::init()\r
 {\r
-  ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
+  ldout(cct, 20) << __func__ << " started." << dendl;\r
   channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
   if (!channel) {\r
-    lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
+    lderr(cct) << __func__ << " failed to create receive completion channel: "\r
                           << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
@@ -547,7 +545,7 @@ const char* Infiniband::wc_status_to_string(int status)
   };\r
 \r
   if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)\r
-      return "<status out of range!>";\r
+    return "<status out of range!>";\r
   return lookup[status];\r
 }\r
 \r
index 921adfe6a33e0f168f215346998018091ace8bf7..dd0e53a5c5e52b5dd476496c5c62d322a76649e6 100644 (file)
@@ -46,7 +46,6 @@ class RDMAStack;
 class CephContext;\r
 \r
 class Port {\r
-  CephContext *cct;\r
   struct ibv_context* ctxt;\r
   uint8_t port_num;\r
   struct ibv_port_attr* port_attr;\r
@@ -55,7 +54,7 @@ class Port {
   union ibv_gid gid;\r
 \r
  public:\r
-  explicit Port(CephContext *c, struct ibv_context* ictxt, uint8_t ipn): cct(c), ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
+  explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) {\r
     int r = ibv_query_port(ctxt, port_num, port_attr);\r
     if (r == -1) {\r
       lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;\r
@@ -78,7 +77,6 @@ class Port {
 \r
 \r
 class Device {\r
-  CephContext *cct;\r
   ibv_device *device;\r
   const char* name;\r
   uint8_t  port_cnt;\r
@@ -94,7 +92,7 @@ class Device {
   const char* get_name() { return name;}\r
   uint16_t get_lid() { return active_port->get_lid(); }\r
   ibv_gid get_gid() { return active_port->get_gid(); }\r
-  void binding_port(uint8_t port_num);\r
+  void binding_port(CephContext *c, uint8_t port_num);\r
   struct ibv_context *ctxt;\r
   ibv_device_attr *device_attr;\r
   Port* active_port;\r
@@ -102,12 +100,11 @@ class Device {
 \r
 \r
 class DeviceList {\r
-  CephContext *cct;\r
   struct ibv_device ** device_list;\r
   int num;\r
   Device** devices;\r
  public:\r
-  DeviceList(CephContext *c): cct(c), device_list(ibv_get_device_list(&num)) {\r
+  DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {\r
     if (device_list == NULL || num == 0) {\r
       lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;\r
       assert(0);\r
@@ -142,8 +139,8 @@ class Infiniband {
  public:\r
   class ProtectionDomain {\r
    public:\r
-    explicit ProtectionDomain(CephContext *c, Device *device)\r
-      : cct(c), pd(ibv_alloc_pd(device->ctxt))\r
+    explicit ProtectionDomain(CephContext *cct, Device *device)\r
+      : pd(ibv_alloc_pd(device->ctxt))\r
     {\r
       if (pd == NULL) {\r
         lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;\r
@@ -152,12 +149,8 @@ class Infiniband {
     }\r
     ~ProtectionDomain() {\r
       int rc = ibv_dealloc_pd(pd);\r
-      if (rc != 0) {\r
-        lderr(cct) << __func__ << " ibv_dealloc_pd failed: "\r
-          << cpp_strerror(errno) << dendl;\r
-      }\r
+      assert(rc == 0);\r
     }\r
-    CephContext *cct;\r
     ibv_pd* const pd;\r
   };\r
 \r
@@ -326,8 +319,8 @@ class Infiniband {
       char* base;\r
     };\r
 \r
-    MemoryManager(CephContext *cct, Device *d, ProtectionDomain *p) : cct(cct), device(d), pd(p) {\r
-      enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
+    MemoryManager(Device *d, ProtectionDomain *p, bool hugepage): device(d), pd(p) {\r
+      enabled_huge_page = hugepage;\r
     }\r
     ~MemoryManager() {\r
       if (channel)\r
@@ -339,13 +332,11 @@ class Infiniband {
       size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);\r
       char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);\r
       if (ptr == MAP_FAILED) {\r
-        lderr(cct) << __func__ << " MAP_FAILED" << dendl;\r
         ptr = (char *)malloc(real_size);\r
         if (ptr == NULL) return NULL;\r
         real_size = 0;\r
       }\r
       *((size_t *)ptr) = real_size;\r
-      lderr(cct) << __func__ << " bingo!" << dendl;\r
       return ptr + HUGE_PAGE_SIZE;\r
     }\r
     void free_huge_pages(void *ptr) {\r
@@ -388,7 +379,6 @@ class Infiniband {
    private:\r
     Cluster* channel;//RECV\r
     Cluster* send;// SEND\r
-    CephContext *cct;\r
     Device *device;\r
     ProtectionDomain *pd;\r
   };\r
@@ -402,7 +392,6 @@ class Infiniband {
   ibv_srq* srq;             // shared receive work queue\r
   Device *device;\r
   ProtectionDomain *pd;\r
-  CephContext* cct;\r
   DeviceList device_list;\r
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);\r
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);\r
@@ -421,14 +410,16 @@ class Infiniband {
   }\r
 \r
   class CompletionChannel {\r
-     static const uint32_t MAX_ACK_EVENT = 5000;\r
-     Infiniband& infiniband;\r
-     ibv_comp_channel *channel;\r
-     ibv_cq *cq;\r
-     uint32_t cq_events_that_need_ack;\r
+    static const uint32_t MAX_ACK_EVENT = 5000;\r
+    CephContext *cct;\r
+    Infiniband& infiniband;\r
+    ibv_comp_channel *channel;\r
+    ibv_cq *cq;\r
+    uint32_t cq_events_that_need_ack;\r
 \r
    public:\r
-    CompletionChannel(Infiniband &ib): infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
+    CompletionChannel(CephContext *c, Infiniband &ib)\r
+      : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) {}\r
     ~CompletionChannel();\r
     int init();\r
     bool get_cq_event();\r
@@ -447,7 +438,9 @@ class Infiniband {
   // You need to call init and it will create a cq and associate to comp channel\r
   class CompletionQueue {\r
    public:\r
-    CompletionQueue(Infiniband &ib, const uint32_t qd, CompletionChannel *cc):infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
+    CompletionQueue(CephContext *c, Infiniband &ib,\r
+                    const uint32_t qd, CompletionChannel *cc)\r
+      : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}\r
     ~CompletionQueue();\r
     int init();\r
     int poll_cq(int num_entries, ibv_wc *ret_wc_array);\r
@@ -456,6 +449,7 @@ class Infiniband {
     int rearm_notify(bool solicited_only=true);\r
     CompletionChannel* get_cc() const { return channel; }\r
    private:\r
+    CephContext *cct;\r
     Infiniband&  infiniband;     // Infiniband to which this QP belongs\r
     CompletionChannel *channel;\r
     ibv_cq *cq;\r
@@ -470,7 +464,11 @@ class Infiniband {
   // must call plumb() to bring the queue pair to the RTS state.\r
   class QueuePair {\r
    public:\r
-    QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port,  ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
+    QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,\r
+              int ib_physical_port,  ibv_srq *srq,\r
+              Infiniband::CompletionQueue* txcq,\r
+              Infiniband::CompletionQueue* rxcq,\r
+              uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
     ~QueuePair();\r
 \r
     int init();\r
@@ -496,7 +494,7 @@ class Infiniband {
 \r
       int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);\r
       if (r) {\r
-        lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+        lderr(cct) << __func__ << " failed to query qp: "\r
           << cpp_strerror(errno) << dendl;\r
         return -1;\r
       }\r
@@ -516,7 +514,7 @@ class Infiniband {
 \r
       int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);\r
       if (r) {\r
-        lderr(infiniband.cct) << __func__ << " failed to query qp: "\r
+        lderr(cct) << __func__ << " failed to query qp: "\r
           << cpp_strerror(errno) << dendl;\r
         return -1;\r
       }\r
@@ -534,7 +532,7 @@ class Infiniband {
 \r
       int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);\r
       if (r) {\r
-        lderr(infiniband.cct) << __func__ << " failed to get state: "\r
+        lderr(cct) << __func__ << " failed to get state: "\r
           << cpp_strerror(errno) << dendl;\r
         return -1;\r
       }\r
@@ -549,8 +547,8 @@ class Infiniband {
 \r
       int r = ibv_query_qp(qp, &qpa, -1, &qpia);\r
       if (r) {\r
-        lderr(infiniband.cct) << __func__ << " failed to get state: "\r
-          << cpp_strerror(errno) << dendl;\r
+        lderr(cct) << __func__ << " failed to get state: "\r
+                              << cpp_strerror(errno) << dendl;\r
         return true;\r
       }\r
       return qpa.cur_qp_state == IBV_QPS_ERR;\r
@@ -562,6 +560,7 @@ class Infiniband {
     bool is_dead() const { return dead; }\r
 \r
    private:\r
+    CephContext  *cct;\r
     Infiniband&  infiniband;     // Infiniband to which this QP belongs\r
     ibv_qp_type  type;           // QP type (IBV_QPT_RC, etc.)\r
     ibv_context* ctxt;           // device context of the HCA to use\r
@@ -581,20 +580,20 @@ class Infiniband {
  public:\r
   typedef MemoryManager::Cluster Cluster;\r
   typedef MemoryManager::Chunk Chunk;\r
-  QueuePair* create_queue_pair(CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
+  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
   int post_chunk(Chunk* chunk);\r
   int post_channel_cluster();\r
   int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
     return memory_manager->get_send_buffers(c, bytes);\r
   }\r
-  CompletionChannel *create_comp_channel();\r
-  CompletionQueue *create_comp_queue(CompletionChannel *cc=NULL);\r
+  CompletionChannel *create_comp_channel(CephContext *c);\r
+  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);\r
   uint8_t get_ib_physical_port() {\r
     return ib_physical_port;\r
   }\r
-  int send_msg(int sd, IBSYNMsg& msg);\r
-  int recv_msg(int sd, IBSYNMsg& msg);\r
+  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
+  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);\r
   uint16_t get_lid() { return device->get_lid(); }\r
   ibv_gid get_gid() { return device->get_gid(); }\r
   MemoryManager* get_memory_manager() { return memory_manager; }\r
index 749c94086004a33943afbbb1d5632ea08fc4696a..4227616c33142f82af566762bcfd1eb0227a76d8 100644 (file)
@@ -124,7 +124,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;\r
   infiniband->net.set_priority(tcp_fd, opts.priority);\r
   my_msg.peer_qpn = 0;\r
-  r = infiniband->send_msg(tcp_fd, my_msg);\r
+  r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
   if (r < 0)\r
     return r;\r
 \r
@@ -134,7 +134,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 \r
 void RDMAConnectedSocketImpl::handle_connection() {\r
   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;\r
-  int r = infiniband->recv_msg(tcp_fd, peer_msg);\r
+  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);\r
   if (r < 0) {\r
     if (r != -EAGAIN)\r
       fault();\r
@@ -150,7 +150,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       assert(!r);\r
     }\r
     notify();\r
-    r = infiniband->send_msg(tcp_fd, my_msg);\r
+    r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
     if (r < 0) {\r
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;\r
       fault();\r
@@ -161,7 +161,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
         ldout(cct, 10) << __func__ << " server is already active." << dendl;\r
         return ;\r
       }\r
-      r = infiniband->send_msg(tcp_fd, my_msg);\r
+      r = infiniband->send_msg(cct, tcp_fd, my_msg);\r
       if (r < 0) {\r
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;\r
         fault();\r
index eef848ce6b9f39c9099c9b558a41d2da03c59cef..f8574068d1edb1a5e6f4c0825536593b511e5c38 100644 (file)
@@ -87,9 +87,9 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
     : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
       w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
-    rx_cc = ib->create_comp_channel();
+    rx_cc = ib->create_comp_channel(c);
     assert(rx_cc);
-    rx_cq = ib->create_comp_queue(rx_cc);
+    rx_cq = ib->create_comp_queue(c, rx_cc);
     assert(rx_cq);
     t = std::thread(&RDMADispatcher::polling, this);
     cct->register_fork_watcher(this);
@@ -287,7 +287,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
       dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
       is_server(false), con_handler(new C_handle_connection(this)),
       active(false), detached(false) {
-    qp = infiniband->create_queue_pair(s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+    qp = infiniband->create_queue_pair(
+        cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
     my_msg.qpn = qp->get_local_qp_number();
     my_msg.psn = qp->get_initial_psn();
     my_msg.lid = infiniband->get_lid();