From: Frank S. Filz Date: Fri, 1 Jul 2022 20:58:23 +0000 (-0700) Subject: Client: Add non-blocking fsync X-Git-Tag: v19.0.0~814^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a181c1a0dbbee25feaee9f28563d60db4b29fc89;p=ceph-ci.git Client: Add non-blocking fsync We will need the ability to do an non-blocking write that finishes with fsync so we need non-blocking fsync. Signed-off-by: Frank S. Filz --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 953a6dd5397..97cad15577e 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -11602,6 +11602,145 @@ int Client::fsync(int fd, bool syncdataonly) return r; } +void Client::C_nonblocking_fsync_state::advance() +{ + Context *advancer; + client_t const whoami = clnt->whoami; // For the benefit of ldout prefix + + ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock)); + + switch (progress) { + case 0: + progress = 1; + + if (clnt->cct->_conf->client_oc) { + Context *finisher = new C_nonblocking_fsync_flush_finisher(clnt, this); + flush_wait = true; + tmp_ref = in; // take a reference; C_SaferCond doesn't and _flush won't either + clnt->_flush(in, finisher); + ldout(clnt->cct, 15) << "using return-valued form of _fsync" << dendl; + } + + if (!syncdataonly && in->dirty_caps) { + clnt->check_caps(in, CHECK_CAPS_NODELAY|CHECK_CAPS_SYNCHRONOUS); + if (in->flushing_caps) + flush_tid = clnt->last_flush_tid; + } else { + ldout(clnt->cct, 10) << "no metadata needs to commit" << dendl; + } + + if (!syncdataonly && !in->unsafe_ops.empty()) { + waitfor_safe = true; + clnt->flush_mdlog_sync(in); + + advancer = new C_nonblocking_fsync_state_advancer(clnt, this); + req = in->unsafe_ops.back(); + ldout(clnt->cct, 15) << "waiting on unsafe requests, last tid " << req->get_tid() << dendl; + + req->get(); + clnt->add_nonblocking_onfinish_to_context_list(req->waitfor_safe, advancer); + // ------------ here is a state machine break point + return; + } + + // skip and fall through + + case 1: + progress = 2; + + if (waitfor_safe) { + clnt->put_request(req); + } + + if (flush_wait && !flush_completed) { + // wait on a real reply instead of guessing + ldout(clnt->cct, 15) << "waiting on data to flush" << dendl; + // ------------ here is a state machine break point + return; + } else { + // FIXME: this can starve + if (in->cap_refs[CEPH_CAP_FILE_BUFFER] > 0) { + ldout(clnt->cct, 10) << "ino " << in->ino << " has " << in->cap_refs[CEPH_CAP_FILE_BUFFER] + << " uncommitted, waiting" << dendl; + advancer = new C_nonblocking_fsync_state_advancer(clnt, this); + clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_commit, advancer); + // ------------ here is a state machine break point but we have to + // return to this case because this might loop. + progress = 1; + return; + } + } + + // skip and fall through + + case 2: + + if (flush_completed) { + // we waited for real reply above, now we have it... retrieve result + ldout(clnt->cct, 15) << "got " << result << " from flush writeback" << dendl; + } + + if (result != 0) { + // ERROR! + break; + } + + if (flush_tid <= 0) { + // DONE! + break; + } + + // fall through + progress = 3; + + case 3: + // do equivalent of wait_sync_caps(in, flush_tid) + if (in->flushing_caps) { + map::iterator it = in->flushing_cap_tids.begin(); + ceph_assert(it != in->flushing_cap_tids.end()); + + if (it->first <= flush_tid) { + ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing " + << ccap_string(it->second) << " flush_tid " << flush_tid + << " last " << it->first << dendl; + advancer = new C_nonblocking_fsync_state_advancer(clnt, this); + clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer); + // ------------ here is a state machine break point + // the advancer completion will resume with case 3 + return; + } + + // DONE! + } + } + + if (result == 0) { + ldout(clnt->cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl; + } else { + ldout(clnt->cct, 8) << "ino " << in->ino << " failed to commit to disk! " + << cpp_strerror(-result) << dendl; + } + + utime_t lat; + + lat = ceph_clock_now(); + lat -= start; + clnt->logger->tinc(l_c_fsync, lat); + + onfinish->complete(result); + + // we're done + delete this; +} + +void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish) +{ + C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish); + + // Kick fsync off... + state->advance(); +} + int Client::_fsync(Inode *in, bool syncdataonly) { ceph_assert(ceph_mutex_is_locked_by_me(client_lock)); diff --git a/src/client/Client.h b/src/client/Client.h index 8371bb5547b..ae09224d84a 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -757,6 +757,7 @@ public: void flush_snaps(Inode *in); void get_cap_ref(Inode *in, int cap); void put_cap_ref(Inode *in, int cap); + void submit_sync_caps(Inode *in, ceph_tid_t want, Context *onfinish); void wait_sync_caps(Inode *in, ceph_tid_t want); void wait_sync_caps(ceph_tid_t want); void queue_cap_snap(Inode *in, SnapContext &old_snapc); @@ -1425,6 +1426,78 @@ private: } }; + struct C_nonblocking_fsync_state { + Client *clnt; + + // nonblocking_fsync parms + Inode *in; + bool syncdataonly; + Context *onfinish; + + // were local variables + ceph_tid_t flush_tid; + InodeRef tmp_ref; + utime_t start; + MetaRequest *req; + + // we need to keep track of where we are + int progress; + bool flush_wait; + bool flush_completed; + int result; + bool waitfor_safe; + + C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish) + : clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) { + flush_tid = 0; + start = ceph_clock_now(); + progress = 0; + flush_wait = false; + flush_completed = false; + result = 0; + waitfor_safe = false; + } + + void advance(); + + void complete_flush(int r) { + flush_completed = true; + result = r; + if (progress == 2) + advance(); + } + }; + + struct C_nonblocking_fsync_state_advancer : Context { + Client *clnt; + Client::C_nonblocking_fsync_state *state; + + C_nonblocking_fsync_state_advancer(Client *clnt, Client::C_nonblocking_fsync_state *state) + : clnt(clnt), state(state) { + } + + void finish(int r) override { + clnt->client_lock.lock(); + state->advance(); + clnt->client_lock.unlock(); + } + }; + + struct C_nonblocking_fsync_flush_finisher : Context { + Client *clnt; + Client::C_nonblocking_fsync_state *state; + + C_nonblocking_fsync_flush_finisher(Client *clnt, Client::C_nonblocking_fsync_state *state) + : clnt(clnt), state(state) { + } + + void finish(int r) override { + clnt->client_lock.lock(); + state->complete_flush(r); + clnt->client_lock.unlock(); + } + }; + struct C_Readahead : public Context { C_Readahead(Client *c, Fh *f); ~C_Readahead() override; @@ -1596,6 +1669,7 @@ private: int64_t offset, bool write, Context *onfinish = nullptr, bufferlist *blp = nullptr); int _flush(Fh *fh); + void nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish); int _fsync(Fh *fh, bool syncdataonly); int _fsync(Inode *in, bool syncdataonly); int _sync_fs();