]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: define sync modules manager, instance
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 11 Jul 2016 21:09:29 +0000 (14:09 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 7 Oct 2016 16:57:45 +0000 (09:57 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync_module.h

index 02f84d4dbc812b79362c62929af07cfb5ea28e62..97e3cc235b873d0ac11e1b424d8a8a168c0fd9fb 100644 (file)
@@ -592,7 +592,7 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
   return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
 }
 
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& _sync_module)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
 {
   sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
 
@@ -1496,6 +1496,21 @@ public:
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
 };
 
+class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
+  RGWDefaultDataSyncModule data_handler;
+public:
+  RGWDefaultSyncModuleInstance() {}
+  RGWDataSyncModule *get_data_handler() override {
+    return &data_handler;
+  }
+};
+
+int RGWDefaultSyncModule::create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance)
+{
+  instance->reset(new RGWDefaultSyncModuleInstance());
+  return 0;
+}
+
 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
 {
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
@@ -1600,7 +1615,13 @@ int RGWDataSyncStatusManager::init()
 
   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
-  sync_module.reset(new RGWDefaultDataSyncModule());
+  map<string, string> sync_module_config;
+  r = store->get_sync_modules_manager()->create_instance("default", sync_module_config, &sync_module);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to init sync module instance, r=" << r << dendl;
+    finalize();
+    return r;
+  }
 
   r = source_log.init(source_zone, conn, error_logger, sync_module);
   if (r < 0) {
@@ -1652,7 +1673,7 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s
 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                              const rgw_bucket& bucket, int shard_id,
                              RGWSyncErrorLogger *_error_logger,
-                             RGWDataSyncModuleRef& _sync_module)
+                             RGWSyncModuleInstanceRef& _sync_module)
 {
   conn = _conn;
   source_zone = _source_zone;
@@ -2142,6 +2163,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
   bool error_injection;
 
+  RGWDataSyncModule *data_sync_module;
 
 public:
   RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
@@ -2170,6 +2192,8 @@ public:
     logger.init(sync_env, "Object", ss.str());
 
     error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
+
+    data_sync_module = sync_env->sync_module->get_data_handler();
   }
 
   int operate() {
@@ -2202,19 +2226,19 @@ public:
             set_status("syncing obj");
             ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
             logger.log("fetch");
-            call(sync_env->sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
+            call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
             set_status("removing obj");
             if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
               versioned = true;
             }
             logger.log("remove");
-            call(sync_env->sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
+            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
             logger.log("creating delete marker");
             set_status("creating delete marker");
             ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-            call(sync_env->sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
+            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
           }
         }
       } while (marker_tracker->need_retry(key));
@@ -2798,7 +2822,7 @@ int RGWBucketSyncStatusManager::init()
 
   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
-  sync_module.reset(new RGWDefaultDataSyncModule());
+  sync_module.reset(new RGWDefaultSyncModuleInstance());
 
   int effective_num_shards = (num_shards ? num_shards : 1);
 
index e4d2b14c9cc6394ef98a71837bfd5f66e60393be..78201d8b559ad8a00ae702591db8686ea8acbf02 100644 (file)
@@ -208,14 +208,14 @@ struct RGWDataSyncEnv {
   RGWHTTPManager *http_manager;
   RGWSyncErrorLogger *error_logger;
   string source_zone;
-  RGWDataSyncModuleRef sync_module;
+  RGWSyncModuleInstanceRef sync_module;
 
   RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
 
   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
             RGWSyncErrorLogger *_error_logger, const string& _source_zone,
-            RGWDataSyncModuleRef& _sync_module) {
+            RGWSyncModuleInstanceRef& _sync_module) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -249,7 +249,7 @@ public:
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
-  int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& module);
+  int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
   void finish();
 
   int read_log_info(rgw_datalog_info *log_info);
@@ -270,7 +270,7 @@ class RGWDataSyncStatusManager {
   string source_zone;
   RGWRESTConn *conn;
   RGWSyncErrorLogger *error_logger;
-  RGWDataSyncModuleRef sync_module;
+  RGWSyncModuleInstanceRef sync_module;
 
   RGWRemoteDataLog source_log;
 
@@ -461,7 +461,7 @@ public:
   int init(const string& _source_zone, RGWRESTConn *_conn,
            const rgw_bucket& bucket, int shard_id,
            RGWSyncErrorLogger *_error_logger,
-           RGWDataSyncModuleRef& _sync_module);
+           RGWSyncModuleInstanceRef& _sync_module);
   void finish();
 
   RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
