return r;
}
+// Drive the nonblocking fsync state machine one or more steps forward.
+//
+// `progress` records where execution must resume; each "state machine
+// break point" below registers a callback (C_nonblocking_fsync_state_advancer
+// or C_nonblocking_fsync_flush_finisher) and returns, and that callback
+// re-enters advance() later under client_lock.  When the machine runs to
+// completion it completes `onfinish` with `result` and deletes this
+// state object.  Must be called with client_lock held (asserted below).
+void Client::C_nonblocking_fsync_state::advance()
+{
+ Context *advancer;
+ client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+
+ ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
+
+ switch (progress) {
+ case 0:
+ // step 0: start the data flush (if buffered I/O is enabled) and the
+ // metadata/cap flush, then wait for any unsafe MDS ops to commit.
+ progress = 1;
+
+ if (clnt->cct->_conf->client_oc) {
+ // flush buffered data; finisher fires complete_flush() when done
+ Context *finisher = new C_nonblocking_fsync_flush_finisher(clnt, this);
+ flush_wait = true;
+ tmp_ref = in; // take a reference; C_SaferCond doesn't and _flush won't either
+ clnt->_flush(in, finisher);
+ ldout(clnt->cct, 15) << "using return-valued form of _fsync" << dendl;
+ }
+
+ if (!syncdataonly && in->dirty_caps) {
+ // push dirty caps to the MDS; remember the flush tid so case 3 can
+ // wait for its acknowledgement
+ clnt->check_caps(in, CHECK_CAPS_NODELAY|CHECK_CAPS_SYNCHRONOUS);
+ if (in->flushing_caps)
+ flush_tid = clnt->last_flush_tid;
+ } else {
+ ldout(clnt->cct, 10) << "no metadata needs to commit" << dendl;
+ }
+
+ if (!syncdataonly && !in->unsafe_ops.empty()) {
+ waitfor_safe = true;
+ clnt->flush_mdlog_sync(in);
+
+ advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+ req = in->unsafe_ops.back();
+ ldout(clnt->cct, 15) << "waiting on unsafe requests, last tid " << req->get_tid() << dendl;
+
+ req->get(); // ref dropped via put_request() in case 1
+ clnt->add_nonblocking_onfinish_to_context_list(req->waitfor_safe, advancer);
+ // ------------ here is a state machine break point
+ return;
+ }
+
+ // skip and fall through
+
+ case 1:
+ // step 1: unsafe ops (if any) are now safe; wait for the data flush
+ // and for outstanding buffered-write cap refs to drain.
+ progress = 2;
+
+ if (waitfor_safe) {
+ clnt->put_request(req); // drop the ref taken in case 0
+ }
+
+ if (flush_wait && !flush_completed) {
+ // wait on a real reply instead of guessing
+ ldout(clnt->cct, 15) << "waiting on data to flush" << dendl;
+ // ------------ here is a state machine break point
+ // (complete_flush() resumes us once progress == 2)
+ return;
+ } else {
+ // FIXME: this can starve
+ if (in->cap_refs[CEPH_CAP_FILE_BUFFER] > 0) {
+ ldout(clnt->cct, 10) << "ino " << in->ino << " has " << in->cap_refs[CEPH_CAP_FILE_BUFFER]
+ << " uncommitted, waiting" << dendl;
+ advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+ clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_commit, advancer);
+ // ------------ here is a state machine break point but we have to
+ // return to this case because this might loop.
+ progress = 1;
+ return;
+ }
+ }
+
+ // skip and fall through
+
+ case 2:
+ // step 2: collect the data-flush result; exit the switch on error or
+ // when there is no cap flush to wait for.
+
+ if (flush_completed) {
+ // we waited for real reply above, now we have it... retrieve result
+ ldout(clnt->cct, 15) << "got " << result << " from flush writeback" << dendl;
+ }
+
+ if (result != 0) {
+ // ERROR!
+ break;
+ }
+
+ if (flush_tid <= 0) {
+ // DONE!
+ break;
+ }
+
+ // fall through
+ progress = 3;
+
+ case 3:
+ // do equivalent of wait_sync_caps(in, flush_tid)
+ if (in->flushing_caps) {
+ map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
+ ceph_assert(it != in->flushing_cap_tids.end());
+
+ if (it->first <= flush_tid) {
+ ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing "
+ << ccap_string(it->second) << " flush_tid " << flush_tid
+ << " last " << it->first << dendl;
+ advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+ clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+ // ------------ here is a state machine break point
+ // the advancer completion will resume with case 3
+ return;
+ }
+
+ // DONE!
+ }
+ }
+
+ // final step: log the outcome, record fsync latency, complete the
+ // caller's context, and free ourselves.
+ if (result == 0) {
+ ldout(clnt->cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl;
+ } else {
+ ldout(clnt->cct, 8) << "ino " << in->ino << " failed to commit to disk! "
+ << cpp_strerror(-result) << dendl;
+ }
+
+ utime_t lat;
+
+ lat = ceph_clock_now();
+ lat -= start;
+ clnt->logger->tinc(l_c_fsync, lat);
+
+ onfinish->complete(result);
+
+ // we're done
+ delete this;
+}
+
+// Asynchronous fsync entry point: builds the self-deleting state
+// machine and takes its first step.  `onfinish` is completed with the
+// fsync result once the machine finishes; the state object frees itself.
+void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish)
+{
+ auto *fsync_state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish);
+
+ // Take the first step; further steps run from completion callbacks.
+ fsync_state->advance();
+}
+
int Client::_fsync(Inode *in, bool syncdataonly)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
void flush_snaps(Inode *in);
void get_cap_ref(Inode *in, int cap);
void put_cap_ref(Inode *in, int cap);
+ void submit_sync_caps(Inode *in, ceph_tid_t want, Context *onfinish);
void wait_sync_caps(Inode *in, ceph_tid_t want);
void wait_sync_caps(ceph_tid_t want);
void queue_cap_snap(Inode *in, SnapContext &old_snapc);
}
};
+ // Heap-allocated state for one nonblocking_fsync() call.  The state
+ // machine in advance() suspends at its break points and is resumed by
+ // completion callbacks; the object deletes itself when advance() runs
+ // to the end.  Fix vs. original: `req` was left uninitialized by the
+ // constructor (only conditionally assigned in advance() case 0); it is
+ // now nullptr-initialized, and the scalar members use in-class default
+ // member initializers instead of constructor-body assignments.
+ struct C_nonblocking_fsync_state {
+ Client *clnt;
+
+ // nonblocking_fsync parms
+ Inode *in;
+ bool syncdataonly;
+ Context *onfinish; // completed with the fsync result when done
+
+ // were local variables of the blocking _fsync(); they must now
+ // survive across suspensions of the state machine
+ ceph_tid_t flush_tid = 0; // cap-flush tid to wait for in case 3
+ InodeRef tmp_ref; // pins the inode while the data flush is in flight
+ utime_t start; // for the l_c_fsync latency metric
+ MetaRequest *req = nullptr; // last unsafe op waited on; only valid while waitfor_safe
+
+ // we need to keep track of where we are
+ int progress = 0; // resume point for advance()'s switch
+ bool flush_wait = false; // a data flush was started in case 0
+ bool flush_completed = false; // set by complete_flush()
+ int result = 0; // overall fsync result (0 or negative errno)
+ bool waitfor_safe = false; // we hold a ref on req
+
+ // Takes no ownership of its arguments; the object frees itself when
+ // advance() completes.
+ C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish)
+ : clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish),
+ start(ceph_clock_now()) {
+ }
+
+ // Drive the state machine; caller must hold clnt->client_lock.
+ void advance();
+
+ // Data-flush completion: record the result and resume the machine,
+ // but only if it is parked at the case-2 break point.
+ void complete_flush(int r) {
+ flush_completed = true;
+ result = r;
+ if (progress == 2)
+ advance();
+ }
+ };
+
+ // Resume callback for the fsync state machine: hooked onto a wait
+ // list (waitfor_safe, waitfor_commit, or waitfor_caps in advance())
+ // so the machine takes its next step, under client_lock, when the
+ // awaited event fires.
+ struct C_nonblocking_fsync_state_advancer : Context {
+ Client *clnt;
+ Client::C_nonblocking_fsync_state *state;
+
+ C_nonblocking_fsync_state_advancer(Client *clnt, Client::C_nonblocking_fsync_state *state)
+ : clnt(clnt), state(state) {
+ }
+
+ // `r` is ignored: advance() re-examines the inode state itself.
+ // NOTE: advance() may `delete` the state object when the machine
+ // completes, so `state` must not be touched after the call.
+ void finish(int r) override {
+ clnt->client_lock.lock();
+ state->advance();
+ clnt->client_lock.unlock();
+ }
+ };
+
+ // Completion for the data flush started in advance() case 0 (the
+ // _flush() call); forwards the flush result into the state machine.
+ struct C_nonblocking_fsync_flush_finisher : Context {
+ Client *clnt;
+ Client::C_nonblocking_fsync_state *state;
+
+ C_nonblocking_fsync_flush_finisher(Client *clnt, Client::C_nonblocking_fsync_state *state)
+ : clnt(clnt), state(state) {
+ }
+
+ // Records `r` and resumes the machine if it is parked waiting for
+ // the flush (progress == 2).  NOTE: complete_flush() may `delete`
+ // the state object via advance(); don't touch `state` afterwards.
+ void finish(int r) override {
+ clnt->client_lock.lock();
+ state->complete_flush(r);
+ clnt->client_lock.unlock();
+ }
+ };
+
struct C_Readahead : public Context {
C_Readahead(Client *c, Fh *f);
~C_Readahead() override;
int64_t offset, bool write, Context *onfinish = nullptr,
bufferlist *blp = nullptr);
int _flush(Fh *fh);
+ void nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish);
int _fsync(Fh *fh, bool syncdataonly);
int _fsync(Inode *in, bool syncdataonly);
int _sync_fs();