]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: remove deprecated radogw-admin pubsub creation commands 35331/head
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 1 Jun 2020 05:11:32 +0000 (08:11 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 9 Jun 2020 08:56:46 +0000 (11:56 +0300)
make topic and subscription read commands an official feature

Fixes: https://tracker.ceph.com/issues/43536
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/man/8/radosgw-admin.rst
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/rgw_admin.cc
src/rgw/rgw_sync_module_pubsub.cc
src/test/cli/radosgw-admin/help.t
src/test/rgw/rgw_multi/tests_ps.py

index a80f95d2d437af36dc1ef71cefa9c1f2775a451d..6fa2aba3ecec61f444b1f40621dbfa736306d916 100644 (file)
@@ -457,6 +457,28 @@ which are as follows:
 :command:`reshard cancel`
   Cancel resharding a bucket
 
+:command:`topic list`
+  List bucket notifications/pubsub topics                                                   
+
+:command:`topic get`
+  Get a bucket notifications/pubsub topic                                                   
+  
+:command:`topic rm`
+  Remove a bucket notifications/pubsub topic                                                
+
+:command:`subscription get`
+  Get a pubsub subscription definition
+
+:command:`subscription rm`
+  Remove a pubsub subscription
+
+:command:`subscription pull`
+  Show events in a pubsub subscription
+             
+:command:`subscription ack`
+  Ack (remove) an events in a pubsub subscription
+
+
 Options
 =======
 
@@ -898,6 +920,22 @@ Role Options
 
    The path prefix for filtering the roles.
 
+
+Bucket Notifications/PubSub Options
+===================================
+.. option:: --topic                   
+
+   The bucket notifications/pubsub topic name.
+
+.. option:: --subscription
+
+   The pubsub subscription name.
+
+.. option:: --event-id
+
+   The event id in a pubsub subscription.
+
+
 Examples
 ========
 
index 3762e89075ef4f9903f52f69f8c49d339352b9ba..264c4b3a127b96122d82a2d28b4f36b774837564 100644 (file)
@@ -29,6 +29,31 @@ mechanism. This API is similar to the one defined as the S3-compatible API of th
 
    S3 Bucket Notification Compatibility <s3-notification-compatibility>
 
+
+Topic Management via CLI
+------------------------
+
+Configuration of all topics of a user could be fetched using the following command:
+   
+::
+   
+   # radosgw-admin topic list --uid={user-id}
+
+
+Configuration of a specific topic could be fetched using:
+
+::
+   
+   # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+
+
+And removed using:
+
+::
+   
+   # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+
+
 Notification Performance Stats
 ------------------------------
 The same counters are shared between the pubsub sync module and the bucket notification mechanism.
index fd3b9f021e64e48e0313bcac1906e16305f60cfb..52a542baca6e4423de46360d73876cb9d9bc8ef3 100644 (file)
@@ -112,6 +112,58 @@ the ``val`` specifies its new value. For example, setting the pubsub control use
 
 A configuration field can be removed by using ``--tier-config-rm={key}``.
 
+
+Topic and Subscription Management via CLI
+-----------------------------------------
+
+Configuration of all topics of a user could be fetched using the following command:
+   
+::
+   
+   # radosgw-admin topic list --uid={user-id}
+
+
+Configuration of a specific topic could be fetched using:
+
+::
+   
+   # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+
+
+And removed using:
+
+::
+   
+   # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+
+
+Configuration of a subscription could be fetched using:
+
+::
+   
+   # radosgw-admin subscription get --uid={user-id} --subscription={topic-name}
+
+And removed using:
+
+::
+   
+   # radosgw-admin subscription rm --uid={user-id} --subscription={topic-name}
+
+
+To fetch all of the events stored in a subcription, use:
+
+::
+   
+   # radosgw-admin subscription pull --uid={user-id} --subscription={topic-name} [--marker={last-marker}]
+
+
+To ack (and remove) an event from a subscription, use:
+
+::
+   
+   # radosgw-admin subscription ack --uid={user-id} --subscription={topic-name} --event-id={event-id}
+
+
 PubSub Performance Stats
 -------------------------
 Same counters are shared between the pubsub sync module and the notification mechanism.
index 961ad816be43987f6811b3e3f85a52ef05e36a91..2bdc9726c871af6241f04934323b2c014b2cee78 100644 (file)
@@ -271,6 +271,13 @@ void usage()
   cout << "  mfa remove                 delete MFA TOTP token\n";
   cout << "  mfa check                  check MFA TOTP token\n";
   cout << "  mfa resync                 re-sync MFA TOTP token\n";
+  cout << "  topic list                 list bucket notifications/pubsub topics\n";
+  cout << "  topic get                  get a bucket notifications/pubsub topic\n";
+  cout << "  topic rm                   remove a bucket notifications/pubsub topic\n";
+  cout << "  subscription get           get a pubsub subscription definition\n";
+  cout << "  subscription rm            remove a pubsub subscription\n";
+  cout << "  subscription pull          show events in a pubsub subscription\n";
+  cout << "  subscription ack           ack (remove) an events in a pubsub subscription\n";
   cout << "options:\n";
   cout << "   --tenant=<tenant>         tenant name\n";
   cout << "   --uid=<id>                user id\n";
@@ -415,6 +422,10 @@ void usage()
   cout << "   --totp-seconds            the time resolution that is being used for TOTP generation\n";
   cout << "   --totp-window             the number of TOTP tokens that are checked before and after the current token when validating token\n";
   cout << "   --totp-pin                the valid value of a TOTP token at a certain time\n";
+  cout << "\nBucket notifications/pubsub options:\n";
+  cout << "   --topic                   bucket notifications/pubsub topic name\n";
+  cout << "   --subscription            pubsub subscription name\n";
+  cout << "   --event-id                event id in a pubsub subscription\n";
   cout << "\n";
   generic_client_usage();
 }
