}
+#undef dout_prefix
+#define dout_prefix (*_dout << "meta trim: ")
+
+namespace {
+
+using Cursor = RGWPeriodHistory::Cursor;
+
+using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
+
+/// construct a RGWRESTConn for each zone in the realm
+template <typename Zonegroups>
+connection_map make_peer_connections(RGWRados *store,
+ const Zonegroups& zonegroups)
+{
+ connection_map connections;
+ for (auto& g : zonegroups) {
+ for (auto& z : g.second.zones) {
+ std::unique_ptr<RGWRESTConn> conn{
+ new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
+ connections.emplace(z.first, std::move(conn));
+ }
+ }
+ return connections;
+}
+
+struct TrimEnv {
+ RGWRados *const store;
+ RGWHTTPManager *const http;
+ int num_shards;
+ const std::string& zone;
+ Cursor current; //< cursor to current period
+
+ TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
+ : store(store), http(http), num_shards(num_shards),
+ zone(store->get_zone_params().get_id()),
+ current(store->period_history->get_current())
+ {}
+};
+
+struct MasterTrimEnv : public TrimEnv {
+ connection_map connections; //< peer connections
+ std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
+ /// last trim marker for each shard, only applies to current period's mdlog
+ std::vector<std::string> last_trim_markers;
+
+ MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
+ : TrimEnv(store, http, num_shards),
+ last_trim_markers(num_shards)
+ {
+ auto& period = current.get_period();
+ connections = make_peer_connections(store, period.get_map().zonegroups);
+ connections.erase(zone);
+ peer_status.resize(connections.size());
+ }
+};
+
+} // anonymous namespace
+
+
+class MetaMasterTrimCR : public RGWCoroutine {
+ MasterTrimEnv& env;
+ int ret{0};
+
+ public:
+ MetaMasterTrimCR(MasterTrimEnv& env)
+ : RGWCoroutine(env.store->ctx()), env(env)
+ {}
+
+ int operate();
+};
+
+int MetaMasterTrimCR::operate()
+{
+ reenter(this) {
+ // TODO: detect this and fail before we spawn the trim thread?
+ if (env.connections.empty()) {
+ ldout(cct, 4) << "no peers, exiting" << dendl;
+ return set_cr_done();
+ }
+
+ ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
+ yield {
+ // query mdlog sync status from peers
+ rgw_http_param_pair params[] = {
+ { "type", "metadata" },
+ { "status", nullptr },
+ { nullptr, nullptr }
+ };
+
+ auto p = env.peer_status.begin();
+ for (auto& c : env.connections) {
+ ldout(cct, 20) << "query sync status from " << c.first << dendl;
+ using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
+ auto conn = c.second.get();
+ spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*p),
+ false);
+ ++p;
+ }
+ }
+
+ // must get a successful reply from all peers to consider trimming
+ ret = 0;
+ while (ret == 0 && num_spawned()) {
+ yield wait_for_child();
+ collect_next(&ret);
+ }
+ if (ret < 0) {
+ ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+ drain_all();
+ return set_cr_error(ret);
+ }
+ return set_cr_done();
+ }
+ return 0;
+}