return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "data trim: ")
+
+namespace {
+
+/// Return the marker that it is safe to trim this peer's log up to.
+/// While a peer is still in full sync, only next_step_marker is stable;
+/// once in incremental sync, the sync marker itself is.
+const std::string& get_stable_marker(const rgw_data_sync_marker& m)
+{
+  if (m.state == m.FullSync) {
+    return m.next_step_marker;
+  }
+  return m.marker;
+}
+
+/// comparison operator for take_min_markers(): orders two peers' sync
+/// markers by their stable (safe-to-trim) position
+bool operator<(const rgw_data_sync_marker& lhs,
+               const rgw_data_sync_marker& rhs)
+{
+  const auto& l = get_stable_marker(lhs);
+  const auto& r = get_stable_marker(rhs);
+  return l < r;
+}
+
+/// populate the container starting with 'dest' with the minimum stable marker
+/// of each shard for all of the peers in [first, last)
+///
+/// IterIn must be random-access (uses 'first + 1'); IterOut must be at least
+/// a forward iterator, since 'dest' is saved and re-walked once per peer --
+/// a pure output iterator would not work here.
+///
+/// NOTE(review): no bounds checking -- this assumes every peer's
+/// sync_markers has the same number of shards and that the output range
+/// holds at least that many elements; confirm callers size 'dest' to
+/// num_shards.
+template <typename IterIn, typename IterOut>
+void take_min_markers(IterIn first, IterIn last, IterOut dest)
+{
+ if (first == last) {
+ return;
+ }
+ // initialize markers with the first peer's
+ auto m = dest;
+ for (auto &shard : first->sync_markers) {
+ // move is safe: the caller refreshes peer_status before each call
+ *m = std::move(shard.second);
+ ++m;
+ }
+ // for remaining peers, replace with smaller markers
+ for (auto p = first + 1; p != last; ++p) {
+ m = dest;
+ for (auto &shard : p->sync_markers) {
+ // operator< above compares by stable marker
+ if (shard.second < *m) {
+ *m = std::move(shard.second);
+ }
+ ++m;
+ }
+ }
+}
+
+// wrapper to update last_trim_marker on success
+class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
+ CephContext *cct;
+ std::string *last_trim_marker; // caller-owned output; must outlive this CR
+ public:
+ /// Trim the timelog object 'oid' up to 'to_marker'.  Both time bounds and
+ /// the from-marker are passed empty, so the base CR trims by marker only.
+ LastTimelogTrimCR(RGWRados *store, const std::string& oid,
+ const std::string& to_marker, std::string *last_trim_marker)
+ : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
+ std::string{}, to_marker),
+ cct(store->ctx()), last_trim_marker(last_trim_marker)
+ {}
+ int request_complete() override {
+ int r = RGWRadosTimelogTrimCR::request_complete();
+ // -ENODATA means there was nothing left to trim; treat it as success so
+ // the recorded last_trim marker still advances
+ if (r < 0 && r != -ENODATA) {
+ ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl;
+ *last_trim_marker = to_marker;
+ return 0;
+ }
+};
+
+} // anonymous namespace
+
+/// Set up the trim coroutine: one peer_status slot per connected zone, and
+/// per-shard containers for the computed minimum markers and the marker each
+/// shard was last trimmed to (initially empty strings).
+RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+ int num_shards, utime_t interval)
+ : RGWCoroutine(store->ctx()), store(store), http(http),
+ num_shards(num_shards), interval(interval),
+ zone(store->get_zone().id),
+ peer_status(store->zone_conn_map.size()), // one status reply per peer
+ min_shard_markers(num_shards), // per-shard min stable marker
+ last_trim(num_shards) // per-shard last trimmed marker
+{
+}
+
+/// Main loop of the data log trim coroutine.  Each pass:
+///  1. wait 'interval'
+///  2. query the data sync status of every peer zone over REST
+///  3. only if ALL replies succeed, compute the per-shard minimum stable
+///     marker across peers and trim each datalog shard up to it
+/// (body uses the stackless-coroutine reenter/yield macros, so all state
+/// lives in members, not locals that would die across yields)
+int RGWDataLogTrimCR::operate()
+{
+ reenter(this) {
+ for (;;) {
+ yield wait(interval);
+
+ ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
+ yield {
+ // query data sync status from each sync peer
+ rgw_http_param_pair params[] = {
+ { "type", "data" },
+ { "status", nullptr },
+ { "source-zone", zone.c_str() },
+ { nullptr, nullptr }
+ };
+
+ // peer_status was sized to zone_conn_map.size() in the ctor, so 'p'
+ // cannot run past the end of this zip-style iteration
+ auto p = peer_status.begin();
+ for (auto& c : store->zone_conn_map) {
+ ldout(cct, 20) << "query sync status from " << c.first << dendl;
+ using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+ // NOTE(review): second spawn() arg presumably means fire-and-forget
+ // vs wait-for-completion -- confirm against RGWCoroutine::spawn
+ spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+ false);
+ ++p;
+ }
+ }
+
+ // must get a successful reply from all peers to consider trimming
+ ret = 0;
+ while (ret == 0 && num_spawned()) {
+ yield wait_for_child();
+ collect_next(&ret);
+ }
+ if (ret < 0) {
+ // trimming on partial data could discard entries a slow peer still
+ // needs; skip this round entirely
+ ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+ drain_all();
+ continue;
+ }
+
+ ldout(cct, 10) << "trimming log shards" << dendl;
+ yield {
+ // determine the minimum marker for each shard
+ take_min_markers(peer_status.begin(), peer_status.end(),
+ min_shard_markers.begin());
+
+ for (int i = 0; i < num_shards; i++) {
+ const auto& m = min_shard_markers[i];
+ auto& stable = get_stable_marker(m);
+ // nothing new to trim for this shard
+ if (stable <= last_trim[i]) {
+ continue;
+ }
+ ldout(cct, 10) << "trimming log shard " << i
+ << " at marker=" << stable
+ << " last_trim=" << last_trim[i] << dendl;
+ // LastTimelogTrimCR records the new marker in last_trim[i] on success
+ using TrimCR = LastTimelogTrimCR;
+ spawn(new TrimCR(store, store->data_log->get_oid(i),
+ stable, &last_trim[i]),
+ true);
+ }
+ }
+ }
+ }
+ return 0;
+}