]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: revisions
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 31 Mar 2020 00:16:33 +0000 (20:16 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 21 Apr 2020 17:39:25 +0000 (13:39 -0400)
Contains concurrency fixes, as well as improved debug prints.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index 8632a13a4eb90ffc2b7b568831599521daaf8e1e..feef32e69daf10861f98907e4a4b4c5e7b1c7a41 100644 (file)
@@ -13,6 +13,7 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/variant.hpp>
 
+#include "include/scope_guard.h"
 #include "common/Formatter.h"
 #include "common/containers.h"
 #include <common/errno.h>
@@ -215,7 +216,8 @@ void *RGWLC::LCWorker::entry() {
       ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
       int r = lc->process(this);
       if (r < 0) {
-        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
+        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
+                         << r << dendl;
       }
       ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
     }
@@ -227,7 +229,8 @@ void *RGWLC::LCWorker::entry() {
     utime_t next;
     next.set_from_double(end + secs);
 
-    ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
+    ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
+                     << rgw_to_asctime(next) << dendl;
 
     std::unique_lock l{lock};
     cond.wait_for(l, std::chrono::seconds(secs));
@@ -263,7 +266,7 @@ void RGWLC::finalize()
   delete[] obj_names;
 }
 
-bool RGWLC::if_already_run_today(time_t& start_date)
+bool RGWLC::if_already_run_today(time_t start_date)
 {
   struct tm bdt;
   time_t begin_of_day;
@@ -287,11 +290,26 @@ bool RGWLC::if_already_run_today(time_t& start_date)
     return false;
 }
 
+static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
+  os << "<ent: bucket=";
+  os << ent.bucket;
+  os << "; start_time=";
+  os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
+  os << "; status=";
+    os << ent.status;
+    os << ">";
+    return os;
+}
+
 int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
 {
   vector<cls_rgw_lc_entry> entries;
   string marker;
 
+  dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
+         << "index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
 #define MAX_LC_LIST_ENTRIES 100
   do {
     int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
@@ -368,7 +386,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
       try {
         decode(retention, iter->second);
       } catch (buffer::error& err) {
-        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
+        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention"
+                              << dendl;
         return false;
       }
       if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
@@ -382,7 +401,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
       try {
         decode(obj_legal_hold, iter->second);
       } catch (buffer::error& err) {
-        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
+        ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold"
+                              << dendl;
         return false;
       }
       if (obj_legal_hold.is_enabled()) {
@@ -501,6 +521,7 @@ struct op_env {
 }; /* op_env */
 
 class LCRuleOp;
+class WorkQ;
 
 struct lc_op_ctx {
   CephContext *cct;
@@ -515,12 +536,14 @@ struct lc_op_ctx {
   rgw_obj obj;
   RGWObjectCtx rctx;
   const DoutPrefixProvider *dpp;
+  WorkQ* wq;
 
-  lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
-           const DoutPrefixProvider *_dpp)
-    : cct(_env.store->ctx()), env(_env), o(_o),
+  lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+           const DoutPrefixProvider *dpp, WorkQ* wq)
+    : cct(env.store->ctx()), env(env), o(o),
       store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
-      obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+      obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
+    {}
 }; /* lc_op_ctx */
 
 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
@@ -605,7 +628,8 @@ public:
   LCOpRule(op_env& _env) : env(_env) {}
 
   void build();
-  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
+  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
+             WorkQ* wq);
 }; /* LCOpRule */
 
 using WorkItem =
