]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: rename type name IBSYNMsg to ib_cm_meta_t
authorChangcheng Liu <changcheng.liu@aliyun.com>
Tue, 27 Aug 2019 02:38:15 +0000 (10:38 +0800)
committerChangcheng Liu <changcheng.liu@aliyun.com>
Tue, 10 Sep 2019 13:22:12 +0000 (21:22 +0800)
IBSYNMsg is responsible for track ib connection management meta
data.
1. rename IBSYNMsg to be ib_cm_meta_t.
2. rename IBSYNMsg::qpn to be IBSYNMsg::local_qpn
3. rename peer_msg to peer_cm_meta & my_msg to local_cm_meta

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.h

index 360f39dd0a9ceeff285bb46590653c08ba1b1cbc..844861678b00b8fcc907ebbd0a857f0bdfb20e30 100644 (file)
@@ -1043,7 +1043,7 @@ Infiniband::CompletionQueue* Infiniband::create_comp_queue(
 
 // 1 means no valid buffer read, 0 means got enough buffer
 // else return < 0 means error
-int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+int Infiniband::recv_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
 {
   char msg[TCP_MSG_LEN];
   char gid[33];
@@ -1065,14 +1065,14 @@ int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
     ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
     r = -EINVAL;
   } else { // valid message
-    sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+    sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.local_qpn), &(im.psn), &(im.peer_qpn),gid);
     wire_gid_to_gid(gid, &im);
-    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
+    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.local_qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
   }
   return r;
 }
 
-int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+int Infiniband::send_msg(CephContext *cct, int sd, ib_cm_meta_t& im)
 {
   int retry = 0;
   ssize_t r;
@@ -1081,8 +1081,8 @@ int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
   char gid[33];
 retry:
   gid_to_wire_gid(im, gid);
-  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
-  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.local_qpn, im.psn, im.peer_qpn, gid);
+  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.local_qpn << ", " << im.psn
                  << ", " << im.peer_qpn << ", "  << gid  << dendl;
   r = ::write(sd, msg, sizeof(msg));
   // Drop incoming qpt
@@ -1109,7 +1109,7 @@ retry:
   return 0;
 }
 
-void Infiniband::wire_gid_to_gid(const char *wgid, IBSYNMsg* im)
+void Infiniband::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im)
 {
   char tmp[9];
   uint32_t v32;
@@ -1122,7 +1122,7 @@ void Infiniband::wire_gid_to_gid(const char *wgid, IBSYNMsg* im)
   }
 }
 
-void Infiniband::gid_to_wire_gid(const IBSYNMsg& im, char wgid[])
+void Infiniband::gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[])
 {
   for (int i = 0; i < 4; ++i)
     sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(im.gid.raw + i * 4)));
index d47f914e1c241b4d35b7f25a5d30b1bf60664333..7b1da1f2ce95e85b58ff0162b22c279256fb49e7 100644 (file)
@@ -44,9 +44,9 @@
 #define PSN_LEN 24
 #define PSN_MSK ((1 << PSN_LEN) - 1)
 
