return false;
}
+/// spawn rest requests to read each peer's sync status
+class MetaMasterStatusCollectCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+ MasterTrimEnv& env;
+ connection_map::iterator c;
+ std::vector<rgw_meta_sync_status>::iterator s;
+ public:
+ MetaMasterStatusCollectCR(MasterTrimEnv& env)
+ : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+ env(env), c(env.connections.begin()), s(env.peer_status.begin())
+ {}
+
+ bool spawn_next() override {
+ if (c == env.connections.end()) {
+ return false;
+ }
+ static rgw_http_param_pair params[] = {
+ { "type", "metadata" },
+ { "status", nullptr },
+ { nullptr, nullptr }
+ };
+
+ ldout(cct, 20) << "query sync status from " << c->first << dendl;
+ auto conn = c->second.get();
+ using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
+ spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
+ false);
+ ++c;
+ ++s;
+ return true;
+ }
+};
+
class MetaMasterTrimCR : public RGWCoroutine {
MasterTrimEnv& env;
rgw_meta_sync_status min_status; //< minimum sync status of all peers
}
ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
- yield {
- // query mdlog sync status from peers
- rgw_http_param_pair params[] = {
- { "type", "metadata" },
- { "status", nullptr },
- { nullptr, nullptr }
- };
-
- auto p = env.peer_status.begin();
- for (auto& c : env.connections) {
- ldout(cct, 20) << "query sync status from " << c.first << dendl;
- using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
- auto conn = c.second.get();
- spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*p),
- false);
- ++p;
- }
- }
+ // query mdlog sync status from peers
+ yield call(new MetaMasterStatusCollectCR(env));
// must get a successful reply from all peers to consider trimming
- ret = 0;
- while (ret == 0 && num_spawned()) {
- yield wait_for_child();
- collect_next(&ret);
- }
if (ret < 0) {
ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
- drain_all();
return set_cr_error(ret);
}
if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
// delete any prior mdlog periods
- yield spawn(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
- &env.last_trim_epoch), true);
+ yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
+ &env.last_trim_epoch));
} else {
ldout(cct, 10) << "mdlogs already purged through realm_epoch "
<< env.last_trim_epoch << dendl;
yield {
auto meta_mgr = env.store->meta_mgr;
auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
- spawn(new MetaPeerTrimShardCollectCR(env, mdlog), true);
+ call(new MetaPeerTrimShardCollectCR(env, mdlog));
// ignore any errors during purge/trim because we want to hold the lock open
}
}