From: Yehuda Sadeh Date: Wed, 5 Apr 2017 00:30:26 +0000 (-0700) Subject: rgw: es: configurable number of replicas and shards X-Git-Tag: ses5-milestone6~9^2~3^2~32 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8bfe1379a187af57697ae70f7f8338691834b235;p=ceph.git rgw: es: configurable number of replicas and shards Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 0ab7ad40293b..6ceeab64d9de 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -97,15 +97,37 @@ public: } }; +#define ES_NUM_SHARDS_MIN 5 + +#define ES_NUM_SHARDS_DEFAULT 16 +#define ES_NUM_REPLICAS_DEFAULT 1 + struct ElasticConfig { uint64_t sync_instance{0}; string id; string index_path; - RGWRESTConn *conn{nullptr}; + std::unique_ptr conn; bool explicit_custom_meta{true}; string override_index_path; ItemList index_buckets; ItemList allow_owners; + uint32_t num_shards{0}; + uint32_t num_replicas{0}; + + void init(CephContext *cct, const map& config) { + string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); + id = string("elastic:") + elastic_endpoint; + conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); + explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); + index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */ + allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */ + override_index_path = rgw_conf_get(config, "override_index_path", ""); + num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT); + if (num_shards < ES_NUM_SHARDS_MIN) { + num_shards = ES_NUM_SHARDS_MIN; + } + num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT); + } void init_instance(RGWRealm& realm, uint64_t instance_id) { sync_instance = instance_id; @@ -165,7 +187,6 @@ struct es_index_mappings { f->close_section(); // custom-string } void dump(Formatter *f) const { - f->open_object_section("mappings"); f->open_object_section("object"); f->open_object_section("properties"); encode_json("bucket", es_dump_type("string"), f); @@ -192,7 +213,30 @@ struct es_index_mappings { f->close_section(); // meta f->close_section(); // properties f->close_section(); // object - f->close_section(); // mappings + } +}; + +struct es_index_settings { + uint32_t num_replicas; + uint32_t num_shards; + + es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} + + void dump(Formatter *f) const { + encode_json("number_of_replicas", num_replicas, f); + encode_json("number_of_shards", num_shards, f); + } +}; + +struct es_index_config { + es_index_settings settings; + es_index_mappings mappings; + + es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {} + + void dump(Formatter *f) const { + encode_json("settings", settings, f); + encode_json("mappings", mappings, f); } }; @@ -351,12 +395,15 @@ public: yield { string path = conf->get_index_path(); - es_index_mappings doc; + es_index_settings settings(conf->num_replicas, conf->num_shards); + es_index_mappings mappings; + + es_index_config index_conf(settings, mappings); - call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /* params */, - doc, nullptr /* result */)); + index_conf, nullptr /* result */)); } if (retcode < 0) { return set_cr_error(retcode); @@ -383,7 +430,7 @@ public: string path = conf->get_obj_path(bucket_info, key); es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs); - call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -433,7 +480,7 @@ public: yield { string path = conf->get_obj_path(bucket_info, key); - call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn, + call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /* params */)); } @@ -451,18 +498,9 @@ class RGWElasticDataSyncModule : public RGWDataSyncModule { ElasticConfigRef conf; public: RGWElasticDataSyncModule(CephContext *cct, const map& config) : conf(std::make_shared()) { - ElasticConfigRef default_module(std::make_shared()); - string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); - conf->id = string("elastic:") + elastic_endpoint; - conf->conn = new RGWRESTConn(cct, nullptr, conf->id, { elastic_endpoint }); - conf->explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); - conf->index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */ - conf->allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */ - conf->override_index_path = rgw_conf_get(config, "override_index_path", ""); - } - ~RGWElasticDataSyncModule() override { - delete conf->conn; + conf->init(cct, config); } + ~RGWElasticDataSyncModule() override {} void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { conf->init_instance(sync_env->store->get_realm(), instance_id); @@ -497,7 +535,7 @@ public: return NULL; } RGWRESTConn *get_rest_conn() { - return conf->conn; + return conf->conn.get(); } string get_index_path() {