]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw-admin: pubsub related new commands
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 3 Jul 2018 03:31:34 +0000 (20:31 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc

index 3a681636ebf4e86c6ff16ba4837d5d3d5d0aafae..9286bee96b29d5a2986ea7dc005c87c48fefe9de 100644 (file)
@@ -52,6 +52,7 @@ extern "C" {
 #include "rgw_reshard.h"
 #include "rgw_http_client_curl.h"
 #include "rgw_zone.h"
+#include "rgw_pubsub.h"
 
 #include "services/svc_sync_modules.h"
 
@@ -530,7 +531,13 @@ enum {
   OPT_MFA_CHECK,
   OPT_MFA_RESYNC,
   OPT_RESHARD_STALE_INSTANCES_LIST,
-  OPT_RESHARD_STALE_INSTANCES_DELETE
+  OPT_RESHARD_STALE_INSTANCES_DELETE,
+  OPT_PUBSUB_TOPICS_LIST,
+  OPT_PUBSUB_TOPIC_CREATE,
+  OPT_PUBSUB_TOPIC_RM,
+  OPT_PUBSUB_SUB_GET,
+  OPT_PUBSUB_SUB_CREATE,
+  OPT_PUBSUB_SUB_RM,
 };
 
 static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
@@ -562,13 +569,17 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       strcmp(cmd, "placement") == 0 ||
       strcmp(cmd, "pool") == 0 ||
       strcmp(cmd, "pools") == 0 ||
+      strcmp(cmd, "pubsub") == 0 ||
       strcmp(cmd, "quota") == 0 ||
       strcmp(cmd, "realm") == 0 ||
       strcmp(cmd, "role") == 0 ||
       strcmp(cmd, "role-policy") == 0 ||
       strcmp(cmd, "stale-instances") == 0 ||
+      strcmp(cmd, "sub") == 0 ||
       strcmp(cmd, "subuser") == 0 ||
       strcmp(cmd, "sync") == 0 ||
+      strcmp(cmd, "topic") == 0 ||
+      strcmp(cmd, "topics") == 0 ||
       strcmp(cmd, "usage") == 0 ||
       strcmp(cmd, "user") == 0 ||
       strcmp(cmd, "zone") == 0 ||
@@ -992,6 +1003,23 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       return OPT_RESHARD_STALE_INSTANCES_LIST;
     if (match_str(cmd, "rm", "delete"))
       return OPT_RESHARD_STALE_INSTANCES_DELETE;
+  } else if (prev_prev_cmd && strcmp(prev_prev_cmd, "pubsub") == 0) {
+    if (strcmp(prev_cmd, "topics") == 0) {
+      if (strcmp(cmd, "list") == 0)
+        return OPT_PUBSUB_TOPICS_LIST;
+    } else if (strcmp(prev_cmd, "topic") == 0) {
+      if (strcmp(cmd, "create") == 0)
+        return OPT_PUBSUB_TOPIC_CREATE;
+      if (strcmp(cmd, "rm") == 0)
+        return OPT_PUBSUB_TOPIC_RM;
+    } else if (strcmp(prev_cmd, "sub") == 0) {
+      if (strcmp(cmd, "get") == 0)
+        return OPT_PUBSUB_SUB_GET;
+      if (strcmp(cmd, "create") == 0)
+        return OPT_PUBSUB_SUB_CREATE;
+      if (strcmp(cmd, "rm") == 0)
+        return OPT_PUBSUB_SUB_RM;
+    }
   }
   return -EINVAL;
 }
@@ -2638,6 +2666,14 @@ static int trim_sync_error_log(int shard_id, const ceph::real_time& start_time,
   // unreachable
 }
 
