]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
buffer: add methods to read and write using zero copy
authorJosh Durgin <josh.durgin@inktank.com>
Mon, 21 Oct 2013 19:40:30 +0000 (12:40 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Sat, 23 Nov 2013 00:14:03 +0000 (16:14 -0800)
Create explicit methods for testing. Make buffer::list::write_fd() use
zero-copy if all the buffers support it.  Don't automatically handle
reads yet, since we need better detection of read length first.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/common/buffer.cc
src/include/buffer.h
src/test/bufferlist.cc

index cf20080fe5666690576616eea71adcfb6ccc68ce..ed0c3a96a96f15e45c8a9e93c712eb7341ae3cef 100644 (file)
@@ -100,6 +100,12 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       memcpy(c->data, data, len);
       return c;
     }
+    virtual bool can_zero_copy() const {
+      return false;
+    }
+    virtual int zero_copy_to_fd(int fd, loff_t *offset) {
+      return -ENOTSUP;
+    }
     virtual bool is_page_aligned() {
       return ((long)data & ~CEPH_PAGE_MASK) == 0;
     }
@@ -257,6 +263,10 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       bdout << "raw_pipe " << this << " free " << (void *)data << " "
            << buffer::get_total_alloc() << bendl;
     }
+    bool can_zero_copy() const {
+      return true;
+    }
+    bool is_page_aligned() {
       return false;
     }
     int set_source(int fd, loff_t *off) {
@@ -271,6 +281,19 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       len = r;
       return 0;
     }
+    int zero_copy_to_fd(int fd, loff_t *offset) {
+      assert(!source_consumed);
+      int flags = SPLICE_F_NONBLOCK;
+      ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
+      if (r < 0) {
+       bdout << "raw_pipe: error splicing from pipe to fd: "
+             << cpp_strerror(r) << bendl;
+       return r;
+      }
+      source_consumed = true;
+      return 0;
+    }
+    buffer::raw* clone_empty() {
       // cloning doesn't make sense for pipe-based buffers,
       // and is only used by unit tests for other types of buffers
       return NULL;
@@ -412,6 +435,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
 #endif
   }
 
+  buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) {
+#ifdef CEPH_HAVE_SPLICE
+    buffer::raw_pipe* buf = new raw_pipe(len);
+    int r = buf->set_source(fd, offset);
+    if (r < 0) {
+      delete buf;
+      throw error_code(r);
+    }
+    return buf;
+#else
+    throw error_code(-ENOTSUP);
+#endif
+  }
+
   buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len)   // no lock needed; this is an unref raw.
   {
     r->nref.inc();
@@ -597,6 +634,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
     memset(c_str()+o, 0, l);
   }
 
