git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Client: Hook nonblocking fsync into the write path of ll_preadv_pwritev
author: Frank S. Filz <ffilzlnx@mindspring.com>
Thu, 14 Jul 2022 22:02:06 +0000 (15:02 -0700)
committer: Frank S. Filz <ffilzlnx@mindspring.com>
Mon, 24 Jul 2023 18:49:04 +0000 (11:49 -0700)
Signed-off-by: Frank S. Filz <ffilzlnx@mindspring.com>
src/client/Client.cc
src/client/Client.h

index 97cad15577ee52ed3c375cfd68c3ec5f2055f34f..66e964658ef01eea1741aa06e4e87d419ac65e73 100644 (file)
@@ -11086,7 +11086,8 @@ int Client::pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset)
 int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
                                        unsigned iovcnt, int64_t offset,
                                        bool write, bool clamp_to_int,
-                                       Context *onfinish, bufferlist *blp)
+                                       Context *onfinish, bufferlist *blp,
+                                       bool do_fsync, bool syncdataonly)
 {
     ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
@@ -11109,7 +11110,7 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
     }
 
     if (write) {
-        int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish);
+        int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish, do_fsync, syncdataonly);
         ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl;
         return w;
     } else {
@@ -11233,22 +11234,64 @@ void Client::C_Write_Finisher::finish_onuninline(int r)
     delete this;
 }
 
+void Client::C_Write_Finisher::finish_fsync(int r)  // Records the fsync result r, then tries to complete the whole write.
+{
+  bool fini;  // set from try_complete(); true means onfinish was completed
+  client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 3) << "finish_fsync r = " << r << dendl;
+
+  fsync_finished = true;  // try_complete() only starts an fsync while this is false
+  fsync_r = r;  // try_complete() reports this value to onfinish when it is negative
+  fini = try_complete();
+
+  if (fini)
+    delete this;  // nothing else is pending once try_complete() has returned true
+}
+
 bool Client::C_Write_Finisher::try_complete()
 {
-  if (onuninlinefinished && iofinished) {
+  client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 19) << "C_Write_Finisher::try_complete this " << this 
+                       << " onuninlinefinished " << onuninlinefinished
+                       << " iofinished " << iofinished
+                       << " iofinished_r " << iofinished_r
+                       << " fsync_finished " << fsync_finished
+                       << dendl;
+
+  if (onuninlinefinished && iofinished && !fsync_finished && iofinished_r >= 0) {  // fsync is skipped when the I/O result is negative
+    // Done with I/O AND uninline, but we want to do fsync
+    CWF_fsync_finish *fsync_f = new CWF_fsync_finish(this);  // its finish() forwards the result to finish_fsync()
+    C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(clnt, in, syncdataonly, fsync_f);  // presumably calls fsync_f when the fsync is done - confirm in advance()
+
+    // Kick fsync off... and all will magically complete eventually...
+    ldout(clnt->cct, 19) << "kickoff fsync onfinish " << onfinish << dendl;
+    state->advance();  // this call then falls through to "return false"; finish_fsync() re-enters try_complete()
+  } else if (onuninlinefinished && iofinished) {  // reached when fsync_finished is true or iofinished_r is negative
+    // Now we are REALLY done...
     clnt->put_cap_ref(in, CEPH_CAP_FILE_WR);
 
-    if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED)
-      onfinish->complete(iofinished_r);
-    else
+    if (fsync_r < 0) {  // a negative fsync result is checked first
+      ldout(clnt->cct, 19) << " complete with fsync_r " << fsync_r << dendl;
+      onfinish->complete(fsync_r);
+    } else if (onuninlinefinished_r < 0 && onuninlinefinished_r != -CEPHFS_ECANCELED) {  // -CEPHFS_ECANCELED from uninline is not reported as a failure
+      ldout(clnt->cct, 19) << " complete with onuninlinefinished_r " << onuninlinefinished_r << dendl;
       onfinish->complete(onuninlinefinished_r);
+    } else {  // otherwise report the I/O result itself
+      ldout(clnt->cct, 19) << " complete with iofinished_r " << iofinished_r << dendl;
+      onfinish->complete(iofinished_r);
+    }
+    onfinish = nullptr;  // complete() has been called on it; drop the pointer
     return true;
   }
+
   return false;
 }
 
 int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
-                       const struct iovec *iov, int iovcnt, Context *onfinish)
+                       const struct iovec *iov, int iovcnt, Context *onfinish,
+                       bool do_fsync, bool syncdataonly)
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
@@ -11390,7 +11433,8 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
                         cct->_conf->client_oc &&
                           (have & (CEPH_CAP_FILE_BUFFER |
                                  CEPH_CAP_FILE_LAZYIO)),
-                        start, f, in, fpos, offset, size));
+                        start, f, in, fpos, offset, size,
+                        do_fsync, syncdataonly));
 
     cwf_iofinish->CWF = cwf.get();
   }
