]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: add tcp control path handshake to ensure connection built
authorHaomai Wang <haomai@xsky.com>
Thu, 13 Oct 2016 03:38:41 +0000 (11:38 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 1 Nov 2016 07:34:04 +0000 (15:34 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
13 files changed:
src/common/config_opts.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/PosixStack.cc
src/msg/async/Stack.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h
src/test/msgr/test_async_networkstack.cc

index a433e5937d39ae2cbefb3f1a7f1b2cd38fda05f4..fd8e5dd7d0233601f5e9ad9170a52212e3cf74c0 100644 (file)
@@ -216,8 +216,9 @@ OPTION(ms_async_send_inline, OPT_BOOL, false)
 OPTION(ms_async_rdma_device_name, OPT_STR, "")
 OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false)
 OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192)
-OPTION(ms_async_rdma_send_buffers, OPT_U32, 32)
-OPTION(ms_async_rdma_receive_buffers, OPT_U32, 64)
+OPTION(ms_async_rdma_send_buffers, OPT_U32, 10240)
+OPTION(ms_async_rdma_receive_buffers, OPT_U32, 10240)
+OPTION(ms_async_rdma_port_num, OPT_U32, 1)
 
 OPTION(inject_early_sigterm, OPT_BOOL, false)
 
index 73ef39fd921f4c1c0faa39bd8375bfe18e76dc89..abd89387af7d293386466d03bf66ce02022f0077 100644 (file)
@@ -885,7 +885,8 @@ ssize_t AsyncConnection::_process_connection()
           goto fail;
         } else if (r == 0) {
           ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
-          center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
+          if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
+            center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
           break;
         }
 
