From: Casey Bodley Date: Sat, 13 Apr 2019 17:21:44 +0000 (-0400) Subject: rgw: move datalog trimming into rgw_trim_datalog.cc X-Git-Tag: v15.1.0~2837^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e96281640b86cb50a5174b06be3ee08a7ac6b8ed;p=ceph.git rgw: move datalog trimming into rgw_trim_datalog.cc Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index b93cac4aa1e1..cff1ead0ebfe 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -74,6 +74,7 @@ set(librgw_common_srcs rgw_sync_module_pubsub_rest.cc rgw_sync_log_trim.cc rgw_sync_trace.cc + rgw_trim_datalog.cc rgw_trim_mdlog.cc rgw_period_history.cc rgw_period_puller.cc diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index e76d2954024f..de75af45e1e4 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -44,6 +44,7 @@ extern "C" { #include "rgw_usage.h" #include "rgw_orphan.h" #include "rgw_sync.h" +#include "rgw_trim_datalog.h" #include "rgw_trim_mdlog.h" #include "rgw_sync_log_trim.h" #include "rgw_data_sync.h" diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 6c8d4236cd16..33e69c965941 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3503,208 +3503,3 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards, bucket_info.bucket, status)); } - - -// TODO: move into rgw_data_sync_trim.cc -#undef dout_prefix -#define dout_prefix (*_dout << "data trim: ") - -namespace { - -/// return the marker that it's safe to trim up to -const std::string& get_stable_marker(const rgw_data_sync_marker& m) -{ - return m.state == m.FullSync ? m.next_step_marker : m.marker; -} - -/// comparison operator for take_min_markers() -bool operator<(const rgw_data_sync_marker& lhs, - const rgw_data_sync_marker& rhs) -{ - // sort by stable marker - return get_stable_marker(lhs) < get_stable_marker(rhs); -} - -/// populate the container starting with 'dest' with the minimum stable marker -/// of each shard for all of the peers in [first, last) -template -void take_min_markers(IterIn first, IterIn last, IterOut dest) -{ - if (first == last) { - return; - } - // initialize markers with the first peer's - auto m = dest; - for (auto &shard : first->sync_markers) { - *m = std::move(shard.second); - ++m; - } - // for remaining peers, replace with smaller markers - for (auto p = first + 1; p != last; ++p) { - m = dest; - for (auto &shard : p->sync_markers) { - if (shard.second < *m) { - *m = std::move(shard.second); - } - ++m; - } - } -} - -} // anonymous namespace - -class DataLogTrimCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http; - const int num_shards; - const std::string& zone_id; //< my zone id - std::vector peer_status; //< sync status for each peer - std::vector min_shard_markers; //< min marker per shard - std::vector& last_trim; //< last trimmed marker per shard - int ret{0}; - - public: - DataLogTrimCR(RGWRados *store, RGWHTTPManager *http, - int num_shards, std::vector& last_trim) - : RGWCoroutine(store->ctx()), store(store), http(http), - num_shards(num_shards), - zone_id(store->svc.zone->get_zone().id), - peer_status(store->svc.zone->get_zone_data_notify_to_map().size()), - min_shard_markers(num_shards), - last_trim(last_trim) - {} - - int operate() override; -}; - -int DataLogTrimCR::operate() -{ - reenter(this) { - ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl; - set_status("fetching sync status"); - yield { - // query data sync status from each sync peer - rgw_http_param_pair params[] = { - { "type", "data" }, - { "status", nullptr }, - { "source-zone", zone_id.c_str() }, - { nullptr, nullptr } - }; - - auto p = peer_status.begin(); - for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) { - ldout(cct, 20) << "query sync status from " << c.first << dendl; - using StatusCR = RGWReadRESTResourceCR; - spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), - false); - ++p; - } - } - - // must get a successful reply from all peers to consider trimming - ret = 0; - while (ret == 0 && num_spawned() > 0) { - yield wait_for_child(); - collect_next(&ret); - } - drain_all(); - - if (ret < 0) { - ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; - return set_cr_error(ret); - } - - ldout(cct, 10) << "trimming log shards" << dendl; - set_status("trimming log shards"); - yield { - // determine the minimum marker for each shard - take_min_markers(peer_status.begin(), peer_status.end(), - min_shard_markers.begin()); - - for (int i = 0; i < num_shards; i++) { - const auto& m = min_shard_markers[i]; - auto& stable = get_stable_marker(m); - if (stable <= last_trim[i]) { - continue; - } - ldout(cct, 10) << "trimming log shard " << i - << " at marker=" << stable - << " last_trim=" << last_trim[i] << dendl; - using TrimCR = RGWSyncLogTrimCR; - spawn(new TrimCR(store, store->data_log->get_oid(i), - stable, &last_trim[i]), - true); - } - } - return set_cr_done(); - } - return 0; -} - -RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards, - std::vector& markers) -{ - return new DataLogTrimCR(store, http, num_shards, markers); -} - -class DataLogTrimPollCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http; - const int num_shards; - const utime_t interval; //< polling interval - const std::string lock_oid; //< use first data log shard for lock - const std::string lock_cookie; - std::vector last_trim; //< last trimmed marker per shard - - public: - DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval) - : RGWCoroutine(store->ctx()), store(store), http(http), - num_shards(num_shards), interval(interval), - lock_oid(store->data_log->get_oid(0)), - lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), - last_trim(num_shards) - {} - - int operate() override; -}; - -int DataLogTrimPollCR::operate() -{ - reenter(this) { - for (;;) { - set_status("sleeping"); - wait(interval); - - // request a 'data_trim' lock that covers the entire wait interval to - // prevent other gateways from attempting to trim for the duration - set_status("acquiring trim lock"); - yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, - rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid), - "data_trim", lock_cookie, - interval.sec())); - if (retcode < 0) { - // if the lock is already held, go back to sleep and try again later - ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in " - << interval.sec() << "s" << dendl; - continue; - } - - set_status("trimming"); - yield call(new DataLogTrimCR(store, http, num_shards, last_trim)); - - // note that the lock is not released. this is intentional, as it avoids - // duplicating this work in other gateways - } - } - return 0; -} - -RGWCoroutine* create_data_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards, utime_t interval) -{ - return new DataLogTrimPollCR(store, http, num_shards, interval); -} diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 55a71d720daa..440ef153cb28 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -611,15 +611,4 @@ public: int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; -// DataLogTrimCR factory function -extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards, utime_t interval); - -// factory function for datalog trim via radosgw-admin -RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards, - std::vector& markers); - #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4a18def8d258..f6a68d44d058 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -68,6 +68,7 @@ using namespace librados; #include "rgw_sync.h" #include "rgw_sync_counters.h" #include "rgw_sync_trace.h" +#include "rgw_trim_datalog.h" #include "rgw_trim_mdlog.h" #include "rgw_data_sync.h" #include "rgw_realm_watcher.h" diff --git a/src/rgw/rgw_trim_datalog.cc b/src/rgw/rgw_trim_datalog.cc new file mode 100644 index 000000000000..272dfabca719 --- /dev/null +++ b/src/rgw/rgw_trim_datalog.cc @@ -0,0 +1,218 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "rgw_trim_datalog.h" +#include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" +#include "rgw_data_sync.h" +#include "rgw_zone.h" + +#include + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "data trim: ") + +namespace { + +/// return the marker that it's safe to trim up to +const std::string& get_stable_marker(const rgw_data_sync_marker& m) +{ + return m.state == m.FullSync ? m.next_step_marker : m.marker; +} + +/// comparison operator for take_min_markers() +bool operator<(const rgw_data_sync_marker& lhs, + const rgw_data_sync_marker& rhs) +{ + // sort by stable marker + return get_stable_marker(lhs) < get_stable_marker(rhs); +} + +/// populate the container starting with 'dest' with the minimum stable marker +/// of each shard for all of the peers in [first, last) +template +void take_min_markers(IterIn first, IterIn last, IterOut dest) +{ + if (first == last) { + return; + } + // initialize markers with the first peer's + auto m = dest; + for (auto &shard : first->sync_markers) { + *m = std::move(shard.second); + ++m; + } + // for remaining peers, replace with smaller markers + for (auto p = first + 1; p != last; ++p) { + m = dest; + for (auto &shard : p->sync_markers) { + if (shard.second < *m) { + *m = std::move(shard.second); + } + ++m; + } + } +} + +} // anonymous namespace + +class DataLogTrimCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http; + const int num_shards; + const std::string& zone_id; //< my zone id + std::vector peer_status; //< sync status for each peer + std::vector min_shard_markers; //< min marker per shard + std::vector& last_trim; //< last trimmed marker per shard + int ret{0}; + + public: + DataLogTrimCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, std::vector& last_trim) + : RGWCoroutine(store->ctx()), store(store), http(http), + num_shards(num_shards), + zone_id(store->svc.zone->get_zone().id), + peer_status(store->svc.zone->get_zone_data_notify_to_map().size()), + min_shard_markers(num_shards), + last_trim(last_trim) + {} + + int operate() override; +}; + +int DataLogTrimCR::operate() +{ + reenter(this) { + ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl; + set_status("fetching sync status"); + yield { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "data" }, + { "status", nullptr }, + { "source-zone", zone_id.c_str() }, + { nullptr, nullptr } + }; + + auto p = peer_status.begin(); + for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) { + ldout(cct, 20) << "query sync status from " << c.first << dendl; + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), + false); + ++p; + } + } + + // must get a successful reply from all peers to consider trimming + ret = 0; + while (ret == 0 && num_spawned() > 0) { + yield wait_for_child(); + collect_next(&ret); + } + drain_all(); + + if (ret < 0) { + ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; + return set_cr_error(ret); + } + + ldout(cct, 10) << "trimming log shards" << dendl; + set_status("trimming log shards"); + yield { + // determine the minimum marker for each shard + take_min_markers(peer_status.begin(), peer_status.end(), + min_shard_markers.begin()); + + for (int i = 0; i < num_shards; i++) { + const auto& m = min_shard_markers[i]; + auto& stable = get_stable_marker(m); + if (stable <= last_trim[i]) { + continue; + } + ldout(cct, 10) << "trimming log shard " << i + << " at marker=" << stable + << " last_trim=" << last_trim[i] << dendl; + using TrimCR = RGWSyncLogTrimCR; + spawn(new TrimCR(store, store->data_log->get_oid(i), + stable, &last_trim[i]), + true); + } + } + return set_cr_done(); + } + return 0; +} + +RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards, + std::vector& markers) +{ + return new DataLogTrimCR(store, http, num_shards, markers); +} + +class DataLogTrimPollCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http; + const int num_shards; + const utime_t interval; //< polling interval + const std::string lock_oid; //< use first data log shard for lock + const std::string lock_cookie; + std::vector last_trim; //< last trimmed marker per shard + + public: + DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : RGWCoroutine(store->ctx()), store(store), http(http), + num_shards(num_shards), interval(interval), + lock_oid(store->data_log->get_oid(0)), + lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), + last_trim(num_shards) + {} + + int operate() override; +}; + +int DataLogTrimPollCR::operate() +{ + reenter(this) { + for (;;) { + set_status("sleeping"); + wait(interval); + + // request a 'data_trim' lock that covers the entire wait interval to + // prevent other gateways from attempting to trim for the duration + set_status("acquiring trim lock"); + yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, + rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid), + "data_trim", lock_cookie, + interval.sec())); + if (retcode < 0) { + // if the lock is already held, go back to sleep and try again later + ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in " + << interval.sec() << "s" << dendl; + continue; + } + + set_status("trimming"); + yield call(new DataLogTrimCR(store, http, num_shards, last_trim)); + + // note that the lock is not released. this is intentional, as it avoids + // duplicating this work in other gateways + } + } + return 0; +} + +RGWCoroutine* create_data_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards, utime_t interval) +{ + return new DataLogTrimPollCR(store, http, num_shards, interval); +} diff --git a/src/rgw/rgw_trim_datalog.h b/src/rgw/rgw_trim_datalog.h new file mode 100644 index 000000000000..6b640dafe01f --- /dev/null +++ b/src/rgw/rgw_trim_datalog.h @@ -0,0 +1,20 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +class RGWCoroutine; +class RGWRados; +class RGWHTTPManager; +class utime_t; + +// DataLogTrimCR factory function +extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards, utime_t interval); + +// factory function for datalog trim via radosgw-admin +RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards, + std::vector& markers);