@@ -620,25 +644,37 @@ class WorkQ : public Thread
 {
 public:
   using unique_lock = std::unique_lock<std::mutex>;
-  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+  using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
   using dequeue_result = boost::variant<void*, WorkItem>;
 
+  static constexpr uint32_t FLAG_NONE =        0x0000;
+  static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
+  static constexpr uint32_t FLAG_DWAIT_SYNC =  0x0002;
+  static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
+
 private:
-  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+  const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
   RGWLC::LCWorker* wk;
   uint32_t qmax;
+  int ix;
   std::mutex mtx;
   std::condition_variable cv;
+  uint32_t flags;
   vector<WorkItem> items;
   work_f f;
 
 public:
   WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
-    : wk(wk), qmax(qmax), f(bsf)
+    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
     {
-      create((string{"workpool_thr_"} + to_string(ix)).c_str());
+      create(thr_name().c_str());
     }
 
+  std::string thr_name() {
+    return std::string{"wp_thrd: "}
+    + std::to_string(wk->ix) + ", " + std::to_string(ix);
+  }
+
   void setf(work_f _f) {
     f = _f;
   }
@@ -647,15 +683,20 @@ public:
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
           (items.size() > qmax)) {
+      flags |= FLAG_EWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
     items.push_back(item);
+    if (flags & FLAG_DWAIT_SYNC) {
+      flags &= ~FLAG_DWAIT_SYNC;
+      cv.notify_one();
+    }
   }
 
   void drain() {
     unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-          (items.size() > 0)) {
+    flags |= FLAG_EDRAIN_SYNC;
+    while (flags & FLAG_EDRAIN_SYNC) {
       cv.wait_for(uniq, 200ms);
     }
   }
@@ -665,11 +706,20 @@ private:
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
           (items.size() == 0)) {
+      /* clear drain state, as we are NOT doing work and qlen==0 */
+      if (flags & FLAG_EDRAIN_SYNC) {
+       flags &= ~FLAG_EDRAIN_SYNC;
+      }
+      flags |= FLAG_DWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
     if (items.size() > 0) {
       auto item = items.back();
       items.pop_back();
+      if (flags & FLAG_EWAIT_SYNC) {
+       flags &= ~FLAG_EWAIT_SYNC;
+       cv.notify_one();
+      }
       return {item};
     }
     return nullptr;
@@ -682,7 +732,7 @@ private:
        /* going down */
        break;
       }
-      f(wk, boost::get<WorkItem>(item));
+      f(wk, this, boost::get<WorkItem>(item));
     }
     return nullptr;
   }
@@ -723,17 +773,22 @@ public:
   }
 }; /* WorkPool */
 
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
-                         RGWLC *_lc)
-  : dpp(_dpp), cct(_cct), lc(_lc)
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
+                         RGWLC *lc, int ix)
+  : dpp(dpp), cct(cct), lc(lc), ix(ix)
 {
   auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
   workpool = new WorkPool(this, wpw, 512);
 }
 
+static inline bool worker_should_stop(time_t stop_at)
+{
+  return stop_at < time(nullptr);
+}
+
 int RGWLC::handle_multipart_expiration(
   RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
-  LCWorker* worker)
+  LCWorker* worker, time_t stop_at)
 {
   MultipartMetaFilter mp_filter;
   vector<rgw_bucket_dir_entry> objs;
@@ -750,7 +805,7 @@ int RGWLC::handle_multipart_expiration(
   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
   list_op.params.filter = &mp_filter;
 
-  auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
+  auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
     auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
     RGWMPObj mp_obj;
@@ -764,11 +819,13 @@ int RGWLC::handle_multipart_expiration(
       if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
        ldpp_dout(wk->get_lc(), 0)
          << "ERROR: abort_multipart_upload failed, ret=" << ret
+         << wq->thr_name()
          << ", meta:" << obj.key
          << dendl;
       } else if (ret == -ERR_NO_SUCH_UPLOAD) {
        ldpp_dout(wk->get_lc(), 5)
          << "ERROR: abort_multipart_upload failed, ret=" << ret
+         << wq->thr_name()
          << ", meta:" << obj.key
          << dendl;
       }
@@ -779,6 +836,14 @@ int RGWLC::handle_multipart_expiration(
 
   for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
        ++prefix_iter) {
+
+    if (worker_should_stop(stop_at)) {
+      ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+                    << worker->ix
+                    << dendl;
+      return 0;
+    }
+
     if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
       continue;
     }
@@ -799,7 +864,6 @@ int RGWLC::handle_multipart_expiration(
          {prefix_iter->second, *obj_iter};
        worker->workpool->enqueue(WorkItem{t1});
        if (going_down()) {
-         worker->workpool->drain();
          return 0;
        }
       } /* for objs */
@@ -864,7 +928,8 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
                            oc.rctx, tags_bl);
     if (ret < 0) {
       if (ret != -ENODATA) {
-        ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+        ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
+                        << ret << " " << oc.wq->thr_name() << dendl;
       }
       return 0;
     }
@@ -873,12 +938,16 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
       auto iter = tags_bl.cbegin();
       dest_obj_tags.decode(iter);
     } catch (buffer::error& err) {
-      ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
+                     << oc.wq->thr_name() << dendl;
       return -EIO;
     }
 
     if (! has_all_tags(op, dest_obj_tags)) {
-      ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl;
+      ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj
+                       << " as tags do not match in rule: "
+                       << op.id << " "
+                       << oc.wq->thr_name() << dendl;
       return 0;
     }
   }
