From: Matt Benjamin Date: Tue, 14 Mar 2017 01:52:08 +0000 (-0400) Subject: rgw_file: implement reliable has-children check (unlink dir) X-Git-Tag: v12.0.1~58^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b05f1c6d61aa4501a971e87de6dcaf3e58c3d9b4;p=ceph.git rgw_file: implement reliable has-children check (unlink dir) Bug report and discussion provided by Gui Hecheng in nfs-ganesha upstream github. Briefly, while a reliable check is potentially costly, it is necessary. Fixes: http://tracker.ceph.com/issues/19270 Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index f1d1ef1649d4..3bde20bad4da 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -257,8 +257,14 @@ namespace rgw { } std::string oname = rgw_fh->relative_object_name(); - if (rgw_fh->is_dir()) + if (rgw_fh->is_dir()) { + /* for the duration of our cache timer, trust positive + * child cache */ + if (rgw_fh->has_children()) { + return(-ENOTEMPTY); + } oname += "/"; + } RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(), oname); rc = rgwlib.get_fe()->execute_req(&req); @@ -845,6 +851,28 @@ namespace rgw { return true; } /* RGWFileHandle::reclaim */ + bool RGWFileHandle::has_children() const + { + if (unlikely(! is_dir())) + return false; + +#if 0 + /* XXX pretty, but unsafe w/o consistent caching */ + const directory* d = get(&variant_type); + if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) && + (state.nlink > 2 /* . and .. */)) + return true; +#endif /* 0 */ + + RGWRMdirCheck req(fs->get_context(), fs->get_user(), this); + int rc = rgwlib.get_fe()->execute_req(&req); + if (! rc) { + return req.valid && req.has_children; + } + + return false; + } + int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof, uint32_t flags) { @@ -868,7 +896,7 @@ namespace rgw { if (! rc) { lock_guard guard(mtx); state.atime = now; - set_nlink(2 + 1); + set_nlink(2 + 1); // XXXX *eof = req.eof(); event ev(event::type::READDIR, get_key(), state.atime); fs->state.push_event(ev); @@ -880,7 +908,7 @@ namespace rgw { if (! rc) { lock_guard guard(mtx); state.atime = now; - set_nlink(2 + 1); + set_nlink(2 + 1); // XXXX *eof = req.eof(); event ev(event::type::READDIR, get_key(), state.atime); fs->state.push_event(ev); @@ -1039,6 +1067,7 @@ namespace rgw { void RGWFileHandle::directory::clear_state() { + flags &= ~RGWFileHandle::directory::FLAG_CACHED; marker_cache.clear(); } diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 8c590c1ec929..15f25573c587 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -398,11 +398,11 @@ namespace rgw { const std::string& object_name() const { return name; } - std::string full_object_name(bool omit_bucket = false) { + std::string full_object_name(bool omit_bucket = false) const { std::string path; std::vector segments; int reserve = 0; - RGWFileHandle* tfh = this; + const RGWFileHandle* tfh = this; while (tfh && !tfh->is_root() && !(tfh->is_bucket() && omit_bucket)) { segments.push_back(&tfh->object_name()); reserve += (1 + tfh->object_name().length()); @@ -423,11 +423,11 @@ namespace rgw { return path; } - inline std::string relative_object_name() { + inline std::string relative_object_name() const { return full_object_name(true /* omit_bucket */); } - inline std::string format_child_name(const std::string& cbasename) { + inline std::string format_child_name(const std::string& cbasename) const { std::string child_name{relative_object_name()}; if ((child_name.size() > 0) && (child_name.back() != '/')) @@ -436,7 +436,7 @@ namespace rgw { return child_name; } - inline std::string make_key_name(const char *name) { + inline std::string make_key_name(const char *name) const { std::string key_name{full_object_name()}; if (key_name.length() > 0) key_name += "/"; @@ -444,7 +444,7 @@ namespace rgw { return key_name; } - fh_key make_fhk(const std::string& name) { + fh_key make_fhk(const std::string& name) const { if (depth <= 1) return fh_key(fhk.fh_hk.object, name.c_str()); else { @@ -464,9 +464,9 @@ namespace rgw { } } - const rgw_obj_key* find_marker(uint64_t off) { + const rgw_obj_key* find_marker(uint64_t off) const { using std::get; - directory* d = get(&variant_type); + const directory* d = get(&variant_type); if (d) { const auto& iter = d->marker_cache.find(off); if (iter != d->marker_cache.end()) @@ -484,6 +484,7 @@ namespace rgw { bool creating() const { return flags & FLAG_CREATING; } bool deleted() const { return flags & FLAG_DELETED; } bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; } + bool has_children() const; uint32_t open(uint32_t gsh_flags) { lock_guard guard(mtx); @@ -1372,6 +1373,89 @@ public: }; /* RGWReaddirRequest */ +/* + dir has-children predicate (bucket objects) +*/ + +class RGWRMdirCheck : public RGWLibRequest, + public RGWListBucket /* RGWOp */ +{ +public: + const RGWFileHandle* rgw_fh; + bool valid; + bool has_children; + + RGWRMdirCheck (CephContext* _cct, RGWUserInfo *_user, + const RGWFileHandle* _rgw_fh) + : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), valid(false), + has_children(false) { + default_max = 2; + op = this; + } + + virtual bool only_bucket() override { return false; } + + virtual int op_init() override { + // assign store, s, and dialect_handler + RGWObjectCtx* rados_ctx + = static_cast(get_state()->obj_ctx); + // framework promises to call op_init after parent init + assert(rados_ctx); + RGWOp::init(rados_ctx->store, get_state(), this); + op = this; // assign self as op: REQUIRED + return 0; + } + + virtual int header_init() override { + struct req_state* s = get_state(); + s->info.method = "GET"; + s->op = OP_GET; + + std::string uri = "/" + rgw_fh->bucket_name() + "/"; + s->relative_uri = uri; + s->info.request_uri = uri; + s->info.effective_uri = uri; + s->info.request_params = ""; + s->info.domain = ""; /* XXX ? */ + + s->user = user; + + prefix = rgw_fh->relative_object_name(); + if (prefix.length() > 0) + prefix += "/"; + delimiter = '/'; + + return 0; + } + + virtual int get_params() override { + max = default_max; + return 0; + } + + virtual void send_response() override { + valid = true; + if ((objs.size() > 1) || + (! objs.empty() && + (objs.front().key.name != prefix))) { + has_children = true; + return; + } + for (auto& iter : common_prefixes) { + /* readdir never produces a name for this case */ + if (iter.first == "/") + continue; + has_children = true; + break; + } + } + + virtual void send_versioned_response() { + send_response(); + } + +}; /* RGWRMdirCheck */ + /* create bucket */