+  bool buffer::ptr::can_zero_copy() const
+  {
+    return _raw->can_zero_copy();
+  }
+
+  int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const
+  {
+    return _raw->zero_copy_to_fd(fd, offset);
+  }
 
   // -- buffer::list::iterator --
   /*
@@ -859,6 +905,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
     }
   }
 
+  bool buffer::list::can_zero_copy() const
+  {
+    for (std::list<ptr>::const_iterator it = _buffers.begin();
+        it != _buffers.end();
+        ++it)
+      if (!it->can_zero_copy())
+       return false;
+    return true;
+  }
+
   bool buffer::list::is_page_aligned() const
   {
     for (std::list<ptr>::const_iterator it = _buffers.begin();
@@ -1351,7 +1407,7 @@ int buffer::list::read_file(const char *fn, std::string *error)
   return 0;
 }
 
-ssize_t buffer::list::read_fd(int fd, size_t len) 
+ssize_t buffer::list::read_fd(int fd, size_t len)
 {
   int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE);
   bufferptr bp = buffer::create_page_aligned(s);
@@ -1363,6 +1419,21 @@ ssize_t buffer::list::read_fd(int fd, size_t len)
   return ret;
 }
 
+int buffer::list::read_fd_zero_copy(int fd, size_t len)
+{
+#ifdef CEPH_HAVE_SPLICE
+  try {
+    bufferptr bp = buffer::create_zero_copy(len, fd, NULL);
+    append(bp);
+  } catch (buffer::error_code e) {
+    return e.code;
+  }
+  return 0;
+#else
+  return -ENOTSUP;
+#endif
+}
+
 int buffer::list::write_file(const char *fn, int mode)
 {
   int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
@@ -1390,12 +1461,15 @@ int buffer::list::write_file(const char *fn, int mode)
 
 int buffer::list::write_fd(int fd) const
 {
+  if (can_zero_copy())
+    return write_fd_zero_copy(fd);
+
   // use writev!
   iovec iov[IOV_MAX];
   int iovlen = 0;
   ssize_t bytes = 0;
 
-  std::list<ptr>::const_iterator p = _buffers.begin(); 
+  std::list<ptr>::const_iterator p = _buffers.begin();
   while (p != _buffers.end()) {
     if (p->length() > 0) {
       iov[iovlen].iov_base = (void *)p->c_str();
@@ -1440,6 +1514,30 @@ int buffer::list::write_fd(int fd) const
   return 0;
 }
 
+int buffer::list::write_fd_zero_copy(int fd) const
+{
+  if (!can_zero_copy())
+    return -ENOTSUP;
+  /* pass offset to each call to avoid races updating the fd seek
+   * position, since the I/O may be non-blocking
+   */
+  loff_t offset = ::lseek(fd, 0, SEEK_CUR);
+  loff_t *off_p = &offset;
+  if (offset < 0 && offset != ESPIPE)
+    return (int) offset;
+  if (offset == ESPIPE)
+    off_p = NULL;
+  for (std::list<ptr>::const_iterator it = _buffers.begin();
+       it != _buffers.end(); ++it) {
+    int r = it->zero_copy_to_fd(fd, off_p);
+    if (r < 0)
+      return r;
+    if (off_p)
+      offset += it->length();
+  }
+  return 0;
+}
+
 __u32 buffer::list::crc32c(__u32 crc) const
 {
   for (std::list<ptr>::const_iterator it = _buffers.begin();
index 546f728bbdc98b73ca15649d2f6d2117ccf40649..6eefd3494cfc7724262151b21491131f7759d242 100644 (file)
@@ -211,6 +211,9 @@ public:
       memcpy(dest, c_str()+o, l);
     }
 
+    bool can_zero_copy() const;
+    int zero_copy_to_fd(int fd, loff_t *offset) const;
+
     unsigned wasted();
 
     int cmp(const ptr& o);
@@ -310,6 +313,7 @@ public:
 
   private:
     mutable iterator last_p;
+    int zero_copy_to_fd(int fd) const;
 
   public:
     // cons/des
@@ -347,6 +351,7 @@ public:
     }
     bool contents_equal(buffer::list& other);
 
+    bool can_zero_copy() const;
     bool is_page_aligned() const;
     bool is_n_page_sized() const;
 
@@ -434,8 +439,10 @@ public:
     void hexdump(std::ostream &out) const;
     int read_file(const char *fn, std::string *error);
     ssize_t read_fd(int fd, size_t len);
+    int read_fd_zero_copy(int fd, size_t len);
     int write_file(const char *fn, int mode=0644);
     int write_fd(int fd) const;
+    int write_fd_zero_copy(int fd) const;
     uint32_t crc32c(uint32_t crc) const;
   };
 
index 61db1ca33467fb239b340e2bc242b77d64df887f..4b3ae616726714f4208aeacfd300f2f284164b01 100644 (file)
@@ -280,6 +280,37 @@ TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
   EXPECT_EQ(2u, ptr.length());
   EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
 }
+
+TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) {
+  bufferlist bl;
+  EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len));
+  bl = bufferlist();
+  EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
+  EXPECT_EQ(len, bl.length());
+  EXPECT_EQ(0u, bl.buffers().front().unused_tail_length());
+  EXPECT_EQ(1u, bl.buffers().size());
+  EXPECT_EQ(len, bl.buffers().front().raw_length());
+  EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len));
+  EXPECT_TRUE(bl.can_zero_copy());
+}
+
+TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) {
+  ::unlink("testfile_out");
+  bufferlist bl;
+  EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
+  EXPECT_TRUE(bl.can_zero_copy());
+  int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600);
+  EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd));
+  struct stat st;
+  memset(&st, 0, sizeof(st));
+  EXPECT_EQ(0, ::stat("testfile_out", &st));
+  EXPECT_EQ(len, st.st_size);
+  char buf[len + 1];
+  EXPECT_EQ(len, safe_read(out_fd, buf, len + 1));
+  EXPECT_EQ(0, memcmp(buf, "ABC\n", len));
+  ::close(out_fd);
+  ::unlink("testfile_out");
+}
 #endif // CEPH_HAVE_SPLICE
 
 //