git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_file: implement stateless write completion timer
author Matt Benjamin <mbenjamin@redhat.com>
Tue, 16 Aug 2016 22:17:53 +0000 (18:17 -0400)
committer Matt Benjamin <mbenjamin@redhat.com>
Wed, 5 Oct 2016 18:24:36 +0000 (14:24 -0400)
Implements a temporal mechanism to enforce write completion for setups
which lack open state tracking (e.g., NFS3).

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit 2c83ed4bd43fcb5d5497151a157b1dc08022fed9)

src/common/ceph_timer.h
src/include/rados/rgw_file.h
src/rgw/librgw.cc
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index ae95eb250ddb356f54c9b87f2e31d30f1c20bfdb..97b0dc96865b2fe6d02c70503c97ec129108c2bd 100644 (file)
@@ -89,15 +89,19 @@ namespace ceph {
        }
       };
 
-      set<event,
-         member_hook<event, sh, &event::schedule_link>,
-         constant_time_size<false>,
-         compare<SchedCompare> > schedule;
+      typedef set<event,
+                 member_hook<event, sh, &event::schedule_link>,
+                 constant_time_size<false>,
+                 compare<SchedCompare> > schedule_type;
 
-      set<event,
-         member_hook<event, sh, &event::event_link>,
-         constant_time_size<false>,
-         compare<EventCompare> > events;
+      schedule_type schedule;
+
+      typedef set<event,
+                 member_hook<event, sh, &event::event_link>,
+                 constant_time_size<false>,
+                 compare<EventCompare> > event_set_type;
+
+      event_set_type events;
 
       std::mutex lock;
       using lock_guard = std::lock_guard<std::mutex>;
@@ -231,6 +235,30 @@ namespace ceph {
        return e.id;
       }
 
+      // Adjust the timeout of a currently-scheduled event (relative)
+      bool adjust_event(uint64_t id, typename TC::duration duration) {
+       return adjust_event(id, TC::now() + duration);
+      }
+
+      // Adjust the timeout of a currently-scheduled event (absolute)
+      bool adjust_event(uint64_t id, typename TC::time_point when) {
+       std::lock_guard<std::mutex> l(lock);
+
+       event key(id);
+       typename event_set_type::iterator it = events.find(key);
+
+       if (it == events.end())
+         return false;
+
+       event& e = *it;
+
+       schedule.erase(e);
+       e.t = when;
+       schedule.insert(e);
+
+       return true;
+      }
+
       // Cancel an event. If the event has already come and gone (or you
       // never submitted it) you will receive false. Otherwise you will
       // receive true and it is guaranteed the event will not execute.
index 5fac51acba16b5c99ff920deca0bc67230401d82..b8545be23e9ccc720a9d23f9fef3db4c7d2073da 100644 (file)
@@ -299,6 +299,15 @@ int rgw_writev(struct rgw_fs *rgw_fs,
 int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
              uint32_t flags);
 
+/*
+   NFS commit operation
+*/
+
+#define RGW_COMMIT_FLAG_NONE        0x0000
+
+int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+              uint64_t offset, uint64_t length, uint32_t flags);
+
 #ifdef __cplusplus
 }
 #endif
