return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr, full_status.incremental_gen);
}
-RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
- std::optional<rgw_zone_id> _source_zone,
- std::optional<rgw_bucket> _source_bucket,
- const rgw_bucket& _dest_bucket) : store(_store),
- cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()),
- http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
- source_zone(_source_zone), source_bucket(_source_bucket),
- conn(NULL), error_logger(NULL),
- dest_bucket(_dest_bucket)
-{
-}
-
-RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager()
-{
- for (vector<RGWRemoteBucketManager *>::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) {
- delete *iter;
- }
- delete error_logger;
-}
-
CephContext *RGWBucketPipeSyncStatusManager::get_cct() const
{
return store->ctx();
return ret;
}
- error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
-
sync_module.reset(new RGWDefaultSyncModuleInstance());
auto async_rados = store->svc()->rados->get_async_processor();
sync_env.init(this, store->ctx(), store,
store->svc(), async_rados, &http_manager,
- error_logger, store->getRados()->get_sync_tracer(),
+ error_logger.get(), store->getRados()->get_sync_tracer(),
sync_module, nullptr);
rgw_sync_pipe_info_set pipes;
return r;
}
const int num_shards = remote_markers.get().size();
- source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
- szone, conn,
- pipe.source.get_bucket_info(),
- num_shards,
- pipe.target.get_bucket()));
+ source_mgrs.emplace_back(this, &sync_env, szone, conn,
+ pipe.source.get_bucket_info(),
+ num_shards,
+ pipe.target.get_bucket());
}
return 0;
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
objvs.emplace_back();
infos.emplace_back();
- stack->call(mgr->init_sync_status_cr(objvs.back(), infos.back()));
+ stack->call(mgr.init_sync_status_cr(objvs.back(), infos.back()));
stacks.push_back(stack);
}
for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- for (int i = 0; i < mgr->num_pipes(); ++i) {
- stack->call(mgr->read_sync_status_cr(i, &sync_status[i]));
+ for (int i = 0; i < mgr.num_pipes(); ++i) {
+ stack->call(mgr.read_sync_status_cr(i, &sync_status[i]));
}
stacks.push_back(stack);
for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
- for (int i = 0; i < mgr->num_pipes(); ++i) {
- stack->call(mgr->run_sync_cr(i));
+ for (int i = 0; i < mgr.num_pipes(); ++i) {
+ stack->call(mgr.run_sync_cr(i));
}
stacks.push_back(stack);
#include "rgw_sal_rados.h"
#include "rgw_datalog.h"
+#include "rgw_sync.h"
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
#include "rgw_sync_policy.h"
RGWDataSyncEnv sync_env;
- RGWCoroutinesManager cr_mgr;
+ RGWCoroutinesManager cr_mgr{store->ctx(),
+ store->getRados()->get_cr_registry()};
- RGWHTTPManager http_manager;
+ RGWHTTPManager http_manager{store->ctx(), cr_mgr.get_completion_mgr()};
std::optional<rgw_zone_id> source_zone;
std::optional<rgw_bucket> source_bucket;
- RGWRESTConn *conn;
- RGWSyncErrorLogger *error_logger;
+ RGWRESTConn* conn = nullptr;
+ std::unique_ptr<RGWSyncErrorLogger> error_logger =
+ std::make_unique<RGWSyncErrorLogger>(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX,
+ ERROR_LOGGER_SHARDS);
RGWSyncModuleInstanceRef sync_module;
rgw_bucket dest_bucket;
- std::vector<RGWRemoteBucketManager *> source_mgrs;
+ std::vector<RGWRemoteBucketManager> source_mgrs;
std::map<int, rgw_bucket_shard_sync_info> sync_status;
public:
- RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
- std::optional<rgw_zone_id> _source_zone,
- std::optional<rgw_bucket> _source_bucket,
- const rgw_bucket& dest_bucket);
- ~RGWBucketPipeSyncStatusManager();
+ RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* store,
+ std::optional<rgw_zone_id> source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const rgw_bucket& dest_bucket)
+ : store(store), source_zone(source_zone), source_bucket(source_bucket),
+ dest_bucket(dest_bucket) {}
+ ~RGWBucketPipeSyncStatusManager() = default;
int init(const DoutPrefixProvider *dpp);
uint64_t gen);
// specific source obj sync status, can be used by sync modules
static std::string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
- const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status,
- can be used by sync modules */
+ const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status,
+ can be used by sync modules */
// implements DoutPrefixProvider
CephContext *get_cct() const override;