#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
-#include <boost/asio/spawn.hpp>
-#include <boost/variant.hpp>
#include "include/scope_guard.h"
#include "include/function2.hpp"
#include "common/Clock.h" // for ceph_clock_now()
+#include "common/async/spawn_throttle.h"
#include "common/Formatter.h"
#include "common/containers.h"
#include "common/split.h"
#include "include/random.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_perf_counters.h"
+#include "rgw_asio_thread.h"
#include "rgw_common.h"
#include "rgw_bucket.h"
#include "rgw_bucket_layout.h"
}; /* op_env */
class LCRuleOp;
-class WorkQ;
struct lc_op_ctx {
CephContext *cct;
optional_yield y);
}; /* LCOpRule */
-using WorkItem =
- std::variant<void*,
- /* out-of-line delete */
- std::tuple<LCOpRule, rgw_bucket_dir_entry>,
- /* uncompleted MPU expiration */
- std::tuple<lc_op, rgw_bucket_dir_entry>,
- rgw_bucket_dir_entry>;
-
-class WorkQ : public Thread
-{
-public:
- using unique_lock = std::unique_lock<std::mutex>;
- using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&, optional_yield)>;
- using dequeue_result = std::list<WorkItem>;
-
- static constexpr uint32_t FLAG_NONE = 0x0000;
- static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
- static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
- static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
-
-private:
- const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield) {};
- RGWLC::LCWorker* wk;
- uint32_t qmax;
- int ix;
- std::mutex mtx;
- std::condition_variable cv;
- uint32_t flags;
- std::list<WorkItem> items;
- work_f f;
-
-public:
- WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
- : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
- {
- create(thr_name().c_str());
- }
-
- std::string thr_name() {
- return std::string{"wp_thrd: "}
- + std::to_string(wk->ix) + ", " + std::to_string(ix);
- }
-
- void setf(work_f _f) {
- f = _f;
- }
-
- void enqueue(WorkItem&& item) {
- unique_lock uniq(mtx);
- while ((!wk->get_lc()->going_down()) &&
- (items.size() > qmax)) {
- flags |= FLAG_EWAIT_SYNC;
- cv.wait_for(uniq, 200ms);
- }
- items.push_back(item);
- if (flags & FLAG_DWAIT_SYNC) {
- flags &= ~FLAG_DWAIT_SYNC;
- cv.notify_one();
- }
- }
-
- void drain() {
- unique_lock uniq(mtx);
- flags |= FLAG_EDRAIN_SYNC;
- while (flags & FLAG_EDRAIN_SYNC) {
- cv.wait_for(uniq, 200ms);
- }
- }
-
-private:
- dequeue_result dequeue(size_t max_items=1) {
- unique_lock uniq(mtx);
- while ((!wk->get_lc()->going_down()) &&
- (items.size() == 0)) {
- /* clear drain state, as we are NOT doing work and qlen==0 */
- if (flags & FLAG_EDRAIN_SYNC) {
- flags &= ~FLAG_EDRAIN_SYNC;
- }
- flags |= FLAG_DWAIT_SYNC;
- cv.wait_for(uniq, 200ms);
- }
- if (items.size() > 0) {
- size_t split_size = std::min(max_items, items.size());
- dequeue_result result;
- result.splice(result.begin(), items, items.begin(), std::next(items.begin(), split_size));
- if (flags & FLAG_EWAIT_SYNC) {
- flags &= ~FLAG_EWAIT_SYNC;
- cv.notify_one();
- }
- return result;
- }
- return dequeue_result{};
- }
-
- void* entry() override {
- while (!wk->get_lc()->going_down()) {
- boost::asio::io_context context;
- for(auto& item : items) {
- if(item.index() != 0) {
- boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
- try {
- optional_yield y(yield);
- f(wk, item, y);
- } catch (const std::exception& e) {
- ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl;
- }
- });
- }
- }
- try {
- context.run();
- } catch (const std::system_error& e) {
- ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r="
- << -e.code().value() << dendl;
- }
- }
- return nullptr;
- }
-}; /* WorkQ */
-
-class RGWLC::WorkPool
-{
- using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
- TVector wqs;
- uint64_t ix;
-
-public:
- WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
- : wqs(TVector{
- n_threads,
- [&](const size_t ix, auto emplacer) {
- emplacer.emplace(wk, ix, qmax);
- }}),
- ix(0)
- {}
-
- ~WorkPool() {
- for (auto& wq : wqs) {
- wq.join();
- }
- }
-
- void setf(WorkQ::work_f _f) {
- for (auto& wq : wqs) {
- wq.setf(_f);
- }
- }
-
- void enqueue(WorkItem item) {
- const auto tix = ix;
- ix = (ix+1) % wqs.size();
- (wqs[tix]).enqueue(std::move(item));
- }
-
- void drain() {
- for (auto& wq : wqs) {
- wq.drain();
- }
- }
-}; /* WorkPool */
-
RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
RGWLC *lc, int ix)
: dpp(dpp), cct(cct), lc(lc), ix(ix)
{
- auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
- workpool = new WorkPool(this, wpw, 512);
}
static inline bool worker_should_stop(time_t stop_at, bool once)
int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
const multimap<string, lc_op>& prefix_map,
+ ceph::async::spawn_throttle& workpool,
LCWorker* worker, time_t stop_at, bool once)
{
int ret;
params.ns = RGW_OBJ_NS_MULTIPART;
params.access_list_filter = MultipartMetaFilter;
- auto pf = [&](RGWLC::LCWorker *wk, WorkItem &wi, optional_yield y) {
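+  // handle expiration of a single multipart upload; invoked from the
+  // coroutines spawned below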
+ auto pf = [this, target] (optional_yield y, const lc_op& rule,
+ const rgw_bucket_dir_entry& obj) {
int ret{0};
- auto wt = std::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
- auto& [rule, obj] = wt;
if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
rgw_obj_key key(obj.key);
}
} else {
if (ret == -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload "
- "failed, ret=" << ret
- << ", meta:" << obj.key << dendl;
+ ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret="
+ << ret << ", meta:" << obj.key << dendl;
} else {
- ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload "
- "failed, ret=" << ret
- << ", meta:" << obj.key << dendl;
+ ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret="
+ << ret << ", meta:" << obj.key << dendl;
}
} /* abort failed */
} /* expired */
return ret;
};
- worker->workpool->setf(pf);
-
for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
++prefix_iter) {
}
for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter, ++offset) {
- std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
- {prefix_iter->second, *obj_iter};
- worker->workpool->enqueue(WorkItem{t1});
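+      // process each expired upload in its own coroutine; the caller's
+      // spawn_throttle bounds how many run at once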
+ workpool.spawn([pf, op=prefix_iter->second, obj=*obj_iter]
+ (boost::asio::yield_context yield) mutable {
+ pf(yield, op, obj);
+ });
if (going_down()) {
return 0;
}
} while(results.is_truncated);
} /* for prefix_map */
- worker->workpool->drain();
return 0;
} /* RGWLC::handle_multipart_expiration */
};
class LCOpAction_Transition : public LCOpAction {
- const transition_action& transition;
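+  // stored by value rather than by reference, so the action does not
+  // depend on the referenced rule data outliving it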
+ transition_action transition;
bool need_to_process{false};
protected:
}
int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
- time_t stop_at, bool once)
+ time_t stop_at, bool once,
+ boost::asio::yield_context yield)
{
RGWLifecycleConfiguration config(cct);
std::unique_ptr<rgw::sal::Bucket> bucket;
}
int ret = driver->load_bucket(this, rgw_bucket(bucket_tenant, bucket_name),
- &bucket, null_yield);
+ &bucket, yield);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
<< " failed" << dendl;
return ret;
}
+ // use a limited number of coroutines for concurrent processing
+ size_t limit = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
+ auto workpool = ceph::async::spawn_throttle{yield, limit};
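+  // on scope exit, wait for any coroutines still in flight before returning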
auto stack_guard = make_scope_guard(
- [&worker]
+ [&workpool]
{
- worker->workpool->drain();
+ workpool.wait();
}
);
/* fetch information for zone checks */
rgw::sal::Zone* zone = driver->get_zone();
- auto pf = [&bucket_name](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield y) {
- auto wt =
- std::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
- auto& [op_rule, o] = wt;
-
- ldpp_dout(wk->get_lc(), 20)
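+  // apply the lifecycle rule to a single listed object; invoked from the
+  // coroutines spawned in the listing loop below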
+ auto pf = [&bucket_name](const DoutPrefixProvider* dpp, optional_yield y,
+ LCOpRule& op_rule, rgw_bucket_dir_entry& o) {
+ ldpp_dout(dpp, 20)
<< __func__ << "(): key=" << o.key << dendl;
- int ret = op_rule.process(o, wk->dpp, y);
+ int ret = op_rule.process(o, dpp, y);
if (ret < 0) {
- ldpp_dout(wk->get_lc(), 20)
+ ldpp_dout(dpp, 20)
<< "ERROR: orule.process() returned ret=" << ret
<< " bucket=" << bucket_name
<< dendl;
}
};
- worker->workpool->setf(pf);
multimap<string, lc_op>& prefix_map = config.get_prefix_map();
ldpp_dout(this, 10) << __func__ << "() prefix_map size="
rgw_bucket_dir_entry* o{nullptr};
for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
orule.update();
- std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
- worker->workpool->enqueue(WorkItem{t1});
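+      // copy orule and the current entry into the coroutine, which may still
+      // be running after this loop iteration advances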
+ workpool.spawn([&pf, dpp=this, orule, o=*o]
+ (boost::asio::yield_context yield) mutable {
+ pf(dpp, yield, orule, o);
+ });
if ((offset % 100) == 0) {
if (worker_should_stop(stop_at, once)) {
ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
}
}
}
- worker->workpool->drain();
}
- ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
+ ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,
+ worker, stop_at, once);
+ return ret;
+}
+
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+ time_t stop_at, bool once)
+{
+ int ret = 0;
+
+  // run bucket_lc_process() inside a coroutine so it can use spawn_throttle
+  // for bounded concurrency
+ boost::asio::io_context context;
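+  // the completion handler receives the coroutine's return value, or
+  // rethrows its exception so it escapes context.run() below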
+ boost::asio::spawn(context,
+ [this, &shard_id, worker, stop_at, once] (boost::asio::yield_context yield) {
+ return bucket_lc_process(shard_id, worker, stop_at, once, yield);
+ },
+ [&ret] (std::exception_ptr eptr, int result) {
+ if (eptr) {
+ std::rethrow_exception(eptr);
+ } else {
+ ret = result;
+ }
+ });
+
+  // warn about any blocking operations issued while this thread runs the
+  // coroutine
+ auto enable_warnings = warn_about_blocking_in_scope{};
+
+ context.run();
+
return ret;
}
RGWLC::LCWorker::~LCWorker()
{
- delete workpool;
} /* ~LCWorker */
list<RGWLifecycleConfiguration> RGWLifecycleConfiguration::generate_test_instances()