@@ -902,7 +971,9 @@ public:
       if (ret == -ENOENT) {
         return false;
       }
-      ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+      ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj
+                      << " returned ret=" << ret << " "
+                      << oc.wq->thr_name() << dendl;
       return false;
     }
 
@@ -915,7 +986,9 @@ public:
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (!o.is_current()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": not current, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
     if (o.is_delete_marker()) {
@@ -932,7 +1005,9 @@ public:
     auto& op = oc.op;
     if (op.expiration <= 0) {
       if (op.expiration_date == boost::none) {
-        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
+        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                         << ": no expiration set in rule, skipping "
+                         << oc.wq->thr_name() << dendl;
         return false;
       }
       is_expired = ceph_clock_now() >=
@@ -942,7 +1017,9 @@ public:
       is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
     }
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << (int)is_expired << " "
+                     << oc.wq->thr_name() << dendl;
     return is_expired;
   }
 
@@ -956,11 +1033,13 @@ public:
     }
     if (r < 0) {
       ldout(oc.cct, 0) << "ERROR: remove_expired_obj " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " " << cpp_strerror(r) << dendl;
+                      << oc.bucket_info.bucket << ":" << o.key 
+                      << " " << cpp_strerror(r) << " "
+                      << oc.wq->thr_name() << dendl;
       return r;
     }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                    << " " << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -970,7 +1049,9 @@ public:
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (o.is_current()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": current version, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
 
@@ -978,7 +1059,9 @@ public:
     int expiration = oc.op.noncur_expiration;
     bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << is_expired << " "
+                     << oc.wq->thr_name() << dendl;
     return is_expired &&
       pass_object_lock_check(oc.store->getRados(),
                             oc.bucket_info, oc.obj, oc.rctx);
@@ -989,11 +1072,14 @@ public:
     int r = remove_expired_obj(oc, true);
     if (r < 0) {
       ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " " << cpp_strerror(r) << dendl;
+                      << oc.bucket_info.bucket << ":" << o.key 
+                      << " " << cpp_strerror(r)
+                      << " " << oc.wq->thr_name() << dendl;
       return r;
     }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                    << " (non-current expiration) "
+                    << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1003,12 +1089,16 @@ public:
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
     if (!o.is_delete_marker()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": not a delete marker, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
 
     if (oc.ol.next_has_same_name()) {
-      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
+      ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                       << ": next is same object, skipping "
+                       << oc.wq->thr_name() << dendl;
       return false;
     }
 
@@ -1022,11 +1112,15 @@ public:
     int r = remove_expired_obj(oc, true);
     if (r < 0) {
       ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
-      << oc.bucket_info.bucket << ":" << o.key
-      << " " << cpp_strerror(r) << dendl;
+                      << oc.bucket_info.bucket << ":" << o.key
+                      << " " << cpp_strerror(r)
+                      << " " << oc.wq->thr_name()
+                      << dendl;
       return r;
     }
-    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+                    << " (delete marker expiration) "
+                    << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1057,7 +1151,9 @@ public:
     bool is_expired;
     if (transition.days < 0) {
       if (transition.date == boost::none) {
-        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
+        ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+                         << ": no transition day/date set in rule, skipping "
+                         << oc.wq->thr_name() << dendl;
         return false;
       }
       is_expired = ceph_clock_now() >=
@@ -1067,7 +1163,9 @@ public:
       is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
     }
 
-    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+    ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+                     << is_expired << " "
+                     << oc.wq->thr_name() << dendl;
 
     need_to_process =
       (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
@@ -1089,9 +1187,11 @@ public:
 
     if (!oc.store->svc()->zone->get_zone_params().
        valid_placement(target_placement)) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
+      ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
+                          << target_placement
                            << " bucket="<< oc.bucket_info.bucket
-                           << " rule_id=" << oc.op.id << dendl;
+                           << " rule_id=" << oc.op.id
+                          << " " << oc.wq->thr_name() << dendl;
       return -EINVAL;
     }
 
@@ -1100,12 +1200,16 @@ public:
       o.versioned_epoch, oc.dpp, null_yield);
     if (r < 0) {
       ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " 
-      << oc.bucket_info.bucket << ":" << o.key 
-      << " -> " << transition.storage_class 
-      << " " << cpp_strerror(r) << dendl;
+                          << oc.bucket_info.bucket << ":" << o.key 
+                          << " -> " << transition.storage_class 
+                          << " " << cpp_strerror(r)
+                          << " " << oc.wq->thr_name() << dendl;
       return r;
     }
-    ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
+    ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
+                        << ":" << o.key << " -> "
+                        << transition.storage_class
+                        << " " << oc.wq->thr_name() << dendl;
     return 0;
   }
 };
