From 722e2d0861c07a8ac6aed7f0a650f6d9e3a77d54 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 16 Dec 2019 11:38:52 +0000 Subject: [PATCH] rgw: refine es related coroutines Signed-off-by: Chang Liu --- src/rgw/rgw_sync_module_es.cc | 125 ++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 42 deletions(-) diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 4468302cf7c52..83fc0e8cef36c 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -641,63 +641,57 @@ struct es_obj_metadata { } }; -class RGWElasticInitConfigCBCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - ElasticConfigRef conf; - ESInfo es_info; - - struct _err_response { - struct err_reason { - vector root_cause; - string type; - string reason; - string index; - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("root_cause", root_cause, obj); - JSONDecoder::decode_json("type", type, obj); - JSONDecoder::decode_json("reason", reason, obj); - JSONDecoder::decode_json("index", index, obj); - } - } error; - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("error", error, obj); - } - } err_response; - +class RGWElasticGetESInfoCBCR : public RGWCoroutine { public: - RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, + RGWElasticGetESInfoCBCR(RGWDataSyncEnv *_sync_env, ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), conf(_conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; + ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sync_env->source_zone << dendl; yield call(new RGWReadRESTResourceCR (sync_env->cct, conf->conn.get(), sync_env->http_manager, "/", nullptr /*params*/, &(conf->default_headers), - &es_info)); + &(conf->es_info))); if (retcode < 0) { + ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl; return set_cr_error(retcode); } + ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl; + return set_cr_done(); + } + return 0; + } +private: + RGWDataSyncEnv *sync_env; + ElasticConfigRef conf; +}; + +class RGWElasticPutIndexCBCR : public RGWCoroutine { +public: + RGWElasticPutIndexCBCR(RGWDataSyncEnv *_sync_env, + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) {} + int operate() override { + reenter(this) { + ldout(sync_env->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sync_env->source_zone << dendl; + yield { string path = conf->get_index_path(); - ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl; - es_index_settings settings(conf->num_replicas, conf->num_shards); - std::unique_ptr index_conf; - if (es_info.version >= ES_V5) { + if (conf->es_info.version >= ES_V5) { ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl; - index_conf.reset(new es_index_config(settings, es_info.version)); + index_conf.reset(new es_index_config(settings, conf->es_info.version)); } else { ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl; - index_conf.reset(new es_index_config(settings, es_info.version)); + index_conf.reset(new es_index_config(settings, conf->es_info.version)); } call(new RGWPutRESTResourceCR (sync_env->cct, conf->conn.get(), @@ -707,10 +701,10 @@ public: *index_conf, nullptr, &err_response)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; if (err_response.error.type != "index_already_exists_exception" && - err_response.error.type != "resource_already_exists_exception") { + err_response.error.type != "resource_already_exists_exception") { + ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl; return set_cr_error(retcode); } @@ -721,6 +715,58 @@ public: return 0; } +private: + RGWDataSyncEnv *sync_env; + ElasticConfigRef conf; + + struct _err_response { + struct err_reason { + vector root_cause; + string type; + string reason; + string index; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("root_cause", root_cause, obj); + JSONDecoder::decode_json("type", type, obj); + JSONDecoder::decode_json("reason", reason, obj); + JSONDecoder::decode_json("index", index, obj); + } + } error; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("error", error, obj); + } + } err_response; +}; + +class RGWElasticInitConfigCBCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + ElasticConfigRef conf; + +public: + RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) {} + int operate() override { + reenter(this) { + + yield call(new RGWElasticGetESInfoCBCR(sync_env, conf)); + + if (retcode < 0) { + return set_cr_error(retcode); + } + + yield call(new RGWElasticPutIndexCBCR(sync_env, conf)); + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } + }; class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { @@ -827,12 +873,7 @@ public: RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override { ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl; // try to get elastic search version - return new RGWReadRESTResourceCR(sync_env->cct, - conf->conn.get(), - sync_env->http_manager, - "/", nullptr, - &(conf->default_headers), - &(conf->es_info)); + return new RGWElasticGetESInfoCBCR(sync_env, conf); } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { -- 2.39.5