From 1210ddf7a1333d0a88214148e7d838cd2b6b5883 Mon Sep 17 00:00:00 2001 From: "Frank S. Filz" Date: Wed, 11 May 2022 14:22:55 -0700 Subject: [PATCH] Client: Add non-blocking helper classes Signed-off-by: Frank S. Filz --- src/client/Client.cc | 400 +++++++++++++++++++++++++++++++++++++++++-- src/client/Client.h | 187 +++++++++++++++++++- 2 files changed, 563 insertions(+), 24 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index e3b0a94a2ab85..20e62f389951f 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -10499,17 +10499,144 @@ int Client::preadv(int fd, const struct iovec *iov, int iovcnt, loff_t offset) return _preadv_pwritev(fd, iov, iovcnt, offset, false); } -int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl) +void Client::C_Read_Finisher::finish_io(int r) +{ + utime_t lat; + + // Caller holds client_lock so we don't need to take it. + + if (r >= 0) { + if (is_read_async) { + } else { + // may need to retry on short read + } + + clnt->update_read_io_size(size); + if (movepos) { + // adjust fd pos + f->pos = offset + r; + } + + lat = ceph_clock_now(); + lat -= start; + ++clnt->nr_read_request; + clnt->update_io_stat_read(lat); + } + + iofinished = true; + + if (have_caps) { + clnt->put_cap_ref(in, CEPH_CAP_FILE_RD); + } + if (movepos) { + clnt->unlock_fh_pos(f); + } + + onfinish->complete(r); + delete this; +} + +void Client::C_Read_Sync_NonBlocking::retry() +{ + filer->read_trunc(in->ino, &in->layout, in->snapid, pos, left, &tbl, 0, + in->truncate_size, in->truncate_seq, this); +} + +/** + * The following method implements most of what _read_sync does, but in a + * way that works with the non-blocking read path. 
+ */ +void Client::C_Read_Sync_NonBlocking::finish(int r) +{ + clnt->client_lock.lock(); + + if (r == -CEPHFS_ENOENT) { + // if we get ENOENT from OSD, assume 0 bytes returned + goto success; + } else if (r < 0) { + // pass error to caller + goto error; + } + + if (tbl.length()) { + r = tbl.length(); + + read += r; + pos += r; + left -= r; + bl->claim_append(tbl); + } + + // short read? + if (r >= 0 && r < wanted) { + if (pos < in->size) { + // zero up to known EOF + int64_t some = in->size - pos; + if (some > left) + some = left; + auto z = buffer::ptr_node::create(some); + z->zero(); + bl->push_back(std::move(z)); + read += some; + pos += some; + left -= some; + if (left == 0) + goto success; + } + + clnt->put_cap_ref(in, CEPH_CAP_FILE_RD); + // reverify size + { + r = clnt->_getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms); + if (r < 0) + goto error; + } + + // eof? short read. + if ((uint64_t)pos >= in->size) + goto success; + + { + int have_caps2 = 0; + r = clnt->get_caps(f, CEPH_CAP_FILE_RD, have_caps, &have_caps2, -1); + if (r < 0) { + goto error; + } + } + + wanted = left; + retry(); + clnt->client_lock.unlock(); + return; + } + +success: + + r = read; + +error: + + onfinish->complete(r); + fini = true; + + clnt->client_lock.unlock(); +} + +int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl, + Context *onfinish) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); int want, have = 0; bool movepos = false; + std::unique_ptr iofinish = nullptr; + std::unique_ptr crf = nullptr; int64_t rc = 0; const auto& conf = cct->_conf; Inode *in = f->inode.get(); utime_t lat; utime_t start = ceph_clock_now(); + CRF_iofinish *crf_iofinish = nullptr; if ((f->mode & CEPH_FILE_MODE_RD) == 0) return -CEPHFS_EBADF; @@ -10569,17 +10696,81 @@ retry: goto success; } + if (onfinish) { + crf_iofinish = new CRF_iofinish(); + iofinish.reset(crf_iofinish); + + crf.reset(new + C_Read_Finisher(this, onfinish, iofinish.get(), + !conf->client_debug_force_sync_read 
&& + conf->client_oc && + (have & (CEPH_CAP_FILE_CACHE | + CEPH_CAP_FILE_LAZYIO)), + have, movepos, start, f, in, f->pos, offset, size)); + + crf_iofinish->CRF = crf.get(); + } + + // There are three cases to be handled at this point: + // + // CASE 1 - blocking or non-blocking caller with the client holding Fc caps + // and client_debug_force_sync_read being default (`false`) + // + // CASE 2 - non-blocking caller with sync read from the OSD (since client + // does not have the required caps or the above config is set) + // + // CASE 3 - blocking call by the caller if (!conf->client_debug_force_sync_read && conf->client_oc && (have & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO))) { + // CASE 1 - blocking or non-blocking caller with the client holding Fc caps + // and client_debug_force_sync_read being default (`false`) if (f->flags & O_RSYNC) { _flush_range(in, offset, size); } - rc = _read_async(f, offset, size, bl); + rc = _read_async(f, offset, size, bl, iofinish.get()); + + if (onfinish) { + // handle non-blocking caller (onfinish != nullptr), we can now safely + // release all the managed pointers, but we might need to do something + // with iofinisher. + Context *iof = iofinish.release(); + crf.release(); + + if (rc < 0) + iof->complete(rc); + + // allow caller to wait on onfinish... + return 0; + } + if (rc < 0) goto done; + } else if (onfinish) { + // CASE 2 - non-blocking caller with sync read from the OSD (since client + // does not have the required caps or the above config is set) + + // handle _sync_read without blocking... + // This sounds odd, but we want to accomplish what is done in the else + // branch below but in a non-blocking fashion. The code in _read_sync + // is duplicated and modified and exists in + // C_Read_Sync_NonBlocking::finish(). 
+ C_Read_Sync_NonBlocking *crsa = + new C_Read_Sync_NonBlocking(this, iofinish.release(), f, in, f->pos, + offset, size, bl, filer.get(), have); + crf.release(); + + // Now make first attempt at performing _read_sync + crsa->retry(); + + // Now the C_Read_Sync_NonBlocking is going to handle EVERYTHING else + // Allow caller to wait on onfinish... + return 0; } else { + // CASE 3 - blocking call by the caller + if (f->flags & O_DIRECT) _flush_range(in, offset, size); @@ -10674,7 +10865,19 @@ void Client::do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len) } } -int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl) +void Client::C_Read_Async_Finisher::finish(int r) +{ + clnt->client_lock.lock(); + + clnt->do_readahead(f, in, off, len); + + onfinish->complete(r); + + clnt->client_lock.unlock(); +} + +int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, + Context *onfinish) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); @@ -10698,13 +10901,40 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl) // read (and possibly block) int r = 0; - C_SaferCond onfinish("Client::_read_async flock"); + std::unique_ptr io_finish = nullptr; + C_SaferCond *io_finish_cond = nullptr; + if (onfinish == nullptr) { + io_finish_cond = new C_SaferCond("Client::_read_async flock"); + io_finish.reset(io_finish_cond); + } else { + io_finish.reset(new C_Read_Async_Finisher(this, onfinish, f, in, + f->pos, off, len)); + } + r = objectcacher->file_read(&in->oset, &in->layout, in->snapid, - off, len, bl, 0, &onfinish); + off, len, bl, 0, io_finish.get()); + + if (onfinish != nullptr) { + // Release C_Read_Async_Finisher from managed pointer, either + // file_read will result in non-blocking complete, or we need to complete + // immediately. In either case, the C_Read_Async_Finisher is safely + // handled and won't be abandoned. 
+ Context *crf = io_finish.release(); + if (r != 0) { + // need to do readahead, so complete the crf + client_lock.unlock(); + crf->complete(r); + client_lock.lock(); + } else { + get_cap_ref(in, CEPH_CAP_FILE_CACHE); + } + return 0; + } + if (r == 0) { get_cap_ref(in, CEPH_CAP_FILE_CACHE); client_lock.unlock(); - r = onfinish.wait(); + r = io_finish_cond->wait(); client_lock.lock(); put_cap_ref(in, CEPH_CAP_FILE_CACHE); update_read_io_size(bl->length()); @@ -10823,7 +11053,8 @@ int Client::pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset) int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov, unsigned iovcnt, int64_t offset, - bool write, bool clamp_to_int) + bool write, bool clamp_to_int, + Context *onfinish, bufferlist *blp) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); @@ -10844,16 +11075,24 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov, if (clamp_to_int) { totallen = std::min(totallen, (loff_t)INT_MAX); } + if (write) { - int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt); + int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish); ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl; return w; } else { bufferlist bl; - int64_t r = _read(fh, offset, totallen, &bl); + int64_t r = _read(fh, offset, totallen, blp ? 
blp : &bl, + onfinish); ldout(cct, 3) << "preadv(" << fh << ", " << offset << ") = " << r << dendl; - if (r <= 0) + if (r <= 0) { + if (r < 0 && onfinish != nullptr) { + client_lock.unlock(); + onfinish->complete(r); + client_lock.lock(); + } return r; + } client_lock.unlock(); copy_bufferlist_to_iovec(iov, iovcnt, &bl, r); @@ -10862,7 +11101,9 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov, } } -int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write) +int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, + int64_t offset, bool write, Context *onfinish, + bufferlist *blp) { RWRef_t mref_reader(mount_state, CLIENT_MOUNTING); if (!mref_reader.is_state_satisfied()) @@ -10875,7 +11116,8 @@ int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, in Fh *fh = get_filehandle(fd); if (!fh) return -CEPHFS_EBADF; - return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true); + return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true, + onfinish, blp); } int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos, @@ -10925,13 +11167,64 @@ int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos, return r; } +void Client::C_Write_Finisher::finish_io(int r) +{ + bool fini; + + clnt->put_cap_ref(in, CEPH_CAP_FILE_BUFFER); + + if (r >= 0) { + if (is_file_write) { + if ((f->flags & O_SYNC) || (f->flags & O_DSYNC)) { + clnt->_flush_range(in, offset, size); + } + } + + r = clnt->_write_success(f, start, fpos, offset, size, in); + } + + iofinished = true; + iofinished_r = r; + fini = try_complete(); + + if (fini) + delete this; +} + +void Client::C_Write_Finisher::finish_onuninline(int r) +{ + // Called by _write with client_lock held. 
+ onuninlinefinished = true; + onuninlinefinished_r = r; + + if (try_complete()) + delete this; +} + +bool Client::C_Write_Finisher::try_complete() +{ + if (onuninlinefinished && iofinished) { + clnt->put_cap_ref(in, CEPH_CAP_FILE_WR); + + if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED) + onfinish->complete(iofinished_r); + else + onfinish->complete(onuninlinefinished_r); + return true; + } + return false; +} + int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, - const struct iovec *iov, int iovcnt) + const struct iovec *iov, int iovcnt, Context *onfinish) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); uint64_t fpos = 0; Inode *in = f->inode.get(); + std::unique_ptr onuninline = nullptr; + CWF_iofinish *cwf_iofinish = NULL; + C_SaferCond *cond_iofinish = NULL; if ( (uint64_t)(offset+size) > mdsmap->get_max_filesize() && //exceeds config (uint64_t)(offset+size) > in->size ) { //exceeds filesize @@ -11025,7 +11318,8 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl; - std::unique_ptr onuninline = nullptr; + std::unique_ptr iofinish = nullptr; + std::unique_ptr cwf = nullptr; if (in->inline_version < CEPH_INLINE_NONE) { if (endoff > cct->_conf->client_max_inline_size || @@ -11055,6 +11349,20 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, } } + if (onfinish) { + cwf_iofinish = new CWF_iofinish(); + iofinish.reset(cwf_iofinish); + + cwf.reset(new + C_Write_Finisher(this, onfinish, nullptr == onuninline, + cct->_conf->client_oc && + (have & (CEPH_CAP_FILE_BUFFER | + CEPH_CAP_FILE_LAZYIO)), + start, f, in, fpos, offset, size)); + + cwf_iofinish->CWF = cwf.get(); + } + if (cct->_conf->client_oc && (have & (CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO))) { // do buffered write @@ -11067,8 +11375,42 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, r = 
objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(), offset, size, bl, ceph::real_clock::now(), - 0, nullptr, - objectcacher->CFG_block_writes_upfront()); + 0, iofinish.get(), + onfinish == nullptr + ? objectcacher->CFG_block_writes_upfront() + : false); + + if (onfinish) { + // handle non-blocking caller (onfinish != nullptr), we can now safely + // release all the managed pointers, but we might need to do something + // with iofinisher. + Context *iof = iofinish.release(); + C_Write_Finisher *cwfp = cwf.release(); + + if (r < 0) { + // should not get here, but... + iof->complete(r); + } + + if (nullptr != onuninline) { + client_lock.unlock(); + int uninline_ret = onuninline->wait(); + client_lock.lock(); + + if (uninline_ret >= 0 || uninline_ret == -CEPHFS_ECANCELED) { + in->inline_data.clear(); + in->inline_version = CEPH_INLINE_NONE; + in->mark_caps_dirty(CEPH_CAP_FILE_WR); + check_caps(in, 0); + } + + cwfp->finish_onuninline(uninline_ret); + } + + // allow caller to wait on onfinish... + return 0; + } + put_cap_ref(in, CEPH_CAP_FILE_BUFFER); if (r < 0) @@ -11085,15 +11427,32 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, _flush_range(in, offset, size); // simple, non-atomic sync write - C_SaferCond onfinish("Client::_write flock"); + if (onfinish == nullptr) { + // We need a safer condition to wait on. + cond_iofinish = new C_SaferCond(); + iofinish.reset(cond_iofinish); + } + get_cap_ref(in, CEPH_CAP_FILE_BUFFER); filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(), offset, size, bl, ceph::real_clock::now(), 0, in->truncate_size, in->truncate_seq, - &onfinish); + iofinish.get()); + + if (onfinish) { + // handle non-blocking caller (onfinish != nullptr), we can now safely + // release all the managed pointers + iofinish.release(); + onuninline.release(); + cwf.release(); + + // allow caller to wait on onfinish... 
+ return 0; + } + client_lock.unlock(); - r = onfinish.wait(); + r = cond_iofinish->wait(); client_lock.lock(); put_cap_ref(in, CEPH_CAP_FILE_BUFFER); if (r < 0) @@ -11103,10 +11462,13 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, // if we get here, write was successful, update client metadata success: + // do not get here if non-blocking caller (onfinish != nullptr) r = _write_success(f, start, fpos, offset, size, in); done: + // can not get here if non-blocking caller (onfinish != nullptr) + if (nullptr != onuninline) { client_lock.unlock(); int uninline_ret = onuninline->wait(); diff --git a/src/client/Client.h b/src/client/Client.h index a561f4e7a5850..b8e4a9cec8e09 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -1245,6 +1245,178 @@ protected: struct initialize_state_t initialize_state; private: + class C_Read_Finisher : public Context { + public: + bool iofinished; + void finish_io(int r); + + C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish, + bool is_read_async, int have_caps, bool movepos, + utime_t start, Fh *f, Inode *in, uint64_t fpos, + int64_t offset, uint64_t size) + : clnt(clnt), onfinish(onfinish), iofinish(iofinish), + is_read_async(is_read_async), have_caps(have_caps), f(f), in(in), + start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) { + iofinished = false; + } + + void finish(int r) override { + // We need to override finish, but have nothing to do. + } + + private: + Client *clnt; + Context *onfinish; + Context *iofinish; + bool is_read_async; + int have_caps; + Fh *f; + Inode *in; + utime_t start; + uint64_t fpos; + int64_t offset; + uint64_t size; + bool movepos; + }; + + struct CRF_iofinish : public Context { + Client::C_Read_Finisher *CRF; + + CRF_iofinish() + : CRF(nullptr) {} + + void finish(int r) override { + CRF->finish_io(r); + } + + // For _read_async, we may not finish in one go, so be prepared for multiple + // calls to complete. 
All the handling though is in C_Read_Finisher. + void complete(int r) override { + finish(r); + if (CRF->iofinished) + delete this; + } + }; + + class C_Read_Sync_NonBlocking : public Context { + // When operating in non-blocking mode, what used to be done by _read_sync + // still needs to be handled, but it needs to be handled without blocking + // while still following the semantics. Note that _read_sync is actually + // asynchronous. it just uses condition variables to wait. Now instead, we use + // this Context class to synchronize the steps. + // + // The steps will be accomplished by complete/finish being called to complete + // each step, with complete only releasing this object once all is finally + // complete. + public: + C_Read_Sync_NonBlocking(Client *clnt, Context *onfinish, Fh *f, Inode *in, + uint64_t fpos, uint64_t off, uint64_t len, + bufferlist *bl, Filer *filer, int have_caps) + : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl), + filer(filer), have_caps(have_caps) + { + left = len; + wanted = len; + read = 0; + pos = off; + fini = false; + } + + void retry(); + + private: + Client *clnt; + Context *onfinish; + Fh *f; + Inode *in; + uint64_t off; + uint64_t len; + int left; + int wanted; + bufferlist *bl; + bufferlist tbl; + Filer *filer; + int have_caps; + int read; + uint64_t pos; + bool fini; + + void finish(int r) override; + + void complete(int r) override + { + finish(r); + if (fini) + delete this; + } + }; + + class C_Read_Async_Finisher : public Context { + public: + C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in, + uint64_t fpos, uint64_t off, uint64_t len) + : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {} + + private: + Client *clnt; + Context *onfinish; + Fh *f; + Inode *in; + uint64_t off; + uint64_t len; + + void finish(int r) override; + }; + + class C_Write_Finisher : public Context { + public: + void finish_io(int r); + void finish_onuninline(int r); + + 
C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline, + bool is_file_write, utime_t start, Fh *f, Inode *in, + uint64_t fpos, int64_t offset, uint64_t size) + : clnt(clnt), onfinish(onfinish), + is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos), + offset(offset), size(size) { + iofinished_r = 0; + onuninlinefinished_r = 0; + iofinished = false; + onuninlinefinished = dont_need_uninline; + } + + void finish(int r) override { + // We need to override finish, but have nothing to do. + } + + private: + Client *clnt; + Context *onfinish; + bool is_file_write; + utime_t start; + Fh *f; + Inode *in; + uint64_t fpos; + int64_t offset; + uint64_t size; + int64_t iofinished_r; + int64_t onuninlinefinished_r; + bool iofinished; + bool onuninlinefinished; + bool try_complete(); + }; + + struct CWF_iofinish : public Context { + C_Write_Finisher *CWF; + + CWF_iofinish() + : CWF(nullptr) {} + + void finish(int r) override { + CWF->finish_io(r); + } + }; + struct C_Readahead : public Context { C_Readahead(Client *c, Fh *f); ~C_Readahead() override; @@ -1331,7 +1503,8 @@ private: std::pair _do_remount(bool retry_on_error); int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof); - int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl); + int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, + Context *onfinish); bool _dentry_valid(const Dentry *dn); @@ -1399,17 +1572,21 @@ private: std::string alternate_name); loff_t _lseek(Fh *fh, loff_t offset, int whence); - int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl); + int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl, + Context *onfinish = nullptr); void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len); int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos, int64_t offset, uint64_t size, Inode *in); int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf, - const struct iovec *iov, 
int iovcnt); + const struct iovec *iov, int iovcnt, Context *onfinish = nullptr); int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov, unsigned iovcnt, int64_t offset, - bool write, bool clamp_to_int); + bool write, bool clamp_to_int, + Context *onfinish = nullptr, + bufferlist *blp = nullptr); int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, - int64_t offset, bool write); + int64_t offset, bool write, Context *onfinish = nullptr, + bufferlist *blp = nullptr); int _flush(Fh *fh); int _fsync(Fh *fh, bool syncdataonly); int _fsync(Inode *in, bool syncdataonly); -- 2.39.5