From: Yehuda Sadeh
Date: Tue, 26 Jun 2018 23:02:14 +0000 (-0700)
Subject: rgw: initial pub sub work
X-Git-Tag: v14.1.0~616^2~73
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=88e7f610736c0bfebd5d16d25dc32139ece26a83;p=ceph.git

rgw: initial pub sub work

Signed-off-by: Yehuda Sadeh
---

diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 502ab0891acb3..1072631f072d9 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -77,6 +77,7 @@ set(librgw_common_srcs
   rgw_metadata.cc
   rgw_multi.cc
   rgw_multi_del.cc
+  rgw_pubsub.cc
   rgw_sync.cc
   rgw_data_sync.cc
   rgw_sync_module.cc
@@ -84,6 +85,7 @@ set(librgw_common_srcs
   rgw_sync_module_es.cc
   rgw_sync_module_es_rest.cc
   rgw_sync_module_log.cc
+  rgw_sync_module_pubsub.cc
   rgw_sync_log_trim.cc
   rgw_sync_trace.cc
   rgw_period_history.cc
diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc
new file mode 100644
index 0000000000000..49cd9da938559
--- /dev/null
+++ b/src/rgw/rgw_sync_module_pubsub.cc
@@ -0,0 +1,387 @@
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_tools.h"
+#include "rgw_op.h"
+#include "rgw_pubsub.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+
+/*
+
+config:
+
+{
+   "tenant": <tenant>,             # default: <none>
+   "uid": <uid>,                   # default: "pubsub"
+   "data_bucket_prefix": <prefix>  # default: "pubsub-"
+   "data_oid_prefix": <prefix>     #
+
+    # non-dynamic config
+   "notifications": [
+       {
+           "path": <notification-path>,  # this can be either an explicit path: <bucket>, or <bucket>/<object>,
+                                         # or a prefix if it ends with a wildcard
+           "topic": <topic-name>
+        },
+       ...
+   ],
+   "subscriptions": [
+       {
+           "name": <subscription-name>,
+           "topic": <topic>,
+           "push_endpoint": <endpoint>,
+           "data_bucket": <bucket>,      # override name of bucket where subscription data will be stored
+           "data_oid_prefix": <prefix>   # set prefix for subscription data object ids
+       },
+       ...
+   ]
+}
+
+*/
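+
+/* An illustrative config instance for the schema above; the bucket, topic,
+ * and endpoint values here are hypothetical examples, not defaults:
+ *
+ * {
+ *   "uid": "pubsub",
+ *   "notifications": [
+ *     { "path": "mybucket/photos/*", "topic": "image-events" }
+ *   ],
+ *   "subscriptions": [
+ *     {
+ *       "name": "sub1",
+ *       "topic": "image-events",
+ *       "push_endpoint": "http://localhost:8080/push"
+ *     }
+ *   ]
+ * }
+ */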
+
+struct PSSubConfig { /* subscription config */
+  string name;
+  string topic;
+  string push_endpoint;
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    name = config["name"];
+    topic = config["topic"];
+    push_endpoint = config["push_endpoint"];
+  }
+};
+
+struct PSTopicConfig {
+  string name;
+};
+
+struct PSNotificationConfig {
+  string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
+  string topic;
+
+  uint64_t id{0};
+  bool is_prefix{false};
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    path = config["path"];
+    if (!path.empty() && path[path.size() - 1] == '*') {
+      path = path.substr(0, path.size() - 1);
+      is_prefix = true;
+    }
+    topic = config["topic"];
+  }
+};
+
+template <class T>
+static string json_str(const char *name, const T& obj, bool pretty = false)
+{
+  stringstream ss;
+  JSONFormatter f(pretty);
+
+  encode_json(name, obj, &f);
+  f.flush(ss);
+
+  return ss.str();
+}
+
+
+struct PSConfig {
+  string id{"pubsub"};
+  string ps_uid{"pubsub"};
+  uint64_t sync_instance{0};
+  uint32_t num_pub_shards{0};
+  uint32_t num_topic_shards{0};
+  uint64_t max_id{0};
+
+  /* FIXME: no hard coded buckets, we'll have configurable topics */
+  vector<PSSubConfig> subscriptions;
+  map<string, PSTopicConfig> topics;
+  multimap<string, PSNotificationConfig> notifications;
+
+  void init(CephContext *cct, const JSONFormattable& config) {
+    ps_uid = config["pubsub"];
+    num_pub_shards = config["num_pub_shards"](PS_NUM_PUB_SHARDS_DEFAULT);
+    if (num_pub_shards < PS_NUM_PUB_SHARDS_MIN) {
+      num_pub_shards = PS_NUM_PUB_SHARDS_MIN;
+    }
+
+    num_topic_shards = config["num_topic_shards"](PS_NUM_TOPIC_SHARDS_DEFAULT);
+    if (num_topic_shards < PS_NUM_TOPIC_SHARDS_MIN) {
+      num_topic_shards = PS_NUM_TOPIC_SHARDS_MIN;
+    }
+    /* FIXME: this will be dynamically configured */
+    for (auto& c : config["notifications"].array()) {
+      PSNotificationConfig nc;
+      nc.id = ++max_id;
+      nc.init(cct, c);
+      notifications.insert(std::make_pair(nc.path, nc));
+
+      PSTopicConfig topic_config = { .name = nc.topic };
+      topics[nc.topic] = topic_config;
+    }
+    for (auto& c : config["subscriptions"].array()) {
+      PSSubConfig sc;
+      sc.init(cct, c);
+      subscriptions.push_back(sc);
+    }
+
+    ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+  }
+
+  void init_instance(RGWRealm& realm, uint64_t instance_id) {
+    sync_instance = instance_id;
+  }
+
+  void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSNotificationConfig *> *notifs) {
+    string path = bucket_info.bucket.name + "/" + key.name;
+
+    notifs->clear();
+
+    auto iter = notifications.upper_bound(path);
+    if (iter == notifications.begin()) {
+      return;
+    }
+
+    do {
+      --iter;
+      if (iter->first.size() > path.size()) {
+        break;
+      }
+      if (path.compare(0, iter->first.size(), iter->first) != 0) {
+        break;
+      }
+
+      PSNotificationConfig *target = &iter->second;
+
+      if (!target->is_prefix &&
+          path.size() != iter->first.size()) {
+        continue;
+      }
+
+      notifs->push_back(target);
+    } while (iter != notifications.begin());
+  }
+};
+
+using PSConfigRef = std::shared_ptr<PSConfig>;
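+
+/* Example of the matching done by get_notifs() above, using the hypothetical
+ * config sketched earlier: the wildcard path "mybucket/photos/*" is stored as
+ * the prefix "mybucket/photos/" and matches object "photos/cat.jpg" in bucket
+ * "mybucket", while a non-wildcard path such as "mybucket/photos/cat.jpg"
+ * matches only that exact bucket/object pair. */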
+
+class RGWPSInitConfigCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  PSConfigRef conf;
+
+  rgw_user_create_params create_user;
+public:
+  RGWPSInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+                      PSConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+                                           sync_env(_sync_env),
+                                           conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
+
+      /* nothing to do here right now */
+      create_user.uid = conf->ps_uid;
+      create_user.max_buckets = 0; /* unlimited */
+      create_user.display_name = "pubsub";
+      create_user.generate_key = false;
+      yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user));
+      if (retcode < 0) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  PSConfigRef conf;
+  uint64_t versioned_epoch;
+  vector<PSNotificationConfig *> notifs;
+  vector<PSNotificationConfig *>::iterator niter;
+public:
+  RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                           PSConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+                                                                           versioned_epoch(_versioned_epoch) {
+#warning this will need to change obviously
+    conf->get_notifs(_bucket_info, _key, &notifs);
+  }
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                               << " attrs=" << attrs << dendl;
+
+      for (niter = notifs.begin(); niter != notifs.end(); ++niter) {
+        yield {
+          ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl;
+
+#warning publish notification
+#if 0
+          string path = conf->get_obj_path(bucket_info, key);
+          es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+
+          call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
+                                                              sync_env->http_manager,
+                                                              path, nullptr /* params */,
+                                                              doc, nullptr /* result */));
+#endif
+        }
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
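+
+/* Sketch of the event record that the stubbed publish step above could push
+ * to a subscription endpoint; the field set is hypothetical, assembled from
+ * what the stat callback already has in hand (notification id/topic, bucket,
+ * key, mtime, size), and is not a format defined by this commit:
+ *
+ * {
+ *   "id": 1,
+ *   "topic": "image-events",
+ *   "bucket": "mybucket",
+ *   "key": "photos/cat.jpg",
+ *   "mtime": "2018-06-26T23:02:14Z",
+ *   "size": 4096
+ * }
+ */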
+
+class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  PSConfigRef conf;
+  uint64_t versioned_epoch;
+public:
+  RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
+                         RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                         PSConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                                                                         conf(_conf), versioned_epoch(_versioned_epoch) {
+  }
+
+  ~RGWPSHandleRemoteObjCR() override {}
+
+  RGWStatRemoteObjCBCR *allocate_callback() override {
+#warning things need to change
+    /* FIXME: we need to create a pre_callback coroutine that decides whether the object should
+     * actually be handled. Otherwise we fetch info from the remote zone about every object, even
+     * if we don't intend to handle it.
+     */
+    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+  }
+};
+
+class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWBucketInfo bucket_info;
+  rgw_obj_key key;
+  ceph::real_time mtime;
+  PSConfigRef conf;
+public:
+  RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                           PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+                                                bucket_info(_bucket_info), key(_key),
+                                                mtime(_mtime), conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
+                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+      yield {
+#if 0
+        string path = conf->get_obj_path(bucket_info, key);
+
+        call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
+                                         sync_env->http_manager,
+                                         path, nullptr /* params */));
+#endif
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+
+};
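+
+/* RGWPSDataSyncModule below maps the data-sync hooks onto the coroutines above:
+ *   init_sync()            -> RGWPSInitConfigCBCR (creates the pubsub user)
+ *   sync_object()          -> RGWPSHandleRemoteObjCR (stat + publish; publish still stubbed)
+ *   remove_object()        -> RGWPSRemoveRemoteObjCBCR (removal event still stubbed)
+ *   create_delete_marker() -> not handled yet (see the #warning markers below)
+ */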
+
+class RGWPSDataSyncModule : public RGWDataSyncModule {
+  PSConfigRef conf;
+public:
+  RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<PSConfig>()) {
+    conf->init(cct, config);
+  }
+  ~RGWPSDataSyncModule() override {}
+
+  void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+    conf->init_instance(sync_env->store->get_realm(), instance_id);
+  }
+
+  RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
+    ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
+    return new RGWPSInitConfigCBCR(sync_env, conf);
+  }
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+#warning this should be done correctly
+#if 0
+    if (!conf->should_handle_operation(bucket_info)) {
+      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+      return nullptr;
+    }
+#endif
+    return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
+  }
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    /* versioned and versioned epoch params are useless in the pubsub backend case */
+    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+#warning this should be done correctly
+#if 0
+    if (!conf->should_handle_operation(bucket_info)) {
+      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+      return nullptr;
+    }
+#endif
+    return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+  }
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+#warning requests should be filtered correctly
+#if 0
+    ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
+#endif
+#warning delete markers need to be handled too
+    return nullptr;
+  }
+};
+
+RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
+{
+  data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
+}
+
+RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
+{
+  return data_handler.get();
+}
+
+RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
+#warning REST filter implementation missing
+#if 0
+  if (dialect != RGW_REST_S3) {
+    return orig;
+  }
+  delete orig;
+  return new RGWRESTMgr_MDSearch_S3();
+#endif
+  return orig;
+}
+
+int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+  instance->reset(new RGWPSSyncModuleInstance(cct, config));
+  return 0;
+}
+
diff --git a/src/rgw/rgw_sync_module_pubsub.h b/src/rgw/rgw_sync_module_pubsub.h
new file mode 100644
index 0000000000000..a7e91cfe28d87
--- /dev/null
+++ b/src/rgw/rgw_sync_module_pubsub.h
@@ -0,0 +1,26 @@
+#ifndef CEPH_RGW_SYNC_MODULE_PUBSUB_H
+#define CEPH_RGW_SYNC_MODULE_PUBSUB_H
+
+#include "rgw_sync_module.h"
+
+class RGWPSSyncModule : public RGWSyncModule {
+public:
+  RGWPSSyncModule() {}
+  bool supports_data_export() override {
+    return false;
+  }
+  int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+};
+
+class RGWPSDataSyncModule;
+class RGWRESTConn;
+
+class RGWPSSyncModuleInstance : public RGWSyncModuleInstance {
+  std::unique_ptr<RGWPSDataSyncModule> data_handler;
+public:
+  RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
+  RGWDataSyncModule *get_data_handler() override;
+  RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
+};
+
+#endif
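
Note: this commit adds the module sources but does not yet register "pubsub"
with the sync-modules manager. For illustration only, a minimal sketch of that
wiring, modeled on how the other sync modules are registered; the call site
and exact signature are assumptions, not part of this diff:

  // hypothetical registration alongside the "log" and "elasticsearch" modules
  RGWSyncModuleRef ps_module(new RGWPSSyncModule());
  modules_manager->register_module("pubsub", ps_module);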