]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial data plugin definition and default implementation
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 5 Jul 2016 22:38:45 +0000 (15:38 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 6 Oct 2016 23:08:20 +0000 (16:08 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync_module.h [new file with mode: 0644]

index 4c82795a46b3408adab0c5579a184375e5dc4eae..48a367ac31d364be4345a0ecc616e6ccdee25c37 100644 (file)
@@ -18,6 +18,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_boost_asio_yield.h"
+#include "rgw_sync_module.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -591,9 +592,9 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
   return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
 }
 
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone);
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
 
   if (initialized) {
     return 0;
@@ -1485,6 +1486,39 @@ public:
   }
 };
 
+class RGWDefaultDataSyncModule : public RGWDataSyncModule {
+public:
+  RGWDefaultDataSyncModule() {}
+
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override;
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
+};
+
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
+{
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
+                                 key, versioned_epoch,
+                                 true);
+}
+
+RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch)
+{
+  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                            bucket_info, key, versioned, versioned_epoch,
+                            NULL, NULL, false, &mtime);
+}
+
+RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch)
+{
+  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+                            bucket_info, key, versioned, versioned_epoch,
+                            &owner.id, &owner.display_name, true, &mtime);
+}
+
 class RGWDataSyncControlCR : public RGWBackoffControlCR
 {
   RGWDataSyncEnv *sync_env;
@@ -1566,7 +1600,9 @@ int RGWDataSyncStatusManager::init()
 
   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
-  r = source_log.init(source_zone, conn, error_logger);
+  sync_module.reset(new RGWDefaultDataSyncModule());
+
+  r = source_log.init(source_zone, conn, error_logger, sync_module);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
     finalize();
@@ -1615,14 +1651,15 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s
 
 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                              const rgw_bucket& bucket, int shard_id,
-                             RGWSyncErrorLogger *_error_logger)
+                             RGWSyncErrorLogger *_error_logger,
+                             RGWDataSyncModuleRef& _sync_module)
 {
   conn = _conn;
   source_zone = _source_zone;
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
 
   return 0;
 }
@@ -1846,18 +1883,11 @@ RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
 }
 
 