@@ -11497,6 +11541,15 @@ success:
   // do not get here if non-blocking caller (onfinish != nullptr)
   r = _write_success(f, start, fpos, offset, size, in);
 
+  if (r >= 0 && do_fsync) {  // blocking caller asked for the successful write to be followed by an fsync
+    int64_t r1;
+    client_lock.unlock();  // NOTE(review): lock is dropped around _fsync - confirm _fsync(Fh*, bool) does not expect client_lock held
+    r1 = _fsync(f, syncdataonly);  // honor the caller's syncdataonly (was hard-coded false), as the non-blocking path does
+    if (r1 < 0)
+      r = r1;  // an fsync failure replaces the write's success result
+    client_lock.lock();
+  }
+
 done:
 
   // can not get here if non-blocking caller (onfinish != nullptr)
@@ -11607,10 +11660,20 @@ void Client::C_nonblocking_fsync_state::advance()
   Context *advancer;
   client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
 
+  ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance"
+                       << " progress " << progress
+                       << " flush_wait " << flush_wait
+                       << " flush_completed " << flush_completed
+                       << " result " << result
+                       << " waitfor_safe " << waitfor_safe
+                       << " onfinish " << onfinish
+                       << dendl;
+
   ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
 
   switch (progress) {
   case 0:
+    ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 0" << dendl;
     progress = 1;
 
     if (clnt->cct->_conf->client_oc) {
@@ -11646,6 +11709,7 @@ void Client::C_nonblocking_fsync_state::advance()
     // skip and fall through
 
   case 1:
+    ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 1" << dendl;
     progress = 2;
 
     if (waitfor_safe) {
@@ -11674,6 +11738,7 @@ void Client::C_nonblocking_fsync_state::advance()
     // skip and fall through
 
   case 2:
+    ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 2" << dendl;
 
     if (flush_completed) {
       // we waited for real reply above, now we have it... retrieve result
@@ -11682,11 +11747,13 @@ void Client::C_nonblocking_fsync_state::advance()
 
     if (result != 0) {
       // ERROR!
+      ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - ERROR!" << dendl;
       break;
     }
 
     if (flush_tid <= 0) {
       // DONE!
+      ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" << dendl;
       break;
     }
 
@@ -11694,23 +11761,41 @@ void Client::C_nonblocking_fsync_state::advance()
     progress = 3;
 
   case 3:
+  case 4:
+    ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case " << progress << dendl;
+    ldout(clnt->cct, 15) << "in->flushing_cap_tids.empty() " << in->flushing_cap_tids.empty()
+                         << " in->flushing_caps " << in->flushing_caps
+                         << dendl; 
     // do equivalent of wait_sync_caps(in, flush_tid)
     if (in->flushing_caps) {
+      ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - flushing_caps" << dendl;
       map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
       ceph_assert(it != in->flushing_cap_tids.end());
 
+      ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance" 
+                           << " it->first " << it->first
+                           << " flush_tid " << flush_tid
+                           << dendl;
       if (it->first <= flush_tid) {
         ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing "
                              << ccap_string(it->second) << " flush_tid " << flush_tid
                              << " last " << it->first << dendl;
         advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
-        clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+        ldout(clnt->cct, 10) << "Adding onfinish " << onfinish
+                             << " for C_nonblocking_fsync_state " << this
+                             << dendl;
+        if (progress == 3)
+          clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+        else
+          clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps_pending, advancer);
         // ------------  here is a state machine break point
         //               the advancer completion will resume with case 3
+        progress = 4;
         return;
       }
 
       // DONE!
+      ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" << dendl;
     }
   }
 
@@ -11733,6 +11818,33 @@ void Client::C_nonblocking_fsync_state::advance()
   delete this;
 }
 
+void Client::C_nonblocking_fsync_state::complete_flush(int r)  // Stores the flush result r and resumes the state machine if it is at progress 2.
+{
+  client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 15) << "complete_flush"
+                       << " r " << r
+                       << " progress " << progress
+                       << dendl;
+
+  flush_completed = true;  // advance() case 2 checks this flag
+  result = r;
+  if (progress == 2)  // at any other progress value the result is only recorded
+    advance();
+}
+
+void Client::C_nonblocking_fsync_state_advancer::finish(int r)  // Resumes the fsync state machine; r is only logged.
+{
+  client_t const whoami = clnt->whoami;  // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 15) << "C_nonblocking_fsync_state_advancer::finish"
+                       << " r " << r
+                       << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));  // the caller must already hold client_lock; it is not taken here
+  state->advance();
+}
+
 void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish)
 {
   C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish);
