git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: make invalid OpRule, lc_op_ctx accesses impossible
author: Matt Benjamin <mbenjamin@redhat.com>
Tue, 28 Apr 2020 20:52:24 +0000 (16:52 -0400)
committer: Nathan Cutler <ncutler@suse.com>
Sun, 9 Aug 2020 20:49:12 +0000 (22:49 +0200)
* At the cost of some additional overhead from deep copies,
  makes all access to these structures from WorkQ context safe.

* Lifts the saved next-key-name and effective-mtime from LCOpAction
  to lc_op_ctx (via LCOpRule); the former usage was incorrect

* Uses ordered entry listing for Current, NonCurrent, and DM
  (delete-marker) expiration--this is required by the lookahead strategy

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit b610d504a2895656e1db6ed41c8cca8e12b98907)

src/rgw/rgw_lc.cc

index d2b3202469203cb8a553f932bb75f2c706460f2a..4a475ad7c4a0114e612b80e109eed53219666738 100644 (file)
@@ -431,7 +431,6 @@ public:
       store(_store), bucket_info(_bucket_info),
       target(store->getRados(), bucket_info), list_op(&target) {
     list_op.params.list_versions = bucket_info.versioned();
-    list_op.params.allow_unordered = true;
     delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
   }
 
@@ -500,6 +499,7 @@ public:
        * only happen if is_truncated is false */
       return boost::none;
     }
+
     return ((obj_iter + 1)->key.name);
   }
 
@@ -509,7 +509,7 @@ struct op_env {
 
   using LCWorker = RGWLC::LCWorker;
 
-  lc_op& op;
+  lc_op op;
   rgw::sal::RGWRadosStore *store;
   LCWorker* worker;
   RGWBucketInfo& bucket_info;
@@ -526,12 +526,14 @@ class WorkQ;
 
 struct lc_op_ctx {
   CephContext *cct;
-  op_env& env;
-  rgw_bucket_dir_entry& o;
+  op_env env;
+  rgw_bucket_dir_entry o;
+  boost::optional<std::string> next_key_name;
+  ceph::real_time dm_effective_mtime;
 
   rgw::sal::RGWRadosStore *store;
   RGWBucketInfo& bucket_info;
-  lc_op& op;
+  lc_op& op; // ok--refers to expanded env.op
   LCObjsLister& ol;
 
   rgw_obj obj;
@@ -540,11 +542,19 @@ struct lc_op_ctx {
   WorkQ* wq;
 
   lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+           boost::optional<std::string> next_key_name,
+           ceph::real_time dem,
            const DoutPrefixProvider *dpp, WorkQ* wq)
-    : cct(env.store->ctx()), env(env), o(o),
+    : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
       store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
       obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
     {}
+
+  bool next_has_same_name(const std::string& key_name) {
+    return (next_key_name && key_name.compare(
+             boost::get<std::string>(next_key_name)) == 0);
+  }
+
 }; /* lc_op_ctx */
 
 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
@@ -582,19 +592,12 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
 } /* remove_expired_obj */
 
 class LCOpAction {
-protected:
-  boost::optional<std::string> next_key_name;
 public:
   virtual ~LCOpAction() {}
 
-  bool next_has_same_name(const std::string& key_name) {
-    return (next_key_name && key_name.compare(
-             boost::get<std::string>(next_key_name)) == 0);
-  }
-
   virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
     return false;
-  };
+  }
 
   /* called after check(). Check should tell us whether this action
    * is applicable. If there are multiple actions, we'll end up executing
@@ -614,6 +617,8 @@ public:
   virtual int process(lc_op_ctx& oc) {
     return 0;
   }
+
+  friend class LCOpRule;
 }; /* LCOpAction */
 
 class LCOpFilter {
@@ -627,15 +632,22 @@ virtual ~LCOpFilter() {}
 class LCOpRule {
   friend class LCOpAction;
 
-  op_env& env;
+  op_env env;
+  boost::optional<std::string> next_key_name;
+  ceph::real_time dm_effective_mtime;
 
-  std::vector<unique_ptr<LCOpFilter> > filters;
-  std::vector<unique_ptr<LCOpAction> > actions;
+  std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
+  std::vector<shared_ptr<LCOpAction> > actions;
 
 public:
   LCOpRule(op_env& _env) : env(_env) {}
 
+  boost::optional<std::string> get_next_key_name() {
+    return next_key_name;
+  }
+
   void build();
+  void update();
   int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
              WorkQ* wq);
 }; /* LCOpRule */
@@ -643,9 +655,9 @@ public:
 using WorkItem =
   boost::variant<void*,
                 /* out-of-line delete */
-                std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+                std::tuple<LCOpRule, rgw_bucket_dir_entry>,
                 /* uncompleted MPU expiration */
-                std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+                std::tuple<lc_op, rgw_bucket_dir_entry>,
                 rgw_bucket_dir_entry>;
 
 class WorkQ : public Thread
