~RGWInitSyncStatusCoroutine() {
if (lease_cr) {
+ lease_cr->abort();
lease_cr->put();
}
}
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
- yield return 0;
+ yield;
}
yield {
call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
- yield return 0;
+ yield;
}
entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix);
RGWContinuousLeaseCR *lease_cr;
bool lost_lock;
+ bool *reset_backoff;
+
public:
RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
rgw_bucket& _pool,
- uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker,
+ bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
- lease_cr(NULL), lost_lock(false) {
+ lease_cr(NULL), lost_lock(false), reset_backoff(_reset_backoff) {
+ *reset_backoff = false;
}
~RGWMetaSyncShardCR() {
delete marker_tracker;
if (lease_cr) {
+ lease_cr->abort();
lease_cr->put();
}
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+ drain_all();
return lease_cr->get_ret_status();
}
set_sleeping(true);
- yield return 0;
+ yield;
}
+
+ /* lock succeeded, a retry now should avoid previous backoff status */
+ *reset_backoff = true;
+
/* prepare marker tracker */
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
lease_cr->put();
lease_cr = NULL;
+ drain_all();
+
if (lost_lock) {
return -EBUSY;
}
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- if (lease_cr) {
- lease_cr->put();
- }
lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
lock_name, lock_duration, this);
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+ drain_all();
return lease_cr->get_ret_status();
}
set_sleeping(true);
- yield return 0;
+ yield;
}
+ *reset_backoff = true;
}
mdlog_marker = sync_marker.marker;
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
yield lease_cr->go_down();
- lease_cr->put();
- lease_cr = NULL;
+ drain_all();
if (lost_lock) {
return -EBUSY;
}
};
+class RGWMetaSyncShardControlCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+
+ rgw_bucket pool;
+
+ uint32_t shard_id;
+ rgw_meta_sync_marker sync_marker;
+
+ RGWObjectCtx obj_ctx;
+
+ RGWSyncBackoff backoff;
+ bool reset_backoff;
+
+public:
+ RGWMetaSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ rgw_bucket& _pool,
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ pool(_pool),
+ shard_id(_shard_id),
+ sync_marker(_marker), obj_ctx(store), reset_backoff(false) {
+ }
+
+ int operate() {
+ reenter(this) {
+ while (true) {
+ yield {
+ call(new RGWMetaSyncShardCR(store, http_manager, async_rados, pool, shard_id, sync_marker, &reset_backoff));
+ }
+ if (retcode < 0 && retcode != -EBUSY) {
+ ldout(store->ctx(), 0) << "ERROR: RGWMetaSyncShardCR() returned " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (reset_backoff) {
+ backoff.reset();
+ }
+ yield backoff.backoff(this);
+ yield {
+ call(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ RGWMetaSyncStatusManager::shard_obj_name(shard_id), &sync_marker));
+ }
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read sync state for metadata shard id=" << shard_id << " retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ }
+ }
+ return 0;
+ }
+};
+
class RGWMetaSyncCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http_manager;
rgw_meta_sync_status sync_status;
- map<int, RGWMetaSyncShardCR *> shard_crs;
+ map<int, RGWMetaSyncShardControlCR *> shard_crs;
public:
uint32_t shard_id = iter->first;
rgw_meta_sync_marker marker;
- RGWMetaSyncShardCR *shard_cr = new RGWMetaSyncShardCR(store, http_manager, async_rados, store->get_zone_params().log_pool,
+ RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(store, http_manager, async_rados, store->get_zone_params().log_pool,
shard_id,
sync_status.sync_markers[shard_id]);
}
void wakeup(int shard_id) {
- map<int, RGWMetaSyncShardCR *>::iterator iter = shard_crs.find(shard_id);
+ map<int, RGWMetaSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
if (iter == shard_crs.end()) {
return;
}