]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
buffer: create raw pipe-based buffer
authorJosh Durgin <josh.durgin@inktank.com>
Mon, 21 Oct 2013 15:58:56 +0000 (08:58 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Sat, 23 Nov 2013 00:14:03 +0000 (16:14 -0800)
This uses a pipe to reference kernel memory so we can use splice(2) to
avoid extra data copies. Take an fd in the factory to create it, since
that's the only way to use it efficiently, which is its whole purpose.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/common/buffer.cc
src/include/buffer.h
src/test/bufferlist.cc

index 7ac54390943a559660a68680e6765bc01d7bb81a..cf20080fe5666690576616eea71adcfb6ccc68ce 100644 (file)
@@ -228,6 +228,125 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
   };
 #endif
 
+#ifdef CEPH_HAVE_SPLICE
+  class buffer::raw_pipe : public buffer::raw {
+  public:
+    raw_pipe(unsigned len) : raw(len), source_consumed(false) {
+      pipefds[0] = -1;
+      pipefds[1] = -1;
+      if (::pipe(pipefds) == -1) {
+       int r = -errno;
+       bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
+       throw error_code(r);
+      }
+      if (set_nonblocking(pipefds) < 0) {
+       int r = -errno;
+       bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
+             << cpp_strerror(r) << bendl;
+       throw error_code(r);
+      }
+      inc_total_alloc(len);
+      bdout << "raw_pipe " << this << " alloc " << len << " "
+           << buffer::get_total_alloc() << bendl;
+    }
+    ~raw_pipe() {
+      if (data)
+       delete data;
+      close_pipe(pipefds);
+      dec_total_alloc(len);
+      bdout << "raw_pipe " << this << " free " << (void *)data << " "
+           << buffer::get_total_alloc() << bendl;
+    }
+      return false;
+    }
+    int set_source(int fd, loff_t *off) {
+      int flags = SPLICE_F_NONBLOCK;
+      ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
+      if (r < 0) {
+       bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
+             << bendl;
+       return r;
+      }
+      // update length with actual amount read
+      len = r;
+      return 0;
+    }
+      // cloning doesn't make sense for pipe-based buffers,
+      // and is only used by unit tests for other types of buffers
+      return NULL;
+    }
+    char *get_data() {
+      if (data)
+       return data;
+      return copy_pipe(pipefds);
+    }
+  private:
+    int set_nonblocking(int *fds) {
+      if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
+       return -errno;
+      if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
+       return -errno;
+      return 0;
+    }
+
+    void close_pipe(int *fds) {
+      if (fds[0] >= 0)
+       TEMP_FAILURE_RETRY(::close(fds[0]));
+      if (fds[1] >= 0)
+       TEMP_FAILURE_RETRY(::close(fds[1]));
+    }
+    char *copy_pipe(int *fds) {
+      /* preserve original pipe contents by copying into a temporary
+       * pipe before reading.
+       */
+      int tmpfd[2];
+      int r;
+
+      assert(!source_consumed);
+      assert(fds[0] >= 0);
+
+      if (::pipe(tmpfd) == -1) {
+       r = -errno;
+       bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
+             << bendl;
+       throw error_code(r);
+      }
+      r = set_nonblocking(tmpfd);
+      if (r < 0) {
+       bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
+             << cpp_strerror(r) << bendl;
+       throw error_code(r);
+      }
+      int flags = SPLICE_F_NONBLOCK;
+      if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
+       r = errno;
+       bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
+             << bendl;
+       close_pipe(tmpfd);
+       throw error_code(r);
+      }
+      data = (char *)malloc(len);
+      if (!data) {
+       close_pipe(tmpfd);
+       throw bad_alloc();
+      }
+      r = safe_read(tmpfd[0], data, len);
+      if (r < (ssize_t)len) {
+       bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
+             << bendl;
+       delete data;
+       data = NULL;
+       close_pipe(tmpfd);
+       throw error_code(r);
+      }
+      close_pipe(tmpfd);
+      return data;
+    }
+    bool source_consumed;
+    int pipefds[2];
+  };
+#endif // CEPH_HAVE_SPLICE
+
   /*
    * primitive buffer types
    */
