#undef dout_prefix
#define dout_prefix (*_dout << "meta trim: ")
-namespace {
+/// purge all log shards for the given mdlog
+class PurgeLogShardsCR : public RGWShardCollectCR {
+ RGWRados *const store;
+ const RGWMetadataLog* mdlog;
+ const int num_shards;
+ const rgw_bucket& pool;
+ int i{0};
+
+ static constexpr int max_concurrent = 16;
+
+ public:
+ PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
+ const rgw_bucket& pool, int num_shards)
+ : RGWShardCollectCR(store->ctx(), max_concurrent),
+ store(store), mdlog(mdlog), num_shards(num_shards), pool(pool)
+ {}
+
+ bool spawn_next() override {
+ if (i == num_shards) {
+ return false;
+ }
+ std::string oid;
+ mdlog->get_shard_oid(i++, oid);
+ spawn(new RGWRadosRemoveCR(store, pool, oid), false);
+ return true;
+ }
+};
using Cursor = RGWPeriodHistory::Cursor;
+/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
+class PurgePeriodLogsCR : public RGWCoroutine {
+ RGWRados *const store;
+ RGWMetadataManager *const metadata;
+ RGWObjVersionTracker objv;
+ Cursor cursor;
+ epoch_t realm_epoch;
+ epoch_t *last_trim_epoch; //< update last trim on success
+
+ public:
+ PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
+ : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
+ realm_epoch(realm_epoch), last_trim_epoch(last_trim)
+ {}
+
+ int operate();
+};
+
+int PurgePeriodLogsCR::operate()
+{
+ reenter(this) {
+ // read our current oldest log period
+ yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ assert(cursor);
+ ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
+ << " period=" << cursor.get_period().get_id() << dendl;
+
+ // trim -up to- the given realm_epoch
+ while (cursor.get_epoch() < realm_epoch) {
+ ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
+ << " period=" << cursor.get_period().get_id() << dendl;
+ yield {
+ const auto mdlog = metadata->get_log(cursor.get_period().get_id());
+ const auto& pool = store->get_zone_params().log_pool;
+ auto num_shards = cct->_conf->rgw_md_log_max_shards;
+ call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
+ }
+ if (retcode < 0) {
+ ldout(cct, 1) << "failed to remove log shards: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
+ << " period=" << cursor.get_period().get_id() << dendl;
+
+ // update our mdlog history
+ yield call(metadata->trim_log_period_cr(cursor, &objv));
+ if (retcode == -ENOENT) {
+ // must have raced to update mdlog history. return success and allow the
+ // winner to continue purging
+ ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
+ << " period=" << cursor.get_period().get_id() << dendl;
+ return set_cr_done();
+ } else if (retcode < 0) {
+ ldout(cct, 1) << "failed to remove log shards for realm_epoch="
+ << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
+ << " with: " << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (*last_trim_epoch < cursor.get_epoch()) {
+ *last_trim_epoch = cursor.get_epoch();
+ }
+
+ assert(cursor.has_next()); // get_current() should always come after
+ cursor.next();
+ }
+ return set_cr_done();
+ }
+ return 0;
+}
+
+namespace {
+
using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
/// construct a RGWRESTConn for each zone in the realm