encode_json("events", events, f);
}
-void rgw_pubsub_topic_subs::dump(Formatter *f) const
-{
- encode_json("topic", topic, f);
- encode_json("subs", subs, f);
-}
-
void rgw_pubsub_bucket_topics::dump(Formatter *f) const
{
Formatter::ArraySection s(*f, "topics");
void rgw_pubsub_topics::dump_xml(Formatter *f) const
{
for (auto& t : topics) {
- encode_xml("member", t.second.topic, f);
+ encode_xml("member", t.second, f);
}
}
-void rgw_pubsub_sub_dest::dump(Formatter *f) const
+void rgw_pubsub_dest::dump(Formatter *f) const
{
- encode_json("bucket_name", bucket_name, f);
- encode_json("oid_prefix", oid_prefix, f);
encode_json("push_endpoint", push_endpoint, f);
encode_json("push_endpoint_args", push_endpoint_args, f);
encode_json("push_endpoint_topic", arn_topic, f);
encode_json("persistent", persistent, f);
}
-void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
+void rgw_pubsub_dest::dump_xml(Formatter *f) const
{
- // first 2 members are omitted here since they
- // dont apply to AWS compliant topics
encode_xml("EndpointAddress", push_endpoint, f);
encode_xml("EndpointArgs", push_endpoint_args, f);
encode_xml("EndpointTopic", arn_topic, f);
encode_xml("Persistent", persistent, f);
}
-std::string rgw_pubsub_sub_dest::to_json_str() const
+std::string rgw_pubsub_dest::to_json_str() const
{
- // first 2 members are omitted here since they
- // dont apply to AWS compliant topics
JSONFormatter f;
f.open_object_section("");
encode_json("EndpointAddress", push_endpoint, &f);
return ss.str();
}
-void rgw_pubsub_sub_config::dump(Formatter *f) const
-{
- encode_json("user", user, f);
- encode_json("name", name, f);
- encode_json("topic", topic, f);
- encode_json("dest", dest, f);
- encode_json("s3_id", s3_id, f);
-}
-
RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant)
: store(_store), tenant(_tenant), svc_sysobj(store->svc()->sysobj)
{
return read_topics(result, nullptr);
}
-int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
-{
- rgw_pubsub_topics topics;
- int ret = get_topics(&topics);
- if (ret < 0) {
- ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
- return ret;
- }
-
- auto iter = topics.topics.find(name);
- if (iter == topics.topics.end()) {
- ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
- return -ENOENT;
- }
-
- *result = iter->second;
- return 0;
-}
-
int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
{
rgw_pubsub_topics topics;
return -ENOENT;
}
- *result = iter->second.topic;
+ *result = iter->second;
return 0;
}
}
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
- rgw_pubsub_topic_subs topic_info;
+ rgw_pubsub_topic topic_info;
int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
bucket.name << "'" << dendl;
auto& topic_filter = bucket_topics.topics[topic_name];
- topic_filter.topic = topic_info.topic;
+ topic_filter.topic = topic_info;
topic_filter.events = events;
topic_filter.s3_id = notif_name;
if (s3_filter) {
int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y)
{
- rgw_pubsub_topic_subs topic_info;
+ rgw_pubsub_topic topic_info;
int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
}
int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) {
- return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y);
+ return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y);
}
-int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
+int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
return ret;
}
- rgw_pubsub_topic_subs& new_topic = topics.topics[name];
- new_topic.topic.user = rgw_user("", tenant);
- new_topic.topic.name = name;
- new_topic.topic.dest = dest;
- new_topic.topic.arn = arn;
- new_topic.topic.opaque_data = opaque_data;
+ rgw_pubsub_topic& new_topic = topics.topics[name];
+ new_topic.user = rgw_user("", tenant);
+ new_topic.name = name;
+ new_topic.dest = dest;
+ new_topic.arn = arn;
+ new_topic.opaque_data = opaque_data;
ret = write_topics(dpp, topics, &objv_tracker, y);
if (ret < 0) {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
}
-void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
-}
-
// setting a unique ID for an event based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
-struct rgw_pubsub_sub_dest {
- std::string bucket_name;
- std::string oid_prefix;
+struct rgw_pubsub_dest {
std::string push_endpoint;
std::string push_endpoint_args;
std::string arn_topic;
void encode(bufferlist& bl) const {
ENCODE_START(5, 1, bl);
- encode(bucket_name, bl);
- encode(oid_prefix, bl);
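+ // keep the on-wire layout: write empty strings where bucket_name and oid_prefix used to be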
+ encode("", bl);
+ encode("", bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(5, bl);
- decode(bucket_name, bl);
- decode(oid_prefix, bl);
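+ // the removed bucket_name and oid_prefix fields are still on the wire; decode and discard them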
+ std::string dummy;
+ decode(dummy, bl);
+ decode(dummy, bl);
decode(push_endpoint, bl);
if (struct_v >= 2) {
decode(push_endpoint_args, bl);
void dump_xml(Formatter *f) const;
std::string to_json_str() const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-
-struct rgw_pubsub_sub_config {
- rgw_user user;
- std::string name;
- std::string topic;
- rgw_pubsub_sub_dest dest;
- std::string s3_id;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
- encode(user, bl);
- encode(name, bl);
- encode(topic, bl);
- encode(dest, bl);
- encode(s3_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
- decode(user, bl);
- decode(name, bl);
- decode(topic, bl);
- decode(dest, bl);
- if (struct_v >= 2) {
- decode(s3_id, bl);
- }
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_dest)
struct rgw_pubsub_topic {
rgw_user user;
std::string name;
- rgw_pubsub_sub_dest dest;
+ rgw_pubsub_dest dest;
std::string arn;
std::string opaque_data;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic)
+// this struct is deprecated and remains only for backward compatibility
struct rgw_pubsub_topic_subs {
rgw_pubsub_topic topic;
std::set<std::string> subs;
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
struct rgw_pubsub_topics {
- std::map<std::string, rgw_pubsub_topic_subs> topics;
+ std::map<std::string, rgw_pubsub_topic> topics;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 2, bl);
encode(topics, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(topics, bl);
+ DECODE_START(2, bl);
+ if (struct_v >= 2) {
+ decode(topics, bl);
+ } else {
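+ // old encoding: entries are rgw_pubsub_topic_subs; keep the embedded topic and drop the subscriptions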
+ std::map<std::string, rgw_pubsub_topic_subs> v1topics;
+ decode(v1topics, bl);
+ std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
+ [](const auto& entry) {
+ return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic);
+ });
+ }
DECODE_FINISH(bl);
}
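As a side note, the legacy branch in decode() above converts the old map of rgw_pubsub_topic_subs into the new map of rgw_pubsub_topic. Below is a minimal standalone sketch of the same std::transform/std::inserter pattern, using stand-in types (Topic, TopicV1) rather than the actual Ceph structs:

#include <algorithm>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <utility>

struct Topic {                    // stand-in for rgw_pubsub_topic
  std::string name;
  std::string arn;
};

struct TopicV1 {                  // stand-in for the deprecated rgw_pubsub_topic_subs
  Topic topic;
  std::set<std::string> subs;     // subscriptions are dropped by the conversion
};

std::map<std::string, Topic> upgrade(const std::map<std::string, TopicV1>& v1topics) {
  std::map<std::string, Topic> topics;
  // copy each entry, keeping only the embedded topic and discarding the subs
  std::transform(v1topics.begin(), v1topics.end(),
                 std::inserter(topics, topics.end()),
                 [](const auto& entry) {
                   return std::pair<std::string, Topic>(entry.first, entry.second.topic);
                 });
  return topics;
}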
return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker;
}
- std::string sub_meta_oid(const std::string& name) const {
- return pubsub_oid_prefix + tenant + ".sub." + name;
- }
-
template <class T>
int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
void get_meta_obj(rgw_raw_obj *obj) const;
void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
- void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const;
-
// get all topics (per tenant, if used) and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
int get_topics(rgw_pubsub_topics *result);
- // get a topic with its subscriptions by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_topic(const std::string& name, rgw_pubsub_topic_subs *result);
// get a topic by its name and populate it into "result"
// return -ENOENT if the topic does not exist
// return 0 on success, error code otherwise
// create a topic with push destination information and ARN
// if the topic already exists, the destination and ARN values may be updated (considered success)
// return 0 on success, error code otherwise
- int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
+ int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
// remove a topic according to its name
// if the topic does not exist, it is a no-op (considered success)
// return 0 on success, error code otherwise
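For orientation, a hypothetical usage sketch of the API declared above; the tenant, topic, endpoint, and ARN values are made up, and error handling beyond the return codes is elided:

int example_create_and_get(const DoutPrefixProvider* dpp,
                           rgw::sal::RadosStore* store,
                           optional_yield y) {
  RGWPubSub ps(store, "mytenant");                 // per-tenant handle
  rgw_pubsub_dest dest;
  dest.push_endpoint = "http://localhost:8080";    // example endpoint
  // create (or update) the topic with its destination and ARN
  int ret = ps.create_topic(dpp, "mytopic", dest,
                            "arn:aws:sns:default::mytopic" /* example ARN */,
                            "" /* opaque data */, y);
  if (ret < 0) {
    return ret;
  }
  rgw_pubsub_topic topic;
  return ps.get_topic("mytopic", &topic);          // -ENOENT if missing
}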
// make sure that endpoint is a valid URL
// make sure that if user/password are passed inside URL, it is over secure connection
-// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+// update rgw_pubsub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
if (dest.push_endpoint.empty()) {
return true;
}
return true;
}
-bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
- return topic.topic.dest.stored_secret;
+bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
+ return topic.dest.stored_secret;
}
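As a rough illustration of the rule described in the comments above (credentials embedded in the endpoint URL are only acceptable over an encrypted transport), here is a self-contained sketch; the parsing and the accepted scheme names are assumptions for illustration, not the actual RGW check, which additionally marks dest.stored_secret when a password is found:

#include <string>
#include <string_view>

bool credentials_in_url_are_safe(std::string_view endpoint) {
  const auto scheme_end = endpoint.find("://");
  if (scheme_end == std::string_view::npos) {
    return false;                                  // not parseable as a URL
  }
  const std::string_view scheme = endpoint.substr(0, scheme_end);
  // authority = [user:password@]host[:port], up to the next '/'
  const auto authority_begin = scheme_end + 3;
  auto authority_len = endpoint.find('/', authority_begin);
  if (authority_len != std::string_view::npos) {
    authority_len -= authority_begin;
  }
  const std::string_view authority = endpoint.substr(authority_begin, authority_len);
  if (authority.find('@') == std::string_view::npos) {
    return true;                                   // no embedded credentials
  }
  // embedded credentials require a TLS-protected scheme (example schemes only)
  return scheme == "https" || scheme == "amqps";
}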
bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
private:
std::optional<RGWPubSub> ps;
std::string topic_name;
- rgw_pubsub_sub_dest dest;
+ rgw_pubsub_dest dest;
std::string topic_arn;
std::string opaque_data;
private:
std::string topic_name;
std::optional<RGWPubSub> ps;
- rgw_pubsub_topic_subs result;
+ rgw_pubsub_topic result;
int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
const auto f = s->formatter;
f->open_object_section("GetTopicResponse");
f->open_object_section("GetTopicResult");
- encode_xml("Topic", result.topic, f);
+ encode_xml("Topic", result, f);
f->close_section();
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
private:
std::string topic_name;
std::optional<RGWPubSub> ps;
- rgw_pubsub_topic_subs result;
+ rgw_pubsub_topic result;
int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
const auto f = s->formatter;
f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
f->open_object_section("GetTopicAttributesResult");
- result.topic.dump_xml_as_attributes(f);
+ result.dump_xml_as_attributes(f);
f->close_section(); // GetTopicAttributesResult
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), tenant);
- rgw_pubsub_topic_subs topic;
+ rgw_pubsub_topic topic;
ret = ps.get_topic(topic_name, &topic);
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
# get topic 3 via commandline
result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
parsed_result = json.loads(result[0])
- assert_equal(parsed_result['topic']['arn'], topic_arn3)
+ assert_equal(parsed_result['arn'], topic_arn3)
# delete topic 3
_, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
# try to get a deleted topic
_, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+ print('"topic not found" error is expected')
assert_equal(result, 2)
# get the remaining 2 topics
if topic_conf2 is not None:
topic_conf2.del_config()
# delete the bucket
+ for key in bucket.list():
+     key.delete()
conn.delete_bucket(bucket_name)
if receiver is not None:
stop_kafka_receiver(receiver, task)