#include "common/errno.h"
#include "common/safe_io.h"
#include "common/simple_spin.h"
+#include "common/strtol.h"
#include "include/atomic.h"
#include "include/types.h"
#include "include/compat.h"
return buffer_cached_crc_adjusted.read();
}
+ atomic_t buffer_max_pipe_size;
+ int update_max_pipe_size() {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+ char buf[32];
+ int r;
+ std::string err;
+ struct stat stat_result;
+ if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1)
+ return -errno;
+ r = safe_read_file("/proc/sys/fs/", "pipe-max-size",
+ buf, sizeof(buf) - 1);
+ if (r < 0)
+ return r;
+ buf[r] = '\0';
+ size = strict_strtol(buf, 10, &err);
+ if (!err.empty())
+ return -EIO;
+ buffer_max_pipe_size.set(size);
+#endif
+ return 0;
+ }
+
+ size_t get_max_pipe_size() {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+ size_t size = buffer_max_pipe_size.read();
+ if (size)
+ return size;
+ if (update_max_pipe_size() == 0)
+ return buffer_max_pipe_size.read()
+#endif
+ // this is the max size hardcoded in linux before 2.6.35
+ return 65536;
+ }
+
buffer::error_code::error_code(int error) :
buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}
class buffer::raw_pipe : public buffer::raw {
public:
raw_pipe(unsigned len) : raw(len), source_consumed(false) {
+ size_t max = get_max_pipe_size();
+ if (len > max) {
+ bdout << "raw_pipe: requested length " << len
+ << " > max length " << max << bendl;
+ throw malformed_input("length larger than max pipe size");
+ }
pipefds[0] = -1;
pipefds[1] = -1;
+
+ int r;
if (::pipe(pipefds) == -1) {
- int r = -errno;
+ r = -errno;
bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
throw error_code(r);
}
- if (set_nonblocking(pipefds) < 0) {
- int r = -errno;
+
+ r = set_nonblocking(pipefds);
+ if (r < 0) {
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
+
+ r = set_pipe_size(pipefds, len);
+ if (r < 0) {
+ bdout << "raw_pipe: could not set pipe size" << bendl;
+ // continue, since the pipe should become large enough as needed
+ }
+
inc_total_alloc(len);
bdout << "raw_pipe " << this << " alloc " << len << " "
<< buffer::get_total_alloc() << bendl;
}
+
~raw_pipe() {
if (data)
delete data;
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}
+
bool can_zero_copy() const {
return true;
}
+
bool is_page_aligned() {
return false;
}
+
int set_source(int fd, loff_t *off) {
int flags = SPLICE_F_NONBLOCK;
ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
len = r;
return 0;
}
+
int zero_copy_to_fd(int fd, loff_t *offset) {
assert(!source_consumed);
int flags = SPLICE_F_NONBLOCK;
source_consumed = true;
return 0;
}
+
buffer::raw* clone_empty() {
// cloning doesn't make sense for pipe-based buffers,
// and is only used by unit tests for other types of buffers
return NULL;
}
+
char *get_data() {
if (data)
return data;
return copy_pipe(pipefds);
}
+
private:
+ int set_pipe_size(int *fds, long length) {
+#ifdef CEPH_HAVE_SETPIPE_SZ
+ if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
+ int r = -errno;
+ if (r == -EPERM) {
+ // pipe limit must have changed - EPERM means we requested
+ // more than the maximum size as an unprivileged user
+ update_max_pipe_size();
+ throw malformed_input("length larger than new max pipe size");
+ }
+ return r;
+ }
+#endif
+ return 0;
+ }
+
int set_nonblocking(int *fds) {
if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
return -errno;
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
+ r = set_pipe_size(tmpfd, len);
+ if (r < 0) {
+ bdout << "raw_pipe: error setting pipe size on temp pipe: "
+ << cpp_strerror(r) << bendl;
+ }
int flags = SPLICE_F_NONBLOCK;
if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
r = errno;