index f1dbd04f49f81c72fbecaf7439ce57d898404f2c..360da05419767e75beeb8e9cc1ef7acc6665b264 100644 (file)
@@ -194,11 +194,14 @@ void Processor::accept()
   while (true) {
     entity_addr_t addr;
     ConnectedSocket cli_socket;
-    int r = listen_socket.accept(&cli_socket, opts, &addr);
+    Worker *w = worker;
+    if (!msgr->get_stack()->support_local_listen_table())
+      w = msgr->get_stack()->get_worker();
+    int r = listen_socket.accept(&cli_socket, opts, &addr, w);
     if (r == 0) {
       ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
 
-      msgr->add_accept(worker, std::move(cli_socket), addr);
+      msgr->add_accept(w, std::move(cli_socket), addr);
       continue;
     } else {
       if (r == -EINTR) {
@@ -440,16 +443,13 @@ void AsyncMessenger::wait()
   started = false;
 }
 
-AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
 {
   lock.Lock();
-  if (!stack->support_local_listen_table())
-    w = stack->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->accept(std::move(cli_socket), addr);
   accepting_conns.insert(conn);
   lock.Unlock();
-  return conn;
 }
 
 AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
index 5e9877f7bc78c094ba1d7b9a5ce66197e9f39e51..5518f24f38ddaf487186bc2ef7b65999faf19c51 100644 (file)
@@ -350,7 +350,10 @@ public:
   }
 
   void learned_addr(const entity_addr_t &peer_addr_for_me);
-  AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+  void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+  NetworkStack *get_stack() {
+    return stack;
+  }
 
   /**
    * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
index e71cd6aa3d1c8804bbdd57496bc469e55b17ca79..21f0ad6fc970596041610899509eb44e24a323df 100644 (file)
@@ -249,7 +249,7 @@ class PosixServerSocketImpl : public ServerSocketImpl {
 
  public:
   explicit PosixServerSocketImpl(NetHandler &h, const entity_addr_t &sa, int f): handler(h), sa(sa), _fd(f) {}
-  virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) override;
+  virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
   virtual void abort_accept() override {
     ::close(_fd);
   }
@@ -258,7 +258,7 @@ class PosixServerSocketImpl : public ServerSocketImpl {
   }
 };
 
-int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
+int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
   assert(sock);
   sockaddr_storage ss;
   socklen_t slen = sizeof(ss);
index cfcca10c37f3537571f3783f84348dc2d2fab699..431abb7e754817a03c32a4583a60371fcdab0c4f 100644 (file)
@@ -23,6 +23,7 @@
 #include "msg/msg_types.h"
 #include "msg/async/Event.h"
 
+class Worker;
 class ConnectedSocketImpl {
  public:
   virtual ~ConnectedSocketImpl() {}
@@ -47,7 +48,7 @@ struct SocketOptions {
 class ServerSocketImpl {
  public:
   virtual ~ServerSocketImpl() {}
-  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0;
+  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
   virtual void abort_accept() = 0;
   /// Get file descriptor
   virtual int fd() const = 0;
@@ -157,8 +158,8 @@ class ServerSocket {
   ///
   /// \Accepts a \ref ConnectedSocket representing the connection, and
   ///          a \ref entity_addr_t describing the remote endpoint.
-  int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
-    return _ssi->accept(sock, opt, out);
+  int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
+    return _ssi->accept(sock, opt, out, w);
   }
 
   /// Stops any \ref accept() in progress.
@@ -309,6 +310,7 @@ class NetworkStack : public CephContext::ForkWatcher {
   // But for dpdk backend, we maintain listen table in each thread. So we
   // need to let each thread do binding port.
   virtual bool support_local_listen_table() const { return false; }
+  virtual bool nonblock_connect_need_writable_event() const { return true; }
 
   void start();
   void stop();
index eac08c6d290d32cbe1b8613052cbe83711699c03..ba9e7799a2b22baab561620e9f330bb56abc25f9 100644 (file)
 \r
 static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;\r
 static const uint32_t MAX_INLINE_DATA = 128;\r
-static const uint32_t UDP_MSG_LEN = sizeof("0000:00000000:00000000:00000000000000000000000000000000") - 1;\r
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");\r
+static const uint32_t CQ_DEPTH = 30000;\r
 \r
-Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr)\r
+Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
 {\r
   if (device == NULL) {\r
     lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
@@ -43,26 +44,31 @@ Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(ne
     lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;\r
     assert(0);\r
   }\r
+}\r
+\r
+void Device::binding_port(uint8_t port_num) {\r
   port_cnt = device_attr->phys_port_cnt;\r
   ports = new Port*[port_cnt];\r
   for (uint8_t i = 0; i < port_cnt; ++i) {\r
     ports[i] = new Port(cct, ctxt, i+1);\r
-    if (ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
+    if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
       active_port = ports[i];\r
       ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;\r
+      return ;\r
     } else {\r
-      ldout(cct, 10) << __func__ << " port " << i+1 << " is unactive(" << ports[i]->get_port_attr()->state << ")"<< dendl;\r
+      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;\r
     }\r
   }\r
-  if (!active_port) {\r
-    lderr(cct) << __func__ << " no active port found" << dendl;\r
+  if (nullptr == active_port) {\r
+    lderr(cct) << __func__ << "  port not found" << dendl;\r
     assert(active_port);\r
   }\r
 }\r
 \r
-Infiniband::Infiniband(CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c)\r
+Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t port_num): cct(c), device_list(c), net(c)\r
 {\r
   device = device_list.get_device(device_name.c_str());\r
+  device->binding_port(port_num);\r
   assert(device);\r
   ib_physical_port = device->active_port->get_port_num();\r
   pd = new ProtectionDomain(cct, device);\r
@@ -123,23 +129,10 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s
  *      QueuePair on success or NULL if init fails\r
  * See QueuePair::QueuePair for parameter documentation.\r
  */\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *c, ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
 {\r
-  Infiniband::CompletionChannel* cc = create_comp_channel();\r
-  if (!cc)\r
-    return NULL;\r
-\r
-  Infiniband::CompletionQueue* cq = create_comp_queue(cc);\r
-  if (!cq) {\r
-    delete cc;\r
-    lderr(cct) << __func__ << " failed to create cq." << dendl;\r
-    return NULL;\r
-  }\r
-\r
-  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, c, cq, max_send_wr, max_recv_wr);\r
+  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
   if (qp->init()) {\r
-    delete cc;\r
-    delete cq;\r
     delete qp;\r
     return NULL;\r
   }\r
@@ -257,7 +250,7 @@ int Infiniband::post_chunk(Chunk* chunk)
   int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
   if (ret) {\r
     lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
-      << cpp_strerror(errno) << dendl;\r
+               << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
   return 0;\r
@@ -267,7 +260,7 @@ int Infiniband::post_channel_cluster()
 {\r
   vector<Chunk*> free_chunks;\r
   int r = memory_manager->get_channel_buffers(free_chunks, 0);\r
-  assert(r == 0);\r
+  assert(r > 0);\r
   for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {\r
     r = post_chunk(*iter);\r
     assert(r == 0);\r
@@ -289,8 +282,7 @@ Infiniband::CompletionChannel* Infiniband::create_comp_channel()
 \r
 Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
 {\r
-  ldout(cct, 20) << __func__ << " completion channel=" << cc << dendl;\r
-  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, max_recv_wr, cc);\r
+  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, CQ_DEPTH, cc);\r
   if (cq->init()) {\r
     delete cq;\r
     return NULL;\r
@@ -328,75 +320,66 @@ Infiniband::QueuePair::QueuePair(
 \r
 // 1 means no valid buffer read, 0 means got enough buffer\r
 // else return < 0 means error\r
-int Infiniband::recv_udp_msg(int sd, IBSYNMsg& im, entity_addr_t *addr)\r
+int Infiniband::recv_msg(int sd, IBSYNMsg& im)\r
 {\r
-  assert(sd >= 0);\r
-  ssize_t r;\r
-  entity_addr_t socket_addr;\r
-  struct sockaddr from;\r
-  socklen_t slen = sizeof(from);\r
-  char msg[UDP_MSG_LEN];\r
-  char gid[32];\r
-  r = ::recvfrom(sd, &msg, sizeof(msg), 0, &from, &slen);\r
+  char msg[TCP_MSG_LEN];\r
+  char gid[33];\r
+  ssize_t r = ::read(sd, &msg, sizeof(msg));\r
   // Drop incoming qpt\r
   if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
     if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
       ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
-      r = -1;\r
+      return -EINVAL;\r
     }\r
   }\r
-  if (r == -1) {\r
-    lderr(cct) << __func__ << " recv got error " << errno << ": "\r
-      << cpp_strerror(errno) << dendl;\r
-    return -1;\r
+  if (r < 0) {\r
+    r = -errno;\r
+    lderr(cct) << __func__ << " got error " << errno << ": "\r
+               << cpp_strerror(errno) << dendl;\r
   } else if ((size_t)r != sizeof(msg)) { // valid message length\r
-    lderr(cct) << __func__ << " recv got bad length (" << r << ")." << cpp_strerror(errno) << dendl;\r
-    return 1;\r
+    r = -EINVAL;\r
+    lderr(cct) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl;\r
   } else { // valid message\r
-    socket_addr.set_sockaddr(&from);\r
-    if (addr) {\r
-      *addr = socket_addr;\r
-    }\r
-    sscanf(msg, "%04x:%08x:%08x:%s", &(im.lid), &(im.qpn), &(im.psn), gid);\r
+    sscanf(msg, "%x:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);\r
     wire_gid_to_gid(gid, &(im.gid));\r
-    ldout(cct, 10) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid  << dendl;\r
-    return 0;\r
+    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;\r
   }\r
+  return r;\r
 }\r
 \r
-int Infiniband::send_udp_msg(int sd, IBSYNMsg& im, entity_addr_t &peeraddr)\r
+int Infiniband::send_msg(int sd, IBSYNMsg& im)\r
 {\r
-  assert(sd >= 0);\r
   int retry = 0;\r
   ssize_t r;\r
 \r
-  char msg[UDP_MSG_LEN];\r
-  char gid[32];\r
+  char msg[TCP_MSG_LEN];\r
+  char gid[33];\r
 retry:\r
   gid_to_wire_gid(&(im.gid), gid);\r
-  r = snprintf(msg, UDP_MSG_LEN, "%04x:%08x:%08x:%s", im.lid, im.qpn, im.psn, gid);\r
-  ldout(cct, 20) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << " r=" << r << dendl;\r
-  r = ::sendto(sd, msg, sizeof(msg), 0, peeraddr.get_sockaddr(),\r
-               peeraddr.get_sockaddr_len());\r
+  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);\r
+  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn\r
+                 << ", " << im.peer_qpn << ", "  << gid  << dendl;\r
+  r = ::write(sd, msg, sizeof(msg));\r
   // Drop incoming qpt\r
   if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
     if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
       ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
-      r = -1;\r
+      return -EINVAL;\r
     }\r
   }\r
 \r
   if ((size_t)r != sizeof(msg)) {\r
+    // FIXME need to handle EAGAIN instead of retry\r
     if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {\r
       retry++;\r
       goto retry;\r
     }\r
     if (r < 0)\r
       lderr(cct) << __func__ << " send returned error " << errno << ": "\r
-        << cpp_strerror(errno) << dendl;\r
+                 << cpp_strerror(errno) << dendl;\r
     else\r
       lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;\r
-    return -1;\r
+    return -errno;\r
   }\r
   return 0;\r
 }\r
@@ -424,36 +407,34 @@ Infiniband::QueuePair::~QueuePair()
 {\r
   if (qp)\r
     assert(!ibv_destroy_qp(qp));\r
-  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed QueuePair." << dendl;\r
 }\r
 \r
 Infiniband::CompletionChannel::~CompletionChannel()\r
 {\r
   if (channel) {\r
     int r = ibv_destroy_comp_channel(channel);\r
-    ldout(infiniband.cct, 20) << __func__ << " r: " << r << dendl;\r
+    if (r < 0)\r
+      lderr(infiniband.cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
     assert(r == 0);\r
   }\r
-  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionChannel." << dendl;\r
 }\r
 \r
 Infiniband::CompletionQueue::~CompletionQueue()\r
 {\r
   if (cq) {\r
     int r = ibv_destroy_cq(cq);\r
-    ldout(infiniband.cct, 20) << __func__ << " r: " << cpp_strerror(errno) << dendl;\r
+    if (r < 0)\r
+      lderr(infiniband.cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
     assert(r == 0);\r
   }\r
-  ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionQueue." << dendl;\r
 }\r
 \r
 int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
 {\r
   ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
   int r = ibv_req_notify_cq(cq, 0);\r
-  if (r) {\r
+  if (r < 0)\r
     lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
-  }\r
   return r;\r
 }\r
 \r
@@ -503,6 +484,7 @@ int Infiniband::CompletionQueue::init()
   if (ibv_req_notify_cq(cq, 0)) {\r
     lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
     ibv_destroy_cq(cq);\r
+    cq = nullptr;\r
     return -1;\r
   }\r
 \r
@@ -517,7 +499,7 @@ int Infiniband::CompletionChannel::init()
   channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
   if (!channel) {\r
     lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
-      << cpp_strerror(errno) << dendl;\r
+                          << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
   int rc = infiniband.net.set_nonblock(channel->fd);\r
@@ -568,3 +550,16 @@ const char* Infiniband::wc_status_to_string(int status)
       return "<status out of range!>";\r
   return lookup[status];\r
 }\r
+\r
+const char* Infiniband::qp_state_string(int status) {\r
+  switch(status) {\r
+    case IBV_QPS_RESET : return "IBV_QPS_RESET";\r
+    case IBV_QPS_INIT  : return "IBV_QPS_INIT";\r
+    case IBV_QPS_RTR   : return "IBV_QPS_RTR";\r
+    case IBV_QPS_RTS   : return "IBV_QPS_RTS";\r
+    case IBV_QPS_SQD   : return "IBV_QPS_SQD";\r
+    case IBV_QPS_SQE   : return "IBV_QPS_SQE";\r
+    case IBV_QPS_ERR   : return "IBV_QPS_ERR";\r
+    default: return " out of range.";\r
+  }\r
+}\r
index 3ed5f85d39456b2bdbbe0891df1338a4306f6c89..921adfe6a33e0f168f215346998018091ace8bf7 100644 (file)
@@ -38,6 +38,7 @@ struct IBSYNMsg {
   uint16_t lid;\r
   uint32_t qpn;\r
   uint32_t psn;\r
+  uint32_t peer_qpn;\r
   union ibv_gid gid;\r
 } __attribute__((packed));\r
 \r
@@ -93,6 +94,7 @@ class Device {
   const char* get_name() { return name;}\r
   uint16_t get_lid() { return active_port->get_lid(); }\r
   ibv_gid get_gid() { return active_port->get_gid(); }\r
+  void binding_port(uint8_t port_num);\r
   struct ibv_context *ctxt;\r
   ibv_device_attr *device_attr;\r
   Port* active_port;\r
@@ -117,7 +119,7 @@ class DeviceList {
     }\r
   }\r
   ~DeviceList() {\r
-    for (int i=0; devices[i] != NULL; ++i) {\r
+    for (int i=0; i < num; ++i) {\r
       delete devices[i];\r
     }\r
     delete []devices;\r
@@ -173,7 +175,7 @@ class Infiniband {
         offset = o;\r
       }\r
 \r
-      size_t get_offset() {\r
+      uint32_t get_offset() {\r
         return offset;\r
       }\r
 \r
@@ -190,9 +192,9 @@ class Infiniband {
         return bound;\r
       }\r
 \r
-      size_t read(char* buf, size_t len) {\r
-        size_t left = bound - offset;\r
-        if(left >= len) {\r
+      uint32_t read(char* buf, uint32_t len) {\r
+        uint32_t left = bound - offset;\r
+        if (left >= len) {\r
           memcpy(buf, buffer+offset, len);\r
           offset += len;\r
           return len;\r
@@ -204,8 +206,8 @@ class Infiniband {
         }\r
       }\r
 \r
-      size_t write(char* buf, size_t len) {\r
-        size_t left = bytes - offset;\r
+      uint32_t write(char* buf, uint32_t len) {\r
+        uint32_t left = bytes - offset;\r
         if (left >= len) {\r
           memcpy(buffer+offset, buf, len);\r
           offset += len;\r
@@ -246,7 +248,7 @@ class Infiniband {
       char* buffer;\r
       uint32_t bytes;\r
       uint32_t bound;\r
-      size_t offset;\r
+      uint32_t offset;\r
       ibv_mr* mr;\r
       uint64_t owner;\r
     };\r
@@ -264,6 +266,10 @@ class Infiniband {
           delete *c;\r
           ++c;\r
         }\r
+        if (manager.enabled_huge_page)\r
+          delete base;\r
+        else\r
+          manager.free_huge_pages(base);\r
       }\r
       int add(uint32_t num) {\r
         uint32_t bytes = chunk_size * num;\r
@@ -290,23 +296,28 @@ class Infiniband {
       }\r
 \r
       int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+        uint32_t num = bytes / chunk_size + 1;\r
+        if (bytes % chunk_size == 0)\r
+          --num;\r
+        int r = num;\r
         Mutex::Locker l(lock);\r
+        if (free_chunks.empty())\r
+          return 0;\r
         if (!bytes) {\r
           free_chunks.swap(chunks);\r
-          return 0;\r
+          r = chunks.size();\r
+          return r;\r
+        }\r
+        if (free_chunks.size() < num) {\r
+          num = free_chunks.size();\r
+          r = num;\r
         }\r
-        uint32_t num = bytes / chunk_size + 1;\r
-        if (bytes % chunk_size == 0)\r
-          --num;\r
-        if (free_chunks.size() < num)\r
-          return -EAGAIN;\r
         for (uint32_t i = 0; i < num; ++i) {\r
           chunks.push_back(free_chunks.back());\r
           free_chunks.pop_back();\r
         }\r
-        return 0;\r
+        return r;\r
       }\r
-\r
       MemoryManager& manager;\r
       uint32_t chunk_size;\r
       Mutex lock;\r
@@ -319,9 +330,9 @@ class Infiniband {
       enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
     }\r
     ~MemoryManager() {\r
-      if(channel)\r
+      if (channel)\r
         delete channel;\r
-      if(send)\r
+      if (send)\r
         delete send;\r
     }\r
     void* malloc_huge_pages(size_t size) {\r
@@ -372,6 +383,7 @@ class Infiniband {
     }\r
 \r
     int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}\r
+    int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}\r
     bool enabled_huge_page;\r
    private:\r
     Cluster* channel;//RECV\r
@@ -397,7 +409,7 @@ class Infiniband {
 \r
  public:\r
   NetHandler net;\r
-  explicit Infiniband(CephContext *c, const std::string &device_name);\r
+  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);\r
 \r
   /**\r
    * Destroy an Infiniband object.\r
@@ -569,7 +581,7 @@ class Infiniband {
  public:\r
   typedef MemoryManager::Cluster Cluster;\r
   typedef MemoryManager::Chunk Chunk;\r
-  QueuePair* create_queue_pair(CompletionQueue *w, ibv_qp_type type);\r
+  QueuePair* create_queue_pair(CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
   int post_chunk(Chunk* chunk);\r
   int post_channel_cluster();\r
@@ -581,14 +593,29 @@ class Infiniband {
   uint8_t get_ib_physical_port() {\r
     return ib_physical_port;\r
   }\r
-  int send_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t &peeraddr);\r
-  int recv_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t *addr);\r
+  int send_msg(int sd, IBSYNMsg& msg);\r
+  int recv_msg(int sd, IBSYNMsg& msg);\r
   uint16_t get_lid() { return device->get_lid(); }\r
   ibv_gid get_gid() { return device->get_gid(); }\r
   MemoryManager* get_memory_manager() { return memory_manager; }\r
   Device* get_device() { return device; }\r
   int get_async_fd() { return device->ctxt->async_fd; }\r
+  int recall_chunk(Chunk* c) {\r
+    if (memory_manager->is_rx_chunk(c)) {\r
+      post_chunk(c);  \r
+      return 1;\r
+    } else if (memory_manager->is_tx_chunk(c)) {\r
+      vector<Chunk*> v;\r
+      v.push_back(c);\r
+      memory_manager->return_tx(v);  \r
+      return 2;\r
+    }\r
+    return -1;\r
+  }\r
+  int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }\r
+  int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }\r
   static const char* wc_status_to_string(int status);\r
+  static const char* qp_state_string(int status);\r
 };\r
 \r
 #endif\r
index c6594f30824fc81928148d51da5b702586bdec0b..749c94086004a33943afbbb1d5632ea08fc4696a 100644 (file)
@@ -20,7 +20,8 @@
 #undef dout_prefix\r
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "\r
 \r
-int RDMAConnectedSocketImpl::activate() {\r
+int RDMAConnectedSocketImpl::activate()\r
+{\r
   ibv_qp_attr qpa;\r
   int r;\r
 \r
@@ -52,7 +53,7 @@ int RDMAConnectedSocketImpl::activate() {
       IBV_QP_MAX_DEST_RD_ATOMIC);\r
   if (r) {\r
     lderr(cct) << __func__ << " failed to transition to RTR state: "\r
-      << cpp_strerror(errno) << dendl;\r
+               << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
 \r
@@ -85,22 +86,103 @@ int RDMAConnectedSocketImpl::activate() {
       IBV_QP_MAX_QP_RD_ATOMIC);\r
   if (r) {\r
     lderr(cct) << __func__ << " failed to transition to RTS state: "\r
-      << cpp_strerror(errno) << dendl;\r
+               << cpp_strerror(errno) << dendl;\r
     return -1;\r
   }\r
 \r
   // the queue pair should be ready to use once the client has finished\r
   // setting up their end.\r
   ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;\r
+  ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;\r
+\r
+  if (!is_server) {\r
+    connected = 1; //indicate successfully\r
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;\r
+    submit(false);\r
+  }\r
+  active = true;\r
 \r
-  connected = 1;//indicate successfully\r
   return 0;\r
 }\r
 \r
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {\r
+  ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"\r
+                 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;\r
+  tcp_fd = infiniband->net.connect(peer_addr);\r
+\r
+  if (tcp_fd < 0) {\r
+    return -errno;\r
+  }\r
+  infiniband->net.set_close_on_exec(tcp_fd);\r
+\r
+  int r = infiniband->net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);\r
+  if (r < 0) {\r
+    ::close(tcp_fd);\r
+    return -errno;\r
+  }\r
+\r
+  ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;\r
+  infiniband->net.set_priority(tcp_fd, opts.priority);\r
+  my_msg.peer_qpn = 0;\r
+  r = infiniband->send_msg(tcp_fd, my_msg);\r
+  if (r < 0)\r
+    return r;\r
+\r
+  worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);\r
+  return 0;\r
+}\r
+\r
+void RDMAConnectedSocketImpl::handle_connection() {\r
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;\r
+  int r = infiniband->recv_msg(tcp_fd, peer_msg);\r
+  if (r < 0) {\r
+    if (r != -EAGAIN)\r
+      fault();\r
+    return;\r
+  }\r
+\r
+  if (!is_server) {// syn + ack from server\r
+    my_msg.peer_qpn = peer_msg.qpn;\r
+    ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn\r
+                   <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;\r
+    if (!connected) {\r
+      r = activate();\r
+      assert(!r);\r
+    }\r
+    notify();\r
+    r = infiniband->send_msg(tcp_fd, my_msg);\r
+    if (r < 0) {\r
+      ldout(cct, 1) << __func__ << " send client ack failed." << dendl;\r
+      fault();\r
+    }\r
+  } else {\r
+    if (peer_msg.peer_qpn == 0) {// syn from client\r
+      if (active) {\r
+        ldout(cct, 10) << __func__ << " server is already active." << dendl;\r
+        return ;\r
+      }\r
+      r = infiniband->send_msg(tcp_fd, my_msg);\r
+      if (r < 0) {\r
+        ldout(cct, 1) << __func__ << " server ack failed." << dendl;\r
+        fault();\r
+        return ;\r
+      }\r
+      r = activate();\r
+      assert(!r);\r
+    } else { // ack from client\r
+      connected = 1;\r
+      cleanup();\r
+      submit(false);\r
+      notify();\r
+    }\r
+  }\r
+}\r
+\r
 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)\r
 {\r
-  ldout(cct, 20) << __func__ << " need to read bytes: " << len  << " buffers size: " << buffers.size() << dendl;\r
-\r
+  uint64_t i = 0;\r
+  int r = ::read(notify_fd, &i, sizeof(i));\r
+  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;\r
   if (error)\r
     return -error;\r
   ssize_t read = 0;\r
@@ -112,27 +194,44 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   if (cqe.empty())\r
     return read == 0 ? -EAGAIN : read;\r
 \r
-  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses."<< dendl;\r
+  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;\r
   for (size_t i = 0; i < cqe.size(); ++i) {\r
     ibv_wc* response = &cqe[i];\r
     assert(response->status == IBV_WC_SUCCESS);\r
-    ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
+    ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;\r
     chunk->prepare_read(response->byte_len);\r
-    assert(!response->byte_len);\r
+    if (response->byte_len == 0) {\r
+      if (connected) {\r
+        error = ECONNRESET;\r
+        assert(infiniband->post_chunk(chunk) == 0);\r
+        ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;\r
+      }\r
+      break;\r
+    }\r
+    //assert(response->byte_len);\r
     if (read == (ssize_t)len) {\r
       buffers.push_back(chunk);\r
-      ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+      ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
     } else if (read + response->byte_len > (ssize_t)len) {\r
       read += chunk->read(buf+read, (ssize_t)len-read);\r
       buffers.push_back(chunk);\r
-      ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
+      ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
     } else {\r
       read += chunk->read(buf+read, response->byte_len);\r
       assert(infiniband->post_chunk(chunk) == 0);\r
     }\r
   }\r
 \r
+  if (is_server && connected == 0) {\r
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;\r
+    connected = 1; //if so, we don't need the last handshake\r
+    cleanup();\r
+    submit(false);\r
+  }\r
+\r
+  if (read == 0 && error)\r
+    return -error;\r
   return read == 0 ? -EAGAIN : read;\r
 }\r
 \r
@@ -143,10 +242,10 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
   for (; c != buffers.end() ; ++c) {\r
     tmp = (*c)->read(buf+read, len-read);\r
     read += tmp;\r
-    ldout(cct, 20) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;\r
+    ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;\r
     if ((*c)->over()) {\r
       assert(infiniband->post_chunk(*c) == 0);\r
-      ldout(cct, 20) << __func__ << " one chunk over." << dendl;\r
+      ldout(cct, 25) << __func__ << " one chunk over." << dendl;\r
     }\r
     if (read == len) {\r
       break;\r
@@ -156,7 +255,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
   if (c != buffers.end() && (*c)->over())\r
     c++;\r
   buffers.erase(buffers.begin(), c);\r
-  ldout(cct, 20) << __func__ << " got " << read << " bytes here. buffers size: " << buffers.size() << dendl;\r
+  ldout(cct, 25) << __func__ << " got " << read  << " bytes, buffers size: " << buffers.size() << dendl;\r
   return read;\r
 }\r
 \r
@@ -166,7 +265,7 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
     return -error;\r
   static const int MAX_COMPLETIONS = 16;\r
   ibv_wc wc[MAX_COMPLETIONS];\r
-  ssize_t size;\r
+  ssize_t size = 0;\r
 \r
   ibv_wc*  response;\r
   Chunk* chunk;\r
@@ -174,7 +273,8 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
   auto iter = buffers.begin();\r
   if (iter != buffers.end()) {\r
     chunk = *iter;\r
-    auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+    // FIXME need to handle release\r
+    // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
     buffers.erase(iter);\r
     loaded = true;\r
     size = chunk->bound;\r
@@ -191,8 +291,9 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
     response = &wc[i];\r
     chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
     chunk->prepare_read(response->byte_len);\r
-    if(!loaded && i == 0) {\r
-      auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+    if (!loaded && i == 0) {\r
+      // FIXME need to handle release\r
+      // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
       size = chunk->bound;\r
       continue;\r
     }\r
@@ -207,12 +308,23 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
 \r
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
 {\r
-  if (error)\r
+  if (error) {\r
+    if (!active)\r
+      return -EPIPE;\r
     return -error;\r
+  }\r
   size_t bytes = bl.length();\r
   if (!bytes)\r
     return 0;\r
-  pending_bl.claim_append(bl);\r
+  {\r
+    Mutex::Locker l(lock);\r
+    pending_bl.claim_append(bl);\r
+    if (!connected) {\r
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;\r
+      return bytes;\r
+    }\r
+  }\r
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;\r
   ssize_t r = submit(more);\r
   if (r < 0 && r != -EAGAIN)\r
     return r;\r
@@ -223,43 +335,62 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 {\r
   if (error)\r
     return -error;\r
+  Mutex::Locker l(lock);\r
   std::vector<Chunk*> tx_buffers;\r
   size_t bytes = pending_bl.length();\r
-  if (worker->reserve_message_buffer(this, tx_buffers, bytes) < 0) {\r
-    ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
-    pending_bl.claim_append(pending_bl);\r
-    return -EAGAIN;\r
+  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "\r
+                 << pending_bl.buffers().size() << dendl;\r
+  if (!bytes)\r
+    return 0;\r
+\r
+  int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);\r
+  if (ret == 0) {\r
+    ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;\r
+    return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough\r
   }\r
-  ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
   list<bufferptr>::const_iterator it = pending_bl.buffers().begin();\r
+  unsigned total = 0;\r
   while (it != pending_bl.buffers().end()) {\r
     const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
-    uint32_t copied = 0;\r
-    //  ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length:  " << it->length()  << dendl;\r
-    while(copied < it->length()) {\r
-      //   ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " copied:  " << copied  << dendl;\r
-      size_t ret = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
-      copied += ret;\r
-      //  ldout(cct, 20) << __func__ << " ret: " << ret << " copied:  " << copied  << dendl;\r
-      if((*current_buffer)->full()){\r
+    unsigned copied = 0;\r
+    while (copied < it->length()) {\r
+      uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
+      copied += r;\r
+      total += r;\r
+      if ((*current_buffer)->full()){\r
         ++current_buffer;\r
+        if (current_buffer == tx_buffers.end())\r
+          goto sending;\r
       }\r
     }\r
     ++it;\r
   }\r
 \r
-  ssize_t r = post_work_request(tx_buffers);\r
+ sending:\r
+  assert(total <= pending_bl.length());\r
+  bufferlist swapped;\r
+  if (total < pending_bl.length()) {\r
+    pending_bl.splice(total, pending_bl.length()-total, &swapped);\r
+    pending_bl.swap(swapped);\r
+  } else {\r
+    pending_bl.clear();\r
+  }\r
+\r
+  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "\r
+                 << pending_bl.buffers().size() << dendl;\r
+\r
+  int r = post_work_request(tx_buffers);\r
   if (r < 0)\r
     return r;\r
 \r
   ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
-  pending_bl.clear();\r
   return bytes;\r
 }\r
 \r
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)\r
 {\r
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;\r
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
   ibv_sge isge[tx_buffers.size()];\r
   uint32_t current_sge = 0;\r
@@ -267,12 +398,14 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
   uint32_t current_swr = 0;\r
   ibv_send_wr* pre_wr = NULL;\r
 \r
+  memset(iswr, 0, sizeof(iswr));\r
+  memset(isge, 0, sizeof(isge));\r
   current_buffer = tx_buffers.begin();\r
   while (current_buffer != tx_buffers.end()) {\r
     isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);\r
     isge[current_sge].length = (*current_buffer)->get_offset();\r
     isge[current_sge].lkey = (*current_buffer)->mr->lkey;\r
-    ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;\r
+    ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;\r
 \r
     iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);\r
     iswr[current_swr].next = NULL;\r
@@ -280,7 +413,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     iswr[current_swr].num_sge = 1;\r
     iswr[current_swr].opcode = IBV_WR_SEND;\r
     iswr[current_swr].send_flags = IBV_SEND_SIGNALED;\r
-    /*if(isge[current_sge].length < infiniband->max_inline_data) {\r
+    /*if (isge[current_sge].length < infiniband->max_inline_data) {\r
       iswr[current_swr].send_flags = IBV_SEND_INLINE;\r
       ldout(cct, 20) << __func__ << " send_inline." << dendl;\r
       }*/\r
@@ -300,5 +433,33 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
                << cpp_strerror(errno) << dendl;\r
     return -errno;\r
   }\r
+  ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;\r
   return 0;\r
 }\r
+\r
+void RDMAConnectedSocketImpl::fin() {\r
+  ibv_send_wr wr;\r
+  memset(&wr, 0, sizeof(wr));\r
+  wr.wr_id = reinterpret_cast<uint64_t>(qp);\r
+  wr.num_sge = 0;\r
+  wr.opcode = IBV_WR_SEND;\r
+  wr.send_flags = IBV_SEND_SIGNALED;\r
+  ibv_send_wr* bad_tx_work_request;\r
+  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {\r
+    lderr(cct) << __func__ << " failed to send message="\r
+               << " ibv_post_send failed(most probably should be peer not ready): "\r
+               << cpp_strerror(errno) << dendl;\r
+    return ;\r
+  }\r
+}\r
+\r
+void RDMAConnectedSocketImpl::cleanup() {\r
+  if (con_handler) {\r
+    (static_cast<C_handle_connection*>(con_handler))->close();\r
+    worker->center.submit_to(worker->center.get_id(), [this]() {\r
+      worker->center.delete_file_event(tcp_fd, EVENT_READABLE);\r
+    }, false);\r
+    delete con_handler;\r
+    con_handler = nullptr;\r
+  }\r
+}\r
index 6f0c7de8e60f1c3b641a3930dba37b395f0d2327..7b3789e38c83e06431071d83afce5df7fed6c7ab 100644 (file)
 \r
 int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)\r
 {\r
-  server_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0);\r
-  if (server_setup_socket == -1) {\r
+  int rc = 0;\r
+  server_setup_socket = infiniband->net.create_socket(sa.get_family(), true);\r
+  if (server_setup_socket < 0) {\r
+    rc = -errno;\r
     lderr(cct) << __func__ << " failed to create server socket: "\r
                << cpp_strerror(errno) << dendl;\r
-    return -errno;\r
+    return rc;\r
   }\r
 \r
-  int on = 1;\r
-  int rc = ::setsockopt(server_setup_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));\r
+  rc = net.set_nonblock(server_setup_socket);\r
   if (rc < 0) {\r
-    lderr(cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;\r
     goto err;\r
   }\r
 \r
-  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
+  rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);\r
   if (rc < 0) {\r
-    lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
-               << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
     goto err;\r
   }\r
+  net.set_close_on_exec(server_setup_socket);\r
 \r
-  rc = net.set_nonblock(server_setup_socket);\r
+  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
   if (rc < 0) {\r
+    rc = -errno;\r
+    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
+                   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
     goto err;\r
   }\r
 \r
-  net.set_close_on_exec(server_setup_socket);\r
+  rc = ::listen(server_setup_socket, 128);\r
+  if (rc < 0) {\r
+    rc = -errno;\r
+    lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;\r
+    goto err;\r
+  }\r
 \r
   ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port()  << dendl;\r
   return 0;\r
@@ -57,38 +64,45 @@ int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
 err:\r
   ::close(server_setup_socket);\r
   server_setup_socket = -1;\r
-  return -1;\r
+  return -errno;\r
 }\r
 \r
-int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out)\r
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)\r
 {\r
   ldout(cct, 15) << __func__ << dendl;\r
-  int r;\r
-  RDMAConnectedSocketImpl* server;\r
-  while (1) {\r
-    IBSYNMsg msg;//TODO\r
-    entity_addr_t addr;\r
-    r = infiniband->recv_udp_msg(server_setup_socket, msg, &addr);\r
-    if (r < 0) {\r
-      r = -errno;\r
-      if (r != -EAGAIN)\r
-        ldout(cct, 10) << __func__ << " recv msg failed:" << cpp_strerror(errno)<< dendl;\r
-      break;\r
-    } else if (r > 0) {\r
-      ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
-      continue;\r
-    } else {\r
-      server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, worker, msg);\r
-      msg = server->get_my_msg();\r
-      r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
-      server->activate();\r
-      std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
-      *sock = ConnectedSocket(std::move(csi));\r
-      if(out)\r
-        *out = sa;\r
-      return r;\r
-    }\r
+\r
+  assert(sock);\r
+  sockaddr_storage ss;\r
+  socklen_t slen = sizeof(ss);\r
+  int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);\r
+  if (sd < 0) {\r
+    return -errno;\r
+  }\r
+  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
+\r
+  infiniband->net.set_close_on_exec(sd);\r
+  int r = infiniband->net.set_nonblock(sd);\r
+  if (r < 0) {\r
+    ::close(sd);\r
+    return -errno;\r
   }\r
 \r
-  return r;\r
+  r = infiniband->net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);\r
+  if (r < 0) {\r
+    ::close(sd);\r
+    return -errno;\r
+  }\r
+  infiniband->net.set_priority(sd, opt.priority);\r
+\r
+  RDMAConnectedSocketImpl* server;\r
+  //Worker* w = dispatcher->get_stack()->get_worker();\r
+  server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));\r
+  server->set_accept_fd(sd);\r
+  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
+  *sock = ConnectedSocket(std::move(csi));\r
+  if (out)\r
+    out->set_sockaddr((sockaddr*)&ss);\r
+\r
+  return 0;\r
 }\r
index 104fa3bf1700ba41ce27b084a2786b7d66e1c49d..3b2c6cc84b9527597ef5538c8ca11e2b5b54ded0 100644 (file)
@@ -25,8 +25,9 @@ static Infiniband* global_infiniband;
 
 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr), infiniband(NULL),