@@ -738,13 +749,10 @@ enum class OPT {
   RESHARD_STALE_INSTANCES_LIST,
   RESHARD_STALE_INSTANCES_DELETE,
   PUBSUB_TOPICS_LIST,
-  PUBSUB_TOPIC_CREATE,
+  // TODO add "subscription list" command
   PUBSUB_TOPIC_GET,
   PUBSUB_TOPIC_RM,
-  PUBSUB_NOTIFICATION_CREATE,
-  PUBSUB_NOTIFICATION_RM,
   PUBSUB_SUB_GET,
-  PUBSUB_SUB_CREATE,
   PUBSUB_SUB_RM,
   PUBSUB_SUB_PULL,
   PUBSUB_EVENT_RM,
@@ -953,17 +961,13 @@ static SimpleCmd::Commands all_cmds = {
   { "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
   { "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
   { "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
-  { "pubsub topics list", OPT::PUBSUB_TOPICS_LIST },
-  { "pubsub topic create", OPT::PUBSUB_TOPIC_CREATE },
-  { "pubsub topic get", OPT::PUBSUB_TOPIC_GET },
-  { "pubsub topic rm", OPT::PUBSUB_TOPIC_RM },
-  { "pubsub notification create", OPT::PUBSUB_NOTIFICATION_CREATE },
-  { "pubsub notification rm", OPT::PUBSUB_NOTIFICATION_RM },
-  { "pubsub sub get", OPT::PUBSUB_SUB_GET },
-  { "pubsub sub create", OPT::PUBSUB_SUB_CREATE },
-  { "pubsub sub rm", OPT::PUBSUB_SUB_RM },
-  { "pubsub sub pull", OPT::PUBSUB_SUB_PULL },
-  { "pubsub event rm", OPT::PUBSUB_EVENT_RM },
+  { "topic list", OPT::PUBSUB_TOPICS_LIST },
+  { "topic get", OPT::PUBSUB_TOPIC_GET },
+  { "topic rm", OPT::PUBSUB_TOPIC_RM },
+  { "subscription get", OPT::PUBSUB_SUB_GET },
+  { "subscription rm", OPT::PUBSUB_SUB_RM },
+  { "subscription pull", OPT::PUBSUB_SUB_PULL },
+  { "subscription ack", OPT::PUBSUB_EVENT_RM },
 };
 
 static SimpleCmd::Aliases cmd_aliases = {
@@ -3203,9 +3207,6 @@ int main(int argc, const char **argv)
 
   string topic_name;
   string sub_name;
-  string sub_oid_prefix;
-  string sub_dest_bucket;
-  string sub_push_endpoint;
   string event_id;
 
   std::optional<string> opt_group_id;
@@ -3245,8 +3246,6 @@ int main(int argc, const char **argv)
   std::optional<string> opt_mode;
   std::optional<rgw_user> opt_dest_owner;
 
-  rgw::notify::EventTypeList event_types;
-
   SimpleCmd cmd(all_cmds, cmd_aliases);
 
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
@@ -3599,18 +3598,10 @@ int main(int argc, const char **argv)
       trim_delay_ms = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
       topic_name = val;
-    } else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) {
+    } else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) {
       sub_name = val;
-    } else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) {
-      sub_oid_prefix = val;
-    } else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) {
-      sub_dest_bucket = val;
-    } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
-      sub_push_endpoint = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
       event_id = val;
