From 702f7f42b4d9f3f046757a6fc81248d37282f650 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 28 Jun 2018 21:10:40 -0700 Subject: [PATCH] rgw: pubsub: subscription init and more Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cr_rados.h | 3 +- src/rgw/rgw_cr_tools.cc | 17 +- src/rgw/rgw_cr_tools.h | 22 ++- src/rgw/rgw_sync_module_pubsub.cc | 310 +++++++++++++++++++++++++++--- 4 files changed, 307 insertions(+), 45 deletions(-) diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 85796199892..516cf8d6044 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -204,7 +204,8 @@ class RGWSimpleAsyncCR : public RGWSimpleCoroutine { req = new Request(this, stack->create_completion_notifier(), store, - params); + params, + result); async_rados->queue(req); return 0; diff --git a/src/rgw/rgw_cr_tools.cc b/src/rgw/rgw_cr_tools.cc index 6176cc4df55..d7dbc6d7bb9 100644 --- a/src/rgw/rgw_cr_tools.cc +++ b/src/rgw/rgw_cr_tools.cc @@ -16,11 +16,9 @@ int RGWUserCreateCR::Request::_send_request() RGWUserAdminOpState op_state; - rgw_user uid(params.uid); + auto& user = params.user; - uid.tenant = params.tenant_name; - - op_state.set_user_id(uid); + op_state.set_user_id(user); op_state.set_display_name(params.display_name); op_state.set_user_email(params.email); op_state.set_caps(params.caps); @@ -86,8 +84,15 @@ int RGWUserCreateCR::Request::_send_request() template<> int RGWGetUserInfoCR::Request::_send_request() { - rgw_user user(params.tenant, params.uid); - return rgw_get_user_info_by_uid(store, user, result->user_info); + return rgw_get_user_info_by_uid(store, params.user, *result); +} + +template<> +int RGWGetBucketInfoCR::Request::_send_request() +{ + RGWObjectCtx obj_ctx(store); + return store->get_bucket_info(obj_ctx, params.tenant, params.bucket_name, + result->bucket_info, &result->mtime, &result->attrs); } template<> diff --git a/src/rgw/rgw_cr_tools.h b/src/rgw/rgw_cr_tools.h index 8ac66263628..ab69d4c0e04 100644 --- a/src/rgw/rgw_cr_tools.h +++ b/src/rgw/rgw_cr_tools.h @@ -5,14 +5,13 @@ struct rgw_user_create_params { - std::string uid; + rgw_user user; std::string display_name; std::string email; std::string access_key; std::string secret_key; std::string key_type; /* "swift" or "s3" */ std::string caps; - std::string tenant_name; bool generate_key{true}; bool suspended{false}; @@ -24,17 +23,24 @@ struct rgw_user_create_params { using RGWUserCreateCR = RGWSimpleWriteOnlyAsyncCR; - struct rgw_get_user_info_params { - std::string uid; - std::string tenant; + rgw_user user; +}; + +using RGWGetUserInfoCR = RGWSimpleAsyncCR; + +struct rgw_get_bucket_info_params { + string tenant; + string bucket_name; }; -struct rgw_get_user_info_result { - RGWUserInfo user_info; +struct rgw_get_bucket_info_result { + ceph::real_time mtime; + RGWBucketInfo bucket_info; + map attrs; }; -using RGWGetUserInfoCR = RGWSimpleAsyncCR; +using RGWGetBucketInfoCR = RGWSimpleAsyncCR; struct rgw_bucket_create_local_params { shared_ptr user_info; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 49cd9da9385..abcf83a894c 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -52,15 +52,33 @@ struct PSSubConfig { /* subscription config */ string topic; string push_endpoint; - void init(CephContext *cct, const JSONFormattable& config) { + string data_bucket_name; + string data_prefix; + + void dump(Formatter *f) const { + encode_json("name", name, f); + encode_json("topic", topic, f); + encode_json("push_endpoint", push_endpoint, f); + encode_json("data_bucket_name", data_bucket_name, f); + encode_json("data_oid_prefix", data_oid_prefix, f); + } + + void init(CephContext *cct, const JSONFormattable& config, + const string& data_bucket_prefix) { name = config["name"]; topic = config["topic"]; push_endpoint = config["push_endpoint"]; + string default_bucket_name = data_prefix + name; + data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); + data_prefix = config["data_prefix"]; } }; +using PSSubConfigRef = std::shared_ptr; + struct PSTopicConfig { string name; + set subs; }; struct PSNotificationConfig { @@ -95,19 +113,25 @@ static string json_str(const char *name, const T& obj, bool pretty = false) struct PSConfig { string id{"pubsub"}; - string ps_uid{"pubsub"}; + rgw_user user; + string data_bucket_prefix; + uint64_t sync_instance{0}; uint32_t num_pub_shards{0}; uint32_t num_topic_shards{0}; uint64_t max_id{0}; + /* FIXME: no hard coded buckets, we'll have configurable topics */ - vector subscriptions; + map subs; map topics; multimap notifications; void init(CephContext *cct, const JSONFormattable& config) { - ps_uid = config["pubsub"]; + string uid = config["uid"]("pubsub"); + user = rgw_user(config["tenant"], uid); + data_bucket_prefix = config["data_bucket_prefix"]("pubsub"); + num_pub_shards = config["num_pub_shards"](PS_NUM_PUB_SHARDS_DEFAULT); if (num_pub_shards < PS_NUM_PUB_SHARDS_MIN) { num_pub_shards = PS_NUM_PUB_SHARDS_MIN; @@ -128,9 +152,10 @@ struct PSConfig { topics[nc.topic] = topic_config; } for (auto& c : config["subscriptions"].array()) { - PSSubConfig sc; - sc.init(cct, c); - subscriptions.push_back(sc); + auto sc = std::make_shared(); + sc->init(cct, c, data_bucket_prefix); + subs[sc->name] = sc; + topics[sc->topic].subs.insert(sc->name); } ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; @@ -169,26 +194,242 @@ struct PSConfig { notifs->push_back(target); } while (iter != notifications.begin()); } + + bool find_sub(const string& name, PSSubConfigRef *ref) { + auto iter = subs.find(name); + if (iter != subs.end()) { + *ref = iter->second; + return true; + } + return false; + } }; using PSConfigRef = std::shared_ptr; +using EventRef = std::shared_ptr; -class RGWPSInitConfigCBCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; +class PSManager; +using PSManagerRef = std::shared_ptr; + +struct PSEnv { PSConfigRef conf; + shared_ptr data_user_info; + PSManagerRef manager; + + PSEnv() : conf(make_shared()), + data_user_info(make_shared()) {} + + void init(CephContext *cct, const JSONFormattable& config) { + conf->init(cct, config); + } + + void init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr); +}; + +using PSEnvRef = std::shared_ptr; + +class PSSubscription : public RefCountedObject { + RGWDataSyncEnv *sync_env; + PSEnvRef env; + PSSubConfigRef sub_conf; + shared_ptr get_bucket_info_result; + RGWBucketInfo *bucket_info{nullptr}; + +public: + + class InitCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSSubscriptionRef sub; + rgw_get_bucket_info_params get_bucket_info; + rgw_bucket_create_local_params create_bucket; + PSConfigRef& conf; + PSSubConfigRef& sub_conf; + int i; + public: + InitCR(RGWDataSyncEnv *_sync_env, + PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + sub(_sub), conf(sub->env->conf), + sub_conf(sub->sub_conf) { + } + + int operate() override { + reenter(this) { + get_bucket_info.tenant = conf->user.tenant; + get_bucket_info.bucket_name = sub_conf->data_bucket_name; + sub->get_bucket_info_result = make_shared(); + + for (i = 0; i < 2; ++i) { + yield call(new RGWGetBucketInfoCR(sync_env->async_rados, + sync_env->store, + get_bucket_info, + sub->get_bucket_info_result)); + if (retcode < 0 && retcode != -ENOENT) { + ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant=" + << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; + } + if (retcode == 0) { + sub->bucket_info = &sub->get_bucket_info_result->bucket_info; + return set_cr_done(); + } + + create_bucket.user_info = sub->env->data_user_info; + create_bucket.bucket_name = sub_conf->data_bucket_name; + ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; + yield call(new RGWBucketCreateLocalCR(sync_env->async_rados, + sync_env->store, + create_bucket)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant=" + << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; + return set_cr_error(retcode); + } +ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; + + /* second iteration: we got -ENOENT and created a bucket */ + } + + /* failed twice on -ENOENT, unexpected */ + ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant + << " name=" << get_bucket_info.bucket_name << dendl; + return set_cr_error(-EIO); + } + return 0; + } + }; + + +public: + PSSubscription(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + PSSubConfigRef& _sub_conf) : sync_env(_sync_env), + env(_env), + sub_conf(_sub_conf) {} + + RGWCoroutine *init_cr() { + return new InitCR(sync_env, this); + } + + RGWCoroutine *store_event_cr(EventRef event); + + friend class InitCR; +}; + +using PSSubscriptionRef = std::shared_ptr; + + +RGWCoroutine *PSSubscription::store_event_cr(EventRef event) +{ + /* FIXME */ +} + + +class PSManager : public RefCountedObject +{ + RGWDataSyncEnv *sync_env; + PSEnvRef env; + + map subs; + + class GetSubCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSManagerRef mgr; + const string& sub_name; + PSSubscriptionRef *ref; + + PSConfigRef& conf; + + PSSubConfigRef sub_conf; + public: + GetSubCR(RGWDataSyncEnv *_sync_env, + PSManagerRef& _mgr, + const string& _sub_name, + PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + mgr(_mgr), + sub_name(_sub_name), + ref(_ref), + conf(mgr->env->conf) { + } + + int operate() override { + reenter(this) { + if (!conf->find_sub(sub_name, &sub_conf)) { + ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl; + return set_cr_error(-ENOENT); + } + + *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); + + yield call((*ref)->init_cr()); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl; + ref->reset(); + return set_cr_error(retcode); + } + + mgr->subs[sub_name] = *ref; + return set_cr_done(); + } + return 0; + } + }; + + bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) { + auto iter = subs.find(sub_name); + if (iter != subs.end()) { + *sub = iter->second; + return true; + } + return false; + } + + PSManager(RGWDataSyncEnv *_sync_env, + PSEnvRef _env) : sync_env(_sync_env), + env(_env) {} + +public: + static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env, + PSEnvRef _env) { + auto mgr = new PSManager(_sync_env, _env); + mgr->self = std::shared_ptr(mgr); + return mgr->self; + } + + RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) { + if (find_sub_instance(sub_name, ref)) { + /* found it! nothing to execute */ + return nullptr; + } + return new GetSubCR(sync_env, self, sub_name, ref); + } + + friend class GetSubCR; +}; + +void PSEnv::init_instance(RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) { + manager = mgr; + conf->init_instance(realm, instance_id); +} + +class RGWPSInitEnvCBCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSEnvRef env; + PSConfigRef& conf; rgw_user_create_params create_user; + rgw_get_user_info_params get_user_info; public: - RGWPSInitConfigCBCR(RGWDataSyncEnv *_sync_env, - PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), + RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env, + PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - conf(_conf) {} + env(_env), conf(env->conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl; /* nothing to do here right now */ - create_user.uid = conf->ps_uid; + create_user.user = conf->user; create_user.max_buckets = 0; /* unlimited */ create_user.display_name = "pubsub"; create_user.generate_key = false; @@ -198,6 +439,13 @@ public: return set_cr_error(retcode); } + get_user_info.user = conf->user; + yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info)); + if (retcode < 0) { + ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; + return set_cr_error(retcode); + } + return set_cr_done(); } return 0; @@ -205,17 +453,17 @@ public: }; class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { - PSConfigRef conf; + PSEnvRef env; uint64_t versioned_epoch; vector notifs; vector::iterator niter; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), + PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env), versioned_epoch(_versioned_epoch) { #warning this will need to change obviously - conf->get_notifs(_bucket_info, _key, ¬ifs); + env->conf->get_notifs(_bucket_info, _key, ¬ifs); } int operate() override { reenter(this) { @@ -250,13 +498,13 @@ public: }; class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { - PSConfigRef conf; + PSEnvRef env; uint64_t versioned_epoch; public: RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), - conf(_conf), versioned_epoch(_versioned_epoch) { + PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + env(_env), versioned_epoch(_versioned_epoch) { } ~RGWPSHandleRemoteObjCR() override {} @@ -267,7 +515,7 @@ public: * actually be handled. Otherwise we fetch info from remote zone about every object, even * if we don't intend to handle it. */ - return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); + return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch); } }; @@ -276,13 +524,13 @@ class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine { RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - PSConfigRef conf; + PSEnvRef env; public: RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - PSConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_info(_bucket_info), key(_key), - mtime(_mtime), conf(_conf) {} + mtime(_mtime), env(_env) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone @@ -307,20 +555,22 @@ public: }; class RGWPSDataSyncModule : public RGWDataSyncModule { - PSConfigRef conf; + PSEnvRef env; + PSConfigRef& conf; public: - RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared()) { - conf->init(cct, config); + RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared()), conf(env->conf) { + env->init(cct, config); } ~RGWPSDataSyncModule() override {} void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { - conf->init_instance(sync_env->store->get_realm(), instance_id); + PSManagerRef mgr = PSManager::get_shared(sync_env, env); + env->init_instance(sync_env->store->get_realm(), instance_id, mgr); } RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; - return new RGWPSInitConfigCBCR(sync_env, conf); + return new RGWPSInitEnvCBCR(sync_env, env); } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; @@ -331,7 +581,7 @@ public: return nullptr; } #endif - return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch); + return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ @@ -343,7 +593,7 @@ public: return nullptr; } #endif - return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); + return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { -- 2.39.5