-    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL)
-{}
+    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
+{
+}
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
@@ -44,77 +45,56 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
   RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
-  entity_addr_t sa;
-  memcpy(&sa, &addr, sizeof(addr));
-
-  IBSYNMsg msg = p->get_my_msg();
-  ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
-  ldout(cct, 20) << __func__ << " my syn msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << ">"<< dendl;
-
-  int client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
-  if (client_setup_socket == -1) {
-    lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl;
-    return -errno;
-  }
-
-  int r = ::connect(client_setup_socket, addr.get_sockaddr(), addr.get_sockaddr_len());
-  if (r < 0) {
-    lderr(cct) << __func__ << " failed to connect " << addr << ": "
-               << strerror(errno) << dendl;
-    return -errno;
-  }
-
-  r = infiniband->send_udp_msg(client_setup_socket, msg, sa);
-  if (r < 0) {
-    ldout(cct, 0) << __func__ << " send msg failed." << dendl;
-    return r;
-  }
+  int r = p->try_connect(addr, opts);
 
-  // FIXME: need to make this async
-  r = infiniband->recv_udp_msg(client_setup_socket, msg, &sa);
   if (r < 0) {
-    ldout(cct, 0) << __func__ << " recv msg failed." << dendl;
+    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
     return r;
   }
