From 85310f0e8f3c2b830de071a4fdd334b54f68056d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 3 Apr 2017 15:18:39 -0700 Subject: [PATCH] rgw: add "explicit_custom_meta" configurable to es sync module Modify all the map<string, string> that is used to pass in module config to map<string, string, ltstr_nocase> so that it can be used with the conf get vals util. Also, switch to using shared_ptr to hold the ElasticConfig, so that later when we hold maps and such it doesn't need to be copied on every call. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 6 +-- src/rgw/rgw_data_sync.cc | 2 +- src/rgw/rgw_data_sync.h | 2 +- src/rgw/rgw_rados.h | 2 +- src/rgw/rgw_sync_module.h | 4 +- src/rgw/rgw_sync_module_es.cc | 74 ++++++++++++++++++++-------------- src/rgw/rgw_sync_module_es.h | 4 +- src/rgw/rgw_sync_module_log.cc | 2 +- src/rgw/rgw_sync_module_log.h | 2 +- 9 files changed, 55 insertions(+), 43 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index fe88d824749..ec8b14fd428 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2182,7 +2182,7 @@ static void sync_status(Formatter *formatter) tab_dump("data sync", width, data_status); } -static void parse_tier_config_param(const string& s, map& out) +static void parse_tier_config_param(const string& s, map& out) { list confs; get_str_list(s, ",", confs); @@ -2514,8 +2514,8 @@ int main(int argc, const char **argv) string tier_type; bool tier_type_specified = false; - map tier_config_add; - map tier_config_rm; + map tier_config_add; + map tier_config_rm; boost::optional index_pool; boost::optional data_pool; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e305519b432..db4da0847d1 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1556,7 +1556,7 @@ public: } }; -int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) +int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { 
instance->reset(new RGWDefaultSyncModuleInstance()); return 0; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 75ae358198d..7a68ce5fbb2 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -520,7 +520,7 @@ class RGWDefaultSyncModule : public RGWSyncModule { public: RGWDefaultSyncModule() {} bool supports_data_export() override { return true; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; // DataLogTrimCR factory function diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6e0cca14c16..cf426ff7ca7 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1154,7 +1154,7 @@ struct RGWZoneParams : RGWSystemMetaObj { string realm_id; - map tier_config; + map tier_config; RGWZoneParams() : RGWSystemMetaObj() {} RGWZoneParams(const string& name) : RGWSystemMetaObj(name){} diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 55b39b775e1..898eda5fd58 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -48,7 +48,7 @@ public: virtual ~RGWSyncModule() {} virtual bool supports_data_export() = 0; - virtual int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) = 0; + virtual int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) = 0; }; typedef std::shared_ptr RGWSyncModuleRef; @@ -89,7 +89,7 @@ public: return module.get()->supports_data_export(); } - int create_instance(CephContext *cct, const string& name, map& config, RGWSyncModuleInstanceRef *instance) { + int create_instance(CephContext *cct, const string& name, map& config, RGWSyncModuleInstanceRef *instance) { RGWSyncModuleRef module; if (!get_module(name, &module)) { return -ENOENT; diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 4927e2889fd..20c2cee2537 100644 --- 
a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -15,8 +15,11 @@ struct ElasticConfig { string id; RGWRESTConn *conn{nullptr}; + bool explicit_custom_meta{true}; }; +using ElasticConfigRef = std::shared_ptr; + static string es_get_index_path(const RGWRealm& realm) { string path = "/rgw-" + realm.get_name(); @@ -90,15 +93,16 @@ struct es_index_mappings { struct es_obj_metadata { CephContext *cct; + ElasticConfigRef es_conf; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; uint64_t size; map attrs; - es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info, + es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, - map& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key), + map& _attrs) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), mtime(_mtime), size(_size), attrs(std::move(_attrs)) {} void dump(Formatter *f) const { @@ -174,7 +178,12 @@ struct es_obj_metadata { for (auto i : custom_meta) { auto config = bucket_info.mdsearch_config.find(i.first); if (config == bucket_info.mdsearch_config.end()) { - ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + if (!es_conf->explicit_custom_meta) { + /* default custom meta is of type string */ + custom_str[i.first] = i.second; + } else { + ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + } continue; } switch (config->second) { @@ -225,12 +234,12 @@ struct es_obj_metadata { class RGWElasticInitConfigCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, - const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - conf(_conf) {} 
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; @@ -239,7 +248,7 @@ public: es_index_mappings doc; - call(new RGWPutRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -255,11 +264,11 @@ public: }; class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {} + ElasticConfigRef _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone @@ -267,9 +276,9 @@ public: << " attrs=" << attrs << dendl; yield { string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); - es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs); + es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs); - call(new RGWPutRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -286,11 +295,11 @@ public: }; class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + ElasticConfigRef _conf) : RGWCallStatRemoteObjCR(_sync_env, 
_bucket_info, _key), conf(_conf) { } @@ -306,11 +315,11 @@ class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_info(_bucket_info), key(_key), mtime(_mtime), conf(_conf) {} int operate() override { @@ -320,7 +329,7 @@ public: yield { string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); - call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn, sync_env->http_manager, path, nullptr /* params */)); } @@ -335,43 +344,46 @@ public: }; class RGWElasticDataSyncModule : public RGWDataSyncModule { - ElasticConfig conf; + ElasticConfigRef conf; public: - RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) { - conf.id = string("elastic:") + elastic_endpoint; - conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint }); + RGWElasticDataSyncModule(CephContext *cct, const map& config) : conf(std::make_shared()) { + ElasticConfigRef default_module(std::make_shared()); + string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); + conf->id = string("elastic:") + elastic_endpoint; + conf->conn = new RGWRESTConn(cct, nullptr, conf->id, { elastic_endpoint }); + conf->explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); } ~RGWElasticDataSyncModule() override { - delete conf.conn; + delete conf->conn; } RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 0) << conf.id << ": init" << dendl; + ldout(sync_env->cct, 0) << conf->id << ": init" << dendl; return new 
RGWElasticInitConfigCBCR(sync_env, conf); } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 0) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 0) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 0) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } RGWRESTConn *get_rest_conn() { - return conf.conn; 
+ return conf->conn; } }; -RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) +RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map& config) { - data_handler = std::unique_ptr(new RGWElasticDataSyncModule(cct, endpoint)); + data_handler = std::unique_ptr(new RGWElasticDataSyncModule(cct, config)); } RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() @@ -395,13 +407,13 @@ RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMg return new RGWRESTMgr_MDSearch_S3(this); } -int RGWElasticSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { +int RGWElasticSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { string endpoint; auto i = config.find("endpoint"); if (i != config.end()) { endpoint = i->second; } - instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint)); + instance->reset(new RGWElasticSyncModuleInstance(cct, config)); return 0; } diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index 539302ef6f4..c9c2d5240a3 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -11,7 +11,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; class RGWElasticDataSyncModule; @@ -20,7 +20,7 @@ class RGWRESTConn; class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { std::unique_ptr data_handler; public: - RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint); + RGWElasticSyncModuleInstance(CephContext *cct, const std::map& config); RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; RGWRESTConn 
*get_rest_conn(); diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index f253d707429..67212ec593a 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -64,7 +64,7 @@ public: } }; -int RGWLogSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { +int RGWLogSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { string prefix; auto i = config.find("prefix"); if (i != config.end()) { diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h index 5b7bae6552d..9afc9108e67 100644 --- a/src/rgw/rgw_sync_module_log.h +++ b/src/rgw/rgw_sync_module_log.h @@ -9,7 +9,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; #endif -- 2.47.3