From f30c9663ce992fe0c94cac0cabc9b0163ee45b8b Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 11 Jul 2016 14:09:29 -0700 Subject: [PATCH] rgw: define sync modules manager, instance Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 38 ++++++++++++++++++++++----- src/rgw/rgw_data_sync.h | 18 ++++++++----- src/rgw/rgw_rados.cc | 6 +++++ src/rgw/rgw_rados.h | 6 +++++ src/rgw/rgw_sync_module.h | 54 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 108 insertions(+), 14 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 02f84d4dbc812..97e3cc235b873 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -592,7 +592,7 @@ int RGWRemoteDataLog::read_source_log_shards_next(map shard_markers return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result)); } -int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& _sync_module) +int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module) { sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module); @@ -1496,6 +1496,21 @@ public: rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override; }; +class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance { + RGWDefaultDataSyncModule data_handler; +public: + RGWDefaultSyncModuleInstance() {} + RGWDataSyncModule *get_data_handler() override { + return &data_handler; + } +}; + +int RGWDefaultSyncModule::create_instance(map& config, RGWSyncModuleInstanceRef *instance) +{ + instance->reset(new RGWDefaultSyncModuleInstance()); + return 0; +} + RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) { return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info, @@ -1600,7 +1615,13 @@ int RGWDataSyncStatusManager::init() error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); - sync_module.reset(new RGWDefaultDataSyncModule()); + map sync_module_config; + r = store->get_sync_modules_manager()->create_instance("default", sync_module_config, &sync_module); + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to init sync module instance, r=" << r << dendl; + finalize(); + return r; + } r = source_log.init(source_zone, conn, error_logger, sync_module); if (r < 0) { @@ -1652,7 +1673,7 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const rgw_bucket& bucket, int shard_id, RGWSyncErrorLogger *_error_logger, - RGWDataSyncModuleRef& _sync_module) + RGWSyncModuleInstanceRef& _sync_module) { conn = _conn; source_zone = _source_zone; @@ -2142,6 +2163,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { bool error_injection; + RGWDataSyncModule *data_sync_module; public: RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, @@ -2170,6 +2192,8 @@ public: logger.init(sync_env, "Object", ss.str()); error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0); + + data_sync_module = sync_env->sync_module->get_data_handler(); } int operate() { @@ -2202,19 +2226,19 @@ public: set_status("syncing obj"); ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; logger.log("fetch"); - call(sync_env->sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch)); + call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch)); } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { set_status("removing obj"); if (op == CLS_RGW_OP_UNLINK_INSTANCE) { versioned = true; } logger.log("remove"); - call(sync_env->sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch)); + call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch)); } else if (op == CLS_RGW_OP_LINK_OLH_DM) { logger.log("creating delete marker"); set_status("creating delete marker"); ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; - call(sync_env->sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch)); + call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch)); } } } while (marker_tracker->need_retry(key)); @@ -2798,7 +2822,7 @@ int RGWBucketSyncStatusManager::init() error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); - sync_module.reset(new RGWDefaultDataSyncModule()); + sync_module.reset(new RGWDefaultSyncModuleInstance()); int effective_num_shards = (num_shards ? num_shards : 1); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index e4d2b14c9cc63..78201d8b559ad 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -208,14 +208,14 @@ struct RGWDataSyncEnv { RGWHTTPManager *http_manager; RGWSyncErrorLogger *error_logger; string source_zone; - RGWDataSyncModuleRef sync_module; + RGWSyncModuleInstanceRef sync_module; RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {} void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, RGWSyncErrorLogger *_error_logger, const string& _source_zone, - RGWDataSyncModuleRef& _sync_module) { + RGWSyncModuleInstanceRef& _sync_module) { cct = _cct; store = _store; conn = _conn; @@ -249,7 +249,7 @@ public: http_manager(store->ctx(), completion_mgr), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), initialized(false) {} - int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& module); + int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module); void finish(); int read_log_info(rgw_datalog_info *log_info); @@ -270,7 +270,7 @@ class RGWDataSyncStatusManager { string source_zone; RGWRESTConn *conn; RGWSyncErrorLogger *error_logger; - RGWDataSyncModuleRef sync_module; + RGWSyncModuleInstanceRef sync_module; RGWRemoteDataLog source_log; @@ -461,7 +461,7 @@ public: int init(const string& _source_zone, RGWRESTConn *_conn, const rgw_bucket& bucket, int shard_id, RGWSyncErrorLogger *_error_logger, - RGWDataSyncModuleRef& _sync_module); + RGWSyncModuleInstanceRef& _sync_module); void finish(); RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status); @@ -483,7 +483,7 @@ class RGWBucketSyncStatusManager { string source_zone; RGWRESTConn *conn; RGWSyncErrorLogger *error_logger; - RGWDataSyncModuleRef sync_module; + RGWSyncModuleInstanceRef sync_module; rgw_bucket bucket; @@ -521,6 +521,12 @@ public: int run(); }; +class RGWDefaultSyncModule : public RGWSyncModule { +public: + RGWDefaultSyncModule() {} + int create_instance(map& config, RGWSyncModuleInstanceRef *instance) override; +}; + class RGWDataLogTrimCR : public RGWCoroutine { RGWRados *store; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 61e698c523d59..4a04aeee31a6f 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3268,6 +3268,7 @@ void RGWRados::finalize() delete meta_mgr; delete binfo_cache; delete obj_tombstone_cache; + delete sync_modules_manager; } /** @@ -3291,6 +3292,11 @@ int RGWRados::init_rados() } } + sync_modules_manager = new RGWSyncModulesManager(); + + RGWSyncModuleRef default_module(new RGWDefaultSyncModule()); + sync_modules_manager->register_module("default", default_module); + auto crs = std::unique_ptr{ new RGWCoroutinesManagerRegistry(cct)}; ret = crs->hook_to_admin_command("cr dump"); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 0caecab2909c7..4aa2976c823b2 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -22,6 +22,7 @@ #include "rgw_metadata.h" #include "rgw_meta_sync_status.h" #include "rgw_period_puller.h" +#include "rgw_sync_module.h" class RGWWatcher; class SafeTimer; @@ -1889,6 +1890,8 @@ protected: RGWCoroutinesManagerRegistry *cr_registry; + RGWSyncModulesManager *sync_modules_manager{nullptr}; + RGWZoneGroup zonegroup; RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */ RGWZoneParams zone_params; /* internal zone params, e.g., rados pools */ @@ -2030,6 +2033,9 @@ public: return obj_tombstone_cache; } + RGWSyncModulesManager *get_sync_modules_manager() { + return sync_modules_manager; + } int get_required_alignment(rgw_bucket& bucket, uint64_t *alignment); int get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size); diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index d8a2792e41f0e..06a9e8452fa31 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -22,6 +22,58 @@ public: rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) = 0; }; -typedef std::shared_ptr RGWDataSyncModuleRef; +class RGWSyncModuleInstance { +public: + RGWSyncModuleInstance() {} + virtual ~RGWSyncModuleInstance() {} + virtual RGWDataSyncModule *get_data_handler() = 0; +}; + +typedef std::shared_ptr RGWSyncModuleInstanceRef; + +class RGWSyncModule { + +public: + RGWSyncModule() {} + virtual ~RGWSyncModule() {} + + virtual int create_instance(map& config, RGWSyncModuleInstanceRef *instance) = 0; +}; + +typedef std::shared_ptr RGWSyncModuleRef; + + +class RGWSyncModulesManager { + Mutex lock; + + map modules; +public: + RGWSyncModulesManager() : lock("RGWSyncModulesManager") {} + + void register_module(const string& name, RGWSyncModuleRef& module) { + Mutex::Locker l(lock); + modules[name] = module; + } + + bool get_module(const string& name, RGWSyncModuleRef *module) { + Mutex::Locker l(lock); + auto iter = modules.find(name); + if (iter == modules.end()) { + return false; + } + *module = iter->second; + return true; + } + + + int create_instance(const string& name, map& config, RGWSyncModuleInstanceRef *instance) { + RGWSyncModuleRef module; + if (!get_module(name, &module)) { + return -ENOENT; + } + + return module.get()->create_instance(config, instance); + } +}; #endif -- 2.39.5