@@ -483,7 +483,7 @@ class RGWBucketSyncStatusManager {
   string source_zone;
   RGWRESTConn *conn;
   RGWSyncErrorLogger *error_logger;
-  RGWDataSyncModuleRef sync_module;
+  RGWSyncModuleInstanceRef sync_module;
 
   rgw_bucket bucket;
 
@@ -521,6 +521,12 @@ public:
   int run();
 };
 
+class RGWDefaultSyncModule : public RGWSyncModule {
+public:
+  RGWDefaultSyncModule() {}
+  int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+};
+
 
 class RGWDataLogTrimCR : public RGWCoroutine {
   RGWRados *store;
index 61e698c523d593744f751780683ede3efd9ca03c..4a04aeee31a6fef90583bbca055dcfbcc261b777 100644 (file)
@@ -3268,6 +3268,7 @@ void RGWRados::finalize()
   delete meta_mgr;
   delete binfo_cache;
   delete obj_tombstone_cache;
+  delete sync_modules_manager;
 }
 
 /** 
@@ -3291,6 +3292,11 @@ int RGWRados::init_rados()
     }
   }
 
+  sync_modules_manager = new RGWSyncModulesManager();
+
+  RGWSyncModuleRef default_module(new RGWDefaultSyncModule());
+  sync_modules_manager->register_module("default", default_module);
+
   auto crs = std::unique_ptr<RGWCoroutinesManagerRegistry>{
     new RGWCoroutinesManagerRegistry(cct)};
   ret = crs->hook_to_admin_command("cr dump");
index 0caecab2909c7a111debcc65bb6368041508b5ed..4aa2976c823b285c7c6b8b95570baab208d757e1 100644 (file)
@@ -22,6 +22,7 @@
 #include "rgw_metadata.h"
 #include "rgw_meta_sync_status.h"
 #include "rgw_period_puller.h"
+#include "rgw_sync_module.h"
 
 class RGWWatcher;
 class SafeTimer;
@@ -1889,6 +1890,8 @@ protected:
   
   RGWCoroutinesManagerRegistry *cr_registry;
 
+  RGWSyncModulesManager *sync_modules_manager{nullptr};
+
   RGWZoneGroup zonegroup;
   RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */  
   RGWZoneParams zone_params; /* internal zone params, e.g., rados pools */
@@ -2030,6 +2033,9 @@ public:
     return obj_tombstone_cache;
   }
 
+  RGWSyncModulesManager *get_sync_modules_manager() {
+    return sync_modules_manager;
+  }
   int get_required_alignment(rgw_bucket& bucket, uint64_t *alignment);
   int get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size);
 
index d8a2792e41f0ef7110365eebd88f119c6d7f2573..06a9e8452fa31b98f1b161ee8a1d4ae36bac4d23 100644 (file)
@@ -22,6 +22,58 @@ public:
                                              rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) = 0;
 };
 
-typedef std::shared_ptr<RGWDataSyncModule> RGWDataSyncModuleRef;
+class RGWSyncModuleInstance {
+public:
+  RGWSyncModuleInstance() {}
+  virtual ~RGWSyncModuleInstance() {}
+  virtual RGWDataSyncModule *get_data_handler() = 0;
+};
+
+typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
+
+class RGWSyncModule {
+
+public:
+  RGWSyncModule() {}
+  virtual ~RGWSyncModule() {}
+
+  virtual int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0;
+};
+
+typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef;
+
+
+class RGWSyncModulesManager {
+  Mutex lock;
+
+  map<string, RGWSyncModuleRef> modules;
+public:
+  RGWSyncModulesManager() : lock("RGWSyncModulesManager") {}
+
+  void register_module(const string& name, RGWSyncModuleRef& module) {
+    Mutex::Locker l(lock);
+    modules[name] = module;
+  }
+
+  bool get_module(const string& name, RGWSyncModuleRef *module) {
+    Mutex::Locker l(lock);
+    auto iter = modules.find(name);
+    if (iter == modules.end()) {
+      return false;
+    }
+    *module = iter->second;
+    return true;
+  }
+
+
+  int create_instance(const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+    RGWSyncModuleRef module;
+    if (!get_module(name, &module)) {
+      return -ENOENT;
+    }
+
+    return module.get()->create_instance(config, instance);
+  }
+};
 
 #endif