From: Yehuda Sadeh Date: Tue, 3 Jul 2018 22:37:41 +0000 (-0700) Subject: rgw: pubsub config fixes X-Git-Tag: v14.1.0~616^2~48 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=92b278d86c25322548ce8ff3105927ead3d9f8ee;p=ceph.git rgw: pubsub config fixes Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 16ef1b33ec72d..1d7bb6dd1272b 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -165,33 +165,23 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket) return ret; } - rgw_pubsub_user_topic_info new_topic; + rgw_pubsub_user_topic_info& new_topic = topics.topics[name]; new_topic.user = user; new_topic.topic.name = name; new_topic.topic.bucket = bucket; - topics.topics[name] = new_topic; - ret = write(obj, topics, &objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } - rgw_pubsub_user_topics bucket_topics; - for (auto& t : topics.topics) { - if (t.second.topic.bucket == bucket) { - bucket_topics.topics.insert(t); - } - } - - rgw_raw_obj bobj; - get_bucket_meta_obj(bucket, &bobj); - ret = write(obj, bucket_topics, nullptr); + ret = update_bucket(topics, bucket); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; return ret; } + return 0; } @@ -227,6 +217,17 @@ int RGWUserPubSub::remove_topic(const string& name) return 0; } + ret = update_bucket(topics, bucket); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + return ret; + } + + return 0; +} + +int RGWUserPubSub::update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket) +{ rgw_pubsub_user_topics bucket_topics; for (auto& t : topics.topics) { if (t.second.topic.bucket == bucket) { @@ -236,7 +237,7 @@ int RGWUserPubSub::remove_topic(const string& name) rgw_raw_obj bobj; get_bucket_meta_obj(bucket, &bobj); - ret = write(obj, bucket_topics, nullptr); + int ret = write(bobj, bucket_topics, nullptr); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; @@ -293,9 +294,15 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu return ret; } - rgw_raw_obj bobj; - get_sub_meta_obj(name, &bobj); - ret = write(bobj, sub_conf, nullptr); + ret = update_bucket(topics, t.topic.bucket); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + return ret; + } + + rgw_raw_obj sobj; + get_sub_meta_obj(name, &sobj); + ret = write(sobj, sub_conf, nullptr); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 00f15ce3fb64f..52cf5e870df22 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -199,6 +199,8 @@ class RGWUserPubSub int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); + + int update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket); public: RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), user(_user), diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 71c0ac2a90bac..184a61be8f164 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -586,6 +586,7 @@ public: PSEnvRef _env, rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env), env(_env), + sub_conf(std::make_shared()), data_access(std::make_shared(sync_env->store)) { sub_conf->from_user_conf(user_sub_conf); }