index c47612907542eff0e84e83d2036c6763c02fdc11..c129a21430b7e84263557881cb44d05077ae9de4 100644 (file)
@@ -81,6 +81,10 @@ namespace rgw {
 
   void RGWLibProcess::run()
   {
+    /* start write timer */
+    RGWLibFS::write_timer.resume();
+
+    /* gc loop */
     while (! shutdown) {
       lsubdout(cct, rgw, 5) << "RGWLibProcess GC" << dendl;
       unique_lock uniq(mtx);
index 8491b7701323595e8fa619c7dc520d8326d1efa7..a8a2b3d0156f23cc1364765429f2dc401880acca 100644 (file)
@@ -38,6 +38,9 @@ namespace rgw {
 
   atomic<uint32_t> RGWLibFS::fs_inst;
 
+  ceph::timer<ceph::mono_clock> RGWLibFS::write_timer{
+    ceph::construct_suspended};
+
   LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent,
                                       const char *path, uint32_t flags)
   {
@@ -755,6 +758,7 @@ namespace rgw {
                           void *buffer)
   {
     using std::get;
+    using WriteCompletion = RGWLibFS::WriteCompletion;
 
     lock_guard guard(mtx);
 
@@ -783,7 +787,7 @@ namespace rgw {
       if (off != 0) {
        lsubdout(fs->get_context(), rgw, 5)
          << __func__
-         << object_name()
+         << " " << object_name()
          << " non-0 initial write position " << off
          << dendl;
        return -EIO;
@@ -806,6 +810,13 @@ namespace rgw {
        delete f->write_req;
        f->write_req = nullptr;
         return -EIO;
+      } else {
+       if (stateless_open())  {
+         /* start write timer */
+         f->write_req->timer_id =
+           RGWLibFS::write_timer.add_event(
+             std::chrono::seconds(10), WriteCompletion(*this));
+       }
       }
     }
 
@@ -826,6 +837,11 @@ namespace rgw {
       size_t min_size = off + len;
       if (min_size > get_size())
        set_size(min_size);
+      if (stateless_open()) {
+       /* bump write timer */
+       RGWLibFS::write_timer.adjust_event(
+         f->write_req->timer_id, std::chrono::seconds(10));
+      }
     } else {
       /* continuation failed (e.g., non-contiguous write position) */
       lsubdout(fs->get_context(), rgw, 5)
@@ -844,11 +860,15 @@ namespace rgw {
     return rc;
   } /* RGWFileHandle::write */
 
-  int RGWFileHandle::close()
+  int RGWFileHandle::write_finish(uint32_t flags)
   {
-    lock_guard guard(mtx);
-
+    unique_lock guard{mtx, std::defer_lock};
     int rc = 0;
+
+    if (! (flags & FLAG_LOCKED)) {
+      guard.lock();
+    }
+
     file* f = get<file>(&variant_type);
     if (f && (f->write_req)) {
       rc = rgwlib.get_fe()->finish_req(f->write_req);
@@ -859,6 +879,15 @@ namespace rgw {
       f->write_req = nullptr;
     }
 
+    return rc;
+  } /* RGWFileHandle::write_finish */
+
+  int RGWFileHandle::close()
+  {
+    lock_guard guard(mtx);
+
+    int rc = write_finish(FLAG_LOCKED);
+
     flags &= ~FLAG_OPEN;
     return rc;
   } /* RGWFileHandle::close */
@@ -1530,4 +1559,12 @@ int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *handle,
   return 0;
 }
 
+int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+              uint64_t offset, uint64_t length, uint32_t flags)
+{
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return rgw_fh->commit(offset, length, RGWFileHandle::FLAG_NONE);
+}
+
 } /* extern "C" */
index 2d94952e877fd171154b1bb4cb14ab405653f62f..9b0fe98a46eafc9e7efddfd36e68b4d934e060bf 100644 (file)
@@ -28,6 +28,7 @@
 #include "include/buffer.h"
 #include "common/sstring.hh"
 #include "common/cohort_lru.h"
+#include "common/ceph_timer.h"
 #include "rgw_common.h"
 #include "rgw_user.h"
 #include "rgw_lib.h"
@@ -492,6 +493,7 @@ namespace rgw {
     bool is_dir() const { return (fh.fh_type == RGW_FS_TYPE_DIRECTORY); }
     bool creating() const { return flags & FLAG_CREATING; }
     bool deleted() const { return flags & FLAG_DELETED; }
+    bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; }
 
     uint32_t open(uint32_t gsh_flags) {
       lock_guard guard(mtx);
@@ -508,7 +510,12 @@ namespace rgw {
     int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof,
                uint32_t flags);
     int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
-    int write_finish();
+
+    int commit(uint64_t offset, uint64_t length, uint32_t flags) {
+      return 0;
+    }
+
+    int write_finish(uint32_t flags = FLAG_NONE);
     int close();
 
     void open_for_create() {
@@ -717,11 +724,29 @@ namespace rgw {
     using event_vector = /* boost::small_vector<event, 16> */
       std::vector<event>;
 
-    struct state {
+    struct WriteCompletion
+    {
+      RGWFileHandle& rgw_fh;
+
+      WriteCompletion(RGWFileHandle& _fh) : rgw_fh(_fh) {
+       rgw_fh.get_fs()->ref(&rgw_fh);
+      }
+
+      void operator()() {
+       rgw_fh.write_finish();
+       rgw_fh.get_fs()->unref(&rgw_fh);
+      }
+    };
+
+    static ceph::timer<ceph::mono_clock> write_timer;
+
+    struct State {
       std::mutex mtx;
       std::atomic<uint32_t> flags;
       std::deque<event> events;
-      state() : flags(0) {}
+
+      State() : flags(0) {}
+
       void push_event(const event& ev) {
        lock_guard guard(mtx);
        events.push_back(ev);
@@ -729,6 +754,7 @@ namespace rgw {
     } state;
 
     friend class RGWFileHandle;
+    friend class RGWLibProcess;
 
   public:
 
@@ -1932,6 +1958,7 @@ public:
   RGWFileHandle* rgw_fh;
   RGWPutObjProcessor *processor;
   buffer::list data;
+  uint64_t timer_id;
   MD5 hash;
   off_t real_ofs;
   size_t bytes_written;