]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial implementation of elasticsearch sync module
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 17 Aug 2016 09:45:32 +0000 (02:45 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 7 Oct 2016 17:31:21 +0000 (10:31 -0700)
sync module that will handle rgw metadata indexing.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_sync_module.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_es.cc [new file with mode: 0644]
src/rgw/rgw_sync_module_es.h [new file with mode: 0644]
src/rgw/rgw_sync_module_log.cc
src/rgw/rgw_sync_module_log.h

index 4bbb548257b85d6269230a9e2d99f26eb56db2f4..01a5a79ebcfd90191ceeeadfa6d3fadc7c62f86e 100644 (file)
@@ -51,6 +51,7 @@ set(rgw_a_srcs
   rgw_sync.cc
   rgw_data_sync.cc
   rgw_sync_module.cc
+  rgw_sync_module_es.cc
   rgw_sync_module_log.cc
   rgw_period_history.cc
   rgw_period_puller.cc
index 0be2bd44a7b07bca8265198508498acaa3be7d28..1cd2e02974c85e361ca16ab249986b546dc1f1ba 100644 (file)
@@ -1505,7 +1505,7 @@ public:
   }
 };
 
-int RGWDefaultSyncModule::create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance)
+int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance)
 {
   instance->reset(new RGWDefaultSyncModuleInstance());
   return 0;
index 331249c2b269ef5c11445084bbc14f618466c281..80dda7725e85543baf0b3938f5b7117cc2ddc13a 100644 (file)
@@ -526,7 +526,7 @@ class RGWDefaultSyncModule : public RGWSyncModule {
 public:
   RGWDefaultSyncModule() {}
   bool supports_data_export() override { return true; }
-  int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+  int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
 };
 
 
index dc03f28a37b85637b5ebac5ee067fd731da8d9d2..49cf4e0d4e08418404a8b0864bb491aaa5435467 100644 (file)
@@ -3791,7 +3791,7 @@ int RGWRados::init_complete()
 
   zone_short_id = current_period.get_map().get_zone_short_id(zone_params.get_id());
 
-  ret = sync_modules_manager->create_instance(zone_public_config.tier_type, zone_params.tier_config, &sync_module);
+  ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module);
   if (ret < 0) {
     lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
     return ret;
index 3e65de6b46cbeea646978d5708a03933fa2235a4..36418e5a28d0c54e7407f9e97c69dece9df31d6b 100644 (file)
@@ -6,6 +6,7 @@
 #include "rgw_boost_asio_yield.h"
 
 #include "rgw_sync_module_log.h"
+#include "rgw_sync_module_es.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -58,4 +59,7 @@ void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager)
 
   RGWSyncModuleRef log_module(new RGWLogSyncModule());
   modules_manager->register_module("log", log_module);
+
+  RGWSyncModuleRef es_module(new RGWElasticSyncModule());
+  modules_manager->register_module("elasticsearch", es_module);
 }
index 7b465b9e8ce159b6497c43e4cc837621b54b3e6d..2bae4d2d47c43fb425a48a6d44d90c5bb0cef572 100644 (file)
@@ -39,7 +39,7 @@ public:
   virtual ~RGWSyncModule() {}
 
   virtual bool supports_data_export() = 0;
-  virtual int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0;
+  virtual int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0;
 };
 
 typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef;
@@ -80,13 +80,13 @@ public:
     return module.get()->supports_data_export();
   }
 
-  int create_instance(const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+  int create_instance(CephContext *cct, const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
     RGWSyncModuleRef module;
     if (!get_module(name, &module)) {
       return -ENOENT;
     }
 
-    return module.get()->create_instance(config, instance);
+    return module.get()->create_instance(cct, config, instance);
   }
 };
 
diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc
new file mode 100644 (file)
index 0000000..6aa9d73
--- /dev/null
@@ -0,0 +1,95 @@
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_boost_asio_yield.h"
+#include "rgw_sync_module_es.h"
+#include "rgw_rest_conn.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+struct ElasticConfig {
+  string id;
+  RGWRESTConn *conn{nullptr};
+};
+
+class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  const ElasticConfig& conf;
+public:
+  RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                          const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(sync_env, bucket_info, key), conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone
+                              << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                              << " attrs=" << attrs << dendl;
+      return set_cr_done();
+    }
+    return 0;
+  }
+
+};
+
+class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  const ElasticConfig& conf;
+public:
+  RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
+                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                        const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                                                           conf(_conf) {
+  }
+
+  ~RGWElasticHandleRemoteObjCR() {}
+
+  RGWStatRemoteObjCBCR *allocate_callback() override {
+    return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf);
+  }
+};
+
+class RGWElasticDataSyncModule : public RGWDataSyncModule {
+  ElasticConfig conf;
+public:
+  RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) {
+    conf.id = string("elastic:") + elastic_endpoint;
+    conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint });
+  }
+  ~RGWElasticDataSyncModule() {
+    delete conf.conn;
+  }
+
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
+    ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+    return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf);
+  }
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
+    ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    return NULL;
+  }
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
+    ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+                            << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    return NULL;
+  }
+};
+
+class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
+  RGWElasticDataSyncModule data_handler;
+public:
+  RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {}
+  RGWDataSyncModule *get_data_handler() override {
+    return &data_handler;
+  }
+};
+
+int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+  string endpoint;
+  auto i = config.find("endpoint");
+  if (i != config.end()) {
+    endpoint = i->second;
+  }
+  instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint));
+  return 0;
+}
+
diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h
new file mode 100644 (file)
index 0000000..73c8368
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef CEPH_RGW_SYNC_MODULE_ES_H
+#define CEPH_RGW_SYNC_MODULE_ES_H
+
+#include "rgw_sync_module.h"
+
+class RGWElasticSyncModule : public RGWSyncModule {
+public:
+  RGWElasticSyncModule() {}
+  bool supports_data_export() override {
+    return false;
+  }
+  int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+};
+
+#endif
index 2c6352ac8a18b1450e67110e91d5a9b43c465e80..230fde3f01cf218adb7ac282288a66df8d29226d 100644 (file)
@@ -64,7 +64,7 @@ public:
   }
 };
 
-int RGWLogSyncModule::create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+int RGWLogSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
   string prefix;
   auto i = config.find("prefix");
   if (i != config.end()) {
index f889e319ae923be9492c24fceb49f19a7510bc38..5b7bae6552de13fb575eaca0a342c56d7bddeb6b 100644 (file)
@@ -9,7 +9,7 @@ public:
   bool supports_data_export() override {
     return false;
   }
-  int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+  int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
 };
 
 #endif