]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/rdma: cleanup
authorHaomai Wang <haomai@xsky.com>
Fri, 17 Feb 2017 17:02:54 +0000 (01:02 +0800)
committerHaomai Wang <haomai@xsky.com>
Mon, 27 Feb 2017 08:22:32 +0000 (16:22 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 2c2e79a2cdfdeb2fb4fecdb078dc3e52546bf0ec..eeb6ffe2f11eb4188b5bbb4af9e7af5ffad562de 100644 (file)
@@ -593,7 +593,7 @@ int Infiniband::MemoryManager::Cluster::add(uint32_t num)
     assert(m);
     new(chunk) Chunk(m, chunk_size, base+offset);
     free_chunks.push_back(chunk);
-    all_chunks.insert(chunk);
+    all_buffers.insert(chunk->buffer);
     ptr += sizeof(Chunk);
   }
   return 0;
@@ -771,20 +771,6 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
   return memory_manager->get_send_buffers(c, bytes);
 }
 
-int Infiniband::recall_chunk(Chunk* c)
-{
-  if (memory_manager->is_rx_chunk(c)) {
-    post_chunk(c);  
-    return 1;
-  } else if (memory_manager->is_tx_chunk(c)) {
-    vector<Chunk*> v;
-    v.push_back(c);
-    memory_manager->return_tx(v);  
-    return 2;
-  }
-  return -1;
-}
-
 /**
  * Create a new QueuePair. This factory should be used in preference to
  * the QueuePair constructor directly, since this lets derivatives of
index 20bdcccbefbacaddbd364c62be2e473eb81a951c..d09bf80ce377c9d58bb761fc9edcffaa21dc140b 100644 (file)
@@ -169,12 +169,17 @@ class Infiniband {
       int add(uint32_t num);
       void take_back(std::vector<Chunk*> &ck);
       int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
+      Chunk *get_chunk_by_buffer(const char *c) {
+        uint32_t idx = (c - base) / chunk_size;
+        Chunk *chunk = reinterpret_cast<Chunk*>(chunk_base + sizeof(Chunk) * idx);
+        return chunk;
+      }
 
       MemoryManager& manager;
       uint32_t chunk_size;
       Mutex lock;
       std::vector<Chunk*> free_chunks;
-      std::set<Chunk*> all_chunks;
+      std::set<const char*> all_buffers;
       char* base;
       char* chunk_base;
     };
@@ -188,8 +193,15 @@ class Infiniband {
     void return_tx(std::vector<Chunk*> &chunks);
     int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
     int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
-    int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
-    int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
+    // TODO: optimize via address judgement
+    bool is_tx_buffer(const char* c) { return send->all_buffers.count(c); }
+    bool is_rx_buffer(const char* c) { return channel->all_buffers.count(c); }
+    Chunk *get_tx_chunk_by_buffer(const char *c) {
+      return send->get_chunk_by_buffer(c);
+    }
+    uint32_t get_tx_chunk_size() const {
+      return send->chunk_size;
+    }
 
     bool enabled_huge_page;
 
@@ -349,9 +361,9 @@ class Infiniband {
   MemoryManager* get_memory_manager() { return memory_manager; }
   Device* get_device() { return device; }
   int get_async_fd() { return device->ctxt->async_fd; }
-  int recall_chunk(Chunk* c);
-  int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
-  int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
+  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
+  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
 };
index 2747193c0ba0a2f43b8ade21defa243d9658753f..a9666b66382f243f1cf7b4c69707cb332583f510 100644 (file)
@@ -43,19 +43,24 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 {
   ldout(cct, 20) << __func__ << " destruct." << dendl;
   dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
+  cleanup();
   worker->remove_pending_conn(this);
   dispatcher->erase_qpn(my_msg.qpn);
-  cleanup();
+  Mutex::Locker l(lock);
   if (notify_fd >= 0)
     ::close(notify_fd);
   if (tcp_fd >= 0)
     ::close(tcp_fd);
   error = ECONNRESET;
-  Mutex::Locker l(lock);
-  for (unsigned i=0; i < wc.size(); ++i)
-    infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
-  for (unsigned i=0; i < buffers.size(); ++i)
-    infiniband->recall_chunk(buffers[i]);
+  int ret = 0;
+  for (unsigned i=0; i < wc.size(); ++i) {
+    ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+    assert(ret == 0);
+  }
+  for (unsigned i=0; i < buffers.size(); ++i) {
+    ret = infiniband->post_chunk(buffers[i]);
+    assert(ret == 0);
+  }
 }
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@@ -266,27 +271,26 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
     chunk->prepare_read(response->byte_len);
+    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
     if (response->byte_len == 0) {
       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
       if (connected) {
         error = ECONNRESET;
-        assert(infiniband->post_chunk(chunk) == 0);
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
-      break;
-    }
-    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
-    //assert(response->byte_len);
-    if (read == (ssize_t)len) {
-      buffers.push_back(chunk);
-      ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
-    } else if (read + response->byte_len > (ssize_t)len) {
-      read += chunk->read(buf+read, (ssize_t)len-read);
-      buffers.push_back(chunk);
-      ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
-    } else {
-      read += chunk->read(buf+read, response->byte_len);
       assert(infiniband->post_chunk(chunk) == 0);
+    } else {
+      if (read == (ssize_t)len) {
+        buffers.push_back(chunk);
+        ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
+      } else if (read + response->byte_len > (ssize_t)len) {
+        read += chunk->read(buf+read, (ssize_t)len-read);
+        buffers.push_back(chunk);
+        ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+      } else {
+        read += chunk->read(buf+read, response->byte_len);
+        assert(infiniband->post_chunk(chunk) == 0);
+      }
     }
   }
 
@@ -306,7 +310,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
 {
   size_t read = 0, tmp = 0;
-  vector<Chunk*>::iterator c = buffers.begin();
+  auto c = buffers.begin();
   for (; c != buffers.end() ; ++c) {
     tmp = (*c)->read(buf+read, len-read);
     read += tmp;
@@ -404,39 +408,75 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   if (error)
     return -error;
   Mutex::Locker l(lock);
-  std::vector<Chunk*> tx_buffers;
   size_t bytes = pending_bl.length();
   ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
                  << pending_bl.buffers().size() << dendl;
   if (!bytes)
     return 0;
 
-  int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
-  if (ret == 0) {
-    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
-    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
-    return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
-  }
-  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
-  list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+  auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
+                                 std::list<bufferptr>::const_iterator &start,
+                                 std::list<bufferptr>::const_iterator &end) -> unsigned {
+    assert(start != end);
+    auto chunk_idx = tx_buffers.size();
+    int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
+    if (ret == 0) {
+      ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+      worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+      return 0;
+    }
+
+    unsigned total_copied = 0;
+    Chunk *current_chunk = tx_buffers[chunk_idx];
+    while (start != end) {
+      const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
+      unsigned copied = 0;
+      while (copied < start->length()) {
+        uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
+        copied += r;
+        total_copied += r;
+        bytes -= r;
+        if (current_chunk->full()){
+          current_chunk = tx_buffers[++chunk_idx];
+          if (chunk_idx == tx_buffers.size())
+            return total_copied;
+        }
+      }
+      ++start;
+    }
+    assert(bytes == 0);
+    return total_copied;
+  };
+
+  std::vector<Chunk*> tx_buffers;
+  std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+  std::list<bufferptr>::const_iterator copy_it = it;
   unsigned total = 0;
+  unsigned need_reserve_bytes = 0;
   while (it != pending_bl.buffers().end()) {
-    const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());
-    unsigned copied = 0;
-    while (copied < it->length()) {
-      uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);
-      copied += r;
-      total += r;
-      if ((*current_buffer)->full()){
-        ++current_buffer;
-        if (current_buffer == tx_buffers.end())
+    if (infiniband->is_tx_buffer(it->raw_c_str())) {
+      if (need_reserve_bytes) {
+        unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+        total += copied;
+        if (copied < need_reserve_bytes)
           goto sending;
+        need_reserve_bytes = 0;
       }
+      assert(copy_it == it);
+      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+      total += it->length();
+      ++copy_it;
+    } else {
+      need_reserve_bytes += it->length();
     }
     ++it;
   }
+  if (need_reserve_bytes)
+    total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
 
  sending:
+  if (total == 0)
+    return -EAGAIN;
   assert(total <= pending_bl.length());
   bufferlist swapped;
   if (total < pending_bl.length()) {
@@ -448,7 +488,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   }
 
   ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
-                 << pending_bl.buffers().size() << dendl;
+                 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
 
   int r = post_work_request(tx_buffers);
   if (r < 0)
@@ -506,7 +546,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     return -errno;
   }
   worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
-  ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+  ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
   return 0;
 }
 
@@ -541,7 +581,7 @@ void RDMAConnectedSocketImpl::cleanup() {
 void RDMAConnectedSocketImpl::notify()
 {
   uint64_t i = 1;
-  assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+  write(notify_fd, &i, sizeof(i));
 }
 
 void RDMAConnectedSocketImpl::shutdown()
index c16795720443e89b775a785f903414616261f54b..f5e81dd9d3c5bb0d1c6871fdd2d3928bfbed20ec 100644 (file)
@@ -17,6 +17,7 @@
 #include <poll.h>
 
 #include "include/str_list.h"
+#include "common/deleter.h"
 #include "RDMAStack.h"
 
 #define dout_subsys ceph_subsys_ms
@@ -129,6 +130,7 @@ void RDMADispatcher::polling()
   RDMAConnectedSocketImpl *conn = nullptr;
   utime_t last_inactive = ceph_clock_now();
   bool rearmed = false;
+  int ret = 0;
 
   while (true) {
     int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
@@ -191,29 +193,42 @@ void RDMADispatcher::polling()
       ibv_wc* response = &wc[i];
       Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
 
+      ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+
+      if (wc[i].opcode == IBV_WC_SEND) {
+        tx_cqe.push_back(wc[i]);
+        ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
+        continue;
+      }
+
       if (response->status != IBV_WC_SUCCESS) {
         perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
         ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                       << ") status(" << response->status << ":"
-                      << ib->wc_status_to_string(response->status) << dendl;
-        ib->recall_chunk(chunk);
-        conn = get_conn_lockless(response->qp_num);
-        if (conn && conn->is_connected())
-          conn->fault();
-        notify_pending_workers();
+                      << ib->wc_status_to_string(response->status) << ")" << dendl;
+        if (ib->is_rx_buffer(chunk->buffer)) {
+          ret = ib->post_chunk(chunk);
+          if (ret) {
+            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl;
+            assert(ret == 0);
+          }
+          conn = get_conn_lockless(response->qp_num);
+          if (conn && conn->is_connected())
+            conn->fault();
+          notify_pending_workers();
+        } else if (ib->is_tx_buffer(chunk->buffer)) {
+          tx_cqe.push_back(wc[i]);
+        } else {
+          ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl;
+        }
         continue;
       }
 
-      if (wc[i].opcode == IBV_WC_SEND) {
-        tx_cqe.push_back(wc[i]);
-        ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl; 
-        continue;
-      }
-      ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
       conn = get_conn_lockless(response->qp_num);
       if (!conn) {
-        int ret = ib->recall_chunk(chunk);
+        ret = ib->post_chunk(chunk);
         ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
+        assert(ret == 0);
         continue;
       }
       polled[conn].push_back(*response);
@@ -232,11 +247,11 @@ void RDMADispatcher::polling()
 }
 
 void RDMADispatcher::notify_pending_workers() {
-    Mutex::Locker l(w_lock);
-    if (pending_workers.empty())
-      return ;
-    pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
-    pending_workers.pop_front();
+  Mutex::Locker l(w_lock);
+  if (pending_workers.empty())
+    return ;
+  pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
+  pending_workers.pop_front();
 }
 
 int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
@@ -337,6 +352,7 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
   plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
+  plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
 
   plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
   plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
@@ -380,15 +396,6 @@ void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
   notify();
 }
 
-void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o)
-{
-  pending_sent_conns.push_back(o);
-  if (!pended) {
-    dispatcher->pending_buffers(this);
-    pended = true;
-  }
-}
-
 void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
 {
   Mutex::Locker l(lock);
@@ -429,15 +436,20 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<C
 {
   int r = infiniband->get_tx_buffers(c, bytes);
   if (r > 0) {
-    stack->get_dispatcher()->inflight += c.size();
-    ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+    stack->get_dispatcher()->inflight += r;
+    ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl;
     return r;
   }
   assert(r == 0);
 
-  if (pending_sent_conns.back() != o)
-    pending_sent_conns.push_back(o);
-  dispatcher->pending_buffers(this);
+  if (o) {
+    {
+      Mutex::Locker l(lock);
+      if (pending_sent_conns.back() != o)
+        pending_sent_conns.push_back(o);
+    }
+    dispatcher->pending_buffers(this);
+  }
   return r;
 }
 
@@ -449,35 +461,36 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<C
  * \return
  *      0 if success or -1 for failure
  */
