From: Yuval Lifshitz Date: Mon, 1 Jun 2020 05:11:32 +0000 (+0300) Subject: rgw/pubsub: remove deprecated radogw-admin pubsub creation commands X-Git-Tag: v16.1.0~1945^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4c486c32c6dc21d8650d8ec2e446950d82631be5;p=ceph.git rgw/pubsub: remove deprecated radogw-admin pubsub creation commands make topic and subscription read commands an official feature Fixes: https://tracker.ceph.com/issues/43536 Signed-off-by: Yuval Lifshitz --- diff --git a/doc/man/8/radosgw-admin.rst b/doc/man/8/radosgw-admin.rst index a80f95d2d43..6fa2aba3ece 100644 --- a/doc/man/8/radosgw-admin.rst +++ b/doc/man/8/radosgw-admin.rst @@ -457,6 +457,28 @@ which are as follows: :command:`reshard cancel` Cancel resharding a bucket +:command:`topic list` + List bucket notifications/pubsub topics + +:command:`topic get` + Get a bucket notifications/pubsub topic + +:command:`topic rm` + Remove a bucket notifications/pubsub topic + +:command:`subscription get` + Get a pubsub subscription definition + +:command:`subscription rm` + Remove a pubsub subscription + +:command:`subscription pull` + Show events in a pubsub subscription + +:command:`subscription ack` + Ack (remove) an events in a pubsub subscription + + Options ======= @@ -898,6 +920,22 @@ Role Options The path prefix for filtering the roles. + +Bucket Notifications/PubSub Options +=================================== +.. option:: --topic + + The bucket notifications/pubsub topic name. + +.. option:: --subscription + + The pubsub subscription name. + +.. option:: --event-id + + The event id in a pubsub subscription. + + Examples ======== diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 3762e89075e..264c4b3a127 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -29,6 +29,31 @@ mechanism. This API is similar to the one defined as the S3-compatible API of th S3 Bucket Notification Compatibility + +Topic Management via CLI +------------------------ + +Configuration of all topics of a user could be fetched using the following command: + +:: + + # radosgw-admin topic list --uid={user-id} + + +Configuration of a specific topic could be fetched using: + +:: + + # radosgw-admin topic get --uid={user-id} --topic={topic-name} + + +And removed using: + +:: + + # radosgw-admin topic rm --uid={user-id} --topic={topic-name} + + Notification Performance Stats ------------------------------ The same counters are shared between the pubsub sync module and the bucket notification mechanism. diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index fd3b9f021e6..52a542baca6 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -112,6 +112,58 @@ the ``val`` specifies its new value. For example, setting the pubsub control use A configuration field can be removed by using ``--tier-config-rm={key}``. + +Topic and Subscription Management via CLI +----------------------------------------- + +Configuration of all topics of a user could be fetched using the following command: + +:: + + # radosgw-admin topic list --uid={user-id} + + +Configuration of a specific topic could be fetched using: + +:: + + # radosgw-admin topic get --uid={user-id} --topic={topic-name} + + +And removed using: + +:: + + # radosgw-admin topic rm --uid={user-id} --topic={topic-name} + + +Configuration of a subscription could be fetched using: + +:: + + # radosgw-admin subscription get --uid={user-id} --subscription={topic-name} + +And removed using: + +:: + + # radosgw-admin subscription rm --uid={user-id} --subscription={topic-name} + + +To fetch all of the events stored in a subcription, use: + +:: + + # radosgw-admin subscription pull --uid={user-id} --subscription={topic-name} [--marker={last-marker}] + + +To ack (and remove) an event from a subscription, use: + +:: + + # radosgw-admin subscription ack --uid={user-id} --subscription={topic-name} --event-id={event-id} + + PubSub Performance Stats ------------------------- Same counters are shared between the pubsub sync module and the notification mechanism. diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 961ad816be4..2bdc9726c87 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -271,6 +271,13 @@ void usage() cout << " mfa remove delete MFA TOTP token\n"; cout << " mfa check check MFA TOTP token\n"; cout << " mfa resync re-sync MFA TOTP token\n"; + cout << " topic list list bucket notifications/pubsub topics\n"; + cout << " topic get get a bucket notifications/pubsub topic\n"; + cout << " topic rm remove a bucket notifications/pubsub topic\n"; + cout << " subscription get get a pubsub subscription definition\n"; + cout << " subscription rm remove a pubsub subscription\n"; + cout << " subscription pull show events in a pubsub subscription\n"; + cout << " subscription ack ack (remove) an events in a pubsub subscription\n"; cout << "options:\n"; cout << " --tenant= tenant name\n"; cout << " --uid= user id\n"; @@ -415,6 +422,10 @@ void usage() cout << " --totp-seconds the time resolution that is being used for TOTP generation\n"; cout << " --totp-window the number of TOTP tokens that are checked before and after the current token when validating token\n"; cout << " --totp-pin the valid value of a TOTP token at a certain time\n"; + cout << "\nBucket notifications/pubsub options:\n"; + cout << " --topic bucket notifications/pubsub topic name\n"; + cout << " --subscription pubsub subscription name\n"; + cout << " --event-id event id in a pubsub subscription\n"; cout << "\n"; generic_client_usage(); } @@ -738,13 +749,10 @@ enum class OPT { RESHARD_STALE_INSTANCES_LIST, RESHARD_STALE_INSTANCES_DELETE, PUBSUB_TOPICS_LIST, - PUBSUB_TOPIC_CREATE, + // TODO add "subscription list" command PUBSUB_TOPIC_GET, PUBSUB_TOPIC_RM, - PUBSUB_NOTIFICATION_CREATE, - PUBSUB_NOTIFICATION_RM, PUBSUB_SUB_GET, - PUBSUB_SUB_CREATE, PUBSUB_SUB_RM, PUBSUB_SUB_PULL, PUBSUB_EVENT_RM, @@ -953,17 +961,13 @@ static SimpleCmd::Commands all_cmds = { { "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST }, { "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE }, { "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE }, - { "pubsub topics list", OPT::PUBSUB_TOPICS_LIST }, - { "pubsub topic create", OPT::PUBSUB_TOPIC_CREATE }, - { "pubsub topic get", OPT::PUBSUB_TOPIC_GET }, - { "pubsub topic rm", OPT::PUBSUB_TOPIC_RM }, - { "pubsub notification create", OPT::PUBSUB_NOTIFICATION_CREATE }, - { "pubsub notification rm", OPT::PUBSUB_NOTIFICATION_RM }, - { "pubsub sub get", OPT::PUBSUB_SUB_GET }, - { "pubsub sub create", OPT::PUBSUB_SUB_CREATE }, - { "pubsub sub rm", OPT::PUBSUB_SUB_RM }, - { "pubsub sub pull", OPT::PUBSUB_SUB_PULL }, - { "pubsub event rm", OPT::PUBSUB_EVENT_RM }, + { "topic list", OPT::PUBSUB_TOPICS_LIST }, + { "topic get", OPT::PUBSUB_TOPIC_GET }, + { "topic rm", OPT::PUBSUB_TOPIC_RM }, + { "subscription get", OPT::PUBSUB_SUB_GET }, + { "subscription rm", OPT::PUBSUB_SUB_RM }, + { "subscription pull", OPT::PUBSUB_SUB_PULL }, + { "subscription ack", OPT::PUBSUB_EVENT_RM }, }; static SimpleCmd::Aliases cmd_aliases = { @@ -3203,9 +3207,6 @@ int main(int argc, const char **argv) string topic_name; string sub_name; - string sub_oid_prefix; - string sub_dest_bucket; - string sub_push_endpoint; string event_id; std::optional opt_group_id; @@ -3245,8 +3246,6 @@ int main(int argc, const char **argv) std::optional opt_mode; std::optional opt_dest_owner; - rgw::notify::EventTypeList event_types; - SimpleCmd cmd(all_cmds, cmd_aliases); for (std::vector::iterator i = args.begin(); i != args.end(); ) { @@ -3599,18 +3598,10 @@ int main(int argc, const char **argv) trim_delay_ms = atoi(val.c_str()); } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) { topic_name = val; - } else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) { + } else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) { sub_name = val; - } else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) { - sub_oid_prefix = val; - } else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) { - sub_dest_bucket = val; - } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) { - sub_push_endpoint = val; } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) { event_id = val; - } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) { - rgw::notify::from_string_list(val, event_types); } else if (ceph_argparse_witharg(args, i, &val, "--group-id", (char*)NULL)) { opt_group_id = val; } else if (ceph_argparse_witharg(args, i, &val, "--status", (char*)NULL)) { @@ -3874,6 +3865,10 @@ int main(int argc, const char **argv) OPT::ROLE_POLICY_GET, OPT::RESHARD_LIST, OPT::RESHARD_STATUS, + OPT::PUBSUB_TOPICS_LIST, + OPT::PUBSUB_TOPIC_GET, + OPT::PUBSUB_SUB_GET, + OPT::PUBSUB_SUB_PULL, }; @@ -9012,10 +9007,6 @@ next: } if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } if (user_id.empty()) { cerr << "ERROR: user id was not provided (via --uid)" << std::endl; return EINVAL; @@ -9054,34 +9045,7 @@ next: formatter->flush(cout); } - if (opt_cmd == OPT::PUBSUB_TOPIC_CREATE) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (topic_name.empty()) { - cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; - return EINVAL; - } - if (user_id.empty()) { - cerr << "ERROR: user id was not provided (via --uid)" << std::endl; - return EINVAL; - } - RGWUserInfo& user_info = user_op.get_user_info(); - RGWUserPubSub ups(store, user_info.user_id); - - ret = ups.create_topic(topic_name); - if (ret < 0) { - cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - if (opt_cmd == OPT::PUBSUB_TOPIC_GET) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } if (topic_name.empty()) { cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; return EINVAL; @@ -9096,92 +9060,14 @@ next: rgw_pubsub_topic_subs topic; ret = ups.get_topic(topic_name, &topic); if (ret < 0) { - cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; return -ret; } encode_json("topic", topic, formatter); formatter->flush(cout); } - if (opt_cmd == OPT::PUBSUB_NOTIFICATION_CREATE) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (topic_name.empty()) { - cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; - return EINVAL; - } - if (user_id.empty()) { - cerr << "ERROR: user id was not provided (via --uid)" << std::endl; - return EINVAL; - } - if (bucket_name.empty()) { - cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; - return EINVAL; - } - RGWUserInfo& user_info = user_op.get_user_info(); - RGWUserPubSub ups(store, user_info.user_id); - - rgw_bucket bucket; - - RGWBucketInfo bucket_info; - int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); - if (ret < 0) { - cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - - auto b = ups.get_bucket(bucket_info.bucket); - ret = b->create_notification(topic_name, event_types); - if (ret < 0) { - cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - - if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (topic_name.empty()) { - cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; - return EINVAL; - } - if (user_id.empty()) { - cerr << "ERROR: user id was not provided (via --uid)" << std::endl; - return EINVAL; - } - if (bucket_name.empty()) { - cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; - return EINVAL; - } - RGWUserInfo& user_info = user_op.get_user_info(); - RGWUserPubSub ups(store, user_info.user_id); - - rgw_bucket bucket; - - RGWBucketInfo bucket_info; - int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); - if (ret < 0) { - cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - - auto b = ups.get_bucket(bucket_info.bucket); - ret = b->remove_notification(topic_name); - if (ret < 0) { - cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - if (opt_cmd == OPT::PUBSUB_TOPIC_RM) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } if (topic_name.empty()) { cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; return EINVAL; @@ -9210,7 +9096,7 @@ next: return EINVAL; } if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; return EINVAL; } RGWUserInfo& user_info = user_op.get_user_info(); @@ -9228,55 +9114,6 @@ next: formatter->flush(cout); } - if (opt_cmd == OPT::PUBSUB_SUB_CREATE) { - if (get_tier_type(store) != "pubsub") { - cerr << "ERROR: only pubsub tier type supports this command" << std::endl; - return EINVAL; - } - if (user_id.empty()) { - cerr << "ERROR: user id was not provided (via --uid)" << std::endl; - return EINVAL; - } - if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; - return EINVAL; - } - if (topic_name.empty()) { - cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; - return EINVAL; - } - RGWUserInfo& user_info = user_op.get_user_info(); - RGWUserPubSub ups(store, user_info.user_id); - - rgw_pubsub_topic_subs topic; - int ret = ups.get_topic(topic_name, &topic); - if (ret < 0) { - cerr << "ERROR: topic not found" << std::endl; - return EINVAL; - } - - rgw_pubsub_sub_dest dest_config; - dest_config.bucket_name = sub_dest_bucket; - dest_config.oid_prefix = sub_oid_prefix; - dest_config.push_endpoint = sub_push_endpoint; - - auto psmodule = static_cast(store->getRados()->get_sync_module().get()); - auto conf = psmodule->get_effective_conf(); - - if (dest_config.bucket_name.empty()) { - dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name; - } - if (dest_config.oid_prefix.empty()) { - dest_config.oid_prefix = conf["data_oid_prefix"]; - } - auto sub = ups.get_sub(sub_name); - ret = sub->subscribe(topic_name, dest_config); - if (ret < 0) { - cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - if (opt_cmd == OPT::PUBSUB_SUB_RM) { if (get_tier_type(store) != "pubsub") { cerr << "ERROR: only pubsub tier type supports this command" << std::endl; @@ -9287,7 +9124,7 @@ next: return EINVAL; } if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; return EINVAL; } RGWUserInfo& user_info = user_op.get_user_info(); @@ -9311,7 +9148,7 @@ next: return EINVAL; } if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; return EINVAL; } RGWUserInfo& user_info = user_op.get_user_info(); @@ -9320,7 +9157,7 @@ next: if (!max_entries_specified) { max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS; } - auto sub = ups.get_sub(sub_name); + auto sub = ups.get_sub_with_events(sub_name); ret = sub->list_events(marker, max_entries); if (ret < 0) { cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl; @@ -9340,7 +9177,7 @@ next: return EINVAL; } if (sub_name.empty()) { - cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl; return EINVAL; } if (event_id.empty()) { @@ -9350,7 +9187,7 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - auto sub = ups.get_sub(sub_name); + auto sub = ups.get_sub_with_events(sub_name); ret = sub->remove_event(event_id); if (ret < 0) { cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index d78bb29c76c..75e08300386 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -43,28 +43,6 @@ config: "data_oid_prefix": # "events_retention_days": # default: 7 "start_with_full_sync" # default: false - - # non-dynamic config - "notifications": [ - { - "path": , # this can be either an explicit path: , or /, - # or a prefix if it ends with a wildcard - "topic": - }, - ... - ], - "subscriptions": [ - { - "name": , - "topic": , - "push_endpoint": , - "push_endpoint_args:" . # any push endpoint specific args (include all args) - "data_bucket": , # override name of bucket where subscription data will be store - "data_oid_prefix": # set prefix for subscription data object ids - "s3_id": # in case of S3 compatible notifications, the notification ID will be set here - }, - ... - ] } */ @@ -119,28 +97,6 @@ struct PSSubConfig { encode_json("s3_id", s3_id, f); } - void init(CephContext *cct, const JSONFormattable& config, - const string& data_bucket_prefix, - const string& default_oid_prefix) { - name = config["name"]; - topic = config["topic"]; - push_endpoint_name = config["push_endpoint"]; - string default_bucket_name = data_bucket_prefix + name; - data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); - data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); - s3_id = config["s3_id"]; - arn_topic = config["arn_topic"]; - if (!push_endpoint_name.empty()) { - push_endpoint_args = config["push_endpoint_args"]; - try { - push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); - ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; - } catch (const RGWPubSubEndpoint::configuration_error& e) { - ldout(cct, 1) << "ERROR: failed to create push endpoint: " - << push_endpoint_name << " due to: " << e.what() << dendl; - } - } - } }; using PSSubConfigRef = std::shared_ptr; @@ -196,22 +152,14 @@ static string json_str(const char *name, const T& obj, bool pretty = false) using PSTopicConfigRef = std::shared_ptr; using TopicsRef = std::shared_ptr>; +// global pubsub configuration struct PSConfig { const std::string id{"pubsub"}; rgw_user user; std::string data_bucket_prefix; std::string data_oid_prefix; - int events_retention_days{0}; - uint64_t sync_instance{0}; - uint64_t max_id{0}; - - /* FIXME: no hard coded buckets, we'll have configurable topics */ - std::map subs; - std::map topics; - std::multimap notifications; - bool start_with_full_sync{false}; void dump(Formatter *f) const { @@ -221,37 +169,6 @@ struct PSConfig { encode_json("data_oid_prefix", data_oid_prefix, f); encode_json("events_retention_days", events_retention_days, f); encode_json("sync_instance", sync_instance, f); - encode_json("max_id", max_id, f); - { - Formatter::ArraySection section(*f, "subs"); - for (auto& sub : subs) { - encode_json("sub", *sub.second, f); - } - } - { - Formatter::ArraySection section(*f, "topics"); - for (auto& topic : topics) { - encode_json("topic", *topic.second, f); - } - } - { - Formatter::ObjectSection section(*f, "notifications"); - std::string last; - for (auto& notif : notifications) { - const string& n = notif.first; - if (n != last) { - if (!last.empty()) { - f->close_section(); - } - f->open_array_section(n.c_str()); - } - last = n; - encode_json("notifications", notif.second, f); - } - if (!last.empty()) { - f->close_section(); - } - } encode_json("start_with_full_sync", start_with_full_sync, f); } @@ -261,77 +178,14 @@ struct PSConfig { data_bucket_prefix = config["data_bucket_prefix"]("pubsub-"); data_oid_prefix = config["data_oid_prefix"]; events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT); - - for (auto& c : config["notifications"].array()) { - PSNotificationConfig nc; - nc.id = ++max_id; - nc.init(cct, c); - notifications.insert(std::make_pair(nc.path, nc)); - - PSTopicConfig topic_config = { .name = nc.topic }; - topics[nc.topic] = make_shared(topic_config); - } - for (auto& c : config["subscriptions"].array()) { - auto sc = std::make_shared(); - sc->init(cct, c, data_bucket_prefix, data_oid_prefix); - subs[sc->name] = sc; - auto iter = topics.find(sc->topic); - if (iter != topics.end()) { - iter->second->subs.insert(sc->name); - } - } start_with_full_sync = config["start_with_full_sync"](false); - ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; + ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; } void init_instance(const RGWRealm& realm, uint64_t instance_id) { sync_instance = instance_id; } - - void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { - const std::string path = bucket.name + "/" + key.name; - - auto iter = notifications.upper_bound(path); - if (iter == notifications.begin()) { - return; - } - - do { - --iter; - if (iter->first.size() > path.size()) { - break; - } - if (path.compare(0, iter->first.size(), iter->first) != 0) { - break; - } - - PSNotificationConfig& target = iter->second; - - if (!target.is_prefix && - path.size() != iter->first.size()) { - continue; - } - - auto topic = topics.find(target.topic); - if (topic == topics.end()) { - continue; - } - - ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << - " target_path=" << target.path << ", topic=" << target.topic << dendl; - (*result)->push_back(topic->second); - } while (iter != notifications.begin()); - } - - bool find_sub(const string& name, PSSubConfigRef *ref) { - auto iter = subs.find(name); - if (iter != subs.end()) { - *ref = iter->second; - return true; - } - return false; - } }; using PSConfigRef = std::shared_ptr; @@ -920,13 +774,9 @@ class PSManager int operate() override { reenter(this) { if (owner.empty()) { - if (!conf->find_sub(sub_name, &sub_conf)) { - ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl; + ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); - return set_cr_error(-ENOENT); - } - - *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf); + return set_cr_error(-EINVAL); } else { using ReadInfoCR = RGWSimpleRadosReadCR; yield { @@ -948,14 +798,11 @@ class PSManager yield (*ref)->call_init_cr(this); if (retcode < 0) { - ldout(sync_env->cct, 10) << "failed to init subscription" << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } - if (owner.empty()) { - mgr->subs[sub_name] = *ref; - } mgr->remove_get_sub(owner, sub_name); return set_cr_done(); @@ -1164,7 +1011,6 @@ public: (*topics)->push_back(tc); } - env->conf->get_topics(sync_env->cct, bucket, key, topics); return set_cr_done(); } return 0; @@ -1174,19 +1020,16 @@ public: class RGWPSHandleObjEventCR : public RGWCoroutine { RGWDataSyncCtx* const sc; const PSEnvRef env; - const rgw_user& owner; + const rgw_user owner; const EventRef event; const EventRef record; const TopicsRef topics; - const std::array owners; bool has_subscriptions; bool event_handled; bool sub_conf_found; PSSubscriptionRef sub; - std::array::const_iterator oiter; std::vector::const_iterator titer; std::set::const_iterator siter; - int last_sub_conf_error; public: RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc, @@ -1201,7 +1044,6 @@ public: event(_event), record(_record), topics(_topics), - owners({owner, rgw_user{}}), has_subscriptions(false), event_handled(false) {} @@ -1226,79 +1068,67 @@ public: for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sc->cct, 20) << ": subscription: " << *siter << dendl; has_subscriptions = true; - sub_conf_found = false; - // try to read subscription configuration from global/user cond - // configuration is considered missing only if does not exist in either - for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { - yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub); + // try to read subscription configuration + yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub); + if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); + ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter + << " ret=" << retcode << dendl; + if (retcode == -ENOENT) { + // missing subscription info should be reflected back as invalid argument + // and not as missing object + retcode = -EINVAL; + } + // try the next subscription + continue; + } + if (sub->sub_conf->s3_id.empty()) { + // subscription was not made by S3 compatible API + ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + yield call(PSSubscription::store_event_cr(sc, sub, event)); if (retcode < 0) { - if (sub_conf_found) { - // not a real issue, sub conf already found - retcode = 0; - } - last_sub_conf_error = retcode; - continue; + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); + ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + } else { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); + event_handled = true; } - sub_conf_found = true; - if (sub->sub_conf->s3_id.empty()) { - // subscription was not made by S3 compatible API - ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::store_event_cr(sc, sub, event)); + if (sub->sub_conf->push_endpoint) { + ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sc, sub, event)); if (retcode < 0) { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); - ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; } else { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; } - if (sub->sub_conf->push_endpoint) { - ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sc, sub, event)); - if (retcode < 0) { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); - ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; - } else { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); - event_handled = true; - } - } + } + } else { + // subscription was made by S3 compatible API + ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + record->configurationId = sub->sub_conf->s3_id; + record->opaque_data = (*titer)->opaque_data; + yield call(PSSubscription::store_event_cr(sc, sub, record)); + if (retcode < 0) { + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); + ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; } else { - // subscription was made by S3 compatible API - ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - record->configurationId = sub->sub_conf->s3_id; - record->opaque_data = (*titer)->opaque_data; - yield call(PSSubscription::store_event_cr(sc, sub, record)); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); + event_handled = true; + } + if (sub->sub_conf->push_endpoint) { + ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sc, sub, record)); if (retcode < 0) { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); - ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); + ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; } else { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); event_handled = true; } - if (sub->sub_conf->push_endpoint) { - ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sc, sub, record)); - if (retcode < 0) { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); - ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; - } else { - if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); - event_handled = true; - } - } } } - if (!sub_conf_found) { - // could not find conf for subscription at user or global levels - if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); - ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter - << " ret=" << last_sub_conf_error << dendl; - if (retcode == -ENOENT) { - // missing subscription info should be reflected back as invalid argument - // and not as missing object - retcode = -EINVAL; - } - } } } if (has_subscriptions && !event_handled) { diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 27c01c4ddaa..220d5da7b0c 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -166,6 +166,13 @@ mfa remove delete MFA TOTP token mfa check check MFA TOTP token mfa resync re-sync MFA TOTP token + topic list list bucket notifications/pubsub topics + topic get get a bucket notifications/pubsub topic + topic rm remove a bucket notifications/pubsub topic + subscription get get a pubsub subscription definition + subscription rm remove a pubsub subscription + subscription pull show events in a pubsub subscription + subscription ack ack (remove) an events in a pubsub subscription options: --tenant= tenant name --uid= user id @@ -316,6 +323,11 @@ --totp-window the number of TOTP tokens that are checked before and after the current token when validating token --totp-pin the valid value of a TOTP token at a certain time + Bucket notifications/pubsub options: + --topic bucket notifications/pubsub topic name + --subscription pubsub subscription name + --event-id event id in a pubsub subscription + --conf/-c FILE read configuration from the given configuration file --id ID set ID portion of my name --name/-n TYPE.ID set name diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 9f2dc27ce1e..8b5fc1d41c2 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -177,6 +177,7 @@ class AMQPReceiver(object): remaining_retries -= 1 print('failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)) + time.sleep(1) if remaining_retries == 0: raise Exception('failed to connect to rabbitmq - no retries left') @@ -2221,6 +2222,113 @@ def test_ps_subscription(): topic_conf.del_config() master_zone.delete_bucket(bucket_name) + +def test_ps_admin(): + """ test radosgw-admin commands """ + master_zone, ps_zone = init_env() + bucket_name = gen_bucket_name() + topic_name = bucket_name+TOPIC_SUFFIX + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create topic + topic_conf = PSTopic(ps_zone.conn, topic_name) + topic_conf.set_config() + result, status = topic_conf.get_config() + assert_equal(status, 200) + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + result, status = ps_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert len(parsed_result['topics']) > 0 + result, status = ps_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + + # create s3 topics + endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' + topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_conf_s3.set_config() + result, status = topic_conf_s3.get_config() + assert_equal(status, 200) + assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name) + result, status = master_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + master_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert len(parsed_result['topics']) > 0 + result, status = master_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + + # create bucket on the first of the rados zones + bucket = master_zone.create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zone.zone) + # create notifications + notification_conf = PSNotification(ps_zone.conn, bucket_name, + topic_name) + _, status = notification_conf.set_config() + assert_equal(status/100, 2) + # create subscription + sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, + topic_name) + _, status = sub_conf.set_config() + assert_equal(status/100, 2) + result, status = ps_zone.zone.cluster.admin(['subscription', 'get', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert_equal(parsed_result['name'], bucket_name+SUB_SUFFIX) + # create objects in the bucket + number_of_objects = 110 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + # wait for sync + # get events from subscription + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) + result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert_equal(len(parsed_result['events']), 100) + marker = parsed_result['next_marker'] + result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--marker', marker] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + parsed_result = json.loads(result) + assert_equal(len(parsed_result['events']), 10) + event_id = parsed_result['events'][0]['id'] + + # ack an event in the subscription + result, status = ps_zone.zone.cluster.admin(['subscription', 'ack', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--event-id', event_id] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + + # remove the subscription + result, status = ps_zone.zone.cluster.admin(['subscription', 'rm', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + + # remove the topics + result, status = ps_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name] + + ps_zone.zone.zone_arg()) + assert_equal(status, 0) + result, status = master_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name] + + master_zone.zone.zone_arg()) + assert_equal(status, 0) + + # cleanup + for key in bucket.list(): + key.delete() + notification_conf.del_config() + master_zone.delete_bucket(bucket_name) + + def test_ps_incremental_sync(): """ test that events are only sent on incremental sync """ master_zone, ps_zone = init_env()