-  p->set_peer_msg(msg);
-  ldout(cct, 20) << __func__ << " peer msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << "> " << dendl;
-  r = p->activate();
-  assert(!r);
   std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
   *socket = ConnectedSocket(std::move(csi));
-  ::close(client_setup_socket);
-
   return 0;
 }
 
+
 RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
 {
   if (!global_infiniband)
     global_infiniband = new Infiniband(
-        cct, cct->_conf->ms_async_rdma_device_name);
-  dispatcher = new RDMADispatcher(cct, global_infiniband);
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
+  dispatcher = new RDMADispatcher(cct, global_infiniband, this);
+  unsigned num = get_num_worker();
+  for (unsigned i = 0; i < num; ++i) {
+    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
+    w->set_ib(global_infiniband);
+    w->set_stack(this);
+  }
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
 }
 
 void RDMAWorker::initialize()
 {
-  infiniband = global_infiniband;
-  tx_cc = infiniband->create_comp_channel();
-  tx_cq = infiniband->create_comp_queue(tx_cc);
-  center.create_file_event(tx_cc->get_fd(), EVENT_READABLE, tx_handler);
+  dispatcher = stack->get_dispatcher();
+  notify_fd = dispatcher->register_worker(this);
+  center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
   memory_manager = infiniband->get_memory_manager();
 }
 
 int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   int r = infiniband->get_tx_buffers(c, bytes);
