message.push_back(metric);
}
+ // copy sizes
+ if (_collect_and_send_global_metrics ||
+ session->mds_metric_flags.test(CLIENT_METRIC_TYPE_COPY_IO_SIZES)) {
+ metric = ClientMetricMessage(CopyIoSizesPayload(total_io_copy_ops,
+ total_io_copy_size));
+ message.push_back(metric);
+ }
+
session->con->send_message2(make_message<MClientMetrics>(std::move(message)));
}
#endif
/* We can't return bytes written larger than INT_MAX, clamp size to that */
size = std::min(size, (loff_t)INT_MAX);
- int r = _write(fh, offset, size, buf, NULL, false);
+ int r = _write(fh, offset, size, buf);
ldout(cct, 3) << "write(" << fd << ", \"...\", " << size << ", " << offset << ") = " << r << dendl;
return r;
}
return _preadv_pwritev(fd, iov, iovcnt, offset, true);
}
+// NOTE: when called with zerocopy = true, the iovec SHOULD just consist of a
+// single entry with iov_len equal to the desired read length and iov_base
+// being a nullptr. The bufferlist pointer is used to return the data and the
+// caller is responsible for converting that into an iovec.
+//
+// Also, we don't try to support zerocopy with client_oc = true
int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
int iovcnt, int64_t offset,
bool write, bool clamp_to_int,
Context *onfinish, bufferlist *blp,
- bool do_fsync, bool syncdataonly)
+ bool do_fsync, bool syncdataonly,
+ bool zerocopy)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
+ if (cct->_conf->client_oc && zerocopy) {
+ // we don't support zerocopy with Objectcacher enabled
+ return -EINVAL;
+ }
#if defined(__linux__) && defined(O_PATH)
if (fh->flags & O_PATH)
return -EBADF;
#endif
- if(iovcnt < 0) {
+ // negative iovcount is invalid,
+ // for ZC read - we must use provided buffer, it will contain the data after the read
+ // for ZC write - we must have iovec AND we will enforce that CEPH buffer is not passed
+ if (iovcnt < 0 || (zerocopy && ((!blp && !write) || (!iov && write)))) {
return -EINVAL;
}
loff_t totallen = 0;
for (int i = 0; i < iovcnt; i++) {
+ // NOTE: zero copy read has passed in an iov that is just used to
+ // communicate the length.
totallen += iov[i].iov_len;
}
}
if (write) {
- int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish, do_fsync, syncdataonly);
+ int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish,
+ do_fsync, syncdataonly, zerocopy);
ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl;
return w;
} else {
bufferlist bl;
+ // if zerocopy, blp can't be null due to the check above
+ // the read will be into Ceph buffers which are returned to the caller.
int64_t r = _read(fh, offset, totallen, blp ? blp : &bl,
onfinish);
ldout(cct, 3) << "preadv(" << fh << ", " << offset << ") = " << r << dendl;
return r;
}
- client_lock.unlock();
- copy_bufferlist_to_iovec(iov, iovcnt, blp ? blp : &bl, r);
- client_lock.lock();
+ if (!zerocopy) {
+ client_lock.unlock();
+ uint64_t size = copy_bufferlist_to_iovec(iov, iovcnt, blp ? blp : &bl, r);
+ // all IOVs will be copied, the function above returns the total bytes being copied
+ update_total_copy_size(size, iovcnt);
+ client_lock.lock();
+ }
return r;
}
}
int Client::_preadv_pwritev(int fd, const struct iovec *iov, int iovcnt,
int64_t offset, bool write, Context *onfinish,
- bufferlist *blp)
+ bufferlist *blp, bool zerocopy)
{
RWRef_t mref_reader(mount_state, CLIENT_MOUNTING);
if (!mref_reader.is_state_satisfied())
if (!fh)
return -EBADF;
return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
- onfinish, blp);
+ onfinish, blp, zerocopy);
}
int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos,
int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
const struct iovec *iov, int iovcnt, Context *onfinish,
- bool do_fsync, bool syncdataonly)
+ bool do_fsync, bool syncdataonly, bool zerocopy)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
if (size < 1) { // zero bytes write is not supported by osd
return -EINVAL;
}
+ if (buf != nullptr && zerocopy) {
+ return -EINVAL;
+ }
if ( (uint64_t)(offset+size) > mdsmap->get_max_filesize() && //exceeds config
(uint64_t)(offset+size) > in->size ) { //exceeds filesize
ceph_assert(in->inline_version > 0);
}
- // copy into fresh buffer (since our write may be resub, async)
+ // make a bufferlist, if not zerocopy make a copy
bufferlist bl;
+ uint64_t copy_size = 0;
+ uint64_t copy_ops = 0;
if (buf) {
- if (size > 0)
+ if (size > 0) {
bl.append(buf, size);
+ copy_size += size;
+ copy_ops = 1;
+ }
} else if (iov){
for (int i = 0; i < iovcnt; i++) {
if (iov[i].iov_len > 0) {
- bl.append((const char *)iov[i].iov_base, iov[i].iov_len);
+ if (zerocopy) {
+ bl.push_back(buffer::create_static(iov[i].iov_len,
+ static_cast<char*>(iov[i].iov_base)));
+ } else {
+ bl.append(static_cast<const char*>(iov[i].iov_base), iov[i].iov_len);
+ copy_size += iov[i].iov_len;
+ ++copy_ops;
+ }
}
}
}
+ update_total_copy_size(copy_size, copy_ops);
int want, have;
if (f->mode & CEPH_FILE_MODE_LAZY)
tout(cct) << off << std::endl;
tout(cct) << len << std::endl;
- int r = _write(fh, off, len, data, NULL, 0);
+ int r = _write(fh, off, len, data);
ldout(cct, 3) << "ll_write " << fh << " " << off << "~" << len << " = " << r
<< dendl;
return r;
int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov,
int iovcnt, int64_t offset, bool write,
Context *onfinish, bufferlist *bl,
- bool do_fsync, bool syncdataonly)
+ bool do_fsync, bool syncdataonly, bool zerocopy)
{
int64_t retval = -1;
}
retval = _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
- onfinish, bl, do_fsync, syncdataonly);
+ onfinish, bl, do_fsync, syncdataonly, zerocopy);
/* There are two scenarios with each having two cases to handle here
1) async io
1.a) r == 0:
int64_t offset, bool write,
Context *onfinish = nullptr,
bufferlist *blp = nullptr,
- bool do_fsync = false, bool syncdataonly = false);
+ bool do_fsync = false, bool syncdataonly = false,
+ bool zerocopy = false);
loff_t ll_lseek(Fh *fh, loff_t offset, int whence);
int ll_flush(Fh *fh);
int ll_fsync(Fh *fh, bool syncdataonly);
return fuse_default_permissions;
}
+ void update_total_copy_size(const uint64_t &size, const uint64_t &ops = 1) {
+ total_io_copy_size += size;
+ total_io_copy_ops += ops;
+ }
+
/* timer_lock for 'timer' */
ceph::mutex timer_lock = ceph::make_mutex("Client::timer_lock");
SafeTimer timer;
struct mount_state_t mount_state;
struct initialize_state_t initialize_state;
+ // zero copy related metrics
+ uint64_t total_io_copy_ops = 0;
+ uint64_t total_io_copy_size = 0;
+
private:
class C_Read_Finisher : public Context {
public:
int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
int64_t offset, uint64_t size, Inode *in);
int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
- const struct iovec *iov, int iovcnt, Context *onfinish = nullptr,
- bool do_fsync = false, bool syncdataonly = false);
+ const struct iovec *iov = nullptr, int iovcnt = 0,
+ Context *onfinish = nullptr,
+ bool do_fsync = false, bool syncdataonly = false,
+ bool zerocopy = false);
int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
int iovcnt, int64_t offset,
bool write, bool clamp_to_int,
Context *onfinish = nullptr,
bufferlist *blp = nullptr,
- bool do_fsync = false, bool syncdataonly = false);
+ bool do_fsync = false, bool syncdataonly = false,
+ bool zerocopy = false);
int _preadv_pwritev(int fd, const struct iovec *iov, int iovcnt,
int64_t offset, bool write, Context *onfinish = nullptr,
- bufferlist *blp = nullptr);
+ bufferlist *blp = nullptr, bool zerocopy = false);
int _flush(Fh *fh);
void nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish);
int _fsync(Fh *fh, bool syncdataonly);