From: Casey Bodley Date: Sat, 13 Apr 2019 16:39:35 +0000 (-0400) Subject: rgw: move mdlog trimming into rgw_trim_mdlog.cc X-Git-Tag: v15.1.0~2837^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ac3d1a02517e2fe6e1ac6b07b87fe1318af6da51;p=ceph.git rgw: move mdlog trimming into rgw_trim_mdlog.cc Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index ee7d174a8ee..b93cac4aa1e 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -74,6 +74,7 @@ set(librgw_common_srcs rgw_sync_module_pubsub_rest.cc rgw_sync_log_trim.cc rgw_sync_trace.cc + rgw_trim_mdlog.cc rgw_period_history.cc rgw_period_puller.cc rgw_reshard.cc diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 6457050511e..e76d2954024 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -44,6 +44,7 @@ extern "C" { #include "rgw_usage.h" #include "rgw_orphan.h" #include "rgw_sync.h" +#include "rgw_trim_mdlog.h" #include "rgw_sync_log_trim.h" #include "rgw_data_sync.h" #include "rgw_rest_conn.h" diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index dc893241102..4a18def8d25 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -68,6 +68,7 @@ using namespace librados; #include "rgw_sync.h" #include "rgw_sync_counters.h" #include "rgw_sync_trace.h" +#include "rgw_trim_mdlog.h" #include "rgw_data_sync.h" #include "rgw_realm_watcher.h" #include "rgw_reshard.h" diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index eb83a51192f..1c3ee7d45e9 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -524,6 +524,14 @@ public: } }; +RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env, + const std::string& period, + int shard_id, + RGWMetadataLogInfo* info) +{ + return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info); +} + class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine { RGWMetaSyncEnv *sync_env; RGWRESTReadResource *http_op; @@ -586,6 +594,17 @@ public: } }; +RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env, + const std::string& period, + int shard_id, + const std::string& marker, + uint32_t max_entries, + rgw_mdlog_shard_data *result) +{ + return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker, + max_entries, result); +} + bool RGWReadRemoteMDLogInfoCR::spawn_next() { if (shard_id >= num_shards) { return false; @@ -2445,694 +2464,3 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete() { return set_cr_done(); } - - -// TODO: move into rgw_sync_trim.cc -#undef dout_prefix -#define dout_prefix (*_dout << "meta trim: ") - -/// purge all log shards for the given mdlog -class PurgeLogShardsCR : public RGWShardCollectCR { - RGWRados *const store; - const RGWMetadataLog* mdlog; - const int num_shards; - rgw_raw_obj obj; - int i{0}; - - static constexpr int max_concurrent = 16; - - public: - PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog, - const rgw_pool& pool, int num_shards) - : RGWShardCollectCR(store->ctx(), max_concurrent), - store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "") - {} - - bool spawn_next() override { - if (i == num_shards) { - return false; - } - mdlog->get_shard_oid(i++, obj.oid); - spawn(new RGWRadosRemoveCR(store, obj), false); - return true; - } -}; - -using Cursor = RGWPeriodHistory::Cursor; - -/// purge mdlogs from the oldest up to (but not including) the given realm_epoch -class PurgePeriodLogsCR : public RGWCoroutine { - RGWRados *const store; - RGWMetadataManager 
*const metadata; - RGWObjVersionTracker objv; - Cursor cursor; - epoch_t realm_epoch; - epoch_t *last_trim_epoch; //< update last trim on success - - public: - PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim) - : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr), - realm_epoch(realm_epoch), last_trim_epoch(last_trim) - {} - - int operate() override; -}; - -int PurgePeriodLogsCR::operate() -{ - reenter(this) { - // read our current oldest log period - yield call(metadata->read_oldest_log_period_cr(&cursor, &objv)); - if (retcode < 0) { - return set_cr_error(retcode); - } - ceph_assert(cursor); - ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch() - << " period=" << cursor.get_period().get_id() << dendl; - - // trim -up to- the given realm_epoch - while (cursor.get_epoch() < realm_epoch) { - ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch() - << " period=" << cursor.get_period().get_id() << dendl; - yield { - const auto mdlog = metadata->get_log(cursor.get_period().get_id()); - const auto& pool = store->svc.zone->get_zone_params().log_pool; - auto num_shards = cct->_conf->rgw_md_log_max_shards; - call(new PurgeLogShardsCR(store, mdlog, pool, num_shards)); - } - if (retcode < 0) { - ldout(cct, 1) << "failed to remove log shards: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch() - << " period=" << cursor.get_period().get_id() << dendl; - - // update our mdlog history - yield call(metadata->trim_log_period_cr(cursor, &objv)); - if (retcode == -ENOENT) { - // must have raced to update mdlog history. return success and allow the - // winner to continue purging - ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch() - << " period=" << cursor.get_period().get_id() << dendl; - return set_cr_done(); - } else if (retcode < 0) { - ldout(cct, 1) << "failed to remove log shards for realm_epoch=" - << cursor.get_epoch() << " period=" << cursor.get_period().get_id() - << " with: " << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - - if (*last_trim_epoch < cursor.get_epoch()) { - *last_trim_epoch = cursor.get_epoch(); - } - - ceph_assert(cursor.has_next()); // get_current() should always come after - cursor.next(); - } - return set_cr_done(); - } - return 0; -} - -namespace { - -using connection_map = std::map>; - -/// construct a RGWRESTConn for each zone in the realm -template -connection_map make_peer_connections(RGWRados *store, - const Zonegroups& zonegroups) -{ - connection_map connections; - for (auto& g : zonegroups) { - for (auto& z : g.second.zones) { - std::unique_ptr conn{ - new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)}; - connections.emplace(z.first, std::move(conn)); - } - } - return connections; -} - -/// return the marker that it's safe to trim up to -const std::string& get_stable_marker(const rgw_meta_sync_marker& m) -{ - return m.state == m.FullSync ? 
m.next_step_marker : m.marker; -} - -/// comparison operator for take_min_status() -bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs) -{ - // sort by stable marker - return get_stable_marker(lhs) < get_stable_marker(rhs); -} - -/// populate the status with the minimum stable marker of each shard for any -/// peer whose realm_epoch matches the minimum realm_epoch in the input -template -int take_min_status(CephContext *cct, Iter first, Iter last, - rgw_meta_sync_status *status) -{ - if (first == last) { - return -EINVAL; - } - const size_t num_shards = cct->_conf->rgw_md_log_max_shards; - - status->sync_info.realm_epoch = std::numeric_limits::max(); - for (auto p = first; p != last; ++p) { - // validate peer's shard count - if (p->sync_markers.size() != num_shards) { - ldout(cct, 1) << "take_min_status got peer status with " - << p->sync_markers.size() << " shards, expected " - << num_shards << dendl; - return -EINVAL; - } - if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) { - // earlier epoch, take its entire status - *status = std::move(*p); - } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) { - // same epoch, take any earlier markers - auto m = status->sync_markers.begin(); - for (auto& shard : p->sync_markers) { - if (shard.second < m->second) { - m->second = std::move(shard.second); - } - ++m; - } - } - } - return 0; -} - -struct TrimEnv { - const DoutPrefixProvider *dpp; - RGWRados *const store; - RGWHTTPManager *const http; - int num_shards; - const std::string& zone; - Cursor current; //< cursor to current period - epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged - - TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) - : dpp(dpp), store(store), http(http), num_shards(num_shards), - zone(store->svc.zone->get_zone_params().get_id()), - current(store->period_history->get_current()) - {} -}; - -struct MasterTrimEnv : public TrimEnv { - connection_map connections; //< peer connections - std::vector peer_status; //< sync status for each peer - /// last trim marker for each shard, only applies to current period's mdlog - std::vector last_trim_markers; - - MasterTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) - : TrimEnv(dpp, store, http, num_shards), - last_trim_markers(num_shards) - { - auto& period = current.get_period(); - connections = make_peer_connections(store, period.get_map().zonegroups); - connections.erase(zone); - peer_status.resize(connections.size()); - } -}; - -struct PeerTrimEnv : public TrimEnv { - /// last trim timestamp for each shard, only applies to current period's mdlog - std::vector last_trim_timestamps; - - PeerTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) - : TrimEnv(dpp, store, http, num_shards), - last_trim_timestamps(num_shards) - {} - - void set_num_shards(int num_shards) { - this->num_shards = num_shards; - last_trim_timestamps.resize(num_shards); - } -}; - -} // anonymous namespace - - -/// spawn a trim cr for each shard that needs it, while limiting the number -/// of concurrent shards -class MetaMasterTrimShardCollectCR : public RGWShardCollectCR { - private: - static constexpr int MAX_CONCURRENT_SHARDS = 16; - - MasterTrimEnv& env; - RGWMetadataLog *mdlog; - int shard_id{0}; - std::string oid; - const rgw_meta_sync_status& sync_status; - - public: - MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog, - const 
rgw_meta_sync_status& sync_status) - : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), - env(env), mdlog(mdlog), sync_status(sync_status) - {} - - bool spawn_next() override; -}; - -bool MetaMasterTrimShardCollectCR::spawn_next() -{ - while (shard_id < env.num_shards) { - auto m = sync_status.sync_markers.find(shard_id); - if (m == sync_status.sync_markers.end()) { - shard_id++; - continue; - } - auto& stable = get_stable_marker(m->second); - auto& last_trim = env.last_trim_markers[shard_id]; - - if (stable <= last_trim) { - // already trimmed - ldout(cct, 20) << "skipping log shard " << shard_id - << " at marker=" << stable - << " last_trim=" << last_trim - << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; - shard_id++; - continue; - } - - mdlog->get_shard_oid(shard_id, oid); - - ldout(cct, 10) << "trimming log shard " << shard_id - << " at marker=" << stable - << " last_trim=" << last_trim - << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; - spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false); - shard_id++; - return true; - } - return false; -} - -/// spawn rest requests to read each peer's sync status -class MetaMasterStatusCollectCR : public RGWShardCollectCR { - static constexpr int MAX_CONCURRENT_SHARDS = 16; - - MasterTrimEnv& env; - connection_map::iterator c; - std::vector::iterator s; - public: - explicit MetaMasterStatusCollectCR(MasterTrimEnv& env) - : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), - env(env), c(env.connections.begin()), s(env.peer_status.begin()) - {} - - bool spawn_next() override { - if (c == env.connections.end()) { - return false; - } - static rgw_http_param_pair params[] = { - { "type", "metadata" }, - { "status", nullptr }, - { nullptr, nullptr } - }; - - ldout(cct, 20) << "query sync status from " << c->first << dendl; - auto conn = c->second.get(); - using StatusCR = RGWReadRESTResourceCR; - spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s), - false); - ++c; - ++s; - return true; - } -}; - -class MetaMasterTrimCR : public RGWCoroutine { - MasterTrimEnv& env; - rgw_meta_sync_status min_status; //< minimum sync status of all peers - int ret{0}; - - public: - explicit MetaMasterTrimCR(MasterTrimEnv& env) - : RGWCoroutine(env.store->ctx()), env(env) - {} - - int operate() override; -}; - -int MetaMasterTrimCR::operate() -{ - reenter(this) { - // TODO: detect this and fail before we spawn the trim thread? 
- if (env.connections.empty()) { - ldout(cct, 4) << "no peers, exiting" << dendl; - return set_cr_done(); - } - - ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl; - // query mdlog sync status from peers - yield call(new MetaMasterStatusCollectCR(env)); - - // must get a successful reply from all peers to consider trimming - if (ret < 0) { - ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; - return set_cr_error(ret); - } - - // determine the minimum epoch and markers - ret = take_min_status(env.store->ctx(), env.peer_status.begin(), - env.peer_status.end(), &min_status); - if (ret < 0) { - ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl; - return set_cr_error(ret); - } - yield { - auto store = env.store; - auto epoch = min_status.sync_info.realm_epoch; - ldout(cct, 4) << "realm epoch min=" << epoch - << " current=" << env.current.get_epoch()<< dendl; - if (epoch > env.last_trim_epoch + 1) { - // delete any prior mdlog periods - spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true); - } else { - ldout(cct, 10) << "mdlogs already purged up to realm_epoch " - << env.last_trim_epoch << dendl; - } - - // if realm_epoch == current, trim mdlog based on markers - if (epoch == env.current.get_epoch()) { - auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id()); - spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true); - } - } - // ignore any errors during purge/trim because we want to hold the lock open - return set_cr_done(); - } - return 0; -} - - -/// read the first entry of the master's mdlog shard and trim to that position -class MetaPeerTrimShardCR : public RGWCoroutine { - RGWMetaSyncEnv& env; - RGWMetadataLog *mdlog; - const std::string& period_id; - const int shard_id; - RGWMetadataLogInfo info; - ceph::real_time stable; //< safe timestamp to trim, according to master - ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim - rgw_mdlog_shard_data result; //< result from master's mdlog listing - - public: - MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog, - const std::string& period_id, int shard_id, - ceph::real_time *last_trim) - : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog), - period_id(period_id), shard_id(shard_id), last_trim(last_trim) - {} - - int operate() override; -}; - -int MetaPeerTrimShardCR::operate() -{ - reenter(this) { - // query master's first mdlog entry for this shard - yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id, - "", 1, &result)); - if (retcode < 0) { - ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard " - << shard_id << " for period " << period_id - << ": " << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - if (result.entries.empty()) { - // if there are no mdlog entries, we don't have a timestamp to compare. we - // can't just trim everything, because there could be racing updates since - // this empty reply. 
query the mdlog shard info to read its max timestamp, - // then retry the listing to make sure it's still empty before trimming to - // that - ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id - << ", reading last timestamp from shard info" << dendl; - // read the mdlog shard info for the last timestamp - using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR; - yield call(new ShardInfoCR(&env, period_id, shard_id, &info)); - if (retcode < 0) { - ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard " - << shard_id << " for period " << period_id - << ": " << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - if (ceph::real_clock::is_zero(info.last_update)) { - return set_cr_done(); // nothing to trim - } - ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update=" - << info.last_update << dendl; - // re-read the master's first mdlog entry to make sure it hasn't changed - yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id, - "", 1, &result)); - if (retcode < 0) { - ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard " - << shard_id << " for period " << period_id - << ": " << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - // if the mdlog is still empty, trim to max marker - if (result.entries.empty()) { - stable = info.last_update; - } else { - stable = result.entries.front().timestamp; - - // can only trim -up to- master's first timestamp, so subtract a second. - // (this is why we use timestamps instead of markers for the peers) - stable -= std::chrono::seconds(1); - } - } else { - stable = result.entries.front().timestamp; - stable -= std::chrono::seconds(1); - } - - if (stable <= *last_trim) { - ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id - << " at timestamp=" << stable - << " last_trim=" << *last_trim << dendl; - return set_cr_done(); - } - - ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id - << " at timestamp=" << stable - << " last_trim=" << *last_trim << dendl; - yield { - std::string oid; - mdlog->get_shard_oid(shard_id, oid); - call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", "")); - } - if (retcode < 0 && retcode != -ENODATA) { - ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id - << ": " << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - *last_trim = stable; - return set_cr_done(); - } - return 0; -} - -class MetaPeerTrimShardCollectCR : public RGWShardCollectCR { - static constexpr int MAX_CONCURRENT_SHARDS = 16; - - PeerTrimEnv& env; - RGWMetadataLog *mdlog; - const std::string& period_id; - RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR - int shard_id{0}; - - public: - MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog) - : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), - env(env), mdlog(mdlog), period_id(env.current.get_period().get_id()) - { - meta_env.init(env.dpp, cct, env.store, env.store->svc.zone->get_master_conn(), - env.store->get_async_rados(), env.http, nullptr, - env.store->get_sync_tracer()); - } - - bool spawn_next() override; -}; - -bool MetaPeerTrimShardCollectCR::spawn_next() -{ - if (shard_id >= env.num_shards) { - return false; - } - auto& last_trim = env.last_trim_timestamps[shard_id]; - spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim), - false); - shard_id++; - return true; -} - -class MetaPeerTrimCR : public RGWCoroutine { - PeerTrimEnv& env; - rgw_mdlog_info mdlog_info; //< 
master's mdlog info - - public: - explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {} - - int operate() override; -}; - -int MetaPeerTrimCR::operate() -{ - reenter(this) { - ldout(cct, 10) << "fetching master mdlog info" << dendl; - yield { - // query mdlog_info from master for oldest_log_period - rgw_http_param_pair params[] = { - { "type", "metadata" }, - { nullptr, nullptr } - }; - - using LogInfoCR = RGWReadRESTResourceCR; - call(new LogInfoCR(cct, env.store->svc.zone->get_master_conn(), env.http, - "/admin/log/", params, &mdlog_info)); - } - if (retcode < 0) { - ldout(cct, 4) << "failed to read mdlog info from master" << dendl; - return set_cr_error(retcode); - } - // use master's shard count instead - env.set_num_shards(mdlog_info.num_shards); - - if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) { - // delete any prior mdlog periods - yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch, - &env.last_trim_epoch)); - } else { - ldout(cct, 10) << "mdlogs already purged through realm_epoch " - << env.last_trim_epoch << dendl; - } - - // if realm_epoch == current, trim mdlog based on master's markers - if (mdlog_info.realm_epoch == env.current.get_epoch()) { - yield { - auto meta_mgr = env.store->meta_mgr; - auto mdlog = meta_mgr->get_log(env.current.get_period().get_id()); - call(new MetaPeerTrimShardCollectCR(env, mdlog)); - // ignore any errors during purge/trim because we want to hold the lock open - } - } - return set_cr_done(); - } - return 0; -} - -class MetaTrimPollCR : public RGWCoroutine { - RGWRados *const store; - const utime_t interval; //< polling interval - const rgw_raw_obj obj; - const std::string name{"meta_trim"}; //< lock name - const std::string cookie; - - protected: - /// allocate the coroutine to run within the lease - virtual RGWCoroutine* alloc_cr() = 0; - - public: - MetaTrimPollCR(RGWRados *store, utime_t interval) - : RGWCoroutine(store->ctx()), store(store), interval(interval), - obj(store->svc.zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid), - cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) - {} - - int operate() override; -}; - -int MetaTrimPollCR::operate() -{ - reenter(this) { - for (;;) { - set_status("sleeping"); - wait(interval); - - // prevent others from trimming for our entire wait interval - set_status("acquiring trim lock"); - yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, - obj, name, cookie, interval.sec())); - if (retcode < 0) { - ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; - continue; - } - - set_status("trimming"); - yield call(alloc_cr()); - - if (retcode < 0) { - // on errors, unlock so other gateways can try - set_status("unlocking"); - yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store, - obj, name, cookie)); - } - } - } - return 0; -} - -class MetaMasterTrimPollCR : public MetaTrimPollCR { - MasterTrimEnv env; //< trim state to share between calls - RGWCoroutine* alloc_cr() override { - return new MetaMasterTrimCR(env); - } - public: - MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval) - : MetaTrimPollCR(store, interval), - env(dpp, store, http, num_shards) - {} -}; - -class MetaPeerTrimPollCR : public MetaTrimPollCR { - PeerTrimEnv env; //< trim state to share between calls - RGWCoroutine* alloc_cr() override { - return new MetaPeerTrimCR(env); - } - public: - MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, RGWRados 
*store, RGWHTTPManager *http, - int num_shards, utime_t interval) - : MetaTrimPollCR(store, interval), - env(dpp, store, http, num_shards) - {} -}; - -RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval) -{ - if (store->svc.zone->is_meta_master()) { - return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval); - } - return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval); -} - - -struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR { - MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) - : MasterTrimEnv(dpp, store, http, num_shards), - MetaMasterTrimCR(*static_cast(this)) - {} -}; - -struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR { - MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) - : PeerTrimEnv(dpp, store, http, num_shards), - MetaPeerTrimCR(*static_cast(this)) - {} -}; - -RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, - RGWHTTPManager *http, - int num_shards) -{ - if (store->svc.zone->is_meta_master()) { - return new MetaMasterAdminTrimCR(dpp, store, http, num_shards); - } - return new MetaPeerAdminTrimCR(dpp, store, http, num_shards); -} diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 7774e164522..777b2e62d65 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -522,13 +522,18 @@ public: int operate() override; }; -// MetaLogTrimCR factory function -RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval); - -// factory function for mdlog trim via radosgw-admin -RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, - RGWHTTPManager *http, - int num_shards); +// factory functions for meta sync coroutines needed in mdlog trimming + +RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env, + const std::string& period, + int shard_id, + RGWMetadataLogInfo* info); + +RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env, + const std::string& period, + int shard_id, + const std::string& marker, + uint32_t max_entries, + rgw_mdlog_shard_data *result); #endif diff --git a/src/rgw/rgw_trim_mdlog.cc b/src/rgw/rgw_trim_mdlog.cc new file mode 100644 index 00000000000..5efd4f71200 --- /dev/null +++ b/src/rgw/rgw_trim_mdlog.cc @@ -0,0 +1,704 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/errno.h" + +#include "rgw_trim_mdlog.h" +#include "rgw_sync.h" +#include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" +#include "rgw_rados.h" +#include "rgw_zone.h" +#include "services/svc_zone.h" + +#include + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "meta trim: ") + +/// purge all log shards for the given mdlog +class PurgeLogShardsCR : public RGWShardCollectCR { + RGWRados *const store; + const RGWMetadataLog* mdlog; + const int num_shards; + rgw_raw_obj obj; + int i{0}; + + static constexpr int max_concurrent = 16; + + public: + PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog, + const rgw_pool& pool, int num_shards) + : RGWShardCollectCR(store->ctx(), max_concurrent), + store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "") + {} + + bool spawn_next() override { + if (i == 
num_shards) { + return false; + } + mdlog->get_shard_oid(i++, obj.oid); + spawn(new RGWRadosRemoveCR(store, obj), false); + return true; + } +}; + +using Cursor = RGWPeriodHistory::Cursor; + +/// purge mdlogs from the oldest up to (but not including) the given realm_epoch +class PurgePeriodLogsCR : public RGWCoroutine { + RGWRados *const store; + RGWMetadataManager *const metadata; + RGWObjVersionTracker objv; + Cursor cursor; + epoch_t realm_epoch; + epoch_t *last_trim_epoch; //< update last trim on success + + public: + PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim) + : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr), + realm_epoch(realm_epoch), last_trim_epoch(last_trim) + {} + + int operate() override; +}; + +int PurgePeriodLogsCR::operate() +{ + reenter(this) { + // read our current oldest log period + yield call(metadata->read_oldest_log_period_cr(&cursor, &objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + ceph_assert(cursor); + ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + + // trim -up to- the given realm_epoch + while (cursor.get_epoch() < realm_epoch) { + ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + yield { + const auto mdlog = metadata->get_log(cursor.get_period().get_id()); + const auto& pool = store->svc.zone->get_zone_params().log_pool; + auto num_shards = cct->_conf->rgw_md_log_max_shards; + call(new PurgeLogShardsCR(store, mdlog, pool, num_shards)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to remove log shards: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + + // update our mdlog history + yield call(metadata->trim_log_period_cr(cursor, &objv)); + if (retcode == -ENOENT) { + // must have raced to update mdlog history. return success and allow the + // winner to continue purging + ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + return set_cr_done(); + } else if (retcode < 0) { + ldout(cct, 1) << "failed to remove log shards for realm_epoch=" + << cursor.get_epoch() << " period=" << cursor.get_period().get_id() + << " with: " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + if (*last_trim_epoch < cursor.get_epoch()) { + *last_trim_epoch = cursor.get_epoch(); + } + + ceph_assert(cursor.has_next()); // get_current() should always come after + cursor.next(); + } + return set_cr_done(); + } + return 0; +} + +namespace { + +using connection_map = std::map>; + +/// construct a RGWRESTConn for each zone in the realm +template +connection_map make_peer_connections(RGWRados *store, + const Zonegroups& zonegroups) +{ + connection_map connections; + for (auto& g : zonegroups) { + for (auto& z : g.second.zones) { + std::unique_ptr conn{ + new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)}; + connections.emplace(z.first, std::move(conn)); + } + } + return connections; +} + +/// return the marker that it's safe to trim up to +const std::string& get_stable_marker(const rgw_meta_sync_marker& m) +{ + return m.state == m.FullSync ? 
m.next_step_marker : m.marker; +} + +/// comparison operator for take_min_status() +bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs) +{ + // sort by stable marker + return get_stable_marker(lhs) < get_stable_marker(rhs); +} + +/// populate the status with the minimum stable marker of each shard for any +/// peer whose realm_epoch matches the minimum realm_epoch in the input +template +int take_min_status(CephContext *cct, Iter first, Iter last, + rgw_meta_sync_status *status) +{ + if (first == last) { + return -EINVAL; + } + const size_t num_shards = cct->_conf->rgw_md_log_max_shards; + + status->sync_info.realm_epoch = std::numeric_limits::max(); + for (auto p = first; p != last; ++p) { + // validate peer's shard count + if (p->sync_markers.size() != num_shards) { + ldout(cct, 1) << "take_min_status got peer status with " + << p->sync_markers.size() << " shards, expected " + << num_shards << dendl; + return -EINVAL; + } + if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) { + // earlier epoch, take its entire status + *status = std::move(*p); + } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) { + // same epoch, take any earlier markers + auto m = status->sync_markers.begin(); + for (auto& shard : p->sync_markers) { + if (shard.second < m->second) { + m->second = std::move(shard.second); + } + ++m; + } + } + } + return 0; +} + +struct TrimEnv { + const DoutPrefixProvider *dpp; + RGWRados *const store; + RGWHTTPManager *const http; + int num_shards; + const std::string& zone; + Cursor current; //< cursor to current period + epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged + + TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) + : dpp(dpp), store(store), http(http), num_shards(num_shards), + zone(store->svc.zone->get_zone_params().get_id()), + current(store->period_history->get_current()) + {} +}; + +struct MasterTrimEnv : public TrimEnv { + connection_map connections; //< peer connections + std::vector peer_status; //< sync status for each peer + /// last trim marker for each shard, only applies to current period's mdlog + std::vector last_trim_markers; + + MasterTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) + : TrimEnv(dpp, store, http, num_shards), + last_trim_markers(num_shards) + { + auto& period = current.get_period(); + connections = make_peer_connections(store, period.get_map().zonegroups); + connections.erase(zone); + peer_status.resize(connections.size()); + } +}; + +struct PeerTrimEnv : public TrimEnv { + /// last trim timestamp for each shard, only applies to current period's mdlog + std::vector last_trim_timestamps; + + PeerTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) + : TrimEnv(dpp, store, http, num_shards), + last_trim_timestamps(num_shards) + {} + + void set_num_shards(int num_shards) { + this->num_shards = num_shards; + last_trim_timestamps.resize(num_shards); + } +}; + +} // anonymous namespace + + +/// spawn a trim cr for each shard that needs it, while limiting the number +/// of concurrent shards +class MetaMasterTrimShardCollectCR : public RGWShardCollectCR { + private: + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + MasterTrimEnv& env; + RGWMetadataLog *mdlog; + int shard_id{0}; + std::string oid; + const rgw_meta_sync_status& sync_status; + + public: + MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog, + const 
rgw_meta_sync_status& sync_status) + : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), + env(env), mdlog(mdlog), sync_status(sync_status) + {} + + bool spawn_next() override; +}; + +bool MetaMasterTrimShardCollectCR::spawn_next() +{ + while (shard_id < env.num_shards) { + auto m = sync_status.sync_markers.find(shard_id); + if (m == sync_status.sync_markers.end()) { + shard_id++; + continue; + } + auto& stable = get_stable_marker(m->second); + auto& last_trim = env.last_trim_markers[shard_id]; + + if (stable <= last_trim) { + // already trimmed + ldout(cct, 20) << "skipping log shard " << shard_id + << " at marker=" << stable + << " last_trim=" << last_trim + << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; + shard_id++; + continue; + } + + mdlog->get_shard_oid(shard_id, oid); + + ldout(cct, 10) << "trimming log shard " << shard_id + << " at marker=" << stable + << " last_trim=" << last_trim + << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; + spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false); + shard_id++; + return true; + } + return false; +} + +/// spawn rest requests to read each peer's sync status +class MetaMasterStatusCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + MasterTrimEnv& env; + connection_map::iterator c; + std::vector::iterator s; + public: + explicit MetaMasterStatusCollectCR(MasterTrimEnv& env) + : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), + env(env), c(env.connections.begin()), s(env.peer_status.begin()) + {} + + bool spawn_next() override { + if (c == env.connections.end()) { + return false; + } + static rgw_http_param_pair params[] = { + { "type", "metadata" }, + { "status", nullptr }, + { nullptr, nullptr } + }; + + ldout(cct, 20) << "query sync status from " << c->first << dendl; + auto conn = c->second.get(); + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s), + false); + ++c; + ++s; + return true; + } +}; + +class MetaMasterTrimCR : public RGWCoroutine { + MasterTrimEnv& env; + rgw_meta_sync_status min_status; //< minimum sync status of all peers + int ret{0}; + + public: + explicit MetaMasterTrimCR(MasterTrimEnv& env) + : RGWCoroutine(env.store->ctx()), env(env) + {} + + int operate() override; +}; + +int MetaMasterTrimCR::operate() +{ + reenter(this) { + // TODO: detect this and fail before we spawn the trim thread? 
+ if (env.connections.empty()) { + ldout(cct, 4) << "no peers, exiting" << dendl; + return set_cr_done(); + } + + ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl; + // query mdlog sync status from peers + yield call(new MetaMasterStatusCollectCR(env)); + + // must get a successful reply from all peers to consider trimming + if (ret < 0) { + ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; + return set_cr_error(ret); + } + + // determine the minimum epoch and markers + ret = take_min_status(env.store->ctx(), env.peer_status.begin(), + env.peer_status.end(), &min_status); + if (ret < 0) { + ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl; + return set_cr_error(ret); + } + yield { + auto store = env.store; + auto epoch = min_status.sync_info.realm_epoch; + ldout(cct, 4) << "realm epoch min=" << epoch + << " current=" << env.current.get_epoch()<< dendl; + if (epoch > env.last_trim_epoch + 1) { + // delete any prior mdlog periods + spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true); + } else { + ldout(cct, 10) << "mdlogs already purged up to realm_epoch " + << env.last_trim_epoch << dendl; + } + + // if realm_epoch == current, trim mdlog based on markers + if (epoch == env.current.get_epoch()) { + auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id()); + spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true); + } + } + // ignore any errors during purge/trim because we want to hold the lock open + return set_cr_done(); + } + return 0; +} + + +/// read the first entry of the master's mdlog shard and trim to that position +class MetaPeerTrimShardCR : public RGWCoroutine { + RGWMetaSyncEnv& env; + RGWMetadataLog *mdlog; + const std::string& period_id; + const int shard_id; + RGWMetadataLogInfo info; + ceph::real_time stable; //< safe timestamp to trim, according to master + ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim + rgw_mdlog_shard_data result; //< result from master's mdlog listing + + public: + MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog, + const std::string& period_id, int shard_id, + ceph::real_time *last_trim) + : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog), + period_id(period_id), shard_id(shard_id), last_trim(last_trim) + {} + + int operate() override; +}; + +int MetaPeerTrimShardCR::operate() +{ + reenter(this) { + // query master's first mdlog entry for this shard + yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id, + "", 1, &result)); + if (retcode < 0) { + ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (result.entries.empty()) { + // if there are no mdlog entries, we don't have a timestamp to compare. we + // can't just trim everything, because there could be racing updates since + // this empty reply. 
query the mdlog shard info to read its max timestamp, + // then retry the listing to make sure it's still empty before trimming to + // that + ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id + << ", reading last timestamp from shard info" << dendl; + // read the mdlog shard info for the last timestamp + yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info)); + if (retcode < 0) { + ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (ceph::real_clock::is_zero(info.last_update)) { + return set_cr_done(); // nothing to trim + } + ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update=" + << info.last_update << dendl; + // re-read the master's first mdlog entry to make sure it hasn't changed + yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id, + "", 1, &result)); + if (retcode < 0) { + ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + // if the mdlog is still empty, trim to max marker + if (result.entries.empty()) { + stable = info.last_update; + } else { + stable = result.entries.front().timestamp; + + // can only trim -up to- master's first timestamp, so subtract a second. + // (this is why we use timestamps instead of markers for the peers) + stable -= std::chrono::seconds(1); + } + } else { + stable = result.entries.front().timestamp; + stable -= std::chrono::seconds(1); + } + + if (stable <= *last_trim) { + ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id + << " at timestamp=" << stable + << " last_trim=" << *last_trim << dendl; + return set_cr_done(); + } + + ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id + << " at timestamp=" << stable + << " last_trim=" << *last_trim << dendl; + yield { + std::string oid; + mdlog->get_shard_oid(shard_id, oid); + call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", "")); + } + if (retcode < 0 && retcode != -ENODATA) { + ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + *last_trim = stable; + return set_cr_done(); + } + return 0; +} + +class MetaPeerTrimShardCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + PeerTrimEnv& env; + RGWMetadataLog *mdlog; + const std::string& period_id; + RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR + int shard_id{0}; + + public: + MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog) + : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), + env(env), mdlog(mdlog), period_id(env.current.get_period().get_id()) + { + meta_env.init(env.dpp, cct, env.store, env.store->svc.zone->get_master_conn(), + env.store->get_async_rados(), env.http, nullptr, + env.store->get_sync_tracer()); + } + + bool spawn_next() override; +}; + +bool MetaPeerTrimShardCollectCR::spawn_next() +{ + if (shard_id >= env.num_shards) { + return false; + } + auto& last_trim = env.last_trim_timestamps[shard_id]; + spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim), + false); + shard_id++; + return true; +} + +class MetaPeerTrimCR : public RGWCoroutine { + PeerTrimEnv& env; + rgw_mdlog_info mdlog_info; //< master's mdlog info + + 
public: + explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {} + + int operate() override; +}; + +int MetaPeerTrimCR::operate() +{ + reenter(this) { + ldout(cct, 10) << "fetching master mdlog info" << dendl; + yield { + // query mdlog_info from master for oldest_log_period + rgw_http_param_pair params[] = { + { "type", "metadata" }, + { nullptr, nullptr } + }; + + using LogInfoCR = RGWReadRESTResourceCR; + call(new LogInfoCR(cct, env.store->svc.zone->get_master_conn(), env.http, + "/admin/log/", params, &mdlog_info)); + } + if (retcode < 0) { + ldout(cct, 4) << "failed to read mdlog info from master" << dendl; + return set_cr_error(retcode); + } + // use master's shard count instead + env.set_num_shards(mdlog_info.num_shards); + + if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) { + // delete any prior mdlog periods + yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch, + &env.last_trim_epoch)); + } else { + ldout(cct, 10) << "mdlogs already purged through realm_epoch " + << env.last_trim_epoch << dendl; + } + + // if realm_epoch == current, trim mdlog based on master's markers + if (mdlog_info.realm_epoch == env.current.get_epoch()) { + yield { + auto meta_mgr = env.store->meta_mgr; + auto mdlog = meta_mgr->get_log(env.current.get_period().get_id()); + call(new MetaPeerTrimShardCollectCR(env, mdlog)); + // ignore any errors during purge/trim because we want to hold the lock open + } + } + return set_cr_done(); + } + return 0; +} + +class MetaTrimPollCR : public RGWCoroutine { + RGWRados *const store; + const utime_t interval; //< polling interval + const rgw_raw_obj obj; + const std::string name{"meta_trim"}; //< lock name + const std::string cookie; + + protected: + /// allocate the coroutine to run within the lease + virtual RGWCoroutine* alloc_cr() = 0; + + public: + MetaTrimPollCR(RGWRados *store, utime_t interval) + : RGWCoroutine(store->ctx()), store(store), interval(interval), + obj(store->svc.zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid), + cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) + {} + + int operate() override; +}; + +int MetaTrimPollCR::operate() +{ + reenter(this) { + for (;;) { + set_status("sleeping"); + wait(interval); + + // prevent others from trimming for our entire wait interval + set_status("acquiring trim lock"); + yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, + obj, name, cookie, interval.sec())); + if (retcode < 0) { + ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; + continue; + } + + set_status("trimming"); + yield call(alloc_cr()); + + if (retcode < 0) { + // on errors, unlock so other gateways can try + set_status("unlocking"); + yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store, + obj, name, cookie)); + } + } + } + return 0; +} + +class MetaMasterTrimPollCR : public MetaTrimPollCR { + MasterTrimEnv env; //< trim state to share between calls + RGWCoroutine* alloc_cr() override { + return new MetaMasterTrimCR(env); + } + public: + MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : MetaTrimPollCR(store, interval), + env(dpp, store, http, num_shards) + {} +}; + +class MetaPeerTrimPollCR : public MetaTrimPollCR { + PeerTrimEnv env; //< trim state to share between calls + RGWCoroutine* alloc_cr() override { + return new MetaPeerTrimCR(env); + } + public: + MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager 
*http, + int num_shards, utime_t interval) + : MetaTrimPollCR(store, interval), + env(dpp, store, http, num_shards) + {} +}; + +RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) +{ + if (store->svc.zone->is_meta_master()) { + return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval); + } + return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval); +} + + +struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR { + MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) + : MasterTrimEnv(dpp, store, http, num_shards), + MetaMasterTrimCR(*static_cast(this)) + {} +}; + +struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR { + MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards) + : PeerTrimEnv(dpp, store, http, num_shards), + MetaPeerTrimCR(*static_cast(this)) + {} +}; + +RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, + RGWHTTPManager *http, + int num_shards) +{ + if (store->svc.zone->is_meta_master()) { + return new MetaMasterAdminTrimCR(dpp, store, http, num_shards); + } + return new MetaPeerAdminTrimCR(dpp, store, http, num_shards); +} diff --git a/src/rgw/rgw_trim_mdlog.h b/src/rgw/rgw_trim_mdlog.h new file mode 100644 index 00000000000..79465668f87 --- /dev/null +++ b/src/rgw/rgw_trim_mdlog.h @@ -0,0 +1,22 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +class RGWCoroutine; +class DoutPrefixProvider; +class RGWRados; +class RGWHTTPManager; +class utime_t; + +// MetaLogTrimCR factory function +RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, + RGWRados *store, + RGWHTTPManager *http, + int num_shards, utime_t interval); + +// factory function for mdlog trim via radosgw-admin +RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, + RGWRados *store, + RGWHTTPManager *http, + int num_shards);
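
For context, after this split the rest of RGW only needs the factory functions declared in rgw_trim_mdlog.h. The sketch below shows how a one-shot trim (as in `radosgw-admin mdlog trim`) might drive create_admin_meta_log_trim_cr(); it is illustrative only and not part of this commit. The RGWCoroutinesManager/RGWHTTPManager wiring, the get_cr_registry()/get_completion_mgr()/start() calls, and the helper name run_mdlog_trim_once are assumptions based on how other RGW admin paths run sync coroutines, and may differ in detail from the actual callers.

// Illustrative sketch only -- not part of this patch. Assumes an
// initialized RGWRados *store and a DoutPrefixProvider *dpp; manager
// setup mirrors typical radosgw-admin coroutine usage (assumed).
#include "rgw_trim_mdlog.h"
#include "rgw_coroutine.h"    // RGWCoroutinesManager
#include "rgw_http_client.h"  // RGWHTTPManager

int run_mdlog_trim_once(const DoutPrefixProvider *dpp, RGWRados *store)
{
  // coroutine manager plus an HTTP manager for the REST calls made by
  // the peer-trim coroutines (assumed setup)
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
  int ret = http.start();
  if (ret < 0) {
    return ret;
  }
  // shard count comes from the same option the trim code itself uses
  const int num_shards = store->ctx()->_conf->rgw_md_log_max_shards;
  // one-shot trim; create_meta_log_trim_cr() would instead be used for
  // the periodic polling variant inside the gateway
  return crs.run(create_admin_meta_log_trim_cr(dpp, store, &http, num_shards));
}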