]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub config fixes
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 3 Jul 2018 22:37:41 +0000 (15:37 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc

index 16ef1b33ec72d90b97e4faefa568b45651c6bf10..1d7bb6dd1272b8a33e688cbf686864a8d13c8eca 100644 (file)
@@ -165,33 +165,23 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
     return ret;
   }
 
-  rgw_pubsub_user_topic_info new_topic;
+  rgw_pubsub_user_topic_info& new_topic = topics.topics[name];
   new_topic.user = user;
   new_topic.topic.name = name;
   new_topic.topic.bucket = bucket;
 
-  topics.topics[name] = new_topic;
-
   ret = write(obj, topics, &objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
   }
 
-  rgw_pubsub_user_topics bucket_topics;
-  for (auto& t : topics.topics) {
-    if (t.second.topic.bucket == bucket) {
-      bucket_topics.topics.insert(t);
-    }
-  }
-
-  rgw_raw_obj bobj;
-  get_bucket_meta_obj(bucket, &bobj);
-  ret = write(obj, bucket_topics, nullptr);
+  ret = update_bucket(topics, bucket);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
     return ret;
   }
+
   return 0;
 }
 
@@ -227,6 +217,17 @@ int RGWUserPubSub::remove_topic(const string& name)
     return 0;
   }
 
+  ret = update_bucket(topics, bucket);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWUserPubSub::update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket)
+{
   rgw_pubsub_user_topics bucket_topics;
   for (auto& t : topics.topics) {
     if (t.second.topic.bucket == bucket) {
@@ -236,7 +237,7 @@ int RGWUserPubSub::remove_topic(const string& name)
 
   rgw_raw_obj bobj;
   get_bucket_meta_obj(bucket, &bobj);
-  ret = write(obj, bucket_topics, nullptr);
+  int ret = write(bobj, bucket_topics, nullptr);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -293,9 +294,15 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
     return ret;
   }
 
-  rgw_raw_obj bobj;
-  get_sub_meta_obj(name, &bobj);
-  ret = write(bobj, sub_conf, nullptr);
+  ret = update_bucket(topics, t.topic.bucket);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_raw_obj sobj;
+  get_sub_meta_obj(name, &sobj);
+  ret = write(sobj, sub_conf, nullptr);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
index 00f15ce3fb64ffc97899a351957aa4bc88da70a0..52cf5e870df2253c13f66d5bb26a9f20dd2d8a3d 100644 (file)
@@ -199,6 +199,8 @@ class RGWUserPubSub
   int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
 
   int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+
+  int update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket);
 public:
   RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
                                                            user(_user),
index 71c0ac2a90bacc9eef658bcbf9d771d9927dd154..184a61be8f16460cc5aeac00df533caaa5f0234f 100644 (file)
@@ -586,6 +586,7 @@ public:
                  PSEnvRef _env,
                  rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env),
                                       env(_env),
+                                      sub_conf(std::make_shared<PSSubConfig>()),
                                       data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
     sub_conf->from_user_conf(user_sub_conf);
   }