@@ -1166,9 +1270,11 @@ void LCOpRule::build()
   }
 }
 
-int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+                     const DoutPrefixProvider *dpp,
+                     WorkQ* wq)
 {
-  lc_op_ctx ctx(env, o, dpp);
+  lc_op_ctx ctx(env, o, dpp, wq);
 
   unique_ptr<LCOpAction> *selected = nullptr;
   real_time exp;
@@ -1205,25 +1311,30 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
     }
 
     if (!cont) {
-      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
+      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+                        << ": no rule match, skipping "
+                        << " " << wq->thr_name() << dendl;
       return 0;
     }
 
     int r = (*selected)->process(ctx);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " 
-      << env.bucket_info.bucket << ":" << o.key
-      << " " << cpp_strerror(r) << dendl;
+                       << env.bucket_info.bucket << ":" << o.key
+                       << " " << cpp_strerror(r)
+                       << " " << wq->thr_name() << dendl;
       return r;
     }
-    ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
+    ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
+                      << o.key << " " << wq->thr_name() << dendl;
   }
 
   return 0;
 
 }
 
-int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+                            time_t stop_at)
 {
   RGWLifecycleConfiguration  config(cct);
   RGWBucketInfo bucket_info;
@@ -1239,13 +1350,22 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
     &bucket_attrs);
   if (ret < 0) {
-    ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
+    ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
+                      << " failed" << dendl;
     return ret;
   }
 
+  auto stack_guard = make_scope_guard(
+    [&worker, &bucket_info]
+      {
+       worker->workpool->drain();
+      }
+    );
+
   if (bucket_info.bucket.marker != bucket_marker) {
-    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
-                       << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
+    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
+                      << bucket_tenant << ":" << bucket_name
+                      << " cur_marker=" << bucket_info.bucket.marker
                        << " orig_marker=" << bucket_marker << dendl;
     return -ENOENT;
   }
