RGWRados *store;
RGWHTTPManager *http;
const int num_shards;
- const utime_t interval; //< polling interval
const std::string& zone; //< my zone id
std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
- std::vector<std::string> last_trim; //< last trimmed marker per shard
+ std::vector<std::string>& last_trim; //< last trimmed marker per shard
int ret{0};
public:
DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
- int num_shards, utime_t interval)
+ int num_shards, std::vector<std::string>& last_trim) // last_trim is stored as a reference member, so the caller's vector must outlive this coroutine
: RGWCoroutine(store->ctx()), store(store), http(http),
- num_shards(num_shards), interval(interval),
+ num_shards(num_shards), // the polling interval is no longer stored in this class
zone(store->get_zone().id),
peer_status(store->zone_conn_map.size()),
min_shard_markers(num_shards),
- last_trim(num_shards)
+ last_trim(last_trim) // binds to the caller's vector instead of allocating num_shards entries locally
{}
int operate() override;
int DataLogTrimCR::operate()
{
reenter(this) {
- for (;;) {
- yield wait(interval);
+ ldout(cct, 10) << "fetching sync status for zone " << zone << dendl; // one-shot pass: fetch every peer's sync status, then trim each shard up to its stable marker
+ set_status("fetching sync status");
+ yield {
+ // query data sync status from each sync peer: one StatusCR is spawned per zone_conn_map entry, each given its own peer_status slot (&*p)
+ rgw_http_param_pair params[] = {
+ { "type", "data" },
+ { "status", nullptr },
+ { "source-zone", zone.c_str() },
+ { nullptr, nullptr }
+ };
+
+ auto p = peer_status.begin();
+ for (auto& c : store->zone_conn_map) {
+ ldout(cct, 20) << "query sync status from " << c.first << dendl;
+ using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+ spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+ false);
+ ++p;
+ }
+ }
+
+ // must get a successful reply from all peers to consider trimming; collection stops at the first nonzero ret or once no spawned children remain
+ ret = 0;
+ while (ret == 0 && num_spawned() > 0) {
+ yield wait_for_child();
+ collect_next(&ret);
+ }
+ drain_all(); // called on both the success and the failure path (the replaced code only called it on failure)
- ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
- yield {
- // query data sync status from each sync peer
- rgw_http_param_pair params[] = {
- { "type", "data" },
- { "status", nullptr },
- { "source-zone", zone.c_str() },
- { nullptr, nullptr }
- };
-
- auto p = peer_status.begin();
- for (auto& c : store->zone_conn_map) {
- ldout(cct, 20) << "query sync status from " << c.first << dendl;
- using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
- spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
- false);
- ++p;
+ if (ret < 0) {
+ ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+ return set_cr_error(ret); // give up on this pass before any trim coroutine is spawned
+ }
+
+ ldout(cct, 10) << "trimming log shards" << dendl;
+ set_status("trimming log shards");
+ yield {
+ // determine the minimum marker for each shard
+ take_min_markers(peer_status.begin(), peer_status.end(),
+ min_shard_markers.begin());
+
+ for (int i = 0; i < num_shards; i++) {
+ const auto& m = min_shard_markers[i];
+ auto& stable = get_stable_marker(m);
+ if (stable <= last_trim[i]) { // skip shards whose stable marker is not beyond the last trimmed marker
+ continue;
}
+ ldout(cct, 10) << "trimming log shard " << i
+ << " at marker=" << stable
+ << " last_trim=" << last_trim[i] << dendl;
+ using TrimCR = LastTimelogTrimCR;
+ spawn(new TrimCR(store, store->data_log->get_oid(i),
+ stable, &last_trim[i]), // NOTE(review): presumably TrimCR records the trimmed marker through this pointer - confirm
+ true);
}
+ }
+ return set_cr_done(); // single pass: the for(;;) loop that used to wrap this body was removed
+ }
+ return 0;
+}
- // must get a successful reply from all peers to consider trimming
- ret = 0;
- while (ret == 0 && num_spawned()) {
- yield wait_for_child();
- collect_next(&ret);
- }
- if (ret < 0) {
- ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
- drain_all();
+/// Polling coroutine that repeatedly runs a data log trim pass.
+///
+/// Each iteration of operate() waits for `interval`, requests the "data_trim"
+/// lock on `lock_oid`, and, if the lock was obtained, calls DataLogTrimCR with
+/// this object's `last_trim` vector so trim markers carry over between passes.
+class DataLogTrimPollCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http;
+ const int num_shards;
+ const utime_t interval; //< polling interval
+ const std::string lock_oid; //< use first data log shard for lock
+ const std::string lock_cookie; //< random cookie generated once at construction
+ std::vector<std::string> last_trim; //< last trimmed marker per shard
+
+ public:
+ /// @param store      rados store; also supplies the context passed to RGWCoroutine
+ /// @param http       http manager handed on to DataLogTrimCR
+ /// @param num_shards number of data log shards; sizes `last_trim`
+ /// @param interval   polling interval, also used as the lock duration
+ DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+ int num_shards, utime_t interval)
+ : RGWCoroutine(store->ctx()), store(store), http(http),
+ num_shards(num_shards), interval(interval),
+ lock_oid(store->data_log->get_oid(0)),
+ lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+ last_trim(num_shards)
+ {}
+
+ // marked override for consistency with DataLogTrimCR::operate(), so a
+ // signature mismatch with RGWCoroutine is caught at compile time
+ int operate() override;
+};
+
+int DataLogTrimPollCR::operate()
+{
+ reenter(this) {
+ for (;;) { // poll forever: sleep, try to take the trim lock, then run one trim pass
+ set_status("sleeping");
+ wait(interval); // NOTE(review): the loop this replaces used 'yield wait(interval)'; confirm the wait is meant to take effect without its own yield
+
+ // request a 'data_trim' lock that covers the entire wait interval to
+ // prevent other gateways from attempting to trim for the duration
+ set_status("acquiring trim lock");
+ yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+ store->get_zone_params().log_pool,
+ lock_oid, "data_trim", lock_cookie,
+ interval.sec())); // NOTE(review): presumably the lock duration in seconds - confirm against RGWSimpleRadosLockCR
+ if (retcode < 0) {
+ // if the lock is already held, go back to sleep and try again later
+ ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
+ << interval.sec() << "s" << dendl;
continue;
}
- ldout(cct, 10) << "trimming log shards" << dendl;
- yield {
- // determine the minimum marker for each shard
- take_min_markers(peer_status.begin(), peer_status.end(),
- min_shard_markers.begin());
-
- for (int i = 0; i < num_shards; i++) {
- const auto& m = min_shard_markers[i];
- auto& stable = get_stable_marker(m);
- if (stable <= last_trim[i]) {
- continue;
- }
- ldout(cct, 10) << "trimming log shard " << i
- << " at marker=" << stable
- << " last_trim=" << last_trim[i] << dendl;
- using TrimCR = LastTimelogTrimCR;
- spawn(new TrimCR(store, store->data_log->get_oid(i),
- stable, &last_trim[i]),
- true);
- }
- }
+ set_status("trimming");
+ yield call(new DataLogTrimCR(store, http, num_shards, last_trim)); // last_trim is passed by reference, so markers persist across iterations; the call's result is not checked here
+
+ // note that the lock is not released. this is intentional, as it avoids
+ // duplicating this work in other gateways
}
}
return 0;
RGWHTTPManager *http,
int num_shards, utime_t interval)
{
- return new DataLogTrimCR(store, http, num_shards, interval);
+ return new DataLogTrimPollCR(store, http, num_shards, interval);
}