From 0b5cdc31b2ae548189b46f5670c231d5fdc7bfb5 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 3 Dec 2015 14:42:17 -0800 Subject: [PATCH] rgw: take a lease before syncing bucket shard This is needed for having multiple rgws running on the same zone Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 57 +++++++++++++++++++++++++++++++++++++--- src/rgw/rgw_sync.cc | 1 + 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 3d09d25eb2098..84703a1aeb5cf 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1935,6 +1935,9 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { int total_entries; + RGWContinuousLeaseCR *lease_cr; + + string status_oid; public: RGWBucketShardFullSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn, RGWRados *_store, @@ -1950,7 +1953,9 @@ public: full_marker(_full_marker), marker_tracker(NULL), spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL), op(CLS_RGW_OP_ADD), - total_entries(0) {} + total_entries(0), lease_cr(NULL) { + status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + } ~RGWBucketShardFullSyncCR() { delete marker_tracker; @@ -1962,9 +1967,27 @@ int RGWBucketShardFullSyncCR::operate() { int ret; reenter(this) { + yield { + set_status("acquiring sync lock"); + uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; + string lock_name = "sync_lock"; + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid, + lock_name, lock_duration, this); + lease_cr->get(); + spawn(lease_cr, false); + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + set_status("lease lock failed, early abort"); + return set_cr_error(lease_cr->get_ret_status()); + } + set_sleeping(true); + yield; + } list_marker = full_marker.position; marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, - RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), + status_oid, full_marker); total_entries = full_marker.count; @@ -1973,6 +1996,7 @@ int RGWBucketShardFullSyncCR::operate() yield call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, list_marker, &list_result)); if (retcode < 0 && retcode != -ENOENT) { + yield lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -2016,6 +2040,7 @@ int RGWBucketShardFullSyncCR::operate() call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, oid, attrs)); } + yield lease_cr->go_down(); if (retcode < 0) { ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << " retcode=" << retcode << dendl; @@ -2044,6 +2069,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWBucketIncSyncShardMarkerTrack *marker_tracker; int spawn_window; bool updated_status; + RGWContinuousLeaseCR *lease_cr; + string status_oid; public: @@ -2059,7 +2086,9 @@ public: bucket_id(_bucket_id), shard_id(_shard_id), bucket_info(_bucket_info), inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL), - spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false) { + spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false), + lease_cr(NULL) { + status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id; set_status("init"); } @@ -2074,8 +2103,26 @@ int RGWBucketShardIncrementalSyncCR::operate() { int ret; reenter(this) { + yield { + set_status("acquiring sync lock"); + uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; + string lock_name = "sync_lock"; + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid, + lock_name, lock_duration, this); + lease_cr->get(); + spawn(lease_cr, false); + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + set_status("lease lock failed, early abort"); + return set_cr_error(lease_cr->get_ret_status()); + } + set_sleeping(true); + yield; + } marker_tracker = new RGWBucketIncSyncShardMarkerTrack(store, async_rados, - RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), + status_oid, inc_marker); do { ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl; @@ -2084,6 +2131,7 @@ int RGWBucketShardIncrementalSyncCR::operate() inc_marker.position, &list_result)); if (retcode < 0 && retcode != -ENOENT) { /* wait for all operations to complete */ + lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -2147,6 +2195,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } } while (!list_result.empty()); + lease_cr->go_down(); /* wait for all operations to complete */ drain_all(); return set_cr_done(); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 7b1e980b205b5..6f2b3a0b2d72e 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -512,6 +512,7 @@ public: if (retcode < 0) { set_status("failed to write sync status"); ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; + yield lease_cr->go_down(); return set_cr_error(retcode); } /* fetch current position in logs */ -- 2.39.5