From: Yehuda Sadeh Date: Tue, 4 Apr 2017 18:34:33 +0000 (-0700) Subject: rgw: es: elasticsearch index path unique per sync instance X-Git-Tag: ses5-milestone6~9^2~3^2~34 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f5b158fcbd3683a25accd6baebb6c78226f64ff2;p=ceph.git rgw: es: elasticsearch index path unique per sync instance Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index db4da0847d1f..560213ac6653 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -22,6 +22,8 @@ #include "cls/lock/cls_lock_client.h" +#include "auth/Crypto.h" + #define dout_subsys ceph_subsys_rgw #undef dout_prefix @@ -476,6 +478,8 @@ public: num_shards(num_shards), status(status) { lock_name = "sync_lock"; + get_random_bytes((char *)&status.instance_id, sizeof(status.instance_id)); + #define COOKIE_LEN 16 char buf[COOKIE_LEN + 1]; @@ -1456,6 +1460,8 @@ public: /* read sync status */ yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status)); + data_sync_module = sync_env->sync_module->get_data_handler(); + if (retcode == -ENOENT) { sync_status.sync_info.num_shards = num_shards; } else if (retcode < 0 && retcode != -ENOENT) { @@ -1476,9 +1482,10 @@ public: *reset_backoff = true; } + data_sync_module->init(sync_env, sync_status.sync_info.instance_id); + if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { /* call sync module init here */ - data_sync_module = sync_env->sync_module->get_data_handler(); call(data_sync_module->init_sync(sync_env)); /* state: building full sync maps */ ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 7a68ce5fbb21..00e094474557 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -29,17 +29,23 @@ struct rgw_data_sync_info { uint16_t state; uint32_t num_shards; + uint64_t 
instance_id{0}; + void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); ::encode(state, bl); ::encode(num_shards, bl); + ::encode(instance_id, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); ::decode(state, bl); ::decode(num_shards, bl); + if (struct_v >= 2) { + ::decode(instance_id, bl); + } DECODE_FINISH(bl); } @@ -61,6 +67,7 @@ struct rgw_data_sync_info { } encode_json("status", s, f); encode_json("num_shards", num_shards, f); + encode_json("instance_id", instance_id, f); } void decode_json(JSONObj *obj) { std::string s; @@ -73,6 +80,7 @@ struct rgw_data_sync_info { state = StateInit; } JSONDecoder::decode_json("num_shards", num_shards, obj); + JSONDecoder::decode_json("instance_id", instance_id, obj); } rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 898eda5fd585..278d1df38931 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -16,6 +16,8 @@ public: RGWDataSyncModule() {} virtual ~RGWDataSyncModule() {} + virtual void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {} + virtual RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) { return nullptr; } diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 8f47f954e604..e81c61ba5b96 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -98,12 +98,31 @@ public: }; struct ElasticConfig { + uint64_t sync_instance{0}; string id; + string index_path; RGWRESTConn *conn{nullptr}; bool explicit_custom_meta{true}; ItemList index_buckets; ItemList allow_owners; + void init_instance(RGWRealm& realm, uint64_t instance_id) { + sync_instance = instance_id; + + char buf[32]; + snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); + + index_path = "/rgw-" + realm.get_name() + buf; + } + + string get_index_path() { + return index_path; + } + + 
string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { + return index_path + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance; + } + bool should_handle_operation(RGWBucketInfo& bucket_info) { return index_buckets.exists(bucket_info.bucket.name) && allow_owners.exists(bucket_info.owner.to_str()); @@ -112,18 +131,6 @@ struct ElasticConfig { using ElasticConfigRef = std::shared_ptr<ElasticConfig>; -static string es_get_index_path(const RGWRealm& realm) -{ - string path = "/rgw-" + realm.get_name(); - return path; -} - -static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key) -{ - string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance; - return path; -} - struct es_dump_type { const char *type; const char *format; @@ -336,7 +343,7 @@ public: reenter(this) { ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; yield { - string path = es_get_index_path(sync_env->store->get_realm()); + string path = conf->get_index_path(); es_index_mappings doc; @@ -367,7 +374,7 @@ public: << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; yield { - string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); + string path = conf->get_obj_path(bucket_info, key); es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs); call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, @@ -418,7 +425,7 @@ public: ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { - string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); + string path = conf->get_obj_path(bucket_info, key); call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn, 
sync_env->http_manager, @@ -450,6 +457,10 @@ public: delete conf->conn; } + void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { + conf->init_instance(sync_env->store->get_realm(), instance_id); + } + RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; return new RGWElasticInitConfigCBCR(sync_env, conf); @@ -481,6 +492,10 @@ public: RGWRESTConn *get_rest_conn() { return conf->conn; } + + string get_index_path() { + return conf->get_index_path(); + } }; RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map& config) @@ -498,8 +513,8 @@ RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() return data_handler->get_rest_conn(); } -string RGWElasticSyncModuleInstance::get_index_path(const RGWRealm& realm) { - return es_get_index_path(realm); +string RGWElasticSyncModuleInstance::get_index_path() { + return data_handler->get_index_path(); } RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index c9c2d5240a39..43e591e42fc3 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -3,8 +3,6 @@ #include "rgw_sync_module.h" -class RGWRealm; - class RGWElasticSyncModule : public RGWSyncModule { public: RGWElasticSyncModule() {} @@ -24,7 +22,7 @@ public: RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; RGWRESTConn *get_rest_conn(); - std::string get_index_path(const RGWRealm& realm); + std::string get_index_path(); }; #endif diff --git a/src/rgw/rgw_sync_module_es_rest.cc b/src/rgw/rgw_sync_module_es_rest.cc index 2bc41530913b..82edd61d4eb0 100644 --- a/src/rgw/rgw_sync_module_es_rest.cc +++ b/src/rgw/rgw_sync_module_es_rest.cc @@ -209,7 +209,7 @@ void RGWMetadataSearchOp::execute() f.flush(ss); in.append(ss.str()); - string resource = 
es_module->get_index_path(store->get_realm()) + "/_search"; + string resource = es_module->get_index_path() + "/_search"; param_vec_t params; #define BUFSIZE 32 char buf[BUFSIZE];