};
#endif
+#ifdef CEPH_HAVE_SPLICE
+ class buffer::raw_pipe : public buffer::raw {
+ public:
+ raw_pipe(unsigned len) : raw(len), source_consumed(false) {
+ pipefds[0] = -1;
+ pipefds[1] = -1;
+ if (::pipe(pipefds) == -1) {
+ int r = -errno;
+ bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
+ throw error_code(r);
+ }
+ if (set_nonblocking(pipefds) < 0) {
+ int r = -errno;
+ bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
+ << cpp_strerror(r) << bendl;
+ throw error_code(r);
+ }
+ inc_total_alloc(len);
+ bdout << "raw_pipe " << this << " alloc " << len << " "
+ << buffer::get_total_alloc() << bendl;
+ }
+ ~raw_pipe() {
+ if (data)
+ delete data;
+ close_pipe(pipefds);
+ dec_total_alloc(len);
+ bdout << "raw_pipe " << this << " free " << (void *)data << " "
+ << buffer::get_total_alloc() << bendl;
+ }
+ return false;
+ }
+ int set_source(int fd, loff_t *off) {
+ int flags = SPLICE_F_NONBLOCK;
+ ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
+ if (r < 0) {
+ bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
+ << bendl;
+ return r;
+ }
+ // update length with actual amount read
+ len = r;
+ return 0;
+ }
+ // cloning doesn't make sense for pipe-based buffers,
+ // and is only used by unit tests for other types of buffers
+ return NULL;
+ }
+ char *get_data() {
+ if (data)
+ return data;
+ return copy_pipe(pipefds);
+ }
+ private:
+ int set_nonblocking(int *fds) {
+ if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
+ return -errno;
+ if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
+ return -errno;
+ return 0;
+ }
+
+ void close_pipe(int *fds) {
+ if (fds[0] >= 0)
+ TEMP_FAILURE_RETRY(::close(fds[0]));
+ if (fds[1] >= 0)
+ TEMP_FAILURE_RETRY(::close(fds[1]));
+ }
+ char *copy_pipe(int *fds) {
+ /* preserve original pipe contents by copying into a temporary
+ * pipe before reading.
+ */
+ int tmpfd[2];
+ int r;
+
+ assert(!source_consumed);
+ assert(fds[0] >= 0);
+
+ if (::pipe(tmpfd) == -1) {
+ r = -errno;
+ bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
+ << bendl;
+ throw error_code(r);
+ }
+ r = set_nonblocking(tmpfd);
+ if (r < 0) {
+ bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
+ << cpp_strerror(r) << bendl;
+ throw error_code(r);
+ }
+ int flags = SPLICE_F_NONBLOCK;
+ if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
+ r = errno;
+ bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
+ << bendl;
+ close_pipe(tmpfd);
+ throw error_code(r);
+ }
+ data = (char *)malloc(len);
+ if (!data) {
+ close_pipe(tmpfd);
+ throw bad_alloc();
+ }
+ r = safe_read(tmpfd[0], data, len);
+ if (r < (ssize_t)len) {
+ bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
+ << bendl;
+ delete data;
+ data = NULL;
+ close_pipe(tmpfd);
+ throw error_code(r);
+ }
+ close_pipe(tmpfd);
+ return data;
+ }
+ bool source_consumed;
+ int pipefds[2];
+ };
+#endif // CEPH_HAVE_SPLICE
+
/*
* primitive buffer types
*/
#include "include/encoding.h"
#include "common/environment.h"
#include "common/Clock.h"
+#include "common/safe_io.h"
#include "gtest/gtest.h"
#include "stdlib.h"
bufferptr clone = ptr.clone();
EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len));
}
+#ifdef CEPH_HAVE_SPLICE
+ if (ceph_buffer_track)
+ EXPECT_EQ(0, buffer::get_total_alloc());
+ {
+ // no fd
+ EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);
+
+ unsigned zc_len = 4;
+ ::unlink("testfile");
+ ::system("echo ABC > testfile");
+ int fd = ::open("testfile", O_RDONLY);
+ bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
+ EXPECT_EQ(zc_len, ptr.length());
+ if (ceph_buffer_track)
+ EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
+ ::close(fd);
+ ::unlink("testfile");
+ }
+#endif
if (ceph_buffer_track)
EXPECT_EQ(0, buffer::get_total_alloc());
}
EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
}
+#ifdef CEPH_HAVE_SPLICE
+class TestRawPipe : public ::testing::Test {
+protected:
+ virtual void SetUp() {
+ len = 4;
+ ::unlink("testfile");
+ ::system("echo ABC > testfile");
+ fd = ::open("testfile", O_RDONLY);
+ assert(fd >= 0);
+ }
+ virtual void TearDown() {
+ ::close(fd);
+ ::unlink("testfile");
+ }
+ int fd;
+ unsigned len;
+};
+
+TEST_F(TestRawPipe, create_zero_copy) {
+ bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
+ EXPECT_EQ(len, ptr.length());
+ if (get_env_bool("CEPH_BUFFER_TRACK"))
+ EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
+}
+
+TEST_F(TestRawPipe, c_str_no_fd) {
+ EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
+ buffer::error_code);
+}
+
+TEST_F(TestRawPipe, c_str_basic) {
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+ EXPECT_EQ(len, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_twice) {
+ // make sure we're creating a copy of the data and not consuming it
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+ EXPECT_EQ(len, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_basic_offset) {
+ loff_t offset = len - 1;
+ ::lseek(fd, offset, SEEK_SET);
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
+ EXPECT_EQ(len - offset, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short) {
+ ::lseek(fd, 1, SEEK_SET);
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
+ EXPECT_EQ(2u, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
+}
+
+TEST_F(TestRawPipe, c_str_source_short) {
+ ::lseek(fd, 1, SEEK_SET);
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+ EXPECT_EQ(len - 1, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
+ loff_t offset = 0;
+ ::lseek(fd, 1, SEEK_SET);
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
+ EXPECT_EQ(len, offset);
+ EXPECT_EQ(len, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
+ loff_t offset = 1;
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+ &offset));
+ EXPECT_EQ(len, offset);
+ EXPECT_EQ(len - 1, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
+ loff_t offset = len;
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+ &offset));
+ EXPECT_EQ(len, offset);
+ EXPECT_EQ(0u, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
+ loff_t offset = 1;
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
+ EXPECT_EQ(len, offset);
+ EXPECT_EQ(len - 1, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
+ loff_t offset = 1;
+ bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset));
+ EXPECT_EQ(3, offset);
+ EXPECT_EQ(2u, ptr.length());
+ EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
+}
+#endif // CEPH_HAVE_SPLICE
+
//
// +-----------+ +-----+
// | | | |