From: Matt Benjamin Date: Tue, 31 Mar 2020 00:16:33 +0000 (-0400) Subject: rgwlc: revisions X-Git-Tag: v16.1.0~2298^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=786358c224bd79f8f38bc2d2820af37564efcfec;p=ceph.git rgwlc: revisions Contains concurrency fixes, as well as improved debug prints. Signed-off-by: Matt Benjamin --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 8632a13a4eb9..feef32e69daf 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -13,6 +13,7 @@ #include #include +#include "include/scope_guard.h" #include "common/Formatter.h" #include "common/containers.h" #include @@ -215,7 +216,8 @@ void *RGWLC::LCWorker::entry() { ldpp_dout(dpp, 2) << "life cycle: start" << dendl; int r = lc->process(this); if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl; + ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" + << r << dendl; } ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; } @@ -227,7 +229,8 @@ void *RGWLC::LCWorker::entry() { utime_t next; next.set_from_double(end + secs); - ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl; + ldpp_dout(dpp, 5) << "schedule life cycle next start time: " + << rgw_to_asctime(next) << dendl; std::unique_lock l{lock}; cond.wait_for(l, std::chrono::seconds(secs)); @@ -263,7 +266,7 @@ void RGWLC::finalize() delete[] obj_names; } -bool RGWLC::if_already_run_today(time_t& start_date) +bool RGWLC::if_already_run_today(time_t start_date) { struct tm bdt; time_t begin_of_day; @@ -287,11 +290,26 @@ bool RGWLC::if_already_run_today(time_t& start_date) return false; } +static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) { + os << ""; + return os; +} + int RGWLC::bucket_lc_prepare(int index, LCWorker* worker) { vector entries; string marker; + dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE " + << "index: " << index << " worker ix: " << worker->ix + << dendl; + #define MAX_LC_LIST_ENTRIES 100 do { int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], @@ -368,7 +386,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, try { decode(retention, iter->second); } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" + << dendl; return false; } if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > @@ -382,7 +401,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, try { decode(obj_legal_hold, iter->second); } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" + << dendl; return false; } if (obj_legal_hold.is_enabled()) { @@ -501,6 +521,7 @@ struct op_env { }; /* op_env */ class LCRuleOp; +class WorkQ; struct lc_op_ctx { CephContext *cct; @@ -515,12 +536,14 @@ struct lc_op_ctx { rgw_obj obj; RGWObjectCtx rctx; const DoutPrefixProvider *dpp; + WorkQ* wq; - lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o, - const DoutPrefixProvider *_dpp) - : cct(_env.store->ctx()), env(_env), o(_o), + lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, WorkQ* wq) + : cct(env.store->ctx()), env(env), o(o), store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol), - obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {} + obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq) + {} }; /* lc_op_ctx */ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) @@ -605,7 +628,8 @@ public: LCOpRule(op_env& _env) : env(_env) {} void build(); - int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp); + int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, + WorkQ* wq); }; /* LCOpRule */ using WorkItem = @@ -620,25 +644,37 @@ class WorkQ : public Thread { public: using unique_lock = std::unique_lock; - using work_f = std::function; + using work_f = std::function; using dequeue_result = boost::variant; + static constexpr uint32_t FLAG_NONE = 0x0000; + static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; + static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; + static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; + private: - const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {}; + const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; RGWLC::LCWorker* wk; uint32_t qmax; + int ix; std::mutex mtx; std::condition_variable cv; + uint32_t flags; vector items; work_f f; public: WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) - : wk(wk), qmax(qmax), f(bsf) + : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) { - create((string{"workpool_thr_"} + to_string(ix)).c_str()); + create(thr_name().c_str()); } + std::string thr_name() { + return std::string{"wp_thrd: "} + + std::to_string(wk->ix) + ", " + std::to_string(ix); + } + void setf(work_f _f) { f = _f; } @@ -647,15 +683,20 @@ public: unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && (items.size() > qmax)) { + flags |= FLAG_EWAIT_SYNC; cv.wait_for(uniq, 200ms); } items.push_back(item); + if (flags & FLAG_DWAIT_SYNC) { + flags &= ~FLAG_DWAIT_SYNC; + cv.notify_one(); + } } void drain() { unique_lock uniq(mtx); - while ((!wk->get_lc()->going_down()) && - (items.size() > 0)) { + flags |= FLAG_EDRAIN_SYNC; + while (flags & FLAG_EDRAIN_SYNC) { cv.wait_for(uniq, 200ms); } } @@ -665,11 +706,20 @@ private: unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && (items.size() == 0)) { + /* clear drain state, as we are NOT doing work and qlen==0 */ + if (flags & FLAG_EDRAIN_SYNC) { + flags &= ~FLAG_EDRAIN_SYNC; + } + flags |= FLAG_DWAIT_SYNC; cv.wait_for(uniq, 200ms); } if (items.size() > 0) { auto item = items.back(); items.pop_back(); + if (flags & FLAG_EWAIT_SYNC) { + flags &= ~FLAG_EWAIT_SYNC; + cv.notify_one(); + } return {item}; } return nullptr; @@ -682,7 +732,7 @@ private: /* going down */ break; } - f(wk, boost::get(item)); + f(wk, this, boost::get(item)); } return nullptr; } @@ -723,17 +773,22 @@ public: } }; /* WorkPool */ -RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, - RGWLC *_lc) - : dpp(_dpp), cct(_cct), lc(_lc) +RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct, + RGWLC *lc, int ix) + : dpp(dpp), cct(cct), lc(lc), ix(ix) { auto wpw = cct->_conf.get_val("rgw_lc_max_wp_worker"); workpool = new WorkPool(this, wpw, 512); } +static inline bool worker_should_stop(time_t stop_at) +{ + return stop_at < time(nullptr); +} + int RGWLC::handle_multipart_expiration( RGWRados::Bucket *target, const multimap& prefix_map, - LCWorker* worker) + LCWorker* worker, time_t stop_at) { MultipartMetaFilter mp_filter; vector objs; @@ -750,7 +805,7 @@ int RGWLC::handle_multipart_expiration( list_op.params.ns = RGW_OBJ_NS_MULTIPART; list_op.params.filter = &mp_filter; - auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) { + auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); auto& [rule, obj] = wt; RGWMPObj mp_obj; @@ -764,11 +819,13 @@ int RGWLC::handle_multipart_expiration( if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) { ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload failed, ret=" << ret + << wq->thr_name() << ", meta:" << obj.key << dendl; } else if (ret == -ERR_NO_SUCH_UPLOAD) { ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload failed, ret=" << ret + << wq->thr_name() << ", meta:" << obj.key << dendl; } @@ -779,6 +836,14 @@ int RGWLC::handle_multipart_expiration( for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + + if (worker_should_stop(stop_at)) { + ldout(cct, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { continue; } @@ -799,7 +864,6 @@ int RGWLC::handle_multipart_expiration( {prefix_iter->second, *obj_iter}; worker->workpool->enqueue(WorkItem{t1}); if (going_down()) { - worker->workpool->drain(); return 0; } } /* for objs */ @@ -864,7 +928,8 @@ static int check_tags(lc_op_ctx& oc, bool *skip) oc.rctx, tags_bl); if (ret < 0) { if (ret != -ENODATA) { - ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl; + ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" + << ret << " " << oc.wq->thr_name() << dendl; } return 0; } @@ -873,12 +938,16 @@ static int check_tags(lc_op_ctx& oc, bool *skip) auto iter = tags_bl.cbegin(); dest_obj_tags.decode(iter); } catch (buffer::error& err) { - ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl; + ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet " + << oc.wq->thr_name() << dendl; return -EIO; } if (! has_all_tags(op, dest_obj_tags)) { - ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl; + ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj + << " as tags do not match in rule: " + << op.id << " " + << oc.wq->thr_name() << dendl; return 0; } } @@ -902,7 +971,9 @@ public: if (ret == -ENOENT) { return false; } - ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl; + ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj + << " returned ret=" << ret << " " + << oc.wq->thr_name() << dendl; return false; } @@ -915,7 +986,9 @@ public: bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { auto& o = oc.o; if (!o.is_current()) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": not current, skipping " + << oc.wq->thr_name() << dendl; return false; } if (o.is_delete_marker()) { @@ -932,7 +1005,9 @@ public: auto& op = oc.op; if (op.expiration <= 0) { if (op.expiration_date == boost::none) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": no expiration set in rule, skipping " + << oc.wq->thr_name() << dendl; return false; } is_expired = ceph_clock_now() >= @@ -942,7 +1017,9 @@ public: is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time); } - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << (int)is_expired << " " + << oc.wq->thr_name() << dendl; return is_expired; } @@ -956,11 +1033,13 @@ public: } if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj " - << oc.bucket_info.bucket << ":" << o.key - << " " << cpp_strerror(r) << dendl; + << oc.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) << " " + << oc.wq->thr_name() << dendl; return r; } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl; + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + << " " << oc.wq->thr_name() << dendl; return 0; } }; @@ -970,7 +1049,9 @@ public: bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { auto& o = oc.o; if (o.is_current()) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": current version, skipping " + << oc.wq->thr_name() << dendl; return false; } @@ -978,7 +1059,9 @@ public: int expiration = oc.op.noncur_expiration; bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time); - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << is_expired << " " + << oc.wq->thr_name() << dendl; return is_expired && pass_object_lock_check(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx); @@ -989,11 +1072,14 @@ public: int r = remove_expired_obj(oc, true); if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) " - << oc.bucket_info.bucket << ":" << o.key - << " " << cpp_strerror(r) << dendl; + << oc.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() << dendl; return r; } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl; + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + << " (non-current expiration) " + << oc.wq->thr_name() << dendl; return 0; } }; @@ -1003,12 +1089,16 @@ public: bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { auto& o = oc.o; if (!o.is_delete_marker()) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": not a delete marker, skipping " + << oc.wq->thr_name() << dendl; return false; } if (oc.ol.next_has_same_name()) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": next is same object, skipping " + << oc.wq->thr_name() << dendl; return false; } @@ -1022,11 +1112,15 @@ public: int r = remove_expired_obj(oc, true); if (r < 0) { ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) " - << oc.bucket_info.bucket << ":" << o.key - << " " << cpp_strerror(r) << dendl; + << oc.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() + << dendl; return r; } - ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl; + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key + << " (delete marker expiration) " + << oc.wq->thr_name() << dendl; return 0; } }; @@ -1057,7 +1151,9 @@ public: bool is_expired; if (transition.days < 0) { if (transition.date == boost::none) { - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key + << ": no transition day/date set in rule, skipping " + << oc.wq->thr_name() << dendl; return false; } is_expired = ceph_clock_now() >= @@ -1067,7 +1163,9 @@ public: is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time); } - ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl; + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << is_expired << " " + << oc.wq->thr_name() << dendl; need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != @@ -1089,9 +1187,11 @@ public: if (!oc.store->svc()->zone->get_zone_params(). valid_placement(target_placement)) { - ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement + ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " + << target_placement << " bucket="<< oc.bucket_info.bucket - << " rule_id=" << oc.op.id << dendl; + << " rule_id=" << oc.op.id + << " " << oc.wq->thr_name() << dendl; return -EINVAL; } @@ -1100,12 +1200,16 @@ public: o.versioned_epoch, oc.dpp, null_yield); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " - << oc.bucket_info.bucket << ":" << o.key - << " -> " << transition.storage_class - << " " << cpp_strerror(r) << dendl; + << oc.bucket_info.bucket << ":" << o.key + << " -> " << transition.storage_class + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() << dendl; return r; } - ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl; + ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket + << ":" << o.key << " -> " + << transition.storage_class + << " " << oc.wq->thr_name() << dendl; return 0; } }; @@ -1166,9 +1270,11 @@ void LCOpRule::build() } } -int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp) +int LCOpRule::process(rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + WorkQ* wq) { - lc_op_ctx ctx(env, o, dpp); + lc_op_ctx ctx(env, o, dpp, wq); unique_ptr *selected = nullptr; real_time exp; @@ -1205,25 +1311,30 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp) } if (!cont) { - ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": no rule match, skipping " + << " " << wq->thr_name() << dendl; return 0; } int r = (*selected)->process(ctx); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " - << env.bucket_info.bucket << ":" << o.key - << " " << cpp_strerror(r) << dendl; + << env.bucket_info.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << wq->thr_name() << dendl; return r; } - ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl; + ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" + << o.key << " " << wq->thr_name() << dendl; } return 0; } -int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) +int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, + time_t stop_at) { RGWLifecycleConfiguration config(cct); RGWBucketInfo bucket_info; @@ -1239,13 +1350,22 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, &bucket_attrs); if (ret < 0) { - ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl; + ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name + << " failed" << dendl; return ret; } + auto stack_guard = make_scope_guard( + [&worker, &bucket_info] + { + worker->workpool->drain(); + } + ); + if (bucket_info.bucket.marker != bucket_marker) { - ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant - << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker + ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" + << bucket_tenant << ":" << bucket_name + << " cur_marker=" << bucket_info.bucket.marker << " orig_marker=" << bucket_marker << dendl; return -ENOENT; } @@ -1260,21 +1380,23 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) try { config.decode(iter); } catch (const buffer::error& e) { - ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl; + ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" + << dendl; return -1; } - auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) { + auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); auto& [op_rule, o] = wt; ldpp_dout(wk->get_lc(), 20) - << __func__ << "(): key=" << o.key << dendl; - std::cout << "KEY2: " << o.key << std::endl; - int ret = op_rule.process(o, wk->dpp); + << __func__ << "(): key=" << o.key << wq->thr_name() + << dendl; + int ret = op_rule.process(o, wk->dpp, wq); if (ret < 0) { ldpp_dout(wk->get_lc(), 20) << "ERROR: orule.process() returned ret=" << ret + << wq->thr_name() << dendl; } }; @@ -1289,11 +1411,20 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) rgw_obj_key next_marker; for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + + if (worker_should_stop(stop_at)) { + ldout(cct, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + auto& op = prefix_iter->second; if (!is_valid_op(op)) { continue; } - ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl; + ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first + << dendl; if (prefix_iter != prefix_map.begin() && (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { @@ -1316,11 +1447,6 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) op_env oenv(op, store, worker, bucket_info, ol); LCOpRule orule(oenv); orule.build(); // why can't ctor do it? -#if 0 - /* would permit passing o by reference, removes fetch overlap */ - auto fetch_barrier = [&worker]() - { worker->workpool->drain(); }; -#endif rgw_bucket_dir_entry* o{nullptr}; for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) { std::tuple t1 = {orule, *o}; @@ -1329,7 +1455,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) worker->workpool->drain(); } - ret = handle_multipart_expiration(&target, prefix_map, worker); + ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at); return ret; } @@ -1350,15 +1476,17 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, do { int ret = l.lock_exclusive( &store->getRados()->lc_pool_ctx, obj_names[index]); - if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ + if (ret == -EBUSY || ret == -EEXIST) { + /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " - << obj_names[index] << ", sleep 5, try again" << dendl; + << obj_names[index] << ", sleep 5, try again " << dendl; sleep(5); continue; } if (ret < 0) return 0; - ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] + << dendl; if (result == -ENOENT) { ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry); @@ -1381,7 +1509,8 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, } clean: l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); - ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl; + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " + << obj_names[index] << dendl; return 0; } while (true); } @@ -1434,12 +1563,42 @@ int RGWLC::process(LCWorker* worker) return 0; } +bool RGWLC::expired_session(time_t started) +{ + time_t interval = (cct->_conf->rgw_lc_debug_interval > 0) + ? cct->_conf->rgw_lc_debug_interval + : 24*60*60; + + auto now = time(nullptr); + + dout(16) << "RGWLC::expired_session" + << " started: " << started + << " interval: " << interval << "(*2==" << 2*interval << ")" + << " now: " << now + << dendl; + + return (started + 2*interval < now); +} + +time_t RGWLC::thread_stop_at() +{ + uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0) + ? cct->_conf->rgw_lc_debug_interval + : 24*60*60; + + return time(nullptr) + interval; +} + int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) { + dout(5) << "RGWLC::process(): ENTER: " + << "index: " << index << " worker ix: " << worker->ix + << dendl; + rados::cls::lock::Lock l(lc_index_lock_name); do { utime_t now = ceph_clock_now(); - //string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS cls_rgw_lc_entry entry; if (max_lock_secs <= 0) return -EAGAIN; @@ -1449,7 +1608,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]); - if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ + if (ret == -EBUSY || ret == -EEXIST) { + /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " << obj_names[index] << ", sleep 5, try again" << dendl; sleep(5); @@ -1470,12 +1630,20 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) { ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx, obj_names[index], head.marker, entry); - if ((entry.status == lc_processing) && - (true /* XXXX expired epoch! */)) { - dout(5) << "RGWLC::process(): ACTIVE entry: " << entry - << " index: " << index << " worker ix: " << worker->ix - << dendl; - goto exit; + if (ret >= 0) { + if (entry.status == lc_processing) { + if (expired_session(entry.start_time)) { + dout(5) << "RGWLC::process(): STALE lc session found for: " << entry + << " index: " << index << " worker ix: " << worker->ix + << " (clearing)" + << dendl; + } else { + dout(5) << "RGWLC::process(): ACTIVE entry: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + goto exit; + } + } } } @@ -1532,7 +1700,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) << dendl; l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); - ret = bucket_lc_process(entry.bucket, worker); + ret = bucket_lc_process(entry.bucket, worker, thread_stop_at()); bucket_lc_post(index, max_lock_secs, entry, ret, worker); } while(1); @@ -1547,7 +1715,7 @@ void RGWLC::start_processor() workers.reserve(maxw); for (int ix = 0; ix < maxw; ++ix) { auto worker = - std::make_unique(this /* dpp */, cct, this); + std::make_unique(this /* dpp */, cct, this, ix); worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str()); workers.emplace_back(std::move(worker)); } diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index 793e6f90b7b8..fd8b565f2ae6 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -468,12 +468,16 @@ public: CephContext *cct; RGWLC *lc; int ix; - ceph::mutex lock = ceph::make_mutex("LCWorker"); - ceph::condition_variable cond; + std::mutex lock; + std::condition_variable cond; WorkPool* workpool{nullptr}; public: - LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc); + using lock_guard = std::lock_guard; + using unique_lock = std::unique_lock; + + LCWorker(const DoutPrefixProvider* dpp, CephContext *_cct, RGWLC *_lc, + int ix); RGWLC* get_lc() { return lc; } void *entry() override; void stop(); @@ -483,6 +487,7 @@ public: friend class RGWRados; friend class RGWLC; + friend class WorkQ; }; /* LCWorker */ friend class RGWRados; @@ -497,11 +502,13 @@ public: int process(LCWorker* worker); int process(int index, int max_secs, LCWorker* worker); - bool if_already_run_today(time_t& start_date); + bool if_already_run_today(time_t start_date); + bool expired_session(time_t started); + time_t thread_stop_at(); int list_lc_progress(const string& marker, uint32_t max_entries, vector&); int bucket_lc_prepare(int index, LCWorker* worker); - int bucket_lc_process(string& shard_id, LCWorker* worker); + int bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at); int bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_entry& entry, int& result, LCWorker* worker); bool going_down(); @@ -521,7 +528,8 @@ public: int handle_multipart_expiration(RGWRados::Bucket *target, const multimap& prefix_map, - LCWorker* worker); + LCWorker* worker, + time_t stop_at); }; namespace rgw::lc {