From: Yehuda Sadeh Date: Fri, 23 Oct 2015 00:42:23 +0000 (-0700) Subject: rgw: guard metadata full/incremental sync X-Git-Tag: v10.1.0~354^2~309 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=899ff3dd40f3088b44582eb2ddbe1ca93bd2bf31;p=ceph.git rgw: guard metadata full/incremental sync Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index e3f546657060..51eeed410742 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -926,6 +926,9 @@ class RGWMetaSyncShardCR : public RGWCoroutine { boost::asio::coroutine incremental_cr; boost::asio::coroutine full_cr; + RGWContinuousLeaseCR *lease_cr; + bool lost_lock; + public: RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, @@ -936,11 +939,15 @@ public: pool(_pool), shard_id(_shard_id), sync_marker(_marker), - marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock") { + marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"), + lease_cr(NULL), lost_lock(false) { } ~RGWMetaSyncShardCR() { delete marker_tracker; + if (lease_cr) { + lease_cr->put(); + } } void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) { @@ -949,13 +956,21 @@ public: } int operate() { + int r; while (true) { switch (sync_marker.state) { case rgw_meta_sync_marker::FullSync: - return full_sync(); + r = full_sync(); + if (r < 0) { + ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl; + } + return 0; case rgw_meta_sync_marker::IncrementalSync: - return incremental_sync(); - break; + r = incremental_sync(); + if (r < 0) { + ldout(store->ctx(), 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl; + } + return 0; default: return set_state(RGWCoroutine_Error, -EIO); } @@ -970,10 +985,37 @@ public: int max_entries = OMAP_GET_MAX_ENTRIES; reenter(&full_cr) { oid = full_sync_index_shard_oid(shard_id); + /* grab lock */ + yield { + 
uint32_t lock_duration = 30; + string lock_name = "sync_lock"; + if (lease_cr) { + lease_cr->put(); + } + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, + RGWMetaSyncStatusManager::shard_obj_name(shard_id), + lock_name, lock_duration); + lease_cr->get(); + spawn(lease_cr, false); + lost_lock = false; + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + return set_state(RGWCoroutine_Error, lease_cr->get_ret_status()); + } + yield; + } + /* prepare marker tracker */ set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + /* sync! */ do { + if (!lease_cr->is_locked()) { + lost_lock = true; + break; + } yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries)); if (retcode < 0) { ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; @@ -992,19 +1034,30 @@ public: } } while ((int)entries.size() == max_entries); - drain_all(); + drain_all_but(1); - yield { - /* update marker to reflect we're done with full sync */ - sync_marker.state = rgw_meta_sync_marker::IncrementalSync; - sync_marker.marker = sync_marker.next_step_marker; - sync_marker.next_step_marker.clear(); - call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + if (!lost_lock) { + yield { + /* update marker to reflect we're done with full sync */ + sync_marker.state = rgw_meta_sync_marker::IncrementalSync; + sync_marker.marker = sync_marker.next_step_marker; + sync_marker.next_step_marker.clear(); + call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + } + if (retcode < 0) { + 
ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } } - if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; - return set_state(RGWCoroutine_Error, retcode); + + yield lease_cr->go_down(); + + lease_cr->put(); + lease_cr = NULL; + + if (lost_lock) { + return -EBUSY; } } return 0; @@ -1013,11 +1066,37 @@ public: int incremental_sync() { reenter(&incremental_cr) { + /* grab lock */ + yield { + uint32_t lock_duration = 30; + string lock_name = "sync_lock"; + if (lease_cr) { + lease_cr->put(); + } + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, + RGWMetaSyncStatusManager::shard_obj_name(shard_id), + lock_name, lock_duration); + lease_cr->get(); + spawn(lease_cr, false); + lost_lock = false; + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + return set_state(RGWCoroutine_Error, lease_cr->get_ret_status()); + } + yield; + } mdlog_marker = sync_marker.marker; set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + /* inc sync */ do { + if (!lease_cr->is_locked()) { + lost_lock = true; + break; + } #define INCREMENTAL_MAX_ENTRIES 100 ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; if (mdlog_marker <= sync_marker.marker) { @@ -1044,6 +1123,15 @@ public: yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); } } while (true); + + yield lease_cr->go_down(); + + lease_cr->put(); + lease_cr = NULL; + + if (lost_lock) { + return -EBUSY; + } } /* TODO */ return 0;