]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub fixes
authorYehuda Sadeh <yehuda@redhat.com>
Sun, 1 Jul 2018 05:07:22 +0000 (22:07 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 75a2e667af169986d03d2197f6e135be3cf7e3e8..0eec2d94729c1170dfdddeb9a814d00c5f487542 100644 (file)
@@ -112,15 +112,27 @@ using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
 struct PSTopicConfig {
   string name;
   set<string> subs;
+
+  void dump(Formatter *f) const {
+    encode_json("name", name, f);
+    encode_json("subs", subs, f);
+  }
 };
 
 struct PSNotificationConfig {
+  uint64_t id{0};
   string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
   string topic;
-
-  uint64_t id{0};
   bool is_prefix{false};
 
+
+  void dump(Formatter *f) const {
+    encode_json("id", id, f);
+    encode_json("path", path, f);
+    encode_json("topic", topic, f);
+    encode_json("is_prefix", is_prefix, f);
+  }
+
   void init(CephContext *cct, const JSONFormattable& config) {
     path = config["path"];
     if (!path.empty() && path[path.size() - 1] == '*') {
@@ -152,12 +164,49 @@ struct PSConfig {
   uint64_t sync_instance{0};
   uint64_t max_id{0};
 
-
   /* FIXME: no hard coded buckets, we'll have configurable topics */
   map<string, PSSubConfigRef> subs;
   map<string, PSTopicConfig> topics;
   multimap<string, PSNotificationConfig> notifications;
 
+  void dump(Formatter *f) const {
+    encode_json("id", id, f);
+    encode_json("user", user, f);
+    encode_json("data_bucket_prefix", data_bucket_prefix, f);
+    encode_json("sync_instance", sync_instance, f);
+    encode_json("max_id", max_id, f);
+    {
+      Formatter::ArraySection section(*f, "subs");
+      for (auto& sub : subs) {
+        encode_json("sub", *sub.second, f);
+      }
+    }
+    {
+      Formatter::ArraySection section(*f, "topics");
+      for (auto& topic : topics) {
+        encode_json("topic", topic.second, f);
+      }
+    }
+    {
+      Formatter::ObjectSection section(*f, "notifications");
+      string last;
+      for (auto& notif : notifications) {
+        const string& n = notif.first;
+        if (n != last) {
+          if (!last.empty()) {
+            f->close_section();
+          }
+          f->open_array_section(n.c_str());
+        }
+        last = n;
+        encode_json("notifications", notif.second, f);
+      }
+      if (!last.empty()) {
+        f->close_section();
+      }
+    }
+  }
+
   void init(CephContext *cct, const JSONFormattable& config) {
     string uid = config["uid"]("pubsub");
     user = rgw_user(config["tenant"], uid);
@@ -286,18 +335,16 @@ public:
   }
 
   void format(bufferlist *bl) {
-    stringstream ss;
-    JSONFormatter f;
-
-    encode_json("event", *event, &f);
-    f.flush(ss);
-
-    bl->append(ss.str());
+    bl->append(json_str("event", *event));
   }
         
 };
 
-class PSSubscription : public RefCountedObject {
+class PSSubscription;
+using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
+
+class PSSubscription {
+  PSSubscriptionRef self;
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
   PSSubConfigRef sub_conf;
@@ -406,35 +453,48 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s
         yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
                                             sync_env->store,
                                             put_obj));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+
+        return set_cr_done();
       }
       return 0;
     }
   };
 
-public:
   PSSubscription(RGWDataSyncEnv *_sync_env,
                  PSEnvRef _env,
                  PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
                                       env(_env),
                                       sub_conf(_sub_conf),
                                       data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+public:
+  static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+                                PSEnvRef _env,
+                                PSSubConfigRef& _sub_conf) {
+    auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
+    sub->self = std::shared_ptr<PSSubscription>(sub);
+    return sub->self;
+  }
 
   RGWCoroutine *init_cr() {
-    return new InitCR(sync_env, this);
+    return new InitCR(sync_env, self);
   }
 
   RGWCoroutine *store_event_cr(EventRef& event) {
-    return new StoreEventCR(sync_env, this, event);
+    return new StoreEventCR(sync_env, self, event);
   }
 
   friend class InitCR;
 };
 
-using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
-
 
-class PSManager : public RefCountedObject
+class PSManager
 {
+  PSManagerRef self;
+
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
 
@@ -555,6 +615,9 @@ public:
         return set_cr_error(retcode);
       }
 
+      ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
+
+
       return set_cr_done();
     }
     return 0;