From: Casey Bodley Date: Mon, 6 Jun 2016 20:46:56 +0000 (-0400) Subject: rgw: add RGWDataLogTrimCR X-Git-Tag: v11.0.1~24^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6a366f955a0caa4febdbcf855a9da482a27b7fb0;p=ceph.git rgw: add RGWDataLogTrimCR Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 12182d12935a..ee52ad626b7e 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -466,6 +466,7 @@ public: ~RGWDataChangesLog(); int choose_oid(const rgw_bucket_shard& bs); + const std::string& get_oid(int shard_id) const { return oids[shard_id]; } int add_entry(rgw_bucket& bucket, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); int renew_entries(); diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 5174ab6e1f29..a1cbfabc8dee 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -984,13 +984,14 @@ public: class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { RGWRados *store; RGWAioCompletionNotifier *cn{nullptr}; + protected: std::string oid; real_time start_time; real_time end_time; std::string from_marker; std::string to_marker; -public: + public: RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid, const real_time& start_time, const real_time& end_time, const std::string& from_marker, diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index fe7155c8036a..051231129269 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2838,3 +2838,148 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone, return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); } + +#undef dout_prefix +#define dout_prefix (*_dout << "data trim: ") + +namespace { + +/// return the marker that it's safe to trim up to +const std::string& get_stable_marker(const rgw_data_sync_marker& m) +{ + return m.state == m.FullSync ? m.next_step_marker : m.marker; +} + +/// comparison operator for take_min_markers() +bool operator<(const rgw_data_sync_marker& lhs, + const rgw_data_sync_marker& rhs) +{ + // sort by stable marker + return get_stable_marker(lhs) < get_stable_marker(rhs); +} + +/// populate the container starting with 'dest' with the minimum stable marker +/// of each shard for all of the peers in [first, last) +template +void take_min_markers(IterIn first, IterIn last, IterOut dest) +{ + if (first == last) { + return; + } + // initialize markers with the first peer's + auto m = dest; + for (auto &shard : first->sync_markers) { + *m = std::move(shard.second); + ++m; + } + // for remaining peers, replace with smaller markers + for (auto p = first + 1; p != last; ++p) { + m = dest; + for (auto &shard : p->sync_markers) { + if (shard.second < *m) { + *m = std::move(shard.second); + } + ++m; + } + } +} + +// wrapper to update last_trim_marker on success +class LastTimelogTrimCR : public RGWRadosTimelogTrimCR { + CephContext *cct; + std::string *last_trim_marker; + public: + LastTimelogTrimCR(RGWRados *store, const std::string& oid, + const std::string& to_marker, std::string *last_trim_marker) + : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{}, + std::string{}, to_marker), + cct(store->ctx()), last_trim_marker(last_trim_marker) + {} + int request_complete() override { + int r = RGWRadosTimelogTrimCR::request_complete(); + if (r < 0 && r != -ENODATA) { + ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl; + return r; + } + ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl; + *last_trim_marker = to_marker; + return 0; + } +}; + +} // anonymous namespace + +RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : RGWCoroutine(store->ctx()), store(store), http(http), + num_shards(num_shards), interval(interval), + zone(store->get_zone().id), + peer_status(store->zone_conn_map.size()), + min_shard_markers(num_shards), + last_trim(num_shards) +{ +} + +int RGWDataLogTrimCR::operate() +{ + reenter(this) { + for (;;) { + yield wait(interval); + + ldout(cct, 10) << "fetching sync status for zone " << zone << dendl; + yield { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "data" }, + { "status", nullptr }, + { "source-zone", zone.c_str() }, + { nullptr, nullptr } + }; + + auto p = peer_status.begin(); + for (auto& c : store->zone_conn_map) { + ldout(cct, 20) << "query sync status from " << c.first << dendl; + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), + false); + ++p; + } + } + + // must get a successful reply from all peers to consider trimming + ret = 0; + while (ret == 0 && num_spawned()) { + yield wait_for_child(); + collect_next(&ret); + } + if (ret < 0) { + ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; + drain_all(); + continue; + } + + ldout(cct, 10) << "trimming log shards" << dendl; + yield { + // determine the minimum marker for each shard + take_min_markers(peer_status.begin(), peer_status.end(), + min_shard_markers.begin()); + + for (int i = 0; i < num_shards; i++) { + const auto& m = min_shard_markers[i]; + auto& stable = get_stable_marker(m); + if (stable <= last_trim[i]) { + continue; + } + ldout(cct, 10) << "trimming log shard " << i + << " at marker=" << stable + << " last_trim=" << last_trim[i] << dendl; + using TrimCR = LastTimelogTrimCR; + spawn(new TrimCR(store, store->data_log->get_oid(i), + stable, &last_trim[i]), + true); + } + } + } + } + return 0; +} diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 1c950674c224..07953c5d942c 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -501,4 +501,21 @@ public: }; +class RGWDataLogTrimCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http; + const int num_shards; + const utime_t interval; //< polling interval + const std::string& zone; //< my zone id + std::vector peer_status; //< sync status for each peer + std::vector min_shard_markers; //< min marker per shard + std::vector last_trim; //< last trimmed marker per shard + int ret{0}; + + public: + RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval); + int operate() override; +}; + #endif