+JSONFormattable& get_tier_config(RGWRados *store) {
+  return store->get_zone_params().tier_config;
+}
+
+const string& get_tier_type(RGWRados *store) {
+  return store->get_zone().tier_type;
+}
+
 int main(int argc, const char **argv)
 {
   vector<const char*> args;
@@ -2805,6 +2841,12 @@ int main(int argc, const char **argv)
   int totp_window = 0;
   int trim_delay_ms = 0;
 
+  string topic_name;
+  string sub_name;
+  string sub_oid_prefix;
+  string sub_dest_bucket;
+  string sub_push_endpoint;
+
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
     if (ceph_argparse_double_dash(args, i)) {
       break;
@@ -3125,6 +3167,16 @@ int main(int argc, const char **argv)
       totp_window = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--trim-delay-ms", (char*)NULL)) {
       trim_delay_ms = atoi(val.c_str());
+    } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
+      topic_name = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) {
+      sub_name = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) {
+      sub_oid_prefix = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) {
+      sub_dest_bucket = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
+      sub_push_endpoint = val;
     } else if (strncmp(*i, "-", 1) == 0) {
       cerr << "ERROR: invalid flag " << *i << std::endl;
       return EINVAL;
@@ -7701,5 +7753,220 @@ next:
    }
  }
 
- return 0;
+  if (opt_cmd == OPT_PUBSUB_TOPICS_LIST) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_bucket bucket;
+
+    rgw_pubsub_user_topics result;
+
+    if (!bucket_name.empty()) {
+      RGWBucketInfo bucket_info;
+      int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+      if (ret < 0) {
+        cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+
+      ret = ups.get_bucket_topics(bucket_info.bucket, &result);
+      if (ret < 0) {
+        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+    } else {
+      int ret = ups.get_topics(&result);
+      if (ret < 0) {
+        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+    }
+    encode_json("result", result, formatter);
+    formatter->flush(cout);
+  }
+
+  if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_bucket bucket;
+
+    RGWBucketInfo bucket_info;
+    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    ret = ups.create_topic(topic_name, bucket_info.bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
+  if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_bucket bucket;
+
+    RGWBucketInfo bucket_info;
+    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    ret = ups.remove_topic(topic_name);
+    if (ret < 0) {
+      cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
+  if (opt_cmd == OPT_PUBSUB_SUB_GET) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (sub_name.empty()) {
+      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_pubsub_user_sub_config sub;
+
+    ret = ups.get_sub(sub_name, &sub);
+    if (ret < 0) {
+      cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("sub", sub, formatter);
+    formatter->flush(cout);
+  }
+
+  if (opt_cmd == OPT_PUBSUB_SUB_CREATE) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (sub_name.empty()) {
+      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      return EINVAL;
+    }
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_pubsub_user_topic_info topic;
+    int ret = ups.get_topic(topic_name, &topic);
+    if (ret < 0) {
+      cerr << "ERROR: topic not found" << std::endl;
+      return EINVAL;
+    }
+    rgw_pubsub_user_sub_config sub;
+
+    auto& tier_config = get_tier_config(store);
+
+    rgw_pubsub_user_sub_dest dest_config;
+    dest_config.bucket_name = sub_dest_bucket;
+    dest_config.oid_prefix = sub_oid_prefix;
+    dest_config.push_endpoint = sub_push_endpoint;
+
+    if (dest_config.bucket_name.empty()) {
+      dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name;
+    }
+    if (dest_config.oid_prefix.empty()) {
+      dest_config.oid_prefix = tier_config["data_oid_prefix"];
+    }
+    ret = ups.add_sub(sub_name, topic_name, dest_config);
+    if (ret < 0) {
+      cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
+ if (opt_cmd == OPT_PUBSUB_SUB_RM) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (sub_name.empty()) {
+      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    rgw_pubsub_user_sub_config sub;
+
+    ret = ups.get_sub(sub_name, &sub);
+    if (ret < 0) {
+      cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("sub", sub, formatter);
+    formatter->flush(cout);
+  }
+
+  return 0;
 }
index e147516483d34a783f04a90a15b7b5ce53ffef96..16ef1b33ec72d90b97e4faefa568b45651c6bf10 100644 (file)
@@ -61,6 +61,7 @@ void rgw_pubsub_user_topic_info::dump(Formatter *f) const
 {
   encode_json("user", user, f);
   encode_json("topic", topic, f);
+  encode_json("subs", subs, f);
 }
 
 void rgw_pubsub_user_topics::dump(Formatter *f) const
@@ -86,98 +87,6 @@ void rgw_pubsub_user_sub_config::dump(Formatter *f) const
   encode_json("dest", dest, f);
 }
 
-static string pubsub_user_oid_prefix = "pubsub.user.";
-
-
-class RGWUserPubSub
-{
-  RGWRados *store;
-  rgw_user user;
-  RGWObjectCtx obj_ctx;
-
-  template <class T>
-  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
-
-  template <class T>
-  int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
-
-  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
-public:
-  RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
-                                                           user(_user),
-                                                           obj_ctx(store) {}
-
-  string user_meta_oid() const {
-    return pubsub_user_oid_prefix + user.to_str();
-  }
-
-  string bucket_meta_oid(const rgw_bucket& bucket) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
-  }
-
-  string sub_meta_oid(const string& name) const {
-    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
-  }
-
-  void get_user_meta_obj(rgw_raw_obj *obj) const {
-    *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid());
-  }
-
-  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
-    *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket));
-  }
-
-  void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
-    *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
-  }
-
-  int get_topics(rgw_pubsub_user_topics *result);
-  int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
-  int create_topic(const string& name, const rgw_bucket& bucket);
-  int remove_topic(const string& name);
-  int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
-  int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
-};
-
-template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
-{
-  bufferlist bl;
-  int ret = rgw_get_system_obj(store, obj_ctx,
-                               obj.pool, obj.oid,
-                               bl,
-                               objv_tracker,
-                               nullptr, nullptr, nullptr);
-  if (ret < 0) {
-    return ret;
-  }
-
-  auto iter = bl.cbegin();
-  try {
-    decode(*result, iter);
-  } catch (buffer::error& err) {
-    ldout(store->ctx(), 0) << "ERROR: failed to decode info, caught buffer::error" << dendl;
-    return -EIO;
-  }
-
-  return 0;
-}
-
-template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
-{
-  bufferlist bl;
-  encode(info, bl);
-
-  int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
-                           bl, false, objv_tracker,
-                           real_time());
-  if (ret < 0) {
-    return ret;
-  }
-
-  return 0;
-}
 
 int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
 {
@@ -217,6 +126,30 @@ int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_t
   return 0;
 }
 
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result)
+{
+  rgw_raw_obj obj;
+  get_user_meta_obj(&obj);
+
+  RGWObjVersionTracker objv_tracker;
+  rgw_pubsub_user_topics topics;
+
+  int ret = read(obj, &topics, &objv_tracker);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  auto iter = topics.topics.find(name);
+  if (iter == topics.topics.end()) {
+    ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+    return -ENOENT;
+  }
+
+  *result = iter->second;
+  return 0;
+}
+
 
 int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
 {
@@ -311,6 +244,18 @@ int RGWUserPubSub::remove_topic(const string& name)
   return 0;
 }
 