-int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
 {
   if (chunks.empty())
-    return 0;
+    return ;
 
-  stack->get_dispatcher()->inflight -= chunks.size();
+  dispatcher->inflight -= chunks.size();
   memory_manager->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
 
   pended = false;
   std::set<RDMAConnectedSocketImpl*> done;
+  Mutex::Locker l(lock);
   while (!pending_sent_conns.empty()) {
     RDMAConnectedSocketImpl *o = pending_sent_conns.front();
-    if (done.count(o) == 0) {
+    pending_sent_conns.pop_front();
+    if (!done.count(o)) {
+      lock.Unlock();
       done.insert(o);
-    } else {
-      pending_sent_conns.pop_front();
-      continue;
-    }
-    ssize_t r = o->submit(false);
-    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
-    if (r < 0) {
-      if (r == -EAGAIN)
-        break;
-      o->fault();
+      ssize_t r = o->submit(false);
+      ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+      lock.Lock();
+      if (r < 0) {
+        if (r == -EAGAIN) {
+          pending_sent_conns.push_front(o);
+          break;
+        }
+        o->fault();
+      }
     }
-    pending_sent_conns.pop_front();
   }
-  return 0;
 }
 
 void RDMAWorker::handle_tx_event()
@@ -507,7 +520,8 @@ void RDMAWorker::handle_tx_event()
                       << infiniband->wc_status_to_string(response->status) << dendl;
       }
       RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
