From: Yehuda Sadeh Date: Tue, 3 Jul 2018 03:31:34 +0000 (-0700) Subject: rgw-admin: pubsub related new commands X-Git-Tag: v14.1.0~616^2~52 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cd374803736d6db9d3cbfd2239d315c8a75cca2c;p=ceph.git rgw-admin: pubsub related new commands Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 3a681636ebf4..9286bee96b29 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -52,6 +52,7 @@ extern "C" { #include "rgw_reshard.h" #include "rgw_http_client_curl.h" #include "rgw_zone.h" +#include "rgw_pubsub.h" #include "services/svc_sync_modules.h" @@ -530,7 +531,13 @@ enum { OPT_MFA_CHECK, OPT_MFA_RESYNC, OPT_RESHARD_STALE_INSTANCES_LIST, - OPT_RESHARD_STALE_INSTANCES_DELETE + OPT_RESHARD_STALE_INSTANCES_DELETE, + OPT_PUBSUB_TOPICS_LIST, + OPT_PUBSUB_TOPIC_CREATE, + OPT_PUBSUB_TOPIC_RM, + OPT_PUBSUB_SUB_GET, + OPT_PUBSUB_SUB_CREATE, + OPT_PUBSUB_SUB_RM, }; static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more) @@ -562,13 +569,17 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ strcmp(cmd, "placement") == 0 || strcmp(cmd, "pool") == 0 || strcmp(cmd, "pools") == 0 || + strcmp(cmd, "pubsub") == 0 || strcmp(cmd, "quota") == 0 || strcmp(cmd, "realm") == 0 || strcmp(cmd, "role") == 0 || strcmp(cmd, "role-policy") == 0 || strcmp(cmd, "stale-instances") == 0 || + strcmp(cmd, "sub") == 0 || strcmp(cmd, "subuser") == 0 || strcmp(cmd, "sync") == 0 || + strcmp(cmd, "topic") == 0 || + strcmp(cmd, "topics") == 0 || strcmp(cmd, "usage") == 0 || strcmp(cmd, "user") == 0 || strcmp(cmd, "zone") == 0 || @@ -992,6 +1003,23 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_RESHARD_STALE_INSTANCES_LIST; if (match_str(cmd, "rm", "delete")) return OPT_RESHARD_STALE_INSTANCES_DELETE; + } else if (prev_prev_cmd && strcmp(prev_prev_cmd, "pubsub") == 0) { + if (strcmp(prev_cmd, "topics") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_PUBSUB_TOPICS_LIST; + } else if (strcmp(prev_cmd, "topic") == 0) { + if (strcmp(cmd, "create") == 0) + return OPT_PUBSUB_TOPIC_CREATE; + if (strcmp(cmd, "rm") == 0) + return OPT_PUBSUB_TOPIC_RM; + } else if (strcmp(prev_cmd, "sub") == 0) { + if (strcmp(cmd, "get") == 0) + return OPT_PUBSUB_SUB_GET; + if (strcmp(cmd, "create") == 0) + return OPT_PUBSUB_SUB_CREATE; + if (strcmp(cmd, "rm") == 0) + return OPT_PUBSUB_SUB_RM; + } } return -EINVAL; } @@ -2638,6 +2666,14 @@ static int trim_sync_error_log(int shard_id, const ceph::real_time& start_time, // unreachable } +JSONFormattable& get_tier_config(RGWRados *store) { + return store->get_zone_params().tier_config; +} + +const string& get_tier_type(RGWRados *store) { + return store->get_zone().tier_type; +} + int main(int argc, const char **argv) { vector args; @@ -2805,6 +2841,12 @@ int main(int argc, const char **argv) int totp_window = 0; int trim_delay_ms = 0; + string topic_name; + string sub_name; + string sub_oid_prefix; + string sub_dest_bucket; + string sub_push_endpoint; + for (std::vector::iterator i = args.begin(); i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) { break; @@ -3125,6 +3167,16 @@ int main(int argc, const char **argv) totp_window = atoi(val.c_str()); } else if (ceph_argparse_witharg(args, i, &val, "--trim-delay-ms", (char*)NULL)) { trim_delay_ms = atoi(val.c_str()); + } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) { + topic_name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) { + sub_name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) { + sub_oid_prefix = val; + } else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) { + sub_dest_bucket = val; + } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) { + sub_push_endpoint = val; } else if (strncmp(*i, "-", 1) == 0) { cerr << "ERROR: invalid flag " << *i << std::endl; return EINVAL; @@ -7701,5 +7753,220 @@ next: } } - return 0; + if (opt_cmd == OPT_PUBSUB_TOPICS_LIST) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + + RGWUserPubSub ups(store, user_info.user_id); + + rgw_bucket bucket; + + rgw_pubsub_user_topics result; + + if (!bucket_name.empty()) { + RGWBucketInfo bucket_info; + int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + ret = ups.get_bucket_topics(bucket_info.bucket, &result); + if (ret < 0) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } else { + int ret = ups.get_topics(&result); + if (ret < 0) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + encode_json("result", result, formatter); + formatter->flush(cout); + } + + if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (bucket_name.empty()) { + cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + rgw_bucket bucket; + + RGWBucketInfo bucket_info; + int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + ret = ups.create_topic(topic_name, bucket_info.bucket); + if (ret < 0) { + cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_PUBSUB_TOPIC_RM) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (bucket_name.empty()) { + cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + rgw_bucket bucket; + + RGWBucketInfo bucket_info; + int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + ret = ups.remove_topic(topic_name); + if (ret < 0) { + cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_PUBSUB_SUB_GET) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (sub_name.empty()) { + cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + rgw_pubsub_user_sub_config sub; + + ret = ups.get_sub(sub_name, &sub); + if (ret < 0) { + cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("sub", sub, formatter); + formatter->flush(cout); + } + + if (opt_cmd == OPT_PUBSUB_SUB_CREATE) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (sub_name.empty()) { + cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + return EINVAL; + } + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + rgw_pubsub_user_topic_info topic; + int ret = ups.get_topic(topic_name, &topic); + if (ret < 0) { + cerr << "ERROR: topic not found" << std::endl; + return EINVAL; + } + rgw_pubsub_user_sub_config sub; + + auto& tier_config = get_tier_config(store); + + rgw_pubsub_user_sub_dest dest_config; + dest_config.bucket_name = sub_dest_bucket; + dest_config.oid_prefix = sub_oid_prefix; + dest_config.push_endpoint = sub_push_endpoint; + + if (dest_config.bucket_name.empty()) { + dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name; + } + if (dest_config.oid_prefix.empty()) { + dest_config.oid_prefix = tier_config["data_oid_prefix"]; + } + ret = ups.add_sub(sub_name, topic_name, dest_config); + if (ret < 0) { + cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_PUBSUB_SUB_RM) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (sub_name.empty()) { + cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + rgw_pubsub_user_sub_config sub; + + ret = ups.get_sub(sub_name, &sub); + if (ret < 0) { + cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("sub", sub, formatter); + formatter->flush(cout); + } + + return 0; } diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index e147516483d3..16ef1b33ec72 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -61,6 +61,7 @@ void rgw_pubsub_user_topic_info::dump(Formatter *f) const { encode_json("user", user, f); encode_json("topic", topic, f); + encode_json("subs", subs, f); } void rgw_pubsub_user_topics::dump(Formatter *f) const @@ -86,98 +87,6 @@ void rgw_pubsub_user_sub_config::dump(Formatter *f) const encode_json("dest", dest, f); } -static string pubsub_user_oid_prefix = "pubsub.user."; - - -class RGWUserPubSub -{ - RGWRados *store; - rgw_user user; - RGWObjectCtx obj_ctx; - - template - int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); - - template - int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); - - int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); -public: - RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), - user(_user), - obj_ctx(store) {} - - string user_meta_oid() const { - return pubsub_user_oid_prefix + user.to_str(); - } - - string bucket_meta_oid(const rgw_bucket& bucket) const { - return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; - } - - string sub_meta_oid(const string& name) const { - return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; - } - - void get_user_meta_obj(rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid()); - } - - void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket)); - } - - void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name)); - } - - int get_topics(rgw_pubsub_user_topics *result); - int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result); - int create_topic(const string& name, const rgw_bucket& bucket); - int remove_topic(const string& name); - int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); - int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); -}; - -template -int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) -{ - bufferlist bl; - int ret = rgw_get_system_obj(store, obj_ctx, - obj.pool, obj.oid, - bl, - objv_tracker, - nullptr, nullptr, nullptr); - if (ret < 0) { - return ret; - } - - auto iter = bl.cbegin(); - try { - decode(*result, iter); - } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to decode info, caught buffer::error" << dendl; - return -EIO; - } - - return 0; -} - -template -int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) -{ - bufferlist bl; - encode(info, bl); - - int ret = rgw_put_system_obj(store, obj.pool, obj.oid, - bl, false, objv_tracker, - real_time()); - if (ret < 0) { - return ret; - } - - return 0; -} int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) { @@ -217,6 +126,30 @@ int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_t return 0; } +int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result) +{ + rgw_raw_obj obj; + get_user_meta_obj(&obj); + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read(obj, &topics, &objv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + + auto iter = topics.topics.find(name); + if (iter == topics.topics.end()) { + ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + return -ENOENT; + } + + *result = iter->second; + return 0; +} + int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket) { @@ -311,6 +244,18 @@ int RGWUserPubSub::remove_topic(const string& name) return 0; } +int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result) +{ + rgw_raw_obj obj; + get_sub_meta_obj(name, &obj); + int ret = read(obj, result, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; + return ret; + } + return 0; +} + int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest) { rgw_raw_obj obj; @@ -325,9 +270,9 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu return ret; } - auto iter = topics.topics.find(name); + auto iter = topics.topics.find(topic); if (iter == topics.topics.end()) { - ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl; return -ENOENT; } @@ -350,7 +295,7 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu rgw_raw_obj bobj; get_sub_meta_obj(name, &bobj); - ret = write(obj, sub_conf, nullptr); + ret = write(bobj, sub_conf, nullptr); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; @@ -358,8 +303,24 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu return 0; } -int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest) +int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rgw_pubsub_user_sub_dest& dest) { + string topic = _topic; + + RGWObjVersionTracker sobjv_tracker; + rgw_raw_obj sobj; + get_sub_meta_obj(name, &sobj); + + if (topic.empty()) { + rgw_pubsub_user_sub_config sub_conf; + int ret = read(sobj, &sub_conf, &sobjv_tracker); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; + return ret; + } + topic = sub_conf.topic; + } + rgw_raw_obj obj; get_user_meta_obj(&obj); @@ -372,7 +333,7 @@ int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw } if (ret >= 0) { - auto iter = topics.topics.find(name); + auto iter = topics.topics.find(topic); if (iter == topics.topics.end()) { ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl; } else { @@ -388,9 +349,7 @@ int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw } } - rgw_raw_obj sobj; - get_sub_meta_obj(name, &sobj); - ret = remove(obj, nullptr); + ret = remove(sobj, &sobjv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 2239ab2f9c6c..00f15ce3fb64 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -2,6 +2,7 @@ #define CEPH_RGW_PUBSUB_H #include "rgw_common.h" +#include "rgw_tools.h" enum RGWPubSubEventType { @@ -183,4 +184,97 @@ struct rgw_pubsub_user_topics { }; WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) +static string pubsub_user_oid_prefix = "pubsub.user."; + +class RGWUserPubSub +{ + RGWRados *store; + rgw_user user; + RGWObjectCtx obj_ctx; + + template + int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); + + template + int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); + + int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); +public: + RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), + user(_user), + obj_ctx(store) {} + + string user_meta_oid() const { + return pubsub_user_oid_prefix + user.to_str(); + } + + string bucket_meta_oid(const rgw_bucket& bucket) const { + return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; + } + + string sub_meta_oid(const string& name) const { + return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; + } + + void get_user_meta_obj(rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid()); + } + + void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket)); + } + + void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { + *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name)); + } + + int get_topics(rgw_pubsub_user_topics *result); + int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result); + int get_topic(const string& name, rgw_pubsub_user_topic_info *result); + int create_topic(const string& name, const rgw_bucket& bucket); + int remove_topic(const string& name); + int get_sub(const string& name, rgw_pubsub_user_sub_config *result); + int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); + int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); +}; + +template +int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) +{ + bufferlist bl; + int ret = rgw_get_system_obj(store, obj_ctx, + obj.pool, obj.oid, + bl, + objv_tracker, + nullptr, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + auto iter = bl.cbegin(); + try { + decode(*result, iter); + } catch (buffer::error& err) { + return -EIO; + } + + return 0; +} + +template +int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) +{ + bufferlist bl; + encode(info, bl); + + int ret = rgw_put_system_obj(store, obj.pool, obj.oid, + bl, false, objv_tracker, + real_time()); + if (ret < 0) { + return ret; + } + + return 0; +} + #endif diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 1768fb23c333..6a19d5f7e1d7 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -55,6 +55,7 @@ config: "tenant": , # default: "uid": , # default: "pubsub" "data_bucket_prefix": # default: "pubsub" + "data_oid_prefix": # # non-dynamic config "notifications": [ @@ -96,13 +97,14 @@ struct PSSubConfig { /* subscription config */ } void init(CephContext *cct, const JSONFormattable& config, - const string& data_bucket_prefix) { + const string& data_bucket_prefix, + const string& default_oid_prefix) { name = config["name"]; topic = config["topic"]; push_endpoint = config["push_endpoint"]; string default_bucket_name = data_bucket_prefix + name; data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); - data_oid_prefix = config["data_oid_prefix"]; + data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); } }; @@ -154,26 +156,29 @@ static string json_str(const char *name, const T& obj, bool pretty = false) return ss.str(); } -using TopicsRef = std::shared_ptr>; +using PSTopicConfigRef = std::shared_ptr; +using TopicsRef = std::shared_ptr>; struct PSConfig { string id{"pubsub"}; rgw_user user; string data_bucket_prefix; + string data_oid_prefix; uint64_t sync_instance{0}; uint64_t max_id{0}; /* FIXME: no hard coded buckets, we'll have configurable topics */ map subs; - map topics; + map topics; multimap notifications; void dump(Formatter *f) const { encode_json("id", id, f); encode_json("user", user, f); encode_json("data_bucket_prefix", data_bucket_prefix, f); + encode_json("data_oid_prefix", data_bucket_prefix, f); encode_json("sync_instance", sync_instance, f); encode_json("max_id", max_id, f); { @@ -185,7 +190,7 @@ struct PSConfig { { Formatter::ArraySection section(*f, "topics"); for (auto& topic : topics) { - encode_json("topic", topic.second, f); + encode_json("topic", *topic.second, f); } } { @@ -212,6 +217,7 @@ struct PSConfig { string uid = config["uid"]("pubsub"); user = rgw_user(config["tenant"], uid); data_bucket_prefix = config["data_bucket_prefix"]("pubsub"); + data_oid_prefix = config["data_oid_prefix"]; /* FIXME: this will be dynamically configured */ for (auto& c : config["notifications"].array()) { @@ -221,13 +227,16 @@ struct PSConfig { notifications.insert(std::make_pair(nc.path, nc)); PSTopicConfig topic_config = { .name = nc.topic }; - topics[nc.topic] = topic_config; + topics[nc.topic] = make_shared(topic_config); } for (auto& c : config["subscriptions"].array()) { auto sc = std::make_shared(); - sc->init(cct, c, data_bucket_prefix); + sc->init(cct, c, data_bucket_prefix, data_oid_prefix); subs[sc->name] = sc; - topics[sc->topic].subs.insert(sc->name); + auto iter = topics.find(sc->topic); + if (iter != topics.end()) { + iter->second->subs.insert(sc->name); + } } ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; @@ -240,8 +249,6 @@ struct PSConfig { void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { string path = bucket.name + "/" + key.name; - (*result)->clear(); - auto iter = notifications.upper_bound(path); if (iter == notifications.begin()) { return; @@ -269,7 +276,7 @@ struct PSConfig { } ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; - (*result)->push_back(&topic->second); + (*result)->push_back(topic->second); } while (iter != notifications.begin()); } @@ -756,26 +763,56 @@ public: class RGWPSFindBucketTopicsCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; + rgw_user owner; rgw_bucket bucket; rgw_obj_key key; + RGWUserPubSub ups; + + rgw_raw_obj obj; + rgw_pubsub_user_topics bucket_topics; TopicsRef *topics; public: RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env, PSEnvRef& _env, - const rgw_bucket& _bucket, + const RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, TopicsRef *_topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), - bucket(_bucket), + owner(_bucket_info.owner), + bucket(_bucket_info.bucket), key(_key), + ups(_sync_env->store, owner), topics(_topics) { - *topics = std::make_shared >(); + *topics = std::make_shared >(); } int operate() override { reenter(this) { -#warning this will need to change + ups.get_bucket_meta_obj(bucket, &obj); + + + using ReadInfoCR = RGWSimpleRadosReadCR; + yield { + bool empty_on_enoent = true; + call(new ReadInfoCR(sync_env->async_rados, sync_env->store, + obj, + &bucket_topics, empty_on_enoent)); + } + if (retcode < 0 && retcode != -ENOENT) { + return set_cr_error(retcode); + } + + ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl; + + for (auto& titer : bucket_topics.topics) { + auto& info = titer.second; + shared_ptr tc = std::make_shared(); + tc->name = info.topic.name; + tc->subs = info.subs; + (*topics)->push_back(tc); + } + env->conf->get_topics(sync_env->cct, bucket, key, topics); return set_cr_done(); } @@ -788,7 +825,7 @@ class RGWPSHandleObjEvent : public RGWCoroutine { PSEnvRef env; EventRef event; - vector::iterator titer; + vector::iterator titer; set::iterator siter; PSSubscriptionRef sub; TopicsRef topics; @@ -932,7 +969,7 @@ public: int operate() override { reenter(this) { - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); @@ -973,7 +1010,7 @@ public: reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode);