From: Josh Durgin Date: Mon, 21 Oct 2013 19:40:30 +0000 (-0700) Subject: buffer: add methods to read and write using zero copy X-Git-Tag: v0.74~65^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3f6fa05d7224f9b04d849c4a5309a3da01d3cb34;p=ceph.git buffer: add methods to read and write using zero copy Create explicit methods for testing. Make buffer::list::write_fd() use zero-copy if all the buffers support it. Don't automatically handle reads yet, since we need better detection of read length first. Signed-off-by: Josh Durgin --- diff --git a/src/common/buffer.cc b/src/common/buffer.cc index cf20080fe566..ed0c3a96a96f 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -100,6 +100,12 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memcpy(c->data, data, len); return c; } + virtual bool can_zero_copy() const { + return false; + } + virtual int zero_copy_to_fd(int fd, loff_t *offset) { + return -ENOTSUP; + } virtual bool is_page_aligned() { return ((long)data & ~CEPH_PAGE_MASK) == 0; } @@ -257,6 +263,10 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bdout << "raw_pipe " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; } + bool can_zero_copy() const { + return true; + } + bool is_page_aligned() { return false; } int set_source(int fd, loff_t *off) { @@ -271,6 +281,19 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE len = r; return 0; } + int zero_copy_to_fd(int fd, loff_t *offset) { + assert(!source_consumed); + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing from pipe to fd: " + << cpp_strerror(r) << bendl; + return r; + } + source_consumed = true; + return 0; + } + buffer::raw* clone_empty() { // cloning doesn't make sense for pipe-based buffers, // and is only used by unit tests for other types of buffers return NULL; @@ -412,6 +435,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE #endif } + buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) { +#ifdef CEPH_HAVE_SPLICE + buffer::raw_pipe* buf = new raw_pipe(len); + int r = buf->set_source(fd, offset); + if (r < 0) { + delete buf; + throw error_code(r); + } + return buf; +#else + throw error_code(-ENOTSUP); +#endif + } + buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw. { r->nref.inc(); @@ -597,6 +634,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memset(c_str()+o, 0, l); } + bool buffer::ptr::can_zero_copy() const + { + return _raw->can_zero_copy(); + } + + int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const + { + return _raw->zero_copy_to_fd(fd, offset); + } // -- buffer::list::iterator -- /* @@ -859,6 +905,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE } } + bool buffer::list::can_zero_copy() const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->can_zero_copy()) + return false; + return true; + } + bool buffer::list::is_page_aligned() const { for (std::list::const_iterator it = _buffers.begin(); @@ -1351,7 +1407,7 @@ int buffer::list::read_file(const char *fn, std::string *error) return 0; } -ssize_t buffer::list::read_fd(int fd, size_t len) +ssize_t buffer::list::read_fd(int fd, size_t len) { int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE); bufferptr bp = buffer::create_page_aligned(s); @@ -1363,6 +1419,21 @@ ssize_t buffer::list::read_fd(int fd, size_t len) return ret; } +int buffer::list::read_fd_zero_copy(int fd, size_t len) +{ +#ifdef CEPH_HAVE_SPLICE + try { + bufferptr bp = buffer::create_zero_copy(len, fd, NULL); + append(bp); + } catch (buffer::error_code e) { + return e.code; + } + return 0; +#else + return -ENOTSUP; +#endif +} + int buffer::list::write_file(const char *fn, int mode) { int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode)); @@ -1390,12 +1461,15 @@ int buffer::list::write_file(const char *fn, int mode) int buffer::list::write_fd(int fd) const { + if (can_zero_copy()) + return write_fd_zero_copy(fd); + // use writev! iovec iov[IOV_MAX]; int iovlen = 0; ssize_t bytes = 0; - std::list::const_iterator p = _buffers.begin(); + std::list::const_iterator p = _buffers.begin(); while (p != _buffers.end()) { if (p->length() > 0) { iov[iovlen].iov_base = (void *)p->c_str(); @@ -1440,6 +1514,30 @@ int buffer::list::write_fd(int fd) const return 0; } +int buffer::list::write_fd_zero_copy(int fd) const +{ + if (!can_zero_copy()) + return -ENOTSUP; + /* pass offset to each call to avoid races updating the fd seek + * position, since the I/O may be non-blocking + */ + loff_t offset = ::lseek(fd, 0, SEEK_CUR); + loff_t *off_p = &offset; + if (offset < 0 && offset != ESPIPE) + return (int) offset; + if (offset == ESPIPE) + off_p = NULL; + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); ++it) { + int r = it->zero_copy_to_fd(fd, off_p); + if (r < 0) + return r; + if (off_p) + offset += it->length(); + } + return 0; +} + __u32 buffer::list::crc32c(__u32 crc) const { for (std::list::const_iterator it = _buffers.begin(); diff --git a/src/include/buffer.h b/src/include/buffer.h index 546f728bbdc9..6eefd3494cfc 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -211,6 +211,9 @@ public: memcpy(dest, c_str()+o, l); } + bool can_zero_copy() const; + int zero_copy_to_fd(int fd, loff_t *offset) const; + unsigned wasted(); int cmp(const ptr& o); @@ -310,6 +313,7 @@ public: private: mutable iterator last_p; + int zero_copy_to_fd(int fd) const; public: // cons/des @@ -347,6 +351,7 @@ public: } bool contents_equal(buffer::list& other); + bool can_zero_copy() const; bool is_page_aligned() const; bool is_n_page_sized() const; @@ -434,8 +439,10 @@ public: void hexdump(std::ostream &out) const; int read_file(const char *fn, std::string *error); ssize_t read_fd(int fd, size_t len); + int read_fd_zero_copy(int fd, size_t len); int write_file(const char *fn, int mode=0644); int write_fd(int fd) const; + int write_fd_zero_copy(int fd) const; uint32_t crc32c(uint32_t crc) const; }; diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 61db1ca33467..4b3ae6167267 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -280,6 +280,37 @@ TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) { EXPECT_EQ(2u, ptr.length()); EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); } + +TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) { + bufferlist bl; + EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len)); + bl = bufferlist(); + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_EQ(len, bl.length()); + EXPECT_EQ(0u, bl.buffers().front().unused_tail_length()); + EXPECT_EQ(1u, bl.buffers().size()); + EXPECT_EQ(len, bl.buffers().front().raw_length()); + EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len)); + EXPECT_TRUE(bl.can_zero_copy()); +} + +TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) { + ::unlink("testfile_out"); + bufferlist bl; + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_TRUE(bl.can_zero_copy()); + int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600); + EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd)); + struct stat st; + memset(&st, 0, sizeof(st)); + EXPECT_EQ(0, ::stat("testfile_out", &st)); + EXPECT_EQ(len, st.st_size); + char buf[len + 1]; + EXPECT_EQ(len, safe_read(out_fd, buf, len + 1)); + EXPECT_EQ(0, memcmp(buf, "ABC\n", len)); + ::close(out_fd); + ::unlink("testfile_out"); +} #endif // CEPH_HAVE_SPLICE //