From 9db06e677d2ef2396fa0b2575317671c96b0e6ee Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 20 Aug 2019 17:02:16 -0700 Subject: [PATCH] rgw: data sync: split RGWDataSyncEnv Keep only the environment related fields in there, and have the sync info related stuff in the containing RGWDataSyncCtx. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 539 ++++++++++++++++-------------- src/rgw/rgw_data_sync.h | 34 +- src/rgw/rgw_sync_module.cc | 16 +- src/rgw/rgw_sync_module.h | 19 +- src/rgw/rgw_sync_module_aws.cc | 270 +++++++-------- src/rgw/rgw_sync_module_es.cc | 92 ++--- src/rgw/rgw_sync_module_log.cc | 26 +- src/rgw/rgw_sync_module_pubsub.cc | 237 ++++++------- 8 files changed, 659 insertions(+), 574 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 66add66dbd7e..ed26f3e349af 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -89,6 +89,7 @@ void rgw_sync_flow_rule::get_zone_peers(const string& zone_id, class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { static constexpr int MAX_CONCURRENT_SHARDS = 16; + RGWDataSyncCtx *sc; RGWDataSyncEnv *env; const int num_shards; int shard_id{0};; @@ -96,10 +97,10 @@ class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { map& markers; public: - RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards, + RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards, map& markers) - : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), - env(env), num_shards(num_shards), markers(markers) + : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), + sc(sc), env(sc->env), num_shards(num_shards), markers(markers) {} bool spawn_next() override; }; @@ -110,8 +111,8 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next() return false; } using CR = RGWSimpleRadosReadCR; - spawn(new CR(env->async_rados, env->svc.sysobj, - rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, 
shard_id)), + spawn(new CR(env->async_rados, env->svc->sysobj, + rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), &markers[shard_id]), false); shard_id++; @@ -121,6 +122,7 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next() class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR { static constexpr int MAX_CONCURRENT_SHARDS = 16; + RGWDataSyncCtx *sc; RGWDataSyncEnv *env; uint64_t max_entries; @@ -131,9 +133,9 @@ class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR { std::vector& omapkeys; public: - RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards, + RGWReadDataSyncRecoveringShardsCR(RGWDataSyncCtx *sc, uint64_t _max_entries, int _num_shards, std::vector& omapkeys) - : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env), + : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), sc(sc), env(sc->env), max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys) {} bool spawn_next() override; @@ -144,10 +146,10 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next() if (shard_id >= num_shards) return false; - string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry"; + string error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry"; auto& shard_keys = omapkeys[shard_id]; shard_keys = std::make_shared(); - spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, error_oid), + spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, error_oid), marker, max_entries, shard_keys), false); ++shard_id; @@ -155,13 +157,14 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next() } class RGWReadDataSyncStatusCoroutine : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_data_sync_status *sync_status; public: - 
RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, + RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, rgw_data_sync_status *_status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status) {} int operate() override; }; @@ -173,8 +176,8 @@ int RGWReadDataSyncStatusCoroutine::operate() using ReadInfoCR = RGWSimpleRadosReadCR; yield { bool empty_on_enoent = false; // fail on ENOENT - call(new ReadInfoCR(sync_env->async_rados, sync_env->svc.sysobj, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), + call(new ReadInfoCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)), &sync_status->sync_info, empty_on_enoent)); } if (retcode < 0) { @@ -184,7 +187,7 @@ int RGWReadDataSyncStatusCoroutine::operate() } // read shard markers using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR; - yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards, + yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards, sync_status->sync_markers)); if (retcode < 0) { ldout(sync_env->cct, 4) << "failed to read sync status markers with " @@ -197,6 +200,7 @@ int RGWReadDataSyncStatusCoroutine::operate() } class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; RGWRESTReadResource *http_op; @@ -205,9 +209,10 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine { RGWDataChangesLogInfo *shard_info; public: - RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env, - int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc, + int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct), + sc(_sc), + 
sync_env(_sc->env), http_op(NULL), shard_id(_shard_id), shard_info(_shard_info) { @@ -231,7 +236,7 @@ public: string p = "/admin/log/"; - http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); + http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager); init_new_io(http_op); @@ -271,6 +276,7 @@ struct read_remote_data_log_response { }; class RGWReadRemoteDataLogShardCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; RGWRESTReadResource *http_op = nullptr; @@ -285,11 +291,11 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine { std::optional timer; public: - RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id, + RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id, const std::string& marker, string *pnext_marker, list *_entries, bool *_truncated) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker), entries(_entries), truncated(_truncated) { } @@ -312,7 +318,7 @@ public: string p = "/admin/log/"; - http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); + http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager); init_new_io(http_op); @@ -352,6 +358,7 @@ public: }; class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; int num_shards; @@ -361,10 +368,10 @@ class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR { #define READ_DATALOG_MAX_CONCURRENT 10 public: - RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env, + RGWReadRemoteDataLogInfoCR(RGWDataSyncCtx *_sc, int _num_shards, - map *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT), - sync_env(_sync_env), num_shards(_num_shards), + map *_datalog_info) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT), + sc(_sc), 
sync_env(_sc->env), num_shards(_num_shards), datalog_info(_datalog_info), shard_id(0) {} bool spawn_next() override; }; @@ -373,12 +380,13 @@ bool RGWReadRemoteDataLogInfoCR::spawn_next() { if (shard_id >= num_shards) { return false; } - spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false); + spawn(new RGWReadRemoteDataLogShardInfoCR(sc, shard_id, &(*datalog_info)[shard_id]), false); shard_id++; return true; } class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; RGWRESTReadResource *http_op; @@ -388,14 +396,14 @@ class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine { rgw_datalog_shard_data *result; public: - RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id, + RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id, const string& _marker, uint32_t _max_entries, rgw_datalog_shard_data *_result) - : RGWSimpleCoroutine(env->cct), sync_env(env), http_op(NULL), + : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {} int send_request() override { - RGWRESTConn *conn = sync_env->conn; + RGWRESTConn *conn = sc->conn; char buf[32]; snprintf(buf, sizeof(buf), "%d", shard_id); @@ -439,6 +447,7 @@ public: }; class RGWListRemoteDataLogCR : public RGWShardCollectCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; map shards; @@ -449,11 +458,11 @@ class RGWListRemoteDataLogCR : public RGWShardCollectCR { #define READ_DATALOG_MAX_CONCURRENT 10 public: - RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env, + RGWListRemoteDataLogCR(RGWDataSyncCtx *_sc, map& _shards, int _max_entries_per_shard, - map *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT), - sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard), + map *_result) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT), + sc(_sc), sync_env(_sc->env), 
max_entries_per_shard(_max_entries_per_shard), result(_result) { shards.swap(_shards); iter = shards.begin(); @@ -466,13 +475,14 @@ bool RGWListRemoteDataLogCR::spawn_next() { return false; } - spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false); + spawn(new RGWListRemoteDataLogShardCR(sc, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false); ++iter; return true; } class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { static constexpr uint32_t lock_duration = 30; + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::sal::RGWRadosStore *store; const rgw_pool& pool; @@ -487,12 +497,12 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards, + RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards, uint64_t instance_id, RGWSyncTraceNodeRef& _tn_parent, rgw_data_sync_status *status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store), - pool(sync_env->svc.zone->get_zone_params().log_pool), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store), + pool(sync_env->svc->zone->get_zone_params().log_pool), num_shards(num_shards), status(status), tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) { lock_name = "sync_lock"; @@ -505,7 +515,7 @@ public: gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); cookie = buf; - sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone); + sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone); } @@ -521,7 +531,7 @@ public: return set_cr_error(retcode); } using WriteInfoCR = RGWSimpleRadosWriteCR; - yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj, + yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc->sysobj, rgw_raw_obj{pool, 
sync_status_oid}, status->sync_info)); if (retcode < 0) { @@ -542,13 +552,13 @@ public: /* fetch current position in logs */ yield { - RGWRESTConn *conn = sync_env->svc.zone->get_zone_conn_by_id(sync_env->source_zone); + RGWRESTConn *conn = sync_env->svc->zone->get_zone_conn_by_id(sc->source_zone); if (!conn) { - tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!")); + tn->log(0, SSTR("ERROR: connection to zone " << sc->source_zone << " does not exist!")); return set_cr_error(-EIO); } for (uint32_t i = 0; i < num_shards; i++) { - spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true); + spawn(new RGWReadRemoteDataLogShardInfoCR(sc, i, &shards_info[i]), true); } } while (collect(&ret, NULL)) { @@ -564,9 +574,9 @@ public: auto& marker = status->sync_markers[i]; marker.next_step_marker = info.marker; marker.timestamp = info.last_update; - const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i); + const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i); using WriteMarkerCR = RGWSimpleRadosWriteCR; - spawn(new WriteMarkerCR(sync_env->async_rados, sync_env->svc.sysobj, + spawn(new WriteMarkerCR(sync_env->async_rados, sync_env->svc->sysobj, rgw_raw_obj{pool, oid}, marker), true); } } @@ -579,7 +589,7 @@ public: } status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps; - yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj, + yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc->sysobj, rgw_raw_obj{pool, sync_status_oid}, status->sync_info)); if (retcode < 0) { @@ -596,13 +606,13 @@ public: }; RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp, - CephContext *_cct, - RGWCoroutinesManagerRegistry *_cr_registry, + rgw::sal::RGWRadosStore *store, RGWAsyncRadosProcessor *async_rados) - : RGWCoroutinesManager(_cct, _cr_registry), - dpp(dpp), cct(_cct), cr_registr(_cr_registry), + : 
RGWCoroutinesManager(store->ctx(), store->getRados()->get_cr_registry()), + dpp(dpp), store(store), + cct(store->ctx()), cr_registry(store->getRados()->get_cr_registry()), async_rados(async_rados), - http_manager(_cct, completion_mgr), + http_manager(store->ctx(), completion_mgr), data_sync_cr(NULL), initialized(false) { @@ -613,7 +623,7 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info) rgw_http_param_pair pairs[] = { { "type", "data" }, { NULL, NULL } }; - int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info); + int ret = sc.conn->get_json_resource("/admin/log", pairs, *log_info); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl; return ret; @@ -632,20 +642,21 @@ int RGWRemoteDataLog::read_source_log_shards_info(map shard_markers, map *result) { - return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result)); + return run(new RGWListRemoteDataLogCR(&sc, shard_markers, 1, result)); } int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module, PerfCounters* counters) { - sync_env.init(dpp, cct, store, _conn, async_rados, &http_manager, _error_logger, - _sync_tracer, _source_zone, _sync_module, counters); + sync_env.init(dpp, cct, store, store->svc(), async_rados, &http_manager, _error_logger, + _sync_tracer, _sync_module, counters); + sc.init(&sync_env, _conn, _source_zone); if (initialized) { return 0; @@ -681,7 +692,11 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) } RGWDataSyncEnv sync_env_local = sync_env; sync_env_local.http_manager = &http_manager; - ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status)); + + RGWDataSyncCtx sc_local = sc; + sc_local.env = &sync_env_local; + + ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status)); http_manager.stop(); return ret; } @@ -698,10 
+713,15 @@ int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set& rec } RGWDataSyncEnv sync_env_local = sync_env; sync_env_local.http_manager = &http_manager; + + RGWDataSyncCtx sc_local = sc; + sc_local.env = &sync_env_local; + std::vector omapkeys; omapkeys.resize(num_shards); uint64_t max_entries{1}; - ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys)); + + ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sc_local, max_entries, num_shards, omapkeys)); http_manager.stop(); if (ret == 0) { @@ -730,7 +750,9 @@ int RGWRemoteDataLog::init_sync_status(int num_shards) RGWDataSyncEnv sync_env_local = sync_env; sync_env_local.http_manager = &http_manager; auto instance_id = ceph::util::generate_random_number(); - ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status)); + RGWDataSyncCtx sc_local = sc; + sc_local.env = &sync_env_local; + ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status)); http_manager.stop(); return ret; } @@ -775,6 +797,7 @@ struct bucket_instance_meta_info { }; class RGWListBucketIndexesCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::sal::RGWRadosStore *store; @@ -802,11 +825,11 @@ class RGWListBucketIndexesCR : public RGWCoroutine { read_metadata_list result; public: - RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env, - rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + RGWListBucketIndexesCR(RGWDataSyncCtx *_sc, + rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store), sync_status(_sync_status), req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) { - oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; + oid_prefix = datalog_sync_full_sync_index_prefix + "." 
+ sc->source_zone; path = "/admin/metadata/bucket.instance"; num_shards = sync_status->sync_info.num_shards; } @@ -817,7 +840,7 @@ public: int operate() override { reenter(this) { entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards, - synv_env->svc.zone->get_zone_params().log_pool, + sync_env->svc->zone->get_zone_params().log_pool, oid_prefix); yield; // yield so OmapAppendCRs can start @@ -829,7 +852,7 @@ public: {"marker", result.marker.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, + call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, entrypoint, pairs, &result)); } if (retcode < 0) { @@ -845,7 +868,7 @@ public: rgw_http_param_pair pairs[] = {{"key", key.c_str()}, {NULL, NULL}}; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, path, pairs, &meta_info)); + call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info)); } num_shards = meta_info.data.get_bucket_info().num_shards; @@ -854,10 +877,10 @@ public: char buf[16]; snprintf(buf, sizeof(buf), ":%d", i); s = key + buf; - yield entries_index->append(s, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); + yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); } } else { - yield entries_index->append(key, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); + yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); } } truncated = result.truncated; @@ -873,18 +896,18 @@ public: int shard_id = (int)iter->first; rgw_data_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, 
synv_env->svc.sysobj, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), + spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), marker), true); } } else { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", + yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data.init", "", EIO, string("failed to build bucket instances map"))); } while (collect(&ret, NULL)) { if (ret < 0) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", + yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data.init", "", -ret, string("failed to store sync status: ") + cpp_strerror(-ret))); req_ret = ret; } @@ -904,6 +927,7 @@ public: #define DATA_SYNC_UPDATE_MARKER_WINDOW 1 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; string marker_oid; @@ -925,11 +949,11 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), marker_oid(_marker_oid), sync_marker(_marker), tn(_tn) {} @@ -940,10 +964,9 @@ public: sync_marker.timestamp = timestamp; tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); - RGWRados *rados = sync_env->store->getRados(); - return new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, marker_oid), + return new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), sync_marker); } @@ -1013,6 +1036,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { } class RGWRunBucketSyncCoroutine : 
public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; @@ -1026,9 +1050,9 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - status_oid(RGWBucketPipeSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)), + RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", SSTR(bucket_shard_str{bs}))) { sync_pipe.source_bs = bs; @@ -1043,6 +1067,7 @@ public: }; class RGWDataSyncSingleEntryCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; string raw_key; @@ -1063,15 +1088,15 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, + RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker, - RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), raw_key(_raw_key), entry_marker(_entry_marker), sync_status(0), marker_tracker(_marker_tracker), error_repo(_error_repo), remove_from_repo(_remove_from_repo) { - set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; + set_description() << "data sync single 
entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key); } @@ -1088,7 +1113,7 @@ public: marker_tracker->reset_need_retry(raw_key); } tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs})); - call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn)); + call(new RGWRunBucketSyncCoroutine(sc, bs, tn)); } } while (marker_tracker && marker_tracker->need_retry(raw_key)); @@ -1105,7 +1130,7 @@ public: if (sync_status < 0) { // write actual sync failures for 'radosgw-admin sync error list' if (sync_status != -EBUSY && sync_status != -EAGAIN) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key, + yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", raw_key, -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status))); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode)); @@ -1143,6 +1168,7 @@ public: #define DATA_SYNC_MAX_ERR_ENTRIES 10 class RGWDataSyncShardCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_pool pool; @@ -1201,12 +1227,12 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env, + RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, uint32_t _shard_id, rgw_data_sync_marker& _marker, RGWSyncTraceNodeRef& _tn, - bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + bool *_reset_backoff) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), pool(_pool), shard_id(_shard_id), sync_marker(_marker), @@ -1214,8 +1240,8 @@ public: total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), 
tn(_tn) { - set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id; - status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); + set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id; + status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id); error_oid = status_oid + ".retry"; } @@ -1277,7 +1303,7 @@ public: } auto store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid), + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), lock_name, lock_duration, this)); lease_stack.reset(spawn(lease_cr.get(), false)); } @@ -1299,8 +1325,8 @@ public: yield; } tn->log(10, "took lease"); - oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn)); + oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id); + set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn)); total_entries = sync_marker.pos; do { if (!lease_cr->is_locked()) { @@ -1330,7 +1356,7 @@ public: tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". 
Duplicate entry?")); } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false); + yield spawn(new RGWDataSyncSingleEntryCR(sc, *iter, *iter, marker_tracker, error_repo, false, tn), false); } sync_marker.marker = *iter; @@ -1357,9 +1383,8 @@ public: sync_marker.state = rgw_data_sync_marker::IncrementalSync; sync_marker.marker = sync_marker.next_step_marker; sync_marker.next_step_marker.clear(); - RGWRados *store = sync_env->store; - call(new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid), + call(new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), sync_marker)); } if (retcode < 0) { @@ -1398,7 +1423,7 @@ public: 1 /* no buffer */); error_repo->get(); spawn(error_repo, false); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn)); + set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn)); do { if (!lease_cr->is_locked()) { stop_spawned_services(); @@ -1417,7 +1442,7 @@ public: for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { yield { tn->log(20, SSTR("received async update notification: " << *modified_iter)); - spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false); + spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), marker_tracker, error_repo, false, tn), false); } } @@ -1432,7 +1457,7 @@ public: for (; iter != error_entries.end(); ++iter) { error_marker = *iter; tn->log(20, SSTR("handle error entry: " << error_marker)); - spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); + spawn(new 
RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); } if (!omapkeys->more) { if (error_marker.empty() && error_entries.empty()) { @@ -1451,7 +1476,7 @@ public: omapkeys.reset(); tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker)); - yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker, + yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker, &next_marker, &log_entries, &truncated)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode)); @@ -1474,7 +1499,7 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { - spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false); + spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false); } while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window"; @@ -1534,6 +1559,7 @@ public: }; class RGWDataSyncShardControlCR : public RGWBackoffControlCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_pool pool; @@ -1543,10 +1569,10 @@ class RGWDataSyncShardControlCR : public RGWBackoffControlCR { RGWSyncTraceNodeRef tn; public: - RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool, + RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool, uint32_t _shard_id, rgw_data_sync_marker& _marker, - RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false), - sync_env(_sync_env), + RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false), + sc(_sc), sync_env(_sc->env), pool(_pool), shard_id(_shard_id), sync_marker(_marker) { @@ -1554,13 +1580,12 @@ 
public: } RGWCoroutine *alloc_cr() override { - return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr()); + return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, tn, backoff_ptr()); } RGWCoroutine *alloc_finisher_cr() override { - RGWRados *store = sync_env->store; - return new RGWSimpleRadosReadCR(sync_env->async_rados, synv_env->svc.sysobj, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), + return new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), &sync_marker); } @@ -1577,6 +1602,7 @@ public: }; class RGWDataSyncCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; uint32_t num_shards; @@ -1594,8 +1620,8 @@ class RGWDataSyncCR : public RGWCoroutine { RGWDataSyncModule *data_sync_module{nullptr}; public: - RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), num_shards(_num_shards), marker_tracker(NULL), reset_backoff(_reset_backoff), tn(_tn) { @@ -1612,7 +1638,7 @@ public: reenter(this) { /* read sync status */ - yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status)); + yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status)); data_sync_module = sync_env->sync_module->get_data_handler(); @@ -1627,7 +1653,7 @@ public: sync_status.sync_info.num_shards = num_shards; uint64_t instance_id; instance_id = ceph::util::generate_random_number(); - yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status)); + yield call(new 
RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode)); return set_cr_error(retcode); @@ -1637,19 +1663,19 @@ public: *reset_backoff = true; } - data_sync_module->init(sync_env, sync_status.sync_info.instance_id); + data_sync_module->init(sc, sync_status.sync_info.instance_id); if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { tn->log(10, SSTR("building full sync maps")); /* call sync module init here */ sync_status.sync_info.num_shards = num_shards; - yield call(data_sync_module->init_sync(sync_env)); + yield call(data_sync_module->init_sync(sc)); if (retcode < 0) { tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode)); return set_cr_error(retcode); } /* state: building full sync maps */ - yield call(new RGWListBucketIndexesCR(sync_env, &sync_status)); + yield call(new RGWListBucketIndexesCR(sc, &sync_status)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode)); return set_cr_error(retcode); @@ -1666,7 +1692,7 @@ public: *reset_backoff = true; } - yield call(data_sync_module->start_sync(sync_env)); + yield call(data_sync_module->start_sync(sc)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode)); return set_cr_error(retcode); @@ -1677,7 +1703,7 @@ public: tn->log(10, SSTR("spawning " << num_shards << " shards sync")); for (map::iterator iter = sync_status.sync_markers.begin(); iter != sync_status.sync_markers.end(); ++iter) { - RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->synv_env->svc.zone->get_zone_params().log_pool, + RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool, iter->first, iter->second, tn); cr->get(); shard_crs_lock.lock(); @@ -1694,8 +1720,8 @@ public: } RGWCoroutine 
*set_sync_info_cr() { - return new RGWSimpleRadosWriteCR(sync_env->async_rados, synv_env->svc.sysobj, - rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), + return new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)), sync_status.sync_info); } @@ -1714,9 +1740,9 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule { public: RGWDefaultDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -1738,26 +1764,29 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl return 0; } -RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) 
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, sync_pipe.source_bs.bucket, + auto sync_env = sc->env; + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, std::nullopt, versioned_epoch, true, zones_trace, sync_env->counters, sync_env->dpp); } -RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, +RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, + auto sync_env = sc->env; + return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &mtime, zones_trace); } -RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, + auto sync_env = sc->env; + return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &mtime, zones_trace); } @@ -1766,9 +1795,9 @@ 
class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule { public: RGWArchiveDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -1793,16 +1822,17 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl return 0; } -RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) +RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + auto sync_env = sc->env; + ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" 
<< key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; if (!sync_pipe.dest_bucket_info.versioned() || (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl; + ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl; sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED; int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL); if (op_ret < 0) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl; + ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl; return NULL; } } @@ -1817,31 +1847,33 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rg } } - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, dest_key, versioned_epoch, true, zones_trace, nullptr, sync_env->dpp); } -RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, +RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } 
-RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, + auto sync_env = sc->env; + return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &mtime, zones_trace); } class RGWDataSyncControlCR : public RGWBackoffControlCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; uint32_t num_shards; @@ -1849,14 +1881,14 @@ class RGWDataSyncControlCR : public RGWBackoffControlCR static constexpr bool exit_on_error = false; // retry on all errors public: - RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, - RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error), - sync_env(_sync_env), num_shards(_num_shards) { + RGWDataSyncControlCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, + RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, exit_on_error), + sc(_sc), sync_env(_sc->env), num_shards(_num_shards) { tn = sync_env->sync_tracer->add_node(_tn_parent, "sync"); } RGWCoroutine *alloc_cr() override { - return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr()); + return new RGWDataSyncCR(sc, num_shards, tn, 
backoff_ptr()); } void wakeup(int shard_id, set& keys) { @@ -1892,7 +1924,7 @@ void RGWRemoteDataLog::wakeup(int shard_id, set& keys) { int RGWRemoteDataLog::run_sync(int num_shards) { lock.lock(); - data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn); + data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn); data_sync_cr->get(); // run() will drop a ref, so take another lock.unlock(); @@ -1919,22 +1951,22 @@ int RGWDataSyncStatusManager::init() { RGWZone *zone_def; - if (!svc.zone->find_zone_by_id(source_zone, &zone_def)) { + if (!store->svc()->zone->find_zone_by_id(source_zone, &zone_def)) { ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl; return -EIO; } - if (!svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) { + if (!store->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) { return -ENOTSUP; } - const RGWZoneParams& zone_params = svc.zone->get_zone_params(); + const RGWZoneParams& zone_params = store->svc()->zone->get_zone_params(); if (sync_module == nullptr) { sync_module = store->getRados()->get_sync_module(); } - conn = svc.zone->get_zone_conn_by_id(source_zone); + conn = store->svc()->zone->get_zone_conn_by_id(source_zone); if (!conn) { ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl; return -EINVAL; @@ -2021,23 +2053,25 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, bs.bucket = bucket; bs.shard_id = shard_id; - sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager, - _error_logger, _sync_tracer, source_zone, _sync_module, nullptr); + sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager, + _error_logger, _sync_tracer, _sync_module, nullptr); + sc.init(&sync_env, conn, source_zone); return 0; } class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; const 
string instance_key; rgw_bucket_index_marker_info *info; public: - RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env, + RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, rgw_bucket_index_marker_info *_info) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), instance_key(bs.get_key()), info(_info) {} int operate() override { @@ -2049,7 +2083,7 @@ public: { NULL, NULL } }; string p = "/admin/log/"; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info)); + call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info)); } if (retcode < 0) { return set_cr_error(retcode); @@ -2061,6 +2095,7 @@ public: }; class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; const rgw_bucket_sync_pipe& sync_pipe; @@ -2070,26 +2105,26 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { rgw_bucket_index_marker_info info; public: - RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, + RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pipe& _sync_pipe, rgw_bucket_shard_sync_info& _status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), - sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sync_env->source_zone, _sync_pipe)), + sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe)), status(_status) {} int operate() override { reenter(this) { /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, sync_pipe.source_bs, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.source_bs, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; 
return set_cr_error(retcode); } yield { auto store = sync_env->store; - rgw_raw_obj obj(sync_env->svc.zone->get_zone_params().log_pool, sync_status_oid); + rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid); if (info.syncstopped) { call(new RGWRadosRemoveCR(store, obj)); @@ -2104,7 +2139,7 @@ public: } map attrs; status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, obj, attrs)); + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs)); } } if (info.syncstopped) { @@ -2124,7 +2159,7 @@ RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() #warning FIXME rgw_bucket_sync_pipe sync_pipe; sync_pipe.source_bs = bs; - return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, sync_pipe, init_status); + return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status); } #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." @@ -2187,17 +2222,18 @@ void rgw_bucket_shard_inc_sync_marker::encode_attr(map& attr } class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; string oid; rgw_bucket_shard_sync_info *status; map attrs; public: - RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, + RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pipe& sync_pipe, rgw_bucket_shard_sync_info *_status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - oid(RGWBucketPipeSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)), status(_status) {} int operate() override; }; @@ -2205,8 +2241,8 @@ public: int RGWReadBucketPipeSyncStatusCoroutine::operate() { reenter(this) { - yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, - 
rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, oid), + yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid), &attrs, true)); if (retcode == -ENOENT) { *status = rgw_bucket_shard_sync_info(); @@ -2224,6 +2260,7 @@ int RGWReadBucketPipeSyncStatusCoroutine::operate() #define OMAP_READ_MAX_ENTRIES 10 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::sal::RGWRadosStore *store; @@ -2240,13 +2277,13 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine { int count; public: - RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id, + RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id, set& _recovering_buckets, const int _max_entries) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries), recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES) { - error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry"; + error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry"; } int operate() override; @@ -2259,7 +2296,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate() count = 0; do { omapkeys = std::make_shared(); - yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, error_oid), + yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, error_oid), marker, max_omap_entries, omapkeys)); if (retcode == -ENOENT) { @@ -2290,6 +2327,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate() } class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; 
rgw::sal::RGWRadosStore *store; @@ -2308,14 +2346,14 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine { bool truncated; public: - RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id, + RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id, set& _pending_buckets, rgw_data_sync_marker* _sync_marker, const int _max_entries) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries), pending_buckets(_pending_buckets), sync_marker(_sync_marker) { - status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); + status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id); } int operate() override; @@ -2326,8 +2364,8 @@ int RGWReadPendingBucketShardsCoroutine::operate() reenter(this){ //read sync status marker using CR = RGWSimpleRadosReadCR; - yield call(new CR(sync_env->async_rados, sync_env->svc.sysobj, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), + yield call(new CR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), sync_marker)); if (retcode < 0) { ldout(sync_env->cct,0) << "failed to read sync status marker with " @@ -2339,7 +2377,7 @@ int RGWReadPendingBucketShardsCoroutine::operate() marker = sync_marker->marker; count = 0; do{ - yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker, + yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker, &next_marker, &log_entries, &truncated)); if (retcode == -ENOENT) { @@ -2380,12 +2418,14 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set& pending_bucke } RGWDataSyncEnv sync_env_local = sync_env; sync_env_local.http_manager = &http_manager; + RGWDataSyncCtx sc_local = sc; + sc_local.env = &sync_env_local; list stacks; RGWCoroutinesStack* 
recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs); - recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries)); + recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sc_local, shard_id, recovering_buckets, max_entries)); stacks.push_back(recovering_stack); RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs); - pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries)); + pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sc_local, shard_id, pending_buckets, sync_marker, max_entries)); stacks.push_back(pending_stack); ret = crs.run(stacks); http_manager.stop(); @@ -2397,7 +2437,7 @@ RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info #warning FIXME rgw_bucket_sync_pipe sync_pipe; sync_pipe.source_bs = bs; - return new RGWReadBucketPipeSyncStatusCoroutine(&sync_env, sync_pipe, sync_status); + return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pipe, sync_status); } RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone, @@ -2505,6 +2545,7 @@ struct bucket_list_result { }; class RGWListBucketShardCR: public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; const rgw_bucket_shard& bs; const string instance_key; @@ -2513,9 +2554,9 @@ class RGWListBucketShardCR: public RGWCoroutine { bucket_list_result *result; public: - RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, + RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, rgw_obj_key& _marker_position, bucket_list_result *_result) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs), instance_key(bs.get_key()), marker_position(_marker_position), result(_result) {} @@ -2531,7 +2572,7 @@ 
public: { NULL, NULL } }; // don't include tenant in the url, it's already part of instance_key string p = string("/") + bs.bucket.name; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result)); + call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result)); } if (retcode < 0) { return set_cr_error(retcode); @@ -2543,6 +2584,7 @@ public: }; class RGWListBucketIndexLogCR: public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; const string instance_key; string marker; @@ -2551,9 +2593,9 @@ class RGWListBucketIndexLogCR: public RGWCoroutine { std::optional timer; public: - RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, + RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, string& _marker, list *_result) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), instance_key(bs.get_key()), marker(_marker), result(_result) {} int operate() override { @@ -2568,7 +2610,7 @@ public: { "type", "bucket-index" }, { NULL, NULL } }; - call(new RGWReadRESTResourceCR >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result)); + call(new RGWReadRESTResourceCR >(sync_env->cct, sc->conn, sync_env->http_manager, "/admin/log", pairs, result)); } timer.reset(); if (retcode < 0) { @@ -2586,6 +2628,7 @@ public: #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; string marker_oid; @@ -2594,10 +2637,10 @@ class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), marker_oid(_marker_oid), sync_marker(_marker) {} @@ -2612,11 +2655,9 @@ public: map attrs; sync_marker.encode_attr(attrs); - RGWRados *rados = sync_env->store->getRados(); - tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); 
- return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid), + return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), attrs); } @@ -2626,6 +2667,7 @@ public: }; class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; string marker_oid; @@ -2657,10 +2699,10 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), marker_oid(_marker_oid), sync_marker(_marker) {} @@ -2676,8 +2718,8 @@ public: tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, - sync_env->svc.sysobj, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid), + sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), attrs); } @@ -2720,6 +2762,7 @@ public: template class RGWBucketSyncSingleEntryCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; @@ -2748,7 +2791,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, + RGWBucketSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, const rgw_obj_key& _key, bool _versioned, std::optional _versioned_epoch, @@ -2756,8 +2799,8 @@ public: const rgw_bucket_entry_owner& _owner, RGWModifyOp _op, RGWPendingState _op_state, const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker, rgw_zone_set& _zones_trace, - RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), key(_key), 
versioned(_versioned), versioned_epoch(_versioned_epoch), owner(_owner), @@ -2768,18 +2811,18 @@ public: sync_status(0){ stringstream ss; ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]"; - set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; + set_description() << "bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; set_status("init"); tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key)); - tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state)); + tn->log(20, SSTR("bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state)); error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0); data_sync_module = sync_env->sync_module->get_data_handler(); zones_trace = _zones_trace; - zones_trace.insert(sync_env->svc.zone->get_zone().id); + zones_trace.insert(sync_env->svc->zone->get_zone().id); } int operate() override { @@ -2805,23 +2848,23 @@ public: } else if (op == CLS_RGW_OP_ADD || op == CLS_RGW_OP_LINK_OLH) { set_status("syncing obj"); - tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->sync_object(sync_env, sync_pipe, key, versioned_epoch, &zones_trace)); + tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->sync_object(sc, sync_pipe, key, versioned_epoch, 
&zones_trace)); } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { set_status("removing obj"); if (op == CLS_RGW_OP_UNLINK_INSTANCE) { versioned = true; } - tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->remove_object(sync_env, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); + tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->remove_object(sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); // our copy of the object is more recent, continue as if it succeeded if (retcode == -ERR_PRECONDITION_FAILED) { retcode = 0; } } else if (op == CLS_RGW_OP_LINK_OLH_DM) { set_status("creating delete marker"); - tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->create_delete_marker(sync_env, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); + tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->create_delete_marker(sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); } tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key)); } @@ -2843,7 +2886,7 @@ public: sync_status = retcode; } if (!error_ss.str().empty()) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status))); + yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync 
object") + cpp_strerror(-sync_status))); } done: if (sync_status == 0) { @@ -2864,6 +2907,7 @@ done: #define BUCKET_SYNC_SPAWN_WINDOW 20 class RGWBucketShardFullSyncCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; rgw_bucket_shard& bs; @@ -2885,20 +2929,20 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, + RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), lease_cr(lease_cr), sync_info(sync_info), - marker_tracker(sync_env, status_oid, sync_info.full_marker), + marker_tracker(sc, status_oid, sync_info.full_marker), status_oid(status_oid), tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync", SSTR(bucket_shard_str{bs}))) { - zones_trace.insert(sync_env->source_zone); + zones_trace.insert(sc->source_zone); marker_tracker.set_tn(tn); } @@ -2919,7 +2963,7 @@ int RGWBucketShardFullSyncCR::operate() } set_status("listing remote bucket"); tn->log(20, "listing bucket for full sync"); - yield call(new RGWListBucketShardCR(sync_env, bs, list_marker, + yield call(new RGWListBucketShardCR(sc, bs, list_marker, &list_result)); if (retcode < 0 && retcode != -ENOENT) { set_status("failed bucket listing, going down"); @@ -2944,7 +2988,7 @@ int RGWBucketShardFullSyncCR::operate() tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". 
Duplicate entry?")); } else { using SyncCR = RGWBucketSyncSingleEntryCR; - yield spawn(new SyncCR(sync_env, sync_pipe, entry->key, + yield spawn(new SyncCR(sc, sync_pipe, entry->key, false, /* versioned, only matters for object removal */ entry->versioned_epoch, entry->mtime, entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE, @@ -2989,9 +3033,8 @@ int RGWBucketShardFullSyncCR::operate() sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync; map attrs; sync_info.encode_state_attr(attrs); - RGWRados *store = sync_env->store; - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), + call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), attrs)); } } else { @@ -3015,6 +3058,7 @@ static bool has_olh_epoch(RGWModifyOp op) { } class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; rgw_bucket_shard& bs; @@ -3037,17 +3081,17 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env, + RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), lease_cr(lease_cr), sync_info(sync_info), - marker_tracker(sync_env, status_oid, sync_info.inc_marker), - status_oid(status_oid), zone_id(_sync_env->svc.zone->get_zone().id), + marker_tracker(sc, status_oid, sync_info.inc_marker), + status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id), 
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", SSTR(bucket_shard_str{bs}))) { @@ -3072,7 +3116,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } tn->log(20, SSTR("listing bilog for incremental sync" << sync_info.inc_marker.position)); set_status() << "listing bilog; position=" << sync_info.inc_marker.position; - yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position, + yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position, &list_result)); if (retcode < 0 && retcode != -ENOENT) { /* wait for all operations to complete */ @@ -3225,7 +3269,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } tn->log(20, SSTR("entry->timestamp=" << entry->timestamp)); using SyncCR = RGWBucketSyncSingleEntryCR; - spawn(new SyncCR(sync_env, sync_pipe, key, + spawn(new SyncCR(sc, sync_pipe, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, entry->state, cur_id, &marker_tracker, entry->zones_trace, tn), @@ -3287,8 +3331,9 @@ int RGWBucketShardIncrementalSyncCR::operate() return 0; } - +#if 0 class RGWRunBucketSourcesSyncCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket bucket; rgw_sync_source source; @@ -3302,9 +3347,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket& bucket, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket(_bucket), - status_oid(RGWBucketPipeSyncStatusManager::status_oid(sync_env->source_zone, bucket)), + RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, const rgw_bucket& bucket, const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bucket(_bucket), + status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, bucket)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_source", SSTR(bucket_shard_str{bs}))) { } @@ -3324,7 
+3369,7 @@ int RGWRunBucketSourcesSyncCR::operate() set_status("acquiring sync lock"); auto store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid), + rgw_raw_obj(store->svc->zone->get_zone_params().log_pool, status_oid), "sync_lock", cct->_conf->rgw_sync_lease_period, this)); @@ -3341,7 +3386,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketSourcesInfoCR(sync_env, bs.bucket, &info)); + yield call(new RGWReadBucketSourcesInfoCR(sc, bs.bucket, &info)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3357,7 +3402,7 @@ int RGWRunBucketSourcesSyncCR::operate() yield { for (auto pipe : info.pipes) { - spawn(new RGWRunBucketSyncCoroutine(sync_env, pipe, &tn)); + spawn(new RGWRunBucketSyncCoroutine(sc, pipe, &tn)); } } @@ -3377,7 +3422,7 @@ int RGWRunBucketSyncCoroutine::operate() set_status("acquiring sync lock"); auto store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid), + rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), "sync_lock", cct->_conf->rgw_sync_lease_period, this)); @@ -3395,7 +3440,7 @@ int RGWRunBucketSyncCoroutine::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketPipeSyncStatusCoroutine(sync_env, sync_pipe, &sync_status)); + yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &sync_status)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3412,7 +3457,7 @@ int RGWRunBucketSyncCoroutine::operate() tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata")); string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key(); - 
meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc.zone->get_master_conn(), sync_env->async_rados, + meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados, sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer); call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, @@ -3439,7 +3484,7 @@ int RGWRunBucketSyncCoroutine::operate() do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, sync_pipe, sync_status)); + yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock @@ -3457,7 +3502,7 @@ int RGWRunBucketSyncCoroutine::operate() } if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - yield call(new RGWBucketShardFullSyncCR(sync_env, sync_pipe, + yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe, status_oid, lease_cr.get(), sync_status, tn)); if (retcode < 0) { @@ -3469,7 +3514,7 @@ int RGWRunBucketSyncCoroutine::operate() } if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - yield call(new RGWBucketShardIncrementalSyncCR(sync_env, sync_pipe, + yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, status_oid, lease_cr.get(), sync_status, tn)); if (retcode < 0) { @@ -3492,12 +3537,12 @@ int RGWRunBucketSyncCoroutine::operate() RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() { - return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node); + return new RGWRunBucketSyncCoroutine(&sc, bs, sync_env.sync_tracer->root_node); } int RGWBucketPipeSyncStatusManager::init() { - conn = sync_env->svc.zone->get_zone_conn_by_id(source_zone); + conn = store->svc()->zone->get_zone_conn_by_id(source_zone); if (!conn) { ldpp_dout(this, 0) << "connection object to zone 
" << source_zone << " does not exist" << dendl; return -EINVAL; @@ -3533,7 +3578,7 @@ int RGWBucketPipeSyncStatusManager::init() int effective_num_shards = (num_shards ? num_shards : 1); - auto async_rados = sync_env->svc.rados->get_async_processor(); + auto async_rados = store->svc.rados->get_async_processor(); for (int i = 0; i < effective_num_shards; i++) { RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager); @@ -3636,6 +3681,7 @@ string RGWBucketPipeSyncStatusManager::obj_status_oid(const string& source_zone, class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { static constexpr int max_concurrent_shards = 16; rgw::sal::RGWRadosStore *const store; + RGWDataSyncCtx *const sc; RGWDataSyncEnv *const env; const int num_shards; rgw_bucket_shard bs; @@ -3646,11 +3692,11 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { Vector::iterator i, end; public: - RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncEnv *env, + RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncCtx *sc, int num_shards, const rgw_bucket& bucket, Vector *status) - : RGWShardCollectCR(env->cct, max_concurrent_shards), - store(store), env(env), num_shards(num_shards), + : RGWShardCollectCR(sc->cct, max_concurrent_shards), + store(store), sc(sc), env(sc->env), num_shards(num_shards), bs(bucket, num_shards > 0 ? 
0 : -1), // start at shard 0 or -1 i(status->begin()), end(status->end()) {} @@ -3660,7 +3706,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { return false; } sync_pipe.source_bs = bs; - spawn(new RGWReadBucketPipeSyncStatusCoroutine(env, sync_pipe, &*i), false); + spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &*i), false); ++i; ++bs.shard_id; return true; @@ -3677,10 +3723,13 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStor RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module - env.init(dpp, store->ctx(), store, nullptr, sync_env->svc.rados->get_async_processor(), - nullptr, nullptr, nullptr, source_zone, module, nullptr); + env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(), + nullptr, nullptr, nullptr, module, nullptr); + + RGWDataSyncCtx sc; + sc.init(&env, nullptr, source_zone); RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); - return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards, + return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc, num_shards, bucket_info.bucket, status)); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index fa589ddce30e..56cae033562b 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -413,39 +413,35 @@ WRITE_CLASS_ENCODER(rgw_sync_policy_info) class RGWSyncErrorLogger; class RGWRESTConn; +class RGWServices; struct RGWDataSyncEnv { const DoutPrefixProvider *dpp{nullptr}; CephContext *cct{nullptr}; rgw::sal::RGWRadosStore *store{nullptr}; RGWServices *svc{nullptr}; - RGWRESTConn *conn{nullptr}; RGWAsyncRadosProcessor *async_rados{nullptr}; RGWHTTPManager *http_manager{nullptr}; RGWSyncErrorLogger *error_logger{nullptr}; RGWSyncTraceManager *sync_tracer{nullptr}; - string source_zone; RGWSyncModuleInstanceRef sync_module{nullptr}; PerfCounters* counters{nullptr}; RGWDataSyncEnv() {} - void init(const DoutPrefixProvider 
*_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, - RGWRESTConn *_conn, RGWServices *_svc, + void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, - const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module, + RGWSyncModuleInstanceRef& _sync_module, PerfCounters* _counters) { dpp = _dpp; cct = _cct; store = _store; svc = _svc; - conn = _conn; async_rados = _async_rados; http_manager = _http_manager; error_logger = _error_logger; sync_tracer = _sync_tracer; - source_zone = _source_zone; sync_module = _sync_module; counters = _counters; } @@ -454,16 +450,36 @@ struct RGWDataSyncEnv { string status_oid(); }; +struct RGWDataSyncCtx { + CephContext *cct{nullptr}; + RGWDataSyncEnv *env{nullptr}; + + RGWRESTConn *conn{nullptr}; + string source_zone; + + void init(RGWDataSyncEnv *_env, + RGWRESTConn *_conn, + const string& _source_zone) { + cct = _env->cct; + env = _env; + conn = _conn; + source_zone = _source_zone; + } +}; + class RGWRados; class RGWDataChangesLogInfo; class RGWRemoteDataLog : public RGWCoroutinesManager { const DoutPrefixProvider *dpp; + rgw::sal::RGWRadosStore *store; CephContext *cct; + RGWCoroutinesManagerRegistry *cr_registry; RGWAsyncRadosProcessor *async_rados; RGWHTTPManager http_manager; RGWDataSyncEnv sync_env; + RGWDataSyncCtx sc; ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); RGWDataSyncControlCR *data_sync_cr; @@ -474,8 +490,7 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { public: RGWRemoteDataLog(const DoutPrefixProvider *dpp, - CephContext *_cct, - RGWCoroutinesManagerRegistry *_cr_registry, + rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados); int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, 
RGWSyncModuleInstanceRef& module, @@ -692,6 +707,7 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager { RGWHTTPManager *http_manager; RGWDataSyncEnv sync_env; + RGWDataSyncCtx sc; rgw_bucket_shard_sync_info init_status; RGWBucketSyncCR *sync_cr{nullptr}; diff --git a/src/rgw/rgw_sync_module.cc b/src/rgw/rgw_sync_module.cc index 9ee2b4341f6a..269316616818 100644 --- a/src/rgw/rgw_sync_module.cc +++ b/src/rgw/rgw_sync_module.cc @@ -27,15 +27,15 @@ RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler() return RGWBucketInstanceMetaHandlerAllocator::alloc(); } -RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), +RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncCtx *_sc, + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), src_bucket(_src_bucket), key(_key) { } -RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env, - rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), +RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncCtx *_sc, + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), src_bucket(_src_bucket), key(_key) { } @@ -43,14 +43,14 @@ int RGWCallStatRemoteObjCR::operate() { reenter(this) { yield { call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store, - sync_env->source_zone, + sc->source_zone, src_bucket, key, &mtime, &size, &etag, &attrs, &headers)); } if (retcode < 0) { ldout(sync_env->cct, 10) << "RGWStatRemoteObjCR() returned " << retcode << dendl; return set_cr_error(retcode); } - ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sc->source_zone << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << dendl; 
yield { diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 6f989d981e02..a82ba21d1c98 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -9,6 +9,7 @@ class RGWBucketInfo; class RGWRemoteDataLog; +struct RGWDataSyncCtx; struct RGWDataSyncEnv; struct rgw_bucket_entry_owner; struct rgw_obj_key; @@ -20,19 +21,19 @@ public: RGWDataSyncModule() {} virtual ~RGWDataSyncModule() {} - virtual void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {} + virtual void init(RGWDataSyncCtx *sync_env, uint64_t instance_id) {} - virtual RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) { + virtual RGWCoroutine *init_sync(RGWDataSyncCtx *sc) { return nullptr; } - virtual RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) { + virtual RGWCoroutine *start_sync(RGWDataSyncCtx *sc) { return nullptr; } - virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; + virtual RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; }; @@ -139,6 +140,7 @@ public: class RGWStatRemoteObjCBCR : public RGWCoroutine { protected: + 
RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket src_bucket; @@ -150,7 +152,7 @@ protected: map attrs; map headers; public: - RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWStatRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket& _src_bucket, rgw_obj_key& _key); ~RGWStatRemoteObjCBCR() override {} @@ -175,13 +177,14 @@ class RGWCallStatRemoteObjCR : public RGWCoroutine { map headers; protected: + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket src_bucket; rgw_obj_key key; public: - RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env, + RGWCallStatRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket& _src_bucket, rgw_obj_key& _key); ~RGWCallStatRemoteObjCR() override {} diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index 3ef5dded1b15..ac45036226fc 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -560,24 +560,24 @@ struct AWSSyncConfig { return 0; } - void expand_target(RGWDataSyncEnv *sync_env, const string& sid, const string& path, string *dest) { + void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) { apply_meta_param(path, "sid", sid, dest); - const RGWZoneGroup& zg = sync_env->store->svc()->zone->get_zonegroup(); + const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup(); apply_meta_param(path, "zonegroup", zg.get_name(), dest); apply_meta_param(path, "zonegroup_id", zg.get_id(), dest); - const RGWZone& zone = sync_env->store->svc()->zone->get_zone(); + const RGWZone& zone = sc->env->svc->zone->get_zone(); apply_meta_param(path, "zone", zone.name, dest); apply_meta_param(path, "zone_id", zone.id, dest); } - void update_config(RGWDataSyncEnv *sync_env, const string& sid) { - expand_target(sync_env, sid, root_profile->target_path, &root_profile->target_path); - ldout(sync_env->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; + void update_config(RGWDataSyncCtx *sc, const string& sid) { + expand_target(sc, sid, 
root_profile->target_path, &root_profile->target_path); + ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; for (auto& t : explicit_profiles) { - expand_target(sync_env, sid, t.second->target_path, &t.second->target_path); - ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; + expand_target(sc, sid, t.second->target_path, &t.second->target_path); + ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; } } @@ -636,13 +636,15 @@ struct AWSSyncConfig { *obj_name = path.substr(pos + 1); } - void init_conns(RGWDataSyncEnv *sync_env, const string& id) { - update_config(sync_env, id); + void init_conns(RGWDataSyncCtx *sc, const string& id) { + auto sync_env = sc->env; + + update_config(sc, id); auto& root_conf = root_profile->conn_conf; - root_profile->conn.reset(new S3RESTConn(sync_env->cct, - sync_env->store->svc()->zone, + root_profile->conn.reset(new S3RESTConn(sc->cct, + sync_env->svc->zone, id, { root_conf->endpoint }, root_conf->key, @@ -651,8 +653,8 @@ struct AWSSyncConfig { for (auto i : explicit_profiles) { auto& c = i.second; - c->conn.reset(new S3RESTConn(sync_env->cct, - sync_env->store->svc()->zone, + c->conn.reset(new S3RESTConn(sc->cct, + sync_env->svc->zone, id, { c->conn_conf->endpoint }, c->conn_conf->key, @@ -668,12 +670,12 @@ struct AWSSyncInstanceEnv { explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {} - void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) { + void init(RGWDataSyncCtx *sc, uint64_t instance_id) { char buf[32]; snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id); id = buf; - conf.init_conns(sync_env, id); + conf.init_conns(sc, id); } void get_profile(const rgw_bucket& bucket, std::shared_ptr *ptarget) { @@ -713,7 +715,7 @@ static int do_decode_rest_obj(CephContext *cct, map& attrs, class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF { - RGWDataSyncEnv 
*sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *conn; rgw_obj src_obj; RGWRESTConn::get_obj_params req_params; @@ -723,12 +725,12 @@ public: RGWRESTStreamGetCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWDataSyncEnv *_sync_env, + RGWDataSyncCtx *_sc, RGWRESTConn *_conn, rgw_obj& _src_obj, const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, - _sync_env->http_manager, _src_obj.key), - sync_env(_sync_env), conn(_conn), src_obj(_src_obj), + _sc->env->http_manager, _src_obj.key), + sc(_sc), conn(_conn), src_obj(_src_obj), src_properties(_src_properties) { } @@ -753,7 +755,7 @@ public: RGWRESTStreamRWRequest *in_req; int ret = conn->get_obj(src_obj, req_params, false /* send */, &in_req); if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; + ldout(sc->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; return ret; } @@ -765,18 +767,18 @@ public: int decode_rest_obj(map& headers, bufferlist& extra_data) override { map src_attrs; - ldout(sync_env->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; + ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; if (extra_data.length() > 0) { JSONParser jp; if (!jp.parse(extra_data.c_str(), extra_data.length())) { - ldout(sync_env->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; + ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. 
len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; return -EIO; } JSONDecoder::decode_json("attrs", src_attrs, &jp); } - return do_decode_rest_obj(sync_env->cct, src_attrs, headers, &rest_obj); + return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj); } bool need_extra_data() override { @@ -791,7 +793,7 @@ static std::set keep_headers = { "CONTENT_TYPE", class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; rgw_sync_aws_src_obj_properties src_properties; std::shared_ptr target; rgw_obj dest_obj; @@ -800,11 +802,11 @@ public: RGWAWSStreamPutCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWDataSyncEnv *_sync_env, + RGWDataSyncCtx *_sc, const rgw_sync_aws_src_obj_properties& _src_properties, std::shared_ptr& _target, - rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), - sync_env(_sync_env), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) { + rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager), + sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) { } int init() override { @@ -959,7 +961,7 @@ public: map new_attrs; if (!multipart.is_multipart) { - init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &new_attrs); + init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); } r->set_send_length(rest_obj.content_len); @@ -988,7 +990,7 @@ public: class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *source_conn; std::shared_ptr target; rgw_obj src_obj; @@ -1000,13 +1002,13 @@ class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { std::shared_ptr out_crf; public: - RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env, + RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc, RGWRESTConn *_source_conn, const 
rgw_obj& _src_obj, const rgw_sync_aws_src_obj_properties& _src_properties, std::shared_ptr _target, - const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const rgw_obj& _dest_obj) : RGWCoroutine(_sc->cct), + sc(_sc), source_conn(_source_conn), target(_target), src_obj(_src_obj), @@ -1016,15 +1018,15 @@ public: int operate() override { reenter(this) { /* init input */ - in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, source_conn, src_obj, src_properties)); /* init output */ - out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, src_properties, target, dest_obj)); - yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf)); + yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1037,7 +1039,7 @@ public: }; class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *source_conn; std::shared_ptr target; rgw_obj src_obj; @@ -1055,7 +1057,7 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { string *petag; public: - RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env, + RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_source_conn, const rgw_obj& _src_obj, std::shared_ptr& _target, @@ -1063,8 +1065,8 @@ public: const rgw_sync_aws_src_obj_properties& _src_properties, const string& _upload_id, const rgw_sync_aws_multipart_part_info& _part_info, - string *_petag) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + string *_petag) : RGWCoroutine(_sc->cct), + sc(_sc), source_conn(_source_conn), target(_target), src_obj(_src_obj), @@ -1077,25 +1079,25 @@ public: int operate() override { reenter(this) { /* init input */ - in_crf.reset(new 
RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, source_conn, src_obj, src_properties)); in_crf->set_range(part_info.ofs, part_info.size); /* init output */ - out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, src_properties, target, dest_obj)); out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); - yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf)); + yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); if (retcode < 0) { return set_cr_error(retcode); } if (!(static_cast(out_crf.get()))->get_etag(petag)) { - ldout(sync_env->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; return set_cr_error(-EIO); } @@ -1107,18 +1109,18 @@ public: }; class RGWAWSAbortMultipartCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw_obj dest_obj; string upload_id; public: - RGWAWSAbortMultipartCR(RGWDataSyncEnv *_sync_env, + RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, - const string& _upload_id) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const string& _upload_id) : RGWCoroutine(_sc->cct), + sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id) {} @@ -1129,12 +1131,12 @@ public: yield { rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; bufferlist bl; - call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, sync_env->http_manager, + call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << 
")" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl; return set_cr_error(retcode); } @@ -1146,7 +1148,7 @@ public: }; class RGWAWSInitMultipartCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw_obj dest_obj; @@ -1170,13 +1172,13 @@ class RGWAWSInitMultipartCR : public RGWCoroutine { } result; public: - RGWAWSInitMultipartCR(RGWDataSyncEnv *_sync_env, + RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, uint64_t _obj_size, const map& _attrs, - string *_upload_id) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + string *_upload_id) : RGWCoroutine(_sc->cct), + sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), obj_size(_obj_size), @@ -1189,12 +1191,12 @@ public: yield { rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; bufferlist bl; - call(new RGWPostRawRESTResourceCR (sync_env->cct, dest_conn, sync_env->http_manager, + call(new RGWPostRawRESTResourceCR (sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + ldout(sc->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; return set_cr_error(retcode); } { @@ -1205,13 +1207,13 @@ public: */ RGWXMLDecoder::XMLParser parser; if (!parser.init()) { - ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(-EIO); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: failed to 
parse xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(-EIO); } @@ -1219,12 +1221,12 @@ public: RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(-EIO); } } - ldout(sync_env->cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; + ldout(sc->cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; *upload_id = result.upload_id; @@ -1236,7 +1238,7 @@ public: }; class RGWAWSCompleteMultipartCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw_obj dest_obj; @@ -1274,12 +1276,12 @@ class RGWAWSCompleteMultipartCR : public RGWCoroutine { } result; public: - RGWAWSCompleteMultipartCR(RGWDataSyncEnv *_sync_env, + RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, string _upload_id, - const map& _parts) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const map& _parts) : RGWCoroutine(_sc->cct), + sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id), @@ -1300,12 +1302,12 @@ public: bufferlist bl; bl.append(ss.str()); - call(new RGWPostRawRESTResourceCR (sync_env->cct, dest_conn, sync_env->http_manager, + call(new RGWPostRawRESTResourceCR (sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + ldout(sc->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << 
dest_obj << dendl; return set_cr_error(retcode); } { @@ -1316,13 +1318,13 @@ public: */ RGWXMLDecoder::XMLParser parser; if (!parser.init()) { - ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(-EIO); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(-EIO); } @@ -1330,12 +1332,12 @@ public: RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(-EIO); } } - ldout(sync_env->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; + ldout(sc->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; return set_cr_done(); } @@ -1346,7 +1348,7 @@ public: class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; const rgw_obj dest_obj; const rgw_raw_obj status_obj; @@ -1355,11 +1357,11 @@ class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine { public: - RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncEnv *_sync_env, + RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, const rgw_raw_obj& _status_obj, - const string& _upload_id) 
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), status_obj(_status_obj), @@ -1367,14 +1369,14 @@ public: int operate() override { reenter(this) { - yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id)); + yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; + ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; /* ignore error, best effort */ } - yield call(new RGWRadosRemoveCR(sync_env->store, status_obj)); + yield call(new RGWRadosRemoveCR(sc->env->store, status_obj)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; + ldout(sc->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; /* ignore error, best effort */ } return set_cr_done(); @@ -1385,6 +1387,7 @@ public: }; class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; AWSSyncConfig& conf; RGWRESTConn *source_conn; @@ -1408,7 +1411,7 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { rgw_raw_obj status_obj; public: - RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env, + RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc, AWSSyncConfig& _conf, RGWRESTConn *_source_conn, const rgw_obj& _src_obj, @@ -1416,8 +1419,9 @@ public: const rgw_obj& _dest_obj, uint64_t _obj_size, const rgw_sync_aws_src_obj_properties& _src_properties, - const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const rgw_rest_obj& _rest_obj) : 
RGWCoroutine(_sc->cct), + sc(_sc), + sync_env(_sc->env), conf(_conf), source_conn(_source_conn), target(_target), @@ -1426,18 +1430,18 @@ public: obj_size(_obj_size), src_properties(_src_properties), rest_obj(_rest_obj), - status_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, - RGWBucketPipeSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) { + status_obj(sync_env->svc->zone->get_zone_params().log_pool, + RGWBucketPipeSyncStatusManager::obj_status_oid(sc->source_zone, src_obj)) { } int operate() override { reenter(this) { - yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->store->svc()->sysobj, + yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->svc->sysobj, status_obj, &status, false)); if (retcode < 0 && retcode != -ENOENT) { - ldout(sync_env->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; + ldout(sc->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; return retcode; } @@ -1446,15 +1450,15 @@ public: if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || status.src_properties.etag != src_properties.etag) { - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); retcode = -ENOENT; } } if (retcode == -ENOENT) { - RGWAWSStreamPutCRF::init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &new_attrs); + RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); - yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id)); + yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), 
&status.upload_id)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1479,7 +1483,7 @@ public: status.cur_ofs += status.part_size; - call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env, + call(new RGWAWSStreamObjToCloudMultipartPartCR(sc, source_conn, src_obj, target, dest_obj, @@ -1490,32 +1494,32 @@ public: } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } - yield call(new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status)); + yield call(new RGWSimpleRadosWriteCR(sync_env->async_rados, sync_env->svc->sysobj, status_obj, status)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; + ldout(sc->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; /* continue with upload anyway */ } - ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; + ldout(sc->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; } - yield call(new RGWAWSCompleteMultipartCR(sync_env, target->conn.get(), 
dest_obj, status.upload_id, status.parts)); + yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } /* remove status obj */ yield call(new RGWRadosRemoveCR(sync_env->store, status_obj)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; /* ignore error, best effort */ } return set_cr_done(); @@ -1576,11 +1580,11 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { } result; public: - RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, AWSSyncInstanceEnv& _instance, - uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) {} @@ -1592,23 +1596,23 @@ public: reenter(this) { ret = decode_attr(attrs, 
RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0); if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl; } else { ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0); if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl; src_pg_ver = 0; /* all or nothing */ } } - ldout(sync_env->cct, 4) << "AWS: download begin: z=" << sync_env->source_zone + ldout(sc->cct, 4) << "AWS: download begin: z=" << sc->source_zone << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " etag=" << etag << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver << dendl; - source_conn = sync_env->store->svc()->zone->get_zone_conn_by_id(sync_env->source_zone); + source_conn = sync_env->svc->zone->get_zone_conn_by_id(sc->source_zone); if (!source_conn) { - ldout(sync_env->cct, 0) << "ERROR: cannot find http connection to zone " << sync_env->source_zone << dendl; + ldout(sc->cct, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl; return set_cr_error(-EINVAL); } @@ -1617,22 +1621,22 @@ public: if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { - ldout(sync_env->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl; + ldout(sc->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl; bufferlist bl; - call(new RGWPutRawRESTResourceCR (sync_env->cct, target->conn.get(), + call(new RGWPutRawRESTResourceCR (sc->cct, target->conn.get(), sync_env->http_manager, target_bucket_name, nullptr, bl, &out_bl)); } if (retcode < 0 ) { RGWXMLDecoder::XMLParser parser; if (!parser.init()) { - ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response 
from server" << dendl; + ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(retcode); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(retcode); } @@ -1640,7 +1644,7 @@ public: RGWXMLDecoder::decode_xml("Error", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); - ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl; + ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(retcode); } @@ -1670,18 +1674,18 @@ public: src_properties.versioned_epoch = versioned_epoch; if (size < instance.conf.s3.multipart_sync_threshold) { - call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, + call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, src_obj, src_properties, target, dest_obj)); } else { rgw_rest_obj rest_obj; rest_obj.init(key); - if (do_decode_rest_obj(sync_env->cct, attrs, headers, &rest_obj)) { - ldout(sync_env->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl; + if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) { + ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl; return set_cr_error(-EINVAL); } - call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, + call(new RGWAWSStreamObjToCloudMultipartCR(sc, instance.conf, source_conn, src_obj, target, dest_obj, size, src_properties, rest_obj)); } } @@ -1701,9 +1705,9 @@ class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { AWSSyncInstanceEnv& instance; uint64_t versioned_epoch; public: - RGWAWSHandleRemoteObjCR(RGWDataSyncEnv 
*_sync_env, + RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) { } @@ -1711,12 +1715,12 @@ public: ~RGWAWSHandleRemoteObjCR() {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWAWSHandleRemoteObjCBCR(sync_env, sync_pipe, key, instance, versioned_epoch); + return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch); } }; class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env{nullptr}; + RGWDataSyncCtx *sc; std::shared_ptr target; rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; @@ -1724,22 +1728,22 @@ class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { AWSSyncInstanceEnv& instance; int ret{0}; public: - RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, - AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc), sync_pipe(_sync_pipe), key(_key), mtime(_mtime), instance(_instance) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone + ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone << " b=" <cct, 0) << "AWS: removing aws object at" << path << dendl; - call(new RGWDeleteRESTResourceCR(sync_env->cct, target->conn.get(), - sync_env->http_manager, + call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(), + sc->env->http_manager, path, nullptr /* params */)); } if (retcode < 0) { @@ -1762,27 +1766,27 
@@ public: instance(_conf) { } - void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { - instance.init(sync_env, instance_id); + void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { + instance.init(sc, instance_id); } ~RGWAWSDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWAWSHandleRemoteObjCR(sync_env, sync_pipe, key, instance, versioned_epoch.value_or(0)); + ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWAWSRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, instance); + ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance); } - 
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 46433c3903a6..eea1d9082010 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -643,13 +643,13 @@ struct es_obj_metadata { class RGWElasticGetESInfoCBCR : public RGWCoroutine { public: - RGWElasticGetESInfoCBCR(RGWDataSyncEnv *_sync_env, - ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc, + ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), conf(_conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sync_env->source_zone << dendl; + ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl; yield call(new RGWReadRESTResourceCR (sync_env->cct, conf->conn.get(), sync_env->http_manager, @@ -667,19 +667,20 @@ public: return 0; } private: + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; ElasticConfigRef conf; }; class RGWElasticPutIndexCBCR : public RGWCoroutine { public: - RGWElasticPutIndexCBCR(RGWDataSyncEnv *_sync_env, - ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + 
RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc, + ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), conf(_conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sync_env->source_zone << dendl; + ldout(sc->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl; yield { string path = conf->get_index_path(); @@ -687,13 +688,13 @@ public: std::unique_ptr index_conf; if (conf->es_info.version >= ES_V5) { - ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; + ldout(sc->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; index_conf.reset(new es_index_config(settings, conf->es_info.version)); } else { - ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; + ldout(sc->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; index_conf.reset(new es_index_config(settings, conf->es_info.version)); } - call(new RGWPutRESTResourceCR (sync_env->cct, + call(new RGWPutRESTResourceCR (sc->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /*params*/, @@ -716,6 +717,7 @@ public: } private: + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; ElasticConfigRef conf; @@ -741,24 +743,25 @@ private: }; class RGWElasticInitConfigCBCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; ElasticConfigRef conf; public: - RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, - ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc, + ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), conf(_conf) {} int operate() override { reenter(this) { - yield call(new RGWElasticGetESInfoCBCR(sync_env, conf)); + yield call(new RGWElasticGetESInfoCBCR(sc, conf)); if (retcode < 0) { return set_cr_error(retcode); } - yield call(new RGWElasticPutIndexCBCR(sync_env, conf)); 
+ yield call(new RGWElasticPutIndexCBCR(sc, conf)); if (retcode < 0) { return set_cr_error(retcode); } @@ -774,14 +777,14 @@ class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { ElasticConfigRef conf; uint64_t versioned_epoch; public: - RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << dendl; @@ -810,9 +813,9 @@ class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { ElasticConfigRef conf; uint64_t versioned_epoch; public: - RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, + RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) { } @@ -820,25 +823,26 @@ public: ~RGWElasticHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWElasticHandleRemoteObjCBCR(sync_env, sync_pipe, key, conf, versioned_epoch); + return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch); } }; class 
RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; ceph::real_time mtime; ElasticConfigRef conf; public: - RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, - ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), key(_key), mtime(_mtime), conf(_conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); @@ -865,43 +869,43 @@ public: } ~RGWElasticDataSyncModule() override {} - void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { - conf->init_instance(sync_env->store->svc()->zone->get_realm(), instance_id); + void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { + conf->init_instance(sc->env->svc->zone->get_realm(), instance_id); } - RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; - return new RGWElasticInitConfigCBCR(sync_env, conf); + RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override { + ldout(sc->cct, 5) << conf->id << ": init" << dendl; + return new RGWElasticInitConfigCBCR(sc, conf); } - RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl; + RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { + ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl; // try to get elastic search version - return new 
RGWElasticGetESInfoCBCR(sync_env, conf); + return new RGWElasticGetESInfoCBCR(sc, conf); } - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } - return new RGWElasticHandleRemoteObjCR(sync_env, sync_pipe, key, conf, versioned_epoch.value_or(0)); + return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 10) << conf->id << ": rm_object: 
b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } - return new RGWElasticRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, conf); + return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; + ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; return NULL; } RGWRESTConn *get_rest_conn() { diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index 981e820910ff..c712f7c17f32 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -12,10 +12,10 @@ class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR { public: - RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _src_bucket, _key) {} + 
RGWLogStatRemoteObjCBCR(RGWDataSyncCtx *_sc, + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sc, _src_bucket, _key) {} int operate() override { - ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sc->source_zone << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; return set_cr_done(); @@ -25,14 +25,14 @@ public: class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR { public: - RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env, - rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _src_bucket, _key) { + RGWLogStatRemoteObjCR(RGWDataSyncCtx *_sc, + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sc, _src_bucket, _key) { } ~RGWLogStatRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWLogStatRemoteObjCBCR(sync_env, src_bucket, key); + return new RGWLogStatRemoteObjCBCR(sc, src_bucket, key); } }; @@ -41,17 +41,17 @@ class RGWLogDataSyncModule : public RGWDataSyncModule { public: explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWLogStatRemoteObjCR(sync_env, sync_pipe.source_bs.bucket, key); + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << 
versioned_epoch.value_or(0) << dendl; + return new RGWLogStatRemoteObjCR(sc, sync_pipe.source_bs.bucket, key); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index af797aa4a107..008d35f05849 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -584,6 +584,7 @@ class PSSubscription { friend class InitCR; friend class 
RGWPSHandleObjEventCR; + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSEnvRef env; PSSubConfigRef sub_conf; @@ -595,6 +596,7 @@ class PSSubscription { InitCR *init_cr{nullptr}; class InitBucketLifecycleCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSConfigRef& conf; LCRule rule; @@ -604,11 +606,11 @@ class PSSubscription { rgw_bucket_lifecycle_config_params lc_config; public: - InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env, + InitBucketLifecycleCR(RGWDataSyncCtx *_sc, PSConfigRef& _conf, RGWBucketInfo& _bucket_info, - std::map& _bucket_attrs) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + std::map& _bucket_attrs) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), conf(_conf) { lc_config.bucket_info = _bucket_info; lc_config.bucket_attrs = _bucket_attrs; @@ -663,6 +665,7 @@ class PSSubscription { }; class InitCR : public RGWSingletonCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSSubscriptionRef sub; rgw_get_bucket_info_params get_bucket_info; @@ -672,9 +675,9 @@ class PSSubscription { int i; public: - InitCR(RGWDataSyncEnv *_sync_env, - PSSubscriptionRef& _sub) : RGWSingletonCR(_sync_env->cct), - sync_env(_sync_env), + InitCR(RGWDataSyncCtx *_sc, + PSSubscriptionRef& _sub) : RGWSingletonCR(_sc->cct), + sc(_sc), sync_env(_sc->env), sub(_sub), conf(sub->env->conf), sub_conf(sub->sub_conf) { } @@ -706,7 +709,7 @@ class PSSubscription { } } - yield call(new InitBucketLifecycleCR(sync_env, conf, + yield call(new InitBucketLifecycleCR(sc, conf, sub->get_bucket_info_result->bucket_info, sub->get_bucket_info_result->attrs)); if (retcode < 0) { @@ -744,16 +747,17 @@ class PSSubscription { template class StoreEventCR : public RGWCoroutine { + RGWDataSyncCtx* const sc; RGWDataSyncEnv* const sync_env; const PSSubscriptionRef sub; const PSEvent pse; const string oid_prefix; public: - StoreEventCR(RGWDataSyncEnv* const _sync_env, + StoreEventCR(RGWDataSyncCtx* const _sc, const PSSubscriptionRef& _sub, - const EventRef& 
_event) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const EventRef& _event) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), sub(_sub), pse(_event), oid_prefix(sub->sub_conf->data_oid_prefix) { @@ -795,15 +799,16 @@ class PSSubscription { template class PushEventCR : public RGWCoroutine { + RGWDataSyncCtx* const sc; RGWDataSyncEnv* const sync_env; const EventRef event; const PSSubConfigRef& sub_conf; public: - PushEventCR(RGWDataSyncEnv* const _sync_env, + PushEventCR(RGWDataSyncCtx* const _sc, const PSSubscriptionRef& _sub, - const EventRef& _event) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const EventRef& _event) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), event(_event), sub_conf(_sub->sub_conf) { } @@ -828,16 +833,16 @@ class PSSubscription { }; public: - PSSubscription(RGWDataSyncEnv *_sync_env, + PSSubscription(RGWDataSyncCtx *_sc, PSEnvRef _env, - PSSubConfigRef& _sub_conf) : sync_env(_sync_env), + PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env), env(_env), sub_conf(_sub_conf), data_access(std::make_shared(sync_env->store)) {} - PSSubscription(RGWDataSyncEnv *_sync_env, + PSSubscription(RGWDataSyncCtx *_sc, PSEnvRef _env, - rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env), + rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env), env(_env), sub_conf(std::make_shared()), data_access(std::make_shared(sync_env->store)) { @@ -850,11 +855,11 @@ public: } template - static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env, + static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc, PSEnvRef _env, C& _sub_conf) { - auto sub = std::make_shared(_sync_env, _env, _sub_conf); - sub->init_cr = new InitCR(_sync_env, sub); + auto sub = std::make_shared(_sc, _env, _sub_conf); + sub->init_cr = new InitCR(_sc, sub); sub->init_cr->get(); return sub; } @@ -864,25 +869,27 @@ public: } template - static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, 
const EventRef& event) { - return new StoreEventCR(sync_env, sub, event); + static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef& event) { + return new StoreEventCR(sc, sub, event); } template - static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) { - return new PushEventCR(sync_env, sub, event); + static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef& event) { + return new PushEventCR(sc, sub, event); } friend class InitCR; }; class PSManager { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSEnvRef env; std::map subs; class GetSubCR : public RGWSingletonCR { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSManagerRef mgr; rgw_user owner; @@ -896,12 +903,12 @@ class PSManager rgw_pubsub_sub_config user_sub_conf; public: - GetSubCR(RGWDataSyncEnv *_sync_env, + GetSubCR(RGWDataSyncCtx *_sc, PSManagerRef& _mgr, const rgw_user& _owner, const string& _sub_name, - PSSubscriptionRef *_ref) : RGWSingletonCR(_sync_env->cct), - sync_env(_sync_env), + PSSubscriptionRef *_ref) : RGWSingletonCR(_sc->cct), + sc(_sc), sync_env(_sc->env), mgr(_mgr), owner(_owner), sub_name(_sub_name), @@ -919,7 +926,7 @@ class PSManager return set_cr_error(-ENOENT); } - *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); + *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf); } else { using ReadInfoCR = RGWSimpleRadosReadCR; yield { @@ -936,7 +943,7 @@ class PSManager return set_cr_error(retcode); } - *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf); + *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf); } yield (*ref)->call_init_cr(this); @@ -992,28 +999,28 @@ class PSManager return false; } - PSManager(RGWDataSyncEnv *_sync_env, - PSEnvRef _env) : sync_env(_sync_env), + PSManager(RGWDataSyncCtx *_sc, + PSEnvRef _env) : sc(_sc), sync_env(_sc->env), env(_env) {} public: - 
static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env, + static PSManagerRef get_shared(RGWDataSyncCtx *_sc, PSEnvRef _env) { - return std::shared_ptr(new PSManager(_sync_env, _env)); + return std::shared_ptr(new PSManager(_sc, _env)); } - static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr, + static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { if (mgr->find_sub_instance(owner, sub_name, ref)) { /* found it! nothing to execute */ - ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl; + ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl; } auto& gs = mgr->get_get_subs(owner, sub_name); if (!gs) { - ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl; - gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref); + ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl; + gs = new GetSubCR(sc, mgr, owner, sub_name, ref); } - ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl; + ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl; return gs->execute(caller, ref); } @@ -1026,6 +1033,7 @@ void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManager } class RGWPSInitEnvCBCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSEnvRef env; PSConfigRef& conf; @@ -1033,13 +1041,13 @@ class RGWPSInitEnvCBCR : public RGWCoroutine { rgw_user_create_params create_user; rgw_get_user_info_params get_user_info; public: - RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env, - PSEnvRef& _env) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc, + PSEnvRef& _env) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), env(_env), conf(env->conf) {} int operate() override { reenter(this) { - ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << 
sync_env->source_zone << dendl; + ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl; /* nothing to do here right now */ create_user.user = conf->user; @@ -1079,6 +1087,7 @@ bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, r } class RGWPSFindBucketTopicsCR : public RGWCoroutine { + RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSEnvRef env; rgw_user owner; @@ -1094,20 +1103,20 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine { rgw_pubsub_user_topics user_topics; TopicsRef *topics; public: - RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env, + RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc, PSEnvRef& _env, const rgw_user& _owner, const rgw_bucket& _bucket, const rgw_obj_key& _key, rgw::notify::EventType _event_type, - TopicsRef *_topics) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + TopicsRef *_topics) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), env(_env), owner(_owner), bucket(_bucket), key(_key), event_type(_event_type), - ups(_sync_env->store, owner), + ups(sync_env->store, owner), topics(_topics) { *topics = std::make_shared >(); } @@ -1163,7 +1172,7 @@ public: }; class RGWPSHandleObjEventCR : public RGWCoroutine { - RGWDataSyncEnv* const sync_env; + RGWDataSyncCtx* const sc; const PSEnvRef env; const rgw_user& owner; const EventRef event; @@ -1180,13 +1189,13 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { int last_sub_conf_error; public: - RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env, + RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc, const PSEnvRef _env, const rgw_user& _owner, const EventRef& _event, const EventRef& _record, - const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + const TopicsRef& _topics) : RGWCoroutine(_sc->cct), + sc(_sc), env(_env), owner(_owner), event(_event), @@ -1198,11 +1207,11 @@ public: int operate() override { reenter(this) { - ldout(sync_env->cct, 20) << ": handle event: obj: z=" << 
sync_env->source_zone + ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone << " event=" << json_str("event", *event, false) << " owner=" << owner << dendl; - ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; + ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; // outside caller should check that ceph_assert(!topics->empty()); @@ -1211,17 +1220,17 @@ public: // loop over all topics related to the bucket/object for (titer = topics->begin(); titer != topics->end(); ++titer) { - ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" << + ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; // loop over all subscriptions of the topic for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { - ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl; + ldout(sc->cct, 20) << ": subscription: " << *siter << dendl; has_subscriptions = true; sub_conf_found = false; // try to read subscription configuration from global/user cond // configuration is considered missing only if does not exist in either for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { - yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub); + yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub); if (retcode < 0) { if (sub_conf_found) { // not a real issue, sub conf already found @@ -1233,21 +1242,21 @@ public: sub_conf_found = true; if (sub->sub_conf->s3_id.empty()) { // subscription was not made by S3 compatible API - ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::store_event_cr(sync_env, sub, event)); + ldout(sc->cct, 20) << "storing event for subscription=" 
<< *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::store_event_cr(sc, sub, event)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); - ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); event_handled = true; } if (sub->sub_conf->push_endpoint) { - ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sync_env, sub, event)); + ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sc, sub, event)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); - ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; @@ -1255,23 +1264,23 @@ public: } } else { // subscription was made by S3 compatible API - ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; record->configurationId = sub->sub_conf->s3_id; record->opaque_data = (*titer)->opaque_data; - yield call(PSSubscription::store_event_cr(sync_env, sub, record)); + yield call(PSSubscription::store_event_cr(sc, sub, record)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); - 
ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); event_handled = true; } if (sub->sub_conf->push_endpoint) { - ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sync_env, sub, record)); + ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sc, sub, record)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); - ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; } else { if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; @@ -1282,7 +1291,7 @@ public: if (!sub_conf_found) { // could not find conf for subscription at user or global levels if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); - ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter + ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << last_sub_conf_error << dendl; if (retcode == -ENOENT) { // missing subscription info should be reflected back as invalid argument @@ -1308,7 +1317,7 @@ public: // coroutine invoked on remote object creation class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; rgw_bucket_sync_pipe sync_pipe; PSEnvRef env; std::optional versioned_epoch; @@ -1316,11 +1325,11 @@ class RGWPSHandleRemoteObjCBCR : public 
RGWStatRemoteObjCBCR { EventRef record; TopicsRef topics; public: - RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), - sync_env(_sync_env), + TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), + sc(_sc), sync_pipe(_sync_pipe), env(_env), versioned_epoch(_versioned_epoch), @@ -1328,7 +1337,7 @@ public: } int operate() override { reenter(this) { - ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone + ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; { @@ -1343,18 +1352,18 @@ public: // at this point we don't know whether we need the ceph event or S3 record // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones - make_event_ref(sync_env->cct, + make_event_ref(sc->cct, sync_pipe.source_bs.bucket, key, mtime, &attrs, rgw::notify::ObjectCreated, &event); - make_s3_record_ref(sync_env->cct, + make_s3_record_ref(sc->cct, sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, mtime, &attrs, rgw::notify::ObjectCreated, &record); } #warning should it be source owner? 
- yield call(new RGWPSHandleObjEventCR(sync_env, env, sync_pipe.dest_bucket_info.owner, event, record, topics)); + yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.dest_bucket_info.owner, event, record, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1370,10 +1379,10 @@ class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { std::optional versioned_epoch; TopicsRef topics; public: - RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, + RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), sync_pipe(_sync_pipe), env(_env), versioned_epoch(_versioned_epoch), topics(_topics) { @@ -1382,23 +1391,22 @@ public: ~RGWPSHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWPSHandleRemoteObjCBCR(sync_env, sync_pipe, key, env, versioned_epoch, topics); + return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics); } }; class RGWPSHandleObjCreateCR : public RGWCoroutine { - - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; PSEnvRef env; std::optional versioned_epoch; TopicsRef topics; public: - RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env, + RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - PSEnvRef _env, std::optional _versioned_epoch) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + PSEnvRef _env, std::optional _versioned_epoch) : RGWCoroutine(_sc->cct), + sc(_sc), sync_pipe(_sync_pipe), key(_key), env(_env), @@ -1409,19 +1417,19 @@ public: int operate() override { reenter(this) { - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, sync_pipe.dest_bucket_info.owner, + yield call(new 
RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner, sync_pipe.source_bs.bucket, key, rgw::notify::ObjectCreated, &topics)); if (retcode < 0) { - ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { - ldout(sync_env->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl; + ldout(sc->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl; return set_cr_done(); } - yield call(new RGWPSHandleRemoteObjCR(sync_env, sync_pipe, key, env, versioned_epoch, topics)); + yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1433,7 +1441,7 @@ public: // coroutine invoked on remote object deletion class RGWPSGenericObjEventCBCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; + RGWDataSyncCtx *sc; PSEnvRef env; rgw_user owner; rgw_bucket bucket; @@ -1444,11 +1452,11 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine { EventRef record; TopicsRef topics; public: - RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env, + RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc, PSEnvRef _env, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, - rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), + rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct), + sc(_sc), env(_env), owner(_sync_pipe.dest_bucket_info.owner), bucket(_sync_pipe.dest_bucket_info.bucket), @@ -1456,29 +1464,29 @@ public: mtime(_mtime), event_type(_event_type) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone + ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone << " b=" << bucket << " k=" << key << " 
mtime=" << mtime << dendl; - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics)); if (retcode < 0) { - ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { - ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; + ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } // at this point we don't know whether we need the ceph event or S3 record // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones - make_event_ref(sync_env->cct, + make_event_ref(sc->cct, bucket, key, mtime, nullptr, event_type, &event); - make_s3_record_ref(sync_env->cct, + make_s3_record_ref(sc->cct, bucket, owner, key, mtime, nullptr, event_type, &record); - yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics)); + yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1500,35 +1508,36 @@ public: ~RGWPSDataSyncModule() override {} - void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { - PSManagerRef mgr = PSManager::get_shared(sync_env, env); - env->init_instance(sync_env->store->svc()->zone->get_realm(), instance_id, mgr); + void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { + auto sync_env = sc->env; + PSManagerRef mgr = PSManager::get_shared(sc, env); + env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr); } - RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 5) << conf->id << ": start" << dendl; - return new RGWPSInitEnvCBCR(sync_env, 
env); + RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { + ldout(sc->cct, 5) << conf->id << ": start" << dendl; + return new RGWPSInitEnvCBCR(sc, env); } - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << + ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWPSHandleObjCreateCR(sync_env, sync_pipe, key, env, versioned_epoch); + return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << + ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); + return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << + 
ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); + return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); } PSConfigRef& get_conf() { return conf; } -- 2.47.3