]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: refine es related coroutines 32269/head
authorChang Liu <liuchang0812@gmail.com>
Mon, 16 Dec 2019 11:38:52 +0000 (11:38 +0000)
committerChang Liu <liuchang0812@gmail.com>
Mon, 16 Dec 2019 11:38:52 +0000 (11:38 +0000)
Signed-off-by: Chang Liu <liuchang0812@gmail.com>
src/rgw/rgw_sync_module_es.cc

index 4468302cf7c5293436df01ee037f2f129a9cc7cd..83fc0e8cef36c020317d14cd2bb56329ce98f906 100644 (file)
@@ -641,63 +641,57 @@ struct es_obj_metadata {
   }
 };
 
-class RGWElasticInitConfigCBCR : public RGWCoroutine {
-  RGWDataSyncEnv *sync_env;
-  ElasticConfigRef conf;
-  ESInfo es_info;
-
-  struct _err_response {
-    struct err_reason {
-      vector<err_reason> root_cause;
-      string type;
-      string reason;
-      string index;
-
-      void decode_json(JSONObj *obj) {
-        JSONDecoder::decode_json("root_cause", root_cause, obj);
-        JSONDecoder::decode_json("type", type, obj);
-        JSONDecoder::decode_json("reason", reason, obj);
-        JSONDecoder::decode_json("index", index, obj);
-      }
-    } error;
-
-    void decode_json(JSONObj *obj) {
-      JSONDecoder::decode_json("error", error, obj);
-    }
-  } err_response;
-
+class RGWElasticGetESInfoCBCR : public RGWCoroutine {
 public:
-  RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+  RGWElasticGetESInfoCBCR(RGWDataSyncEnv *_sync_env, 
                           ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
                                                     sync_env(_sync_env),
                                                     conf(_conf) {}
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
+      ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sync_env->source_zone << dendl;
       yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
                                                     conf->conn.get(),
                                                     sync_env->http_manager,
                                                     "/", nullptr /*params*/,
                                                     &(conf->default_headers),
-                                                    &es_info));
+                                                    &(conf->es_info)));
       if (retcode < 0) {
+        ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
         return set_cr_error(retcode);
       }
 
+      ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+      return set_cr_done();
+    }
+    return 0;
+  }
+private:
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+};
+
+class RGWElasticPutIndexCBCR : public RGWCoroutine {
+public:
+  RGWElasticPutIndexCBCR(RGWDataSyncEnv *_sync_env,
+                         ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sync_env->source_zone << dendl;
+
       yield {
         string path = conf->get_index_path();
-        ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
-
         es_index_settings settings(conf->num_replicas, conf->num_shards);
-
         std::unique_ptr<es_index_config_base> index_conf;
 
-        if (es_info.version >= ES_V5) {
+        if (conf->es_info.version >= ES_V5) {
           ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
+          index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
         } else {
           ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
+          index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
         }
         call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
                                                              conf->conn.get(),
@@ -707,10 +701,10 @@ public:
                                                              *index_conf, nullptr, &err_response));
       }
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
 
         if (err_response.error.type != "index_already_exists_exception" &&
-           err_response.error.type != "resource_already_exists_exception") {
+                 err_response.error.type != "resource_already_exists_exception") {
+          ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
           return set_cr_error(retcode);
         }
 
@@ -721,6 +715,58 @@ public:
     return 0;
   }
 
+private:
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+
+    struct _err_response {
+    struct err_reason {
+      vector<err_reason> root_cause;
+      string type;
+      string reason;
+      string index;
+
+      void decode_json(JSONObj *obj) {
+        JSONDecoder::decode_json("root_cause", root_cause, obj);
+        JSONDecoder::decode_json("type", type, obj);
+        JSONDecoder::decode_json("reason", reason, obj);
+        JSONDecoder::decode_json("index", index, obj);
+      }
+    } error;
+
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("error", error, obj);
+    }
+  } err_response;
+};
+
+class RGWElasticInitConfigCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+
+public:
+  RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+                          ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+                                                    sync_env(_sync_env),
+                                                    conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+
+      yield call(new RGWElasticGetESInfoCBCR(sync_env, conf));
+
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      yield call(new RGWElasticPutIndexCBCR(sync_env, conf));
+      if (retcode < 0) {
+          return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+
 };
 
 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
@@ -827,12 +873,7 @@ public:
   RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
     ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl;
     // try to get elastic search version
-    return new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
-                                            conf->conn.get(),
-                                            sync_env->http_manager,
-                                            "/", nullptr,
-                                            &(conf->default_headers),
-                                            &(conf->es_info));
+    return new RGWElasticGetESInfoCBCR(sync_env, conf);
   }
 
   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {