return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& _sync_module)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
};
+class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
+ RGWDefaultDataSyncModule data_handler;
+public:
+ RGWDefaultSyncModuleInstance() {}
+ RGWDataSyncModule *get_data_handler() override {
+ return &data_handler;
+ }
+};
+
+int RGWDefaultSyncModule::create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance)
+{
+ instance->reset(new RGWDefaultSyncModuleInstance());
+ return 0;
+}
+
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
{
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
- sync_module.reset(new RGWDefaultDataSyncModule());
+ map<string, string> sync_module_config;
+ r = store->get_sync_modules_manager()->create_instance("default", sync_module_config, &sync_module);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to init sync module instance, r=" << r << dendl;
+ finalize();
+ return r;
+ }
r = source_log.init(source_zone, conn, error_logger, sync_module);
if (r < 0) {
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
RGWSyncErrorLogger *_error_logger,
- RGWDataSyncModuleRef& _sync_module)
+ RGWSyncModuleInstanceRef& _sync_module)
{
conn = _conn;
source_zone = _source_zone;
bool error_injection;
+ RGWDataSyncModule *data_sync_module;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
logger.init(sync_env, "Object", ss.str());
error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
+
+ data_sync_module = sync_env->sync_module->get_data_handler();
}
int operate() {
set_status("syncing obj");
ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
logger.log("fetch");
- call(sync_env->sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
+ call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
set_status("removing obj");
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
logger.log("remove");
- call(sync_env->sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
+ call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
logger.log("creating delete marker");
set_status("creating delete marker");
ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(sync_env->sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
+ call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
}
}
} while (marker_tracker->need_retry(key));
error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
- sync_module.reset(new RGWDefaultDataSyncModule());
+ sync_module.reset(new RGWDefaultSyncModuleInstance());
int effective_num_shards = (num_shards ? num_shards : 1);
RGWHTTPManager *http_manager;
RGWSyncErrorLogger *error_logger;
string source_zone;
- RGWDataSyncModuleRef sync_module;
+ RGWSyncModuleInstanceRef sync_module;
RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
RGWSyncErrorLogger *_error_logger, const string& _source_zone,
- RGWDataSyncModuleRef& _sync_module) {
+ RGWSyncModuleInstanceRef& _sync_module) {
cct = _cct;
store = _store;
conn = _conn;
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
- int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& module);
+ int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
void finish();
int read_log_info(rgw_datalog_info *log_info);
string source_zone;
RGWRESTConn *conn;
RGWSyncErrorLogger *error_logger;
- RGWDataSyncModuleRef sync_module;
+ RGWSyncModuleInstanceRef sync_module;
RGWRemoteDataLog source_log;
int init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
RGWSyncErrorLogger *_error_logger,
- RGWDataSyncModuleRef& _sync_module);
+ RGWSyncModuleInstanceRef& _sync_module);
void finish();
RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
string source_zone;
RGWRESTConn *conn;
RGWSyncErrorLogger *error_logger;
- RGWDataSyncModuleRef sync_module;
+ RGWSyncModuleInstanceRef sync_module;
rgw_bucket bucket;
int run();
};
+class RGWDefaultSyncModule : public RGWSyncModule {
+public:
+ RGWDefaultSyncModule() {}
+ int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+};
+
class RGWDataLogTrimCR : public RGWCoroutine {
RGWRados *store;
delete meta_mgr;
delete binfo_cache;
delete obj_tombstone_cache;
+ delete sync_modules_manager;
}
/**
}
}
+ sync_modules_manager = new RGWSyncModulesManager();
+
+ RGWSyncModuleRef default_module(new RGWDefaultSyncModule());
+ sync_modules_manager->register_module("default", default_module);
+
auto crs = std::unique_ptr<RGWCoroutinesManagerRegistry>{
new RGWCoroutinesManagerRegistry(cct)};
ret = crs->hook_to_admin_command("cr dump");
#include "rgw_metadata.h"
#include "rgw_meta_sync_status.h"
#include "rgw_period_puller.h"
+#include "rgw_sync_module.h"
class RGWWatcher;
class SafeTimer;
RGWCoroutinesManagerRegistry *cr_registry;
+ RGWSyncModulesManager *sync_modules_manager{nullptr};
+
RGWZoneGroup zonegroup;
RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */
RGWZoneParams zone_params; /* internal zone params, e.g., rados pools */
return obj_tombstone_cache;
}
+ RGWSyncModulesManager *get_sync_modules_manager() {
+ return sync_modules_manager;
+ }
int get_required_alignment(rgw_bucket& bucket, uint64_t *alignment);
int get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size);
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) = 0;
};
-typedef std::shared_ptr<RGWDataSyncModule> RGWDataSyncModuleRef;
+class RGWSyncModuleInstance {
+public:
+ RGWSyncModuleInstance() {}
+ virtual ~RGWSyncModuleInstance() {}
+ virtual RGWDataSyncModule *get_data_handler() = 0;
+};
+
+typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
+
+class RGWSyncModule {
+
+public:
+ RGWSyncModule() {}
+ virtual ~RGWSyncModule() {}
+
+ virtual int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0;
+};
+
+typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef;
+
+
+class RGWSyncModulesManager {
+ Mutex lock;
+
+ map<string, RGWSyncModuleRef> modules;
+public:
+ RGWSyncModulesManager() : lock("RGWSyncModulesManager") {}
+
+ void register_module(const string& name, RGWSyncModuleRef& module) {
+ Mutex::Locker l(lock);
+ modules[name] = module;
+ }
+
+ bool get_module(const string& name, RGWSyncModuleRef *module) {
+ Mutex::Locker l(lock);
+ auto iter = modules.find(name);
+ if (iter == modules.end()) {
+ return false;
+ }
+ *module = iter->second;
+ return true;
+ }
+
+
+ int create_instance(const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+ RGWSyncModuleRef module;
+ if (!get_module(name, &module)) {
+ return -ENOENT;
+ }
+
+ return module.get()->create_instance(config, instance);
+ }
+};
#endif