+int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result)
+{
+  rgw_raw_obj obj;
+  get_sub_meta_obj(name, &obj);
+  int ret = read(obj, result, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
 int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
 {
   rgw_raw_obj obj;
@@ -325,9 +270,9 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
     return ret;
   }
 
-  auto iter = topics.topics.find(name);
+  auto iter = topics.topics.find(topic);
   if (iter == topics.topics.end()) {
-    ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+    ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
     return -ENOENT;
   }
 
@@ -350,7 +295,7 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
 
   rgw_raw_obj bobj;
   get_sub_meta_obj(name, &bobj);
-  ret = write(obj, sub_conf, nullptr);
+  ret = write(bobj, sub_conf, nullptr);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
@@ -358,8 +303,24 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu
   return 0;
 }
 
-int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rgw_pubsub_user_sub_dest& dest)
 {
+  string topic = _topic;
+
+  RGWObjVersionTracker sobjv_tracker;
+  rgw_raw_obj sobj;
+  get_sub_meta_obj(name, &sobj);
+
+  if (topic.empty()) {
+    rgw_pubsub_user_sub_config sub_conf;
+    int ret = read(sobj, &sub_conf, &sobjv_tracker);
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+      return ret;
+    }
+    topic = sub_conf.topic;
+  }
+
   rgw_raw_obj obj;
   get_user_meta_obj(&obj);
 
@@ -372,7 +333,7 @@ int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw
   }
 
   if (ret >= 0) {
-    auto iter = topics.topics.find(name);
+    auto iter = topics.topics.find(topic);
     if (iter == topics.topics.end()) {
       ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
     } else {
@@ -388,9 +349,7 @@ int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw
     }
   }
 
