]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librgw: implement framework for fs periodic work
authorMatt Benjamin <mbenjamin@redhat.com>
Mon, 21 Dec 2015 01:57:23 +0000 (20:57 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Fri, 12 Feb 2016 17:07:11 +0000 (12:07 -0500)
This can be used for, e.g., gc of stale handles and directory
traversals.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/librgw.cc
src/rgw/rgw_file.cc
src/rgw/rgw_file.h
src/rgw/rgw_lib_frontend.h

index afddea4f2ba15097f9ddc937a438f088d9afc37b..2c0b22bdcd71717645c8d6a4c6f7a5c22eb15563 100644 (file)
@@ -46,7 +46,8 @@
 #include "rgw_lib_frontend.h"
 
 #include <errno.h>
-#include <sstream>
+#include <chrono>
+#include <thread>
 #include <string>
 #include <string.h>
 #include <mutex>
@@ -79,7 +80,24 @@ namespace rgw {
 
   void RGWLibProcess::run()
   {
-    /* XXX */
+    while (! shutdown) {
+      std::cout << "RGWLibProcess GC" << std::endl;
+      unique_lock uniq(mtx);
+    restart:
+      int cur_gen = gen;
+      for (auto iter = mounted_fs.begin(); iter != mounted_fs.end();
+          ++iter) {
+       RGWLibFS* fs = iter->first->ref();
+       uniq.unlock();
+       fs->gc();
+       fs->rele();
+       uniq.lock();
+       if (cur_gen != gen)
+         goto restart; /* invalidated */
+      }
+      uniq.unlock();
+      std::this_thread::sleep_for(std::chrono::seconds(120));
+    }
   }
 
   void RGWLibProcess::handle_request(RGWRequest* r)
index a5e32fbf1d8db33bbc465e3d47c04c3cea1f6c78..d9fd84c7d5f2ac789128933313e30aa03040d9f4 100644 (file)
@@ -105,6 +105,36 @@ namespace rgw {
     return fhr;
   } /* RGWLibFS::stat_leaf */
 
+  void RGWLibFS::close()
+  {
+    flags |= FLAG_CLOSED;
+
+    class ObjUnref
+    {
+      RGWLibFS* fs;
+    public:
+      ObjUnref(RGWLibFS* fs) : fs(fs) {}
+      void operator()(RGWFileHandle* fh) const {
+       lsubdout(fs->get_context(), rgw, 5)
+         << __func__
+         << fh->name
+         << " before ObjUnref refs=" << fh->get_refcnt()
+         << dendl;
+       fs->fh_lru.unref(fh, cohort::lru::FLAG_NONE);
+      }
+    };
+
+    /* force cache drain, forces objects to evict */
+    fh_cache.drain(ObjUnref(this),
+                 RGWFileHandle::FHCache::FLAG_LOCK);
+    librgw.get_fe()->get_process()->unregister_fs(this);
+    rele();
+  } /* RGWLibFS::close */
+
+  void RGWLibFS::gc()
+  {
+  } /* RGWLibFS::gc */
+
   bool RGWFileHandle::reclaim() {
     fs->fh_cache.remove(fh.fh_hk.object, this, cohort::lru::FLAG_NONE);
     return true;
@@ -392,6 +422,9 @@ extern "C" {
     return -EINVAL;
   }
 
+  /* register fs for shared gc */
+  librgw.get_fe()->get_process()->register_fs(new_fs);
+
   struct rgw_fs *fs = new_fs->get_fs();
   fs->rgw = rgw;
 
index c33b14436186547e600a5b49c583c241230c0773..0ab6bfa444dcade05daf3ff85938d51487f0f73f 100644 (file)
@@ -691,31 +691,8 @@ namespace rgw {
 
     RGWUserInfo* get_user() { return &user; }
 
-    void close() {
-
-      flags |= FLAG_CLOSED;
-
-      class ObjUnref
-      {
-       RGWLibFS* fs;
-      public:
-       ObjUnref(RGWLibFS* fs) : fs(fs) {}
-       void operator()(RGWFileHandle* fh) const {
-         lsubdout(fs->get_context(), rgw, 5)
-           << __func__
-           << fh->name
-           << " before ObjUnref refs=" << fh->get_refcnt()
-           << dendl;
-         fs->fh_lru.unref(fh, cohort::lru::FLAG_NONE);
-       }
-      };
-
-      /* force cache drain, forces objects to evict */
-      fh_cache.drain(ObjUnref(this),
-                    RGWFileHandle::FHCache::FLAG_LOCK);
-
-      /* XXX unref this */
-    }
+    void close();
+    void gc();
   }; /* RGWLibFS */
 
 static inline std::string make_uri(const std::string& bucket_name,
index 51d4f6221183e9f9a8549eb1640fa06e8bbdb84d..9f407f1c4d04eab9a7c70c3b34b68bcda3a4ca1f 100644 (file)
@@ -6,6 +6,8 @@
 
 #include <boost/container/flat_map.hpp>
 
+#include <boost/container/flat_map.hpp>
+
 #include "rgw_lib.h"
 #include "rgw_file.h"
 
@@ -13,14 +15,39 @@ namespace rgw {
 
   class RGWLibProcess : public RGWProcess {
     RGWAccessKey access_key;
+    std::mutex mtx;
+    int gen;
+    bool shutdown;
+
+    typedef flat_map<RGWLibFS*, RGWLibFS*> FSMAP;
+    FSMAP mounted_fs;
+
+    using lock_guard = std::lock_guard<std::mutex>;
+    using unique_lock = std::unique_lock<std::mutex>;
+
   public:
     RGWLibProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
                  RGWFrontendConfig* _conf) :
-      RGWProcess(cct, pe, num_threads, _conf) {}
+      RGWProcess(cct, pe, num_threads, _conf), gen(0), shutdown(false) {}
 
     void run();
     void checkpoint();
 
+    void register_fs(RGWLibFS* fs) {
+      lock_guard guard(mtx);
+      mounted_fs.insert(FSMAP::value_type(fs, fs));
+      ++gen;
+    }
+
+    void unregister_fs(RGWLibFS* fs) {
+      lock_guard guard(mtx);
+      FSMAP::iterator it = mounted_fs.find(fs);
+      if (it != mounted_fs.end()) {
+       mounted_fs.erase(it);
+       ++gen;
+      }
+    }
+
     void enqueue_req(RGWLibRequest* req) {
 
       lsubdout(g_ceph_context, rgw, 10)
@@ -49,6 +76,10 @@ namespace rgw {
                
     int init();
 
+    RGWLibProcess* get_process() {
+      return static_cast<RGWLibProcess*>(pprocess);
+    }
+
     inline void enqueue_req(RGWLibRequest* req) {
       static_cast<RGWLibProcess*>(pprocess)->enqueue_req(req); // async
     }