OPTION(rgw_nfs_lru_lane_hiwat, OPT_INT, 911)
OPTION(rgw_nfs_fhcache_partitions, OPT_INT, 3)
OPTION(rgw_nfs_fhcache_size, OPT_INT, 2017) /* 3*2017=6051 */
+OPTION(rgw_nfs_namespace_expire_secs, OPT_INT, 300) /* namespace invalidate
+ * timer */
+OPTION(rgw_nfs_max_gc, OPT_INT, 300) /* max gc events per cycle */
OPTION(rgw_nfs_write_completion_interval_s, OPT_INT, 10) /* stateless (V3)
* commit
* delay */
#define LIBRGW_FILE_VER_MAJOR 1
#define LIBRGW_FILE_VER_MINOR 1
-#define LIBRGW_FILE_VER_EXTRA 0
+#define LIBRGW_FILE_VER_EXTRA 1
/* Pack a (major, minor, extra) triple into one integer version code:
 * (major << 16) + (minor << 8) + extra.  Every argument is parenthesized
 * so that an expression argument (e.g. `a | b`) is evaluated as a unit
 * before it is shifted or added. */
#define LIBRGW_FILE_VERSION(maj, min, extra) (((maj) << 16) + ((min) << 8) + (extra))
/* version code built from the LIBRGW_FILE_VER_* components */
#define LIBRGW_FILE_VERSION_CODE LIBRGW_FILE_VERSION(LIBRGW_FILE_VER_MAJOR, LIBRGW_FILE_VER_MINOR, LIBRGW_FILE_VER_EXTRA)
const char *secret, struct rgw_fs **rgw_fs,
uint32_t flags);
+/*
+ register invalidate callbacks
+
+ cb and arg are recorded on the mounted filesystem behind rgw_fs.  When a
+ file handle is invalidated, cb is called with arg as its first argument
+ and the handle's rgw_fh_hk as its second.  Returns the result of the
+ filesystem's register_invalidate() (0 in the visible implementation).
+
+ NOTE(review): flags is forwarded but the visible implementation does
+ not look at it; pass RGW_REG_INVALIDATE_FLAG_NONE
+*/
+#define RGW_REG_INVALIDATE_FLAG_NONE 0x0000
+
+typedef void (*rgw_fh_callback_t)(void *handle, struct rgw_fh_hk fh_hk);
+
+int rgw_register_invalidate(struct rgw_fs *rgw_fs, rgw_fh_callback_t cb,
+ void *arg, uint32_t flags);
+
/*
detach rgw namespace
*/
*/
#include <sys/types.h>
#include <string.h>
+#include <chrono>
#include "include/types.h"
#include "include/rados/librgw.h"
m_tp.drain(&req_wq);
}
+#define MIN_EXPIRE_S 120 /* despite the name, RGWLibProcess::run() uses this
+ * as the upper bound (seconds) on the delay between gc cycles */
+
void RGWLibProcess::run()
{
/* write completion interval */
/* gc loop */
while (! shutdown) {
lsubdout(cct, rgw, 5) << "RGWLibProcess GC" << dendl;
+
+ /* dirent invalidate timeout--basically, the upper-bound on
+ * inconsistency with the S3 namespace */
+ auto expire_s = cct->_conf->rgw_nfs_namespace_expire_secs;
+
+ /* delay between gc cycles */
+ auto delay_s = std::max(1, std::min(MIN_EXPIRE_S, expire_s/2));
+
unique_lock uniq(mtx);
restart:
int cur_gen = gen;
goto restart; /* invalidated */
}
uniq.unlock();
- std::this_thread::sleep_for(std::chrono::seconds(120));
+ std::this_thread::sleep_for(std::chrono::seconds(delay_s));
}
}
using std::get;
LookupFHResult fhr{nullptr, 0};
-#if 0
- RGWFileHandle::directory* d = parent->get_directory();
- if (! d->name_cache.empty()) {
- RGWFileHandle::dirent_string name{path};
- const auto& diter = d->name_cache.find(name);
- if (diter != d->name_cache.end()) {
- fhr = lookup_fh(parent, path,
- RGWFileHandle::FLAG_CREATE|
- ((diter->second == RGW_FS_TYPE_DIRECTORY) ?
- RGWFileHandle::FLAG_DIRECTORY :
- RGWFileHandle::FLAG_NONE));
- if (get<0>(fhr))
- return fhr;
- }
- }
-#endif
/* XXX the need for two round-trip operations to identify file or
* directory leaf objects is unnecessary--the current proposed
rele();
} /* RGWLibFS::close */
+ /* Stream a gc event as
+  * "<event:type=...;fid=<bucket>:<object>;ts=<timespec:<sec>;<nsec>>>".
+  * Any event type other than READDIR is printed as UNKNOWN. */
+ std::ostream& operator<<(std::ostream &os, RGWLibFS::event const &ev) {
+   const bool is_readdir = (ev.t == RGWLibFS::event::type::READDIR);
+   const auto& hk = ev.fhk.fh_hk;
+   os << "<event:";
+   os << (is_readdir ? "type=READDIR;" : "type=UNKNOWN;");
+   os << "fid=" << hk.bucket << ":" << hk.object;
+   os << ";ts=<timespec:" << ev.ts.tv_sec << ";" << ev.ts.tv_nsec << ">>";
+   return os;
+ }
+
void RGWLibFS::gc()
{
using std::get;
using directory = RGWFileHandle::directory;
- static constexpr uint32_t max_ev = 24;
- static constexpr uint16_t expire_s = 300; /* 5m */
+ /* dirent invalidate timeout--basically, the upper-bound on
+ * inconsistency with the S3 namespace */
+ auto expire_s
+ = get_context()->_conf->rgw_nfs_namespace_expire_secs;
+
+ /* max events to gc in one cycle */
+ uint32_t max_ev =
+ std::max(1, get_context()->_conf->rgw_nfs_max_gc);
struct timespec now;
event_vector ve;
do {
{
lock_guard guard(state.mtx); /* LOCKED */
+ /* just return if no events */
+ if (events.empty()) {
+ return;
+ }
uint32_t _max_ev =
(events.size() < 500) ? max_ev : (events.size() / 4);
for (uint32_t ix = 0; (ix < _max_ev) && (events.size() > 0); ++ix) {
event& ev = events.front();
- if (ev.ts.tv_sec < (now.tv_sec + expire_s)) {
+ if (ev.ts.tv_sec > (now.tv_sec + expire_s)) {
stop = true;
break;
}
} /* anon */
/* !LOCKED */
for (auto& ev : ve) {
+ lsubdout(get_context(), rgw, 15)
+ << "try-expire ev: " << ev << dendl;
if (likely(ev.t == event::type::READDIR)) {
RGWFileHandle* rgw_fh = lookup_handle(ev.fhk.fh_hk);
+ lsubdout(get_context(), rgw, 15)
+ << "ev rgw_fh: " << rgw_fh << dendl;
if (rgw_fh) {
RGWFileHandle::directory* d;
if (unlikely(! rgw_fh->is_dir())) {
if (d) {
lock_guard guard(rgw_fh->mtx);
d->clear_state();
+ rgw_fh->invalidate();
}
rele:
unref(rgw_fh);
} /* rgw_fh */
} /* event::type::READDIR */
} /* ev */
- std::this_thread::sleep_for(std::chrono::seconds(120));
- } while (! stop);
+ ve.clear();
+ } while (! (stop || shutdown));
} /* RGWLibFS::gc */
void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
int rc = 0;
struct timespec now;
CephContext* cct = fs->get_context();
- directory* d = get_directory(); /* already type-checked */
(void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
offset);
rc = rgwlib.get_fe()->execute_req(&req);
if (! rc) {
- set_nlink(2 + d->name_cache.size());
+ lock_guard guard(mtx);
state.atime = now;
+ set_nlink(2 + 1);
*eof = req.eof();
event ev(event::type::READDIR, get_key(), state.atime);
fs->state.push_event(ev);
RGWReaddirRequest req(cct, fs->get_user(), this, rcb, cb_arg, offset);
rc = rgwlib.get_fe()->execute_req(&req);
if (! rc) {
+ lock_guard guard(mtx);
state.atime = now;
- set_nlink(2 + d->name_cache.size());
+ set_nlink(2 + 1);
*eof = req.eof();
event ev(event::type::READDIR, get_key(), state.atime);
fs->state.push_event(ev);
delete write_req;
}
+ void RGWFileHandle::directory::clear_state()
+ { /* reset this directory's cached state: empties marker_cache */
+ marker_cache.clear();
+ }
+
+ /* Notify the owning filesystem's registered invalidate callback (if
+  * any) about this handle, passing the registered argument and this
+  * handle's fh_hk.  Does nothing when no callback is registered. */
+ void RGWFileHandle::invalidate() {
+   RGWLibFS* const owner = get_fs();
+   if (! owner->invalidate_cb) {
+     return; /* nothing registered */
+   }
+   owner->invalidate_cb(owner->invalidate_arg, get_key().fh_hk);
+ }
+
int RGWWriteRequest::exec_start() {
struct req_state* s = get_state();
return 0;
}
+/*
+ register invalidate callbacks
+
+ thin C-API shim: forwards cb, arg and flags to the RGWLibFS instance
+ stored in rgw_fs->fs_private and returns its result
+*/
+int rgw_register_invalidate(struct rgw_fs *rgw_fs, rgw_fh_callback_t cb,
+ void *arg, uint32_t flags)
+{
+ return static_cast<RGWLibFS*>(rgw_fs->fs_private)
+   ->register_invalidate(cb, arg, flags);
+}
+
/*
detach rgw namespace
*/
using dirent_string = basic_sstring<char, uint16_t, 32>;
using marker_cache_t = flat_map<uint64_t, dirent_string>;
- using name_cache_t = flat_map<dirent_string, uint8_t>;
struct State {
uint64_t dev;
uint32_t flags;
marker_cache_t marker_cache;
- name_cache_t name_cache;
directory() : flags(FLAG_NONE) {}
- void clear_state() {
- marker_cache.clear();
- name_cache.clear();
- }
-
- void set_overflow() {
- clear_state();
- flags |= FLAG_OVERFLOW;
- }
+ void clear_state();
};
boost::variant<file, directory> variant_type;
// XXXX check for failure (dup key)
d->marker_cache.insert(
marker_cache_t::value_type(off, marker.data()));
- /* 90% of directories hold <= 32 entries (Yifan Wang, CMU),
- * but go big */
- if (d->name_cache.size() < 128) {
- d->name_cache.insert(
- name_cache_t::value_type(marker.data(), obj_type));
- } else {
- d->set_overflow(); // too many
- }
}
}
- /* XXX */
std::string find_marker(uint64_t off) { // XXX copy
using std::get;
directory* d = get<directory>(&variant_type);
void decode_attrs(const ceph::buffer::list* ux_key1,
const ceph::buffer::list* ux_attrs1);
+ void invalidate();
+
virtual bool reclaim();
typedef cohort::lru::LRU<std::mutex> FhLRU;
CephContext* cct;
struct rgw_fs fs;
RGWFileHandle root_fh;
+ rgw_fh_callback_t invalidate_cb; /* called by RGWFileHandle::invalidate() when set */
+ void *invalidate_arg; /* passed to invalidate_cb as its first argument */
+ /* set by stop() and polled by the gc() loop; atomic because the flag
+  * is written and read without holding a lock */
+ std::atomic<bool> shutdown;
mutable std::atomic<uint64_t> refcnt;
: t(t), fhk(k), ts(ts) {}
};
+ friend std::ostream& operator<<(std::ostream &os,
+ RGWLibFS::event const &ev);
+
using event_vector = /* boost::small_vector<event, 16> */
std::vector<event>;
State() : flags(0) {}
void push_event(const event& ev) {
/* gc() drains `events` while holding this State's mtx, so the queue
 * must be appended to under the same mutex; the file-handle mutex the
 * readdir callers hold does not protect it */
lock_guard guard(mtx);
events.push_back(ev);
}
} state;
RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
const char* _key)
- : cct(_cct), root_fh(this, get_inst()), refcnt(1),
+ : cct(_cct), root_fh(this, get_inst()), invalidate_cb(nullptr),
+ invalidate_arg(nullptr), shutdown(false), refcnt(1),
fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
cct->_conf->rgw_nfs_fhcache_size),
fh_lru(cct->_conf->rgw_nfs_lru_lanes,
intrusive_ptr_release(this);
}
+ void stop() { shutdown = true; } /* ask the gc() loop to exit: it re-checks shutdown after each pass */
+
void release_evict(RGWFileHandle* fh) {
/* remove from cache, releases sentinel ref */
fh_cache.remove(fh->fh.fh_hk.object, fh,
return ret;
} /* authorize */
+ int register_invalidate(rgw_fh_callback_t cb, void *arg, uint32_t flags) { /* record the invalidate callback for this fs */
+ invalidate_cb = cb; /* overwrites whatever callback was stored before */
+ invalidate_arg = arg; /* handed back to cb as its first argument */
+ return 0; /* always reports success; flags is not examined */
+ }
+
/* find RGWFileHandle by id */
LookupFHResult lookup_fh(const fh_key& fhk,
const uint32_t flags = RGWFileHandle::FLAG_NONE) {
fh->mtx.unlock(); /* !LOCKED */
out:
lat.lock->unlock(); /* !LATCHED */
+
+ /* special case: lookup root_fh */
+ if (! fh) {
+ if (unlikely(fh_hk == root_fh.fh.fh_hk)) {
+ fh = &root_fh;
+ ref(fh);
+ }
+ }
+
return fh;
}
void run();
void checkpoint();
- void stop() { shutdown = true; }
+
+ /* Request shutdown of this process and of every mounted filesystem.
+  * mounted_fs is walked under mtx, the same mutex register_fs() takes,
+  * so the walk cannot overlap a concurrent registration. */
+ void stop() {
+ shutdown = true;
+ lock_guard guard(mtx);
+ for (const auto& fs: mounted_fs) {
+ fs.second->stop();
+ }
+ }
void register_fs(RGWLibFS* fs) {
lock_guard guard(mtx);