#include <iostream>
#include <map>
#include <algorithm>
+#include <tuple>
+#include <functional>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
+#include <boost/variant.hpp>
#include "common/Formatter.h"
+#include "common/containers.h"
#include <common/errno.h>
#include "include/random.h"
#include "cls/rgw/cls_rgw_client.h"
return true;
}
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
- RGWLC *_lc)
- : dpp(_dpp), cct(_cct), lc(_lc) {}
-
void *RGWLC::LCWorker::entry() {
do {
utime_t start = ceph_clock_now();
}
}
-int RGWLC::handle_multipart_expiration(
- RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map)
-{
- MultipartMetaFilter mp_filter;
- vector<rgw_bucket_dir_entry> objs;
- RGWMPObj mp_obj;
- bool is_truncated;
- int ret;
- RGWBucketInfo& bucket_info = target->get_bucket_info();
- RGWRados::Bucket::List list_op(target);
- auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
- list_op.params.list_versions = false;
- /* lifecycle processing does not depend on total order, so can
- * take advantage of unordered listing optimizations--such as
- * operating on one shard at a time */
- list_op.params.allow_unordered = true;
- list_op.params.ns = RGW_OBJ_NS_MULTIPART;
- list_op.params.filter = &mp_filter;
- for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
- ++prefix_iter) {
- if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
- continue;
- }
- list_op.params.prefix = prefix_iter->first;
- do {
- objs.clear();
- list_op.params.marker = list_op.get_next_marker();
- ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
- if (ret < 0) {
- if (ret == (-ENOENT))
- return 0;
- ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
- return ret;
- }
-
- for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- if (obj_has_expired(cct, obj_iter->meta.mtime,
- prefix_iter->second.mp_expiration)) {
- rgw_obj_key key(obj_iter->key);
- if (!mp_obj.from_meta(key.name)) {
- continue;
- }
- RGWObjectCtx rctx(store);
- /* XXXX where is this defined? */
- ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
- if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
- } else if (ret == -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
- }
- if (going_down())
- return 0;
- }
- } /* for objs */
- std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
- } while(is_truncated);
- }
- return 0;
-}
-
-static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
- rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
-{
- RGWRados::Object op_target(store, bucket_info, ctx, obj);
- RGWRados::Object::Read read_op(&op_target);
-
- return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
-}
-
-static bool is_valid_op(const lc_op& op)
-{
- return (op.status &&
- (op.expiration > 0
- || op.expiration_date != boost::none
- || op.noncur_expiration > 0
- || op.dm_expiration
- || !op.transitions.empty()
- || !op.noncur_transitions.empty()));
-}
-
-static inline bool has_all_tags(const lc_op& rule_action,
- const RGWObjTags& object_tags)
-{
- if(! rule_action.obj_tags)
- return false;
- if(object_tags.count() < rule_action.obj_tags->count())
- return false;
- size_t tag_count = 0;
- for (const auto& tag : object_tags.get_tags()) {
- const auto& rule_tags = rule_action.obj_tags->get_tags();
- const auto& iter = rule_tags.find(tag.first);
- if(iter->second == tag.second)
- {
- tag_count++;
- }
- /* all tags in the rule appear in obj tags */
- }
- return tag_count == rule_action.obj_tags->count();
-}
-
class LCObjsLister {
rgw::sal::RGWRadosStore *store;
RGWBucketInfo& bucket_info;
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
}
- bool get_obj(rgw_bucket_dir_entry *obj) {
+ bool get_obj(rgw_bucket_dir_entry **obj,
+ std::function<void(void)> fetch_barrier
+ = []() { /* nada */}) {
if (obj_iter == objs.end()) {
if (!is_truncated) {
delay();
return false;
} else {
+ fetch_barrier();
list_op.params.marker = pre_obj.key;
-
int ret = fetch();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret
+ << dendl;
return ret;
}
}
delay();
}
- *obj = *obj_iter;
+ /* returning address of entry in objs */
+ *obj = &(*obj_iter);
return obj_iter != objs.end();
}
}
return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
}
-};
-
+}; /* LCObjsLister */
struct op_env {
}
return del_op.delete_obj(null_yield);
-}
+} /* remove_expired_obj */
class LCOpAction {
public:
virtual int process(lc_op_ctx& oc) {
return 0;
}
-};
+}; /* LCOpAction */
class LCOpFilter {
public:
virtual bool check(lc_op_ctx& oc) {
return false;
}
-};
+}; /* LCOpFilter */
class LCOpRule {
friend class LCOpAction;
void build();
int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
-};
+}; /* LCOpRule */
+
+using WorkItem =
+ boost::variant<void*,
+ /* out-of-line delete */
+ std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+ /* uncompleted MPU expiration */
+ std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+ rgw_bucket_dir_entry>;
+
+/* Single worker thread draining a bounded queue of WorkItems.
+ * Producers block in enqueue() while the queue is above qmax;
+ * the thread body pops items and applies the installed work
+ * function f.  Shutdown is cooperative via lc->going_down(). */
+class WorkQ : public Thread
+{
+public:
+  using unique_lock = std::unique_lock<std::mutex>;
+  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+  using dequeue_result = boost::variant<void*, WorkItem>;
+
+private:
+  /* default no-op work function; replaced via setf() before use */
+  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+  RGWLC::LCWorker* wk;
+  uint32_t qmax;
+  std::mutex mtx;
+  std::condition_variable cv;
+  vector<WorkItem> items;
+  work_f f;
+
+public:
+  WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
+    : wk(wk), qmax(qmax), f(bsf)
+  {
+    create((string{"workpool_thr_"} + to_string(ix)).c_str());
+  }
+
+  /* install the per-bucket work function (called before enqueuing) */
+  void setf(work_f _f) {
+    f = _f;
+  }
+
+  void enqueue(WorkItem&& item) {
+    unique_lock uniq(mtx);
+    /* apply backpressure: wait (with timeout, so shutdown is
+     * observed) while the queue is over its high-water mark */
+    while ((!wk->get_lc()->going_down()) &&
+	   (items.size() > qmax)) {
+      cv.wait_for(uniq, 200ms);
+    }
+    items.push_back(std::move(item)); /* move, not copy, the rvalue */
+    /* wake the worker thread promptly rather than relying solely
+     * on the 200ms wait_for timeout in dequeue() */
+    cv.notify_one();
+  }
+
+  /* block until the queue is empty (or shutdown) */
+  void drain() {
+    unique_lock uniq(mtx);
+    while ((!wk->get_lc()->going_down()) &&
+	   (items.size() > 0)) {
+      cv.wait_for(uniq, 200ms);
+    }
+  }
+
+private:
+  dequeue_result dequeue() {
+    unique_lock uniq(mtx);
+    while ((!wk->get_lc()->going_down()) &&
+	   (items.size() == 0)) {
+      cv.wait_for(uniq, 200ms);
+    }
+    if (items.size() > 0) {
+      /* LIFO service order; lifecycle work is order-independent */
+      auto item = std::move(items.back());
+      items.pop_back();
+      /* wake a producer blocked in enqueue(), or a drain() waiter */
+      cv.notify_one();
+      return {item};
+    }
+    /* only reachable when going_down() */
+    return nullptr;
+  }
+
+  void* entry() override {
+    while (!wk->get_lc()->going_down()) {
+      auto item = dequeue();
+      if (item.which() == 0) {
+	/* going down */
+	break;
+      }
+      f(wk, boost::get<WorkItem>(item));
+    }
+    return nullptr;
+  }
+}; /* WorkQ */
+
+/* Fixed set of WorkQ threads; work items are distributed to the
+ * queues round-robin by enqueue(). */
+class RGWLC::WorkPool
+{
+  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
+  TVector wqs;
+  uint64_t next_ix;
+
+public:
+  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
+    : wqs(TVector{
+	n_threads,
+	[&](const size_t qix, auto emplacer) {
+	  emplacer.emplace(wk, qix, qmax);
+	}}),
+      next_ix(0)
+  {}
+
+  /* propagate the work function to every queue */
+  void setf(WorkQ::work_f _f) {
+    for (auto& wq : wqs) {
+      wq.setf(_f);
+    }
+  }
+
+  /* round-robin dispatch across the queues */
+  void enqueue(WorkItem item) {
+    auto& wq = wqs[next_ix];
+    next_ix = (next_ix + 1) % wqs.size();
+    wq.enqueue(std::move(item));
+  }
+
+  /* wait until every queue has emptied */
+  void drain() {
+    for (auto& wq : wqs) {
+      wq.drain();
+    }
+  }
+}; /* WorkPool */
+
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
+			  RGWLC *_lc)
+  : dpp(_dpp), cct(_cct), lc(_lc)
+{
+  /* pool width is operator-tunable; 512 is the per-queue depth cap */
+  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
+  workpool = new WorkPool(this, wpw, 512);
+}
+
+/* Expire uncompleted multipart uploads in a bucket, per the rules in
+ * prefix_map.  Listing runs on this thread; the per-object abort work
+ * is fanned out to the worker's thread pool.
+ * Returns 0 on success (or bucket gone), <0 on listing error. */
+int RGWLC::handle_multipart_expiration(
+  RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
+  LCWorker* worker)
+{
+  MultipartMetaFilter mp_filter;
+  vector<rgw_bucket_dir_entry> objs;
+  bool is_truncated{false};
+  int ret;
+  RGWBucketInfo& bucket_info = target->get_bucket_info();
+  RGWRados::Bucket::List list_op(target);
+  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
+  list_op.params.list_versions = false;
+  /* lifecycle processing does not depend on total order, so can
+   * take advantage of unordered listing optimizations--such as
+   * operating on one shard at a time */
+  list_op.params.allow_unordered = true;
+  list_op.params.ns = RGW_OBJ_NS_MULTIPART;
+  list_op.params.filter = &mp_filter;
+
+  /* runs concurrently on the workpool threads: each thread must use
+   * its own local ret -- do NOT write the enclosing ret, which would
+   * race across threads and clobber the caller's listing status */
+  auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
+    auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+    auto& [rule, obj] = wt;
+    RGWMPObj mp_obj;
+    if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
+      rgw_obj_key key(obj.key);
+      if (!mp_obj.from_meta(key.name)) {
+	return;
+      }
+      RGWObjectCtx rctx(store);
+      int ret = abort_multipart_upload(store, cct, &rctx, bucket_info,
+				       mp_obj);
+      if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
+	ldpp_dout(wk->get_lc(), 0)
+	  << "ERROR: abort_multipart_upload failed, ret=" << ret
+	  << ", meta:" << obj.key
+	  << dendl;
+      } else if (ret == -ERR_NO_SUCH_UPLOAD) {
+	ldpp_dout(wk->get_lc(), 5)
+	  << "ERROR: abort_multipart_upload failed, ret=" << ret
+	  << ", meta:" << obj.key
+	  << dendl;
+      }
+    } /* expired */
+  };
+
+  worker->workpool->setf(pf);
+
+  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+       ++prefix_iter) {
+    /* skip disabled rules and rules with no MPU expiration clause */
+    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
+      continue;
+    }
+    list_op.params.prefix = prefix_iter->first;
+    do {
+      objs.clear();
+      list_op.params.marker = list_op.get_next_marker();
+      ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
+      if (ret < 0) {
+	if (ret == (-ENOENT))
+	  return 0;
+	ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+	return ret;
+      }
+
+      for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
+	/* the tuple holds a reference to the rule (prefix_map outlives
+	 * the drain below) and a copy of the listing entry */
+	std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+	  {prefix_iter->second, *obj_iter};
+	worker->workpool->enqueue(WorkItem{t1});
+	if (going_down()) {
+	  worker->workpool->drain();
+	  return 0;
+	}
+      } /* for objs */
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+    } while(is_truncated);
+  } /* for prefix_map */
+
+  /* all enqueued aborts must finish before we return */
+  worker->workpool->drain();
+  return 0;
+}
+
+/* Fetch the RGW_ATTR_TAGS xattr of obj into tags_bl; returns the
+ * get_attr result (<0 on error, e.g. attr not present). */
+static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
+			 rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
+{
+  RGWRados::Object target(store, bucket_info, ctx, obj);
+  RGWRados::Object::Read rop(&target);
+  return rop.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
+}
+
+/* A rule is actionable only when enabled and at least one
+ * expiration or transition clause is configured. */
+static bool is_valid_op(const lc_op& op)
+{
+  if (!op.status) {
+    return false;
+  }
+  return op.expiration > 0
+    || op.expiration_date != boost::none
+    || op.noncur_expiration > 0
+    || op.dm_expiration
+    || !op.transitions.empty()
+    || !op.noncur_transitions.empty();
+}
+
+/* Return true iff every tag in the rule appears, with an equal
+ * value, among the object's tags. */
+static inline bool has_all_tags(const lc_op& rule_action,
+				const RGWObjTags& object_tags)
+{
+  if(! rule_action.obj_tags)
+    return false;
+  if(object_tags.count() < rule_action.obj_tags->count())
+    return false;
+  size_t tag_count = 0;
+  for (const auto& tag : object_tags.get_tags()) {
+    const auto& rule_tags = rule_action.obj_tags->get_tags();
+    const auto& iter = rule_tags.find(tag.first);
+    /* an object tag absent from the rule yields end(); dereferencing
+     * iter->second unconditionally (as before) was undefined behavior */
+    if (iter != rule_tags.end() && iter->second == tag.second)
+    {
+      tag_count++;
+    }
+    /* all tags in the rule appear in obj tags */
+  }
+  return tag_count == rule_action.obj_tags->count();
+}
static int check_tags(lc_op_ctx& oc, bool *skip)
{
return -1;
}
- multimap<string, lc_op>& prefix_map = config.get_prefix_map();
+  /* workpool body: apply the built LCOpRule to one listed entry.
+   * errors are logged, not propagated (lifecycle is best-effort
+   * per object). */
+  auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+    auto wt =
+      boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+    auto& [op_rule, o] = wt;
+    ldpp_dout(wk->get_lc(), 20)
+      << __func__ << "(): key=" << o.key << dendl;
+    int ret = op_rule.process(o, wk->dpp);
+    if (ret < 0) {
+      ldpp_dout(wk->get_lc(), 20)
+	<< "ERROR: orule.process() returned ret=" << ret
+	<< dendl;
+    }
+  };
+ worker->workpool->setf(pf);
+ multimap<string, lc_op>& prefix_map = config.get_prefix_map();
ldpp_dout(this, 10) << __func__ << "() prefix_map size="
<< prefix_map.size()
<< dendl;
ol.set_prefix(prefix_iter->first);
ret = ol.init();
-
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
}
op_env oenv(op, store, worker, bucket_info, ol);
-
LCOpRule orule(oenv);
-
- orule.build();
-
- rgw_bucket_dir_entry o;
- for (; ol.get_obj(&o); ol.next()) {
- ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl;
- int ret = orule.process(o, this);
- if (ret < 0) {
- ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
- << ret
- << dendl;
- }
-
- if (going_down()) {
- return 0;
- }
+ orule.build(); // why can't ctor do it?
+#if 0
+ /* would permit passing o by reference, removes fetch overlap */
+ auto fetch_barrier = [&worker]()
+ { worker->workpool->drain(); };
+#endif
+ rgw_bucket_dir_entry* o{nullptr};
+ for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
+ std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+ worker->workpool->enqueue(WorkItem{t1});
}
+ worker->workpool->drain();
}
- ret = handle_multipart_expiration(&target, prefix_map);
-
+ ret = handle_multipart_expiration(&target, prefix_map, worker);
return ret;
}
return secs>0 ? secs : secs+24*60*60;
}
+RGWLC::LCWorker::~LCWorker()
+{
+  /* flush queued work before destroying the pool.  NOTE(review):
+   * no explicit join of the WorkQ threads is visible here -- confirm
+   * that shutdown ordering is covered by going_down() elsewhere */
+  workpool->drain();
+  delete workpool;
+} /* ~LCWorker */
+
void RGWLifecycleConfiguration::generate_test_instances(
list<RGWLifecycleConfiguration*>& o)
{
});
return ret;
-}
+} /* RGWLC::remove_bucket_config */
+
+RGWLC::~RGWLC()
+{
+  /* presumably halts the LC processor before tearing down state;
+   * see stop_processor()/finalize() definitions */
+  stop_processor();
+  finalize();
+} /* ~RGWLC() */
namespace rgw::lc {