-  rgw_raw_obj sobj;
-  get_sub_meta_obj(name, &sobj);
-  ret = remove(obj, nullptr);
+  ret = remove(sobj, &sobjv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
     return ret;
index 2239ab2f9c6c40b4587c205ca7fa8a66a1246016..00f15ce3fb64ffc97899a351957aa4bc88da70a0 100644 (file)
@@ -2,6 +2,7 @@
 #define CEPH_RGW_PUBSUB_H
 
 #include "rgw_common.h"
+#include "rgw_tools.h"
 
 
 enum RGWPubSubEventType {
@@ -183,4 +184,97 @@ struct rgw_pubsub_user_topics {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
 
+static string pubsub_user_oid_prefix = "pubsub.user.";
+
+class RGWUserPubSub
+{
+  RGWRados *store;
+  rgw_user user;
+  RGWObjectCtx obj_ctx;
+
+  template <class T>
+  int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+
+  template <class T>
+  int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
+
+  int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+public:
+  RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
+                                                           user(_user),
+                                                           obj_ctx(store) {}
+
+  string user_meta_oid() const {
+    return pubsub_user_oid_prefix + user.to_str();
+  }
+
+  string bucket_meta_oid(const rgw_bucket& bucket) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+  }
+
+  string sub_meta_oid(const string& name) const {
+    return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+  }
+
+  void get_user_meta_obj(rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid());
+  }
+
+  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket));
+  }
+
+  void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+    *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
+  }
+
+  int get_topics(rgw_pubsub_user_topics *result);
+  int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
+  int get_topic(const string& name, rgw_pubsub_user_topic_info *result);
+  int create_topic(const string& name, const rgw_bucket& bucket);
+  int remove_topic(const string& name);
+  int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
+  int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+  int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+};
+
+template <class T>
+int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+{
+  bufferlist bl;
+  int ret = rgw_get_system_obj(store, obj_ctx,
+                               obj.pool, obj.oid,
+                               bl,
+                               objv_tracker,
+                               nullptr, nullptr, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+
+  auto iter = bl.cbegin();
+  try {
+    decode(*result, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  return 0;
+}
+
+template <class T>
+int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
+{
+  bufferlist bl;
+  encode(info, bl);
+
+  int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
+                           bl, false, objv_tracker,
+                           real_time());
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
 #endif
index 1768fb23c33332cefb76d99a602e165972e3c52e..6a19d5f7e1d799962c3402ca4ed4b07de04ceff6 100644 (file)
@@ -55,6 +55,7 @@ config:
    "tenant": <tenant>,             # default: <empty>
    "uid": <uid>,                   # default: "pubsub"
    "data_bucket_prefix": <prefix>  # default: "pubsub"
+   "data_oid_prefix": <prefix>     #
 
     # non-dynamic config
     "notifications": [
@@ -96,13 +97,14 @@ struct PSSubConfig { /* subscription config */
   }
 
   void init(CephContext *cct, const JSONFormattable& config,
-            const string& data_bucket_prefix) {
+            const string& data_bucket_prefix,
+            const string& default_oid_prefix) {
     name = config["name"];
     topic = config["topic"];
     push_endpoint = config["push_endpoint"];
     string default_bucket_name = data_bucket_prefix + name;
     data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
-    data_oid_prefix = config["data_oid_prefix"];
+    data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
   }
 };
 
@@ -154,26 +156,29 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
   return ss.str();
 }
 
-using TopicsRef = std::shared_ptr<vector<PSTopicConfig *>>;
+using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
+using TopicsRef = std::shared_ptr<vector<PSTopicConfigRef>>;
 
 
 struct PSConfig {
   string id{"pubsub"};
   rgw_user user;
   string data_bucket_prefix;
+  string data_oid_prefix;
 
   uint64_t sync_instance{0};
   uint64_t max_id{0};
 
   /* FIXME: no hard coded buckets, we'll have configurable topics */
   map<string, PSSubConfigRef> subs;
-  map<string, PSTopicConfig> topics;
+  map<string, PSTopicConfigRef> topics;
   multimap<string, PSNotificationConfig> notifications;
 
   void dump(Formatter *f) const {
     encode_json("id", id, f);
     encode_json("user", user, f);
     encode_json("data_bucket_prefix", data_bucket_prefix, f);
+    encode_json("data_oid_prefix", data_bucket_prefix, f);
     encode_json("sync_instance", sync_instance, f);
     encode_json("max_id", max_id, f);
     {
@@ -185,7 +190,7 @@ struct PSConfig {
     {
       Formatter::ArraySection section(*f, "topics");
       for (auto& topic : topics) {
-        encode_json("topic", topic.second, f);
+        encode_json("topic", *topic.second, f);
       }
     }
     {
@@ -212,6 +217,7 @@ struct PSConfig {
     string uid = config["uid"]("pubsub");
     user = rgw_user(config["tenant"], uid);
     data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+    data_oid_prefix = config["data_oid_prefix"];
 
     /* FIXME: this will be dynamically configured */
     for (auto& c : config["notifications"].array()) {
@@ -221,13 +227,16 @@ struct PSConfig {
       notifications.insert(std::make_pair(nc.path, nc));
 
       PSTopicConfig topic_config = { .name = nc.topic };
-      topics[nc.topic] = topic_config;
+      topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
     }
     for (auto& c : config["subscriptions"].array()) {
       auto sc = std::make_shared<PSSubConfig>();
-      sc->init(cct, c, data_bucket_prefix);
+      sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
       subs[sc->name] = sc;
-      topics[sc->topic].subs.insert(sc->name);
+      auto iter = topics.find(sc->topic);
+      if (iter != topics.end()) {
+        iter->second->subs.insert(sc->name);
+      }
     }
 
     ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
@@ -240,8 +249,6 @@ struct PSConfig {
   void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
     string path = bucket.name + "/" + key.name;
 
-    (*result)->clear();
-
     auto iter = notifications.upper_bound(path);
     if (iter == notifications.begin()) {
       return;
@@ -269,7 +276,7 @@ struct PSConfig {
       }
 
       ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
-      (*result)->push_back(&topic->second);
+      (*result)->push_back(topic->second);
     } while (iter != notifications.begin());
   }
 
@@ -756,26 +763,56 @@ public:
 class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
+  rgw_user owner;
   rgw_bucket bucket;
   rgw_obj_key key;
 
+  RGWUserPubSub ups;
+
+  rgw_raw_obj obj;
+  rgw_pubsub_user_topics bucket_topics;
   TopicsRef *topics;
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
                       PSEnvRef& _env,
-                      const rgw_bucket& _bucket,
+                      const RGWBucketInfo& _bucket_info,
                       const rgw_obj_key& _key,
                       TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
                                                           sync_env(_sync_env),
                                                           env(_env),
-                                                          bucket(_bucket),
+                                                          owner(_bucket_info.owner),
+                                                          bucket(_bucket_info.bucket),
                                                           key(_key),
+                                                          ups(_sync_env->store, owner),
                                                           topics(_topics) {
-    *topics = std::make_shared<vector<PSTopicConfig *> >();
+    *topics = std::make_shared<vector<PSTopicConfigRef> >();
   }
   int operate() override {
     reenter(this) {
-#warning this will need to change
+      ups.get_bucket_meta_obj(bucket, &obj);
+
+
+      using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+      yield {
+        bool empty_on_enoent = true;
+        call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+                            obj,
+                            &bucket_topics, empty_on_enoent));
+      }
+      if (retcode < 0 && retcode != -ENOENT) {
+        return set_cr_error(retcode);
+      }
+
+      ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
+
+      for (auto& titer : bucket_topics.topics) {
+        auto& info = titer.second;
+        shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
+        tc->name = info.topic.name;
+        tc->subs = info.subs;
+        (*topics)->push_back(tc);
+      }
+
       env->conf->get_topics(sync_env->cct, bucket, key, topics);
       return set_cr_done();
     }
@@ -788,7 +825,7 @@ class RGWPSHandleObjEvent : public RGWCoroutine {
   PSEnvRef env;
   EventRef event;
 
-  vector<PSTopicConfig *>::iterator titer;
+  vector<PSTopicConfigRef>::iterator titer;
   set<string>::iterator siter;
   PSSubscriptionRef sub;
   TopicsRef topics;
@@ -932,7 +969,7 @@ public:
 
   int operate() override {
     reenter(this) {
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
@@ -973,7 +1010,7 @@ public:
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);