#include "rgw_lib_frontend.h"
#include <errno.h>
-#include <sstream>
+#include <chrono>
+#include <thread>
#include <string>
#include <string.h>
#include <mutex>
void RGWLibProcess::run()
{
- /* XXX */
+ while (! shutdown) {
+ std::cout << "RGWLibProcess GC" << std::endl;
+ unique_lock uniq(mtx);
+ restart:
+ int cur_gen = gen;
+ for (auto iter = mounted_fs.begin(); iter != mounted_fs.end();
+ ++iter) {
+ RGWLibFS* fs = iter->first->ref();
+ uniq.unlock();
+ fs->gc();
+ fs->rele();
+ uniq.lock();
+ if (cur_gen != gen)
+ goto restart; /* invalidated */
+ }
+ uniq.unlock();
+ std::this_thread::sleep_for(std::chrono::seconds(120));
+ }
}
void RGWLibProcess::handle_request(RGWRequest* r)
return fhr;
} /* RGWLibFS::stat_leaf */
+ void RGWLibFS::close()
+ {
+ flags |= FLAG_CLOSED;
+
+ class ObjUnref
+ {
+ RGWLibFS* fs;
+ public:
+ ObjUnref(RGWLibFS* fs) : fs(fs) {}
+ void operator()(RGWFileHandle* fh) const {
+ lsubdout(fs->get_context(), rgw, 5)
+ << __func__
+ << fh->name
+ << " before ObjUnref refs=" << fh->get_refcnt()
+ << dendl;
+ fs->fh_lru.unref(fh, cohort::lru::FLAG_NONE);
+ }
+ };
+
+ /* force cache drain, forces objects to evict */
+ fh_cache.drain(ObjUnref(this),
+ RGWFileHandle::FHCache::FLAG_LOCK);
+ librgw.get_fe()->get_process()->unregister_fs(this);
+ rele();
+ } /* RGWLibFS::close */
+
+ void RGWLibFS::gc()
+ {
+ } /* RGWLibFS::gc */
+
bool RGWFileHandle::reclaim() {
fs->fh_cache.remove(fh.fh_hk.object, this, cohort::lru::FLAG_NONE);
return true;
return -EINVAL;
}
+ /* register fs for shared gc */
+ librgw.get_fe()->get_process()->register_fs(new_fs);
+
struct rgw_fs *fs = new_fs->get_fs();
fs->rgw = rgw;
RGWUserInfo* get_user() { return &user; }
- void close() {
-
- flags |= FLAG_CLOSED;
-
- class ObjUnref
- {
- RGWLibFS* fs;
- public:
- ObjUnref(RGWLibFS* fs) : fs(fs) {}
- void operator()(RGWFileHandle* fh) const {
- lsubdout(fs->get_context(), rgw, 5)
- << __func__
- << fh->name
- << " before ObjUnref refs=" << fh->get_refcnt()
- << dendl;
- fs->fh_lru.unref(fh, cohort::lru::FLAG_NONE);
- }
- };
-
- /* force cache drain, forces objects to evict */
- fh_cache.drain(ObjUnref(this),
- RGWFileHandle::FHCache::FLAG_LOCK);
-
- /* XXX unref this */
- }
+ void close();
+ void gc();
}; /* RGWLibFS */
static inline std::string make_uri(const std::string& bucket_name,
#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_map.hpp>
+
#include "rgw_lib.h"
#include "rgw_file.h"
class RGWLibProcess : public RGWProcess {
RGWAccessKey access_key;
+ std::mutex mtx;
+ int gen;
+ bool shutdown;
+
+ typedef flat_map<RGWLibFS*, RGWLibFS*> FSMAP;
+ FSMAP mounted_fs;
+
+ using lock_guard = std::lock_guard<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
+
public:
RGWLibProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
RGWFrontendConfig* _conf) :
- RGWProcess(cct, pe, num_threads, _conf) {}
+ RGWProcess(cct, pe, num_threads, _conf), gen(0), shutdown(false) {}
void run();
void checkpoint();
+ void register_fs(RGWLibFS* fs) {
+ lock_guard guard(mtx);
+ mounted_fs.insert(FSMAP::value_type(fs, fs));
+ ++gen;
+ }
+
+ void unregister_fs(RGWLibFS* fs) {
+ lock_guard guard(mtx);
+ FSMAP::iterator it = mounted_fs.find(fs);
+ if (it != mounted_fs.end()) {
+ mounted_fs.erase(it);
+ ++gen;
+ }
+ }
+
void enqueue_req(RGWLibRequest* req) {
lsubdout(g_ceph_context, rgw, 10)
int init();
+ RGWLibProcess* get_process() {
+ return static_cast<RGWLibProcess*>(pprocess);
+ }
+
inline void enqueue_req(RGWLibRequest* req) {
static_cast<RGWLibProcess*>(pprocess)->enqueue_req(req); // async
}