]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Client: Add non-blocking fsync
authorFrank S. Filz <ffilzlnx@mindspring.com>
Fri, 1 Jul 2022 20:58:23 +0000 (13:58 -0700)
committerFrank S. Filz <ffilzlnx@mindspring.com>
Mon, 24 Jul 2023 18:49:04 +0000 (11:49 -0700)
We will need the ability to do an non-blocking write that finishes with
fsync so we need non-blocking fsync.

Signed-off-by: Frank S. Filz <ffilzlnx@mindspring.com>
src/client/Client.cc
src/client/Client.h

index 953a6dd53973e49ca5a68dd88cdc2c493900d1e5..97cad15577ee52ed3c375cfd68c3ec5f2055f34f 100644 (file)
@@ -11602,6 +11602,145 @@ int Client::fsync(int fd, bool syncdataonly)
   return r;
 }
 
+void Client::C_nonblocking_fsync_state::advance()
+{
+  Context *advancer;
+  client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
+
+  ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
+
+  switch (progress) {
+  case 0:
+    progress = 1;
+
+    if (clnt->cct->_conf->client_oc) {
+      Context *finisher = new C_nonblocking_fsync_flush_finisher(clnt, this);
+      flush_wait = true;
+      tmp_ref = in; // take a reference; C_SaferCond doesn't and _flush won't either
+      clnt->_flush(in, finisher);
+      ldout(clnt->cct, 15) << "using return-valued form of _fsync" << dendl;
+    }
+
+    if (!syncdataonly && in->dirty_caps) {
+      clnt->check_caps(in, CHECK_CAPS_NODELAY|CHECK_CAPS_SYNCHRONOUS);
+      if (in->flushing_caps)
+        flush_tid = clnt->last_flush_tid;
+    } else {
+      ldout(clnt->cct, 10) << "no metadata needs to commit" << dendl;
+    }
+
+    if (!syncdataonly && !in->unsafe_ops.empty()) {
+      waitfor_safe = true;
+      clnt->flush_mdlog_sync(in);
+
+      advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+      req = in->unsafe_ops.back();
+      ldout(clnt->cct, 15) << "waiting on unsafe requests, last tid " << req->get_tid() <<  dendl;
+
+      req->get();
+      clnt->add_nonblocking_onfinish_to_context_list(req->waitfor_safe, advancer);
+      // ------------  here is a state machine break point
+      return;
+    }
+
+    // skip and fall through
+
+  case 1:
+    progress = 2;
+
+    if (waitfor_safe) {
+      clnt->put_request(req);
+    }
+
+    if (flush_wait && !flush_completed) {
+      // wait on a real reply instead of guessing
+      ldout(clnt->cct, 15) << "waiting on data to flush" << dendl;
+      // ------------  here is a state machine break point
+      return;
+    } else {
+      // FIXME: this can starve
+      if (in->cap_refs[CEPH_CAP_FILE_BUFFER] > 0) {
+        ldout(clnt->cct, 10) << "ino " << in->ino << " has " << in->cap_refs[CEPH_CAP_FILE_BUFFER]
+                             << " uncommitted, waiting" << dendl;
+        advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+        clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_commit, advancer);
+        // ------------  here is a state machine break point but we have to
+        //               return to this case because this might loop.
+        progress = 1;
+        return;
+      }
+    }
+
+    // skip and fall through
+
+  case 2:
+
+    if (flush_completed) {
+      // we waited for real reply above, now we have it... retrieve result
+      ldout(clnt->cct, 15) << "got " << result << " from flush writeback" << dendl;
+    }
+
+    if (result != 0) {
+      // ERROR!
+      break;
+    }
+
+    if (flush_tid <= 0) {
+      // DONE!
+      break;
+    }
+
+    // fall through
+    progress = 3;
+
+  case 3:
+    // do equivalent of wait_sync_caps(in, flush_tid)
+    if (in->flushing_caps) {
+      map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
+      ceph_assert(it != in->flushing_cap_tids.end());
+
+      if (it->first <= flush_tid) {
+        ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing "
+                             << ccap_string(it->second) << " flush_tid " << flush_tid
+                             << " last " << it->first << dendl;
+        advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
+        clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+        // ------------  here is a state machine break point
+        //               the advancer completion will resume with case 3
+        return;
+      }
+
+      // DONE!
+    }
+  }
+
+  if (result == 0) {
+    ldout(clnt->cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl;
+  } else {
+    ldout(clnt->cct, 8) << "ino " << in->ino << " failed to commit to disk! "
+                        << cpp_strerror(-result) << dendl;
+  }
+
+  utime_t lat;
+
+  lat = ceph_clock_now();
+  lat -= start;
+  clnt->logger->tinc(l_c_fsync, lat);
+
+  onfinish->complete(result);
+
+  // we're done
+  delete this;
+}
+
+void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish)
+{
+  C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish);
+
+  // Kick fsync off...
+  state->advance();
+}
+
 int Client::_fsync(Inode *in, bool syncdataonly)
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
index 8371bb5547b0e9453b869e5109601820aee58b87..ae09224d84a1300c62594e21fe14716ca1c24306 100644 (file)
@@ -757,6 +757,7 @@ public:
   void flush_snaps(Inode *in);
   void get_cap_ref(Inode *in, int cap);
   void put_cap_ref(Inode *in, int cap);
