The unit test is the method's sole user.
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
}
}
- ssize_t zero_copy_read(bufferptr&) override {
- return -EOPNOTSUPP;
- }
-
ssize_t read(char *buf, size_t len) override {
ssize_t r = ::read(_fd, buf, len);
if (r < 0)
virtual ~ConnectedSocketImpl() {}
virtual int is_connected() = 0;
virtual ssize_t read(char*, size_t) = 0;
- virtual ssize_t zero_copy_read(bufferptr&) = 0;
virtual ssize_t send(bufferlist &bl, bool more) = 0;
virtual void shutdown() = 0;
virtual void close() = 0;
ssize_t read(char* buf, size_t len) {
return _csi->read(buf, len);
}
- /// Gets the input stream.
- ///
- /// Gets an object returning data sent from the remote endpoint.
- ssize_t zero_copy_read(bufferptr &data) {
- return _csi->zero_copy_read(data);
- }
/// Gets the output stream.
///
/// Gets an object that sends data to the remote endpoint.
static Worker* create_worker(
CephContext *c, const string &t, unsigned i);
- // backend need to override this method if supports zero copy read
- virtual bool support_zero_copy_read() const { return false; }
// backend need to override this method if backend doesn't support shared
// listen table.
// For example, posix backend has in kernel global listen table. If one
return len - left ? len - left : -EAGAIN;
}
- virtual ssize_t zero_copy_read(bufferptr &data) override {
+private:
+ ssize_t zero_copy_read(bufferptr &data) {
auto err = _conn.get_errno();
if (err <= 0)
return err;
return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
}
}
+
+public:
virtual void shutdown() override {
_conn.close_write();
}
explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
funcs.resize(cct->_conf->ms_async_max_op_threads);
}
- virtual bool support_zero_copy_read() const override { return true; }
virtual bool support_local_listen_table() const override { return true; }
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
return read;
}
-ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
-{
- if (error)
- return -error;
- static const int MAX_COMPLETIONS = 16;
- ibv_wc wc[MAX_COMPLETIONS];
- ssize_t size = 0;
-
- ibv_wc* response;
- Chunk* chunk;
- bool loaded = false;
- auto iter = buffers.begin();
- if (iter != buffers.end()) {
- chunk = *iter;
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- buffers.erase(iter);
- loaded = true;
- size = chunk->bound;
- }
-
- std::vector<ibv_wc> cqe;
- get_wc(cqe);
- if (cqe.empty())
- return size == 0 ? -EAGAIN : size;
-
- ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
-
- for (size_t i = 0; i < cqe.size(); ++i) {
- response = &wc[i];
- chunk = reinterpret_cast<Chunk*>(response->wr_id);
- chunk->prepare_read(response->byte_len);
- if (!loaded && i == 0) {
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- size = chunk->bound;
- continue;
- }
- buffers.push_back(chunk);
- iter++;
- }
-
- if (size == 0)
- return -EAGAIN;
- return size;
-}
-
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
{
if (error) {
virtual int is_connected() override { return connected; }
virtual ssize_t read(char* buf, size_t len) override;
- virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override;
virtual void close() override;
public:
explicit RDMAStack(CephContext *cct, const string &t);
virtual ~RDMAStack();
- virtual bool support_zero_copy_read() const override { return false; }
virtual bool nonblock_connect_need_writable_event() const override { return false; }
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
while (true) {
char buf[4096];
bufferptr data;
- if (factory->zero_copy_read) {
- r = socket.zero_copy_read(data);
- } else {
- r = socket.read(buf, sizeof(buf));
- }
+ r = socket.read(buf, sizeof(buf));
ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
if (r == 0) {
ASSERT_TRUE(buffers.empty());
return ;
} else if (r == -EAGAIN)
break;
- if (factory->zero_copy_read) {
- buffers.emplace_back(data.c_str(), 0, data.length());
- } else {
- buffers.emplace_back(buf, 0, r);
- }
+ buffers.emplace_back(buf, 0, r);
std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
}
if (!buffers.empty() && !write_enabled)
atomic_int message_count, message_left;
entity_addr_t bind_addr;
std::atomic_bool already_bind = {false};
- bool zero_copy_read;
SocketOptions options;
explicit StressFactory(const std::shared_ptr<NetworkStack> &s, const string &addr,
- size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
+ size_t cli, size_t qd, size_t mc, size_t l)
: stack(s), rs(128), client_num(cli), queue_depth(qd),
- max_message_length(l), message_count(mc), message_left(mc),
- zero_copy_read(zero_copy) {
+ max_message_length(l), message_count(mc), message_left(mc) {
bind_addr.parse(addr.c_str());
rs.prepare(100);
}
};
TEST_P(NetworkWorkerTest, StressTest) {
- StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
- strncmp(GetParam(), "dpdk", 4) == 0);
+ StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024);
StressFactory *f = &factory;
exec_events([f](Worker *worker) mutable {
f->start(worker);