memcpy(c->data, data, len);
return c;
}
+ virtual bool can_zero_copy() const {
+ return false;
+ }
+ virtual int zero_copy_to_fd(int fd, loff_t *offset) {
+ return -ENOTSUP;
+ }
virtual bool is_page_aligned() {
return ((long)data & ~CEPH_PAGE_MASK) == 0;
}
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}
+ bool can_zero_copy() const {
+ return true;
+ }
+ bool is_page_aligned() {
return false;
}
int set_source(int fd, loff_t *off) {
len = r;
return 0;
}
+ int zero_copy_to_fd(int fd, loff_t *offset) {
+ assert(!source_consumed);
+ int flags = SPLICE_F_NONBLOCK;
+ ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
+ if (r < 0) {
+ bdout << "raw_pipe: error splicing from pipe to fd: "
+ << cpp_strerror(r) << bendl;
+ return r;
+ }
+ source_consumed = true;
+ return 0;
+ }
+ buffer::raw* clone_empty() {
// cloning doesn't make sense for pipe-based buffers,
// and is only used by unit tests for other types of buffers
return NULL;
#endif
}
+ buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) {
+#ifdef CEPH_HAVE_SPLICE
+ buffer::raw_pipe* buf = new raw_pipe(len);
+ int r = buf->set_source(fd, offset);
+ if (r < 0) {
+ delete buf;
+ throw error_code(r);
+ }
+ return buf;
+#else
+ throw error_code(-ENOTSUP);
+#endif
+ }
+
buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw.
{
r->nref.inc();
memset(c_str()+o, 0, l);
}
+ bool buffer::ptr::can_zero_copy() const
+ {
+ return _raw->can_zero_copy();
+ }
+
+ int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const
+ {
+ return _raw->zero_copy_to_fd(fd, offset);
+ }
// -- buffer::list::iterator --
/*
}
}
+ bool buffer::list::can_zero_copy() const
+ {
+ for (std::list<ptr>::const_iterator it = _buffers.begin();
+ it != _buffers.end();
+ ++it)
+ if (!it->can_zero_copy())
+ return false;
+ return true;
+ }
+
bool buffer::list::is_page_aligned() const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
return 0;
}
-ssize_t buffer::list::read_fd(int fd, size_t len)
+ssize_t buffer::list::read_fd(int fd, size_t len)
{
int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE);
bufferptr bp = buffer::create_page_aligned(s);
return ret;
}
+int buffer::list::read_fd_zero_copy(int fd, size_t len)
+{
+#ifdef CEPH_HAVE_SPLICE
+ try {
+ bufferptr bp = buffer::create_zero_copy(len, fd, NULL);
+ append(bp);
+ } catch (buffer::error_code e) {
+ return e.code;
+ }
+ return 0;
+#else
+ return -ENOTSUP;
+#endif
+}
+
int buffer::list::write_file(const char *fn, int mode)
{
int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
int buffer::list::write_fd(int fd) const
{
+ if (can_zero_copy())
+ return write_fd_zero_copy(fd);
+
// use writev!
iovec iov[IOV_MAX];
int iovlen = 0;
ssize_t bytes = 0;
- std::list<ptr>::const_iterator p = _buffers.begin();
+ std::list<ptr>::const_iterator p = _buffers.begin();
while (p != _buffers.end()) {
if (p->length() > 0) {
iov[iovlen].iov_base = (void *)p->c_str();
return 0;
}
+int buffer::list::write_fd_zero_copy(int fd) const
+{
+ if (!can_zero_copy())
+ return -ENOTSUP;
+ /* pass offset to each call to avoid races updating the fd seek
+ * position, since the I/O may be non-blocking
+ */
+ loff_t offset = ::lseek(fd, 0, SEEK_CUR);
+ loff_t *off_p = &offset;
+ if (offset < 0 && offset != ESPIPE)
+ return (int) offset;
+ if (offset == ESPIPE)
+ off_p = NULL;
+ for (std::list<ptr>::const_iterator it = _buffers.begin();
+ it != _buffers.end(); ++it) {
+ int r = it->zero_copy_to_fd(fd, off_p);
+ if (r < 0)
+ return r;
+ if (off_p)
+ offset += it->length();
+ }
+ return 0;
+}
+
__u32 buffer::list::crc32c(__u32 crc) const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
memcpy(dest, c_str()+o, l);
}
+ bool can_zero_copy() const;
+ int zero_copy_to_fd(int fd, loff_t *offset) const;
+
unsigned wasted();
int cmp(const ptr& o);
private:
mutable iterator last_p;
+ int zero_copy_to_fd(int fd) const;
public:
// cons/des
}
bool contents_equal(buffer::list& other);
+ bool can_zero_copy() const;
bool is_page_aligned() const;
bool is_n_page_sized() const;
void hexdump(std::ostream &out) const;
int read_file(const char *fn, std::string *error);
ssize_t read_fd(int fd, size_t len);
+ int read_fd_zero_copy(int fd, size_t len);
int write_file(const char *fn, int mode=0644);
int write_fd(int fd) const;
+ int write_fd_zero_copy(int fd) const;
uint32_t crc32c(uint32_t crc) const;
};
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}
+
+TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) {
+ bufferlist bl;
+ EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len));
+ bl = bufferlist();
+ EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
+ EXPECT_EQ(len, bl.length());
+ EXPECT_EQ(0u, bl.buffers().front().unused_tail_length());
+ EXPECT_EQ(1u, bl.buffers().size());
+ EXPECT_EQ(len, bl.buffers().front().raw_length());
+ EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len));
+ EXPECT_TRUE(bl.can_zero_copy());
+}
+
+TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) {
+ ::unlink("testfile_out");
+ bufferlist bl;
+ EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
+ EXPECT_TRUE(bl.can_zero_copy());
+ int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600);
+ EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd));
+ struct stat st;
+ memset(&st, 0, sizeof(st));
+ EXPECT_EQ(0, ::stat("testfile_out", &st));
+ EXPECT_EQ(len, st.st_size);
+ char buf[len + 1];
+ EXPECT_EQ(len, safe_read(out_fd, buf, len + 1));
+ EXPECT_EQ(0, memcmp(buf, "ABC\n", len));
+ ::close(out_fd);
+ ::unlink("testfile_out");
+}
#endif // CEPH_HAVE_SPLICE
//