git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync with elastic search v7 31027/head
author: Chang Liu <liuchang0812@gmail.com>
Mon, 12 Aug 2019 09:14:07 +0000 (17:14 +0800)
committer: Nathan Cutler <ncutler@suse.com>
Tue, 22 Oct 2019 07:55:47 +0000 (09:55 +0200)
Elasticsearch removes the mapping types feature; see
https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html

To support Elasticsearch 7.0 and above, the ES sync module includes the
following changes:

1. the ElasticConfig struct adds a new member "es_info";
2. create the Elasticsearch index without the root type "object";
3. upload Elasticsearch documents with the new typeless path "/_doc/{id}";
4. accept the new "already exists" error type "resource_already_exists_exception".

Fixes: https://tracker.ceph.com/issues/41227
Signed-off-by: Chang Liu <liuchang0812@gmail.com>
(cherry picked from commit c39761cddacbc82f70805b36e15970044c1a5d9c)

Conflicts:
src/rgw/rgw_sync_module_es.cc
- svc.zone
- get_cr_registry

src/rgw/rgw_sync_module_es.cc

index f9b02a4027bbfa917ef413f3ca25600f1a90515d..36b652a1b6020493c192a190b74d0131cc567eb5 100644 (file)
@@ -112,6 +112,7 @@ public:
 
 using ESVersion = std::pair<int,int>;
 static constexpr ESVersion ES_V5{5,0};
+static constexpr ESVersion ES_V7{7,0};
 
 struct ESInfo {
   std::string name;
@@ -171,6 +172,7 @@ struct ElasticConfig {
   uint32_t num_shards{0};
   uint32_t num_replicas{0};
   std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
+  ESInfo es_info;
 
   void init(CephContext *cct, const JSONFormattable& config) {
     string elastic_endpoint = config["endpoint"];
@@ -216,7 +218,12 @@ struct ElasticConfig {
   }
 
   string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
-    return index_path +  "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+    if (es_info.version >= ES_V7) {
+      return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+;
+    } else {
+      return index_path +  "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+    }
   }
 
   bool should_handle_operation(RGWBucketInfo& bucket_info) {
@@ -328,8 +335,12 @@ struct es_type : public T {
 
 template <class T>
 struct es_index_mappings {
+  ESVersion es_version;
   ESType string_type {ESType::String};
 
+  es_index_mappings(ESVersion esv):es_version(esv) {
+  }
+
   es_type<T> est(ESType t) const {
     return es_type<T>(t);
   }
@@ -345,7 +356,8 @@ struct es_index_mappings {
   }
 
   void dump(Formatter *f) const {
-    f->open_object_section("object");
+    if (es_version <= ES_V7)
+      f->open_object_section("object");
     f->open_object_section("properties");
     encode_json("bucket", est(string_type), f);
     encode_json("name", est(string_type), f);
@@ -370,6 +382,8 @@ struct es_index_mappings {
     f->close_section(); // properties
     f->close_section(); // meta
     f->close_section(); // properties
+
+    if (es_version <= ES_V7)
     f->close_section(); // object
   }
 };
@@ -396,7 +410,8 @@ struct es_index_config : public es_index_config_base {
   es_index_settings settings;
   es_index_mappings<T> mappings;
 
-  es_index_config(es_index_settings& _s) : settings(_s) {}
+  es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
+  }
 
   void dump(Formatter *f) const {
     encode_json("settings", settings, f);
@@ -679,10 +694,10 @@ public:
 
         if (es_info.version >= ES_V5) {
           ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v5>(settings));
+          index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
         } else {
           ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v2>(settings));
+          index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
         }
         call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
                                                              conf->conn.get(),
@@ -694,7 +709,8 @@ public:
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
 
-        if (err_response.error.type != "index_already_exists_exception") {
+        if (err_response.error.type != "index_already_exists_exception" &&
+           err_response.error.type != "resource_already_exists_exception") {
           return set_cr_error(retcode);
         }
 
@@ -801,6 +817,25 @@ public:
 
   void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
     conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id);
+    // try to get elastic search version
+    RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->get_cr_registry());
+    RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr());
+    int ret = http_manager.start();
+    if (ret < 0) {
+      return;
+    }
+    ret = crs.run(new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
+                                             conf->conn.get(),
+                                             &http_manager,
+                                             "/", nullptr,
+                                             &(conf->default_headers),
+                                             &(conf->es_info)));
+    http_manager.stop();
+    if (ret < 0) {
+      ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl;
+    } else {
+      ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+    }
   }
 
   RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {