#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
+#include <boost/asio/spawn.hpp>
#include <boost/variant.hpp>
#include "include/scope_guard.h"
return (timediff >= cmp);
}
-static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp)
+static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
{
if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
return true;
}
std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op();
- int ret = read_op->prepare(null_yield, dpp);
+ int ret = read_op->prepare(y, dpp);
if (ret < 0) {
if (ret == -ENOENT) {
return true;
public:
virtual ~LCOpAction() {}
- virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
+ virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) {
return false;
}
return true;
}
- virtual int process(lc_op_ctx& oc) {
+ virtual int process(lc_op_ctx& oc, optional_yield y) {
return 0;
}
class LCOpFilter {
public:
virtual ~LCOpFilter() {}
- virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
+ virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) {
return false;
}
}; /* LCOpFilter */
void build();
void update();
int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
- WorkQ* wq);
+ WorkQ* wq, optional_yield y);
}; /* LCOpRule */
using WorkItem =
{
public:
using unique_lock = std::unique_lock<std::mutex>;
- using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
- using dequeue_result = std::variant<void*, WorkItem>;
+ using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&, optional_yield)>;
+ using dequeue_result = std::list<WorkItem>;
static constexpr uint32_t FLAG_NONE = 0x0000;
static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
private:
- const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
+ const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi, optional_yield) {};
RGWLC::LCWorker* wk;
uint32_t qmax;
int ix;
std::mutex mtx;
std::condition_variable cv;
uint32_t flags;
- vector<WorkItem> items;
+ std::list<WorkItem> items;
work_f f;
public:
}
private:
- dequeue_result dequeue() {
+ dequeue_result dequeue(size_t max_items=1) {
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
(items.size() == 0)) {
cv.wait_for(uniq, 200ms);
}
if (items.size() > 0) {
- auto item = items.back();
- items.pop_back();
+ size_t split_size = std::min(max_items, items.size());
+ dequeue_result result;
+ result.splice(result.begin(), items, items.begin(), std::next(items.begin(), split_size));
if (flags & FLAG_EWAIT_SYNC) {
flags &= ~FLAG_EWAIT_SYNC;
cv.notify_one();
}
- return {item};
+ return result;
}
- return nullptr;
+ return dequeue_result{};
}
void* entry() override {
while (!wk->get_lc()->going_down()) {
- auto item = dequeue();
- if (item.index() == 0) {
- /* going down */
- break;
+ boost::asio::io_context context;
+ for(auto& item : items) {
+ if(item.index() != 0) {
+ boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
+ try {
+ optional_yield y(yield);
+ f(wk, this, item, y);
+ } catch (const std::exception& e) {
+ ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl;
+ }
+ });
+ }
+ }
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r="
+ << -e.code().value() << dendl;
}
- f(wk, this, std::get<WorkItem>(item));
}
return nullptr;
}
return 0;
} /* RGWLC::handle_multipart_expiration */
-static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl)
+static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl, optional_yield y)
{
std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op();
- return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield);
+ return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y);
}
static bool is_valid_op(const lc_op& op)
return tag_count == rule_action.obj_tags->count();
}
-static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
+static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip, optional_yield y)
{
auto& op = oc.op;
*skip = true;
bufferlist tags_bl;
- int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl);
+ int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl, y);
if (ret < 0) {
if (ret != -ENODATA) {
ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r="
class LCOpFilter_Tags : public LCOpFilter {
public:
- bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
+ bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) override {
auto& o = oc.o;
if (o.is_delete_marker()) {
bool skip;
- int ret = check_tags(dpp, oc, &skip);
+ int ret = check_tags(dpp, oc, &skip, y);
if (ret < 0) {
if (ret == -ENOENT) {
return false;
public:
LCOpAction_CurrentExpiration(op_env& env) {}
- bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (!o.is_current()) {
ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
LCOpAction_NonCurrentExpiration(op_env& env)
{}
- bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (o.is_current()) {
ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
public:
LCOpAction_DMExpiration(op_env& env) {}
- bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (!o.is_delete_marker()) {
ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
LCOpAction_Transition(const transition_action& _transition)
: transition(_transition) {}
- bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
auto& o = oc.o;
if (o.is_delete_marker()) {
return need_to_process;
}
- int delete_tier_obj(lc_op_ctx& oc) {
+ int delete_tier_obj(lc_op_ctx& oc, optional_yield y) {
int ret = 0;
/* If bucket has versioning enabled, create delete_marker for current version
}
if (delete_object) {
- ret = delete_tier_obj(oc);
+ ret = delete_tier_obj(oc, y);
if (ret < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
return ret;
if (!r && oc.tier->is_tier_type_s3()) {
ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
if (!oc.o.is_current() &&
- !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) {
+ !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp, y)) {
/* Skip objects which has object lock enabled. */
ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
return 0;
}
- r = transition_obj_to_cloud(oc);
+ r = transition_obj_to_cloud(oc, y);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
<< dendl;
int LCOpRule::process(rgw_bucket_dir_entry& o,
const DoutPrefixProvider *dpp,
- WorkQ* wq)
+ WorkQ* wq, optional_yield y)
{
lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, wq);
shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
for (auto& a : actions) {
real_time action_exp;
- if (a->check(ctx, &action_exp, dpp)) {
+ if (a->check(ctx, &action_exp, dpp, y)) {
if (action_exp > exp) {
exp = action_exp;
selected = &a;
bool cont = false;
for (auto& f : filters) {
- if (f->check(dpp, ctx)) {
+ if (f->check(dpp, ctx, y)) {
cont = true;
break;
}
return 0;
}
- int r = (*selected)->process(ctx);
+ int r = (*selected)->process(ctx, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
<< env.bucket << ":" << o.key
ldpp_dout(wk->get_lc(), 20)
<< __func__ << "(): key=" << o.key << wq->thr_name()
<< dendl;
- int ret = op_rule.process(o, wk->dpp, wq);
+ int ret = op_rule.process(o, wk->dpp, wq, y);
if (ret < 0) {
ldpp_dout(wk->get_lc(), 20)
<< "ERROR: orule.process() returned ret=" << ret