};
#endif
-#ifdef CEPH_HAVE_SPLICE
- class buffer::raw_pipe : public buffer::raw {
- public:
- MEMPOOL_CLASS_HELPERS();
-
- explicit raw_pipe(unsigned len) : raw(len), source_consumed(false) {
- size_t max = get_max_pipe_size();
- if (len > max) {
- bdout << "raw_pipe: requested length " << len
- << " > max length " << max << bendl;
- throw malformed_input("length larger than max pipe size");
- }
- pipefds[0] = -1;
- pipefds[1] = -1;
-
- int r;
- if (::pipe(pipefds) == -1) {
- r = -errno;
- bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
- throw error_code(r);
- }
-
- r = set_nonblocking(pipefds);
- if (r < 0) {
- bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
- << cpp_strerror(r) << bendl;
- throw error_code(r);
- }
-
- r = set_pipe_size(pipefds, len);
- if (r < 0) {
- bdout << "raw_pipe: could not set pipe size" << bendl;
- // continue, since the pipe should become large enough as needed
- }
-
- inc_total_alloc(len);
- inc_history_alloc(len);
- bdout << "raw_pipe " << this << " alloc " << len << " "
- << buffer::get_total_alloc() << bendl;
- }
-
- ~raw_pipe() override {
- if (data)
- free(data);
- close_pipe(pipefds);
- dec_total_alloc(len);
- bdout << "raw_pipe " << this << " free " << (void *)data << " "
- << buffer::get_total_alloc() << bendl;
- }
-
- bool can_zero_copy() const override {
- return true;
- }
-
- int set_source(int fd, loff_t *off) {
- int flags = SPLICE_F_NONBLOCK;
- ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
- if (r < 0) {
- bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
- << bendl;
- return r;
- }
- // update length with actual amount read
- _set_len(r);
- return 0;
- }
-
- int zero_copy_to_fd(int fd, loff_t *offset) override {
- ceph_assert(!source_consumed);
- int flags = SPLICE_F_NONBLOCK;
- ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
- if (r < 0) {
- bdout << "raw_pipe: error splicing from pipe to fd: "
- << cpp_strerror(r) << bendl;
- return r;
- }
- source_consumed = true;
- return 0;
- }
-
- buffer::raw* clone_empty() override {
- // cloning doesn't make sense for pipe-based buffers,
- // and is only used by unit tests for other types of buffers
- return NULL;
- }
-
- char *get_data() override {
- if (data)
- return data;
- return copy_pipe(pipefds);
- }
-
- private:
- int set_pipe_size(int *fds, long length) {
-#ifdef CEPH_HAVE_SETPIPE_SZ
- if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
- int r = -errno;
- if (r == -EPERM) {
- // pipe limit must have changed - EPERM means we requested
- // more than the maximum size as an unprivileged user
- update_max_pipe_size();
- throw malformed_input("length larger than new max pipe size");
- }
- return r;
- }
-#endif
- return 0;
- }
-
- int set_nonblocking(int *fds) {
- if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
- return -errno;
- if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
- return -errno;
- return 0;
- }
-
- static void close_pipe(const int fds[2]) {
- if (fds[0] >= 0)
- VOID_TEMP_FAILURE_RETRY(::close(fds[0]));
- if (fds[1] >= 0)
- VOID_TEMP_FAILURE_RETRY(::close(fds[1]));
- }
- char *copy_pipe(int *fds) {
- /* preserve original pipe contents by copying into a temporary
- * pipe before reading.
- */
- int tmpfd[2];
- int r;
-
- ceph_assert(!source_consumed);
- ceph_assert(fds[0] >= 0);
-
- if (::pipe(tmpfd) == -1) {
- r = -errno;
- bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
- << bendl;
- throw error_code(r);
- }
- auto sg = make_scope_guard([=] { close_pipe(tmpfd); });
- r = set_nonblocking(tmpfd);
- if (r < 0) {
- bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
- << cpp_strerror(r) << bendl;
- throw error_code(r);
- }
- r = set_pipe_size(tmpfd, len);
- if (r < 0) {
- bdout << "raw_pipe: error setting pipe size on temp pipe: "
- << cpp_strerror(r) << bendl;
- }
- int flags = SPLICE_F_NONBLOCK;
- if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
- r = errno;
- bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
- << bendl;
- throw error_code(r);
- }
- data = (char *)malloc(len);
- if (!data) {
- throw bad_alloc();
- }
- r = safe_read(tmpfd[0], data, len);
- if (r < (ssize_t)len) {
- bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
- << bendl;
- free(data);
- data = NULL;
- throw error_code(r);
- }
- return data;
- }
- bool source_consumed;
- int pipefds[2];
- };
-#endif // CEPH_HAVE_SPLICE
-
/*
* primitive buffer types
*/
return create_aligned(len, CEPH_PAGE_SIZE);
}
- buffer::raw* buffer::create_zero_copy(unsigned len, int fd, int64_t *offset) {
-#ifdef CEPH_HAVE_SPLICE
- buffer::raw_pipe* buf = new raw_pipe(len);
- int r = buf->set_source(fd, (loff_t*)offset);
- if (r < 0) {
- delete buf;
- throw error_code(r);
- }
- return buf;
-#else
- throw error_code(-ENOTSUP);
-#endif
- }
-
buffer::raw* buffer::create_unshareable(unsigned len) {
return new raw_unshareable(len);
}
buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
buffer_raw_posix_aligned, buffer_meta);
-#ifdef CEPH_HAVE_SPLICE
-MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta);
-#endif
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
buffer_meta);