class RGWFileHandle;
class RGWWriteRequest;
- typedef boost::intrusive_ptr<RGWFileHandle> RGWFHRef;
-
/*
* XXX
* The current 64-bit, non-cryptographic hash used here is intended
using boost::variant;
using boost::container::flat_map;
- class RGWFileHandle
+ class RGWFileHandle : public cohort::lru::Object
{
struct rgw_file_handle fh;
- mutable std::atomic<uint64_t> refcnt;
std::mutex mtx;
RGWLibFS* fs;
- RGWFHRef bucket;
- RGWFHRef parent;
+ RGWFileHandle* bucket;
+ RGWFileHandle* parent;
/* const */ std::string name; /* XXX file or bucket name */
/* const */ fh_key fhk;
private:
RGWFileHandle(RGWLibFS* _fs, uint32_t fs_inst)
- : refcnt(1), fs(_fs), bucket(nullptr), parent(nullptr), depth(0),
+ : fs(_fs), bucket(nullptr), parent(nullptr), depth(0),
flags(FLAG_ROOT)
{
/* root */
public:
RGWFileHandle(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* _parent,
const fh_key& _fhk, std::string& _name, uint32_t _flags)
- : refcnt(0), fs(fs), bucket(nullptr), parent(_parent),
- name(std::move(_name)), fhk(_fhk), flags(_flags) {
+ : fs(fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
+ fhk(_fhk), flags(_flags) {
if (parent->is_root()) {
fh.fh_type = RGW_FS_TYPE_DIRECTORY;
RGWLibFS* get_fs() { return fs; }
- RGWFileHandle* get_parent() { return parent.get(); }
+ RGWFileHandle* get_parent() { return parent; }
int stat(struct stat *st) {
/* partial Unix attrs */
while (tfh && !tfh->is_bucket()) {
segments.push_back(&tfh->object_name());
reserve += (1 + tfh->object_name().length());
- tfh = tfh->parent.get();
+ tfh = tfh->parent;
}
bool first = true;
path.reserve(reserve);
state.atime = ts;
}
- friend void intrusive_ptr_add_ref(const RGWFileHandle* fh) {
- fh->refcnt.fetch_add(1, std::memory_order_relaxed);
- }
-
- friend void intrusive_ptr_release(const RGWFileHandle* fh) {
- if (fh->refcnt.fetch_sub(1, std::memory_order_release) == 1) {
- std::atomic_thread_fence(std::memory_order_acquire);
- /* root handles are expanded in RGWLibFS */
- if (! const_cast<RGWFileHandle*>(fh)->is_root())
- delete fh;
- }
- }
-
- RGWFileHandle* ref() {
- intrusive_ptr_add_ref(this);
- return this;
- }
+ virtual bool reclaim();
- inline void rele() {
- intrusive_ptr_release(this);
- }
+ typedef cohort::lru::LRU<std::mutex> FhLRU;
struct FhLT
{
#endif
typedef cohort::lru::TreeX<RGWFileHandle, FhTree, FhLT, FhEQ, fh_key,
std::mutex> FHCache;
+
+ virtual ~RGWFileHandle() {}
+
+ class Factory : public cohort::lru::ObjectFactory
+ {
+ public:
+ RGWLibFS* fs;
+ uint32_t fs_inst;
+ RGWFileHandle* parent;
+ const fh_key& fhk;
+ std::string& name;
+ uint32_t flags;
+
+ Factory() = delete;
+
+ Factory(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* parent,
+ const fh_key& fhk, std::string& name, uint32_t flags)
+ : fs(fs), fs_inst(fs_inst), parent(parent), fhk(fhk), name(name),
+ flags(flags) {}
+
+ void recycle (cohort::lru::Object* o) {
+ /* re-use an existing object */
+ o->~Object(); // call lru::Object virtual dtor
+ // placement new!
+ new (o) RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+ }
+
+ cohort::lru::Object* alloc() {
+ return new RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+ }
+ }; /* Factory */
+
}; /* RGWFileHandle */
static inline RGWFileHandle* get_rgwfh(struct rgw_file_handle* fh) {
RGWFileHandle root_fh;
RGWFileHandle::FHCache fh_cache;
+ RGWFileHandle::FhLRU fh_lru;
std::string uid; // should match user.user_id, iiuc
static atomic<uint32_t> fs_inst;
std::string fsid;
-
+ uint32_t flags;
+
+ friend class RGWFileHandle;
+
public:
+
+ static constexpr uint32_t FLAG_NONE = 0x0000;
+ static constexpr uint32_t FLAG_CLOSED = 0x0001;
+
RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
const char* _key)
- : cct(_cct), root_fh(this, get_inst()), uid(_uid), key(_user_id, _key) {
+ : cct(_cct), root_fh(this, get_inst()),
+ fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
+ cct->_conf->rgw_nfs_fhcache_size),
+ fh_lru(cct->_conf->rgw_nfs_lru_lanes,
+ cct->_conf->rgw_nfs_lru_lane_hiwat),
+ uid(_uid), key(_user_id, _key) {
/* no bucket may be named rgw_fs_inst-(.*) */
fsid = RGWFileHandle::root_name + "rgw_fs_inst-" +
const uint32_t cflags = RGWFileHandle::FLAG_NONE) {
using std::get;
+ LookupFHResult fhr { nullptr, RGWFileHandle::FLAG_NONE };
+
+ if (flags & FLAG_CLOSED)
+ return fhr;
+
RGWFileHandle::FHCache::Latch lat;
std::string obj_name{name};
std::string key_name{parent->make_key_name(name)};
fh_key fhk = parent->make_fhk(key_name);
- LookupFHResult fhr { nullptr, RGWFileHandle::FLAG_NONE };
+ retry:
RGWFileHandle* fh =
fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
fhk /* key */, lat /* serializer */,
RGWFileHandle::FHCache::FLAG_LOCK);
/* LATCHED */
- if (! fh) {
- if (cflags & RGWFileHandle::FLAG_CREATE) {
- fh = new RGWFileHandle(this, get_inst(), parent, fhk, obj_name,
- cflags);
- intrusive_ptr_add_ref(fh); /* sentinel ref */
- fh_cache.insert_latched(fh, lat,
- RGWFileHandle::FHCache::FLAG_NONE);
- if (cflags & RGWFileHandle::FLAG_PSEUDO)
- fh->set_pseudo();
- get<1>(fhr) = RGWFileHandle::FLAG_CREATE;
+ if (fh) {
+ /* need initial ref from LRU (fast path) */
+ if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
+ lat.lock->unlock();
+ goto retry; /* !LATCHED */
+ }
+ /* LATCHED */
+ } else {
+ /* make or re-use handle */
+ RGWFileHandle::Factory prototype(this, get_inst(), parent, fhk,
+ obj_name, cflags);
+ fh = static_cast<RGWFileHandle*>(
+ fh_lru.insert(&prototype,
+ cohort::lru::Edge::MRU,
+ cohort::lru::FLAG_INITIAL));
+ if (fh) {
+ fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+ goto out; /* !LATCHED */
} else {
- lat.lock->unlock(); /* !LATCHED */
- return fhr;
+ lat.lock->unlock();
+ goto retry; /* !LATCHED */
}
}
-
- intrusive_ptr_add_ref(fh); /* call path/handle ref */
lat.lock->unlock(); /* !LATCHED */
+ out:
get<0>(fhr) = fh;
-
return fhr;
}
+ inline void unref(RGWFileHandle* fh) {
+ (void) fh_lru.unref(fh, cohort::lru::FLAG_NONE);
+ }
+
+ inline RGWFileHandle* ref(RGWFileHandle* fh) {
+ fh_lru.ref(fh, cohort::lru::FLAG_NONE);
+ return fh;
+ }
+
LookupFHResult stat_bucket(RGWFileHandle* parent,
const char *path, uint32_t flags);
LookupFHResult stat_leaf(RGWFileHandle* parent, const char *path,
uint32_t flags);
- /* find or create an RGWFileHandle */
+ /* find existing RGWFileHandle */
RGWFileHandle* lookup_handle(struct rgw_fh_hk fh_hk) {
+ if (flags & FLAG_CLOSED)
+ return nullptr;
+
RGWFileHandle::FHCache::Latch lat;
fh_key fhk(fh_hk);
+ retry:
RGWFileHandle* fh =
fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
fhk /* key */, lat /* serializer */,
<< dendl;
goto out;
}
- intrusive_ptr_add_ref(fh); /* call path/handle ref */
+ if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
+ lat.lock->unlock();
+ goto retry; /* !LATCHED */
+ }
+ /* LATCHED */
out:
lat.lock->unlock(); /* !LATCHED */
return fh;
uint32_t get_inst() { return fs_inst; }
RGWUserInfo* get_user() { return &user; }
+
+ void close() {
+
+ flags |= FLAG_CLOSED;
+
+ class ObjUnref
+ {
+ RGWLibFS* fs;
+ public:
+ ObjUnref(RGWLibFS* fs) : fs(fs) {}
+ void operator()(RGWFileHandle* fh) const {
+ fs->fh_lru.unref(fh, cohort::lru::FLAG_NONE);
+ }
+ };
+
+ /* force cache drain, forces objects to evict */
+ fh_cache.drain(ObjUnref(this),
+ RGWFileHandle::FHCache::FLAG_LOCK);
+
+ /* XXX unref this */
+ }
}; /* RGWLibFS */
static inline std::string make_uri(const std::string& bucket_name,