-    } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
-      rgw::notify::from_string_list(val, event_types);
     } else if (ceph_argparse_witharg(args, i, &val, "--group-id", (char*)NULL)) {
       opt_group_id = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--status", (char*)NULL)) {
@@ -3874,6 +3865,10 @@ int main(int argc, const char **argv)
                         OPT::ROLE_POLICY_GET,
                         OPT::RESHARD_LIST,
                         OPT::RESHARD_STATUS,
+       OPT::PUBSUB_TOPICS_LIST,
+       OPT::PUBSUB_TOPIC_GET,
+       OPT::PUBSUB_SUB_GET,
+       OPT::PUBSUB_SUB_PULL,
   };
 
 
@@ -9012,10 +9007,6 @@ next:
  }
 
   if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
     if (user_id.empty()) {
       cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
       return EINVAL;
@@ -9054,34 +9045,7 @@ next:
     formatter->flush(cout);
   }
 
-  if (opt_cmd == OPT::PUBSUB_TOPIC_CREATE) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (topic_name.empty()) {
-      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
-      return EINVAL;
-    }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
-
-    ret = ups.create_topic(topic_name);
-    if (ret < 0) {
-      cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
   if (opt_cmd == OPT::PUBSUB_TOPIC_GET) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
     if (topic_name.empty()) {
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
@@ -9096,92 +9060,14 @@ next:
     rgw_pubsub_topic_subs topic;
     ret = ups.get_topic(topic_name, &topic);
     if (ret < 0) {
-      cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
     encode_json("topic", topic, formatter);
     formatter->flush(cout);
   }
 
-  if (opt_cmd == OPT::PUBSUB_NOTIFICATION_CREATE) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (topic_name.empty()) {
-      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
-      return EINVAL;
-    }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    if (bucket_name.empty()) {
-      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
-
-    rgw_bucket bucket;
-
-    RGWBucketInfo bucket_info;
-    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
-    if (ret < 0) {
-      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-
-    auto b = ups.get_bucket(bucket_info.bucket);
-    ret = b->create_notification(topic_name, event_types);
-    if (ret < 0) {
-      cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
-  if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (topic_name.empty()) {
-      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
-      return EINVAL;
-    }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    if (bucket_name.empty()) {
-      cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
-
-    rgw_bucket bucket;
-
-    RGWBucketInfo bucket_info;
-    int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
-    if (ret < 0) {
-      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-
-    auto b = ups.get_bucket(bucket_info.bucket);
-    ret = b->remove_notification(topic_name);
-    if (ret < 0) {
-      cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
   if (opt_cmd == OPT::PUBSUB_TOPIC_RM) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
     if (topic_name.empty()) {
       cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
       return EINVAL;
@@ -9210,7 +9096,7 @@ next:
       return EINVAL;
     }
     if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
     RGWUserInfo& user_info = user_op.get_user_info();
@@ -9228,55 +9114,6 @@ next:
     formatter->flush(cout);
   }
 
-  if (opt_cmd == OPT::PUBSUB_SUB_CREATE) {
-    if (get_tier_type(store) != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (user_id.empty()) {
-      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
-      return EINVAL;
-    }
-    if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
-      return EINVAL;
-    }
-    if (topic_name.empty()) {
-      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
-      return EINVAL;
-    }
-    RGWUserInfo& user_info = user_op.get_user_info();
-    RGWUserPubSub ups(store, user_info.user_id);
-
-    rgw_pubsub_topic_subs topic;
-    int ret = ups.get_topic(topic_name, &topic);
-    if (ret < 0) {
-      cerr << "ERROR: topic not found" << std::endl;
-      return EINVAL;
-    }
-
-    rgw_pubsub_sub_dest dest_config;
-    dest_config.bucket_name = sub_dest_bucket;
-    dest_config.oid_prefix = sub_oid_prefix;
-    dest_config.push_endpoint = sub_push_endpoint;
-
-    auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->getRados()->get_sync_module().get());
-    auto conf = psmodule->get_effective_conf();
-
-    if (dest_config.bucket_name.empty()) {
-      dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
-    }
-    if (dest_config.oid_prefix.empty()) {
-      dest_config.oid_prefix = conf["data_oid_prefix"];
-    }
-    auto sub = ups.get_sub(sub_name);
-    ret = sub->subscribe(topic_name, dest_config);
-    if (ret < 0) {
-      cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
  if (opt_cmd == OPT::PUBSUB_SUB_RM) {
     if (get_tier_type(store) != "pubsub") {
       cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
@@ -9287,7 +9124,7 @@ next:
       return EINVAL;
     }
     if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
     RGWUserInfo& user_info = user_op.get_user_info();
@@ -9311,7 +9148,7 @@ next:
       return EINVAL;
     }
     if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
     RGWUserInfo& user_info = user_op.get_user_info();
@@ -9320,7 +9157,7 @@ next:
     if (!max_entries_specified) {
       max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
     }
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ups.get_sub_with_events(sub_name);
     ret = sub->list_events(marker, max_entries);
     if (ret < 0) {
       cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
@@ -9340,7 +9177,7 @@ next:
       return EINVAL;
     }
     if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
       return EINVAL;
     }
     if (event_id.empty()) {
@@ -9350,7 +9187,7 @@ next:
     RGWUserInfo& user_info = user_op.get_user_info();
     RGWUserPubSub ups(store, user_info.user_id);
 
-    auto sub = ups.get_sub(sub_name);
+    auto sub = ups.get_sub_with_events(sub_name);
     ret = sub->remove_event(event_id);
     if (ret < 0) {
       cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
index d78bb29c76c5c6167ad7fd6065eeb2d48798e745..75e08300386c4d5c321ec075ab075b61a7cf5f5c 100644 (file)
@@ -43,28 +43,6 @@ config:
    "data_oid_prefix": <prefix>     #
    "events_retention_days": <int>  # default: 7
    "start_with_full_sync" <bool>   # default: false
-
-    # non-dynamic config
-    "notifications": [
-        {
-            "path": <notification-path>,    # this can be either an explicit path: <bucket>, or <bucket>/<object>,
-                                            # or a prefix if it ends with a wildcard
-            "topic": <topic-name>
-         },
-        ...
-    ],
-    "subscriptions": [
-        {
-            "name": <subscription-name>,
-            "topic": <topic>,
-            "push_endpoint": <endpoint>,
-            "push_endpoint_args:" <arg list>.            # any push endpoint specific args (include all args)
-            "data_bucket": <bucket>,       # override name of bucket where subscription data will be store
-            "data_oid_prefix": <prefix>    # set prefix for subscription data object ids
-            "s3_id": <id>                  # in case of S3 compatible notifications, the notification ID will be set here
-        },
-        ...
-    ]
 }
 
 */
@@ -119,28 +97,6 @@ struct PSSubConfig {
     encode_json("s3_id", s3_id, f);
   }
 
-  void init(CephContext *cct, const JSONFormattable& config,
-            const string& data_bucket_prefix,
-            const string& default_oid_prefix) {
-    name = config["name"];
-    topic = config["topic"];
-    push_endpoint_name = config["push_endpoint"];
-    string default_bucket_name = data_bucket_prefix + name;
-    data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
-    data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
-    s3_id = config["s3_id"];
-    arn_topic = config["arn_topic"];
-    if (!push_endpoint_name.empty()) {
-      push_endpoint_args = config["push_endpoint_args"];
-      try {
-        push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
-        ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
-      } catch (const RGWPubSubEndpoint::configuration_error& e) {
-        ldout(cct, 1) << "ERROR: failed to create push endpoint: " 
-          << push_endpoint_name << " due to: " << e.what() << dendl;
-      }
-    }
-  }
 };
 
 using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
@@ -196,22 +152,14 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
 using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
 
+// global pubsub configuration
 struct PSConfig {
   const std::string id{"pubsub"};
   rgw_user user;
   std::string data_bucket_prefix;
   std::string data_oid_prefix;
-
   int events_retention_days{0};
-
   uint64_t sync_instance{0};
-  uint64_t max_id{0};
-
-  /* FIXME: no hard coded buckets, we'll have configurable topics */
-  std::map<std::string, PSSubConfigRef> subs;
-  std::map<std::string, PSTopicConfigRef> topics;
-  std::multimap<std::string, PSNotificationConfig> notifications;
-  
   bool start_with_full_sync{false};
 
   void dump(Formatter *f) const {
@@ -221,37 +169,6 @@ struct PSConfig {
     encode_json("data_oid_prefix", data_oid_prefix, f);
     encode_json("events_retention_days", events_retention_days, f);
     encode_json("sync_instance", sync_instance, f);
-    encode_json("max_id", max_id, f);
-    {
-      Formatter::ArraySection section(*f, "subs");
-      for (auto& sub : subs) {
-        encode_json("sub", *sub.second, f);
-      }
-    }
-    {
-      Formatter::ArraySection section(*f, "topics");
-      for (auto& topic : topics) {
-        encode_json("topic", *topic.second, f);
-      }
-    }
-    {
-      Formatter::ObjectSection section(*f, "notifications");
-      std::string last;
-      for (auto& notif : notifications) {
-        const string& n = notif.first;
-        if (n != last) {
-          if (!last.empty()) {
-            f->close_section();
-          }
-          f->open_array_section(n.c_str());
-        }
-        last = n;
-        encode_json("notifications", notif.second, f);
-      }
-      if (!last.empty()) {
-        f->close_section();
-      }
-    }
     encode_json("start_with_full_sync", start_with_full_sync, f);
   }
 
@@ -261,77 +178,14 @@ struct PSConfig {
     data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
     data_oid_prefix = config["data_oid_prefix"];
     events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
-
-    for (auto& c : config["notifications"].array()) {
-      PSNotificationConfig nc;
-      nc.id = ++max_id;
-      nc.init(cct, c);
-      notifications.insert(std::make_pair(nc.path, nc));
-
-      PSTopicConfig topic_config = { .name = nc.topic };
-      topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
-    }
-    for (auto& c : config["subscriptions"].array()) {
-      auto sc = std::make_shared<PSSubConfig>();
-      sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
-      subs[sc->name] = sc;
-      auto iter = topics.find(sc->topic);
-      if (iter != topics.end()) {
-        iter->second->subs.insert(sc->name);
-      }
-    }
     start_with_full_sync = config["start_with_full_sync"](false);
 
-    ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+    ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
   }
 
   void init_instance(const RGWRealm& realm, uint64_t instance_id) {
     sync_instance = instance_id;
   }
-
-  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
-    const std::string path = bucket.name + "/" + key.name;
-
-    auto iter = notifications.upper_bound(path);
-    if (iter == notifications.begin()) {
-      return;
-    }
-
-    do {
-      --iter;
-      if (iter->first.size() > path.size()) {
-        break;
-      }
-      if (path.compare(0, iter->first.size(), iter->first) != 0) {
-        break;
-      }
-
-      PSNotificationConfig& target = iter->second;
-
-      if (!target.is_prefix &&
-          path.size() != iter->first.size()) {
-        continue;
-      }
-
-      auto topic = topics.find(target.topic);
-      if (topic == topics.end()) {
-        continue;
-      }
-
-      ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << 
-          " target_path=" << target.path << ", topic=" << target.topic << dendl;
-      (*result)->push_back(topic->second);
-    } while (iter != notifications.begin());
-  }
-
-  bool find_sub(const string& name, PSSubConfigRef *ref) {
-    auto iter = subs.find(name);
-    if (iter != subs.end()) {
-      *ref = iter->second;
-      return true;
-    }
-    return false;
-  }
 };
 
 using PSConfigRef = std::shared_ptr<PSConfig>;
@@ -920,13 +774,9 @@ class PSManager
     int operate() override {
       reenter(this) {
         if (owner.empty()) {
-          if (!conf->find_sub(sub_name, &sub_conf)) {
-            ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
+          ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
             mgr->remove_get_sub(owner, sub_name);
-            return set_cr_error(-ENOENT);
-          }
-
-          *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
+            return set_cr_error(-EINVAL);
         } else {
           using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
           yield {
@@ -948,14 +798,11 @@ class PSManager
 
         yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
-          ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
+          ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
           mgr->remove_get_sub(owner, sub_name);
           return set_cr_error(retcode);
         }
 
-        if (owner.empty()) {
-          mgr->subs[sub_name] = *ref;
-        }
         mgr->remove_get_sub(owner, sub_name);
 
         return set_cr_done();
@@ -1164,7 +1011,6 @@ public:
         (*topics)->push_back(tc);
       }
 
-      env->conf->get_topics(sync_env->cct, bucket, key, topics);
       return set_cr_done();
     }
     return 0;
@@ -1174,19 +1020,16 @@ public:
 class RGWPSHandleObjEventCR : public RGWCoroutine {
   RGWDataSyncCtx* const sc;
   const PSEnvRef env;
-  const rgw_user& owner;
+  const rgw_user owner;
   const EventRef<rgw_pubsub_event> event;
   const EventRef<rgw_pubsub_s3_record> record;
   const TopicsRef topics;
-  const std::array<rgw_user, 2> owners;
   bool has_subscriptions;
   bool event_handled;
   bool sub_conf_found;
   PSSubscriptionRef sub;
-  std::array<rgw_user, 2>::const_iterator oiter;
   std::vector<PSTopicConfigRef>::const_iterator titer;
   std::set<std::string>::const_iterator siter;
-  int last_sub_conf_error;
 
 public:
   RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
@@ -1201,7 +1044,6 @@ public:
                                           event(_event),
                                           record(_record),
                                           topics(_topics),
-                                          owners({owner, rgw_user{}}),
                                           has_subscriptions(false),
                                           event_handled(false) {}
 
@@ -1226,79 +1068,67 @@ public:
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
           has_subscriptions = true;
-          sub_conf_found = false;
-          // try to read subscription configuration from global/user cond
-          // configuration is considered missing only if does not exist in either
-          for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
-            yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
+          // try to read subscription configuration
+          yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
+          if (retcode < 0) {
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
+            ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter 
+              << " ret=" << retcode << dendl;
+            if (retcode == -ENOENT) {
+              // missing subscription info should be reflected back as invalid argument
+              // and not as missing object
+              retcode = -EINVAL;
+            }
+            // try the next subscription
+            continue;
+          }
+          if (sub->sub_conf->s3_id.empty()) {
+            // subscription was not made by S3 compatible API
+            ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+            yield call(PSSubscription::store_event_cr(sc, sub, event));
             if (retcode < 0) {
-              if (sub_conf_found) {
-                // not a real issue, sub conf already found
-                retcode = 0;
-              }
-              last_sub_conf_error = retcode;
-              continue;
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+              ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+            } else {
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+              event_handled = true;
             }
-            sub_conf_found = true;
-            if (sub->sub_conf->s3_id.empty()) {
-              // subscription was not made by S3 compatible API
-              ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-              yield call(PSSubscription::store_event_cr(sc, sub, event));
+            if (sub->sub_conf->push_endpoint) {
+              ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+              yield call(PSSubscription::push_event_cr(sc, sub, event));
               if (retcode < 0) {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
-                ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+                ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                 event_handled = true;
               }
-              if (sub->sub_conf->push_endpoint) {
-                ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-                yield call(PSSubscription::push_event_cr(sc, sub, event));
-                if (retcode < 0) {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
-                  ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
-                } else {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-                  event_handled = true;
-                }
-              } 
+            } 
+          } else {
+            // subscription was made by S3 compatible API
+            ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+            record->configurationId = sub->sub_conf->s3_id;
+            record->opaque_data = (*titer)->opaque_data;
+            yield call(PSSubscription::store_event_cr(sc, sub, record));
+            if (retcode < 0) {
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+              ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
             } else {
-              // subscription was made by S3 compatible API
-              ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-              record->configurationId = sub->sub_conf->s3_id;
-              record->opaque_data = (*titer)->opaque_data;
-              yield call(PSSubscription::store_event_cr(sc, sub, record));
+              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+              event_handled = true;
+            }
+            if (sub->sub_conf->push_endpoint) {
+                ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+              yield call(PSSubscription::push_event_cr(sc, sub, record));
               if (retcode < 0) {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
-                ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+                ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
-                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                 event_handled = true;
               }
-              if (sub->sub_conf->push_endpoint) {
-                  ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
-                yield call(PSSubscription::push_event_cr(sc, sub, record));
-                if (retcode < 0) {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
-                  ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
-                } else {
-                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
-                  event_handled = true;
-                }
-              }
             }
           }
-          if (!sub_conf_found) {
-            // could not find conf for subscription at user or global levels
-            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
-            ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter 
-              << " ret=" << last_sub_conf_error << dendl;
-              if (retcode == -ENOENT) {
-                // missing subscription info should be reflected back as invalid argument
-                // and not as missing object
-                retcode =  -EINVAL;
-              }
-          }
         }
       }
       if (has_subscriptions && !event_handled) {
index 27c01c4ddaa7cb8b781fb7b55be03333b46d2dca..220d5da7b0c11cc5b13bae4f7ce9b549abd06c1a 100644 (file)
     mfa remove                 delete MFA TOTP token
     mfa check                  check MFA TOTP token
     mfa resync                 re-sync MFA TOTP token
+    topic list                 list bucket notifications/pubsub topics
+    topic get                  get a bucket notifications/pubsub topic
+    topic rm                   remove a bucket notifications/pubsub topic
+    subscription get           get a pubsub subscription definition
+    subscription rm            remove a pubsub subscription
+    subscription pull          show events in a pubsub subscription
+    subscription ack           ack (remove) an events in a pubsub subscription
   options:
      --tenant=<tenant>         tenant name
      --uid=<id>                user id
      --totp-window             the number of TOTP tokens that are checked before and after the current token when validating token
      --totp-pin                the valid value of a TOTP token at a certain time
   
+  Bucket notifications/pubsub options:
+     --topic                   bucket notifications/pubsub topic name
+     --subscription            pubsub subscription name
+     --event-id                event id in a pubsub subscription
+  
     --conf/-c FILE    read configuration from the given configuration file
     --id ID           set ID portion of my name
     --name/-n TYPE.ID set name
index 9f2dc27ce1e4db3c76943d7f6c486e08c1cdd605..8b5fc1d41c2d97bb0d92518e3a003a3086317442 100644 (file)
@@ -177,6 +177,7 @@ class AMQPReceiver(object):
                 remaining_retries -= 1
                 print('failed to connect to rabbitmq (remaining retries '
                     + str(remaining_retries) + '): ' + str(error))
+                time.sleep(1)
 
         if remaining_retries == 0:
             raise Exception('failed to connect to rabbitmq - no retries left')
@@ -2221,6 +2222,113 @@ def test_ps_subscription():
     topic_conf.del_config()
     master_zone.delete_bucket(bucket_name)
 
+
+def test_ps_admin():
+    """ test radosgw-admin commands """
+    master_zone, ps_zone = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+
+    # create topic
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
+    topic_conf.set_config()
+    result, status = topic_conf.get_config()
+    assert_equal(status, 200)
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+    result, status = ps_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert len(parsed_result['topics']) > 0
+    result, status = ps_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+
+    # create s3 topics
+    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf_s3.set_config()
+    result, status = topic_conf_s3.get_config()
+    assert_equal(status, 200)
+    assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name)
+    result, status = master_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + master_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert len(parsed_result['topics']) > 0
+    result, status = master_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+
+    # create bucket on the first of the rados zones
+    bucket = master_zone.create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zone.zone)
+    # create notifications
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
+                                       topic_name)
+    _, status = notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create subscription
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
+                              topic_name)
+    _, status = sub_conf.set_config()
+    assert_equal(status/100, 2)
+    result, status = ps_zone.zone.cluster.admin(['subscription', 'get', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] 
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['name'], bucket_name+SUB_SUFFIX)
+    # create objects in the bucket
+    number_of_objects = 110
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        key.set_contents_from_string('bar')
+    # wait for sync
+    # get events from subscription
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+    result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] 
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert_equal(len(parsed_result['events']), 100)
+    marker = parsed_result['next_marker']
+    result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--marker', marker]
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    parsed_result = json.loads(result)
+    assert_equal(len(parsed_result['events']), 10)
+    event_id = parsed_result['events'][0]['id']
+
+    # ack an event in the subscription 
+    result, status = ps_zone.zone.cluster.admin(['subscription', 'ack', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--event-id', event_id]
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+
+    # remove the subscription
+    result, status = ps_zone.zone.cluster.admin(['subscription', 'rm', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+
+    # remove the topics
+    result, status = ps_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
+            + ps_zone.zone.zone_arg())
+    assert_equal(status, 0)
+    result, status = master_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
+            + master_zone.zone.zone_arg())
+    assert_equal(status, 0)
+
+    # cleanup
+    for key in bucket.list():
+        key.delete()
+    notification_conf.del_config()
+    master_zone.delete_bucket(bucket_name)
+
+
 def test_ps_incremental_sync():
     """ test that events are only sent on incremental sync """
     master_zone, ps_zone = init_env()