From: Frank S. Filz Date: Thu, 14 Jul 2022 22:02:06 +0000 (-0700) Subject: Client: Hook nonblocking fsync into the write path of ll_preadv_pwritev X-Git-Tag: v19.0.0~814^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6809ff8c1247232186c045acd6b2f066576c33ae;p=ceph.git Client: Hook nonblocking fsync into the write path of ll_preadv_pwritev Signed-off-by: Frank S. Filz --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 97cad15577ee5..66e964658ef01 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -11086,7 +11086,8 @@ int Client::pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset) int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write, bool clamp_to_int, - Context *onfinish, bufferlist *blp) + Context *onfinish, bufferlist *blp, + bool do_fsync, bool syncdataonly) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); @@ -11109,7 +11110,7 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov, } if (write) { - int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish); + int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish, do_fsync, syncdataonly); ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl; return w; } else { @@ -11233,22 +11234,64 @@ void Client::C_Write_Finisher::finish_onuninline(int r) delete this; } +void Client::C_Write_Finisher::finish_fsync(int r) +{ + bool fini; + client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + + ldout(clnt->cct, 3) << "finish_fsync r = " << r << dendl; + + fsync_finished = true; + fsync_r = r; + fini = try_complete(); + + if (fini) + delete this; +} + bool Client::C_Write_Finisher::try_complete() { - if (onuninlinefinished && iofinished) { + client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + + ldout(clnt->cct, 19) << "C_Write_Finisher::try_complete this 
" << this + << " onuninlinefinished " << onuninlinefinished + << " iofinished " << iofinished + << " iofinished_r " << iofinished_r + << " fsync_finished " << fsync_finished + << dendl; + + if (onuninlinefinished && iofinished && !fsync_finished && iofinished_r >= 0) { + // Done with I/O AND uninline, but we want to do fsync + CWF_fsync_finish *fsync_f = new CWF_fsync_finish(this); + C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(clnt, in, syncdataonly, fsync_f); + + // Kick fsync off... and all will magically complete eventually... + ldout(clnt->cct, 19) << "kickoff fsync onfinish " << onfinish << dendl; + state->advance(); + } else if (onuninlinefinished && iofinished) { + // Now we are REALLY done... clnt->put_cap_ref(in, CEPH_CAP_FILE_WR); - if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED) - onfinish->complete(iofinished_r); - else + if (fsync_r < 0) { + ldout(clnt->cct, 19) << " complete with fsync_r " << fsync_r << dendl; + onfinish->complete(fsync_r); + } else if (onuninlinefinished_r < 0 && onuninlinefinished_r != -CEPHFS_ECANCELED) { + ldout(clnt->cct, 19) << " complete with onuninlinefinished_r " << onuninlinefinished_r << dendl; onfinish->complete(onuninlinefinished_r); + } else { + ldout(clnt->cct, 19) << " complete with iofinished_r " << iofinished_r << dendl; + onfinish->complete(iofinished_r); + } + onfinish = nullptr; return true; } + return false; } int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, - const struct iovec *iov, int iovcnt, Context *onfinish) + const struct iovec *iov, int iovcnt, Context *onfinish, + bool do_fsync, bool syncdataonly) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); @@ -11390,7 +11433,8 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, cct->_conf->client_oc && (have & (CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO)), - start, f, in, fpos, offset, size)); + start, f, in, fpos, offset, size, + do_fsync, 
syncdataonly)); cwf_iofinish->CWF = cwf.get(); } @@ -11497,6 +11541,15 @@ success: // do not get here if non-blocking caller (onfinish != nullptr) r = _write_success(f, start, fpos, offset, size, in); + if (r >= 0 && do_fsync) { + int64_t r1; + client_lock.unlock(); + r1 = _fsync(f, false); + if (r1 < 0) + r = r1; + client_lock.lock(); + } + done: // can not get here if non-blocking caller (onfinish != nullptr) @@ -11607,10 +11660,20 @@ void Client::C_nonblocking_fsync_state::advance() Context *advancer; client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance" + << " progress " << progress + << " flush_wait " << flush_wait + << " flush_completed " << flush_completed + << " result " << result + << " waitfor_safe " << waitfor_safe + << " onfinish " << onfinish + << dendl; + ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock)); switch (progress) { case 0: + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 0" << dendl; progress = 1; if (clnt->cct->_conf->client_oc) { @@ -11646,6 +11709,7 @@ void Client::C_nonblocking_fsync_state::advance() // skip and fall through case 1: + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 1" << dendl; progress = 2; if (waitfor_safe) { @@ -11674,6 +11738,7 @@ void Client::C_nonblocking_fsync_state::advance() // skip and fall through case 2: + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 2" << dendl; if (flush_completed) { // we waited for real reply above, now we have it... retrieve result @@ -11682,11 +11747,13 @@ void Client::C_nonblocking_fsync_state::advance() if (result != 0) { // ERROR! + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - ERROR!" << dendl; break; } if (flush_tid <= 0) { // DONE! + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" 
<< dendl; break; } @@ -11694,23 +11761,41 @@ void Client::C_nonblocking_fsync_state::advance() progress = 3; case 3: + case 4: + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case " << progress << dendl; + ldout(clnt->cct, 15) << "in->flushing_cap_tids.empty() " << in->flushing_cap_tids.empty() + << " in->flushing_caps " << in->flushing_caps + << dendl; // do equivalent of wait_sync_caps(in, flush_tid) if (in->flushing_caps) { + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - flushing_caps" << dendl; map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin(); ceph_assert(it != in->flushing_cap_tids.end()); + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance" + << " it->first " << it->first + << " flush_tid " << flush_tid + << dendl; if (it->first <= flush_tid) { ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing " << ccap_string(it->second) << " flush_tid " << flush_tid << " last " << it->first << dendl; advancer = new C_nonblocking_fsync_state_advancer(clnt, this); - clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer); + ldout(clnt->cct, 10) << "Adding onfinish " << onfinish + << " for C_nonblocking_fsync_state " << this + << dendl; + if (progress == 3) + clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer); + else + clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps_pending, advancer); // ------------ here is a state machine break point // the advancer completion will resume with case 3 + progress = 4; return; } // DONE! + ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" 
<< dendl; } } @@ -11733,6 +11818,33 @@ void Client::C_nonblocking_fsync_state::advance() delete this; } +void Client::C_nonblocking_fsync_state::complete_flush(int r) +{ + client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + + ldout(clnt->cct, 15) << "complete_flush" + << " r " << r + << " progress " << progress + << dendl; + + flush_completed = true; + result = r; + if (progress == 2) + advance(); +} + +void Client::C_nonblocking_fsync_state_advancer::finish(int r) +{ + client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + + ldout(clnt->cct, 15) << "C_nonblocking_fsync_state_advancer::finish" + << " r " << r + << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock)); + state->advance(); +} + void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish) { C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish); @@ -15684,7 +15796,8 @@ int64_t Client::ll_readv(struct Fh *fh, const struct iovec *iov, int iovcnt, int int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, int iovcnt, int64_t offset, bool write, - Context *onfinish, bufferlist *bl) + Context *onfinish, bufferlist *bl, + bool do_fsync, bool syncdataonly) { RWRef_t mref_reader(mount_state, CLIENT_MOUNTING); if (!mref_reader.is_state_satisfied()) @@ -15692,7 +15805,7 @@ int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, std::scoped_lock cl(client_lock); return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true, - onfinish, bl); + onfinish, bl, do_fsync, syncdataonly); } int Client::ll_flush(Fh *fh) diff --git a/src/client/Client.h b/src/client/Client.h index ae09224d84a13..1e4d5890d198d 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -649,7 +649,8 @@ public: int64_t ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, int iovcnt, int64_t offset, bool write, Context *onfinish = nullptr, - bufferlist *blp = nullptr); + 
bufferlist *blp = nullptr, + bool do_fsync = false, bool syncdataonly = false); loff_t ll_lseek(Fh *fh, loff_t offset, int whence); int ll_flush(Fh *fh); int ll_fsync(Fh *fh, bool syncdataonly); @@ -1381,17 +1382,21 @@ private: public: void finish_io(int r); void finish_onuninline(int r); + void finish_fsync(int r); C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline, bool is_file_write, utime_t start, Fh *f, Inode *in, - uint64_t fpos, int64_t offset, uint64_t size) + uint64_t fpos, int64_t offset, uint64_t size, + bool do_fsync, bool syncdataonly) : clnt(clnt), onfinish(onfinish), is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos), - offset(offset), size(size) { + offset(offset), size(size), syncdataonly(syncdataonly) { iofinished_r = 0; onuninlinefinished_r = 0; + fsync_r = 0; iofinished = false; onuninlinefinished = dont_need_uninline; + fsync_finished = !do_fsync; } void finish(int r) override { @@ -1408,10 +1413,13 @@ private: uint64_t fpos; int64_t offset; uint64_t size; + bool syncdataonly; int64_t iofinished_r; int64_t onuninlinefinished_r; + int64_t fsync_r; bool iofinished; bool onuninlinefinished; + bool fsync_finished; bool try_complete(); }; @@ -1426,6 +1434,17 @@ private: } }; + struct CWF_fsync_finish : public Context { + C_Write_Finisher *CWF; + + CWF_fsync_finish(C_Write_Finisher *CWF) + : CWF(CWF) {} + + void finish(int r) override { + CWF->finish_fsync(r); + } + }; + struct C_nonblocking_fsync_state { Client *clnt; @@ -1460,12 +1479,7 @@ private: void advance(); - void complete_flush(int r) { - flush_completed = true; - result = r; - if (progress == 2) - advance(); - } + void complete_flush(int r); }; struct C_nonblocking_fsync_state_advancer : Context { @@ -1476,11 +1490,7 @@ private: : clnt(clnt), state(state) { } - void finish(int r) override { - clnt->client_lock.lock(); - state->advance(); - clnt->client_lock.unlock(); - } + void finish(int r) override; }; struct C_nonblocking_fsync_flush_finisher : 
Context { @@ -1492,9 +1502,8 @@ private: } void finish(int r) override { - clnt->client_lock.lock(); + ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock)); state->complete_flush(r); - clnt->client_lock.unlock(); } }; @@ -1659,12 +1668,14 @@ private: int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos, int64_t offset, uint64_t size, Inode *in); int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf, - const struct iovec *iov, int iovcnt, Context *onfinish = nullptr); + const struct iovec *iov, int iovcnt, Context *onfinish = nullptr, + bool do_fsync = false, bool syncdataonly = false); int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write, bool clamp_to_int, Context *onfinish = nullptr, - bufferlist *blp = nullptr); + bufferlist *blp = nullptr, + bool do_fsync = false, bool syncdataonly = false); int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write, Context *onfinish = nullptr, bufferlist *blp = nullptr);