-struct bucket_entry_owner {
-  string id;
-  string display_name;
-
-  bucket_entry_owner() {}
-  bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("ID", id, obj);
-    JSONDecoder::decode_json("DisplayName", display_name, obj);
-  }
-};
+void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("ID", id, obj);
+  JSONDecoder::decode_json("DisplayName", display_name, obj);
+}
 
 struct bucket_list_entry {
   bool delete_marker;
@@ -1867,7 +1897,7 @@ struct bucket_list_entry {
   string etag;
   uint64_t size;
   string storage_class;
-  bucket_entry_owner owner;
+  rgw_bucket_entry_owner owner;
   uint64_t versioned_epoch;
   string rgw_tag;
 
@@ -2096,7 +2126,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   rgw_obj_key key;
   bool versioned;
   uint64_t versioned_epoch;
-  bucket_entry_owner owner;
+  rgw_bucket_entry_owner owner;
   real_time timestamp;
   RGWModifyOp op;
   RGWPendingState op_state;
@@ -2119,7 +2149,7 @@ public:
                              const rgw_bucket_shard& bs,
                              const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
                              real_time& _timestamp,
-                             const bucket_entry_owner& _owner,
+                             const rgw_bucket_entry_owner& _owner,
                              RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
@@ -2578,7 +2608,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
           } else {
             uint64_t versioned_epoch = 0;
-            bucket_entry_owner owner(entry->owner, entry->owner_display_name);
+            rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
             if (entry->ver.pool < 0) {
               versioned_epoch = entry->ver.epoch;
             }
@@ -2770,11 +2800,13 @@ int RGWBucketSyncStatusManager::init()
 
   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
 
+  sync_module.reset(new RGWDefaultDataSyncModule());
+
   int effective_num_shards = (num_shards ? num_shards : 1);
 
   for (int i = 0; i < effective_num_shards; i++) {
     RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
-    ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger);
+    ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
     if (ret < 0) {
       ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
       return ret;
index 07953c5d942c5a6f004ec2005ce7c62dfb2e88d8..e4d2b14c9cc6394ef98a71837bfd5f66e60393be 100644 (file)
@@ -5,6 +5,8 @@
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
 
+#include "rgw_sync_module.h"
+
 #include "common/RWLock.h"
 #include "common/ceph_json.h"
 
@@ -185,6 +187,18 @@ struct rgw_datalog_shard_data {
 class RGWAsyncRadosProcessor;
 class RGWDataSyncStatusManager;
 class RGWDataSyncControlCR;
+struct RGWDataSyncEnv;
+
+struct rgw_bucket_entry_owner {
+  string id;
+  string display_name;
+
+  rgw_bucket_entry_owner() {}
+  rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
+
+  void decode_json(JSONObj *obj);
+};
+
 
 struct RGWDataSyncEnv {
   CephContext *cct;
@@ -194,12 +208,14 @@ struct RGWDataSyncEnv {
   RGWHTTPManager *http_manager;
   RGWSyncErrorLogger *error_logger;
   string source_zone;
+  RGWDataSyncModuleRef sync_module;
 
-  RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}
+  RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
 
   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
-            RGWSyncErrorLogger *_error_logger, const string& _source_zone) {
+            RGWSyncErrorLogger *_error_logger, const string& _source_zone,
+            RGWDataSyncModuleRef& _sync_module) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -207,6 +223,7 @@ struct RGWDataSyncEnv {
     http_manager = _http_manager;
     error_logger = _error_logger;
     source_zone = _source_zone;
+    sync_module = _sync_module;
   }
 
   string shard_obj_name(int shard_id);
@@ -232,7 +249,7 @@ public:
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
-  int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger);
+  int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& module);
   void finish();
 
   int read_log_info(rgw_datalog_info *log_info);
@@ -253,6 +270,7 @@ class RGWDataSyncStatusManager {
   string source_zone;
   RGWRESTConn *conn;
   RGWSyncErrorLogger *error_logger;
+  RGWDataSyncModuleRef sync_module;
 
   RGWRemoteDataLog source_log;
 
@@ -268,6 +286,7 @@ public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
                            const string& _source_zone)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
+      sync_module(nullptr),
       source_log(store, async_rados), num_shards(0) {}
   ~RGWDataSyncStatusManager() {
     finalize();
@@ -441,7 +460,8 @@ public:
 
   int init(const string& _source_zone, RGWRESTConn *_conn,
            const rgw_bucket& bucket, int shard_id,
-           RGWSyncErrorLogger *_error_logger);
+           RGWSyncErrorLogger *_error_logger,
+           RGWDataSyncModuleRef& _sync_module);
   void finish();
 
   RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
@@ -463,6 +483,7 @@ class RGWBucketSyncStatusManager {
   string source_zone;
   RGWRESTConn *conn;
   RGWSyncErrorLogger *error_logger;
+  RGWDataSyncModuleRef sync_module;
 
   rgw_bucket bucket;
 
diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h
new file mode 100644 (file)
index 0000000..d8a2792
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef CEPH_RGW_SYNC_MODULE_H
+#define CEPH_RGW_SYNC_MODULE_H
+
+#include "rgw_common.h"
+
+class RGWCoroutine;
+class RGWBucketInfo;
+class RGWRemoteDataLog;
+struct RGWDataSyncEnv;
+struct rgw_bucket_entry_owner;
+struct rgw_obj_key;
+
+class RGWDataSyncModule {
+public:
+  RGWDataSyncModule() {}
+  virtual ~RGWDataSyncModule() {}
+
+  virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) = 0;
+  virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                      bool versioned, uint64_t versioned_epoch) = 0;
+  virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) = 0;
+};
+
+typedef std::shared_ptr<RGWDataSyncModule> RGWDataSyncModuleRef;
+
+#endif