return _preadv_pwritev(fd, iov, iovcnt, offset, false);
}
-int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
+/*
+ * Completion bookkeeping for the non-blocking _read() path.  Invoked via
+ * CRF_iofinish once the underlying read I/O has completed; does the
+ * accounting the blocking path would have done inline (stats, fd pos,
+ * cap/pos release) and then completes the caller-supplied Context.
+ */
+void Client::C_Read_Finisher::finish_io(int r)
+{
+ utime_t lat;
+
+ // Caller holds client_lock so we don't need to take it.
+
+ if (r >= 0) {
+ if (is_read_async) {
+ // nothing extra for the objectcacher (async) path
+ } else {
+ // may need to retry on short read
+ }
+
+ clnt->update_read_io_size(size);
+ if (movepos) {
+ // adjust fd pos
+ f->pos = offset + r;
+ }
+
+ // record read latency stats
+ lat = ceph_clock_now();
+ lat -= start;
+ ++clnt->nr_read_request;
+ clnt->update_io_stat_read(lat);
+ }
+
+ // signals CRF_iofinish::complete() that it may now delete itself
+ iofinished = true;
+
+ if (have_caps) {
+ // release the Fr cap ref taken before the I/O was issued
+ clnt->put_cap_ref(in, CEPH_CAP_FILE_RD);
+ }
+ if (movepos) {
+ clnt->unlock_fh_pos(f);
+ }
+
+ // NOTE: onfinish runs while client_lock is still held by our caller
+ onfinish->complete(r);
+ delete this; // single-shot: this finisher is never reused
+}
+
+/*
+ * (Re)issue the sync OSD read for whatever is still outstanding.
+ * Reads `left` bytes at `pos` into the temporary bufferlist `tbl`;
+ * completion re-enters C_Read_Sync_NonBlocking::finish() via `this`.
+ */
+void Client::C_Read_Sync_NonBlocking::retry()
+{
+ filer->read_trunc(in->ino, &in->layout, in->snapid, pos, left, &tbl, 0,
+ in->truncate_size, in->truncate_seq, this);
+}
+
+/**
+ * The following method implements most of what _read_sync does, but in a
+ * way that works with the non-blocking read path.
+ *
+ * Steps per invocation:
+ *  - fold any data delivered in tbl into the caller's bufferlist;
+ *  - on a short read, zero-fill up to the known EOF, drop and re-take
+ *    caps while re-verifying the file size, then re-issue the read via
+ *    retry() — in that case we return WITHOUT completing onfinish, and
+ *    this method runs again when the re-issued read finishes;
+ *  - otherwise complete onfinish with bytes-read (success) or the error.
+ */
+void Client::C_Read_Sync_NonBlocking::finish(int r)
+{
+ clnt->client_lock.lock();
+
+ if (r == -CEPHFS_ENOENT) {
+ // if we get ENOENT from OSD, assume 0 bytes returned
+ goto success;
+ } else if (r < 0) {
+ // pass error to caller
+ goto error;
+ }
+
+ if (tbl.length()) {
+ // advance cursors by what this round actually delivered
+ r = tbl.length();
+
+ read += r;
+ pos += r;
+ left -= r;
+ bl->claim_append(tbl);
+ }
+
+ // short read?
+ if (r >= 0 && r < wanted) {
+ if (pos < in->size) {
+ // zero up to known EOF
+ int64_t some = in->size - pos;
+ if (some > left)
+ some = left;
+ auto z = buffer::ptr_node::create(some);
+ z->zero();
+ bl->push_back(std::move(z));
+ read += some;
+ pos += some;
+ left -= some;
+ if (left == 0)
+ goto success;
+ }
+
+ clnt->put_cap_ref(in, CEPH_CAP_FILE_RD);
+ // reverify size
+ {
+ r = clnt->_getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+ if (r < 0)
+ goto error;
+ }
+
+ // eof? short read.
+ if ((uint64_t)pos >= in->size)
+ goto success;
+
+ {
+ // re-take the read caps we dropped before _getattr above
+ int have_caps2 = 0;
+ r = clnt->get_caps(f, CEPH_CAP_FILE_RD, have_caps, &have_caps2, -1);
+ if (r < 0) {
+ goto error;
+ }
+ }
+
+ // not finished: leave fini false so complete() does not delete us yet
+ wanted = left;
+ retry();
+ clnt->client_lock.unlock();
+ return;
+ }
+
+success:
+
+ r = read;
+
+error:
+
+ // NOTE: onfinish runs with client_lock held; fini tells complete()
+ // (which called us) that it may delete this object afterwards
+ onfinish->complete(r);
+ fini = true;
+
+ clnt->client_lock.unlock();
+}
+
+int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl,
+ Context *onfinish)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
int want, have = 0;
bool movepos = false;
+ std::unique_ptr<Context> iofinish = nullptr;
+ std::unique_ptr<C_Read_Finisher> crf = nullptr;
int64_t rc = 0;
const auto& conf = cct->_conf;
Inode *in = f->inode.get();
utime_t lat;
utime_t start = ceph_clock_now();
+ CRF_iofinish *crf_iofinish = nullptr;
if ((f->mode & CEPH_FILE_MODE_RD) == 0)
return -CEPHFS_EBADF;
goto success;
}
+ if (onfinish) {
+ crf_iofinish = new CRF_iofinish();
+ iofinish.reset(crf_iofinish);
+
+ crf.reset(new
+ C_Read_Finisher(this, onfinish, iofinish.get(),
+ !conf->client_debug_force_sync_read &&
+ conf->client_oc &&
+ (have & (CEPH_CAP_FILE_CACHE |
+ CEPH_CAP_FILE_LAZYIO)),
+ have, movepos, start, f, in, f->pos, offset, size));
+
+ crf_iofinish->CRF = crf.get();
+ }
+
+ // There are three cases to be handled at this point:
+ //
+ // CASE 1 - blocking or non-blocking caller with the client holding Fc caps
+ // and client_debug_force_sync_read being default (`false`)
+ //
+ // CASE 2 - non-blocking caller with sync read from the OSD (since client
+ // does not have the required caps or the above config is set)
+ //
+ // CASE 3 - blocking call by the caller
+
if (!conf->client_debug_force_sync_read &&
conf->client_oc &&
(have & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO))) {
+ // CASE 1 - blocking or non-blocking caller with the client holding Fc caps
+ // and client_debug_force_sync_read being default (`false`)
if (f->flags & O_RSYNC) {
_flush_range(in, offset, size);
}
- rc = _read_async(f, offset, size, bl);
+ rc = _read_async(f, offset, size, bl, iofinish.get());
+
+ if (onfinish) {
+ // handle non-blocking caller (onfinish != nullptr), we can now safely
+ // release all the managed pointers, but we might need to do something
+ // with iofinisher.
+ Context *iof = iofinish.release();
+ crf.release();
+
+ if (rc < 0)
+ iof->complete(rc);
+
+ // allow caller to wait on onfinish...
+ return 0;
+ }
+
if (rc < 0)
goto done;
+ } else if (onfinish) {
+ // CASE 2 - non-blocking caller with sync read from the OSD (since client
+ // does not have the required caps or the above config is set)
+
+ // handle _sync_read without blocking...
+ // This sounds odd, but we want to accomplish what is done in the else
+ // branch below but in a non-blocking fashion. The code in _read_sync
+ // is duplicated and modified and exists in
+ // C_Read_Sync_NonBlocking::finish().
+ C_Read_Sync_NonBlocking *crsa =
+ new C_Read_Sync_NonBlocking(this, iofinish.release(), f, in, f->pos,
+ offset, size, bl, filer.get(), have);
+ crf.release();
+
+ // Now make first attempt at performing _read_sync
+ crsa->retry();
+
+ // Now the C_Read_Sync_NonBlocking is going to handle EVERYTHING else
+ // Allow caller to wait on onfinish...
+ return 0;
} else {
+ // CASE 3 - blocking call by the caller
+
if (f->flags & O_DIRECT)
_flush_range(in, offset, size);
}
}
-int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
+/*
+ * Completion for the non-blocking objectcacher read path: perform the
+ * readahead that the blocking _read_async path would have done, then
+ * complete the caller's Context.  Takes client_lock itself since it is
+ * invoked from the objectcacher's completion context.
+ */
+void Client::C_Read_Async_Finisher::finish(int r)
+{
+ clnt->client_lock.lock();
+
+ clnt->do_readahead(f, in, off, len);
+
+ // NOTE: onfinish runs while client_lock is held
+ onfinish->complete(r);
+
+ clnt->client_lock.unlock();
+}
+
+int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
+ Context *onfinish)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
// read (and possibly block)
int r = 0;
- C_SaferCond onfinish("Client::_read_async flock");
+ std::unique_ptr<Context> io_finish = nullptr;
+ C_SaferCond *io_finish_cond = nullptr;
+ if (onfinish == nullptr) {
+ io_finish_cond = new C_SaferCond("Client::_read_async flock");
+ io_finish.reset(io_finish_cond);
+ } else {
+ io_finish.reset(new C_Read_Async_Finisher(this, onfinish, f, in,
+ f->pos, off, len));
+ }
+
r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
- off, len, bl, 0, &onfinish);
+ off, len, bl, 0, io_finish.get());
+
+ if (onfinish != nullptr) {
+ // Release C_Read_Async_Finisher from managed pointer, either
+ // file_read will result in non-blocking complete, or we need to complete
+ // immediately. In either case, the C_Read_Async_Finisher is safely
+ // handled and won't be abandoned.
+ Context *crf = io_finish.release();
+ if (r != 0) {
+ // need to do readahead, so complete the crf
+ client_lock.unlock();
+ crf->complete(r);
+ client_lock.lock();
+ } else {
+ get_cap_ref(in, CEPH_CAP_FILE_CACHE);
+ }
+ return 0;
+ }
+
if (r == 0) {
get_cap_ref(in, CEPH_CAP_FILE_CACHE);
client_lock.unlock();
- r = onfinish.wait();
+ r = io_finish_cond->wait();
client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
update_read_io_size(bl->length());
int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset,
- bool write, bool clamp_to_int)
+ bool write, bool clamp_to_int,
+ Context *onfinish, bufferlist *blp)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
if (clamp_to_int) {
totallen = std::min(totallen, (loff_t)INT_MAX);
}
+
if (write) {
- int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt);
+ int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish);
ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl;
return w;
} else {
bufferlist bl;
- int64_t r = _read(fh, offset, totallen, &bl);
+ int64_t r = _read(fh, offset, totallen, blp ? blp : &bl,
+ onfinish);
ldout(cct, 3) << "preadv(" << fh << ", " << offset << ") = " << r << dendl;
- if (r <= 0)
+ if (r <= 0) {
+ if (r < 0 && onfinish != nullptr) {
+ client_lock.unlock();
+ onfinish->complete(r);
+ client_lock.lock();
+ }
return r;
+ }
client_lock.unlock();
copy_bufferlist_to_iovec(iov, iovcnt, &bl, r);
}
}
-int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write)
+int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
+ int64_t offset, bool write, Context *onfinish,
+ bufferlist *blp)
{
RWRef_t mref_reader(mount_state, CLIENT_MOUNTING);
if (!mref_reader.is_state_satisfied())
Fh *fh = get_filehandle(fd);
if (!fh)
return -CEPHFS_EBADF;
- return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true);
+ return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
+ onfinish, blp);
}
int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos,
return r;
}
+/*
+ * I/O half of the non-blocking _write() completion.  Drops the Fb cap
+ * ref taken before the I/O, flushes for O_SYNC/O_DSYNC buffered writes,
+ * and records success bookkeeping; the caller's Context only completes
+ * once the uninline half has also finished (see try_complete()).
+ *
+ * NOTE(review): appears to assume client_lock is held by the caller,
+ * like finish_onuninline() — confirm.
+ */
+void Client::C_Write_Finisher::finish_io(int r)
+{
+ bool fini;
+
+ clnt->put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
+
+ if (r >= 0) {
+ if (is_file_write) {
+ if ((f->flags & O_SYNC) || (f->flags & O_DSYNC)) {
+ clnt->_flush_range(in, offset, size);
+ }
+ }
+
+ r = clnt->_write_success(f, start, fpos, offset, size, in);
+ }
+
+ iofinished = true;
+ iofinished_r = r;
+ fini = try_complete();
+
+ // try_complete() returning true means both halves are done and the
+ // caller's Context has been completed; we own our own deletion
+ if (fini)
+ delete this;
+}
+
+/*
+ * Uninline half of the non-blocking _write() completion.  Records the
+ * result of the inline-data uninline step; the caller's Context only
+ * completes once the I/O half has also finished (see try_complete()).
+ */
+void Client::C_Write_Finisher::finish_onuninline(int r)
+{
+ // Called by _write with client_lock held.
+ onuninlinefinished = true;
+ onuninlinefinished_r = r;
+
+ if (try_complete())
+ delete this;
+}
+
+/*
+ * Complete the caller's Context once BOTH the uninline step and the
+ * write I/O have finished, whichever finishes last.  Returns true when
+ * completion happened, telling the caller to delete this object.
+ */
+bool Client::C_Write_Finisher::try_complete()
+{
+ if (onuninlinefinished && iofinished) {
+ // release the Fw cap ref taken by _write before the I/O
+ clnt->put_cap_ref(in, CEPH_CAP_FILE_WR);
+
+ // ECANCELED from uninline is benign (someone else uninlined first),
+ // so report the I/O result; otherwise report the uninline error
+ if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED)
+ onfinish->complete(iofinished_r);
+ else
+ onfinish->complete(onuninlinefinished_r);
+ return true;
+ }
+ return false;
+}
+
int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
- const struct iovec *iov, int iovcnt)
+ const struct iovec *iov, int iovcnt, Context *onfinish)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
uint64_t fpos = 0;
Inode *in = f->inode.get();
+ std::unique_ptr<C_SaferCond> onuninline = nullptr;
+ CWF_iofinish *cwf_iofinish = NULL;
+ C_SaferCond *cond_iofinish = NULL;
if ( (uint64_t)(offset+size) > mdsmap->get_max_filesize() && //exceeds config
(uint64_t)(offset+size) > in->size ) { //exceeds filesize
ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
- std::unique_ptr<C_SaferCond> onuninline = nullptr;
+ std::unique_ptr<Context> iofinish = nullptr;
+ std::unique_ptr<C_Write_Finisher> cwf = nullptr;
if (in->inline_version < CEPH_INLINE_NONE) {
if (endoff > cct->_conf->client_max_inline_size ||
}
}
+ if (onfinish) {
+ cwf_iofinish = new CWF_iofinish();
+ iofinish.reset(cwf_iofinish);
+
+ cwf.reset(new
+ C_Write_Finisher(this, onfinish, nullptr == onuninline,
+ cct->_conf->client_oc &&
+ (have & (CEPH_CAP_FILE_BUFFER |
+ CEPH_CAP_FILE_LAZYIO)),
+ start, f, in, fpos, offset, size));
+
+ cwf_iofinish->CWF = cwf.get();
+ }
+
if (cct->_conf->client_oc &&
(have & (CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO))) {
// do buffered write
r = objectcacher->file_write(&in->oset, &in->layout,
in->snaprealm->get_snap_context(),
offset, size, bl, ceph::real_clock::now(),
- 0, nullptr,
- objectcacher->CFG_block_writes_upfront());
+ 0, iofinish.get(),
+ onfinish == nullptr
+ ? objectcacher->CFG_block_writes_upfront()
+ : false);
+
+ if (onfinish) {
+ // handle non-blocking caller (onfinish != nullptr), we can now safely
+ // release all the managed pointers, but we might need to do something
+ // with iofinisher.
+ Context *iof = iofinish.release();
+ C_Write_Finisher *cwfp = cwf.release();
+
+ if (r < 0) {
+ // should not get here, but...
+ iof->complete(r);
+ }
+
+ if (nullptr != onuninline) {
+ client_lock.unlock();
+ int uninline_ret = onuninline->wait();
+ client_lock.lock();
+
+ if (uninline_ret >= 0 || uninline_ret == -CEPHFS_ECANCELED) {
+ in->inline_data.clear();
+ in->inline_version = CEPH_INLINE_NONE;
+ in->mark_caps_dirty(CEPH_CAP_FILE_WR);
+ check_caps(in, 0);
+ }
+
+ cwfp->finish_onuninline(uninline_ret);
+ }
+
+ // allow caller to wait on onfinish...
+ return 0;
+ }
+
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
if (r < 0)
_flush_range(in, offset, size);
// simple, non-atomic sync write
- C_SaferCond onfinish("Client::_write flock");
+ if (onfinish == nullptr) {
+ // We need a safer condition to wait on.
+ cond_iofinish = new C_SaferCond();
+ iofinish.reset(cond_iofinish);
+ }
+
get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
offset, size, bl, ceph::real_clock::now(), 0,
in->truncate_size, in->truncate_seq,
- &onfinish);
+ iofinish.get());
+
+ if (onfinish) {
+ // handle non-blocking caller (onfinish != nullptr), we can now safely
+ // release all the managed pointers
+ iofinish.release();
+ onuninline.release();
+ cwf.release();
+
+ // allow caller to wait on onfinish...
+ return 0;
+ }
+
client_lock.unlock();
- r = onfinish.wait();
+ r = cond_iofinish->wait();
client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
if (r < 0)
// if we get here, write was successful, update client metadata
success:
+ // do not get here if non-blocking caller (onfinish != nullptr)
r = _write_success(f, start, fpos, offset, size, in);
done:
+ // can not get here if non-blocking caller (onfinish != nullptr)
+
if (nullptr != onuninline) {
client_lock.unlock();
int uninline_ret = onuninline->wait();
struct initialize_state_t initialize_state;
private:
+ // Carries the state needed to finish a non-blocking Client::_read()
+ // once the I/O completes: caller's Context, stat/latency bookkeeping
+ // inputs, and whether caps / the fh pos lock must be released.
+ // finish_io() (invoked via CRF_iofinish) does the real work and
+ // deletes this object.
+ class C_Read_Finisher : public Context {
+ public:
+ // set once finish_io() has run; read by CRF_iofinish::complete()
+ bool iofinished;
+ void finish_io(int r);
+
+ C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish,
+ bool is_read_async, int have_caps, bool movepos,
+ utime_t start, Fh *f, Inode *in, uint64_t fpos,
+ int64_t offset, uint64_t size)
+ : clnt(clnt), onfinish(onfinish), iofinish(iofinish),
+ is_read_async(is_read_async), have_caps(have_caps), f(f), in(in),
+ start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) {
+ iofinished = false;
+ }
+
+ void finish(int r) override {
+ // We need to override finish, but have nothing to do.
+ }
+
+ private:
+ Client *clnt;
+ Context *onfinish;
+ Context *iofinish;
+ bool is_read_async;
+ int have_caps;
+ Fh *f;
+ Inode *in;
+ utime_t start;
+ uint64_t fpos;
+ int64_t offset;
+ uint64_t size;
+ bool movepos;
+ };
+
+ // Thin adapter handed to the I/O layer for the non-blocking read
+ // path; forwards completion to C_Read_Finisher::finish_io().
+ struct CRF_iofinish : public Context {
+ Client::C_Read_Finisher *CRF;
+
+ CRF_iofinish()
+ : CRF(nullptr) {}
+
+ void finish(int r) override {
+ CRF->finish_io(r);
+ }
+
+ // For _read_async, we may not finish in one go, so be prepared for multiple
+ // calls to complete. All the handling though is in C_Read_Finisher.
+ void complete(int r) override {
+ // only self-delete once C_Read_Finisher says the I/O is done
+ finish(r);
+ if (CRF->iofinished)
+ delete this;
+ }
+ };
+
+ class C_Read_Sync_NonBlocking : public Context {
+ // When operating in non-blocking mode, what used to be done by _read_sync
+ // still needs to be handled, but it needs to be handled without blocking
+ // while still following the semantics. Note that _read_sync is actually
+ // asynchronous. it just uses condition variables to wait. Now instead, we use
+ // this Context class to synchronize the steps.
+ //
+ // The steps will be accomplished by complete/finish being called to complete
+ // each step, with complete only releasing this object once all is finally
+ // complete.
+ public:
+ C_Read_Sync_NonBlocking(Client *clnt, Context *onfinish, Fh *f, Inode *in,
+ uint64_t fpos, uint64_t off, uint64_t len,
+ bufferlist *bl, Filer *filer, int have_caps)
+ : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl),
+ filer(filer), have_caps(have_caps)
+ {
+ left = len;
+ wanted = len;
+ read = 0;
+ pos = off;
+ fini = false;
+ }
+
+ // (re)issue the OSD read for the remaining [pos, pos+left) range
+ void retry();
+
+ private:
+ Client *clnt;
+ Context *onfinish; // caller's completion; fired once, in finish()
+ Fh *f;
+ Inode *in;
+ uint64_t off; // original read offset
+ uint64_t len; // original read length
+ // NOTE(review): left/wanted/read are int while len is uint64_t —
+ // presumably safe because _preadv_pwritev_locked clamps to INT_MAX
+ // (clamp_to_int); confirm all callers do.
+ int left; // bytes still outstanding
+ int wanted; // bytes requested in the current round
+ bufferlist *bl; // caller's destination bufferlist
+ bufferlist tbl; // per-round staging buffer from the OSD read
+ Filer *filer;
+ int have_caps;
+ int read; // total bytes accumulated so far
+ uint64_t pos; // current file position
+ bool fini; // true once onfinish has been completed
+
+ void finish(int r) override;
+
+ void complete(int r) override
+ {
+ // finish() may re-issue the read; only delete once it says done
+ finish(r);
+ if (fini)
+ delete this;
+ }
+ };
+
+ // Completion for the non-blocking _read_async path: runs readahead
+ // and then the caller's Context (see finish() in the .cc file).
+ // Single-shot; presumably freed by the base Context::complete()
+ // after finish() returns — confirm against Context semantics.
+ class C_Read_Async_Finisher : public Context {
+ public:
+ C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in,
+ uint64_t fpos, uint64_t off, uint64_t len)
+ : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {}
+
+ private:
+ Client *clnt;
+ Context *onfinish;
+ Fh *f;
+ Inode *in;
+ uint64_t off;
+ uint64_t len;
+
+ void finish(int r) override;
+ };
+
+ // Joins the two asynchronous halves of a non-blocking Client::_write():
+ // the write I/O (finish_io) and the inline-data uninline step
+ // (finish_onuninline). The caller's Context completes only when both
+ // have reported in (try_complete), after which this object is deleted
+ // by whichever half finished last.
+ class C_Write_Finisher : public Context {
+ public:
+ void finish_io(int r);
+ void finish_onuninline(int r);
+
+ C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
+ bool is_file_write, utime_t start, Fh *f, Inode *in,
+ uint64_t fpos, int64_t offset, uint64_t size)
+ : clnt(clnt), onfinish(onfinish),
+ is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
+ offset(offset), size(size) {
+ iofinished_r = 0;
+ onuninlinefinished_r = 0;
+ iofinished = false;
+ // no uninline step needed => treat that half as already finished
+ onuninlinefinished = dont_need_uninline;
+ }
+
+ void finish(int r) override {
+ // We need to override finish, but have nothing to do.
+ }
+
+ private:
+ Client *clnt;
+ Context *onfinish;
+ bool is_file_write; // true for the buffered (objectcacher) path
+ utime_t start;
+ Fh *f;
+ Inode *in;
+ uint64_t fpos;
+ int64_t offset;
+ uint64_t size;
+ int64_t iofinished_r; // result of the I/O half
+ int64_t onuninlinefinished_r; // result of the uninline half
+ bool iofinished;
+ bool onuninlinefinished;
+ bool try_complete();
+ };
+
+ // Thin adapter handed to the I/O layer for the non-blocking write
+ // path; forwards completion to C_Write_Finisher::finish_io().
+ // Unlike CRF_iofinish it does not override complete(), so it is
+ // presumably single-shot and freed by base Context::complete() —
+ // confirm against Context semantics.
+ struct CWF_iofinish : public Context {
+ C_Write_Finisher *CWF;
+
+ CWF_iofinish()
+ : CWF(nullptr) {}
+
+ void finish(int r) override {
+ CWF->finish_io(r);
+ }
+ };
+
struct C_Readahead : public Context {
C_Readahead(Client *c, Fh *f);
~C_Readahead() override;
std::pair<int, bool> _do_remount(bool retry_on_error);
int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof);
- int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl);
+ int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
+ Context *onfinish);
bool _dentry_valid(const Dentry *dn);
std::string alternate_name);
loff_t _lseek(Fh *fh, loff_t offset, int whence);
- int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl);
+ int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl,
+ Context *onfinish = nullptr);
void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len);
int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
int64_t offset, uint64_t size, Inode *in);
int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
- const struct iovec *iov, int iovcnt);
+ const struct iovec *iov, int iovcnt, Context *onfinish = nullptr);
int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset,
- bool write, bool clamp_to_int);
+ bool write, bool clamp_to_int,
+ Context *onfinish = nullptr,
+ bufferlist *blp = nullptr);
int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
- int64_t offset, bool write);
+ int64_t offset, bool write, Context *onfinish = nullptr,
+ bufferlist *blp = nullptr);
int _flush(Fh *fh);
int _fsync(Fh *fh, bool syncdataonly);
int _fsync(Inode *in, bool syncdataonly);