From: Casey Bodley <cbodley@redhat.com>
Date: Sun, 31 Jul 2016 16:42:24 +0000 (-0400)
Subject: rgw: hold lock for data log trimming
X-Git-Tag: v10.2.11~111^2~24
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f885c7b71d0b4ccba6a601fe610ece823ad52daf;p=ceph.git

rgw: hold lock for data log trimming

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 0777fff4163588850a95433799d4e3ce1bd8b6e9)
---

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index a3e39f078cbe3..70afacbe8c006 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -2950,22 +2950,21 @@ class DataLogTrimCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http;
   const int num_shards;
-  const utime_t interval; //< polling interval
   const std::string& zone; //< my zone id
   std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
   std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
-  std::vector<std::string> last_trim; //< last trimmed marker per shard
+  std::vector<std::string>& last_trim; //< last trimmed marker per shard
   int ret{0};
 
  public:
   DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
-                int num_shards, utime_t interval)
+                int num_shards, std::vector<std::string>& last_trim)
     : RGWCoroutine(store->ctx()), store(store), http(http),
-      num_shards(num_shards), interval(interval),
+      num_shards(num_shards),
       zone(store->get_zone().id),
       peer_status(store->zone_conn_map.size()),
       min_shard_markers(num_shards),
-      last_trim(num_shards)
+      last_trim(last_trim)
   {}
 
   int operate() override;
@@ -2974,62 +2973,115 @@ class DataLogTrimCR : public RGWCoroutine {
 int DataLogTrimCR::operate()
 {
   reenter(this) {
-    for (;;) {
-      yield wait(interval);
+    ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
+    set_status("fetching sync status");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "data" },
+        { "status", nullptr },
+        { "source-zone", zone.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->zone_conn_map) {
+        ldout(cct, 20) << "query sync status from " << c.first << dendl;
+        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+    }
+
+    // must get a successful reply from all peers to consider trimming
+    ret = 0;
+    while (ret == 0 && num_spawned() > 0) {
+      yield wait_for_child();
+      collect_next(&ret);
+    }
+    drain_all();
 
-      ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
-      yield {
-        // query data sync status from each sync peer
-        rgw_http_param_pair params[] = {
-          { "type", "data" },
-          { "status", nullptr },
-          { "source-zone", zone.c_str() },
-          { nullptr, nullptr }
-        };
-
-        auto p = peer_status.begin();
-        for (auto& c : store->zone_conn_map) {
-          ldout(cct, 20) << "query sync status from " << c.first << dendl;
-          using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
-          spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
-                false);
-          ++p;
+    if (ret < 0) {
+      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+      return set_cr_error(ret);
+    }
+
+    ldout(cct, 10) << "trimming log shards" << dendl;
+    set_status("trimming log shards");
+    yield {
+      // determine the minimum marker for each shard
+      take_min_markers(peer_status.begin(), peer_status.end(),
+                       min_shard_markers.begin());
+
+      for (int i = 0; i < num_shards; i++) {
+        const auto& m = min_shard_markers[i];
+        auto& stable = get_stable_marker(m);
+        if (stable <= last_trim[i]) {
+          continue;
         }
+        ldout(cct, 10) << "trimming log shard " << i
+            << " at marker=" << stable
+            << " last_trim=" << last_trim[i] << dendl;
+        using TrimCR = LastTimelogTrimCR;
+        spawn(new TrimCR(store, store->data_log->get_oid(i),
+                         stable, &last_trim[i]),
+              true);
       }
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
 
-      // must get a successful reply from all peers to consider trimming
-      ret = 0;
-      while (ret == 0 && num_spawned()) {
-        yield wait_for_child();
-        collect_next(&ret);
-      }
-      if (ret < 0) {
-        ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
-        drain_all();
+class DataLogTrimPollCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string lock_oid; //< use first data log shard for lock
+  const std::string lock_cookie;
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+
+ public:
+  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+                    int num_shards, utime_t interval)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      num_shards(num_shards), interval(interval),
+      lock_oid(store->data_log->get_oid(0)),
+      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+      last_trim(num_shards)
+  {}
+
+  int operate();
+};
+
+int DataLogTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(interval);
+
+      // request a 'data_trim' lock that covers the entire wait interval to
+      // prevent other gateways from attempting to trim for the duration
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          store->get_zone_params().log_pool,
+                                          lock_oid, "data_trim", lock_cookie,
+                                          interval.sec()));
+      if (retcode < 0) {
+        // if the lock is already held, go back to sleep and try again later
+        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
+            << interval.sec() << "s" << dendl;
         continue;
       }
 
-      ldout(cct, 10) << "trimming log shards" << dendl;
-      yield {
-        // determine the minimum marker for each shard
-        take_min_markers(peer_status.begin(), peer_status.end(),
-                         min_shard_markers.begin());
-
-        for (int i = 0; i < num_shards; i++) {
-          const auto& m = min_shard_markers[i];
-          auto& stable = get_stable_marker(m);
-          if (stable <= last_trim[i]) {
-            continue;
-          }
-          ldout(cct, 10) << "trimming log shard " << i
-              << " at marker=" << stable
-              << " last_trim=" << last_trim[i] << dendl;
-          using TrimCR = LastTimelogTrimCR;
-          spawn(new TrimCR(store, store->data_log->get_oid(i),
-                           stable, &last_trim[i]),
-                true);
-        }
-      }
+      set_status("trimming");
+      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
+
+      // note that the lock is not released. this is intentional, as it avoids
+      // duplicating this work in other gateways
     }
   }
   return 0;
@@ -3041,5 +3093,5 @@ RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                       RGWHTTPManager *http,
                                       int num_shards, utime_t interval)
 {
-  return new DataLogTrimCR(store, http, num_shards, interval);
+  return new DataLogTrimPollCR(store, http, num_shards, interval);
 }