]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: implement send/recv in QueuePair
authorChangcheng Liu <changcheng.liu@aliyun.com>
Tue, 27 Aug 2019 03:48:52 +0000 (11:48 +0800)
committerChangcheng Liu <changcheng.liu@aliyun.com>
Tue, 10 Sep 2019 13:22:12 +0000 (21:22 +0800)
send/recv is used to transact connection management meta data.
QueuePair is the obj which has the meta data. Use QueuePair to
transact the meta data.
1. rename send_msg to send_cm_meta
2. rename recv_msg to recv_cm_meta
3. move send/recv_cm_meta to QueuePair class scope
4. change code to adapt to the above change

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc

index 23fc5ff31d2e88a128f0a884b0a8776114df089c..b511df29327fa0a57fbad61f06b61938703db675 100644 (file)
@@ -266,6 +266,98 @@ int Infiniband::QueuePair::init()
   return 0;
 }
 
+void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
+{
+  char tmp[9];
+  uint32_t v32;
+  int i;
+
+  for (tmp[8] = 0, i = 0; i < 4; ++i) {
+    memcpy(tmp, wgid + i * 8, 8);
+    sscanf(tmp, "%x", &v32);
+    *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32);
+  }
+}
+
+void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
+{
+  for (int i = 0; i < 4; ++i)
+    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4)));
+}
+
+/*
+ * return value
+ *   1: means no valid buffer read
+ *   0: means got enough buffer
+ * < 0: means error
+ */
+int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+  ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if ((size_t)r != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+    r = -EINVAL;
+  } else { // valid message
+    sscanf(msg, "%hx:%x:%x:%x:%s", &(cm_meta_data.lid), &(cm_meta_data.local_qpn), &(cm_meta_data.psn), &(cm_meta_data.peer_qpn), gid);
+    wire_gid_to_gid(gid, &cm_meta_data);
+    ldout(cct, 5) << __func__ << " recevd: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
+                  << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl;
+  }
+  return r;
+}
+
+int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
+{
+  int retry = 0;
+  ssize_t r;
+
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+retry:
+  gid_to_wire_gid(cm_meta_data, gid);
+  sprintf(msg, "%04x:%08x:%08x:%08x:%s", cm_meta_data.lid, cm_meta_data.local_qpn, cm_meta_data.psn, cm_meta_data.peer_qpn, gid);
+  ldout(cct, 10) << __func__ << " sending: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
+                 << ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", "  << gid  << dendl;
+  r = ::write(socket_fd, msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+
+  if ((size_t)r != sizeof(msg)) {
+    // FIXME need to handle EAGAIN instead of retry
+    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+      retry++;
+      goto retry;
+    }
+    if (r < 0)
+      lderr(cct) << __func__ << " send returned error " << errno << ": "
+                 << cpp_strerror(errno) << dendl;
+    else
+      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  return 0;
+}
+
 /**
  * Change RC QueuePair into the ERROR state. This is necessary modify
  * the Queue Pair into the Error state and poll all of the relevant
@@ -1043,93 +1135,6 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue(
   return cq;
 }
 
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int Infiniband::recv_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
-{
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-  ssize_t r = ::read(sd, &msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-  if (r < 0) {
-    r = -errno;
-    lderr(cct) << __func__ << " got error " << r << ": "
-               << cpp_strerror(r) << dendl;
-  } else if (r == 0) { // valid disconnect message of length 0
-    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
-  } else if ((size_t)r != sizeof(msg)) { // invalid message
-    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
-    r = -EINVAL;
-  } else { // valid message
-    sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.local_qpn), &(im.psn), &(im.peer_qpn),gid);
-    wire_gid_to_gid(gid, &im);
-    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.local_qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
-  }
-  return r;
-}
-
-int Infiniband::send_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
-{
-  int retry = 0;
-  ssize_t r;
-
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-retry:
-  gid_to_wire_gid(im, gid);
-  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.local_qpn, im.psn, im.peer_qpn, gid);
-  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.local_qpn << ", " << im.psn
-                 << ", " << im.peer_qpn << ", "  << gid  << dendl;
-  r = ::write(sd, msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-
-  if ((size_t)r != sizeof(msg)) {
-    // FIXME need to handle EAGAIN instead of retry
-    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
-      retry++;
-      goto retry;
-    }
-    if (r < 0)
-      lderr(cct) << __func__ << " send returned error " << errno << ": "
-                 << cpp_strerror(errno) << dendl;
-    else
-      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-  return 0;
-}
-
-void Infiniband::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im)
-{
-  char tmp[9];
-  uint32_t v32;
-  int i;
-
-  for (tmp[8] = 0, i = 0; i < 4; ++i) {
-    memcpy(tmp, wgid + i * 8, 8);
-    sscanf(tmp, "%x", &v32);
-    *(uint32_t *)(&im->gid.raw[i * 4]) = ntohl(v32);
-  }
-}
-
-void Infiniband::gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[])
-{
-  for (int i = 0; i < 4; ++i)
-    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(im.gid.raw + i * 4)));
-}
-
 Infiniband::QueuePair::~QueuePair()
 {
   if (qp) {
index 63ac44c2c3e6469d4cec5b4b93372af119d49024..dc7ff9e306aac10fd2dfa0fd30915078d354dc55 100644 (file)
@@ -373,8 +373,6 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im);
-  void gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[]);
   CephContext *cct;
   ceph::mutex lock = ceph::make_mutex("IB lock");
   bool initialized = false;
@@ -475,6 +473,13 @@ class Infiniband {
      * Get the state of a QueuePair.
      */
     int get_state() const;
+    /*
+     * send/receive connection management meta data
+     */
+    int send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
+    int recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
+    void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
+    void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
     void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
     void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
     uint32_t get_tx_wr() const { return tx_wr_inflight; }
@@ -519,8 +524,6 @@ class Infiniband {
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
   uint8_t get_ib_physical_port() { return ib_physical_port; }
-  int send_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
-  int recv_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
   uint16_t get_lid() { return device->get_lid(); }
   ibv_gid get_gid() { return device->get_gid(); }
   MemoryManager* get_memory_manager() { return memory_manager; }
index 51619010065257194be0760fbca6e2cd4038ce71..9b4e9cec948583a26457bdbcc1f3bac819da0e04 100644 (file)
@@ -190,7 +190,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
   local_cm_meta.peer_qpn = 0;
-  r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+  r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
   if (r < 0)
     return r;
 
@@ -200,7 +200,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 
 void RDMAConnectedSocketImpl::handle_connection() {
   ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
-  int r = ib->recv_msg(cct, tcp_fd, peer_cm_meta);
+  int r = qp->recv_cm_meta(cct, tcp_fd, peer_cm_meta);
   if (r <= 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -225,7 +225,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       ceph_assert(!r);
     }
     notify();
-    r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+    r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -239,7 +239,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       }
       r = activate();
       ceph_assert(!r);
-      r = ib->send_msg(cct, tcp_fd, local_cm_meta);
+      r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);