-      if (conn) {
+
+      if (conn && conn->is_connected()) {
         ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
         conn->fault();
       } else {
@@ -515,12 +529,9 @@ void RDMAWorker::handle_tx_event()
       }
     }
 
-    //assert(memory_manager->is_tx_chunk(chunk));
-    if (memory_manager->is_tx_chunk(chunk)) {
+    // FIXME: why not tx?
+    if (memory_manager->is_tx_buffer(chunk->buffer))
       tx_chunks.push_back(chunk);
-    } else {
-      ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
-    }
   }
 
   perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
index 7386b3ba92853d4737f8c85f363a69f1c6433065..5bb200c3cb8bec091ca3f3bd27a70e9134dc7883 100644 (file)
@@ -140,6 +140,7 @@ enum {
   l_msgr_rdma_tx_no_mem,
   l_msgr_rdma_tx_parital_mem,
   l_msgr_rdma_tx_failed,
+  l_msgr_rdma_rx_no_registered_mem,
 
   l_msgr_rdma_tx_chunks,
   l_msgr_rdma_tx_bytes,
@@ -187,9 +188,11 @@ class RDMAWorker : public Worker {
   virtual void initialize() override;
   RDMAStack *get_stack() { return stack; }
   int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
-  int post_tx_buffer(std::vector<Chunk*> &chunks);
-  void add_pending_conn(RDMAConnectedSocketImpl* o);
-  void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); }
+  void post_tx_buffer(std::vector<Chunk*> &chunks);
+  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+    Mutex::Locker l(lock);
+    pending_sent_conns.remove(o);
+  }
   void handle_tx_event();
   void set_ib(Infiniband* ib) { infiniband = ib; }
   void set_stack(RDMAStack *s) { stack = s; }