]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: es: elasticsearch index path unique per sync instance
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 4 Apr 2017 18:34:33 +0000 (11:34 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 30 May 2017 20:26:52 +0000 (13:26 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_es.cc
src/rgw/rgw_sync_module_es.h
src/rgw/rgw_sync_module_es_rest.cc

index db4da0847d1f0c1667f0ccf254b53b882719130a..560213ac6653f6e6a99613d07af4486f7cacf1d7 100644 (file)
@@ -22,6 +22,8 @@
 
 #include "cls/lock/cls_lock_client.h"
 
+#include "auth/Crypto.h"
+
 #define dout_subsys ceph_subsys_rgw
 
 #undef dout_prefix
@@ -476,6 +478,8 @@ public:
       num_shards(num_shards), status(status) {
     lock_name = "sync_lock";
 
+    get_random_bytes((char *)&status.instance_id, sizeof(status.instance_id));
+
 #define COOKIE_LEN 16
     char buf[COOKIE_LEN + 1];
 
@@ -1456,6 +1460,8 @@ public:
       /* read sync status */
       yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
 
+      data_sync_module = sync_env->sync_module->get_data_handler();
+
       if (retcode == -ENOENT) {
         sync_status.sync_info.num_shards = num_shards;
       } else if (retcode < 0 && retcode != -ENOENT) {
@@ -1476,9 +1482,10 @@ public:
         *reset_backoff = true;
       }
 
+      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
+
       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
         /* call sync module init here */
-        data_sync_module = sync_env->sync_module->get_data_handler();
         call(data_sync_module->init_sync(sync_env));
         /* state: building full sync maps */
         ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
index 7a68ce5fbb211db667af81ad02347784508f6c2e..00e094474557d446512aaee86bee44521487af36 100644 (file)
@@ -29,17 +29,23 @@ struct rgw_data_sync_info {
   uint16_t state;
   uint32_t num_shards;
 
+  uint64_t instance_id{0};
+
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     ::encode(state, bl);
     ::encode(num_shards, bl);
+    ::encode(instance_id, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
-     DECODE_START(1, bl);
+     DECODE_START(2, bl);
      ::decode(state, bl);
      ::decode(num_shards, bl);
+     if (struct_v >= 2) {
+       ::decode(instance_id, bl);
+     }
      DECODE_FINISH(bl);
   }
 
@@ -61,6 +67,7 @@ struct rgw_data_sync_info {
     }
     encode_json("status", s, f);
     encode_json("num_shards", num_shards, f);
+    encode_json("instance_id", instance_id, f);
   }
   void decode_json(JSONObj *obj) {
     std::string s;
@@ -73,6 +80,7 @@ struct rgw_data_sync_info {
       state = StateInit;
     }
     JSONDecoder::decode_json("num_shards", num_shards, obj);
+    JSONDecoder::decode_json("instance_id", num_shards, obj);
   }
 
   rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
index 898eda5fd58529f97060d92991585feeaca1ccc4..278d1df389315ee5bb4ab03ac2353a82486dd415 100644 (file)
@@ -16,6 +16,8 @@ public:
   RGWDataSyncModule() {}
   virtual ~RGWDataSyncModule() {}
 
+  virtual void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {}
+
   virtual RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) {
     return nullptr;
   }
index 8f47f954e60414d24b7af69c6cbca6cdd1d9df63..e81c61ba5b964fd59ca9c19e63b961806929a0fa 100644 (file)
@@ -98,12 +98,31 @@ public:
 };
 
 struct ElasticConfig {
+  uint64_t sync_instance{0};
   string id;
+  string index_path;
   RGWRESTConn *conn{nullptr};
   bool explicit_custom_meta{true};
   ItemList index_buckets;
   ItemList allow_owners;
 
+  void init_instance(RGWRealm& realm, uint64_t instance_id) {
+    sync_instance = instance_id;
+
+    char buf[32];
+    snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
+
+    index_path = "/rgw-" + realm.get_name() + buf;
+  }
+
+  string get_index_path() {
+    return index_path;
+  }
+
+  string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
+    return index_path +  "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
+  }
+
   bool should_handle_operation(RGWBucketInfo& bucket_info) {
     return index_buckets.exists(bucket_info.bucket.name) &&
            allow_owners.exists(bucket_info.owner.to_str());
@@ -112,18 +131,6 @@ struct ElasticConfig {
 
 using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
 
-static string es_get_index_path(const RGWRealm& realm)
-{
-  string path = "/rgw-" + realm.get_name();
-  return path;
-}
-
-static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key)
-{
-  string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
-  return path;
-}
-
 struct es_dump_type {
   const char *type;
   const char *format;
@@ -336,7 +343,7 @@ public:
     reenter(this) {
       ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
       yield {
-        string path = es_get_index_path(sync_env->store->get_realm());
+        string path = conf->get_index_path();
 
         es_index_mappings doc;
 
@@ -367,7 +374,7 @@ public:
                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                               << " attrs=" << attrs << dendl;
       yield {
-        string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
+        string path = conf->get_obj_path(bucket_info, key);
         es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs);
 
         call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn,
@@ -418,7 +425,7 @@ public:
       ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
       yield {
-        string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
+        string path = conf->get_obj_path(bucket_info, key);
 
         call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn,
                                          sync_env->http_manager,
@@ -450,6 +457,10 @@ public:
     delete conf->conn;
   }
 
+  void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+    conf->init_instance(sync_env->store->get_realm(), instance_id);
+  }
+
   RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
     ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
     return new RGWElasticInitConfigCBCR(sync_env, conf);
@@ -481,6 +492,10 @@ public:
   RGWRESTConn *get_rest_conn() {
     return conf->conn;
   }
+
+  string get_index_path() {
+    return conf->get_index_path();
+  }
 };
 
 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
@@ -498,8 +513,8 @@ RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
   return data_handler->get_rest_conn();
 }
 
-string RGWElasticSyncModuleInstance::get_index_path(const RGWRealm& realm) {
-  return es_get_index_path(realm);
+string RGWElasticSyncModuleInstance::get_index_path() {
+  return data_handler->get_index_path();
 }
 
 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
index c9c2d5240a39337a8719c390d743383688f9bac8..43e591e42fc361b32fd1a9a09223caf0a3ce39f2 100644 (file)
@@ -3,8 +3,6 @@
 
 #include "rgw_sync_module.h"
 
-class RGWRealm;
-
 class RGWElasticSyncModule : public RGWSyncModule {
 public:
   RGWElasticSyncModule() {}
@@ -24,7 +22,7 @@ public:
   RGWDataSyncModule *get_data_handler() override;
   RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
   RGWRESTConn *get_rest_conn();
-  std::string get_index_path(const RGWRealm& realm);
+  std::string get_index_path();
 };
 
 #endif
index 2bc41530913b0a524b5de3436e3cd51da096f12c..82edd61d4eb0fa2343133f5c250399dea085e1c5 100644 (file)
@@ -209,7 +209,7 @@ void RGWMetadataSearchOp::execute()
   f.flush(ss);
   in.append(ss.str());
 
-  string resource = es_module->get_index_path(store->get_realm()) + "/_search";
+  string resource = es_module->get_index_path() + "/_search";
   param_vec_t params;
 #define BUFSIZE 32
   char buf[BUFSIZE];