]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
buffer: attempt to size raw_pipe buffers
authorJosh Durgin <josh.durgin@inktank.com>
Fri, 18 Oct 2013 14:46:34 +0000 (07:46 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Sat, 23 Nov 2013 00:14:03 +0000 (16:14 -0800)
Make sure the requested length is below the maximum pipe size for now,
since we're only using one pipe and splicing once into and out of
it. The default max is 1MB on recent kernels, so this isn't such a
terrible limitation.

To get around this we could use multiple pipes, or keep both source and
destination fds open at the same time and call splice many times. This
is more usual usage for splice, but would require a lot more work to
restructure the filestore and messenger to handle it.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
configure.ac
src/common/buffer.cc

index 91c623abdaf162e06f306982d25ec70ea3daf524..3c3e227bed86bec6c1643d3fdfa3852e8dc3e0fb 100644 (file)
@@ -541,7 +541,15 @@ AC_CHECK_FUNC([splice],
        [AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])],
        [])
 
+
 AC_CHECK_HEADERS([arpa/nameser_compat.h])
+
+AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include <fcntl.h>
+F_SETPIPE_SZ]])],
+       [AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])],
+       [AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])])
+
+
 AC_CHECK_HEADERS([sys/prctl.h])
 AC_CHECK_FUNCS([prctl])
 
index ed0c3a96a96f15e45c8a9e93c712eb7341ae3cef..650b3fbb331eec5b4be8088185158311a8b30cbd 100644 (file)
@@ -18,6 +18,7 @@
 #include "common/errno.h"
 #include "common/safe_io.h"
 #include "common/simple_spin.h"
+#include "common/strtol.h"
 #include "include/atomic.h"
 #include "include/types.h"
 #include "include/compat.h"
@@ -69,6 +70,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
     return buffer_cached_crc_adjusted.read();
   }
 
+  atomic_t buffer_max_pipe_size;
+  int update_max_pipe_size() {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+    char buf[32];
+    int r;
+    std::string err;
+    struct stat stat_result;
+    if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1)
+      return -errno;
+    r = safe_read_file("/proc/sys/fs/", "pipe-max-size",
+                      buf, sizeof(buf) - 1);
+    if (r < 0)
+      return r;
+    buf[r] = '\0';
+    size = strict_strtol(buf, 10, &err);
+    if (!err.empty())
+      return -EIO;
+    buffer_max_pipe_size.set(size);
+#endif
+    return 0;
+  }
+
+  size_t get_max_pipe_size() {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+    size_t size = buffer_max_pipe_size.read();
+    if (size)
+      return size;
+    if (update_max_pipe_size() == 0)
+      return buffer_max_pipe_size.read()
+#endif
+    // this is the max size hardcoded in linux before 2.6.35
+    return 65536;
+  }
+
   buffer::error_code::error_code(int error) :
     buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}
 
@@ -238,23 +273,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
   class buffer::raw_pipe : public buffer::raw {
   public:
     raw_pipe(unsigned len) : raw(len), source_consumed(false) {
+      size_t max = get_max_pipe_size();
+      if (len > max) {
+       bdout << "raw_pipe: requested length " << len
+             << " > max length " << max << bendl;
+       throw malformed_input("length larger than max pipe size");
+      }
       pipefds[0] = -1;
       pipefds[1] = -1;
+
+      int r;
       if (::pipe(pipefds) == -1) {
-       int r = -errno;
+       r = -errno;
        bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
        throw error_code(r);
       }
-      if (set_nonblocking(pipefds) < 0) {
-       int r = -errno;
+
+      r = set_nonblocking(pipefds);
+      if (r < 0) {
        bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
              << cpp_strerror(r) << bendl;
        throw error_code(r);
       }
+
+      r = set_pipe_size(pipefds, len);
+      if (r < 0) {
+       bdout << "raw_pipe: could not set pipe size" << bendl;
+       // continue, since the pipe should become large enough as needed
+      }
+
       inc_total_alloc(len);
       bdout << "raw_pipe " << this << " alloc " << len << " "
            << buffer::get_total_alloc() << bendl;
     }
+
     ~raw_pipe() {
       if (data)
        delete data;
@@ -263,12 +315,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       bdout << "raw_pipe " << this << " free " << (void *)data << " "
            << buffer::get_total_alloc() << bendl;
     }
+
     bool can_zero_copy() const {
       return true;
     }
+
     bool is_page_aligned() {
       return false;
     }
+
     int set_source(int fd, loff_t *off) {
       int flags = SPLICE_F_NONBLOCK;
       ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
@@ -281,6 +336,7 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       len = r;
       return 0;
     }
+
     int zero_copy_to_fd(int fd, loff_t *offset) {
       assert(!source_consumed);
       int flags = SPLICE_F_NONBLOCK;
@@ -293,17 +349,36 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
       source_consumed = true;
       return 0;
     }
+
     buffer::raw* clone_empty() {
       // cloning doesn't make sense for pipe-based buffers,
       // and is only used by unit tests for other types of buffers
       return NULL;
     }
+
     char *get_data() {
       if (data)
        return data;
       return copy_pipe(pipefds);
     }
+
   private:
+    int set_pipe_size(int *fds, long length) {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+      if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
+       int r = -errno;
+       if (r == -EPERM) {
+         // pipe limit must have changed - EPERM means we requested
+         // more than the maximum size as an unprivileged user
+         update_max_pipe_size();
+         throw malformed_input("length larger than new max pipe size");
+       }
+       return r;
+      }
+#endif
+      return 0;
+    }
+
     int set_nonblocking(int *fds) {
       if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
        return -errno;
@@ -340,6 +415,11 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
              << cpp_strerror(r) << bendl;
        throw error_code(r);
       }
+      r = set_pipe_size(tmpfd, len);
+      if (r < 0) {
+       bdout << "raw_pipe: error setting pipe size on temp pipe: "
+             << cpp_strerror(r) << bendl;
+      }
       int flags = SPLICE_F_NONBLOCK;
       if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
        r = errno;