@@ -1260,21 +1380,23 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
   try {
       config.decode(iter);
     } catch (const buffer::error& e) {
-      ldpp_dout(this, 0) << __func__ <<  "() decode life cycle config failed" << dendl;
+      ldpp_dout(this, 0) << __func__ <<  "() decode life cycle config failed"
+                        << dendl;
       return -1;
     }
 
-  auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+  auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
     auto wt =
       boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
     auto& [op_rule, o] = wt;
     ldpp_dout(wk->get_lc(), 20)
-      << __func__ << "(): key=" << o.key << dendl;
-    std::cout << "KEY2: " << o.key << std::endl;
-    int ret = op_rule.process(o, wk->dpp);
+      << __func__ << "(): key=" << o.key << wq->thr_name() 
+      << dendl;
+    int ret = op_rule.process(o, wk->dpp, wq);
     if (ret < 0) {
       ldpp_dout(wk->get_lc(), 20)
        << "ERROR: orule.process() returned ret=" << ret
+       << wq->thr_name() 
        << dendl;
     }
   };
@@ -1289,11 +1411,20 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
   rgw_obj_key next_marker;
   for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
       ++prefix_iter) {
+
+    if (worker_should_stop(stop_at)) {
+      ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+                    << worker->ix
+                    << dendl;
+      return 0;
+    }
+
     auto& op = prefix_iter->second;
     if (!is_valid_op(op)) {
       continue;
     }
-    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
+    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
+                       << dendl;
     if (prefix_iter != prefix_map.begin() && 
         (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
                                    prev(prefix_iter)->first) == 0)) {
@@ -1316,11 +1447,6 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     op_env oenv(op, store, worker, bucket_info, ol);
     LCOpRule orule(oenv);
     orule.build(); // why can't ctor do it?
-#if 0
-    /* would permit passing o by reference, removes fetch overlap */
-    auto fetch_barrier = [&worker]()
-                          { worker->workpool->drain(); };
-#endif
     rgw_bucket_dir_entry* o{nullptr};
     for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
       std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
@@ -1329,7 +1455,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     worker->workpool->drain();
   }
 
-  ret = handle_multipart_expiration(&target, prefix_map, worker);
+  ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at);
   return ret;
 }
 
@@ -1350,15 +1476,17 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
   do {
     int ret = l.lock_exclusive(
       &store->getRados()->lc_pool_ctx, obj_names[index]);
-    if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+    if (ret == -EBUSY || ret == -EEXIST) {
+      /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
-          << obj_names[index] << ", sleep 5, try again" << dendl;
+                        << obj_names[index] << ", sleep 5, try again " << dendl;
       sleep(5);
       continue;
     }
     if (ret < 0)
       return 0;
-    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
+    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
+                       << dendl;
     if (result ==  -ENOENT) {
       ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
                                obj_names[index],  entry);
@@ -1381,7 +1509,8 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
     }
 clean:
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
+    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
+                       << obj_names[index] << dendl;
     return 0;
   } while (true);
 }
@@ -1434,12 +1563,42 @@ int RGWLC::process(LCWorker* worker)
   return 0;
 }
 
