From: Josh Durgin Date: Fri, 18 Oct 2013 14:46:34 +0000 (-0700) Subject: buffer: attempt to size raw_pipe buffers X-Git-Tag: v0.74~65^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=be29b3471c3f92d28b7aa05449e89ea83603e295;p=ceph.git buffer: attempt to size raw_pipe buffers Make sure the requested length is below the maximum pipe size for now, since we're only using one pipe and splicing once into and out of it. The default max is 1MB on recent kernels, so this isn't such a terrible limitation. To get around this we could use multiple pipes, or keep both source and destination fds open at the same time and call splice many times. This is more usual usage for splice, but would require a lot more work to restructure the filestore and messenger to handle it. Signed-off-by: Josh Durgin --- diff --git a/configure.ac b/configure.ac index 91c623abdaf1..3c3e227bed86 100644 --- a/configure.ac +++ b/configure.ac @@ -541,7 +541,15 @@ AC_CHECK_FUNC([splice], [AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])], []) + AC_CHECK_HEADERS([arpa/nameser_compat.h]) + +AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include +F_SETPIPE_SZ]])], + [AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])], + [AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])]) + + AC_CHECK_HEADERS([sys/prctl.h]) AC_CHECK_FUNCS([prctl]) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index ed0c3a96a96f..650b3fbb331e 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -18,6 +18,7 @@ #include "common/errno.h" #include "common/safe_io.h" #include "common/simple_spin.h" +#include "common/strtol.h" #include "include/atomic.h" #include "include/types.h" #include "include/compat.h" @@ -69,6 +70,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return buffer_cached_crc_adjusted.read(); } + atomic_t buffer_max_pipe_size; + int update_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + char buf[32]; + int r; + std::string err; + struct stat stat_result; + if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1) + return -errno; + r = safe_read_file("/proc/sys/fs/", "pipe-max-size", + buf, sizeof(buf) - 1); + if (r < 0) + return r; + buf[r] = '\0'; + size = strict_strtol(buf, 10, &err); + if (!err.empty()) + return -EIO; + buffer_max_pipe_size.set(size); +#endif + return 0; + } + + size_t get_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + size_t size = buffer_max_pipe_size.read(); + if (size) + return size; + if (update_max_pipe_size() == 0) + return buffer_max_pipe_size.read() +#endif + // this is the max size hardcoded in linux before 2.6.35 + return 65536; + } + buffer::error_code::error_code(int error) : buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {} @@ -238,23 +273,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE class buffer::raw_pipe : public buffer::raw { public: raw_pipe(unsigned len) : raw(len), source_consumed(false) { + size_t max = get_max_pipe_size(); + if (len > max) { + bdout << "raw_pipe: requested length " << len + << " > max length " << max << bendl; + throw malformed_input("length larger than max pipe size"); + } pipefds[0] = -1; pipefds[1] = -1; + + int r; if (::pipe(pipefds) == -1) { - int r = -errno; + r = -errno; bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; throw error_code(r); } - if (set_nonblocking(pipefds) < 0) { - int r = -errno; + + r = set_nonblocking(pipefds); + if (r < 0) { bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " << cpp_strerror(r) << bendl; throw error_code(r); } + + r = set_pipe_size(pipefds, len); + if (r < 0) { + bdout << "raw_pipe: could not set pipe size" << bendl; + // continue, since the pipe should become large enough as needed + } + inc_total_alloc(len); bdout << "raw_pipe " << this << " alloc " << len << " " << buffer::get_total_alloc() << bendl; } + ~raw_pipe() { if (data) delete data; @@ -263,12 +315,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bdout << "raw_pipe " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; } + bool can_zero_copy() const { return true; } + bool is_page_aligned() { return false; } + int set_source(int fd, loff_t *off) { int flags = SPLICE_F_NONBLOCK; ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); @@ -281,6 +336,7 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE len = r; return 0; } + int zero_copy_to_fd(int fd, loff_t *offset) { assert(!source_consumed); int flags = SPLICE_F_NONBLOCK; @@ -293,17 +349,36 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE source_consumed = true; return 0; } + buffer::raw* clone_empty() { // cloning doesn't make sense for pipe-based buffers, // and is only used by unit tests for other types of buffers return NULL; } + char *get_data() { if (data) return data; return copy_pipe(pipefds); } + private: + int set_pipe_size(int *fds, long length) { +#ifdef CEPH_HAVE_SETPIPE_SZ + if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) { + int r = -errno; + if (r == -EPERM) { + // pipe limit must have changed - EPERM means we requested + // more than the maximum size as an unprivileged user + update_max_pipe_size(); + throw malformed_input("length larger than new max pipe size"); + } + return r; + } +#endif + return 0; + } + int set_nonblocking(int *fds) { if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) return -errno; @@ -340,6 +415,11 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE << cpp_strerror(r) << bendl; throw error_code(r); } + r = set_pipe_size(tmpfd, len); + if (r < 0) { + bdout << "raw_pipe: error setting pipe size on temp pipe: " + << cpp_strerror(r) << bendl; + } int flags = SPLICE_F_NONBLOCK; if (::tee(fds[0], tmpfd[1], len, flags) == -1) { r = errno;