+  void submit_sync_caps(Inode *in, ceph_tid_t want, Context *onfinish);
   void wait_sync_caps(Inode *in, ceph_tid_t want);
   void wait_sync_caps(ceph_tid_t want);
   void queue_cap_snap(Inode *in, SnapContext &old_snapc);
@@ -1425,6 +1426,78 @@ private:
     }
   };
 
+  struct C_nonblocking_fsync_state {
+    Client *clnt;
+
+    // nonblocking_fsync parms
+    Inode *in;
+    bool syncdataonly;
+    Context *onfinish;
+
+    // were local variables
+    ceph_tid_t flush_tid;
+    InodeRef tmp_ref;
+    utime_t start;
+    MetaRequest *req;
+
+    // we need to keep track of where we are
+    int progress;
+    bool flush_wait;
+    bool flush_completed;
+    int result;
+    bool waitfor_safe;
+
+    C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish)
+      : clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) {
+      flush_tid = 0;
+      start = ceph_clock_now();
+      progress = 0;
+      flush_wait = false;
+      flush_completed = false;
+      result = 0;
+      waitfor_safe = false;
+    }
+
+    void advance();
+
+    void complete_flush(int r) {
+      flush_completed = true;
+      result = r;
+      if (progress == 2)
+        advance();
+    }
+  };
+
+  struct C_nonblocking_fsync_state_advancer : Context {
+    Client *clnt;
+    Client::C_nonblocking_fsync_state *state;
+
+    C_nonblocking_fsync_state_advancer(Client *clnt, Client::C_nonblocking_fsync_state *state)
+      : clnt(clnt), state(state) {
+    }
+
+    void finish(int r) override {
+      clnt->client_lock.lock();
+      state->advance();
+      clnt->client_lock.unlock();
+    }
+  };
+
+  struct C_nonblocking_fsync_flush_finisher : Context {
+    Client *clnt;
+    Client::C_nonblocking_fsync_state *state;
+
+    C_nonblocking_fsync_flush_finisher(Client *clnt, Client::C_nonblocking_fsync_state *state)
+      : clnt(clnt), state(state) {
+    }
+
+    void finish(int r) override {
+      clnt->client_lock.lock();
+      state->complete_flush(r);
+      clnt->client_lock.unlock();
+    }
+  };
+
   struct C_Readahead : public Context {
     C_Readahead(Client *c, Fh *f);
     ~C_Readahead() override;
@@ -1596,6 +1669,7 @@ private:
                       int64_t offset, bool write, Context *onfinish = nullptr,
                       bufferlist *blp = nullptr);
   int _flush(Fh *fh);
+  void nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish);
   int _fsync(Fh *fh, bool syncdataonly);
   int _fsync(Inode *in, bool syncdataonly);
   int _sync_fs();