struct PSTopicConfig {
string name;
set<string> subs;
+
+ void dump(Formatter *f) const {
+ encode_json("name", name, f);
+ encode_json("subs", subs, f);
+ }
};
struct PSNotificationConfig {
+ uint64_t id{0};
string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
string topic;
-
- uint64_t id{0};
bool is_prefix{false};
+
+ void dump(Formatter *f) const {
+ encode_json("id", id, f);
+ encode_json("path", path, f);
+ encode_json("topic", topic, f);
+ encode_json("is_prefix", is_prefix, f);
+ }
+
void init(CephContext *cct, const JSONFormattable& config) {
path = config["path"];
if (!path.empty() && path[path.size() - 1] == '*') {
uint64_t sync_instance{0};
uint64_t max_id{0};
-
/* FIXME: no hard coded buckets, we'll have configurable topics */
map<string, PSSubConfigRef> subs;
map<string, PSTopicConfig> topics;
multimap<string, PSNotificationConfig> notifications;
+ void dump(Formatter *f) const {
+ encode_json("id", id, f);
+ encode_json("user", user, f);
+ encode_json("data_bucket_prefix", data_bucket_prefix, f);
+ encode_json("sync_instance", sync_instance, f);
+ encode_json("max_id", max_id, f);
+ {
+ Formatter::ArraySection section(*f, "subs");
+ for (auto& sub : subs) {
+ encode_json("sub", *sub.second, f);
+ }
+ }
+ {
+ Formatter::ArraySection section(*f, "topics");
+ for (auto& topic : topics) {
+ encode_json("topic", topic.second, f);
+ }
+ }
+ {
+ Formatter::ObjectSection section(*f, "notifications");
+ string last;
+ for (auto& notif : notifications) {
+ const string& n = notif.first;
+ if (n != last) {
+ if (!last.empty()) {
+ f->close_section();
+ }
+ f->open_array_section(n.c_str());
+ }
+ last = n;
+ encode_json("notifications", notif.second, f);
+ }
+ if (!last.empty()) {
+ f->close_section();
+ }
+ }
+ }
+
void init(CephContext *cct, const JSONFormattable& config) {
string uid = config["uid"]("pubsub");
user = rgw_user(config["tenant"], uid);
}
void format(bufferlist *bl) {
- stringstream ss;
- JSONFormatter f;
-
- encode_json("event", *event, &f);
- f.flush(ss);
-
- bl->append(ss.str());
+ bl->append(json_str("event", *event));
}
};
-class PSSubscription : public RefCountedObject {
+class PSSubscription;
+using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
+
+class PSSubscription {
+ PSSubscriptionRef self;
RGWDataSyncEnv *sync_env;
PSEnvRef env;
PSSubConfigRef sub_conf;
yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
sync_env->store,
put_obj));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
}
return 0;
}
};
-public:
PSSubscription(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
env(_env),
sub_conf(_sub_conf),
data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+public:
+ static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ PSSubConfigRef& _sub_conf) {
+ auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
+ sub->self = std::shared_ptr<PSSubscription>(sub);
+ return sub->self;
+ }
RGWCoroutine *init_cr() {
- return new InitCR(sync_env, this);
+ return new InitCR(sync_env, self);
}
RGWCoroutine *store_event_cr(EventRef& event) {
- return new StoreEventCR(sync_env, this, event);
+ return new StoreEventCR(sync_env, self, event);
}
friend class InitCR;
};
-using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
-
-class PSManager : public RefCountedObject
+class PSManager
{
+ PSManagerRef self;
+
RGWDataSyncEnv *sync_env;
PSEnvRef env;
return set_cr_error(retcode);
}
+ ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
+
+
return set_cr_done();
}
return 0;