#include "rgw_reshard.h"
#include "rgw_http_client_curl.h"
#include "rgw_zone.h"
+#include "rgw_pubsub.h"
#include "services/svc_sync_modules.h"
OPT_MFA_CHECK,
OPT_MFA_RESYNC,
OPT_RESHARD_STALE_INSTANCES_LIST,
- OPT_RESHARD_STALE_INSTANCES_DELETE
+ OPT_RESHARD_STALE_INSTANCES_DELETE,
+ OPT_PUBSUB_TOPICS_LIST,
+ OPT_PUBSUB_TOPIC_CREATE,
+ OPT_PUBSUB_TOPIC_RM,
+ OPT_PUBSUB_SUB_GET,
+ OPT_PUBSUB_SUB_CREATE,
+ OPT_PUBSUB_SUB_RM,
};
static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
strcmp(cmd, "placement") == 0 ||
strcmp(cmd, "pool") == 0 ||
strcmp(cmd, "pools") == 0 ||
+ strcmp(cmd, "pubsub") == 0 ||
strcmp(cmd, "quota") == 0 ||
strcmp(cmd, "realm") == 0 ||
strcmp(cmd, "role") == 0 ||
strcmp(cmd, "role-policy") == 0 ||
strcmp(cmd, "stale-instances") == 0 ||
+ strcmp(cmd, "sub") == 0 ||
strcmp(cmd, "subuser") == 0 ||
strcmp(cmd, "sync") == 0 ||
+ strcmp(cmd, "topic") == 0 ||
+ strcmp(cmd, "topics") == 0 ||
strcmp(cmd, "usage") == 0 ||
strcmp(cmd, "user") == 0 ||
strcmp(cmd, "zone") == 0 ||
return OPT_RESHARD_STALE_INSTANCES_LIST;
if (match_str(cmd, "rm", "delete"))
return OPT_RESHARD_STALE_INSTANCES_DELETE;
+ } else if (prev_prev_cmd && strcmp(prev_prev_cmd, "pubsub") == 0) {
+ if (strcmp(prev_cmd, "topics") == 0) {
+ if (strcmp(cmd, "list") == 0)
+ return OPT_PUBSUB_TOPICS_LIST;
+ } else if (strcmp(prev_cmd, "topic") == 0) {
+ if (strcmp(cmd, "create") == 0)
+ return OPT_PUBSUB_TOPIC_CREATE;
+ if (strcmp(cmd, "rm") == 0)
+ return OPT_PUBSUB_TOPIC_RM;
+ } else if (strcmp(prev_cmd, "sub") == 0) {
+ if (strcmp(cmd, "get") == 0)
+ return OPT_PUBSUB_SUB_GET;
+ if (strcmp(cmd, "create") == 0)
+ return OPT_PUBSUB_SUB_CREATE;
+ if (strcmp(cmd, "rm") == 0)
+ return OPT_PUBSUB_SUB_RM;
+ }
}
return -EINVAL;
}
// unreachable
}
+JSONFormattable& get_tier_config(RGWRados *store) {
+ return store->get_zone_params().tier_config;
+}
+
+const string& get_tier_type(RGWRados *store) {
+ return store->get_zone().tier_type;
+}
+
int main(int argc, const char **argv)
{
vector<const char*> args;
int totp_window = 0;
int trim_delay_ms = 0;
+ string topic_name;
+ string sub_name;
+ string sub_oid_prefix;
+ string sub_dest_bucket;
+ string sub_push_endpoint;
+
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
totp_window = atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--trim-delay-ms", (char*)NULL)) {
trim_delay_ms = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
+ topic_name = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) {
+ sub_name = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) {
+ sub_oid_prefix = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) {
+ sub_dest_bucket = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
+ sub_push_endpoint = val;
} else if (strncmp(*i, "-", 1) == 0) {
cerr << "ERROR: invalid flag " << *i << std::endl;
return EINVAL;
}
}
- return 0;
+ if (opt_cmd == OPT_PUBSUB_TOPICS_LIST) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_bucket bucket;
+
+ rgw_pubsub_user_topics result;
+
+ if (!bucket_name.empty()) {
+ RGWBucketInfo bucket_info;
+ int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = ups.get_bucket_topics(bucket_info.bucket, &result);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ } else {
+ int ret = ups.get_topics(&result);
+ if (ret < 0) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+ encode_json("result", result, formatter);
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_bucket bucket;
+
+ RGWBucketInfo bucket_info;
+ int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = ups.create_topic(topic_name, bucket_info.bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
+ if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_bucket bucket;
+
+ RGWBucketInfo bucket_info;
+ int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = ups.remove_topic(topic_name);
+ if (ret < 0) {
+ cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
+ if (opt_cmd == OPT_PUBSUB_SUB_GET) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (sub_name.empty()) {
+ cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_pubsub_user_sub_config sub;
+
+ ret = ups.get_sub(sub_name, &sub);
+ if (ret < 0) {
+ cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("sub", sub, formatter);
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT_PUBSUB_SUB_CREATE) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (sub_name.empty()) {
+ cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+ return EINVAL;
+ }
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_pubsub_user_topic_info topic;
+ int ret = ups.get_topic(topic_name, &topic);
+ if (ret < 0) {
+ cerr << "ERROR: topic not found" << std::endl;
+ return EINVAL;
+ }
+ rgw_pubsub_user_sub_config sub;
+
+ auto& tier_config = get_tier_config(store);
+
+ rgw_pubsub_user_sub_dest dest_config;
+ dest_config.bucket_name = sub_dest_bucket;
+ dest_config.oid_prefix = sub_oid_prefix;
+ dest_config.push_endpoint = sub_push_endpoint;
+
+ if (dest_config.bucket_name.empty()) {
+ dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name;
+ }
+ if (dest_config.oid_prefix.empty()) {
+ dest_config.oid_prefix = tier_config["data_oid_prefix"];
+ }
+ ret = ups.add_sub(sub_name, topic_name, dest_config);
+ if (ret < 0) {
+ cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
+ if (opt_cmd == OPT_PUBSUB_SUB_RM) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (sub_name.empty()) {
+ cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ rgw_pubsub_user_sub_config sub;
+
+ ret = ups.get_sub(sub_name, &sub);
+ if (ret < 0) {
+ cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("sub", sub, formatter);
+ formatter->flush(cout);
+ }
+
+ return 0;
}
{
encode_json("user", user, f);
encode_json("topic", topic, f);
+ encode_json("subs", subs, f);
}
void rgw_pubsub_user_topics::dump(Formatter *f) const
encode_json("dest", dest, f);
}
-static string pubsub_user_oid_prefix = "pubsub.user.";
-
-
-class RGWUserPubSub
-{
- RGWRados *store;
- rgw_user user;
- RGWObjectCtx obj_ctx;
-
- template <class T>
- int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
-
- template <class T>
- int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
-
- int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
-public:
- RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
- user(_user),
- obj_ctx(store) {}
-
- string user_meta_oid() const {
- return pubsub_user_oid_prefix + user.to_str();
- }
-
- string bucket_meta_oid(const rgw_bucket& bucket) const {
- return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
- }
-
- string sub_meta_oid(const string& name) const {
- return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
- }
-
- void get_user_meta_obj(rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid());
- }
-
- void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket));
- }
-
- void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
- }
-
- int get_topics(rgw_pubsub_user_topics *result);
- int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
- int create_topic(const string& name, const rgw_bucket& bucket);
- int remove_topic(const string& name);
- int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
- int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
-};
-
-template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
-{
- bufferlist bl;
- int ret = rgw_get_system_obj(store, obj_ctx,
- obj.pool, obj.oid,
- bl,
- objv_tracker,
- nullptr, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- auto iter = bl.cbegin();
- try {
- decode(*result, iter);
- } catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode info, caught buffer::error" << dendl;
- return -EIO;
- }
-
- return 0;
-}
-
-template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
-{
- bufferlist bl;
- encode(info, bl);
-
- int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
- bl, false, objv_tracker,
- real_time());
- if (ret < 0) {
- return ret;
- }
-
- return 0;
-}
int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
{
return 0;
}
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result)
+{
+ rgw_raw_obj obj;
+ get_user_meta_obj(&obj);
+
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_user_topics topics;
+
+ int ret = read(obj, &topics, &objv_tracker);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ auto iter = topics.topics.find(name);
+ if (iter == topics.topics.end()) {
+ ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+ return -ENOENT;
+ }
+
+ *result = iter->second;
+ return 0;
+}
+
int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
{
return 0;
}
+int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result)
+{
+ rgw_raw_obj obj;
+ get_sub_meta_obj(name, &obj);
+ int ret = read(obj, result, nullptr);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+}
+
int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
{
rgw_raw_obj obj;
return ret;
}
- auto iter = topics.topics.find(name);
+ auto iter = topics.topics.find(topic);
if (iter == topics.topics.end()) {
- ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
return -ENOENT;
}
rgw_raw_obj bobj;
get_sub_meta_obj(name, &bobj);
- ret = write(obj, sub_conf, nullptr);
+ ret = write(bobj, sub_conf, nullptr);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rgw_pubsub_user_sub_dest& dest)
{
+ string topic = _topic;
+
+ RGWObjVersionTracker sobjv_tracker;
+ rgw_raw_obj sobj;
+ get_sub_meta_obj(name, &sobj);
+
+ if (topic.empty()) {
+ rgw_pubsub_user_sub_config sub_conf;
+ int ret = read(sobj, &sub_conf, &sobjv_tracker);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+ return ret;
+ }
+ topic = sub_conf.topic;
+ }
+
rgw_raw_obj obj;
get_user_meta_obj(&obj);
}
if (ret >= 0) {
- auto iter = topics.topics.find(name);
+ auto iter = topics.topics.find(topic);
if (iter == topics.topics.end()) {
ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
} else {
}
}
- rgw_raw_obj sobj;
- get_sub_meta_obj(name, &sobj);
- ret = remove(obj, nullptr);
+ ret = remove(sobj, &sobjv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
#define CEPH_RGW_PUBSUB_H
#include "rgw_common.h"
+#include "rgw_tools.h"
enum RGWPubSubEventType {
};
WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+static string pubsub_user_oid_prefix = "pubsub.user.";
+
+class RGWUserPubSub
+{
+ RGWRados *store;
+ rgw_user user;
+ RGWObjectCtx obj_ctx;
+
+ template <class T>
+ int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+
+ template <class T>
+ int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
+
+ int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
+public:
+ RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
+ user(_user),
+ obj_ctx(store) {}
+
+ string user_meta_oid() const {
+ return pubsub_user_oid_prefix + user.to_str();
+ }
+
+ string bucket_meta_oid(const rgw_bucket& bucket) const {
+ return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+ }
+
+ string sub_meta_oid(const string& name) const {
+ return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+ }
+
+ void get_user_meta_obj(rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->get_zone_params().log_pool, user_meta_oid());
+ }
+
+ void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->get_zone_params().log_pool, bucket_meta_oid(bucket));
+ }
+
+ void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
+ }
+
+ int get_topics(rgw_pubsub_user_topics *result);
+ int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
+ int get_topic(const string& name, rgw_pubsub_user_topic_info *result);
+ int create_topic(const string& name, const rgw_bucket& bucket);
+ int remove_topic(const string& name);
+ int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
+ int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+ int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+};
+
+template <class T>
+int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+{
+ bufferlist bl;
+ int ret = rgw_get_system_obj(store, obj_ctx,
+ obj.pool, obj.oid,
+ bl,
+ objv_tracker,
+ nullptr, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ auto iter = bl.cbegin();
+ try {
+ decode(*result, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ return 0;
+}
+
+template <class T>
+int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
+{
+ bufferlist bl;
+ encode(info, bl);
+
+ int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
+ bl, false, objv_tracker,
+ real_time());
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
#endif
"tenant": <tenant>, # default: <empty>
"uid": <uid>, # default: "pubsub"
"data_bucket_prefix": <prefix> # default: "pubsub"
+ "data_oid_prefix": <prefix> #
# non-dynamic config
"notifications": [
}
void init(CephContext *cct, const JSONFormattable& config,
- const string& data_bucket_prefix) {
+ const string& data_bucket_prefix,
+ const string& default_oid_prefix) {
name = config["name"];
topic = config["topic"];
push_endpoint = config["push_endpoint"];
string default_bucket_name = data_bucket_prefix + name;
data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
- data_oid_prefix = config["data_oid_prefix"];
+ data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
}
};
return ss.str();
}
-using TopicsRef = std::shared_ptr<vector<PSTopicConfig *>>;
+using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
+using TopicsRef = std::shared_ptr<vector<PSTopicConfigRef>>;
struct PSConfig {
string id{"pubsub"};
rgw_user user;
string data_bucket_prefix;
+ string data_oid_prefix;
uint64_t sync_instance{0};
uint64_t max_id{0};
/* FIXME: no hard coded buckets, we'll have configurable topics */
map<string, PSSubConfigRef> subs;
- map<string, PSTopicConfig> topics;
+ map<string, PSTopicConfigRef> topics;
multimap<string, PSNotificationConfig> notifications;
void dump(Formatter *f) const {
encode_json("id", id, f);
encode_json("user", user, f);
encode_json("data_bucket_prefix", data_bucket_prefix, f);
+ encode_json("data_oid_prefix", data_bucket_prefix, f);
encode_json("sync_instance", sync_instance, f);
encode_json("max_id", max_id, f);
{
{
Formatter::ArraySection section(*f, "topics");
for (auto& topic : topics) {
- encode_json("topic", topic.second, f);
+ encode_json("topic", *topic.second, f);
}
}
{
string uid = config["uid"]("pubsub");
user = rgw_user(config["tenant"], uid);
data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+ data_oid_prefix = config["data_oid_prefix"];
/* FIXME: this will be dynamically configured */
for (auto& c : config["notifications"].array()) {
notifications.insert(std::make_pair(nc.path, nc));
PSTopicConfig topic_config = { .name = nc.topic };
- topics[nc.topic] = topic_config;
+ topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
}
for (auto& c : config["subscriptions"].array()) {
auto sc = std::make_shared<PSSubConfig>();
- sc->init(cct, c, data_bucket_prefix);
+ sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
subs[sc->name] = sc;
- topics[sc->topic].subs.insert(sc->name);
+ auto iter = topics.find(sc->topic);
+ if (iter != topics.end()) {
+ iter->second->subs.insert(sc->name);
+ }
}
ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
string path = bucket.name + "/" + key.name;
- (*result)->clear();
-
auto iter = notifications.upper_bound(path);
if (iter == notifications.begin()) {
return;
}
ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
- (*result)->push_back(&topic->second);
+ (*result)->push_back(topic->second);
} while (iter != notifications.begin());
}
class RGWPSFindBucketTopicsCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
+ rgw_user owner;
rgw_bucket bucket;
rgw_obj_key key;
+ RGWUserPubSub ups;
+
+ rgw_raw_obj obj;
+ rgw_pubsub_user_topics bucket_topics;
TopicsRef *topics;
public:
RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
PSEnvRef& _env,
- const rgw_bucket& _bucket,
+ const RGWBucketInfo& _bucket_info,
const rgw_obj_key& _key,
TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
- bucket(_bucket),
+ owner(_bucket_info.owner),
+ bucket(_bucket_info.bucket),
key(_key),
+ ups(_sync_env->store, owner),
topics(_topics) {
- *topics = std::make_shared<vector<PSTopicConfig *> >();
+ *topics = std::make_shared<vector<PSTopicConfigRef> >();
}
int operate() override {
reenter(this) {
-#warning this will need to change
+ ups.get_bucket_meta_obj(bucket, &obj);
+
+
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ yield {
+ bool empty_on_enoent = true;
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+ obj,
+ &bucket_topics, empty_on_enoent));
+ }
+ if (retcode < 0 && retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+
+ ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
+
+ for (auto& titer : bucket_topics.topics) {
+ auto& info = titer.second;
+ shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
+ tc->name = info.topic.name;
+ tc->subs = info.subs;
+ (*topics)->push_back(tc);
+ }
+
env->conf->get_topics(sync_env->cct, bucket, key, topics);
return set_cr_done();
}
PSEnvRef env;
EventRef event;
- vector<PSTopicConfig *>::iterator titer;
+ vector<PSTopicConfigRef>::iterator titer;
set<string>::iterator siter;
PSSubscriptionRef sub;
TopicsRef topics;
int operate() override {
reenter(this) {
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);