#include <boost/algorithm/string/predicate.hpp>
#include <boost/variant.hpp>
+#include "include/scope_guard.h"
#include "common/Formatter.h"
#include "common/containers.h"
#include <common/errno.h>
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
int r = lc->process(this);
if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
+ << r << dendl;
}
ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
}
utime_t next;
next.set_from_double(end + secs);
- ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
+ ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
+ << rgw_to_asctime(next) << dendl;
std::unique_lock l{lock};
cond.wait_for(l, std::chrono::seconds(secs));
delete[] obj_names;
}
-bool RGWLC::if_already_run_today(time_t& start_date)
+bool RGWLC::if_already_run_today(time_t start_date)
{
struct tm bdt;
time_t begin_of_day;
return false;
}
+static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
+ os << "<ent: bucket=";
+ os << ent.bucket;
+ os << "; start_time=";
+ os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
+ os << "; status=";
+ os << ent.status;
+ os << ">";
+ return os;
+}
+
int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
vector<cls_rgw_lc_entry> entries;
string marker;
+ dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
+ << "index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
#define MAX_LC_LIST_ENTRIES 100
do {
int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
try {
decode(retention, iter->second);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention"
+ << dendl;
return false;
}
if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
try {
decode(obj_legal_hold, iter->second);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold"
+ << dendl;
return false;
}
if (obj_legal_hold.is_enabled()) {
}; /* op_env */
class LCRuleOp;
+class WorkQ;
struct lc_op_ctx {
CephContext *cct;
rgw_obj obj;
RGWObjectCtx rctx;
const DoutPrefixProvider *dpp;
+ WorkQ* wq;
- lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
- const DoutPrefixProvider *_dpp)
- : cct(_env.store->ctx()), env(_env), o(_o),
+ lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp, WorkQ* wq)
+ : cct(env.store->ctx()), env(env), o(o),
store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
- obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+ obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
+ {}
}; /* lc_op_ctx */
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
LCOpRule(op_env& _env) : env(_env) {}
void build();
- int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
+ int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
+ WorkQ* wq);
}; /* LCOpRule */
using WorkItem =
{
public:
using unique_lock = std::unique_lock<std::mutex>;
- using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+ using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
using dequeue_result = boost::variant<void*, WorkItem>;
+ static constexpr uint32_t FLAG_NONE = 0x0000;
+ static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
+ static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
+ static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
+
private:
- const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+ const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
RGWLC::LCWorker* wk;
uint32_t qmax;
+ int ix;
std::mutex mtx;
std::condition_variable cv;
+ uint32_t flags;
vector<WorkItem> items;
work_f f;
public:
WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
- : wk(wk), qmax(qmax), f(bsf)
+ : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
{
- create((string{"workpool_thr_"} + to_string(ix)).c_str());
+ create(thr_name().c_str());
}
+ std::string thr_name() {
+ return std::string{"wp_thrd: "}
+ + std::to_string(wk->ix) + ", " + std::to_string(ix);
+ }
+
void setf(work_f _f) {
f = _f;
}
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
(items.size() > qmax)) {
+ flags |= FLAG_EWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
items.push_back(item);
+ if (flags & FLAG_DWAIT_SYNC) {
+ flags &= ~FLAG_DWAIT_SYNC;
+ cv.notify_one();
+ }
}
void drain() {
unique_lock uniq(mtx);
- while ((!wk->get_lc()->going_down()) &&
- (items.size() > 0)) {
+ flags |= FLAG_EDRAIN_SYNC;
+ while (flags & FLAG_EDRAIN_SYNC) {
cv.wait_for(uniq, 200ms);
}
}
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
(items.size() == 0)) {
+ /* clear drain state, as we are NOT doing work and qlen==0 */
+ if (flags & FLAG_EDRAIN_SYNC) {
+ flags &= ~FLAG_EDRAIN_SYNC;
+ }
+ flags |= FLAG_DWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
if (items.size() > 0) {
auto item = items.back();
items.pop_back();
+ if (flags & FLAG_EWAIT_SYNC) {
+ flags &= ~FLAG_EWAIT_SYNC;
+ cv.notify_one();
+ }
return {item};
}
return nullptr;
/* going down */
break;
}
- f(wk, boost::get<WorkItem>(item));
+ f(wk, this, boost::get<WorkItem>(item));
}
return nullptr;
}
}
}; /* WorkPool */
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
- RGWLC *_lc)
- : dpp(_dpp), cct(_cct), lc(_lc)
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
+ RGWLC *lc, int ix)
+ : dpp(dpp), cct(cct), lc(lc), ix(ix)
{
auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
workpool = new WorkPool(this, wpw, 512);
}
+static inline bool worker_should_stop(time_t stop_at)
+{
+ return stop_at < time(nullptr);
+}
+
int RGWLC::handle_multipart_expiration(
RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
- LCWorker* worker)
+ LCWorker* worker, time_t stop_at)
{
MultipartMetaFilter mp_filter;
vector<rgw_bucket_dir_entry> objs;
list_op.params.ns = RGW_OBJ_NS_MULTIPART;
list_op.params.filter = &mp_filter;
- auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
+ auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
RGWMPObj mp_obj;
if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
ldpp_dout(wk->get_lc(), 0)
<< "ERROR: abort_multipart_upload failed, ret=" << ret
+ << wq->thr_name()
<< ", meta:" << obj.key
<< dendl;
} else if (ret == -ERR_NO_SUCH_UPLOAD) {
ldpp_dout(wk->get_lc(), 5)
<< "ERROR: abort_multipart_upload failed, ret=" << ret
+ << wq->thr_name()
<< ", meta:" << obj.key
<< dendl;
}
for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
++prefix_iter) {
+
+ if (worker_should_stop(stop_at)) {
+ ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+ << worker->ix
+ << dendl;
+ return 0;
+ }
+
if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
continue;
}
{prefix_iter->second, *obj_iter};
worker->workpool->enqueue(WorkItem{t1});
if (going_down()) {
- worker->workpool->drain();
return 0;
}
} /* for objs */
oc.rctx, tags_bl);
if (ret < 0) {
if (ret != -ENODATA) {
- ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+ ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
+ << ret << " " << oc.wq->thr_name() << dendl;
}
return 0;
}
auto iter = tags_bl.cbegin();
dest_obj_tags.decode(iter);
} catch (buffer::error& err) {
- ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+ ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
+ << oc.wq->thr_name() << dendl;
return -EIO;
}
if (! has_all_tags(op, dest_obj_tags)) {
- ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl;
+ ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj
+ << " as tags do not match in rule: "
+ << op.id << " "
+ << oc.wq->thr_name() << dendl;
return 0;
}
}
if (ret == -ENOENT) {
return false;
}
- ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+ ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj
+ << " returned ret=" << ret << " "
+ << oc.wq->thr_name() << dendl;
return false;
}
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (!o.is_current()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": not current, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
if (o.is_delete_marker()) {
auto& op = oc.op;
if (op.expiration <= 0) {
if (op.expiration_date == boost::none) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": no expiration set in rule, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
is_expired = ceph_clock_now() >=
is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
}
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << (int)is_expired << " "
+ << oc.wq->thr_name() << dendl;
return is_expired;
}
}
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << " "
+ << oc.wq->thr_name() << dendl;
return r;
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " " << oc.wq->thr_name() << dendl;
return 0;
}
};
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (o.is_current()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": current version, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
int expiration = oc.op.noncur_expiration;
bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << is_expired << " "
+ << oc.wq->thr_name() << dendl;
return is_expired &&
pass_object_lock_check(oc.store->getRados(),
oc.bucket_info, oc.obj, oc.rctx);
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name() << dendl;
return r;
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " (non-current expiration) "
+ << oc.wq->thr_name() << dendl;
return 0;
}
};
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (!o.is_delete_marker()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": not a delete marker, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
if (oc.ol.next_has_same_name()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": next is same object, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name()
+ << dendl;
return r;
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " (delete marker expiration) "
+ << oc.wq->thr_name() << dendl;
return 0;
}
};
bool is_expired;
if (transition.days < 0) {
if (transition.date == boost::none) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": no transition day/date set in rule, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
is_expired = ceph_clock_now() >=
is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
}
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << is_expired << " "
+ << oc.wq->thr_name() << dendl;
need_to_process =
(rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
if (!oc.store->svc()->zone->get_zone_params().
valid_placement(target_placement)) {
- ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
+ ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
+ << target_placement
<< " bucket="<< oc.bucket_info.bucket
- << " rule_id=" << oc.op.id << dendl;
+ << " rule_id=" << oc.op.id
+ << " " << oc.wq->thr_name() << dendl;
return -EINVAL;
}
o.versioned_epoch, oc.dpp, null_yield);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
- << oc.bucket_info.bucket << ":" << o.key
- << " -> " << transition.storage_class
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " -> " << transition.storage_class
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name() << dendl;
return r;
}
- ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
+ ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
+ << ":" << o.key << " -> "
+ << transition.storage_class
+ << " " << oc.wq->thr_name() << dendl;
return 0;
}
};
}
}
-int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ WorkQ* wq)
{
- lc_op_ctx ctx(env, o, dpp);
+ lc_op_ctx ctx(env, o, dpp, wq);
unique_ptr<LCOpAction> *selected = nullptr;
real_time exp;
}
if (!cont) {
- ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+ << ": no rule match, skipping "
+ << " " << wq->thr_name() << dendl;
return 0;
}
int r = (*selected)->process(ctx);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
- << env.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << env.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << wq->thr_name() << dendl;
return r;
}
- ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
+ ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
+ << o.key << " " << wq->thr_name() << dendl;
}
return 0;
}
-int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+ time_t stop_at)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
&bucket_attrs);
if (ret < 0) {
- ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
+ ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
+ << " failed" << dendl;
return ret;
}
+ auto stack_guard = make_scope_guard(
+ [&worker, &bucket_info]
+ {
+ worker->workpool->drain();
+ }
+ );
+
if (bucket_info.bucket.marker != bucket_marker) {
- ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
- << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
+ ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
+ << bucket_tenant << ":" << bucket_name
+ << " cur_marker=" << bucket_info.bucket.marker
<< " orig_marker=" << bucket_marker << dendl;
return -ENOENT;
}
try {
config.decode(iter);
} catch (const buffer::error& e) {
- ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl;
+ ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
+ << dendl;
return -1;
}
- auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+ auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt =
boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
auto& [op_rule, o] = wt;
ldpp_dout(wk->get_lc(), 20)
- << __func__ << "(): key=" << o.key << dendl;
- std::cout << "KEY2: " << o.key << std::endl;
- int ret = op_rule.process(o, wk->dpp);
+ << __func__ << "(): key=" << o.key << wq->thr_name()
+ << dendl;
+ int ret = op_rule.process(o, wk->dpp, wq);
if (ret < 0) {
ldpp_dout(wk->get_lc(), 20)
<< "ERROR: orule.process() returned ret=" << ret
+ << wq->thr_name()
<< dendl;
}
};
rgw_obj_key next_marker;
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
++prefix_iter) {
+
+ if (worker_should_stop(stop_at)) {
+ ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+ << worker->ix
+ << dendl;
+ return 0;
+ }
+
auto& op = prefix_iter->second;
if (!is_valid_op(op)) {
continue;
}
- ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
+ ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
+ << dendl;
if (prefix_iter != prefix_map.begin() &&
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
prev(prefix_iter)->first) == 0)) {
op_env oenv(op, store, worker, bucket_info, ol);
LCOpRule orule(oenv);
orule.build(); // why can't ctor do it?
-#if 0
- /* would permit passing o by reference, removes fetch overlap */
- auto fetch_barrier = [&worker]()
- { worker->workpool->drain(); };
-#endif
rgw_bucket_dir_entry* o{nullptr};
for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
worker->workpool->drain();
}
- ret = handle_multipart_expiration(&target, prefix_map, worker);
+ ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at);
return ret;
}
do {
int ret = l.lock_exclusive(
&store->getRados()->lc_pool_ctx, obj_names[index]);
- if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+ if (ret == -EBUSY || ret == -EEXIST) {
+ /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
- << obj_names[index] << ", sleep 5, try again" << dendl;
+ << obj_names[index] << ", sleep 5, try again " << dendl;
sleep(5);
continue;
}
if (ret < 0)
return 0;
- ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
+ ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
+ << dendl;
if (result == -ENOENT) {
ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
}
clean:
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
+ ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
+ << obj_names[index] << dendl;
return 0;
} while (true);
}
return 0;
}
+bool RGWLC::expired_session(time_t started)
+{
+ time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+ ? cct->_conf->rgw_lc_debug_interval
+ : 24*60*60;
+
+ auto now = time(nullptr);
+
+ dout(16) << "RGWLC::expired_session"
+ << " started: " << started
+ << " interval: " << interval << "(*2==" << 2*interval << ")"
+ << " now: " << now
+ << dendl;
+
+ return (started + 2*interval < now);
+}
+
+time_t RGWLC::thread_stop_at()
+{
+ uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+ ? cct->_conf->rgw_lc_debug_interval
+ : 24*60*60;
+
+ return time(nullptr) + interval;
+}
+
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
{
+ dout(5) << "RGWLC::process(): ENTER: "
+ << "index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
rados::cls::lock::Lock l(lc_index_lock_name);
do {
utime_t now = ceph_clock_now();
- //string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+ //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
cls_rgw_lc_entry entry;
if (max_lock_secs <= 0)
return -EAGAIN;
int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
obj_names[index]);
- if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+ if (ret == -EBUSY || ret == -EEXIST) {
+ /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
sleep(5);
if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
obj_names[index], head.marker, entry);
- if ((entry.status == lc_processing) &&
- (true /* XXXX expired epoch! */)) {
- dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
- << " index: " << index << " worker ix: " << worker->ix
- << dendl;
- goto exit;
+ if (ret >= 0) {
+ if (entry.status == lc_processing) {
+ if (expired_session(entry.start_time)) {
+ dout(5) << "RGWLC::process(): STALE lc session found for: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << " (clearing)"
+ << dendl;
+ } else {
+ dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+ goto exit;
+ }
+ }
}
}
<< dendl;
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- ret = bucket_lc_process(entry.bucket, worker);
+ ret = bucket_lc_process(entry.bucket, worker, thread_stop_at());
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
} while(1);
workers.reserve(maxw);
for (int ix = 0; ix < maxw; ++ix) {
auto worker =
- std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+ std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
workers.emplace_back(std::move(worker));
}