@@ -15684,7 +15796,8 @@ int64_t Client::ll_readv(struct Fh *fh, const struct iovec *iov, int iovcnt, int
 
 int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov,
                                   int iovcnt, int64_t offset, bool write,
-                                  Context *onfinish, bufferlist *bl)
+                                  Context *onfinish, bufferlist *bl,
+                                  bool do_fsync, bool syncdataonly)
 {
     RWRef_t mref_reader(mount_state, CLIENT_MOUNTING);
     if (!mref_reader.is_state_satisfied())
@@ -15692,7 +15805,7 @@ int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov,
 
     std::scoped_lock cl(client_lock);
     return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
-                                 onfinish, bl);
+                                 onfinish, bl, do_fsync, syncdataonly);
 }
 
 int Client::ll_flush(Fh *fh)
index ae09224d84a1300c62594e21fe14716ca1c24306..1e4d5890d198dbbcb3c8bbd7801ed84918271162 100644 (file)
@@ -649,7 +649,8 @@ public:
   int64_t ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, int iovcnt,
                             int64_t offset, bool write,
                             Context *onfinish = nullptr,
-                            bufferlist *blp = nullptr);
+                            bufferlist *blp = nullptr,
+                            bool do_fsync = false, bool syncdataonly = false);
   loff_t ll_lseek(Fh *fh, loff_t offset, int whence);
   int ll_flush(Fh *fh);
   int ll_fsync(Fh *fh, bool syncdataonly);
@@ -1381,17 +1382,21 @@ private:
   public:
     void finish_io(int r);
     void finish_onuninline(int r);
+    void finish_fsync(int r);
 
     C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
                      bool is_file_write, utime_t start, Fh *f, Inode *in,
-                     uint64_t fpos, int64_t offset, uint64_t size)
+                     uint64_t fpos, int64_t offset, uint64_t size,
+                     bool do_fsync, bool syncdataonly)  // do_fsync: run an fsync before completing; syncdataonly is stored for that fsync
       : clnt(clnt), onfinish(onfinish),
         is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
-        offset(offset), size(size) {
+        offset(offset), size(size), syncdataonly(syncdataonly) {
       iofinished_r = 0;
       onuninlinefinished_r = 0;
+      fsync_r = 0;  // stays 0 unless finish_fsync() records a result
       iofinished = false;
       onuninlinefinished = dont_need_uninline;
+      fsync_finished = !do_fsync;  // no fsync requested => the fsync stage counts as already finished
     }
 
     void finish(int r) override {
@@ -1408,10 +1413,13 @@ private:
     uint64_t fpos;
     int64_t offset;
     uint64_t size;
+    bool syncdataonly;
     int64_t iofinished_r;
     int64_t onuninlinefinished_r;
+    int64_t fsync_r;
     bool iofinished;
     bool onuninlinefinished;
+    bool fsync_finished;
     bool try_complete();
   };
 
@@ -1426,6 +1434,17 @@ private:
     }
   };
 
+  struct CWF_fsync_finish : public Context {  // Context that hands an fsync result to a C_Write_Finisher
+    C_Write_Finisher *CWF;  // target of the callback; stored as a plain pointer
+
+    CWF_fsync_finish(C_Write_Finisher *CWF)
+      : CWF(CWF) {}
+
+    void finish(int r) override {
+      CWF->finish_fsync(r);  // forwards r unchanged
+    }
+  };
+
   struct C_nonblocking_fsync_state {
     Client *clnt;
 
@@ -1460,12 +1479,7 @@ private:
 
     void advance();
 
-    void complete_flush(int r) {
-      flush_completed = true;
-      result = r;
-      if (progress == 2)
-        advance();
-    }
+    void complete_flush(int r);
   };
 
   struct C_nonblocking_fsync_state_advancer : Context {
@@ -1476,11 +1490,7 @@ private:
       : clnt(clnt), state(state) {
     }
 
-    void finish(int r) override {
-      clnt->client_lock.lock();
-      state->advance();
-      clnt->client_lock.unlock();
-    }
+    void finish(int r) override;
   };
 
   struct C_nonblocking_fsync_flush_finisher : Context {
@@ -1492,9 +1502,8 @@ private:
     }
 
     void finish(int r) override {
-      clnt->client_lock.lock();
+      ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));  // lock must already be held by the caller; this callback no longer takes or releases it
       state->complete_flush(r);
-      clnt->client_lock.unlock();
     }
   };
 
@@ -1659,12 +1668,14 @@ private:
   int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
           int64_t offset, uint64_t size, Inode *in);
   int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
-          const struct iovec *iov, int iovcnt, Context *onfinish = nullptr);
+          const struct iovec *iov, int iovcnt, Context *onfinish = nullptr,
+          bool do_fsync = false, bool syncdataonly = false);
   int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
                                  unsigned iovcnt, int64_t offset,
                                  bool write, bool clamp_to_int,
                                  Context *onfinish = nullptr,
-                                 bufferlist *blp = nullptr);
+                                 bufferlist *blp = nullptr,
+                                 bool do_fsync = false, bool syncdataonly = false);
   int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
                       int64_t offset, bool write, Context *onfinish = nullptr,
                       bufferlist *blp = nullptr);