+bool RGWLC::expired_session(time_t started)
+{
+  time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+    ? cct->_conf->rgw_lc_debug_interval
+    : 24*60*60;
+
+  auto now = time(nullptr);
+
+  dout(16) << "RGWLC::expired_session"
+          << " started: " << started
+          << " interval: " << interval << "(*2==" << 2*interval << ")"
+          << " now: " << now
+          << dendl;
+
+  return (started + 2*interval < now);
+}
+
+time_t RGWLC::thread_stop_at()
+{
+  uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+    ? cct->_conf->rgw_lc_debug_interval
+    : 24*60*60;
+
+  return time(nullptr) + interval;
+}
+
 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
 {
+  dout(5) << "RGWLC::process(): ENTER: "
+         << "index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
   rados::cls::lock::Lock l(lc_index_lock_name);
   do {
     utime_t now = ceph_clock_now();
-    //string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+    //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
     cls_rgw_lc_entry entry;
     if (max_lock_secs <= 0)
       return -EAGAIN;
@@ -1449,7 +1608,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
 
     int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
                               obj_names[index]);
-    if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+    if (ret == -EBUSY || ret == -EEXIST) {
+      /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
           << obj_names[index] << ", sleep 5, try again" << dendl;
       sleep(5);
@@ -1470,12 +1630,20 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
     if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
       ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
                                 obj_names[index], head.marker, entry);
-      if ((entry.status == lc_processing) &&
-         (true /* XXXX expired epoch! */)) {
-       dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
-               << " index: " << index << " worker ix: " << worker->ix
-               << dendl;
-       goto exit;
+      if (ret >= 0) {
+       if (entry.status == lc_processing) {
+         if (expired_session(entry.start_time)) {
+           dout(5) << "RGWLC::process(): STALE lc session found for: " << entry
+                   << " index: " << index << " worker ix: " << worker->ix
+                   << " (clearing)"
+                   << dendl;
+         } else {
+           dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
+                   << " index: " << index << " worker ix: " << worker->ix
+                 << dendl;
+           goto exit;
+         }
+       }
       }
     }
 
@@ -1532,7 +1700,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
            << dendl;
 
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ret = bucket_lc_process(entry.bucket, worker);
+    ret = bucket_lc_process(entry.bucket, worker, thread_stop_at());
     bucket_lc_post(index, max_lock_secs, entry, ret, worker);
   } while(1);
 
@@ -1547,7 +1715,7 @@ void RGWLC::start_processor()
   workers.reserve(maxw);
   for (int ix = 0; ix < maxw; ++ix) {
     auto worker  =
-      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
     worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
     workers.emplace_back(std::move(worker));
   }
index 793e6f90b7b87e4ccc6202fbf3cda7e672c8e9d8..fd8b565f2ae60b463b375c0c662c07431cb96845 100644 (file)
@@ -468,12 +468,16 @@ public:
     CephContext *cct;
     RGWLC *lc;
     int ix;
-    ceph::mutex lock = ceph::make_mutex("LCWorker");
-    ceph::condition_variable cond;
+    std::mutex lock;
+    std::condition_variable cond;
     WorkPool* workpool{nullptr};
 
   public:
-    LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc);
+    using lock_guard = std::lock_guard<std::mutex>;
+    using unique_lock = std::unique_lock<std::mutex>;
+
+    LCWorker(const DoutPrefixProvider* dpp, CephContext *_cct, RGWLC *_lc,
+            int ix);
     RGWLC* get_lc() { return lc; }
     void *entry() override;
     void stop();
@@ -483,6 +487,7 @@ public:
 
     friend class RGWRados;
     friend class RGWLC;
+    friend class WorkQ;
   }; /* LCWorker */
 
   friend class RGWRados;
@@ -497,11 +502,13 @@ public:
 
   int process(LCWorker* worker);
   int process(int index, int max_secs, LCWorker* worker);
-  bool if_already_run_today(time_t& start_date);
+  bool if_already_run_today(time_t start_date);
+  bool expired_session(time_t started);
+  time_t thread_stop_at();
   int list_lc_progress(const string& marker, uint32_t max_entries,
                       vector<cls_rgw_lc_entry>&);
   int bucket_lc_prepare(int index, LCWorker* worker);
-  int bucket_lc_process(string& shard_id, LCWorker* worker);
+  int bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at);
   int bucket_lc_post(int index, int max_lock_sec,
                     cls_rgw_lc_entry& entry, int& result, LCWorker* worker);
   bool going_down();
@@ -521,7 +528,8 @@ public:
 
   int handle_multipart_expiration(RGWRados::Bucket *target,
                                  const multimap<string, lc_op>& prefix_map,
-                                 LCWorker* worker);
+                                 LCWorker* worker,
+                                 time_t stop_at);
 };
 
 namespace rgw::lc {