store(_store), bucket_info(_bucket_info),
target(store->getRados(), bucket_info), list_op(&target) {
list_op.params.list_versions = bucket_info.versioned();
- list_op.params.allow_unordered = true;
delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
}
* only happen if is_truncated is false */
return boost::none;
}
+
return ((obj_iter + 1)->key.name);
}
using LCWorker = RGWLC::LCWorker;
- lc_op& op;
+ lc_op op;
rgw::sal::RGWRadosStore *store;
LCWorker* worker;
RGWBucketInfo& bucket_info;
struct lc_op_ctx {
CephContext *cct;
- op_env& env;
- rgw_bucket_dir_entry& o;
+ op_env env;
+ rgw_bucket_dir_entry o;
+ boost::optional<std::string> next_key_name;
+ ceph::real_time dm_effective_mtime;
rgw::sal::RGWRadosStore *store;
RGWBucketInfo& bucket_info;
- lc_op& op;
+ lc_op& op; // ok--refers to expanded env.op
LCObjsLister& ol;
rgw_obj obj;
WorkQ* wq;
lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+ boost::optional<std::string> next_key_name,
+ ceph::real_time dem,
const DoutPrefixProvider *dpp, WorkQ* wq)
- : cct(env.store->ctx()), env(env), o(o),
+ : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
{}
+
+ bool next_has_same_name(const std::string& key_name) {
+ return (next_key_name && key_name.compare(
+ boost::get<std::string>(next_key_name)) == 0);
+ }
+
}; /* lc_op_ctx */
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
} /* remove_expired_obj */
class LCOpAction {
-protected:
- boost::optional<std::string> next_key_name;
public:
virtual ~LCOpAction() {}
- bool next_has_same_name(const std::string& key_name) {
- return (next_key_name && key_name.compare(
- boost::get<std::string>(next_key_name)) == 0);
- }
-
virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
return false;
- };
+ }
/* called after check(). Check should tell us whether this action
* is applicable. If there are multiple actions, we'll end up executing
virtual int process(lc_op_ctx& oc) {
return 0;
}
+
+ friend class LCOpRule;
}; /* LCOpAction */
class LCOpFilter {
class LCOpRule {
friend class LCOpAction;
- op_env& env;
+ op_env env;
+ boost::optional<std::string> next_key_name;
+ ceph::real_time dm_effective_mtime;
- std::vector<unique_ptr<LCOpFilter> > filters;
- std::vector<unique_ptr<LCOpAction> > actions;
+ std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
+ std::vector<shared_ptr<LCOpAction> > actions;
public:
LCOpRule(op_env& _env) : env(_env) {}
+ boost::optional<std::string> get_next_key_name() {
+ return next_key_name;
+ }
+
void build();
+ void update();
int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
WorkQ* wq);
}; /* LCOpRule */
using WorkItem =
boost::variant<void*,
/* out-of-line delete */
- std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+ std::tuple<LCOpRule, rgw_bucket_dir_entry>,
/* uncompleted MPU expiration */
- std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+ std::tuple<lc_op, rgw_bucket_dir_entry>,
rgw_bucket_dir_entry>;
class WorkQ : public Thread
list_op.params.filter = &mp_filter;
auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
- auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+ auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
RGWMPObj mp_obj;
if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
}
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+ std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
{prefix_iter->second, *obj_iter};
worker->workpool->enqueue(WorkItem{t1});
if (going_down()) {
class LCOpAction_CurrentExpiration : public LCOpAction {
public:
- LCOpAction_CurrentExpiration(op_env& env) {
- next_key_name = env.ol.next_key_name();
- }
+ LCOpAction_CurrentExpiration(op_env& env) {}
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
return false;
}
if (o.is_delete_marker()) {
- if (next_has_same_name(o.key.name)) {
+ std::string nkn;
+ if (oc.next_key_name) nkn = *oc.next_key_name;
+ if (oc.next_has_same_name(o.key.name)) {
+ ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key
+ << " next_key_name: %%" << nkn << "%% "
+ << oc.wq->thr_name() << dendl;
return false;
} else {
+ ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key
+ << " next_key_name: %%" << nkn << "%% "
+ << oc.wq->thr_name() << dendl;
*exp_time = real_clock::now();
return true;
}
int r;
if (o.is_delete_marker()) {
r = remove_expired_obj(oc, true);
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << " "
+ << oc.wq->thr_name() << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED: current is-dm "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << oc.wq->thr_name() << dendl;
} else {
+ /* ! o.is_delete_marker() */
r = remove_expired_obj(oc, !oc.bucket_info.versioned());
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << " "
+ << oc.wq->thr_name() << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " " << oc.wq->thr_name() << dendl;
}
- if (r < 0) {
- ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << " "
- << oc.wq->thr_name() << dendl;
- return r;
- }
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
- << " " << oc.wq->thr_name() << dendl;
return 0;
}
};
class LCOpAction_DMExpiration : public LCOpAction {
public:
- LCOpAction_DMExpiration(op_env& env) {
- next_key_name = env.ol.next_key_name();
- }
+ LCOpAction_DMExpiration(op_env& env) {}
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
<< oc.wq->thr_name() << dendl;
return false;
}
- if (next_has_same_name(o.key.name)) {
+ if (oc.next_has_same_name(o.key.name)) {
ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
<< ": next is same object, skipping "
<< oc.wq->thr_name() << dendl;
class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
protected:
- ceph::real_time effective_mtime;
-
bool check_current_state(bool is_current) override {
return !is_current;
}
ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
- return effective_mtime;
+ return oc.dm_effective_mtime;
}
public:
LCOpAction_NonCurrentTransition(op_env& env,
const transition_action& _transition)
- : LCOpAction_Transition(_transition),
- effective_mtime(env.ol.get_prev_obj().meta.mtime)
+ : LCOpAction_Transition(_transition)
{}
};
}
}
+void LCOpRule::update()
+{
+ next_key_name = env.ol.next_key_name();
+ dm_effective_mtime = env.ol.get_prev_obj().meta.mtime;
+}
+
int LCOpRule::process(rgw_bucket_dir_entry& o,
const DoutPrefixProvider *dpp,
WorkQ* wq)
{
- lc_op_ctx ctx(env, o, dpp, wq);
-
- unique_ptr<LCOpAction> *selected = nullptr;
+ lc_op_ctx ctx(env, o, next_key_name, dm_effective_mtime, dpp, wq);
+ shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
real_time exp;
for (auto& a : actions) {
auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt =
- boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+ boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
auto& [op_rule, o] = wt;
+
ldpp_dout(wk->get_lc(), 20)
<< __func__ << "(): key=" << o.key << wq->thr_name()
<< dendl;
orule.build(); // why can't ctor do it?
rgw_bucket_dir_entry* o{nullptr};
for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
- std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+ orule.update();
+ std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
worker->workpool->enqueue(WorkItem{t1});
}
worker->workpool->drain();
int max_objs =
(cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
cct->_conf->rgw_lc_max_objs);
- /* XXXX oh noes!!! */
+ /* n.b. review hash algo */
int index = ceph_str_hash_linux(shard_id.c_str(),
shard_id.size()) % HASH_PRIME % max_objs;
*oid = lc_oid_prefix;