}
};
+#define ES_NUM_SHARDS_MIN 5
+
+#define ES_NUM_SHARDS_DEFAULT 16
+#define ES_NUM_REPLICAS_DEFAULT 1
+
struct ElasticConfig {
uint64_t sync_instance{0};
string id;
string index_path;
- RGWRESTConn *conn{nullptr};
+ std::unique_ptr<RGWRESTConn> conn;
bool explicit_custom_meta{true};
string override_index_path;
ItemList index_buckets;
ItemList allow_owners;
+ uint32_t num_shards{0};
+ uint32_t num_replicas{0};
+
+ void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) {
+ string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
+ id = string("elastic:") + elastic_endpoint;
+ conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
+ explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
+ index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
+ allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
+ override_index_path = rgw_conf_get(config, "override_index_path", "");
+ num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT);
+ if (num_shards < ES_NUM_SHARDS_MIN) {
+ num_shards = ES_NUM_SHARDS_MIN;
+ }
+ num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT);
+ }
void init_instance(RGWRealm& realm, uint64_t instance_id) {
sync_instance = instance_id;
f->close_section(); // custom-string
}
void dump(Formatter *f) const {
- f->open_object_section("mappings");
f->open_object_section("object");
f->open_object_section("properties");
encode_json("bucket", es_dump_type("string"), f);
f->close_section(); // meta
f->close_section(); // properties
f->close_section(); // object
- f->close_section(); // mappings
+ }
+};
+
+struct es_index_settings {
+ uint32_t num_replicas;
+ uint32_t num_shards;
+
+ es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
+
+ void dump(Formatter *f) const {
+ encode_json("number_of_replicas", num_replicas, f);
+ encode_json("number_of_shards", num_shards, f);
+ }
+};
+
+struct es_index_config {
+ es_index_settings settings;
+ es_index_mappings mappings;
+
+ es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {}
+
+ void dump(Formatter *f) const {
+ encode_json("settings", settings, f);
+ encode_json("mappings", mappings, f);
}
};
yield {
string path = conf->get_index_path();
- es_index_mappings doc;
+ es_index_settings settings(conf->num_replicas, conf->num_shards);
+ es_index_mappings mappings;
+
+ es_index_config index_conf(settings, mappings);
- call(new RGWPutRESTResourceCR<es_index_mappings, int>(sync_env->cct, conf->conn,
+ call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */,
- doc, nullptr /* result */));
+ index_conf, nullptr /* result */));
}
if (retcode < 0) {
return set_cr_error(retcode);
string path = conf->get_obj_path(bucket_info, key);
es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs);
- call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn,
+ call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */,
doc, nullptr /* result */));
yield {
string path = conf->get_obj_path(bucket_info, key);
- call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn,
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */));
}
ElasticConfigRef conf;
public:
RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
- ElasticConfigRef default_module(std::make_shared<ElasticConfig>());
- string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
- conf->id = string("elastic:") + elastic_endpoint;
- conf->conn = new RGWRESTConn(cct, nullptr, conf->id, { elastic_endpoint });
- conf->explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
- conf->index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
- conf->allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
- conf->override_index_path = rgw_conf_get(config, "override_index_path", "");
- }
- ~RGWElasticDataSyncModule() override {
- delete conf->conn;
+ conf->init(cct, config);
}
+ ~RGWElasticDataSyncModule() override {}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
conf->init_instance(sync_env->store->get_realm(), instance_id);
return NULL;
}
RGWRESTConn *get_rest_conn() {
- return conf->conn;
+ return conf->conn.get();
}
string get_index_path() {