From: Matt Benjamin Date: Tue, 14 Mar 2017 01:52:08 +0000 (-0400) Subject: rgw_file: implement reliable has-children check (unlink dir) X-Git-Tag: v10.2.8~72^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8c7cb8227c39b09060cdca4e11dec75cf7f2336c;p=ceph.git rgw_file: implement reliable has-children check (unlink dir) Bug report and discussion provided by Gui Hecheng in nfs-ganesha upstream github. Briefly, while a reliable check is potentially costly, it is necessary. Fixes: http://tracker.ceph.com/issues/19270 Signed-off-by: Matt Benjamin (cherry picked from commit b05f1c6d61aa4501a971e87de6dcaf3e58c3d9b4) --- diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 240b828895bb..2e084a6add06 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -266,8 +266,14 @@ namespace rgw { } std::string oname = rgw_fh->relative_object_name(); - if (rgw_fh->is_dir()) + if (rgw_fh->is_dir()) { + /* for the duration of our cache timer, trust positive + * child cache */ + if (rgw_fh->has_children()) { + return(-ENOTEMPTY); + } oname += "/"; + } RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(), oname); rc = rgwlib.get_fe()->execute_req(&req); @@ -840,6 +846,28 @@ namespace rgw { return true; } /* RGWFileHandle::reclaim */ + bool RGWFileHandle::has_children() const + { + if (unlikely(! is_dir())) + return false; + +#if 0 + /* XXX pretty, but unsafe w/o consistent caching */ + const directory* d = get(&variant_type); + if ((d->flags & RGWFileHandle::directory::FLAG_CACHED) && + (state.nlink > 2 /* . and .. */)) + return true; +#endif /* 0 */ + + RGWRMdirCheck req(fs->get_context(), fs->get_user(), this); + int rc = rgwlib.get_fe()->execute_req(&req); + if (! rc) { + return req.valid && req.has_children; + } + + return false; + } + int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof, uint32_t flags) { @@ -863,7 +891,7 @@ namespace rgw { if (! rc) { lock_guard guard(mtx); state.atime = now; - set_nlink(2 + 1); + set_nlink(2 + 1); // XXXX *eof = req.eof(); event ev(event::type::READDIR, get_key(), state.atime); fs->state.push_event(ev); @@ -875,7 +903,7 @@ namespace rgw { if (! rc) { lock_guard guard(mtx); state.atime = now; - set_nlink(2 + 1); + set_nlink(2 + 1); // XXXX *eof = req.eof(); event ev(event::type::READDIR, get_key(), state.atime); fs->state.push_event(ev); @@ -1034,6 +1062,7 @@ namespace rgw { void RGWFileHandle::directory::clear_state() { + flags &= ~RGWFileHandle::directory::FLAG_CACHED; marker_cache.clear(); } diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index cdcb6cb569e0..99b31fd5c7ac 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -392,11 +392,11 @@ namespace rgw { const std::string& object_name() const { return name; } - std::string full_object_name(bool omit_bucket = false) { + std::string full_object_name(bool omit_bucket = false) const { std::string path; std::vector segments; int reserve = 0; - RGWFileHandle* tfh = this; + const RGWFileHandle* tfh = this; while (tfh && !tfh->is_root() && !(tfh->is_bucket() && omit_bucket)) { segments.push_back(&tfh->object_name()); reserve += (1 + tfh->object_name().length()); @@ -417,11 +417,11 @@ namespace rgw { return path; } - inline std::string relative_object_name() { + inline std::string relative_object_name() const { return full_object_name(true /* omit_bucket */); } - inline std::string format_child_name(const std::string& cbasename) { + inline std::string format_child_name(const std::string& cbasename) const { std::string child_name{relative_object_name()}; if ((child_name.size() > 0) && (child_name.back() != '/')) @@ -430,7 +430,7 @@ namespace rgw { return child_name; } - inline std::string make_key_name(const char *name) { + inline std::string make_key_name(const char *name) const { std::string key_name{full_object_name()}; if (key_name.length() > 0) key_name += "/"; @@ -438,7 +438,7 @@ namespace rgw { return key_name; } - fh_key make_fhk(const std::string& name) { + fh_key make_fhk(const std::string& name) const { if (depth <= 1) return fh_key(fhk.fh_hk.object, name.c_str()); else { @@ -458,9 +458,9 @@ namespace rgw { } } - const rgw_obj_key* find_marker(uint64_t off) { + const rgw_obj_key* find_marker(uint64_t off) const { using std::get; - directory* d = get(&variant_type); + const directory* d = get(&variant_type); if (d) { const auto& iter = d->marker_cache.find(off); if (iter != d->marker_cache.end()) @@ -478,6 +478,7 @@ namespace rgw { bool creating() const { return flags & FLAG_CREATING; } bool deleted() const { return flags & FLAG_DELETED; } bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; } + bool has_children() const; int open(uint32_t gsh_flags) { lock_guard guard(mtx); @@ -1381,6 +1382,89 @@ public: }; /* RGWReaddirRequest */ +/* + dir has-children predicate (bucket objects) +*/ + +class RGWRMdirCheck : public RGWLibRequest, + public RGWListBucket /* RGWOp */ +{ +public: + const RGWFileHandle* rgw_fh; + bool valid; + bool has_children; + + RGWRMdirCheck (CephContext* _cct, RGWUserInfo *_user, + const RGWFileHandle* _rgw_fh) + : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), valid(false), + has_children(false) { + default_max = 2; + op = this; + } + + virtual bool only_bucket() override { return false; } + + virtual int op_init() override { + // assign store, s, and dialect_handler + RGWObjectCtx* rados_ctx + = static_cast(get_state()->obj_ctx); + // framework promises to call op_init after parent init + assert(rados_ctx); + RGWOp::init(rados_ctx->store, get_state(), this); + op = this; // assign self as op: REQUIRED + return 0; + } + + virtual int header_init() override { + struct req_state* s = get_state(); + s->info.method = "GET"; + s->op = OP_GET; + + std::string uri = "/" + rgw_fh->bucket_name() + "/"; + s->relative_uri = uri; + s->info.request_uri = uri; + s->info.effective_uri = uri; + s->info.request_params = ""; + s->info.domain = ""; /* XXX ? */ + + s->user = user; + + prefix = rgw_fh->relative_object_name(); + if (prefix.length() > 0) + prefix += "/"; + delimiter = '/'; + + return 0; + } + + virtual int get_params() override { + max = default_max; + return 0; + } + + virtual void send_response() override { + valid = true; + if ((objs.size() > 1) || + (! objs.empty() && + (objs.front().key.name != prefix))) { + has_children = true; + return; + } + for (auto& iter : common_prefixes) { + /* readdir never produces a name for this case */ + if (iter.first == "/") + continue; + has_children = true; + break; + } + } + + virtual void send_versioned_response() { + send_response(); + } + +}; /* RGWRMdirCheck */ + /* create bucket */