]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: drop zero_copy_read() & co from ConnectedSocket. 28921/head
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 8 Jul 2019 14:19:45 +0000 (16:19 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 8 Jul 2019 14:21:28 +0000 (16:21 +0200)
The unit test is the method's sole user.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/PosixStack.cc
src/msg/async/Stack.h
src/msg/async/dpdk/DPDKStack.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.h
src/test/msgr/test_async_networkstack.cc

index 5a364b8fbe09269317226b45c5b49b0f833ab4c7..cf6b220bf01a471a0b5c188166e1f15ae6000d04 100644 (file)
@@ -62,10 +62,6 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
     }
   }
 
-  ssize_t zero_copy_read(bufferptr&) override {
-    return -EOPNOTSUPP;
-  }
-
   ssize_t read(char *buf, size_t len) override {
     ssize_t r = ::read(_fd, buf, len);
     if (r < 0)
index d05f934fd093aba035b43d83d1a363e347555b21..cff6707f7b0702d4ac2b2c8888497e943d7544c9 100644 (file)
@@ -28,7 +28,6 @@ class ConnectedSocketImpl {
   virtual ~ConnectedSocketImpl() {}
   virtual int is_connected() = 0;
   virtual ssize_t read(char*, size_t) = 0;
-  virtual ssize_t zero_copy_read(bufferptr&) = 0;
   virtual ssize_t send(bufferlist &bl, bool more) = 0;
   virtual void shutdown() = 0;
   virtual void close() = 0;
@@ -95,12 +94,6 @@ class ConnectedSocket {
   ssize_t read(char* buf, size_t len) {
     return _csi->read(buf, len);
   }
-  /// Gets the input stream.
-  ///
-  /// Gets an object returning data sent from the remote endpoint.
-  ssize_t zero_copy_read(bufferptr &data) {
-    return _csi->zero_copy_read(data);
-  }
   /// Gets the output stream.
   ///
   /// Gets an object that sends data to the remote endpoint.
@@ -329,8 +322,6 @@ class NetworkStack {
 
   static Worker* create_worker(
           CephContext *c, const string &t, unsigned i);
-  // backend need to override this method if supports zero copy read
-  virtual bool support_zero_copy_read() const { return false; }
   // backend need to override this method if backend doesn't support shared
   // listen table.
   // For example, posix backend has in kernel global listen table. If one
index a44ae38367f9d41d781af4abfceae4299cbebcdc..2b465976e16f0825d0a876a2d9de63a92f56133d 100644 (file)
@@ -97,7 +97,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl {
     return len - left ? len - left : -EAGAIN;
   }
 
-  virtual ssize_t zero_copy_read(bufferptr &data) override {
+private:
+  ssize_t zero_copy_read(bufferptr &data) {
     auto err = _conn.get_errno();
     if (err <= 0)
       return err;
@@ -166,6 +167,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl {
       return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
     }
   }
+
+public:
   virtual void shutdown() override {
     _conn.close_write();
   }
@@ -247,7 +250,6 @@ class DPDKStack : public NetworkStack {
   explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
     funcs.resize(cct->_conf->ms_async_max_op_threads);
   }
-  virtual bool support_zero_copy_read() const override { return true; }
   virtual bool support_local_listen_table() const override { return true; }
 
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
index bf9d072ef9a55019ef557e6e119b1fc3f587f58c..a95151331a4f862e4b656ba61a8d1b77b81bf599 100644 (file)
@@ -363,53 +363,6 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
   return read;
 }
 
-ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
-{
-  if (error)
-    return -error;
-  static const int MAX_COMPLETIONS = 16;
-  ibv_wc wc[MAX_COMPLETIONS];
-  ssize_t size = 0;
-
-  ibv_wc*  response;
-  Chunk* chunk;
-  bool loaded = false;
-  auto iter = buffers.begin();
-  if (iter != buffers.end()) {
-    chunk = *iter;
-    // FIXME need to handle release
-    // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
-    buffers.erase(iter);
-    loaded = true;
-    size = chunk->bound;
-  }
-
-  std::vector<ibv_wc> cqe;
-  get_wc(cqe);
-  if (cqe.empty())
-    return size == 0 ? -EAGAIN : size;
-
-  ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
-
-  for (size_t i = 0; i < cqe.size(); ++i) {
-    response = &wc[i];
-    chunk = reinterpret_cast<Chunk*>(response->wr_id);
-    chunk->prepare_read(response->byte_len);
-    if (!loaded && i == 0) {
-      // FIXME need to handle release
-      // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
-      size = chunk->bound;
-      continue;
-    }
-    buffers.push_back(chunk);
-    iter++;
-  }
-
-  if (size == 0)
-    return -EAGAIN;
-  return size;
-}
-
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
 {
   if (error) {
index e038d3625986aa173d9c588ec8532c55467613eb..ee220601269129023bc631380312dfa9dba8ebf4 100644 (file)
@@ -216,7 +216,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   virtual int is_connected() override { return connected; }
 
   virtual ssize_t read(char* buf, size_t len) override;
-  virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
   virtual void shutdown() override;
   virtual void close() override;
@@ -345,7 +344,6 @@ class RDMAStack : public NetworkStack {
  public:
   explicit RDMAStack(CephContext *cct, const string &t);
   virtual ~RDMAStack();
-  virtual bool support_zero_copy_read() const override { return false; }
   virtual bool nonblock_connect_need_writable_event() const override { return false; }
 
   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
index 3077c1f13e663cabb87483ac748eb5d10d11f27b..0e52c4eb424df73af986d8ae9bdf3faf1e64235d 100644 (file)
@@ -853,11 +853,7 @@ class StressFactory {
       while (true) {
         char buf[4096];
         bufferptr data;
-        if (factory->zero_copy_read) {
-          r = socket.zero_copy_read(data);
-        } else {
-          r = socket.read(buf, sizeof(buf));
-        }
+        r = socket.read(buf, sizeof(buf));
         ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
         if (r == 0) {
           ASSERT_TRUE(buffers.empty());
@@ -865,11 +861,7 @@ class StressFactory {
           return ;
         } else if (r == -EAGAIN)
           break;
-        if (factory->zero_copy_read) {
-          buffers.emplace_back(data.c_str(), 0, data.length());
-        } else {
-          buffers.emplace_back(buf, 0, r);
-        }
+        buffers.emplace_back(buf, 0, r);
         std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
       }
       if (!buffers.empty() && !write_enabled)
@@ -961,14 +953,12 @@ class StressFactory {
   atomic_int message_count, message_left;
   entity_addr_t bind_addr;
   std::atomic_bool already_bind = {false};
-  bool zero_copy_read;
   SocketOptions options;
 
   explicit StressFactory(const std::shared_ptr<NetworkStack> &s, const string &addr,
-                         size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
+                         size_t cli, size_t qd, size_t mc, size_t l)
       : stack(s), rs(128), client_num(cli), queue_depth(qd),
-        max_message_length(l), message_count(mc), message_left(mc),
-        zero_copy_read(zero_copy) {
+        max_message_length(l), message_count(mc), message_left(mc) {
     bind_addr.parse(addr.c_str());
     rs.prepare(100);
   }
@@ -1054,8 +1044,7 @@ class StressFactory {
 };
 
 TEST_P(NetworkWorkerTest, StressTest) {
-  StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
-                        strncmp(GetParam(), "dpdk", 4) == 0);
+  StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024);
   StressFactory *f = &factory;
   exec_events([f](Worker *worker) mutable {
     f->start(worker);