From: Yehuda Sadeh Date: Wed, 17 Aug 2016 09:45:32 +0000 (-0700) Subject: rgw: initial implementation of elasticsearch sync module X-Git-Tag: v11.1.0~681^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=faa90fabb32536ec633ddf8338849370cfbf2008;p=ceph.git rgw: initial implementation of elasticsearch sync module sync module that will handle rgw metadata indexing. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 4bbb548257b8..01a5a79ebcfd 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -51,6 +51,7 @@ set(rgw_a_srcs rgw_sync.cc rgw_data_sync.cc rgw_sync_module.cc + rgw_sync_module_es.cc rgw_sync_module_log.cc rgw_period_history.cc rgw_period_puller.cc diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 0be2bd44a7b0..1cd2e02974c8 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1505,7 +1505,7 @@ public: } }; -int RGWDefaultSyncModule::create_instance(map& config, RGWSyncModuleInstanceRef *instance) +int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWDefaultSyncModuleInstance()); return 0; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 331249c2b269..80dda7725e85 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -526,7 +526,7 @@ class RGWDefaultSyncModule : public RGWSyncModule { public: RGWDefaultSyncModule() {} bool supports_data_export() override { return true; } - int create_instance(map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index dc03f28a37b8..49cf4e0d4e08 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3791,7 +3791,7 @@ int RGWRados::init_complete() zone_short_id = current_period.get_map().get_zone_short_id(zone_params.get_id()); - ret = sync_modules_manager->create_instance(zone_public_config.tier_type, zone_params.tier_config, &sync_module); + ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module); if (ret < 0) { lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_sync_module.cc b/src/rgw/rgw_sync_module.cc index 3e65de6b46cb..36418e5a28d0 100644 --- a/src/rgw/rgw_sync_module.cc +++ b/src/rgw/rgw_sync_module.cc @@ -6,6 +6,7 @@ #include "rgw_boost_asio_yield.h" #include "rgw_sync_module_log.h" +#include "rgw_sync_module_es.h" #define dout_subsys ceph_subsys_rgw @@ -58,4 +59,7 @@ void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager) RGWSyncModuleRef log_module(new RGWLogSyncModule()); modules_manager->register_module("log", log_module); + + RGWSyncModuleRef es_module(new RGWElasticSyncModule()); + modules_manager->register_module("elasticsearch", es_module); } diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 7b465b9e8ce1..2bae4d2d47c4 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -39,7 +39,7 @@ public: virtual ~RGWSyncModule() {} virtual bool supports_data_export() = 0; - virtual int create_instance(map& config, RGWSyncModuleInstanceRef *instance) = 0; + virtual int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) = 0; }; typedef std::shared_ptr RGWSyncModuleRef; @@ -80,13 +80,13 @@ public: return module.get()->supports_data_export(); } - int create_instance(const string& name, map& config, RGWSyncModuleInstanceRef *instance) { + int create_instance(CephContext *cct, const string& name, map& config, RGWSyncModuleInstanceRef *instance) { RGWSyncModuleRef module; if (!get_module(name, &module)) { return -ENOENT; } - return module.get()->create_instance(config, instance); + return module.get()->create_instance(cct, config, instance); } }; diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc new file mode 100644 index 000000000000..6aa9d73e9b74 --- /dev/null +++ b/src/rgw/rgw_sync_module_es.cc @@ -0,0 +1,95 @@ +#include "rgw_common.h" +#include "rgw_coroutine.h" +#include "rgw_sync_module.h" +#include "rgw_data_sync.h" +#include "rgw_boost_asio_yield.h" +#include "rgw_sync_module_es.h" +#include "rgw_rest_conn.h" + +#define dout_subsys ceph_subsys_rgw + +struct ElasticConfig { + string id; + RGWRESTConn *conn{nullptr}; +}; + +class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { + const ElasticConfig& conf; +public: + RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, + RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(sync_env, bucket_info, key), conf(_conf) {} + int operate() override { + reenter(this) { + ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone + << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " attrs=" << attrs << dendl; + return set_cr_done(); + } + return 0; + } + +}; + +class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { + const ElasticConfig& conf; +public: + RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, + RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + conf(_conf) { + } + + ~RGWElasticHandleRemoteObjCR() {} + + RGWStatRemoteObjCBCR *allocate_callback() override { + return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf); + } +}; + +class RGWElasticDataSyncModule : public RGWDataSyncModule { + ElasticConfig conf; +public: + RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) { + conf.id = string("elastic:") + elastic_endpoint; + conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint }); + } + ~RGWElasticDataSyncModule() { + delete conf.conn; + } + + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override { + ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf); + } + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override { + ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + return NULL; + } + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override { + ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + return NULL; + } +}; + +class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { + RGWElasticDataSyncModule data_handler; +public: + RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {} + RGWDataSyncModule *get_data_handler() override { + return &data_handler; + } +}; + +int RGWElasticSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { + string endpoint; + auto i = config.find("endpoint"); + if (i != config.end()) { + endpoint = i->second; + } + instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint)); + return 0; +} + diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h new file mode 100644 index 000000000000..73c8368571d8 --- /dev/null +++ b/src/rgw/rgw_sync_module_es.h @@ -0,0 +1,15 @@ +#ifndef CEPH_RGW_SYNC_MODULE_ES_H +#define CEPH_RGW_SYNC_MODULE_ES_H + +#include "rgw_sync_module.h" + +class RGWElasticSyncModule : public RGWSyncModule { +public: + RGWElasticSyncModule() {} + bool supports_data_export() override { + return false; + } + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; +}; + +#endif diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index 2c6352ac8a18..230fde3f01cf 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -64,7 +64,7 @@ public: } }; -int RGWLogSyncModule::create_instance(map& config, RGWSyncModuleInstanceRef *instance) { +int RGWLogSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) { string prefix; auto i = config.find("prefix"); if (i != config.end()) { diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h index f889e319ae92..5b7bae6552de 100644 --- a/src/rgw/rgw_sync_module_log.h +++ b/src/rgw/rgw_sync_module_log.h @@ -9,7 +9,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(map& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) override; }; #endif