From: Radoslaw Zarzynski Date: Mon, 8 Jul 2019 14:19:45 +0000 (+0200) Subject: msg/async: drop zero_copy_read() & co from ConnectedSocket. X-Git-Tag: v15.1.0~316^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0c45f36f76a84a52e9dff19eb1e79e79939b4313;p=ceph.git msg/async: drop zero_copy_read() & co from ConnectedSocket. The unit test is the method's sole user. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 5a364b8fbe09..cf6b220bf01a 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -62,10 +62,6 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { } } - ssize_t zero_copy_read(bufferptr&) override { - return -EOPNOTSUPP; - } - ssize_t read(char *buf, size_t len) override { ssize_t r = ::read(_fd, buf, len); if (r < 0) diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index d05f934fd093..cff6707f7b07 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -28,7 +28,6 @@ class ConnectedSocketImpl { virtual ~ConnectedSocketImpl() {} virtual int is_connected() = 0; virtual ssize_t read(char*, size_t) = 0; - virtual ssize_t zero_copy_read(bufferptr&) = 0; virtual ssize_t send(bufferlist &bl, bool more) = 0; virtual void shutdown() = 0; virtual void close() = 0; @@ -95,12 +94,6 @@ class ConnectedSocket { ssize_t read(char* buf, size_t len) { return _csi->read(buf, len); } - /// Gets the input stream. - /// - /// Gets an object returning data sent from the remote endpoint. - ssize_t zero_copy_read(bufferptr &data) { - return _csi->zero_copy_read(data); - } /// Gets the output stream. /// /// Gets an object that sends data to the remote endpoint. @@ -329,8 +322,6 @@ class NetworkStack { static Worker* create_worker( CephContext *c, const string &t, unsigned i); - // backend need to override this method if supports zero copy read - virtual bool support_zero_copy_read() const { return false; } // backend need to override this method if backend doesn't support shared // listen table. // For example, posix backend has in kernel global listen table. If one diff --git a/src/msg/async/dpdk/DPDKStack.h b/src/msg/async/dpdk/DPDKStack.h index a44ae38367f9..2b465976e16f 100644 --- a/src/msg/async/dpdk/DPDKStack.h +++ b/src/msg/async/dpdk/DPDKStack.h @@ -97,7 +97,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { return len - left ? len - left : -EAGAIN; } - virtual ssize_t zero_copy_read(bufferptr &data) override { +private: + ssize_t zero_copy_read(bufferptr &data) { auto err = _conn.get_errno(); if (err <= 0) return err; @@ -166,6 +167,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { return _conn.send(Packet(std::move(frags), make_deleter(std::move(del)))); } } + +public: virtual void shutdown() override { _conn.close_write(); } @@ -247,7 +250,6 @@ class DPDKStack : public NetworkStack { explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) { funcs.resize(cct->_conf->ms_async_max_op_threads); } - virtual bool support_zero_copy_read() const override { return true; } virtual bool support_local_listen_table() const override { return true; } virtual void spawn_worker(unsigned i, std::function &&func) override; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index bf9d072ef9a5..a95151331a4f 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -363,53 +363,6 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) return read; } -ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) -{ - if (error) - return -error; - static const int MAX_COMPLETIONS = 16; - ibv_wc wc[MAX_COMPLETIONS]; - ssize_t size = 0; - - ibv_wc* response; - Chunk* chunk; - bool loaded = false; - auto iter = buffers.begin(); - if (iter != buffers.end()) { - chunk = *iter; - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - buffers.erase(iter); - loaded = true; - size = chunk->bound; - } - - std::vector cqe; - get_wc(cqe); - if (cqe.empty()) - return size == 0 ? -EAGAIN : size; - - ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; - - for (size_t i = 0; i < cqe.size(); ++i) { - response = &wc[i]; - chunk = reinterpret_cast(response->wr_id); - chunk->prepare_read(response->byte_len); - if (!loaded && i == 0) { - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - size = chunk->bound; - continue; - } - buffers.push_back(chunk); - iter++; - } - - if (size == 0) - return -EAGAIN; - return size; -} - ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { if (error) { diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index e038d3625986..ee2206012691 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -216,7 +216,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { virtual int is_connected() override { return connected; } virtual ssize_t read(char* buf, size_t len) override; - virtual ssize_t zero_copy_read(bufferptr &data) override; virtual ssize_t send(bufferlist &bl, bool more) override; virtual void shutdown() override; virtual void close() override; @@ -345,7 +344,6 @@ class RDMAStack : public NetworkStack { public: explicit RDMAStack(CephContext *cct, const string &t); virtual ~RDMAStack(); - virtual bool support_zero_copy_read() const override { return false; } virtual bool nonblock_connect_need_writable_event() const override { return false; } virtual void spawn_worker(unsigned i, std::function &&func) override; diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc index 3077c1f13e66..0e52c4eb424d 100644 --- a/src/test/msgr/test_async_networkstack.cc +++ b/src/test/msgr/test_async_networkstack.cc @@ -853,11 +853,7 @@ class StressFactory { while (true) { char buf[4096]; bufferptr data; - if (factory->zero_copy_read) { - r = socket.zero_copy_read(data); - } else { - r = socket.read(buf, sizeof(buf)); - } + r = socket.read(buf, sizeof(buf)); ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf))); if (r == 0) { ASSERT_TRUE(buffers.empty()); @@ -865,11 +861,7 @@ class StressFactory { return ; } else if (r == -EAGAIN) break; - if (factory->zero_copy_read) { - buffers.emplace_back(data.c_str(), 0, data.length()); - } else { - buffers.emplace_back(buf, 0, r); - } + buffers.emplace_back(buf, 0, r); std::cerr << " server " << this << " receive " << r << " content: " << std::endl; } if (!buffers.empty() && !write_enabled) @@ -961,14 +953,12 @@ class StressFactory { atomic_int message_count, message_left; entity_addr_t bind_addr; std::atomic_bool already_bind = {false}; - bool zero_copy_read; SocketOptions options; explicit StressFactory(const std::shared_ptr &s, const string &addr, - size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy) + size_t cli, size_t qd, size_t mc, size_t l) : stack(s), rs(128), client_num(cli), queue_depth(qd), - max_message_length(l), message_count(mc), message_left(mc), - zero_copy_read(zero_copy) { + max_message_length(l), message_count(mc), message_left(mc) { bind_addr.parse(addr.c_str()); rs.prepare(100); } @@ -1054,8 +1044,7 @@ class StressFactory { }; TEST_P(NetworkWorkerTest, StressTest) { - StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024, - strncmp(GetParam(), "dpdk", 4) == 0); + StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024); StressFactory *f = &factory; exec_events([f](Worker *worker) mutable { f->start(worker);