-struct IBSYNMsg {
+struct ib_cm_meta_t {
   uint16_t lid;
-  uint32_t qpn;
+  uint32_t local_qpn;
   uint32_t psn;
   uint32_t peer_qpn;
   union ibv_gid gid;
@@ -373,8 +373,8 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  void wire_gid_to_gid(const char *wgid, IBSYNMsg* im);
-  void gid_to_wire_gid(const IBSYNMsg& im, char wgid[]);
+  void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* im);
+  void gid_to_wire_gid(const ib_cm_meta_t& im, char wgid[]);
   CephContext *cct;
   ceph::mutex lock = ceph::make_mutex("IB lock");
   bool initialized = false;
@@ -518,8 +518,8 @@ class Infiniband {
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
   uint8_t get_ib_physical_port() { return ib_physical_port; }
-  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int send_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
+  int recv_msg(CephContext *cct, int sd, ib_cm_meta_t& msg);
   uint16_t get_lid() { return device->get_lid(); }
   ibv_gid get_gid() { return device->get_gid(); }
   MemoryManager* get_memory_manager() { return memory_manager; }
index 0c593b0116bbc16a7731adfa3343fde4167010db..51619010065257194be0760fbca6e2cd4038ce71 100644 (file)
@@ -29,11 +29,11 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<In
 {
   if (!cct->_conf->ms_async_rdma_cm) {
     qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
-    my_msg.qpn = qp->get_local_qp_number();
-    my_msg.psn = qp->get_initial_psn();
-    my_msg.lid = ib->get_lid();
-    my_msg.peer_qpn = 0;
-    my_msg.gid = ib->get_gid();
+    local_cm_meta.local_qpn = qp->get_local_qp_number();
+    local_cm_meta.psn = qp->get_initial_psn();
+    local_cm_meta.lid = ib->get_lid();
+    local_cm_meta.peer_qpn = 0;
+    local_cm_meta.gid = ib->get_gid();
     notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
     dispatcher->register_qp(qp, this);
     dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
@@ -46,7 +46,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
   ldout(cct, 20) << __func__ << " destruct." << dendl;
   cleanup();
   worker->remove_pending_conn(this);
-  dispatcher->erase_qpn(my_msg.qpn);
+  dispatcher->erase_qpn(local_cm_meta.local_qpn);
 
   for (unsigned i=0; i < wc.size(); ++i) {
     dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
@@ -90,18 +90,18 @@ int RDMAConnectedSocketImpl::activate()
   memset(&qpa, 0, sizeof(qpa));
   qpa.qp_state = IBV_QPS_RTR;
   qpa.path_mtu = IBV_MTU_1024;
-  qpa.dest_qp_num = peer_msg.qpn;
-  qpa.rq_psn = peer_msg.psn;
+  qpa.dest_qp_num = peer_cm_meta.local_qpn;
+  qpa.rq_psn = peer_cm_meta.psn;
   qpa.max_dest_rd_atomic = 1;
   qpa.min_rnr_timer = 12;
   //qpa.ah_attr.is_global = 0;
   qpa.ah_attr.is_global = 1;
   qpa.ah_attr.grh.hop_limit = 6;
-  qpa.ah_attr.grh.dgid = peer_msg.gid;
+  qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
 
   qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
 
-  qpa.ah_attr.dlid = peer_msg.lid;
+  qpa.ah_attr.dlid = peer_cm_meta.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
   qpa.ah_attr.src_path_bits = 0;
@@ -140,7 +140,7 @@ int RDMAConnectedSocketImpl::activate()
   // before giving up. Occurs when the remote side has not yet posted
   // a receive request.
   qpa.rnr_retry = 7; // 7 is infinite retry.
-  qpa.sq_psn = my_msg.psn;
+  qpa.sq_psn = local_cm_meta.psn;
   qpa.max_rd_atomic = 1;
 
   r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
@@ -162,7 +162,7 @@ int RDMAConnectedSocketImpl::activate()
 
   if (!is_server) {
     connected = 1; //indicate successfully
-    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_cm_meta.local_qpn << dendl;
     submit(false);
   }
   active = true;
@@ -189,8 +189,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
-  my_msg.peer_qpn = 0;
-  r = ib->send_msg(cct, tcp_fd, my_msg);
+  local_cm_meta.peer_qpn = 0;
+  r = ib->send_msg(cct, tcp_fd, local_cm_meta);
   if (r < 0)
     return r;
 
@@ -199,8 +199,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 }
 
 void RDMAConnectedSocketImpl::handle_connection() {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
-  int r = ib->recv_msg(cct, tcp_fd, peer_msg);
+  ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+  int r = ib->recv_msg(cct, tcp_fd, peer_cm_meta);
   if (r <= 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -217,29 +217,29 @@ void RDMAConnectedSocketImpl::handle_connection() {
   }
 
   if (!is_server) {// syn + ack from server
-    my_msg.peer_qpn = peer_msg.qpn;
-    ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn
-                   <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+    local_cm_meta.peer_qpn = peer_cm_meta.local_qpn;
+    ldout(cct, 20) << __func__ << " peer msg :  < " << peer_cm_meta.local_qpn << ", " << peer_cm_meta.psn
+                   <<  ", " << peer_cm_meta.lid << ", " << peer_cm_meta.peer_qpn << "> " << dendl;
     if (!connected) {
       r = activate();
       ceph_assert(!r);
     }
     notify();
-    r = ib->send_msg(cct, tcp_fd, my_msg);
+    r = ib->send_msg(cct, tcp_fd, local_cm_meta);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
       fault();
     }
   } else {
-    if (peer_msg.peer_qpn == 0) {// syn from client
+    if (peer_cm_meta.peer_qpn == 0) {// syn from client
       if (active) {
         ldout(cct, 10) << __func__ << " server is already active." << dendl;
         return ;
       }
       r = activate();
       ceph_assert(!r);
-      r = ib->send_msg(cct, tcp_fd, my_msg);
+      r = ib->send_msg(cct, tcp_fd, local_cm_meta);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -260,7 +260,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 {
   eventfd_t event_val = 0;
   int r = eventfd_read(notify_fd, &event_val);
-  ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << my_msg.qpn << " r = " << r << dendl;
+  ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_cm_meta.local_qpn << " r = " << r << dendl;
   
   if (!active) {
     ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
@@ -275,7 +275,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   read = read_buffers(buf,len);
 
   if (is_server && connected == 0) {
-    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_cm_meta.local_qpn << " peer QP: " << peer_cm_meta.local_qpn << dendl;
     connected = 1; //if so, we don't need the last handshake
     cleanup();
     submit(false);
@@ -411,11 +411,11 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
     std::lock_guard l{lock};
     pending_bl.claim_append(bl);
     if (!connected) {
-      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_cm_meta.local_qpn << dendl;
       return bytes;
     }
   }
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << dendl;
   ssize_t r = submit(more);
   if (r < 0 && r != -EAGAIN)
     return r;
@@ -522,7 +522,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " " << tx_buffers[0] << dendl;
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
   ibv_sge isge[tx_buffers.size()];
   uint32_t current_sge = 0;
index 21408417e18696a82aa7565be7174d8339c73ddb..d7cea99746577621b1d46835cf9241103f816aa9 100644 (file)
@@ -30,7 +30,7 @@ RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, sha
     }, false);
     status = RESOURCE_ALLOCATED;
     local_qpn = qp->get_local_qp_number();
-    my_msg.qpn = local_qpn;
+    local_cm_meta.local_qpn = local_qpn;
   } else {
     is_server = false;
     cm_channel = rdma_create_event_channel();
@@ -98,7 +98,7 @@ void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
         break;
       }
       local_qpn = qp->get_local_qp_number();
-      my_msg.qpn = local_qpn;
+      local_cm_meta.local_qpn = local_qpn;
 
       memset(&cm_params, 0, sizeof(cm_params));
       cm_params.retry_count = RETRY_COUNT;
index 144e5d22432906e011fb451536f51e5bbdc4bf0d..782e57399728f38b21d5d68993ab014ae366b9db 100644 (file)
@@ -185,8 +185,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
  protected:
   CephContext *cct;
   Infiniband::QueuePair *qp;
-  IBSYNMsg peer_msg;
-  IBSYNMsg my_msg;
+  ib_cm_meta_t peer_cm_meta;
+  ib_cm_meta_t local_cm_meta;
   int connected;
   int error;
   shared_ptr<Infiniband> ib;