From cd1ad88d917c27995ebeb60e008e47d75893af20 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 24 Sep 2015 13:54:58 -0700 Subject: [PATCH] rgw: move bucket shard full sync to its own coroutine Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 200 ++++++++++++++++++++++++--------------- 1 file changed, 124 insertions(+), 76 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index fd66272fb6ca1..3f98825d3372f 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1072,17 +1072,16 @@ public: #define BUCKET_SYNC_SPAWN_WINDOW 20 -class RGWRunBucketSyncCoroutine : public RGWCoroutine { +class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWHTTPManager *http_manager; RGWAsyncRadosProcessor *async_rados; RGWRESTConn *conn; RGWRados *store; - RGWObjectCtx& obj_ctx; string source_zone; string bucket_name; string bucket_id; - RGWBucketInfo bucket_info; int shard_id; + RGWBucketInfo *bucket_info; bucket_list_result list_result; list::iterator entries_iter; rgw_bucket_shard_sync_info sync_status; @@ -1092,29 +1091,132 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { rgw_obj_key list_marker; public: - RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, RGWRados *_store, - RGWObjectCtx& _obj_ctx, const string& _source_zone, - const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), + RGWBucketShardFullSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + RGWRESTConn *_conn, RGWRados *_store, + const string& _source_zone, + const string& _bucket_name, const string _bucket_id, int _shard_id, + RGWBucketInfo *_bucket_info) : RGWCoroutine(_store->ctx()), http_manager(_mgr), async_rados(_async_rados), conn(_conn), store(_store), - obj_ctx(_obj_ctx), source_zone(_source_zone), + source_zone(_source_zone), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), + bucket_info(_bucket_info), marker_tracker(NULL), spawn_window(BUCKET_SYNC_SPAWN_WINDOW), pending(0) {} - ~RGWRunBucketSyncCoroutine() { + ~RGWBucketShardFullSyncCR() { delete marker_tracker; } int operate(); }; +int RGWBucketShardFullSyncCR::operate() +{ + int ret; + reenter(this) { + list_marker = sync_status.full_marker.position; + marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, + RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), + sync_status.full_marker); + do { + yield { + ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; + int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, + list_marker, &list_result)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl; + return r; + } + } + if (retcode < 0 && retcode != -ENOENT) { + return set_state(RGWCoroutine_Error, retcode); + } + entries_iter = list_result.entries.begin(); + for (; entries_iter != list_result.entries.end(); ++entries_iter) { + ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; + yield { + bucket_list_entry& entry = *entries_iter; + ++pending; + marker_tracker->start(entry.key); + list_marker = entry.key; + spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + entry.key, entry.versioned_epoch, entry.key, marker_tracker), false); + } + while (pending > spawn_window && + collect(&ret)) { + --pending; + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + yield; + } + } + } while (list_result.is_truncated); + /* wait for all operations to complete */ + while (collect(&ret)) { + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + yield; + } + /* update sync state to incremental */ + yield { + sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; + map attrs; + sync_status.encode_state_attr(attrs); + string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + int ret = call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, + oid, attrs)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to call RGWSimpleRadosWriteAttrsCR() oid=" << oid << dendl; + return set_state(RGWCoroutine_Error, ret); + } + } + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id + << " retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + } + return 0; +} + +class RGWRunBucketSyncCoroutine : public RGWCoroutine { + RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; + RGWRESTConn *conn; + RGWRados *store; + RGWObjectCtx& obj_ctx; + string source_zone; + string bucket_name; + string bucket_id; + RGWBucketInfo bucket_info; + int shard_id; + rgw_bucket_shard_sync_info sync_status; + +public: + RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + RGWRESTConn *_conn, RGWRados *_store, + RGWObjectCtx& _obj_ctx, const string& _source_zone, + const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), + http_manager(_mgr), async_rados(_async_rados), conn(_conn), + store(_store), + obj_ctx(_obj_ctx), source_zone(_source_zone), + bucket_name(_bucket_name), + bucket_id(_bucket_id), shard_id(_shard_id) {} + + int operate(); +}; + int RGWRunBucketSyncCoroutine::operate() { reenter(this) { - int ret; yield { int r = call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, bucket_name, bucket_id, shard_id, &sync_status)); if (r < 0) { @@ -1142,74 +1244,20 @@ int RGWRunBucketSyncCoroutine::operate() } if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - list_marker = sync_status.full_marker.position; - marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, - RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), - sync_status.full_marker); - do { - yield { - ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; - int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, - list_marker, &list_result)); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl; - return r; - } - } - if (retcode < 0 && retcode != -ENOENT) { - return set_state(RGWCoroutine_Error, retcode); - } - entries_iter = list_result.entries.begin(); - for (; entries_iter != list_result.entries.end(); ++entries_iter) { - ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; - yield { - bucket_list_entry& entry = *entries_iter; - ++pending; - marker_tracker->start(entry.key); - list_marker = entry.key; - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, &bucket_info, shard_id, - entry.key, entry.versioned_epoch, entry.key, marker_tracker), false); - } - while (pending > spawn_window && - collect(&ret)) { - --pending; - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; - /* we should have reported this error */ -#warning deal with error - } - yield; - } - } - } while (list_result.is_truncated); - /* wait for all operations to complete */ - while (collect(&ret)) { - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; - /* we should have reported this error */ -#warning deal with error - } - yield; - } - /* update sync state to incremental */ - yield { - sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; - map attrs; - sync_status.encode_state_attr(attrs); - string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); - int ret = call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, - oid, attrs)); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to call RGWSimpleRadosWriteAttrsCR() oid=" << oid << dendl; - return set_state(RGWCoroutine_Error, ret); - } - } - if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id - << " retcode=" << retcode << dendl; - return set_state(RGWCoroutine_Error, retcode); + int r = call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store, + source_zone, bucket_name, bucket_id, shard_id, + &bucket_info)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl; + return r; } } + + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + } return 0; -- 2.39.5