From: Yehuda Sadeh Date: Mon, 3 Apr 2017 22:18:39 +0000 (-0700) Subject: rgw: add "explicit_custom_meta" configurable to es sync module X-Git-Tag: ses5-milestone6~9^2~3^2~37 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=85310f0e8f3c2b830de071a4fdd334b54f68056d;p=ceph.git rgw: add "explicit_custom_meta" configurable to es sync module Modify all the map that is used to pass in module config to map so that it can be used with the conf get vals util. Also, switch to using shared_ptr to hold the ElasticConfig, so that later when we hold maps and such it doesn't need to be copied on every call. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index fe88d8247490..ec8b14fd4281 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2182,7 +2182,7 @@ static void sync_status(Formatter *formatter) tab_dump("data sync", width, data_status); } -static void parse_tier_config_param(const string& s, map& out) +static void parse_tier_config_param(const string& s, map& out) { list confs; get_str_list(s, ",", confs); @@ -2514,8 +2514,8 @@ int main(int argc, const char **argv) string tier_type; bool tier_type_specified = false; - map tier_config_add; - map tier_config_rm; + map tier_config_add; + map tier_config_rm; boost::optional index_pool; boost::optional data_pool; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e305519b432a..db4da0847d1f 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1556,7 +1556,7 @@ public: } }; -int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) +int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWDefaultSyncModuleInstance()); return 0; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 75ae358198de..7a68ce5fbb21 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -520,7 +520,7 @@ class RGWDefaultSyncModule : public RGWSyncModule { public: RGWDefaultSyncModule() {} bool supports_data_export() override { return true; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; // DataLogTrimCR factory function diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6e0cca14c16f..cf426ff7ca7c 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1154,7 +1154,7 @@ struct RGWZoneParams : RGWSystemMetaObj { string realm_id; - map tier_config; + map tier_config; RGWZoneParams() : RGWSystemMetaObj() {} RGWZoneParams(const string& name) : RGWSystemMetaObj(name){} diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 55b39b775e11..898eda5fd585 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -48,7 +48,7 @@ public: virtual ~RGWSyncModule() {} virtual bool supports_data_export() = 0; - virtual int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) = 0; + virtual int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) = 0; }; typedef std::shared_ptr RGWSyncModuleRef; @@ -89,7 +89,7 @@ public: return module.get()->supports_data_export(); } - int create_instance(CephContext *cct, const string& name, map& config, RGWSyncModuleInstanceRef *instance) { + int create_instance(CephContext *cct, const string& name, map& config, RGWSyncModuleInstanceRef *instance) { RGWSyncModuleRef module; if (!get_module(name, &module)) { return -ENOENT; diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 4927e2889fd2..20c2cee25370 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -15,8 +15,11 @@ struct ElasticConfig { string id; RGWRESTConn *conn{nullptr}; + bool explicit_custom_meta{true}; }; +using ElasticConfigRef = std::shared_ptr; + static string es_get_index_path(const RGWRealm& realm) { string path = "/rgw-" + realm.get_name(); @@ -90,15 +93,16 @@ struct es_index_mappings { struct es_obj_metadata { CephContext *cct; + ElasticConfigRef es_conf; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; uint64_t size; map attrs; - es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info, + es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, - map& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key), + map& _attrs) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), mtime(_mtime), size(_size), attrs(std::move(_attrs)) {} void dump(Formatter *f) const { @@ -174,7 +178,12 @@ struct es_obj_metadata { for (auto i : custom_meta) { auto config = bucket_info.mdsearch_config.find(i.first); if (config == bucket_info.mdsearch_config.end()) { - ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + if (!es_conf->explicit_custom_meta) { + /* default custom meta is of type string */ + custom_str[i.first] = i.second; + } else { + ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + } continue; } switch (config->second) { @@ -225,12 +234,12 @@ struct es_obj_metadata { class RGWElasticInitConfigCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, - const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - conf(_conf) {} + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; @@ -239,7 +248,7 @@ public: es_index_mappings doc; - call(new RGWPutRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -255,11 +264,11 @@ public: }; class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {} + ElasticConfigRef _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone @@ -267,9 +276,9 @@ public: << " attrs=" << attrs << dendl; yield { string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); - es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs); + es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs); - call(new RGWPutRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -286,11 +295,11 @@ public: }; class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + ElasticConfigRef _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), conf(_conf) { } @@ -306,11 +315,11 @@ class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_info(_bucket_info), key(_key), mtime(_mtime), conf(_conf) {} int operate() override { @@ -320,7 +329,7 @@ public: yield { string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); - call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */)); } @@ -335,43 +344,46 @@ public: }; class RGWElasticDataSyncModule : public RGWDataSyncModule { - ElasticConfig conf; + ElasticConfigRef conf; public: - RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) { - conf.id = string("elastic:") + elastic_endpoint; - conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint }); + RGWElasticDataSyncModule(CephContext *cct, const map& config) : conf(std::make_shared()) { + ElasticConfigRef default_module(std::make_shared()); + string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); + conf->id = string("elastic:") + elastic_endpoint; + conf->conn = new RGWRESTConn(cct, nullptr, conf->id, { elastic_endpoint }); + conf->explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); } ~RGWElasticDataSyncModule() override { - delete conf.conn; + delete conf->conn; } RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 0) << conf.id << ": init" << dendl; + ldout(sync_env->cct, 0) << conf->id << ": init" << dendl; return new RGWElasticInitConfigCBCR(sync_env, conf); } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 0) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 0) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 0) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } RGWRESTConn *get_rest_conn() { - return conf.conn; + return conf->conn; } }; -RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) +RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map& config) { - data_handler = std::unique_ptr(new RGWElasticDataSyncModule(cct, endpoint)); + data_handler = std::unique_ptr(new RGWElasticDataSyncModule(cct, config)); } RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() @@ -395,13 +407,13 @@ RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMg return new RGWRESTMgr_MDSearch_S3(this); } -int RGWElasticSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { +int RGWElasticSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { string endpoint; auto i = config.find("endpoint"); if (i != config.end()) { endpoint = i->second; } - instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint)); + instance->reset(new RGWElasticSyncModuleInstance(cct, config)); return 0; } diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index 539302ef6f46..c9c2d5240a39 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -11,7 +11,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; class RGWElasticDataSyncModule; @@ -20,7 +20,7 @@ class RGWRESTConn; class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { std::unique_ptr data_handler; public: - RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint); + RGWElasticSyncModuleInstance(CephContext *cct, const std::map& config); RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; RGWRESTConn *get_rest_conn(); diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index f253d7074297..67212ec593ae 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -64,7 +64,7 @@ public: } }; -int RGWLogSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { +int RGWLogSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { string prefix; auto i = config.find("prefix"); if (i != config.end()) { diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h index 5b7bae6552de..9afc9108e678 100644 --- a/src/rgw/rgw_sync_module_log.h +++ b/src/rgw/rgw_sync_module_log.h @@ -9,7 +9,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; #endif