-  if (r == 0) {
+  if (r > 0) {
     stack->get_dispatcher()->inflight += c.size();
+    ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
     return r;
   }
+  assert(r == 0);
 
   if (pending_sent_conns.back() != o)
     pending_sent_conns.push_back(o);
-  return 0;
+  dispatcher->pending_buffers(this);
+  return r;
 }
 
 /**
@@ -127,10 +107,23 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<C
  */
 int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
 {
+  if (chunks.empty())
+    return 0;
+
   stack->get_dispatcher()->inflight -= chunks.size();
-  infiniband->get_memory_manager()->return_tx(chunks);
+  memory_manager->return_tx(chunks);
+  ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+
+  pended = false;
+  std::set<RDMAConnectedSocketImpl*> done;
   while (!pending_sent_conns.empty()) {
     RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+    if (done.count(o) == 0) {
+      done.insert(o);
+    } else {
+      pending_sent_conns.pop_front();
+      continue;
+    }
     ssize_t r = o->submit(false);
     ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
     if (r < 0) {
@@ -145,72 +138,61 @@ int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
 
 void RDMAWorker::handle_tx_event()
 {
-  ldout(cct, 20) << __func__ << dendl;
-  if (!tx_cc->get_cq_event())
-    return ;
-
-  static const int MAX_COMPLETIONS = 16;
-  ibv_wc wc[MAX_COMPLETIONS];
   std::vector<Chunk*> tx_chunks;
-  tx_chunks.reserve(MAX_COMPLETIONS);
-
-  bool rearmed = false;
-  int n;
- again:
-  n = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
-  ldout(cct, 20) << __func__ << " pool completion queue got " << n
-                 << " responses."<< dendl;
-  for (int i = 0; i < n; ++i) {
-    ibv_wc* response = &wc[i];
+  std::vector<ibv_wc> cqe;
+  get_wc(cqe);
+
+  for (size_t i = 0; i < cqe.size(); ++i) {
+    ibv_wc* response = &cqe[i];
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-    ldout(cct, 20) << __func__ << " opcode: " << response->opcode << " len: " << response->byte_len << dendl;
+    ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
       if (response->status == IBV_WC_RETRY_EXC_ERR) {
-        lderr(cct) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
       } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
-        lderr(cct) << __func__ << " Work Request Flushed Error: this connection's qp="
-                               << response->qp_num << " should be down while this WR=" << response->wr_id
-                               << " still in flight." << dendl;
+        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+                      << response->qp_num << " should be down while this WR=" << response->wr_id
+                      << " still in flight." << dendl;
       } else {
-        lderr(cct) << __func__ << " send work request returned error for buffer("
-                               << response->wr_id << ") status(" << response->status << "): "
-                               << infiniband->wc_status_to_string(response->status) << dendl;
+        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+                      << response->wr_id << ") status(" << response->status << "): "
+                      << infiniband->wc_status_to_string(response->status) << dendl;
       }
       RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
       if (conn) {
+        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
         conn->fault();
       } else {
-        ldout(cct, 0) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
       }
     }
 
-    assert(memory_manager->is_tx_chunk(chunk));
-    tx_chunks.push_back(chunk);
-  }
-
-  if (n) {
-    post_tx_buffer(tx_chunks);
-    tx_chunks.clear();
-    goto again;
+    //assert(memory_manager->is_tx_chunk(chunk));
+    if (memory_manager->is_tx_chunk(chunk)) {
+      tx_chunks.push_back(chunk);
+    } else {
+      ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
+    }
   }
 
-  if (!rearmed) {
-    tx_cq->rearm_notify();
-    rearmed = true;
-    // Clean up cq events after rearm notify ensure no new incoming event
-    // arrived between polling and rearm
-    goto again;
-  }
+  post_tx_buffer(tx_chunks);
 
-  ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
+  ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
+  dispatcher->notify_pending_workers();
 }
 
 RDMADispatcher::~RDMADispatcher()
 {
   done = true;
   t.join();
-  assert(qp_conns.empty());
+  ldout(cct, 20) << __func__ << " ing..." << dendl;
+  auto i = qp_conns.begin();
+  while (i != qp_conns.end()) {
+    delete i->second.first;
+    ++i;
+  }
+
   while (!dead_queue_pairs.empty()) {
     delete dead_queue_pairs.back();
     dead_queue_pairs.pop_back();
@@ -243,12 +225,12 @@ void RDMADispatcher::handle_async_event()
       if (!conn) {
         ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
       } else {
-        ldout(cct, 0) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+        ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
         conn->fault();
         erase_qpn(qpn);
       }
     } else {
-      ldout(cct, 0) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
                     << " evt: " << ibv_event_type_str(async_event.event_type)
                     << dendl;
     }
@@ -256,16 +238,19 @@ void RDMADispatcher::handle_async_event()
   }
 }
 
-
 void RDMADispatcher::polling()
 {
   static int MAX_COMPLETIONS = 32;
   ibv_wc wc[MAX_COMPLETIONS];
 
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
-  int i, n;
-  while (!done) {
-    n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+  std::vector<ibv_wc> tx_cqe;
+  RDMAWorker* worker;
+  ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl;
+  RDMAConnectedSocketImpl *conn = nullptr;
+
+  while (true) {
+    int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
     if (!n) {
       // NOTE: Has TX just transitioned to idle? We should do it when idle!
       // It's now safe to delete queue pairs (see comment by declaration
@@ -273,36 +258,48 @@ void RDMADispatcher::polling()
       // Additionally, don't delete qp while outstanding_buffers isn't empty,
       // because we need to check qp's state before sending
       if (!inflight.load()) {
+        Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
         while (!dead_queue_pairs.empty()) {
           ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
           delete dead_queue_pairs.back();
           dead_queue_pairs.pop_back();
         }
       }
-      handle_async_event();
+      // handle_async_event();
+      if (done && !inflight)
+        break;
       continue;
     }
 
     ldout(cct, 20) << __func__ << " pool completion queue got " << n
                    << " responses."<< dendl;
-    for (i = 0; i < n; ++i) {
+    Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+    for (int i = 0; i < n; ++i) {
       ibv_wc* response = &wc[i];
       Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-      ldout(cct, 20) << __func__ << " got chunk=" << response->wr_id << " qp:" << wc[i].qp_num << dendl;
 
       if (response->status != IBV_WC_SUCCESS) {
-        lderr(cct) << __func__ << " work request returned error for buffer(" << response->wr_id
-                   << ") status(" << response->status << ":"
-                   << ib->wc_status_to_string(response->status) << dendl;
-        ib->post_chunk(chunk);
+        ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+                      << ") status(" << response->status << ":"
+                      << ib->wc_status_to_string(response->status) << dendl;
+        ib->recall_chunk(chunk);
+        conn = get_conn_lockless(response->qp_num);
+        if (conn && conn->is_connected())
+          conn->fault();
+        notify_pending_workers();
         continue;
       }
 
-      RDMAConnectedSocketImpl *conn = get_conn_by_qp(response->qp_num);
+      if (wc[i].opcode == IBV_WC_SEND) {
+        tx_cqe.push_back(wc[i]);
+        ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl; 
+        continue;
+      }
+      ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+      conn = get_conn_lockless(response->qp_num);
       if (!conn) {
-        // discard buffer
-        ldout(cct, 0) << __func__ << " missing qp_num " << response->qp_num << ", discard bd "
-                      << chunk << dendl;
+        int ret = ib->recall_chunk(chunk);
+        ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
         continue;
       }
       polled[conn].push_back(*response);
@@ -310,5 +307,20 @@ void RDMADispatcher::polling()
     for (auto &&i : polled)
       i.first->pass_wc(std::move(i.second));
     polled.clear();
+    if (!tx_cqe.empty()) {
+      worker = get_worker_from_list();
+      if (worker == nullptr)
+        worker = dynamic_cast<RDMAWorker*>(stack->get_worker());
+      worker->pass_wc(std::move(tx_cqe));
+      tx_cqe.clear();
+    }
   }
 }
+
+void RDMADispatcher::notify_pending_workers() {
+    Mutex::Locker l(w_lock);
+    if (pending_workers.empty())
+      return ;
+    pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
+    pending_workers.pop_front();
+}
index a1f8158585194d30265c64c9716854f1bd0b2da2..dc5bc1a44e563415a37a04aa2e511998922f6883 100644 (file)
@@ -30,7 +30,9 @@
 #include "Infiniband.h"
 
 class RDMAConnectedSocketImpl;
+class RDMAServerSocketImpl;
 class RDMAStack;
+class RDMAWorker;
 
 class RDMADispatcher {
   typedef Infiniband::MemoryManager::Chunk Chunk;
@@ -44,6 +46,7 @@ class RDMADispatcher {
   EventCallbackRef async_handler;
   bool done = false;
   Mutex lock; // protect `qp_conns
+  Mutex w_lock; // protect pending workers
   // qp_num -> InfRcConnection
   // The main usage of `qp_conns` is looking up connection by qp_num,
   // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
@@ -65,7 +68,10 @@ class RDMADispatcher {
   /// save them in this vector and delete them at a safe time, when there are
   /// no outstanding transmit buffers to be lost.
   std::vector<QueuePair*> dead_queue_pairs;
-
+  Mutex qp_lock;//for csi reuse qp
+  ceph::unordered_map<RDMAWorker*, int> workers;;
+  std::list<RDMAWorker*> pending_workers;
+  RDMAStack* stack;
   class C_handle_cq_async : public EventCallback {
     RDMADispatcher *dispatcher;
    public:
@@ -77,16 +83,17 @@ class RDMADispatcher {
   };
 
  public:
-  std::atomic_ulong inflight;
-  explicit RDMADispatcher(CephContext* cct, Infiniband* i)
-    : t(&RDMADispatcher::polling, this), cct(cct), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock") {
+  std::atomic<uint64_t> inflight = {0};
+  explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
+    : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+      w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
     rx_cc = ib->create_comp_channel();
     assert(rx_cc);
     rx_cq = ib->create_comp_queue(rx_cc);
     assert(rx_cq);
+    t = std::thread(&RDMADispatcher::polling, this);
   }
   virtual ~RDMADispatcher();
-
   void handle_async_event();
   void polling();
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
@@ -97,13 +104,45 @@ class RDMADispatcher {
     qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
     return fd;
   }
+  int register_worker(RDMAWorker* w) {
+    int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+    assert(fd >= 0);
+    Mutex::Locker l(w_lock);
+    workers[w] = fd;
+    return fd;
+  }
+  void pending_buffers(RDMAWorker* w) {
+    Mutex::Locker l(w_lock);
+    pending_workers.push_back(w);
+  }
+  RDMAStack* get_stack() {
+    return stack;
+  }
+  RDMAWorker* get_worker_from_list() {
+    Mutex::Locker l(w_lock);
+    if (pending_workers.empty())
+      return nullptr;
+    else {
+      RDMAWorker* w = pending_workers.front();
+      pending_workers.pop_front();
+      return w;
+    }
+  }
   RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
     Mutex::Locker l(lock);
     auto it = qp_conns.find(qp);
     if (it == qp_conns.end())
-      return NULL;
+      return nullptr;
+    if (it->second.first->is_dead())
+      return nullptr;
+    return it->second.second;
+  }
+  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp) {
+    auto it = qp_conns.find(qp);
+    if (it == qp_conns.end())
+      return nullptr;
     if (it->second.first->is_dead())
-      return NULL;
+      return nullptr;
     return it->second.second;
   }
   void erase_qpn(uint32_t qpn) {
@@ -114,6 +153,8 @@ class RDMADispatcher {
     dead_queue_pairs.push_back(it->second.first);
     qp_conns.erase(it);
   }
+  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
+  void notify_pending_workers();
 };
 
 
@@ -122,14 +163,17 @@ class RDMAWorker : public Worker {
   typedef Infiniband::CompletionChannel CompletionChannel;
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::MemoryManager MemoryManager;
+  typedef std::vector<Chunk*>::iterator ChunkIter;
   RDMAStack *stack;
   Infiniband *infiniband;
-  CompletionQueue *tx_cq;           // common completion queue for all transmits
-  CompletionChannel *tx_cc;
   EventCallbackRef tx_handler;
   MemoryManager *memory_manager;
   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
-
+  RDMADispatcher* dispatcher;
+  int notify_fd = -1;
+  Mutex lock;
+  std::vector<ibv_wc> wc;
+  bool pended;
   class C_handle_cq_tx : public EventCallback {
     RDMAWorker *worker;
     public:
@@ -142,25 +186,53 @@ class RDMAWorker : public Worker {
  public:
   explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker() {
-    tx_cc->ack_events();
-    delete tx_cq;
-    delete tx_cc;
     delete tx_handler;
+    if (notify_fd >= 0)
+      ::close(notify_fd);
+  }
+  void notify() {
+    uint64_t i = 1;
+    assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+  }
+  void pass_wc(std::vector<ibv_wc> &&v) {
+    Mutex::Locker l(lock);
+    if (wc.empty())
+      wc = std::move(v);
+    else
+      wc.insert(wc.end(), v.begin(), v.end());
+    notify();
+  }
+  void get_wc(std::vector<ibv_wc> &w) {
+    Mutex::Locker l(lock);
+    if (wc.empty())
+      return ;
+    w.swap(wc);
   }
-
   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
   virtual void initialize() override;
-  CompletionQueue* get_tx_cq() { return tx_cq; }
   RDMAStack *get_stack() {
     return stack;
   }
   int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
   int post_tx_buffer(std::vector<Chunk*> &chunks);
+  void add_pending_conn(RDMAConnectedSocketImpl* o) {
+    pending_sent_conns.push_back(o);
+    if (!pended) {
+      dispatcher->pending_buffers(this);
+      pended = true;
+    }
+  }
   void remove_pending_conn(RDMAConnectedSocketImpl *o) {
     pending_sent_conns.remove(o);
   }
   void handle_tx_event();
+  void set_ib(Infiniband* ib) {
+    infiniband = ib;
+  }
+  void set_stack(RDMAStack *s) {
+    stack = s;
+  }
 };
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
@@ -180,11 +252,17 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   RDMADispatcher* dispatcher;
   RDMAWorker* worker;
   std::vector<Chunk*> buffers;
-  int notify_fd;
+  int notify_fd = -1;
   bufferlist pending_bl;
 
   Mutex lock;
   std::vector<ibv_wc> wc;
+  bool is_server;
+  RDMAServerSocketImpl* ssi;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+  bool active;// qp is active ?
+  bool detached;
 
   void notify() {
     uint64_t i = 1;
@@ -195,18 +273,34 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 
  public:
   RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
-                          RDMAWorker *w, IBSYNMsg im = IBSYNMsg())
-    : cct(cct), peer_msg(im), connected(0), error(0), infiniband(ib),
-      dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock") {
-    qp = infiniband->create_queue_pair(w->get_tx_cq(), IBV_QPT_RC);
+                          RDMAWorker *w)
+    : cct(cct), connected(0), error(0), infiniband(ib),
+      dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+      is_server(false), con_handler(new C_handle_connection(this)),
+      active(false), detached(false) {
+    qp = infiniband->create_queue_pair(s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
     my_msg.qpn = qp->get_local_qp_number();
     my_msg.psn = qp->get_initial_psn();
     my_msg.lid = infiniband->get_lid();
+    my_msg.peer_qpn = 0;
     my_msg.gid = infiniband->get_gid();
     notify_fd = dispatcher->register_qp(qp, this);
   }
+
   virtual ~RDMAConnectedSocketImpl() {
     worker->remove_pending_conn(this);
+    dispatcher->erase_qpn(my_msg.qpn);
+    cleanup();
+    if (notify_fd >= 0)
+      ::close(notify_fd);
+    if (tcp_fd >= 0)
+      ::close(tcp_fd);
+    error = ECONNRESET;
+    Mutex::Locker l(lock);
+    for (unsigned i=0; i < wc.size(); ++i)
+      infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+    for (unsigned i=0; i < buffers.size(); ++i)
+      infiniband->recall_chunk(buffers[i]);
   }
 
   void pass_wc(std::vector<ibv_wc> &&v) {
@@ -217,48 +311,76 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
       wc.insert(wc.end(), v.begin(), v.end());
     notify();
   }
+
   void get_wc(std::vector<ibv_wc> &w) {
     Mutex::Locker l(lock);
     if (wc.empty())
       return ;
     w.swap(wc);
   }
+
   virtual int is_connected() override {
     return connected;
   }
+
   virtual ssize_t read(char* buf, size_t len) override;
   virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
   virtual void shutdown() override {
-    if (qp) {
-      qp->to_dead();
-      qp = NULL;
-    }
+    if (!error)
+      fin();
+    error = ECONNRESET;
+    active = false;
   }
   virtual void close() override {
-    if (qp) {
-      qp->to_dead();
-      qp = NULL;
-    }
+    if (!error)
+      fin();
+    error = ECONNRESET;
+    active = false;
   }
   virtual int fd() const override {
     return notify_fd;
   }
   void fault() {
-    if (qp) {
+    /*if (qp) {
       qp->to_dead();
       qp = NULL;
-    }
+    }*/
     error = ECONNRESET;
+    connected = 1;
     notify();
   }
+  const char* get_qp_state() {
+    return Infiniband::qp_state_string(qp->get_state());
+  }
   ssize_t submit(bool more);
   int activate();
-  IBSYNMsg get_my_msg() { return my_msg; }
-  void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
+  void fin();
+  void handle_connection();
+  void cleanup();
+  void set_accept_fd(int sd) {
+    tcp_fd = sd;
+    is_server = true;
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+    }, true);
+  }
+  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+  class C_handle_connection : public EventCallback {
+    RDMAConnectedSocketImpl *csi;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        csi->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
 };
 
-
 class RDMAServerSocketImpl : public ServerSocketImpl {
   CephContext *cct;
   NetHandler net;
@@ -272,7 +394,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
   RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
     : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
   int listen(entity_addr_t &sa, const SocketOptions &opt);
-  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
   virtual void abort_accept() override {
     if (server_setup_socket >= 0)
       ::close(server_setup_socket);
@@ -280,18 +402,20 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
   virtual int fd() const override {
     return server_setup_socket;
   }
+  int get_fd() { return server_setup_socket; }
 };
 
-
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   RDMADispatcher *dispatcher;
 
  public:
   explicit RDMAStack(CephContext *cct, const string &t);
-  virtual ~RDMAStack() { delete dispatcher; }
-  virtual bool support_zero_copy_read() const override { return true; }
-  //virtual bool support_local_listen_table() const { return true; }
+  virtual ~RDMAStack() {
+    delete dispatcher;
+  }
+  virtual bool support_zero_copy_read() const override { return false; }
+  virtual bool nonblock_connect_need_writable_event() const { return false; }
 
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
     threads.resize(i+1);
index 3ae4615c94bf45268add89682d2cb71edcc86ac8..7fb0e14e6b95518eaaa6e30106f656f6ed09aae5 100644 (file)
@@ -163,7 +163,7 @@ TEST_P(NetworkWorkerTest, SimpleTest) {
     }
 
     if (is_my_accept) {
-      r = bind_socket.accept(&srv_socket, options, &cli_addr);
+      r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
       ASSERT_EQ(0, r);
       ASSERT_TRUE(srv_socket.fd() > 0);
     }
@@ -268,7 +268,7 @@ TEST_P(NetworkWorkerTest, ConnectFailedTest) {
       center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
       r = cli_socket2.is_connected();
       if (r == 0) {
-        ASSERT_FALSE(cb.poll(500));
+        cb.poll(500);
         r = cli_socket2.is_connected();
       }
       ASSERT_TRUE(r != 1);
@@ -310,7 +310,7 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
 
       ConnectedSocket srv_socket, cli_socket;
       if (bind_socket) {
-        r = bind_socket.accept(&srv_socket, options, &cli_addr);
+        r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
         ASSERT_EQ(-EAGAIN, r);
       }
 
@@ -327,7 +327,7 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
         cb.poll(500);
         ConnectedSocket srv_socket2;
         do {
-          r = bind_socket.accept(&srv_socket2, options, &cli_addr);
+          r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
           usleep(100);
         } while (r == -EAGAIN && !*accepted_p);
         if (r == 0)
@@ -365,7 +365,7 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
 
       if (bind_socket) {
         do {
-          r = bind_socket.accept(&srv_socket, options, &cli_addr);
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
           usleep(100);
         } while (r == -EAGAIN && !*accepted_p);
         if (r == 0)
@@ -398,12 +398,14 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
 
 TEST_P(NetworkWorkerTest, ComplexTest) {
   entity_addr_t bind_addr;
+  std::atomic_bool listen_done(false);
+  std::atomic_bool *listen_p = &listen_done;
   std::atomic_bool accepted(false);
   std::atomic_bool *accepted_p = &accepted;
   std::atomic_bool done(false);
   std::atomic_bool *done_p = &done;
   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
-  exec_events([this, bind_addr, accepted_p, done_p](Worker *worker) mutable {
+  exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
     entity_addr_t cli_addr;
     EventCenter *center = &worker->center;
     SocketOptions options;
@@ -412,20 +414,28 @@ TEST_P(NetworkWorkerTest, ComplexTest) {
     if (stack->support_local_listen_table() || worker->id == 0) {
       r = worker->listen(bind_addr, options, &bind_socket);
       ASSERT_EQ(0, r);
+      *listen_p = true;
     }
     ConnectedSocket cli_socket, srv_socket;
     if (worker->id == 1) {
-      r = worker->connect(bind_addr, options, &cli_socket);
-      ASSERT_EQ(0, r);
+      while (!*listen_p) {
+        usleep(50);
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+      }
     }
 
     if (bind_socket) {
       C_poll cb(center);
       center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
-      if (cb.poll(500)) {
-        r = bind_socket.accept(&srv_socket, options, &cli_addr);
-        ASSERT_EQ(0, r);
-        *accepted_p = true;
+      int count = 3;
+      while (count--) {
+        if (cb.poll(500)) {
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+          ASSERT_EQ(0, r);
+          *accepted_p = true;
+          break;
+        }
       }
       ASSERT_TRUE(*accepted_p);
       center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
@@ -875,16 +885,17 @@ class StressFactory {
     StressFactory *factory;
     ServerSocket bind_socket;
     ThreadData *t_data;
+    Worker *worker;
 
    public:
-    C_accept(StressFactory *f, ServerSocket s, ThreadData *data)
-        : factory(f), bind_socket(std::move(s)), t_data(data) {}
+    C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
+        : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
     void do_request(int id) {
       while (true) {
         entity_addr_t cli_addr;
         ConnectedSocket srv_socket;
         SocketOptions options;
-        int r = bind_socket.accept(&srv_socket, options, &cli_addr);
+        int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
         if (r == -EAGAIN) {
           break;
         }
@@ -906,6 +917,7 @@ class StressFactory {
   const size_t client_num, queue_depth, max_message_length;
   atomic_int message_count, message_left;
   entity_addr_t bind_addr;
+  std::atomic_bool already_bind = {false};
   bool zero_copy_read;
   SocketOptions options;
 
@@ -955,12 +967,15 @@ class StressFactory {
     if (stack->support_local_listen_table() || worker->id == 0) {
       r = worker->listen(bind_addr, options, &bind_socket);
       ASSERT_EQ(0, r);
+      already_bind = true;
     }
+    while (!already_bind)
+      usleep(50);
     C_accept *accept_handler = nullptr;
     int bind_fd = 0;
     if (bind_socket) {
       bind_fd = bind_socket.fd();
-      accept_handler = new C_accept(this, std::move(bind_socket), &t_data);
+      accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
       ASSERT_EQ(0, worker->center.create_file_event(
                   bind_fd, EVENT_READABLE, accept_handler));
     }