}
};
-class RGWElasticInitConfigCBCR : public RGWCoroutine {
- RGWDataSyncEnv *sync_env;
- ElasticConfigRef conf;
- ESInfo es_info;
-
- struct _err_response {
- struct err_reason {
- vector<err_reason> root_cause;
- string type;
- string reason;
- string index;
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("root_cause", root_cause, obj);
- JSONDecoder::decode_json("type", type, obj);
- JSONDecoder::decode_json("reason", reason, obj);
- JSONDecoder::decode_json("index", index, obj);
- }
- } error;
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("error", error, obj);
- }
- } err_response;
-
+class RGWElasticGetESInfoCBCR : public RGWCoroutine {
public:
- RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+ RGWElasticGetESInfoCBCR(RGWDataSyncEnv *_sync_env,
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
+ ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sync_env->source_zone << dendl;
yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
conf->conn.get(),
sync_env->http_manager,
"/", nullptr /*params*/,
&(conf->default_headers),
- &es_info));
+ &(conf->es_info)));
if (retcode < 0) {
+ ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
return set_cr_error(retcode);
}
+ ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+private:
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+};
+
+class RGWElasticPutIndexCBCR : public RGWCoroutine {
+public:
+ RGWElasticPutIndexCBCR(RGWDataSyncEnv *_sync_env,
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {}
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sync_env->source_zone << dendl;
+
yield {
string path = conf->get_index_path();
- ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
-
es_index_settings settings(conf->num_replicas, conf->num_shards);
-
std::unique_ptr<es_index_config_base> index_conf;
- if (es_info.version >= ES_V5) {
+ if (conf->es_info.version >= ES_V5) {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
+ index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
} else {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
+ index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
}
call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
conf->conn.get(),
*index_conf, nullptr, &err_response));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
if (err_response.error.type != "index_already_exists_exception" &&
- err_response.error.type != "resource_already_exists_exception") {
+ err_response.error.type != "resource_already_exists_exception") {
+ ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
return set_cr_error(retcode);
}
return 0;
}
+private:
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+
+ struct _err_response {
+ struct err_reason {
+ vector<err_reason> root_cause;
+ string type;
+ string reason;
+ string index;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("root_cause", root_cause, obj);
+ JSONDecoder::decode_json("type", type, obj);
+ JSONDecoder::decode_json("reason", reason, obj);
+ JSONDecoder::decode_json("index", index, obj);
+ }
+ } error;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("error", error, obj);
+ }
+ } err_response;
+};
+
+class RGWElasticInitConfigCBCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+
+public:
+ RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {}
+ int operate() override {
+ reenter(this) {
+
+ yield call(new RGWElasticGetESInfoCBCR(sync_env, conf));
+
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ yield call(new RGWElasticPutIndexCBCR(sync_env, conf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+
};
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl;
// try to get elastic search version
- return new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
- conf->conn.get(),
- sync_env->http_manager,
- "/", nullptr,
- &(conf->default_headers),
- &(conf->es_info));
+ return new RGWElasticGetESInfoCBCR(sync_env, conf);
}
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {