From 4a2b12a6d6a086f9534f569fc548f7ff502f423c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 20 Aug 2019 14:02:12 -0700 Subject: [PATCH] rgw: reduce use of sync_env->store Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 151 ++++++++++++++++++++------------------- src/rgw/rgw_data_sync.h | 11 ++- 2 files changed, 85 insertions(+), 77 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index a8fb37f482e64..66add66dbd7e2 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -45,6 +45,7 @@ static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; static string bucket_status_oid_prefix = "bucket.sync-status"; static string object_status_oid_prefix = "bucket.sync-status"; +static string bucket_sync_sources_oid_prefix = "bucket.sync-sources"; void rgw_datalog_info::decode_json(JSONObj *obj) { @@ -109,8 +110,8 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next() return false; } using CR = RGWSimpleRadosReadCR; - spawn(new CR(env->async_rados, env->store->svc()->sysobj, - rgw_raw_obj(env->store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)), + spawn(new CR(env->async_rados, env->svc.sysobj, + rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)), &markers[shard_id]), false); shard_id++; @@ -146,7 +147,7 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next() string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry"; auto& shard_keys = omapkeys[shard_id]; shard_keys = std::make_shared(); - spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc()->zone->get_zone_params().log_pool, error_oid), + spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, error_oid), marker, max_entries, shard_keys), false); ++shard_id; @@ -172,8 +173,8 @@ int RGWReadDataSyncStatusCoroutine::operate() using ReadInfoCR = RGWSimpleRadosReadCR; yield { bool empty_on_enoent = false; // fail on ENOENT - call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, - rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), + call(new ReadInfoCR(sync_env->async_rados, sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), &sync_status->sync_info, empty_on_enoent)); } if (retcode < 0) { @@ -390,7 +391,7 @@ public: RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id, const string& _marker, uint32_t _max_entries, rgw_datalog_shard_data *_result) - : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL), + : RGWSimpleCoroutine(env->cct), sync_env(env), http_op(NULL), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {} int send_request() override { @@ -417,7 +418,7 @@ public: int ret = http_op->aio_read(); if (ret < 0) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl; log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; http_op->put(); return ret; @@ -430,7 +431,7 @@ public: int ret = http_op->wait(result, null_yield); http_op->put(); if (ret < 0 && ret != -ENOENT) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl; return ret; } return 0; @@ -491,7 +492,7 @@ public: RGWSyncTraceNodeRef& _tn_parent, rgw_data_sync_status *status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store), - pool(store->svc()->zone->get_zone_params().log_pool), + pool(sync_env->svc.zone->get_zone_params().log_pool), num_shards(num_shards), status(status), tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) { lock_name = "sync_lock"; @@ -520,7 +521,7 @@ public: return set_cr_error(retcode); } using WriteInfoCR = RGWSimpleRadosWriteCR; - yield call(new WriteInfoCR(sync_env->async_rados, store->svc()->sysobj, + yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj, rgw_raw_obj{pool, sync_status_oid}, status->sync_info)); if (retcode < 0) { @@ -541,7 +542,7 @@ public: /* fetch current position in logs */ yield { - RGWRESTConn *conn = store->svc()->zone->get_zone_conn_by_id(sync_env->source_zone); + RGWRESTConn *conn = sync_env->svc.zone->get_zone_conn_by_id(sync_env->source_zone); if (!conn) { tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!")); return set_cr_error(-EIO); @@ -565,7 +566,7 @@ public: marker.timestamp = info.last_update; const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i); using WriteMarkerCR = RGWSimpleRadosWriteCR; - spawn(new WriteMarkerCR(sync_env->async_rados, store->svc()->sysobj, + spawn(new WriteMarkerCR(sync_env->async_rados, sync_env->svc.sysobj, rgw_raw_obj{pool, oid}, marker), true); } } @@ -578,7 +579,7 @@ public: } status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps; - yield call(new WriteInfoCR(sync_env->async_rados, store->svc()->sysobj, + yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj, rgw_raw_obj{pool, sync_status_oid}, status->sync_info)); if (retcode < 0) { @@ -594,11 +595,14 @@ public: } }; -RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *_store, +RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp, + CephContext *_cct, + RGWCoroutinesManagerRegistry *_cr_registry, RGWAsyncRadosProcessor *async_rados) - : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()), - dpp(dpp), store(_store), async_rados(async_rados), - http_manager(store->ctx(), completion_mgr), + : RGWCoroutinesManager(_cct, _cr_registry), + dpp(dpp), cct(_cct), cr_registr(_cr_registry), + async_rados(async_rados), + http_manager(_cct, completion_mgr), data_sync_cr(NULL), initialized(false) { @@ -640,7 +644,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSy RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module, PerfCounters* counters) { - sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, + sync_env.init(dpp, cct, store, _conn, async_rados, &http_manager, _error_logger, _sync_tracer, _source_zone, _sync_module, counters); if (initialized) { @@ -668,8 +672,8 @@ void RGWRemoteDataLog::finish() int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) { // cannot run concurrently with run_sync(), so run in a separate manager - RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); - RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); + RGWCoroutinesManager crs(cct, cr_registry); + RGWHTTPManager http_manager(cct, crs.get_completion_mgr()); int ret = http_manager.start(); if (ret < 0) { ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; @@ -685,8 +689,8 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set& recovering_shards) { // cannot run concurrently with run_sync(), so run in a separate manager - RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); - RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); + RGWCoroutinesManager crs(cct, cr_registry); + RGWHTTPManager http_manager(cct, crs.get_completion_mgr()); int ret = http_manager.start(); if (ret < 0) { ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; @@ -716,8 +720,8 @@ int RGWRemoteDataLog::init_sync_status(int num_shards) rgw_data_sync_status sync_status; sync_status.sync_info.num_shards = num_shards; - RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); - RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); + RGWCoroutinesManager crs(cct, cr_registry); + RGWHTTPManager http_manager(cct, crs.get_completion_mgr()); int ret = http_manager.start(); if (ret < 0) { ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; @@ -813,7 +817,7 @@ public: int operate() override { reenter(this) { entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards, - store->svc()->zone->get_zone_params().log_pool, + synv_env->svc.zone->get_zone_params().log_pool, oid_prefix); yield; // yield so OmapAppendCRs can start @@ -825,7 +829,7 @@ public: {"marker", result.marker.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(store->ctx(), sync_env->conn, sync_env->http_manager, + call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, entrypoint, pairs, &result)); } if (retcode < 0) { @@ -841,7 +845,7 @@ public: rgw_http_param_pair pairs[] = {{"key", key.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info)); + call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, path, pairs, &meta_info)); } num_shards = meta_info.data.get_bucket_info().num_shards; @@ -850,10 +854,10 @@ public: char buf[16]; snprintf(buf, sizeof(buf), ":%d", i); s = key + buf; - yield entries_index->append(s, store->svc()->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); + yield entries_index->append(s, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); } } else { - yield entries_index->append(key, store->svc()->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); + yield entries_index->append(key, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); } } truncated = result.truncated; @@ -869,8 +873,8 @@ public: int shard_id = (int)iter->first; rgw_data_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, store->svc()->sysobj, - rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), + spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), marker), true); } @@ -938,8 +942,8 @@ public: tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); RGWRados *rados = sync_env->store->getRados(); - return new RGWSimpleRadosWriteCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid), + return new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, marker_oid), sync_marker); } @@ -1271,8 +1275,9 @@ public: if (lease_cr) { lease_cr->abort(); } - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, - rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, status_oid), + auto store = sync_env->store; + lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid), lock_name, lock_duration, this)); lease_stack.reset(spawn(lease_cr.get(), false)); } @@ -1352,9 +1357,9 @@ public: sync_marker.state = rgw_data_sync_marker::IncrementalSync; sync_marker.marker = sync_marker.next_step_marker; sync_marker.next_step_marker.clear(); - RGWRados *rados = sync_env->store->getRados(); - call(new RGWSimpleRadosWriteCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, status_oid), + RGWRados *store = sync_env->store; + call(new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid), sync_marker)); } if (retcode < 0) { @@ -1553,9 +1558,9 @@ public: } RGWCoroutine *alloc_finisher_cr() override { - RGWRados *rados = sync_env->store->getRados(); - return new RGWSimpleRadosReadCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), + RGWRados *store = sync_env->store; + return new RGWSimpleRadosReadCR(sync_env->async_rados, synv_env->svc.sysobj, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), &sync_marker); } @@ -1672,7 +1677,7 @@ public: tn->log(10, SSTR("spawning " << num_shards << " shards sync")); for (map::iterator iter = sync_status.sync_markers.begin(); iter != sync_status.sync_markers.end(); ++iter) { - RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc()->zone->get_zone_params().log_pool, + RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->synv_env->svc.zone->get_zone_params().log_pool, iter->first, iter->second, tn); cr->get(); shard_crs_lock.lock(); @@ -1689,9 +1694,8 @@ public: } RGWCoroutine *set_sync_info_cr() { - RGWRados *rados = sync_env->store->getRados(); - return new RGWSimpleRadosWriteCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), + return new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, + rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), sync_status.sync_info); } @@ -1915,22 +1919,22 @@ int RGWDataSyncStatusManager::init() { RGWZone *zone_def; - if (!store->svc()->zone->find_zone_by_id(source_zone, &zone_def)) { + if (!svc.zone->find_zone_by_id(source_zone, &zone_def)) { ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl; return -EIO; } - if (!store->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) { + if (!svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) { return -ENOTSUP; } - const RGWZoneParams& zone_params = store->svc()->zone->get_zone_params(); + const RGWZoneParams& zone_params = svc.zone->get_zone_params(); if (sync_module == nullptr) { sync_module = store->getRados()->get_sync_module(); } - conn = store->svc()->zone->get_zone_conn_by_id(source_zone); + conn = svc.zone->get_zone_conn_by_id(source_zone); if (!conn) { ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl; return -EINVAL; @@ -2085,7 +2089,7 @@ public: } yield { auto store = sync_env->store; - rgw_raw_obj obj(store->svc()->zone->get_zone_params().log_pool, sync_status_oid); + rgw_raw_obj obj(sync_env->svc.zone->get_zone_params().log_pool, sync_status_oid); if (info.syncstopped) { call(new RGWRadosRemoveCR(store, obj)); @@ -2100,7 +2104,7 @@ public: } map attrs; status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc()->sysobj, obj, attrs)); + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, obj, attrs)); } } if (info.syncstopped) { @@ -2201,8 +2205,8 @@ public: int RGWReadBucketPipeSyncStatusCoroutine::operate() { reenter(this) { - yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc()->sysobj, - rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, oid), + yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, oid), &attrs, true)); if (retcode == -ENOENT) { *status = rgw_bucket_shard_sync_info(); @@ -2255,7 +2259,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate() count = 0; do { omapkeys = std::make_shared(); - yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, error_oid), + yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, error_oid), marker, max_omap_entries, omapkeys)); if (retcode == -ENOENT) { @@ -2322,8 +2326,8 @@ int RGWReadPendingBucketShardsCoroutine::operate() reenter(this){ //read sync status marker using CR = RGWSimpleRadosReadCR; - yield call(new CR(sync_env->async_rados, store->svc()->sysobj, - rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, status_oid), + yield call(new CR(sync_env->async_rados, sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), sync_marker)); if (retcode < 0) { ldout(sync_env->cct,0) << "failed to read sync status marker with " @@ -2611,8 +2615,8 @@ public: RGWRados *rados = sync_env->store->getRados(); tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); - return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid), + return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid), attrs); } @@ -2670,12 +2674,10 @@ public: map attrs; sync_marker.encode_attr(attrs); - RGWRados *rados = sync_env->store->getRados(); - tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, - rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid), + sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid), attrs); } @@ -2777,7 +2779,7 @@ public: data_sync_module = sync_env->sync_module->get_data_handler(); zones_trace = _zones_trace; - zones_trace.insert(sync_env->store->svc()->zone->get_zone().id); + zones_trace.insert(sync_env->svc.zone->get_zone().id); } int operate() override { @@ -2987,9 +2989,9 @@ int RGWBucketShardFullSyncCR::operate() sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync; map attrs; sync_info.encode_state_attr(attrs); - RGWRados *rados = sync_env->store->getRados(); - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, rados->svc.sysobj, - rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, status_oid), + RGWRados *store = sync_env->store; + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), attrs)); } } else { @@ -3045,7 +3047,7 @@ public: sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), lease_cr(lease_cr), sync_info(sync_info), marker_tracker(sync_env, status_oid, sync_info.inc_marker), - status_oid(status_oid), zone_id(_sync_env->store->svc()->zone->get_zone().id), + status_oid(status_oid), zone_id(_sync_env->svc.zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", SSTR(bucket_shard_str{bs}))) { @@ -3373,8 +3375,9 @@ int RGWRunBucketSyncCoroutine::operate() reenter(this) { yield { set_status("acquiring sync lock"); - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, - rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, status_oid), + auto store = sync_env->store; + lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, + rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), "sync_lock", cct->_conf->rgw_sync_lease_period, this)); @@ -3409,7 +3412,7 @@ int RGWRunBucketSyncCoroutine::operate() tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata")); string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key(); - meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc()->zone->get_master_conn(), sync_env->async_rados, + meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc.zone->get_master_conn(), sync_env->async_rados, sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer); call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, @@ -3494,7 +3497,7 @@ RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() int RGWBucketPipeSyncStatusManager::init() { - conn = store->svc()->zone->get_zone_conn_by_id(source_zone); + conn = sync_env->svc.zone->get_zone_conn_by_id(source_zone); if (!conn) { ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl; return -EINVAL; @@ -3530,7 +3533,7 @@ int RGWBucketPipeSyncStatusManager::init() int effective_num_shards = (num_shards ? num_shards : 1); - auto async_rados = store->svc()->rados->get_async_processor(); + auto async_rados = sync_env->svc.rados->get_async_processor(); for (int i = 0; i < effective_num_shards; i++) { RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager); @@ -3646,7 +3649,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncEnv *env, int num_shards, const rgw_bucket& bucket, Vector *status) - : RGWShardCollectCR(store->ctx(), max_concurrent_shards), + : RGWShardCollectCR(env->cct, max_concurrent_shards), store(store), env(env), num_shards(num_shards), bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1 i(status->begin()), end(status->end()) @@ -3674,7 +3677,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStor RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module - env.init(dpp, store->ctx(), store, nullptr, store->svc()->rados->get_async_processor(), + env.init(dpp, store->ctx(), store, nullptr, sync_env->svc.rados->get_async_processor(), nullptr, nullptr, nullptr, source_zone, module, nullptr); RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index d14eada51c70e..fa589ddce30e3 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -418,6 +418,7 @@ struct RGWDataSyncEnv { const DoutPrefixProvider *dpp{nullptr}; CephContext *cct{nullptr}; rgw::sal::RGWRadosStore *store{nullptr}; + RGWServices *svc{nullptr}; RGWRESTConn *conn{nullptr}; RGWAsyncRadosProcessor *async_rados{nullptr}; RGWHTTPManager *http_manager{nullptr}; @@ -429,7 +430,8 @@ struct RGWDataSyncEnv { RGWDataSyncEnv() {} - void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn, + void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, + RGWRESTConn *_conn, RGWServices *_svc, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module, @@ -437,6 +439,7 @@ struct RGWDataSyncEnv { dpp = _dpp; cct = _cct; store = _store; + svc = _svc; conn = _conn; async_rados = _async_rados; http_manager = _http_manager; @@ -456,7 +459,7 @@ class RGWDataChangesLogInfo; class RGWRemoteDataLog : public RGWCoroutinesManager { const DoutPrefixProvider *dpp; - rgw::sal::RGWRadosStore *store; + CephContext *cct; RGWAsyncRadosProcessor *async_rados; RGWHTTPManager http_manager; @@ -470,7 +473,9 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *_store, + RGWRemoteDataLog(const DoutPrefixProvider *dpp, + CephContext *_cct, + RGWCoroutinesManagerRegistry *_cr_registry, RGWAsyncRadosProcessor *async_rados); int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, -- 2.39.5