From c39761cddacbc82f70805b36e15970044c1a5d9c Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 12 Aug 2019 17:14:07 +0800 Subject: [PATCH] rgw: sync with elastic search v7 elastic search removes mapping types feature. see https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html to support elastic search above 7.0, es sync module includes following changes : 1. the ElasticConfig struct addes new member "esinfo"; 2. create elastic index without root type "object"; 3. upload elastic document with new typeless path "/_doc/{id}"; 4. add new exist error code "resource_already_exists_exception". Fixes: https://tracker.ceph.com/issues/41227 Signed-off-by: Chang Liu --- src/rgw/rgw_sync_module_es.cc | 47 ++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 4f0f3a9e894..3d466b08249 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -112,6 +112,7 @@ public: using ESVersion = std::pair; static constexpr ESVersion ES_V5{5,0}; +static constexpr ESVersion ES_V7{7,0}; struct ESInfo { std::string name; @@ -171,6 +172,7 @@ struct ElasticConfig { uint32_t num_shards{0}; uint32_t num_replicas{0}; std::map default_headers = {{ "Content-Type", "application/json" }}; + ESInfo es_info; void init(CephContext *cct, const JSONFormattable& config) { string elastic_endpoint = config["endpoint"]; @@ -216,7 +218,12 @@ struct ElasticConfig { } string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { - return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); + if (es_info.version >= ES_V7) { + return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); +; + } else { + return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance)); + } } bool should_handle_operation(RGWBucketInfo& bucket_info) { @@ -328,8 +335,12 @@ struct es_type : public T { template struct es_index_mappings { + ESVersion es_version; ESType string_type {ESType::String}; + es_index_mappings(ESVersion esv):es_version(esv) { + } + es_type est(ESType t) const { return es_type(t); } @@ -345,7 +356,8 @@ struct es_index_mappings { } void dump(Formatter *f) const { - f->open_object_section("object"); + if (es_version <= ES_V7) + f->open_object_section("object"); f->open_object_section("properties"); encode_json("bucket", est(string_type), f); encode_json("name", est(string_type), f); @@ -370,6 +382,8 @@ struct es_index_mappings { f->close_section(); // properties f->close_section(); // meta f->close_section(); // properties + + if (es_version <= ES_V7) f->close_section(); // object } }; @@ -396,7 +410,8 @@ struct es_index_config : public es_index_config_base { es_index_settings settings; es_index_mappings mappings; - es_index_config(es_index_settings& _s) : settings(_s) {} + es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) { + } void dump(Formatter *f) const { encode_json("settings", settings, f); @@ -679,10 +694,10 @@ public: if (es_info.version >= ES_V5) { ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; - index_conf.reset(new es_index_config(settings)); + index_conf.reset(new es_index_config(settings, es_info.version)); } else { ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; - index_conf.reset(new es_index_config(settings)); + index_conf.reset(new es_index_config(settings, es_info.version)); } call(new RGWPutRESTResourceCR (sync_env->cct, conf->conn.get(), @@ -694,7 +709,8 @@ public: if (retcode < 0) { ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; - if (err_response.error.type != "index_already_exists_exception") { + if (err_response.error.type != "index_already_exists_exception" && + err_response.error.type != "resource_already_exists_exception") { return set_cr_error(retcode); } @@ -801,6 +817,25 @@ public: void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { conf->init_instance(sync_env->store->svc()->zone->get_realm(), instance_id); + // try to get elastic search version + RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->getRados()->get_cr_registry()); + RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr()); + int ret = http_manager.start(); + if (ret < 0) { + return; + } + ret = crs.run(new RGWReadRESTResourceCR(sync_env->cct, + conf->conn.get(), + &http_manager, + "/", nullptr, + &(conf->default_headers), + &(conf->es_info))); + http_manager.stop(); + if (ret < 0) { + ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl; + } else { + ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl; + } } RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { -- 2.39.5