@@ -814,7 +826,7 @@ int RGWLC::handle_multipart_expiration(
   list_op.params.filter = &mp_filter;
 
   auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
-    auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+    auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
     RGWMPObj mp_obj;
     if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
@@ -868,7 +880,7 @@ int RGWLC::handle_multipart_expiration(
       }
 
       for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-       std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+       std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
          {prefix_iter->second, *obj_iter};
        worker->workpool->enqueue(WorkItem{t1});
        if (going_down()) {
@@ -993,9 +1005,7 @@ public:
 
 class LCOpAction_CurrentExpiration : public LCOpAction {
 public:
-  LCOpAction_CurrentExpiration(op_env& env) {
-    next_key_name = env.ol.next_key_name();
-  }
+  LCOpAction_CurrentExpiration(op_env& env) {}
 
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
@@ -1006,9 +1016,17 @@ public:
       return false;
     }
     if (o.is_delete_marker()) {
-      if (next_has_same_name(o.key.name)) {
+      std::string nkn;
+      if (oc.next_key_name) nkn = *oc.next_key_name;
+      if (oc.next_has_same_name(o.key.name)) {
+       ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key
+                      << " next_key_name: %%" << nkn << "%% "
+                      << oc.wq->thr_name() << dendl;
        return false;
       } else {
+       ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key
+                        << " next_key_name: %%" << nkn << "%% "
+                        << oc.wq->thr_name() << dendl;
         *exp_time = real_clock::now();
         return true;
       }
@@ -1042,18 +1060,29 @@ public:
     int r;
     if (o.is_delete_marker()) {
       r = remove_expired_obj(oc, true);
+      if (r < 0) {
+       ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
+                        << oc.bucket_info.bucket << ":" << o.key
+                        << " " << cpp_strerror(r) << " "
+                        << oc.wq->thr_name() << dendl;
+      return r;
+      }
+      ldout(oc.cct, 2) << "DELETED: current is-dm "
+                      << oc.bucket_info.bucket << ":" << o.key
+                      << " " << oc.wq->thr_name() << dendl;
     } else {
+      /* ! o.is_delete_marker() */
       r = remove_expired_obj(oc, !oc.bucket_info.versioned());
+      if (r < 0) {
+       ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
+                        << oc.bucket_info.bucket << ":" << o.key
+                        << " " << cpp_strerror(r) << " "
+                        << oc.wq->thr_name() << dendl;
+       return r;
+      }
+      ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                      << " " << oc.wq->thr_name() << dendl;
     }
-    if (r < 0) {
-      ldout(oc.cct, 0) << "ERROR: remove_expired_obj " 
-                      << oc.bucket_info.bucket << ":" << o.key 
-                      << " " << cpp_strerror(r) << " "
-                      << oc.wq->thr_name() << dendl;
-      return r;
-    }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
-                    << " " << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1105,9 +1134,7 @@ public:
 
 class LCOpAction_DMExpiration : public LCOpAction {
 public:
-  LCOpAction_DMExpiration(op_env& env) {
-    next_key_name = env.ol.next_key_name();
-  }
+  LCOpAction_DMExpiration(op_env& env) {}
 
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
@@ -1117,7 +1144,7 @@ public:
                        << oc.wq->thr_name() << dendl;
       return false;
     }
-    if (next_has_same_name(o.key.name)) {
+    if (oc.next_has_same_name(o.key.name)) {
       ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
                        << ": next is same object, skipping "
                        << oc.wq->thr_name() << dendl;
@@ -1252,20 +1279,17 @@ public:
 
 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
 protected:
-  ceph::real_time effective_mtime;
-
   bool check_current_state(bool is_current) override {
     return !is_current;
   }
 
   ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
-    return effective_mtime;
+    return oc.dm_effective_mtime;
   }
 public:
   LCOpAction_NonCurrentTransition(op_env& env,
                                  const transition_action& _transition)
-    : LCOpAction_Transition(_transition),
-      effective_mtime(env.ol.get_prev_obj().meta.mtime)
+    : LCOpAction_Transition(_transition)
     {}
 };
 
@@ -1297,13 +1321,18 @@ void LCOpRule::build()
   }
 }
 
+void LCOpRule::update()
+{
+  next_key_name = env.ol.next_key_name();
+  dm_effective_mtime = env.ol.get_prev_obj().meta.mtime;
+}
+
 int LCOpRule::process(rgw_bucket_dir_entry& o,
                      const DoutPrefixProvider *dpp,
                      WorkQ* wq)
 {
-  lc_op_ctx ctx(env, o, dpp, wq);
-
-  unique_ptr<LCOpAction> *selected = nullptr;
+  lc_op_ctx ctx(env, o, next_key_name, dm_effective_mtime, dpp, wq);
+  shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
   real_time exp;
 
   for (auto& a : actions) {
@@ -1414,8 +1443,9 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
 
   auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
     auto wt =
-      boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+      boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
     auto& [op_rule, o] = wt;
+
     ldpp_dout(wk->get_lc(), 20)
       << __func__ << "(): key=" << o.key << wq->thr_name() 
       << dendl;
@@ -1476,7 +1506,8 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     orule.build(); // why can't ctor do it?
     rgw_bucket_dir_entry* o{nullptr};
     for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
-      std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+      orule.update();
+      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
       worker->workpool->enqueue(WorkItem{t1});
     }
     worker->workpool->drain();
@@ -1866,7 +1897,7 @@ void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
   int max_objs =
     (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
      cct->_conf->rgw_lc_max_objs);
-  /* XXXX oh noes!!! */
+  /* n.b. review hash algo */
   int index = ceph_str_hash_linux(shard_id.c_str(),
                                  shard_id.size()) % HASH_PRIME % max_objs;
   *oid = lc_oid_prefix;