using ESVersion = std::pair<int,int>;
static constexpr ESVersion ES_V5{5,0};
+static constexpr ESVersion ES_V7{7,0};
struct ESInfo {
std::string name;
uint32_t num_shards{0};
uint32_t num_replicas{0};
std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
+ ESInfo es_info;
void init(CephContext *cct, const JSONFormattable& config) {
string elastic_endpoint = config["endpoint"];
}
string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
- return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+ if (es_info.version >= ES_V7) {
+ return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+;
+ } else {
+ return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+ }
}
bool should_handle_operation(RGWBucketInfo& bucket_info) {
template <class T>
struct es_index_mappings {
+ ESVersion es_version;
ESType string_type {ESType::String};
+ es_index_mappings(ESVersion esv):es_version(esv) {
+ }
+
es_type<T> est(ESType t) const {
return es_type<T>(t);
}
}
void dump(Formatter *f) const {
- f->open_object_section("object");
+ if (es_version <= ES_V7)
+ f->open_object_section("object");
f->open_object_section("properties");
encode_json("bucket", est(string_type), f);
encode_json("name", est(string_type), f);
f->close_section(); // properties
f->close_section(); // meta
f->close_section(); // properties
+
+ if (es_version <= ES_V7)
f->close_section(); // object
}
};
es_index_settings settings;
es_index_mappings<T> mappings;
- es_index_config(es_index_settings& _s) : settings(_s) {}
+ es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
+ }
void dump(Formatter *f) const {
encode_json("settings", settings, f);
if (es_info.version >= ES_V5) {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v5>(settings));
+ index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
} else {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v2>(settings));
+ index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
}
call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
conf->conn.get(),
if (retcode < 0) {
ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
- if (err_response.error.type != "index_already_exists_exception") {
+ if (err_response.error.type != "index_already_exists_exception" &&
+ err_response.error.type != "resource_already_exists_exception") {
return set_cr_error(retcode);
}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id);
+ // try to get elastic search version
+ RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->get_cr_registry());
+ RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr());
+ int ret = http_manager.start();
+ if (ret < 0) {
+ return;
+ }
+ ret = crs.run(new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
+ conf->conn.get(),
+ &http_manager,
+ "/", nullptr,
+ &(conf->default_headers),
+ &(conf->es_info)));
+ http_manager.stop();
+ if (ret < 0) {
+ ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl;
+ } else {
+ ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+ }
}
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {