From: Matt Benjamin Date: Tue, 28 Apr 2020 20:52:24 +0000 (-0400) Subject: rgwlc: make invalid OpRule, lc_op_ctx accesses impossible X-Git-Tag: v15.2.5~56^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=96fd7b5be61d7fcba9346c3a30456291375a832b;p=ceph.git rgwlc: make invalid OpRule, lc_op_ctx accesses impossible * At the cost of some additional overhead from deep copies, makes all access to these structures from WorkQ context safe. * Lifts saved next-key-name and effective-mtime from LCOpAction to lc_op_ctx (via LCOpRule), former usage was incorrect * Use ordered entry listing for Current, NonCurrent, and DM expiration--this is required by lookahead strategy Signed-off-by: Matt Benjamin (cherry picked from commit b610d504a2895656e1db6ed41c8cca8e12b98907) --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index d2b320246920..4a475ad7c4a0 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -431,7 +431,6 @@ public: store(_store), bucket_info(_bucket_info), target(store->getRados(), bucket_info), list_op(&target) { list_op.params.list_versions = bucket_info.versioned(); - list_op.params.allow_unordered = true; delay_ms = store->ctx()->_conf.get_val("rgw_lc_thread_delay"); } @@ -500,6 +499,7 @@ public: * only happen if is_truncated is false */ return boost::none; } + return ((obj_iter + 1)->key.name); } @@ -509,7 +509,7 @@ struct op_env { using LCWorker = RGWLC::LCWorker; - lc_op& op; + lc_op op; rgw::sal::RGWRadosStore *store; LCWorker* worker; RGWBucketInfo& bucket_info; @@ -526,12 +526,14 @@ class WorkQ; struct lc_op_ctx { CephContext *cct; - op_env& env; - rgw_bucket_dir_entry& o; + op_env env; + rgw_bucket_dir_entry o; + boost::optional next_key_name; + ceph::real_time dm_effective_mtime; rgw::sal::RGWRadosStore *store; RGWBucketInfo& bucket_info; - lc_op& op; + lc_op& op; // ok--refers to expanded env.op LCObjsLister& ol; rgw_obj obj; @@ -540,11 +542,19 @@ struct lc_op_ctx { WorkQ* wq; lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, + boost::optional next_key_name, + ceph::real_time dem, const DoutPrefixProvider *dpp, WorkQ* wq) - : cct(env.store->ctx()), env(env), o(o), + : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name), store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol), obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq) {} + + bool next_has_same_name(const std::string& key_name) { + return (next_key_name && key_name.compare( + boost::get(next_key_name)) == 0); + } + }; /* lc_op_ctx */ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) @@ -582,19 +592,12 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) } /* remove_expired_obj */ class LCOpAction { -protected: - boost::optional next_key_name; public: virtual ~LCOpAction() {} - bool next_has_same_name(const std::string& key_name) { - return (next_key_name && key_name.compare( - boost::get(next_key_name)) == 0); - } - virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) { return false; - }; + } /* called after check(). Check should tell us whether this action * is applicable. If there are multiple actions, we'll end up executing @@ -614,6 +617,8 @@ public: virtual int process(lc_op_ctx& oc) { return 0; } + + friend class LCOpRule; }; /* LCOpAction */ class LCOpFilter { @@ -627,15 +632,22 @@ virtual ~LCOpFilter() {} class LCOpRule { friend class LCOpAction; - op_env& env; + op_env env; + boost::optional next_key_name; + ceph::real_time dm_effective_mtime; - std::vector > filters; - std::vector > actions; + std::vector > filters; // n.b., sharing ovhd + std::vector > actions; public: LCOpRule(op_env& _env) : env(_env) {} + boost::optional get_next_key_name() { + return next_key_name; + } + void build(); + void update(); int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, WorkQ* wq); }; /* LCOpRule */ @@ -643,9 +655,9 @@ public: using WorkItem = boost::variant, + std::tuple, /* uncompleted MPU expiration */ - std::tuple, + std::tuple, rgw_bucket_dir_entry>; class WorkQ : public Thread @@ -814,7 +826,7 @@ int RGWLC::handle_multipart_expiration( list_op.params.filter = &mp_filter; auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { - auto wt = boost::get>(wi); + auto wt = boost::get>(wi); auto& [rule, obj] = wt; RGWMPObj mp_obj; if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) { @@ -868,7 +880,7 @@ int RGWLC::handle_multipart_expiration( } for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) { - std::tuple t1 = + std::tuple t1 = {prefix_iter->second, *obj_iter}; worker->workpool->enqueue(WorkItem{t1}); if (going_down()) { @@ -993,9 +1005,7 @@ public: class LCOpAction_CurrentExpiration : public LCOpAction { public: - LCOpAction_CurrentExpiration(op_env& env) { - next_key_name = env.ol.next_key_name(); - } + LCOpAction_CurrentExpiration(op_env& env) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { auto& o = oc.o; @@ -1006,9 +1016,17 @@ public: return false; } if (o.is_delete_marker()) { - if (next_has_same_name(o.key.name)) { + std::string nkn; + if (oc.next_key_name) nkn = *oc.next_key_name; + if (oc.next_has_same_name(o.key.name)) { + ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key + << " next_key_name: %%" << nkn << "%% " + << oc.wq->thr_name() << dendl; return false; } else { + ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key + << " next_key_name: %%" << nkn << "%% " + << oc.wq->thr_name() << dendl; *exp_time = real_clock::now(); return true; } @@ -1042,18 +1060,29 @@ public: int r; if (o.is_delete_marker()) { r = remove_expired_obj(oc, true); + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj " + << oc.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) << " " + << oc.wq->thr_name() << dendl; + return r; + } + ldout(oc.cct, 2) << "DELETED: current is-dm " + << oc.bucket_info.bucket << ":" << o.key + << " " << oc.wq->thr_name() << dendl; } else { + /* ! o.is_delete_marker() */ r = remove_expired_obj(oc, !oc.bucket_info.versioned()); + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: remove_expired_obj " + << oc.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) << " " + << oc.wq->thr_name() << dendl; + return r; + } + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + << " " << oc.wq->thr_name() << dendl; } - if (r < 0) { - ldout(oc.cct, 0) << "ERROR: remove_expired_obj " - << oc.bucket_info.bucket << ":" << o.key - << " " << cpp_strerror(r) << " " - << oc.wq->thr_name() << dendl; - return r; - } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key - << " " << oc.wq->thr_name() << dendl; return 0; } }; @@ -1105,9 +1134,7 @@ public: class LCOpAction_DMExpiration : public LCOpAction { public: - LCOpAction_DMExpiration(op_env& env) { - next_key_name = env.ol.next_key_name(); - } + LCOpAction_DMExpiration(op_env& env) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { auto& o = oc.o; @@ -1117,7 +1144,7 @@ public: << oc.wq->thr_name() << dendl; return false; } - if (next_has_same_name(o.key.name)) { + if (oc.next_has_same_name(o.key.name)) { ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping " << oc.wq->thr_name() << dendl; @@ -1252,20 +1279,17 @@ public: class LCOpAction_NonCurrentTransition : public LCOpAction_Transition { protected: - ceph::real_time effective_mtime; - bool check_current_state(bool is_current) override { return !is_current; } ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { - return effective_mtime; + return oc.dm_effective_mtime; } public: LCOpAction_NonCurrentTransition(op_env& env, const transition_action& _transition) - : LCOpAction_Transition(_transition), - effective_mtime(env.ol.get_prev_obj().meta.mtime) + : LCOpAction_Transition(_transition) {} }; @@ -1297,13 +1321,18 @@ void LCOpRule::build() } } +void LCOpRule::update() +{ + next_key_name = env.ol.next_key_name(); + dm_effective_mtime = env.ol.get_prev_obj().meta.mtime; +} + int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, WorkQ* wq) { - lc_op_ctx ctx(env, o, dpp, wq); - - unique_ptr *selected = nullptr; + lc_op_ctx ctx(env, o, next_key_name, dm_effective_mtime, dpp, wq); + shared_ptr *selected = nullptr; // n.b., req'd by sharing real_time exp; for (auto& a : actions) { @@ -1414,8 +1443,9 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = - boost::get>(wi); + boost::get>(wi); auto& [op_rule, o] = wt; + ldpp_dout(wk->get_lc(), 20) << __func__ << "(): key=" << o.key << wq->thr_name() << dendl; @@ -1476,7 +1506,8 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, orule.build(); // why can't ctor do it? rgw_bucket_dir_entry* o{nullptr}; for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) { - std::tuple t1 = {orule, *o}; + orule.update(); + std::tuple t1 = {orule, *o}; worker->workpool->enqueue(WorkItem{t1}); } worker->workpool->drain(); @@ -1866,7 +1897,7 @@ void get_lc_oid(CephContext *cct, const string& shard_id, string *oid) int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs); - /* XXXX oh noes!!! */ + /* n.b. review hash algo */ int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs; *oid = lc_oid_prefix;