index 3987f021601cdfbd743ae3d0d7c2a0e4814175c0..546f728bbdc98b73ca15649d2f6d2117ccf40649 100644 (file)
@@ -137,6 +137,7 @@ private:
   class raw_posix_aligned;
   class raw_hack_aligned;
   class raw_char;
+  class raw_pipe;
 
   friend std::ostream& operator<<(std::ostream& out, const raw &r);
 
@@ -152,8 +153,8 @@ public:
   static raw* claim_malloc(unsigned len, char *buf);
   static raw* create_static(unsigned len, char *buf);
   static raw* create_page_aligned(unsigned len);
-  
-  
+  static raw* create_zero_copy(unsigned len, int fd, loff_t *offset);
+
   /*
    * a buffer pointer.  references (a subsequence of) a raw buffer.
    */
index 8b6ca269234ff59e7c2292b615aa5a88ee65ea3f..61db1ca33467fb239b340e2bc242b77d64df887f 100644 (file)
@@ -29,6 +29,7 @@
 #include "include/encoding.h"
 #include "common/environment.h"
 #include "common/Clock.h"
+#include "common/safe_io.h"
 
 #include "gtest/gtest.h"
 #include "stdlib.h"
@@ -141,6 +142,25 @@ TEST(Buffer, constructors) {
     bufferptr clone = ptr.clone();
     EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len));
   }
+#ifdef CEPH_HAVE_SPLICE
+  if (ceph_buffer_track)
+    EXPECT_EQ(0, buffer::get_total_alloc());
+  {
+    // no fd
+    EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);
+
+    unsigned zc_len = 4;
+    ::unlink("testfile");
+    ::system("echo ABC > testfile");
+    int fd = ::open("testfile", O_RDONLY);
+    bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
+    EXPECT_EQ(zc_len, ptr.length());
+    if (ceph_buffer_track)
+      EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
+    ::close(fd);
+    ::unlink("testfile");
+  }
+#endif
   if (ceph_buffer_track)
     EXPECT_EQ(0, buffer::get_total_alloc());
 }
@@ -153,6 +173,115 @@ TEST(BufferRaw, ostream) {
   EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
 }
 
+#ifdef CEPH_HAVE_SPLICE
+class TestRawPipe : public ::testing::Test {
+protected:
+  virtual void SetUp() {
+    len = 4;
+    ::unlink("testfile");
+    ::system("echo ABC > testfile");
+    fd = ::open("testfile", O_RDONLY);
+    assert(fd >= 0);
+  }
+  virtual void TearDown() {
+    ::close(fd);
+    ::unlink("testfile");
+  }
+  int fd;
+  unsigned len;
+};
+
+TEST_F(TestRawPipe, create_zero_copy) {
+  bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len, ptr.length());
+  if (get_env_bool("CEPH_BUFFER_TRACK"))
+    EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
+}
+
+TEST_F(TestRawPipe, c_str_no_fd) {
+  EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
+              buffer::error_code);
+}
+
+TEST_F(TestRawPipe, c_str_basic) {
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+  EXPECT_EQ(len, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_twice) {
+  // make sure we're creating a copy of the data and not consuming it
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_basic_offset) {
+  loff_t offset = len - 1;
+  ::lseek(fd, offset, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
+  EXPECT_EQ(len - offset, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short) {
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
+  EXPECT_EQ(2u, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
+}
+
+TEST_F(TestRawPipe, c_str_source_short) {
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len - 1, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
+  loff_t offset = 0;
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(len, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
+  loff_t offset = 1;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+                                                    &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(len - 1, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
+  loff_t offset = len;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+                                                    &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(0u, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
+  loff_t offset = 1;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(len - 1, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
+  loff_t offset = 1;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset));
+  EXPECT_EQ(3, offset);
+  EXPECT_EQ(2u, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
+}
+#endif // CEPH_HAVE_SPLICE
+
 //                                     
 // +-----------+                +-----+
 // |           |                |     |