boost::asio::coroutine incremental_cr;
boost::asio::coroutine full_cr;
+ RGWContinuousLeaseCR *lease_cr;
+ bool lost_lock;
+
public:
RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
- marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock") {
+ marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
+ lease_cr(NULL), lost_lock(false) {
}
~RGWMetaSyncShardCR() {
delete marker_tracker;
+ if (lease_cr) {
+ lease_cr->put();
+ }
}
void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
}
int operate() {
+ int r;
while (true) {
switch (sync_marker.state) {
case rgw_meta_sync_marker::FullSync:
- return full_sync();
+ r = full_sync();
+ if (r < 0) {
+ ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ }
+ return 0;
case rgw_meta_sync_marker::IncrementalSync:
- return incremental_sync();
- break;
+ r = incremental_sync();
+ if (r < 0) {
+ ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ }
+ return 0;
default:
return set_state(RGWCoroutine_Error, -EIO);
}
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
oid = full_sync_index_shard_oid(shard_id);
+ /* grab lock */
+ yield {
+ uint32_t lock_duration = 30;
+ string lock_name = "sync_lock";
+ if (lease_cr) {
+ lease_cr->put();
+ }
+ lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWMetaSyncStatusManager::shard_obj_name(shard_id),
+ lock_name, lock_duration);
+ lease_cr->get();
+ spawn(lease_cr, false);
+ lost_lock = false;
+ }
+ while (!lease_cr->is_locked()) {
+ if (lease_cr->is_done()) {
+ ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+ return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+ }
+ yield;
+ }
+ /* prepare marker tracker */
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
sync_marker));
+ /* sync! */
do {
+ if (!lease_cr->is_locked()) {
+ lost_lock = true;
+ break;
+ }
yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
}
} while ((int)entries.size() == max_entries);
- drain_all();
+ drain_all_but(1);
- yield {
- /* update marker to reflect we're done with full sync */
- sync_marker.state = rgw_meta_sync_marker::IncrementalSync;
- sync_marker.marker = sync_marker.next_step_marker;
- sync_marker.next_step_marker.clear();
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
- RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker));
+ if (!lost_lock) {
+ yield {
+ /* update marker to reflect we're done with full sync */
+ sync_marker.state = rgw_meta_sync_marker::IncrementalSync;
+ sync_marker.marker = sync_marker.next_step_marker;
+ sync_marker.next_step_marker.clear();
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker));
+ }
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
}
- if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
- return set_state(RGWCoroutine_Error, retcode);
+
+ yield lease_cr->go_down();
+
+ lease_cr->put();
+ lease_cr = NULL;
+
+ if (lost_lock) {
+ return -EBUSY;
}
}
return 0;
int incremental_sync() {
reenter(&incremental_cr) {
+ /* grab lock */
+ yield {
+ uint32_t lock_duration = 30;
+ string lock_name = "sync_lock";
+ if (lease_cr) {
+ lease_cr->put();
+ }
+ lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWMetaSyncStatusManager::shard_obj_name(shard_id),
+ lock_name, lock_duration);
+ lease_cr->get();
+ spawn(lease_cr, false);
+ lost_lock = false;
+ }
+ while (!lease_cr->is_locked()) {
+ if (lease_cr->is_done()) {
+ ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+ return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+ }
+ yield;
+ }
mdlog_marker = sync_marker.marker;
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
sync_marker));
+ /* inc sync */
do {
+ if (!lease_cr->is_locked()) {
+ lost_lock = true;
+ break;
+ }
#define INCREMENTAL_MAX_ENTRIES 100
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (mdlog_marker <= sync_marker.marker) {
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
}
} while (true);
+
+ yield lease_cr->go_down();
+
+ lease_cr->put();
+ lease_cr = NULL;
+
+ if (lost_lock) {
+ return -EBUSY;
+ }
}
/* TODO */
return 0;