From 2d52ab31d5166b2b3f1cc6219ccc8863583ac9bd Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 25 Jan 2016 15:58:36 -0800 Subject: [PATCH] rgw: data sync env cleanup pass RGWDataSyncEnv container object instead of passing around multiple params. (incomplete) Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 663 ++++++++++++++++----------------------- src/rgw/rgw_data_sync.h | 45 ++- 2 files changed, 300 insertions(+), 408 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2507d31aeb3c7..887f174f890a6 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -57,23 +57,20 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) { }; class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - RGWObjectCtx& obj_ctx; + RGWDataSyncEnv *sync_env; - string source_zone; + RGWObjectCtx& obj_ctx; rgw_data_sync_status *sync_status; public: - RGWReadDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - RGWObjectCtx& _obj_ctx, const string& _source_zone, - rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx, - _store->get_zone_params().log_pool, - RGWDataSyncStatusManager::sync_status_oid(_source_zone), + RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, RGWObjectCtx& _obj_ctx, + rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx, + _sync_env->store->get_zone_params().log_pool, + RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone), &_status->sync_info), - async_rados(_async_rados), store(_store), - obj_ctx(_obj_ctx), source_zone(_source_zone), + sync_env(_sync_env), + obj_ctx(_obj_ctx), sync_status(_status) {} int handle_data(rgw_data_sync_info& data); @@ -86,18 +83,16 @@ int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data) } map& markers = sync_status->sync_markers; + RGWRados *store = sync_env->store; for (int i = 0; i < (int)data.num_shards; i++) { - spawn(new RGWSimpleRadosReadCR(async_rados, store, obj_ctx, store->get_zone_params().log_pool, - RGWDataSyncStatusManager::shard_obj_name(source_zone, i), &markers[i]), true); + spawn(new RGWSimpleRadosReadCR(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool, + RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true); } return 0; } class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; RGWRESTReadResource *http_op; @@ -105,11 +100,9 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine { RGWDataChangesLogInfo *shard_info; public: - RGWReadRemoteDataLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn, - int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env, + int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), http_op(NULL), shard_id(_shard_id), shard_info(_shard_info) { @@ -133,13 +126,13 @@ public: string p = "/admin/log/"; - http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager); + http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); http_op->set_user_info((void *)stack); int ret = http_op->aio_read(); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl; log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; return set_cr_error(ret); } @@ -173,10 +166,7 @@ struct read_remote_data_log_response { }; class RGWReadRemoteDataLogShardCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; RGWRESTReadResource *http_op; @@ -188,11 +178,9 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine { read_remote_data_log_response response; public: - RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn, - int _shard_id, string *_pmarker, list *_entries, bool *_truncated) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, + int _shard_id, string *_pmarker, list *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), http_op(NULL), shard_id(_shard_id), pmarker(_pmarker), @@ -216,13 +204,13 @@ public: string p = "/admin/log/"; - http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager); + http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); http_op->set_user_info((void *)stack); int ret = http_op->aio_read(); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl; log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; return set_cr_error(ret); } @@ -246,11 +234,10 @@ public: }; class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; + RGWRados *store; - RGWHTTPManager *http_manager; RGWObjectCtx& obj_ctx; - string source_zone; string sync_status_oid; @@ -259,10 +246,10 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { rgw_data_sync_info status; map shards_info; public: - RGWInitDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr, - RGWObjectCtx& _obj_ctx, const string& _source_zone, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), - http_manager(_http_mgr), - obj_ctx(_obj_ctx), source_zone(_source_zone) { + RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, + RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), store(sync_env->store), + obj_ctx(_obj_ctx) { lock_name = "sync_lock"; status.num_shards = _num_shards; @@ -272,7 +259,7 @@ public: gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); string cookie = buf; - sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(source_zone); + sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone); } int operate() { @@ -280,7 +267,7 @@ public: reenter(this) { yield { uint32_t lock_duration = 30; - call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie, lock_duration)); if (retcode < 0) { ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; @@ -288,12 +275,12 @@ public: } } yield { - call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, status)); } yield { /* take lock again, we just recreated the object */ uint32_t lock_duration = 30; - call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie, lock_duration)); if (retcode < 0) { ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; @@ -302,13 +289,13 @@ public: } /* fetch current position in logs */ yield { - RGWRESTConn *conn = store->get_zone_conn_by_id(source_zone); + RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone); if (!conn) { - ldout(cct, 0) << "ERROR: connection to zone " << source_zone << " does not exist!" << dendl; + ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl; return set_cr_error(-EIO); } for (int i = 0; i < (int)status.num_shards; i++) { - spawn(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, i, &shards_info[i]), true); + spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true); } } while (collect(&ret)) { @@ -323,17 +310,17 @@ public: RGWDataChangesLogInfo& info = shards_info[i]; marker.next_step_marker = info.marker; marker.timestamp = info.last_update; - spawn(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true); + spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, + RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), marker), true); } } yield { status.state = rgw_data_sync_info::StateBuildingFullSyncMaps; - call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, status)); } yield { /* unlock */ - call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie)); } while (collect(&ret)) { @@ -353,7 +340,7 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info) rgw_http_param_pair pairs[] = { { "type", "data" }, { NULL, NULL } }; - int ret = conn->get_json_resource("/admin/log", pairs, *log_info); + int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl; return ret; @@ -370,8 +357,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn) return 0; } - source_zone = _source_zone; - conn = _conn; + sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, NULL /* error_logger */, _source_zone); int ret = http_manager.set_threaded(); if (ret < 0) { @@ -400,7 +386,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id) { NULL, NULL } }; RGWDataChangesLogInfo info; - int ret = conn->get_json_resource("/admin/log", pairs, info); + int ret = sync_env.conn->get_json_resource("/admin/log", pairs, info); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl; return ret; @@ -414,7 +400,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id) int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) { RGWObjectCtx obj_ctx(store, NULL); - int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, sync_status)); + int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, sync_status)); if (r == -ENOENT) { r = 0; } @@ -424,7 +410,7 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) int RGWRemoteDataLog::init_sync_status(int num_shards) { RGWObjectCtx obj_ctx(store, NULL); - return run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards)); + return run(new RGWInitDataSyncStatusCoroutine(&sync_env, obj_ctx, num_shards)); } static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id) @@ -451,12 +437,10 @@ struct bucket_instance_meta_info { }; class RGWListBucketIndexesCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - string source_zone; rgw_data_sync_status *sync_status; int num_shards; @@ -478,16 +462,11 @@ class RGWListBucketIndexesCR : public RGWCoroutine { bool failed; public: - RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, - const string& _source_zone, - rgw_data_sync_status *_sync_status) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), source_zone(_source_zone), - sync_status(_sync_status), + RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env, + rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + store(sync_env->store), sync_status(_sync_status), req_ret(0), entries_index(NULL), i(0), failed(false) { - oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone; + oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; path = "/admin/metadata/bucket.instance"; num_shards = sync_status->sync_info.num_shards; } @@ -497,21 +476,21 @@ public: int operate() { reenter(this) { - entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards, + entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards, store->get_zone_params().log_pool, oid_prefix); yield { string entrypoint = string("/admin/metadata/bucket.instance"); #warning need a better scaling solution here, requires streaming output - call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, + call(new RGWReadRESTResourceCR >(store->ctx(), sync_env->conn, sync_env->http_manager, entrypoint, NULL, &result)); } if (get_ret_status() < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl; return set_state(RGWCoroutine_Error); } for (iter = result.begin(); iter != result.end(); ++iter) { - ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl; + ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl; key = *iter; @@ -519,7 +498,7 @@ public: rgw_http_param_pair pairs[] = { { "key", key.c_str() }, { NULL, NULL } }; - call(new RGWReadRESTResourceCR(store->ctx(), conn, http_manager, path, pairs, &meta_info)); + call(new RGWReadRESTResourceCR(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info)); } num_shards = meta_info.data.get_bucket_info().num_shards; @@ -545,8 +524,8 @@ public: int shard_id = (int)iter->first; rgw_data_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), marker), true); + spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, + RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id), marker), true); } } int ret; @@ -565,8 +544,7 @@ public: #define DATA_SYNC_UPDATE_MARKER_WINDOW 1 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWRados *store; - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; string marker_oid; rgw_data_sync_marker sync_marker; @@ -585,11 +563,10 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrackctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; + RGWRados *store = sync_env->store; + + return new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, marker_oid, sync_marker); } @@ -621,26 +600,18 @@ public: }; class RGWRunBucketSyncCoroutine : public RGWCoroutine { - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - RGWRados *store; - string source_zone; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; RGWBucketInfo bucket_info; int shard_id; rgw_bucket_shard_sync_info sync_status; - RGWMetaSyncEnv sync_env; + RGWMetaSyncEnv meta_sync_env; public: - RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, RGWRados *_store, - const string& _source_zone, - const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), - http_manager(_mgr), async_rados(_async_rados), conn(_conn), - store(_store), - source_zone(_source_zone), + RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, + const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) {} @@ -669,12 +640,7 @@ static int parse_bucket_shard(CephContext *cct, const string& raw_key, string *b } class RGWDataSyncSingleEntryCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - - RGWRESTConn *conn; - string source_zone; + RGWDataSyncEnv *sync_env; string raw_key; string entry_marker; @@ -689,16 +655,13 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncShardMarkerTrack *marker_tracker; public: - RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, const string& _source_zone, - const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), source_zone(_source_zone), + RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, + const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), raw_key(_raw_key), entry_marker(_entry_marker), sync_status(0), marker_tracker(_marker_tracker) { - set_description() << "data sync single entry (source_zone=" << source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; + set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; } int operate() { @@ -706,12 +669,12 @@ public: do { yield { int shard_id; - int ret = parse_bucket_shard(store->ctx(), raw_key, &bucket_name, &bucket_instance, &shard_id); + int ret = parse_bucket_shard(sync_env->cct, raw_key, &bucket_name, &bucket_instance, &shard_id); if (ret < 0) { return set_cr_error(-EIO); } marker_tracker->reset_need_retry(raw_key); - call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id)); + call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id)); } } while (marker_tracker->need_retry(raw_key)); @@ -736,14 +699,10 @@ public: #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20 class RGWDataSyncShardCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; rgw_bucket pool; - string source_zone; uint32_t shard_id; rgw_data_sync_marker sync_marker; @@ -784,21 +743,18 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWContinuousLeaseCR *lease_cr; string status_oid; public: - RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone, - uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env, + rgw_bucket& _pool, + uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), pool(_pool), - source_zone(_source_zone), shard_id(_shard_id), sync_marker(_marker), marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"), total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), lease_cr(NULL) { - set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id; - status_oid = RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id); + set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id; + status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); } ~RGWDataSyncShardCR() { @@ -852,7 +808,8 @@ public: lease_cr->abort(); lease_cr->put(); } - lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid, + RGWRados *store = sync_env->store; + lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); spawn(lease_cr, false); @@ -872,28 +829,26 @@ public: set_sleeping(true); yield; } - oid = full_data_sync_index_shard_oid(source_zone, shard_id); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados, - status_oid, - sync_marker)); + oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id); + set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker)); total_entries = sync_marker.pos; do { - yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries)); + yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, oid, sync_marker.marker, &entries, max_entries)); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } iter = entries.begin(); for (; iter != entries.end(); ++iter) { - ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl; + ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl; total_entries++; if (!marker_tracker->start(iter->first, total_entries, utime_t())) { - ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false); + yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false); if (retcode < 0) { lease_cr->go_down(); drain_all(); @@ -912,11 +867,12 @@ public: sync_marker.state = rgw_data_sync_marker::IncrementalSync; sync_marker.marker = sync_marker.next_step_marker; sync_marker.next_step_marker.clear(); - call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + RGWRados *store = sync_env->store; + call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, sync_marker)); } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; lease_cr->go_down(); return set_cr_error(retcode); } @@ -937,9 +893,7 @@ public: yield; } set_status("lease acquired"); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados, - status_oid, - sync_marker)); + set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker)); do { current_modified.clear(); inc_lock.Lock(); @@ -949,40 +903,40 @@ public: /* process out of band updates */ for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { yield { - ldout(store->ctx(), 20) << __func__ << "(): async update notification: " << *modified_iter << dendl; - spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, *modified_iter, string(), marker_tracker), false); + ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl; + spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker), false); } } - yield call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info)); + yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info)); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl; lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } datalog_marker = shard_info.marker; #define INCREMENTAL_MAX_ENTRIES 100 - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; if (datalog_marker > sync_marker.marker) { spawned_keys.clear(); - yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated)); + yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated)); for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl; + ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl; if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) { - ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl; + ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl; marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp); continue; } if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { - ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl; + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl; } else { /* * don't spawn the same key more than once. We can do that as long as we don't yield */ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) { spawned_keys.insert(log_iter->entry.key); - spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); + spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false); if (retcode < 0) { lease_cr->go_down(); drain_all(); @@ -997,7 +951,7 @@ public: int ret; while (collect(&ret)) { if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we should have reported this error */ #warning deal with error } @@ -1005,7 +959,7 @@ public: } } } - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; if (datalog_marker == sync_marker.marker) { #define INCREMENTAL_INTERVAL 20 yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); @@ -1017,32 +971,24 @@ public: }; class RGWDataSyncShardControlCR : public RGWBackoffControlCR { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; rgw_bucket pool; - string source_zone; uint32_t shard_id; rgw_data_sync_marker sync_marker; public: - RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone, - uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_bucket& _pool, + uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct), + sync_env(_sync_env), pool(_pool), - source_zone(_source_zone), shard_id(_shard_id), sync_marker(_marker) { } RGWCoroutine *alloc_cr() { - return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr()); + return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr()); } void append_modified_shards(set& keys) { @@ -1058,11 +1004,7 @@ public: }; class RGWDataSyncCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - string source_zone; + RGWDataSyncEnv *sync_env; uint32_t num_shards; RGWObjectCtx obj_ctx; @@ -1077,14 +1019,10 @@ class RGWDataSyncCR : public RGWCoroutine { bool *reset_backoff; public: - RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), - source_zone(_source_zone), + RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), num_shards(_num_shards), - obj_ctx(store), + obj_ctx(sync_env->store), marker_tracker(NULL), shard_crs_lock("RGWDataSyncCR::shard_crs_lock"), reset_backoff(_reset_backoff) { @@ -1094,21 +1032,21 @@ public: reenter(this) { /* read sync status */ - yield call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status)); + yield call(new RGWReadDataSyncStatusCoroutine(sync_env, obj_ctx, &sync_status)); if (retcode == -ENOENT) { sync_status.sync_info.num_shards = num_shards; } else if (retcode < 0 && retcode != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl; return set_cr_error(retcode); } /* state: init status */ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) { - ldout(store->ctx(), 20) << __func__ << "(): init" << dendl; - yield call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards)); + ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl; + yield call(new RGWInitDataSyncStatusCoroutine(sync_env, obj_ctx, sync_status.sync_info.num_shards)); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl; return set_cr_error(retcode); } sync_status.sync_info.num_shards = num_shards; @@ -1117,7 +1055,7 @@ public: yield call(set_sync_info_cr()); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; return set_cr_error(retcode); } @@ -1126,14 +1064,14 @@ public: if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { /* state: building full sync maps */ - ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; - yield call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, &sync_status)); + ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl; + yield call(new RGWListBucketIndexesCR(sync_env, &sync_status)); sync_status.sync_info.state = rgw_data_sync_info::StateSync; /* update new state */ yield call(set_sync_info_cr()); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; return set_cr_error(retcode); } @@ -1145,8 +1083,7 @@ public: case rgw_data_sync_info::StateSync: for (map::iterator iter = sync_status.sync_markers.begin(); iter != sync_status.sync_markers.end(); ++iter) { - RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados, - conn, store->get_zone_params().log_pool, source_zone, + RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool, iter->first, iter->second); shard_crs_lock.Lock(); shard_crs[iter->first] = cr; @@ -1162,8 +1099,9 @@ public: } RGWCoroutine *set_sync_info_cr() { - return new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWDataSyncStatusManager::sync_status_oid(source_zone), + RGWRados *store = sync_env->store; + return new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, + RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone), sync_status.sync_info); } @@ -1180,24 +1118,16 @@ public: class RGWDataSyncControlCR : public RGWBackoffControlCR { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - string source_zone; + RGWDataSyncEnv *sync_env; uint32_t num_shards; public: - RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards) : RGWBackoffControlCR(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), - source_zone(_source_zone), num_shards(_num_shards) { + RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct), + sync_env(_sync_env), num_shards(_num_shards) { } RGWCoroutine *alloc_cr() { - return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, num_shards, backoff_ptr()); + return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr()); } void wakeup(int shard_id, set& keys) { @@ -1222,14 +1152,14 @@ int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status { RGWObjectCtx obj_ctx(store, NULL); - int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status)); + int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status)); if (r < 0 && r != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << source_zone << " r=" << r << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl; return r; } lock.get_write(); - data_sync_cr = new RGWDataSyncControlCR(store, &http_manager, async_rados, conn, source_zone, num_shards); + data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards); lock.unlock(); r = run(data_sync_cr); if (r < 0) { @@ -1309,6 +1239,8 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, con bucket_id = _bucket_id; shard_id = _shard_id; + sync_env.init(store->ctx(), store, conn, async_rados, http_manager, error_logger, source_zone); + return 0; } @@ -1325,11 +1257,7 @@ struct bucket_index_marker_info { }; class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; @@ -1340,13 +1268,9 @@ class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { bucket_index_marker_info *info; public: - RGWReadRemoteBucketIndexLogInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, + RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env, const string& _bucket_name, const string& _bucket_id, int _shard_id, - bucket_index_marker_info *_info) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + bucket_index_marker_info *_info) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), info(_info) { instance_key = bucket_name + ":" + bucket_id; @@ -1366,7 +1290,7 @@ public: { NULL, NULL } }; string p = "/admin/log/"; - call(new RGWReadRESTResourceCR(store->ctx(), conn, http_manager, p, pairs, info)); + call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info)); } if (retcode < 0) { return set_cr_error(retcode); @@ -1377,26 +1301,10 @@ public: } }; -class RGWReadBucketShardSyncStatusCR : public RGWSimpleRadosReadCR { - map attrs; -public: - RGWReadBucketShardSyncStatusCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store, - RGWObjectCtx& obj_ctx, const string& source_zone, - const string& bucket_name, const string bucket_id, int shard_id, - rgw_bucket_shard_sync_info *status) : RGWSimpleRadosReadCR(async_rados, store, obj_ctx, - store->get_zone_params().log_pool, - RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), - status) {} - -}; - - class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; RGWRados *store; - RGWHTTPManager *http_manager; - string source_zone; - RGWRESTConn *conn; + string bucket_name; string bucket_id; int shard_id; @@ -1409,12 +1317,10 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { bucket_index_marker_info info; public: - RGWInitBucketShardSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr, - const string& _source_zone, RGWRESTConn *_conn, - const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), - http_manager(_http_mgr), - source_zone(_source_zone), conn(_conn), + RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, + const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) { + store = sync_env->store; lock_name = "sync_lock"; #define COOKIE_LEN 16 @@ -1423,25 +1329,25 @@ public: gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); string cookie = buf; - sync_status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); } int operate() { reenter(this) { yield { uint32_t lock_duration = 30; - call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie, lock_duration)); if (retcode < 0) { ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; return set_cr_error(retcode); } } - yield call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + yield call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, status)); yield { /* take lock again, we just recreated the object */ uint32_t lock_duration = 30; - call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie, lock_duration)); if (retcode < 0) { ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; @@ -1449,7 +1355,7 @@ public: } } /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bucket_name, bucket_id, shard_id, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; return set_cr_error(retcode); @@ -1459,11 +1365,11 @@ public: status.inc_marker.position = info.max_marker; map attrs; status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, attrs)); } yield { /* unlock */ - call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie)); } return set_cr_done(); @@ -1474,8 +1380,8 @@ public: RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() { - return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone, - conn, bucket_name, bucket_id, shard_id); + return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, + bucket_name, bucket_id, shard_id); } template @@ -1525,22 +1431,19 @@ void rgw_bucket_shard_inc_sync_marker::encode_attr(map& attr } class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; + RGWDataSyncEnv *sync_env; RGWObjectCtx obj_ctx; string oid; rgw_bucket_shard_sync_info *status; map attrs; public: - RGWReadBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - const string& _source_zone, + RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, const string& _bucket_name, const string _bucket_id, int _shard_id, - rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_store->ctx()), - async_rados(_async_rados), - store(_store), - obj_ctx(_store), - oid(RGWBucketSyncStatusManager::status_oid(_source_zone, _bucket_name, _bucket_id, _shard_id)), + rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + obj_ctx(sync_env->store), + oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)), status(_status) {} int operate(); }; @@ -1548,8 +1451,8 @@ public: int RGWReadBucketSyncStatusCoroutine::operate() { reenter(this) { - yield call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx, - store->get_zone_params().log_pool, + yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store, obj_ctx, + sync_env->store->get_zone_params().log_pool, oid, &attrs)); if (retcode == -ENOENT) { @@ -1557,18 +1460,17 @@ int RGWReadBucketSyncStatusCoroutine::operate() return set_cr_done(); } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl; return set_cr_error(retcode); } - status->decode_from_attrs(store->ctx(), attrs); + status->decode_from_attrs(sync_env->cct, attrs); return set_cr_done(); } return 0; } RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) { - return new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, - bucket_name, bucket_id, shard_id, sync_status); + return new RGWReadBucketSyncStatusCoroutine(&sync_env, bucket_name, bucket_id, shard_id, sync_status); } RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { @@ -1652,11 +1554,7 @@ struct bucket_list_result { }; class RGWListBucketShardCR: public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; @@ -1668,14 +1566,10 @@ class RGWListBucketShardCR: public RGWCoroutine { bucket_list_result *result; public: - RGWListBucketShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, + RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const string& _bucket_name, const string& _bucket_id, int _shard_id, rgw_obj_key& _marker_position, - bucket_list_result *_result) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + bucket_list_result *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), marker_position(_marker_position), result(_result) { @@ -1699,7 +1593,7 @@ public: { NULL, NULL } }; string p = string("/") + bucket_name; - call(new RGWReadRESTResourceCR(store->ctx(), conn, http_manager, p, pairs, result)); + call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result)); } if (retcode < 0) { return set_cr_error(retcode); @@ -1711,11 +1605,7 @@ public: }; class RGWListBucketIndexLogCR: public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - - RGWRESTConn *conn; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; @@ -1727,14 +1617,10 @@ class RGWListBucketIndexLogCR: public RGWCoroutine { list *result; public: - RGWListBucketIndexLogCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, - const string& _bucket_name, const string& _bucket_id, int _shard_id, - string& _marker, - list *_result) : RGWCoroutine(_store->ctx()), store(_store), - http_manager(_mgr), - async_rados(_async_rados), - conn(_conn), + RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, + const string& _bucket_name, const string& _bucket_id, int _shard_id, + string& _marker, + list *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), marker(_marker), result(_result) { @@ -1755,7 +1641,7 @@ public: { "type", "bucket-index" }, { NULL, NULL } }; - call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, "/admin/log", pairs, result)); + call(new RGWReadRESTResourceCR >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result)); } if (retcode < 0) { return set_cr_error(retcode); @@ -1769,18 +1655,16 @@ public: #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWRados *store; - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; string marker_oid; rgw_bucket_shard_full_sync_marker sync_marker; public: - RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados, + RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env, const string& _marker_oid, const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), - store(_store), - async_rados(_async_rados), + sync_env(_sync_env), marker_oid(_marker_oid), sync_marker(_marker) {} @@ -1791,15 +1675,16 @@ public: map attrs; sync_marker.encode_attr(attrs); - ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, + RGWRados *store = sync_env->store; + + ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; + return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool, marker_oid, attrs); } }; class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWRados *store; - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; string marker_oid; rgw_bucket_shard_inc_sync_marker sync_marker; @@ -1818,11 +1703,10 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack attrs; sync_marker.encode_attr(attrs); - ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, + RGWRados *store = sync_env->store; + + ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; + return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool, marker_oid, attrs); } @@ -1868,10 +1754,8 @@ public: template class RGWBucketSyncSingleEntryCR : public RGWCoroutine { - RGWRados *store; - RGWAsyncRadosProcessor *async_rados; + RGWDataSyncEnv *sync_env; - string source_zone; RGWBucketInfo *bucket_info; int shard_id; @@ -1890,15 +1774,14 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { public: - RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados, - const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id, + RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, + RGWBucketInfo *_bucket_info, int _shard_id, const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch, utime_t& _timestamp, const bucket_entry_owner& _owner, RGWModifyOp _op, RGWPendingState _op_state, - const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store), - async_rados(_async_rados), - source_zone(_source_zone), + const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket_info(_bucket_info), shard_id(_shard_id), key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), owner(_owner), @@ -1907,8 +1790,8 @@ public: entry_marker(_entry_marker), marker_tracker(_marker_tracker), sync_status(0) { - set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; - ldout(store->ctx(), 20) << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl; + set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; + ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl; set_status("init"); } @@ -1925,13 +1808,13 @@ public: op == CLS_RGW_OP_LINK_OLH) { if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") { set_status("skipping entry"); - ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl; + ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl; goto done; } set_status("syncing obj"); - ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; - call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info, + ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; + call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned_epoch, true)); } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { @@ -1939,18 +1822,18 @@ public: if (op == CLS_RGW_OP_UNLINK_INSTANCE) { versioned = true; } - call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, ×tamp)); + call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, ×tamp)); } else if (op == CLS_RGW_OP_LINK_OLH_DM) { set_status("creating delete marker"); - ldout(store->ctx(), 10) << "creating delete marker: obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; - call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, ×tamp)); + ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; + call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, ×tamp)); } } } while (marker_tracker->need_retry(key)); if (retcode < 0 && retcode != -ENOENT) { set_status() << "failed to sync obj; retcode=" << retcode; rgw_bucket& bucket = bucket_info->bucket; - ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl; sync_status = retcode; } done: @@ -1972,11 +1855,7 @@ done: #define BUCKET_SYNC_SPAWN_WINDOW 20 class RGWBucketShardFullSyncCR : public RGWCoroutine { - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - RGWRados *store; - string source_zone; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; int shard_id; @@ -1996,14 +1875,10 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { string status_oid; public: - RGWBucketShardFullSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, RGWRados *_store, - const string& _source_zone, + RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const string& _bucket_name, const string _bucket_id, int _shard_id, - RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_store->ctx()), - http_manager(_mgr), async_rados(_async_rados), conn(_conn), - store(_store), - source_zone(_source_zone), + RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), bucket_info(_bucket_info), @@ -2011,7 +1886,7 @@ public: spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL), op(CLS_RGW_OP_ADD), total_entries(0), lease_cr(NULL) { - status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); } ~RGWBucketShardFullSyncCR() { @@ -2032,7 +1907,8 @@ int RGWBucketShardFullSyncCR::operate() set_status("acquiring sync lock"); uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; string lock_name = "sync_lock"; - lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid, + RGWRados *store = sync_env->store; + lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); spawn(lease_cr, false); @@ -2048,16 +1924,14 @@ int RGWBucketShardFullSyncCR::operate() } set_status("lock acquired"); list_marker = full_marker.position; - marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, - status_oid, - full_marker); + marker_tracker = new RGWBucketFullSyncShardMarkerTrack(sync_env, status_oid, full_marker); total_entries = full_marker.count; do { set_status("listing remote bucket"); - ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl; - yield call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, - list_marker, &list_result)); + ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl; + yield call(new RGWListBucketShardCR(sync_env, bucket_name, bucket_id, shard_id, + list_marker, &list_result)); if (retcode < 0 && retcode != -ENOENT) { set_status("failed bucket listing, going down"); yield lease_cr->go_down(); @@ -2066,17 +1940,17 @@ int RGWBucketShardFullSyncCR::operate() } entries_iter = list_result.entries.begin(); for (; entries_iter != list_result.entries.end(); ++entries_iter) { - ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; + ldout(sync_env->cct, 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; entry = &(*entries_iter); total_entries++; list_marker = entries_iter->key; if (!marker_tracker->start(entry->key, total_entries, utime_t())) { - ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl; + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl; } else { op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH); yield { - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, entry->key, false, /* versioned, only matters for object removal */ entry->versioned_epoch, entry->mtime, @@ -2087,7 +1961,7 @@ int RGWBucketShardFullSyncCR::operate() yield wait_for_child(); while (collect(&ret)) { if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we should have reported this error */ #warning deal with error } @@ -2104,14 +1978,15 @@ int RGWBucketShardFullSyncCR::operate() sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; map attrs; sync_status.encode_state_attr(attrs); - string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); - call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool, + string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); + RGWRados *store = sync_env->store; + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool, oid, attrs)); } yield lease_cr->go_down(); drain_all(); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id + ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << " retcode=" << retcode << dendl; return set_cr_error(retcode); } @@ -2121,11 +1996,7 @@ int RGWBucketShardFullSyncCR::operate() } class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { - RGWHTTPManager *http_manager; - RGWAsyncRadosProcessor *async_rados; - RGWRESTConn *conn; - RGWRados *store; - string source_zone; + RGWDataSyncEnv *sync_env; string bucket_name; string bucket_id; int shard_id; @@ -2148,21 +2019,17 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { public: - RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, RGWRados *_store, - const string& _source_zone, + RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env, const string& _bucket_name, const string _bucket_id, int _shard_id, - RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_store->ctx()), - http_manager(_mgr), async_rados(_async_rados), conn(_conn), - store(_store), - source_zone(_source_zone), + RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), bucket_info(_bucket_info), inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL), spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false), lease_cr(NULL) { - status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id; set_status("init"); } @@ -2185,7 +2052,8 @@ int RGWBucketShardIncrementalSyncCR::operate() set_status("acquiring sync lock"); uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; string lock_name = "sync_lock"; - lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid, + RGWRados *store = sync_env->store; + lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); spawn(lease_cr, false); @@ -2199,13 +2067,13 @@ int RGWBucketShardIncrementalSyncCR::operate() set_sleeping(true); yield; } - marker_tracker = new RGWBucketIncSyncShardMarkerTrack(store, async_rados, + marker_tracker = new RGWBucketIncSyncShardMarkerTrack(sync_env, status_oid, inc_marker); do { - ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl; + ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl; set_status() << "listing bilog; position=" << inc_marker.position; - yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, + yield call(new RGWListBucketIndexLogCR(sync_env, bucket_name, bucket_id, shard_id, inc_marker.position, &list_result)); if (retcode < 0 && retcode != -ENOENT) { /* wait for all operations to complete */ @@ -2220,15 +2088,15 @@ int RGWBucketShardIncrementalSyncCR::operate() if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) { set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry"; - ldout(store->ctx(), 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl; + ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl; continue; } - ldout(store->ctx(), 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; + ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; if (!ns.empty()) { set_status() << "skipping entry in namespace: " << entries_iter->object; - ldout(store->ctx(), 20) << "skipping entry in namespace: " << entries_iter->object << dendl; + ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl; continue; } @@ -2236,42 +2104,42 @@ int RGWBucketShardIncrementalSyncCR::operate() set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op; if (entry->op == CLS_RGW_OP_CANCEL) { set_status() << "canceled operation, skipping"; - ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl; + ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl; continue; } if (entry->state != CLS_RGW_STATE_COMPLETE) { set_status() << "non-complete operation, skipping"; - ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl; + ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl; continue; } - ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; + ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; updated_status = false; while (!marker_tracker->can_do_op(key, entry->op)) { if (!updated_status) { set_status() << "can't do op, conflicting inflight operation"; updated_status = true; } - ldout(store->ctx(), 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl; + ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl; yield wait_for_child(); } if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) { set_status() << "can't do op, sync already in progress for object"; - ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; + ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp); continue; } // yield { set_status() << "start object sync"; if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) { - ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; } else { uint64_t versioned_epoch = 0; bucket_entry_owner owner(entry->owner, entry->owner_display_name); if (entry->ver.pool < 0) { versioned_epoch = entry->ver.epoch; } - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, entry->state, entry->id, marker_tracker), false); } @@ -2281,7 +2149,7 @@ int RGWBucketShardIncrementalSyncCR::operate() yield wait_for_child(); while (collect(&ret)) { if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we should have reported this error */ #warning deal with error } @@ -2302,74 +2170,71 @@ int RGWBucketShardIncrementalSyncCR::operate() int RGWRunBucketSyncCoroutine::operate() { reenter(this) { - yield call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, bucket_name, bucket_id, shard_id, &sync_status)); + yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id, &sync_status)); if (retcode < 0 && retcode != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl; return set_cr_error(retcode); } - ldout(store->ctx(), 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl; + ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl; - yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info)); if (retcode == -ENOENT) { /* bucket instance info has not been synced in yet, fetch it now */ yield { - ldout(store->ctx(), 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl; + ldout(sync_env->cct, 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl; string raw_key = string("bucket.instance:") + bucket_name + ":" + bucket_id; - sync_env.init(cct, store, store->rest_master_conn, async_rados, http_manager); + meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger); - call(new RGWMetaSyncSingleEntryCR(&sync_env, raw_key, + call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, string() /* no marker */, MDLOG_STATUS_COMPLETE, NULL /* no marker tracker */)); } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl; return set_cr_error(retcode); } - yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info)); } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone, - conn, bucket_name, bucket_id, shard_id)); + call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id)); sync_status.state = rgw_bucket_shard_sync_info::StateFullSync; } } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store, - source_zone, bucket_name, bucket_id, shard_id, + call(new RGWBucketShardFullSyncCR(sync_env, bucket_name, bucket_id, shard_id, &bucket_info, sync_status.full_marker)); sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; } } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store, - source_zone, bucket_name, bucket_id, shard_id, + call(new RGWBucketShardIncrementalSyncCR(sync_env, bucket_name, bucket_id, shard_id, &bucket_info, sync_status.inc_marker)); } } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } @@ -2381,7 +2246,7 @@ int RGWRunBucketSyncCoroutine::operate() RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() { - return new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_id, shard_id); + return new RGWRunBucketSyncCoroutine(&sync_env, bucket_name, bucket_id, shard_id); } int RGWBucketSyncStatusManager::init() diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index e5a59dbf365d3..c3778d23e02cf 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -141,14 +141,39 @@ class RGWAsyncRadosProcessor; class RGWDataSyncStatusManager; class RGWDataSyncControlCR; -class RGWRemoteDataLog : public RGWCoroutinesManager { +struct RGWDataSyncEnv { + CephContext *cct; RGWRados *store; RGWRESTConn *conn; - string source_zone; RGWAsyncRadosProcessor *async_rados; + RGWHTTPManager *http_manager; + RGWSyncErrorLogger *error_logger; + string source_zone; + RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {} + + void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, + RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, + RGWSyncErrorLogger *_error_logger, const string& _source_zone) { + cct = _cct; + store = _store; + conn = _conn; + async_rados = _async_rados; + http_manager = _http_manager; + error_logger = _error_logger; + source_zone = _source_zone; + } + + string shard_obj_name(int shard_id); + string status_oid(); +}; + +class RGWRemoteDataLog : public RGWCoroutinesManager { + RGWRados *store; + RGWAsyncRadosProcessor *async_rados; RGWHTTPManager http_manager; - RGWDataSyncStatusManager *status_manager; + + RGWDataSyncEnv sync_env; RWLock lock; RGWDataSyncControlCR *data_sync_cr; @@ -156,12 +181,11 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - RGWDataSyncStatusManager *_sm) + RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), - store(_store), conn(NULL), async_rados(async_rados), + store(_store), async_rados(async_rados), http_manager(store->ctx(), &completion_mgr), - status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), + lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), initialized(false) {} int init(const string& _source_zone, RGWRESTConn *_conn); @@ -198,7 +222,7 @@ public: RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, const string& _source_zone) : store(_store), source_zone(_source_zone), conn(NULL), - source_log(store, async_rados, this), num_shards(0) {} + source_log(store, async_rados), num_shards(0) {} int init(); rgw_data_sync_status& get_sync_status() { return sync_status; } @@ -347,6 +371,9 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager { RGWBucketSyncStatusManager *status_manager; RGWAsyncRadosProcessor *async_rados; RGWHTTPManager *http_manager; + RGWSyncErrorLogger *error_logger; + + RGWDataSyncEnv sync_env; RGWBucketSyncCR *sync_cr; @@ -354,7 +381,7 @@ public: RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), conn(NULL), shard_id(0), - status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), + status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), error_logger(NULL), sync_cr(NULL) {} int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id); -- 2.39.5