From a10a95fabd3ea5c4b68632116347b1fa3a705e03 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Sat, 30 Jun 2018 22:07:22 -0700 Subject: [PATCH] rgw: pubsub fixes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_pubsub.cc | 97 +++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 75a2e667af1..0eec2d94729 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -112,15 +112,27 @@ using PSSubConfigRef = std::shared_ptr; struct PSTopicConfig { string name; set subs; + + void dump(Formatter *f) const { + encode_json("name", name, f); + encode_json("subs", subs, f); + } }; struct PSNotificationConfig { + uint64_t id{0}; string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */ string topic; - - uint64_t id{0}; bool is_prefix{false}; + + void dump(Formatter *f) const { + encode_json("id", id, f); + encode_json("path", path, f); + encode_json("topic", topic, f); + encode_json("is_prefix", is_prefix, f); + } + void init(CephContext *cct, const JSONFormattable& config) { path = config["path"]; if (!path.empty() && path[path.size() - 1] == '*') { @@ -152,12 +164,49 @@ struct PSConfig { uint64_t sync_instance{0}; uint64_t max_id{0}; - /* FIXME: no hard coded buckets, we'll have configurable topics */ map subs; map topics; multimap notifications; + void dump(Formatter *f) const { + encode_json("id", id, f); + encode_json("user", user, f); + encode_json("data_bucket_prefix", data_bucket_prefix, f); + encode_json("sync_instance", sync_instance, f); + encode_json("max_id", max_id, f); + { + Formatter::ArraySection section(*f, "subs"); + for (auto& sub : subs) { + encode_json("sub", *sub.second, f); + } + } + { + Formatter::ArraySection section(*f, "topics"); + for (auto& topic : topics) { + encode_json("topic", topic.second, f); + } + } + { + Formatter::ObjectSection section(*f, "notifications"); + string last; + for (auto& notif : notifications) { + const string& n = notif.first; + if (n != last) { + if (!last.empty()) { + f->close_section(); + } + f->open_array_section(n.c_str()); + } + last = n; + encode_json("notifications", notif.second, f); + } + if (!last.empty()) { + f->close_section(); + } + } + } + void init(CephContext *cct, const JSONFormattable& config) { string uid = config["uid"]("pubsub"); user = rgw_user(config["tenant"], uid); @@ -286,18 +335,16 @@ public: } void format(bufferlist *bl) { - stringstream ss; - JSONFormatter f; - - encode_json("event", *event, &f); - f.flush(ss); - - bl->append(ss.str()); + bl->append(json_str("event", *event)); } }; -class PSSubscription : public RefCountedObject { +class PSSubscription; +using PSSubscriptionRef = std::shared_ptr; + +class PSSubscription { + PSSubscriptionRef self; RGWDataSyncEnv *sync_env; PSEnvRef env; PSSubConfigRef sub_conf; @@ -406,35 +453,48 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s yield call(new RGWObjectSimplePutCR(sync_env->async_rados, sync_env->store, put_obj)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; + return set_cr_error(retcode); + } + + return set_cr_done(); } return 0; } }; -public: PSSubscription(RGWDataSyncEnv *_sync_env, PSEnvRef _env, PSSubConfigRef& _sub_conf) : sync_env(_sync_env), env(_env), sub_conf(_sub_conf), data_access(std::make_shared(sync_env->store)) {} +public: + static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + PSSubConfigRef& _sub_conf) { + auto sub = new PSSubscription(_sync_env, _env, _sub_conf); + sub->self = std::shared_ptr(sub); + return sub->self; + } RGWCoroutine *init_cr() { - return new InitCR(sync_env, this); + return new InitCR(sync_env, self); } RGWCoroutine *store_event_cr(EventRef& event) { - return new StoreEventCR(sync_env, this, event); + return new StoreEventCR(sync_env, self, event); } friend class InitCR; }; -using PSSubscriptionRef = std::shared_ptr; - -class PSManager : public RefCountedObject +class PSManager { + PSManagerRef self; + RGWDataSyncEnv *sync_env; PSEnvRef env; @@ -555,6 +615,9 @@ public: return set_cr_error(retcode); } + ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl; + + return set_cr_done(); } return 0; -- 2.39.5