"will be located in the path that is specified here. "),
Option("rgw_enable_apis", Option::TYPE_STR, Option::LEVEL_ADVANCED)
- .set_default("s3, s3website, swift, swift_auth, admin, sts, iam")
+ .set_default("s3, s3website, swift, swift_auth, admin, sts, iam, pubsub")
.set_description("A list of set of RESTful APIs that rgw handles."),
Option("rgw_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
shift 2
-$CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+if [ "$RGW_VALGRIND" == 'yes' ] && [ $command == 'radosgw' ]; then
+ valgrind --trace-children=yes --tool=memcheck --max-threads=1024 $CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+ sleep 10
+else
+ $CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
+fi
+
rgw_rest_conn.cc
rgw_rest_log.cc
rgw_rest_metadata.cc
+ rgw_rest_pubsub.cc
+ rgw_rest_pubsub_common.cc
rgw_rest_realm.cc
rgw_rest_role.cc
rgw_rest_s3.cc
#include <string>
#include "rgw_basic_types.h"
+#include "rgw_xml.h"
#include "common/ceph_json.h"
using std::string;
void decode_json_obj(rgw_user& val, JSONObj *obj)
{
- string s = obj->get_data();
- val.from_str(s);
+ val.from_str(obj->get_data());
}
void encode_json(const char *name, const rgw_user& val, Formatter *f)
{
- string s = val.to_str();
- f->dump_string(name, s);
+ f->dump_string(name, val.to_str());
+}
+
+void encode_xml(const char *name, const rgw_user& val, Formatter *f)
+{
+ encode_xml(name, val.to_str(), f);
}
namespace rgw {
std::string id;
rgw_user() {}
- // cppcheck-suppress noExplicitConstructor
- rgw_user(const std::string& s) {
+ explicit rgw_user(const std::string& s) {
from_str(s);
}
rgw_user(const std::string& tenant, const std::string& id)
void decode_json_obj(rgw_user& val, JSONObj *obj);
void encode_json(const char *name, const rgw_user& val, Formatter *f);
+void encode_xml(const char *name, const rgw_user& val, Formatter *f);
inline ostream& operator<<(ostream& out, const rgw_user &u) {
string s;
do {
RGWUserBuckets buckets;
- ret = rgw_read_user_buckets(store, user_id, buckets,
+ ret = rgw_read_user_buckets(store, rgw_user(user_id), buckets,
marker, string(), max_entries, false,
&is_truncated);
if (ret < 0)
std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
- user_id,
+ rgw_user(user_id),
NULL, /* req_info */
source_zone,
dest_obj,
rgw_obj dest_obj(src_obj);
int r = store->getRados()->stat_remote_obj(obj_ctx,
- user_id,
+ rgw_user(user_id),
nullptr, /* req_info */
source_zone,
src_obj,
}
del_op.params.olh_epoch = versioned_epoch;
del_op.params.marker_version_id = marker_version_id;
- del_op.params.obj_owner.set_id(owner);
+ del_op.params.obj_owner.set_id(rgw_user(owner));
del_op.params.obj_owner.set_name(owner_display_name);
del_op.params.mtime = timestamp;
del_op.params.high_precision_time = true;
}
if (token.valid() && (ldh->auth(token.id, token.key) == 0)) {
/* try to store user if it doesn't already exist */
- if (store->ctl()->user->get_info_by_uid(token.id, &user, null_yield) < 0) {
+ if (store->ctl()->user->get_info_by_uid(rgw_user(token.id), &user, null_yield) < 0) {
int ret = store->ctl()->user->store_info(user, null_yield,
RGWUserCtl::PutParams()
.set_exclusive(true));
const bool s3website_enabled = apis_map.count("s3website") > 0;
const bool sts_enabled = apis_map.count("sts") > 0;
const bool iam_enabled = apis_map.count("iam") > 0;
+ const bool pubsub_enabled = apis_map.count("pubsub") > 0;
// Swift API entrypoint could placed in the root instead of S3
const bool swift_at_root = g_conf()->rgw_swift_url_prefix == "/";
if (apis_map.count("s3") > 0 || s3website_enabled) {
if (! swift_at_root) {
rest.register_default_mgr(set_logging(rest_filter(store->getRados(), RGW_REST_S3,
- new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled))));
+ new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled, pubsub_enabled))));
} else {
derr << "Cannot have the S3 or S3 Website enabled together with "
<< "Swift API placed in the root of hierarchy" << dendl;
bufferlist response;
string uid_str = s->user->user_id.to_str();
#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
- int ret = store->svc()->zone->get_master_conn()->forward(uid_str, (forward_info ? *forward_info : s->info),
+ int ret = store->svc()->zone->get_master_conn()->forward(rgw_user(uid_str), (forward_info ? *forward_info : s->info),
objv, MAX_REST_RESPONSE, &in_data, &response);
if (ret < 0)
return ret;
encode_json("arn", arn, f);
}
+void rgw_pubsub_topic::dump_xml(Formatter *f) const
+{
+ encode_xml("User", user, f);
+ encode_xml("Name", name, f);
+ encode_xml("EndPoint", dest, f);
+ encode_xml("TopicArn", arn, f);
+}
+
void rgw_pubsub_topic_filter::dump(Formatter *f) const
{
encode_json("topic", topic, f);
}
}
+void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+{
+ for (auto& t : topics) {
+ encode_xml("member", t.second.topic, f);
+ }
+}
+
void rgw_pubsub_sub_dest::dump(Formatter *f) const
{
encode_json("bucket_name", bucket_name, f);
encode_json("arn_topic", arn_topic, f);
}
+void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
+{
+ encode_xml("EndpointAddress", push_endpoint, f);
+ encode_xml("EndpointArgs", push_endpoint_args, f);
+ encode_xml("TopicArn", arn_topic, f);
+}
+
void rgw_pubsub_sub_config::dump(Formatter *f) const
{
encode_json("user", user, f);
}
void dump(Formatter *f) const;
+ void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
}
void dump(Formatter *f) const;
+ void dump_xml(Formatter *f) const;
bool operator<(const rgw_pubsub_topic& t) const {
return to_str().compare(t.to_str());
}
void dump(Formatter *f) const;
+ void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
ceph::real_time last_stats_sync;
ceph::real_time last_stats_update;
- int ret = store->ctl()->user->read_stats(user_str, &stats, &last_stats_sync, &last_stats_update);
+ int ret = store->ctl()->user->read_stats(rgw_user(user_str), &stats, &last_stats_sync, &last_stats_update);
if (ret < 0) {
ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
return ret;
virtual RGWOp *op_copy() { return NULL; }
virtual RGWOp *op_options() { return NULL; }
+public:
static int allocate_formatter(struct req_state *s, int default_formatter,
bool configurable);
-public:
static constexpr int MAX_BUCKET_NAME_LEN = 255;
static constexpr int MAX_OBJ_NAME_LEN = 1024;
for (const auto& t : tokens) {
auto pos = t.find("=");
if (pos != string::npos) {
- std::string key = t.substr(0, pos);
- std::string value = t.substr(pos + 1, t.size() - 1);
- if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
- value = url_decode(value);
+ const auto key = t.substr(0, pos);
+ if (key == "Action") {
+ s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+ } else if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
+ const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+ s->info.args.append(key, value);
}
- s->info.args.append(key, value);
}
}
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include <boost/tokenizer.hpp>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_pubsub.h"
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+#include "rgw_arn.h"
+#include "rgw_auth_s3.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+// command (AWS compliant):
+// POST
+// Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
+class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
+public:
+ int get_params() override {
+ topic_name = s->info.args.get("Name");
+ if (topic_name.empty()) {
+ ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+ return -EINVAL;
+ }
+
+ dest.push_endpoint = s->info.args.get("push-endpoint");
+ for (const auto param : s->info.args.get_params()) {
+ if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
+ continue;
+ }
+ dest.push_endpoint_args.append(param.first+"="+param.second+"&");
+ }
+
+ if (!dest.push_endpoint_args.empty()) {
+ // remove last separator
+ dest.push_endpoint_args.pop_back();
+ }
+
+ // dest object only stores endpoint info
+ // bucket to store events/records will be set only when subscription is created
+ dest.bucket_name = "";
+ dest.oid_prefix = "";
+ dest.arn_topic = topic_name;
+ // the topic ARN will be sent in the reply
+ const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
+ store->svc()->zone->get_zonegroup().get_name(),
+ s->user->user_id.tenant, topic_name);
+ topic_arn = arn.to_string();
+ return 0;
+ }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ {
+ XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+ f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("CreateTopicResult");
+ encode_xml("TopicArn", topic_arn, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+// command (AWS compliant):
+// POST
+// Action=ListTopics
+class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+public:
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ {
+ XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+ f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("ListTopicsResult");
+ encode_xml("Topics", result, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+// command (extension to AWS):
+// POST
+// Action=GetTopic&TopicArn=<topic-arn>
+class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
+public:
+ int get_params() override {
+ const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!topic_arn || topic_arn->resource.empty()) {
+ ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
+ }
+
+ topic_name = topic_arn->resource;
+ return 0;
+ }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ {
+ XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+ f->open_object_section("GetTopicResponse");
+ f->open_object_section("GetTopicResult");
+ encode_xml("Topic", result.topic, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+// command (AWS compliant):
+// POST
+// Action=DeleteTopic&TopicArn=<topic-arn>
+class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
+public:
+ int get_params() override {
+ const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!topic_arn || topic_arn->resource.empty()) {
+ ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
+ }
+
+ topic_name = topic_arn->resource;
+ return 0;
+ }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ {
+ XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
+ f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+namespace {
+// utility classes and functions for handling parameters with the following format:
+// Attributes.entry.{N}.{key|value}={VALUE}
+// N - any unsigned number
+// VALUE - url encoded string
+
+// and Attribute is holding key and value
+// ctor and set are done according to the "type" argument
+// if type is not "key" or "value" its a no-op
+class Attribute {
+ std::string key;
+ std::string value;
+public:
+ Attribute(const std::string& type, const std::string& key_or_value) {
+ set(type, key_or_value);
+ }
+ void set(const std::string& type, const std::string& key_or_value) {
+ if (type == "key") {
+ key = key_or_value;
+ } else if (type == "value") {
+ value = key_or_value;
+ }
+ }
+ const std::string& get_key() const { return key; }
+ const std::string& get_value() const { return value; }
+};
+
+using AttributeMap = std::map<unsigned, Attribute>;
+
+// aggregate the attributes into a map
+// the key and value are associated by the index (N)
+// no assumptions are made on the order in which these parameters are added
+void update_attribute_map(const std::string& input, AttributeMap& map) {
+ const boost::char_separator<char> sep(".");
+ const boost::tokenizer tokens(input, sep);
+ auto token = tokens.begin();
+ if (*token != "Attributes") {
+ return;
+ }
+ ++token;
+
+ if (*token != "entry") {
+ return;
+ }
+ ++token;
+
+ unsigned idx;
+ try {
+ idx = std::stoul(*token);
+ } catch (const std::invalid_argument&) {
+ return;
+ }
+ ++token;
+
+ std::string key_or_value = "";
+ // get the rest of the string regardless of dots
+ // this is to allow dots in the value
+ while (token != tokens.end()) {
+ key_or_value.append(*token+".");
+ ++token;
+ }
+ // remove last separator
+ key_or_value.pop_back();
+
+ auto pos = key_or_value.find("=");
+ if (pos != string::npos) {
+ const auto key_or_value_lhs = key_or_value.substr(0, pos);
+ const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
+ const auto map_it = map.find(idx);
+ if (map_it == map.end()) {
+ // new entry
+ map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
+ } else {
+ // existing entry
+ map_it->second.set(key_or_value_lhs, key_or_value_rhs);
+ }
+ }
+}
+}
+
+void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
+ if (post_body.size() > 0) {
+ ldout(s->cct, 10) << "Content of POST: " << post_body << dendl;
+
+ if (post_body.find("Action") != string::npos) {
+ const boost::char_separator<char> sep("&");
+ const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
+ AttributeMap map;
+ for (const auto& t : tokens) {
+ auto pos = t.find("=");
+ if (pos != string::npos) {
+ const auto key = t.substr(0, pos);
+ if (key == "Action") {
+ s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+ } else if (key == "Name" || key == "TopicArn") {
+ const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+ s->info.args.append(key, value);
+ } else {
+ update_attribute_map(t, map);
+ }
+ }
+ }
+ // update the regular args with the content of the attribute map
+ for (const auto attr : map) {
+ s->info.args.append(attr.second.get_key(), attr.second.get_value());
+ }
+ }
+ const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
+ s->info.args.append("PayloadHash", payload_hash);
+ }
+}
+
+RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
+ rgw_topic_parse_input();
+
+ if (s->info.args.exists("Action")) {
+ const auto action = s->info.args.get("Action");
+ if (action.compare("CreateTopic") == 0)
+ return new RGWPSCreateTopic_ObjStore_AWS();
+ if (action.compare("DeleteTopic") == 0)
+ return new RGWPSDeleteTopic_ObjStore_AWS;
+ if (action.compare("ListTopics") == 0)
+ return new RGWPSListTopics_ObjStore_AWS();
+ if (action.compare("GetTopic") == 0)
+ return new RGWPSGetTopic_ObjStore_AWS();
+ }
+
+ return nullptr;
+}
+
+int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) {
+ /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) {
+ // TODO: some topic specific authorization
+ return 0;
+ }*/
+ return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
+}
+
+namespace {
+// conversion functions between S3 and GCP style event names
+std::string s3_to_gcp_event(const std::string& event) {
+ if (event == "s3:ObjectCreated:*") {
+ return "OBJECT_CREATE";
+ }
+ if (event == "s3:ObjectRemoved:*") {
+ return "OBJECT_DELETE";
+ }
+ return "UNKNOWN_EVENT";
+}
+std::string gcp_to_s3_event(const std::string& event) {
+ if (event == "OBJECT_CREATE") {
+ return "s3:ObjectCreated:";
+ }
+ if (event == "OBJECT_DELETE") {
+ return "s3:ObjectRemoved:";
+ }
+ return "UNKNOWN_EVENT";
+}
+
+// return a unique topic by prefexing with the notification name: <notification>_<topic>
+std::string topic_to_unique(const std::string& topic, const std::string& notification) {
+ return notification + "_" + topic;
+}
+
+// extract the topic from a unique topic of the form: <notification>_<topic>
+[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
+ if (unique_topic.find(notification + "_") == string::npos) {
+ return "";
+ }
+ return unique_topic.substr(notification.length() + 1);
+}
+}
+
+// command (S3 compliant): PUT /<bucket name>?notification
+// a "notification" and a subscription will be auto-generated
+// actual configuration is XML encoded in the body of the message
+class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+ rgw_pubsub_s3_notifications configurations;
+
+ int get_params_from_body() {
+ const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+ int r;
+ bufferlist data;
+ std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+
+ if (r < 0) {
+ ldout(s->cct, 1) << "failed to read XML payload" << dendl;
+ return r;
+ }
+ if (data.length() == 0) {
+ ldout(s->cct, 1) << "XML payload missing" << dendl;
+ return -EINVAL;
+ }
+
+ RGWXMLDecoder::XMLParser parser;
+
+ if (!parser.init()){
+ ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
+ return -EINVAL;
+ }
+ if (!parser.parse(data.c_str(), data.length(), 1)) {
+ ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ try {
+ // NotificationConfigurations is mandatory
+ RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ return 0;
+ }
+
+ int get_params() override {
+ bool exists;
+ const auto no_value = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (no_value.length() > 0) {
+ ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = s->bucket_name;
+ return 0;
+ }
+
+public:
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+ void execute() override;
+};
+
+void RGWPSCreateNotif_ObjStore_S3::execute() {
+ op_ret = get_params_from_body();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups.emplace(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ ceph_assert(b);
+ std::string data_bucket_prefix = "";
+ std::string data_oid_prefix = "";
+ if (store->getRados()->get_sync_module()) {
+ const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
+ if (psmodule) {
+ const auto& conf = psmodule->get_effective_conf();
+ data_bucket_prefix = conf["data_bucket_prefix"];
+ data_oid_prefix = conf["data_oid_prefix"];
+ }
+ }
+
+ for (const auto& c : configurations.list) {
+ const auto& sub_name = c.id;
+ if (sub_name.empty()) {
+ ldout(s->cct, 1) << "missing notification id" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ if (c.topic_arn.empty()) {
+ ldout(s->cct, 1) << "missing topic ARN" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ const auto arn = rgw::ARN::parse(c.topic_arn);
+ if (!arn || arn->resource.empty()) {
+ ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ const auto topic_name = arn->resource;
+
+ // get topic information. destination information is stored in the topic
+ rgw_pubsub_topic_subs topic_info;
+ op_ret = ups->get_topic(topic_name, &topic_info);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ // make sure that full topic configuration match
+ // TODO: use ARN match function
+
+ // create unique topic name. this has 2 reasons:
+ // (1) topics cannot be shared between different S3 notifications because they hold the filter information
+ // (2) make topic clneaup easier, when notification is removed
+ const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
+ // generate the internal topic, no need to store destination info in thr unique topic
+ // ARN is cached to make the "GET" method faster
+ op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name <<
+ "', ret=" << op_ret << dendl;
+ return;
+ }
+ // generate the notification
+ EventTypeList events;
+ std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
+ op_ret = b->create_notification(unique_topic_name, events);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
+ "', ret=" << op_ret << dendl;
+ // rollback generated topic (ignore return value)
+ ups->remove_topic(unique_topic_name);
+ return;
+ }
+
+ // generate the subscription with destination information from the original topic
+ rgw_pubsub_sub_dest dest = topic_info.topic.dest;
+ dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
+ dest.oid_prefix = data_oid_prefix + sub_name + "/";
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ // rollback generated notification (ignore return value)
+ b->remove_notification(unique_topic_name);
+ // rollback generated topic (ignore return value)
+ ups->remove_topic(unique_topic_name);
+ return;
+ }
+ ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
+ }
+}
+
+// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
+class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
+private:
+ std::string sub_name;
+
+ int get_params() override {
+ bool exists;
+ sub_name = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = s->bucket_name;
+ return 0;
+ }
+
+ void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
+ auto sub = ups->get_sub(_sub_name);
+ rgw_pubsub_sub_config sub_conf;
+ op_ret = sub->get_conf(&sub_conf);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+ return;
+ }
+ if (sub_conf.s3_id.empty()) {
+ if (must_delete) {
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+ }
+ return;
+ }
+ const auto& sub_topic_name = sub_conf.topic;
+ op_ret = sub->unsubscribe(sub_topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
+ return;
+ }
+ op_ret = b->remove_notification(sub_topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
+ }
+ op_ret = ups->remove_topic(sub_topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
+ }
+ return;
+ }
+
+public:
+ void execute() override;
+ const char* name() const override { return "pubsub_notification_delete_s3"; }
+};
+
+void RGWPSDeleteNotif_ObjStore_S3::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups.emplace(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ ceph_assert(b);
+
+ if (!sub_name.empty()) {
+ // delete a specific notification
+ delete_notification(sub_name, b, true);
+ return;
+ }
+
+ // delete all notifications on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ b->get_topics(&bucket_topics);
+ // loop through all topics of the bucket
+ for (const auto& topic : bucket_topics.topics) {
+ // for each topic get all subscriptions
+ rgw_pubsub_topic_subs topic_subs;
+ ups->get_topic(topic.first, &topic_subs);
+ // loop through all subscriptions
+ for (const auto& topic_sub_name : topic_subs.subs) {
+ delete_notification(topic_sub_name, b, false);
+ }
+ }
+}
+
+// command (S3 compliant): GET /bucket?notification[=<notification-id>]
+class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+private:
+ std::string sub_name;
+ rgw_pubsub_s3_notifications notifications;
+
+ int get_params() override {
+ bool exists;
+ sub_name = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldout(s->cct, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = s->bucket_name;
+ return 0;
+ }
+
+ void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
+ const EventTypeList& events,
+ const std::string& topic_arn);
+
+public:
+ void execute() override;
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+ notifications.dump_xml(s->formatter);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+ const char* name() const override { return "pubsub_notifications_get_s3"; }
+};
+
+void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
+ const EventTypeList& events,
+ const std::string& topic_arn) {
+ rgw_pubsub_s3_notification notification;
+ notification.id = sub_conf.s3_id;
+ notification.topic_arn = topic_arn,
+ std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
+ notifications.list.push_back(notification);
+}
+
+void RGWPSListNotifs_ObjStore_S3::execute() {
+ ups.emplace(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ ceph_assert(b);
+ if (!sub_name.empty()) {
+ // get info of a specific notification
+ auto sub = ups->get_sub(sub_name);
+ rgw_pubsub_sub_config sub_conf;
+ op_ret = sub->get_conf(&sub_conf);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+ return;
+ }
+ if (sub_conf.s3_id.empty()) {
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+ return;
+ }
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b->get_topics(&bucket_topics);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+ return;
+ }
+ const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
+ if (topic_it == bucket_topics.topics.end()) {
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
+ return;
+ }
+ add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
+ return;
+ }
+ // get info on all s3 notifications of the bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ b->get_topics(&bucket_topics);
+ // loop through all topics of the bucket
+ for (const auto& topic : bucket_topics.topics) {
+ // for each topic get all subscriptions
+ rgw_pubsub_topic_subs topic_subs;
+ ups->get_topic(topic.first, &topic_subs);
+ const auto& events = topic.second.events;
+ const auto& topic_arn = topic.second.topic.arn;
+ // loop through all subscriptions
+ for (const auto& topic_sub_name : topic_subs.subs) {
+ // get info of a specific notification
+ auto sub = ups->get_sub(topic_sub_name);
+ rgw_pubsub_sub_config sub_conf;
+ op_ret = sub->get_conf(&sub_conf);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
+ return;
+ }
+ if (sub_conf.s3_id.empty()) {
+ // not an s3 notification
+ continue;
+ }
+ add_notification_to_list(sub_conf, events, topic_arn);
+ }
+ }
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
+ return new RGWPSListNotifs_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
+ return new RGWPSCreateNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
+ return new RGWPSDeleteNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
+ return new RGWPSListNotifs_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
+ return new RGWPSCreateNotif_ObjStore_S3();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
+ return new RGWPSDeleteNotif_ObjStore_S3();
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include "rgw_rest_s3.h"
+
+// s3 compliant notification handler factory
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+ int init_permissions(RGWOp* op) override {return 0;}
+ int read_permissions(RGWOp* op) override {return 0;}
+ bool supports_quota() override {return false;}
+ RGWOp* op_get() override;
+ RGWOp* op_put() override;
+ RGWOp* op_delete() override;
+public:
+ using RGWHandler_REST_S3::RGWHandler_REST_S3;
+ virtual ~RGWHandler_REST_PSNotifs_S3() = default;
+ // following are used to generate the operations when invoked by another REST handler
+ static RGWOp* create_get_op();
+ static RGWOp* create_put_op();
+ static RGWOp* create_delete_op();
+};
+
+// AWS compliant topics handler factory
+class RGWHandler_REST_PSTopic_AWS : public RGWHandler_REST {
+ const rgw::auth::StrategyRegistry& auth_registry;
+ const std::string& post_body;
+ void rgw_topic_parse_input();
+ //static int init_from_header(struct req_state *s, int default_formatter, bool configurable_format);
+protected:
+ RGWOp* op_post() override;
+public:
+ RGWHandler_REST_PSTopic_AWS(const rgw::auth::StrategyRegistry& _auth_registry, const std::string& _post_body) :
+ auth_registry(_auth_registry),
+ post_body(_post_body) {}
+ virtual ~RGWHandler_REST_PSTopic_AWS() = default;
+ int postauth_init() override { return 0; }
+ int authorize(const DoutPrefixProvider* dpp) override;
+};
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_rest_pubsub_common.h"
+#include "common/dout.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+void RGWPSCreateTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->create_topic(topic_name, dest, topic_arn);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSListTopicsOp::execute() {
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->get_user_topics(&result);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got topics" << dendl;
+}
+
+void RGWPSGetTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->get_topic(topic_name, &result);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSDeleteTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->remove_topic(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSCreateSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->subscribe(topic_name, dest);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSGetSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->get_conf(&result);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSDeleteSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->unsubscribe(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSAckSubEventOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub_with_events(sub_name);
+ op_ret = sub->remove_event(event_id);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSPullSubEventsOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ sub = ups->get_sub_with_events(sub_name);
+ if (!sub) {
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
+ return;
+ }
+ op_ret = sub->list_events(marker, max_entries);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
+}
+
+
+int RGWPSCreateNotifOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ const auto& id = s->owner.get_id();
+
+ ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
+ bucket_info, nullptr, null_yield, nullptr);
+ if (ret < 0) {
+ ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
+ return ret;
+ }
+
+ if (bucket_info.owner != id) {
+ ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
+int RGWPSDeleteNotifOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, null_yield, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
+int RGWPSListNotifsOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, null_yield, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
+ return -EPERM;
+ }
+
+ return 0;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+#include <string>
+#include <optional>
+#include "rgw_op.h"
+#include "rgw_pubsub.h"
+
+// create a topic
+class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
+protected:
+ std::optional<RGWUserPubSub> ups;
+ std::string topic_name;
+ rgw_pubsub_sub_dest dest;
+ std::string topic_arn;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_topic_create"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// list all topics
+class RGWPSListTopicsOp : public RGWOp {
+protected:
+ std::optional<RGWUserPubSub> ups;
+ rgw_pubsub_user_topics result;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_topics_list"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// get topic information
+class RGWPSGetTopicOp : public RGWOp {
+protected:
+ std::string topic_name;
+ std::optional<RGWUserPubSub> ups;
+ rgw_pubsub_topic_subs result;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// delete a topic
+class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
+protected:
+ string topic_name;
+ std::optional<RGWUserPubSub> ups;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_topic_delete"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// create a subscription
+class RGWPSCreateSubOp : public RGWDefaultResponseOp {
+protected:
+ std::string sub_name;
+ std::string topic_name;
+ std::optional<RGWUserPubSub> ups;
+ rgw_pubsub_sub_dest dest;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_subscription_create"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// get subscription information (including push-endpoint if exist)
+class RGWPSGetSubOp : public RGWOp {
+protected:
+ std::string sub_name;
+ std::optional<RGWUserPubSub> ups;
+ rgw_pubsub_sub_config result;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_subscription_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// delete subscription
+class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
+protected:
+ std::string sub_name;
+ std::string topic_name;
+ std::optional<RGWUserPubSub> ups;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_subscription_delete"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// acking of an event
+class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
+protected:
+ std::string sub_name;
+ std::string event_id;
+ std::optional<RGWUserPubSub> ups;
+
+ virtual int get_params() = 0;
+
+public:
+ RGWPSAckSubEventOp() {}
+
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_subscription_ack"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// fetching events from a subscription
+// dpending on whether the subscription was created via s3 compliant API or not
+// the matching events will be returned
+class RGWPSPullSubEventsOp : public RGWOp {
+protected:
+ int max_entries{0};
+ std::string sub_name;
+ std::string marker;
+ std::optional<RGWUserPubSub> ups;
+ RGWUserPubSub::SubRef sub;
+
+ virtual int get_params() = 0;
+
+public:
+ RGWPSPullSubEventsOp() {}
+
+ int verify_permission() override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_subscription_pull"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
+// notification creation
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+protected:
+ std::optional<RGWUserPubSub> ups;
+ string bucket_name;
+ RGWBucketInfo bucket_info;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+};
+
+// delete a notification
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+protected:
+ std::optional<RGWUserPubSub> ups;
+ std::string bucket_name;
+ RGWBucketInfo bucket_info;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+};
+
+// get topics/notifications on a bucket
+class RGWPSListNotifsOp : public RGWOp {
+protected:
+ std::string bucket_name;
+ RGWBucketInfo bucket_info;
+ std::optional<RGWUserPubSub> ups;
+
+ virtual int get_params() = 0;
+
+public:
+ int verify_permission() override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+};
+
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
#include "rgw_rest_s3website.h"
+#include "rgw_rest_pubsub.h"
#include "rgw_auth_s3.h"
#include "rgw_acl.h"
#include "rgw_policy_s3.h"
auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
s->formatter->dump_string("StorageClass", storage_class.c_str());
}
- dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+ dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
if (iter->meta.appendable) {
s->formatter->dump_string("Type", "Appendable");
} else {
s->formatter->dump_int("Size", iter->meta.accounted_size);
auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
s->formatter->dump_string("StorageClass", storage_class.c_str());
- dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+ dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
if (s->system_request) {
s->formatter->dump_string("RgwxTag", iter->tag);
}
{
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
- int ret = 0;
+ int ret;
bufferlist data;
std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
- string post_body = data.to_str();
+ if (ret < 0) {
+ return nullptr;
+ }
+
+ const auto post_body = data.to_str();
- if (this->isSTSenabled) {
+ if (isSTSEnabled) {
RGWHandler_REST_STS sts_handler(auth_registry, post_body);
sts_handler.init(store, s, s->cio);
auto op = sts_handler.get_op(store);
}
}
- if (this->isIAMenabled) {
+ if (isIAMEnabled) {
RGWHandler_REST_IAM iam_handler(auth_registry, post_body);
iam_handler.init(store, s, s->cio);
auto op = iam_handler.get_op(store);
return op;
}
}
- return NULL;
+
+ if (isPSEnabled) {
+ RGWHandler_REST_PSTopic_AWS topic_handler(auth_registry, post_body);
+ topic_handler.init(store, s, s->cio);
+ auto op = topic_handler.get_op(store);
+ if (op) {
+ return op;
+ }
+ }
+
+ return nullptr;
}
RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
return new RGWGetBucketTags_ObjStore_S3;
} else if (is_object_lock_op()) {
return new RGWGetBucketObjectLock_ObjStore_S3;
+ } else if (is_notification_op()) {
+ return RGWHandler_REST_PSNotifs_S3::create_get_op();
}
return get_obj_op(true);
}
return nullptr;
} else if (is_object_lock_op()) {
return new RGWPutBucketObjectLock_ObjStore_S3;
+ } else if (is_notification_op()) {
+ return RGWHandler_REST_PSNotifs_S3::create_put_op();
}
return new RGWCreateBucket_ObjStore_S3;
}
return new RGWDeleteLC_ObjStore_S3;
} else if(is_policy_op()) {
return new RGWDeleteBucketPolicy;
+ } else if (is_notification_op()) {
+ return RGWHandler_REST_PSNotifs_S3::create_delete_op();
}
if (s->info.args.sub_resource_exists("website")) {
}
} else {
if (s->init_state.url_bucket.empty()) {
- handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam);
+ handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam, enable_pubsub);
} else if (s->object.empty()) {
- handler = new RGWHandler_REST_Bucket_S3(auth_registry);
+ handler = new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub);
} else {
handler = new RGWHandler_REST_Obj_S3(auth_registry);
}
class RGWHandler_REST_Service_S3 : public RGWHandler_REST_S3 {
protected:
- bool isSTSenabled;
- bool isIAMenabled;
+ const bool isSTSEnabled;
+ const bool isIAMEnabled;
+ const bool isPSEnabled;
bool is_usage_op() {
return s->info.args.exists("usage");
}
RGWOp *op_post() override;
public:
RGWHandler_REST_Service_S3(const rgw::auth::StrategyRegistry& auth_registry,
- bool isSTSenabled, bool isIAMenabled) :
- RGWHandler_REST_S3(auth_registry), isSTSenabled(isSTSenabled), isIAMenabled(isIAMenabled) {}
+ bool _isSTSEnabled, bool _isIAMEnabled, bool _isPSEnabled) :
+ RGWHandler_REST_S3(auth_registry), isSTSEnabled(_isSTSEnabled), isIAMEnabled(_isIAMEnabled), isPSEnabled(_isPSEnabled) {}
~RGWHandler_REST_Service_S3() override = default;
};
class RGWHandler_REST_Bucket_S3 : public RGWHandler_REST_S3 {
+ const bool enable_pubsub;
protected:
bool is_acl_op() {
return s->info.args.exists("acl");
bool is_object_lock_op() {
return s->info.args.exists("object-lock");
}
+ bool is_notification_op() const {
+ if (enable_pubsub) {
+ return s->info.args.exists("notification");
+ }
+ return false;
+ }
RGWOp *get_obj_op(bool get_data);
RGWOp *op_get() override;
RGWOp *op_post() override;
RGWOp *op_options() override;
public:
- using RGWHandler_REST_S3::RGWHandler_REST_S3;
+ RGWHandler_REST_Bucket_S3(const rgw::auth::StrategyRegistry& auth_registry, bool _enable_pubsub) :
+ RGWHandler_REST_S3(auth_registry), enable_pubsub(_enable_pubsub) {}
~RGWHandler_REST_Bucket_S3() override = default;
};
class RGWRESTMgr_S3 : public RGWRESTMgr {
private:
- bool enable_s3website;
- bool enable_sts;
- bool enable_iam;
-public:
- explicit RGWRESTMgr_S3(bool enable_s3website = false, bool enable_sts = false, bool enable_iam = false)
- : enable_s3website(enable_s3website),
- enable_sts(enable_sts),
- enable_iam(enable_iam) {
+ const bool enable_s3website;
+ const bool enable_sts;
+ const bool enable_iam;
+ const bool enable_pubsub;
+public:
+ explicit RGWRESTMgr_S3(bool _enable_s3website=false, bool _enable_sts=false, bool _enable_iam=false, bool _enable_pubsub=false)
+ : enable_s3website(_enable_s3website),
+ enable_sts(_enable_sts),
+ enable_iam(_enable_iam),
+ enable_pubsub(_enable_pubsub) {
}
~RGWRESTMgr_S3() override = default;
for (const auto& t : tokens) {
auto pos = t.find("=");
if (pos != string::npos) {
- std::string key = t.substr(0, pos);
- std::string value = t.substr(pos + 1, t.size() - 1);
- if (key == "RoleArn" || key == "Policy") {
- value = url_decode(value);
- }
- s->info.args.append(key, value);
- }
- }
- }
+ const auto key = t.substr(0, pos);
+ if (key == "Action") {
+ s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
+ } else if (key == "RoleArn" || key == "Policy") {
+ const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
+ s->info.args.append(key, value);
+ }
+ }
+ }
+ }
}
auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
s->info.args.append("PayloadHash", payload_hash);
acl_strategy_t&& extra_acl_strategy,
const rgw::auth::RemoteApplier::AuthInfo &info) const override {
auto apl = \
- rgw::auth::add_3rdparty(ctl, s->account_name,
+ rgw::auth::add_3rdparty(ctl, rgw_user(s->account_name),
rgw::auth::add_sysreq(cct, ctl, s,
rgw::auth::RemoteApplier(cct, ctl, std::move(extra_acl_strategy), info,
implicit_tenant_context,
const std::string& subuser,
const boost::optional<uint32_t>& perm_mask) const override {
auto apl = \
- rgw::auth::add_3rdparty(ctl, s->account_name,
+ rgw::auth::add_3rdparty(ctl, rgw_user(s->account_name),
rgw::auth::add_sysreq(cct, ctl, s,
rgw::auth::LocalApplier(cct, user_info, subuser, perm_mask)));
/* TODO(rzarzynski): replace with static_ptr. */
if (dialect != RGW_REST_S3) {
return orig;
}
- return new RGWRESTMgr_PubSub_S3(orig);
+ return new RGWRESTMgr_PubSub();
}
bool RGWPSSyncModuleInstance::should_full_sync() const {
// vim: ts=8 sw=2 smarttab ft=cpp
#include <algorithm>
+#include "rgw_rest_pubsub_common.h"
+#include "rgw_rest_pubsub.h"
#include "rgw_sync_module_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_sync_module_pubsub_rest.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-// create a topic
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- std::string topic_name;
- rgw_pubsub_sub_dest dest;
- std::string topic_arn;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_create"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSCreateTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name, dest, topic_arn);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
-}
-
// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
+class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
public:
int get_params() override {
}
};
-// list all topics
-class RGWPSListTopicsOp : public RGWOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_user_topics result;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topics_list"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSListTopicsOp::execute()
-{
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->get_user_topics(&result);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully got topics" << dendl;
-}
-
// command: GET /topics
-class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
+class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
public:
void send_response() override {
if (op_ret) {
}
};
-// get topic information
-class RGWPSGetTopicOp : public RGWOp {
-protected:
- std::string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_topic_subs result;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_get"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSGetTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->get_topic(topic_name, &result);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
-}
-
// command: GET /topics/<topic-name>
-class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
+class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
public:
int get_params() override {
topic_name = s->object.name;
}
};
-// delete a topic
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
- string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_topic_delete"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-void RGWPSDeleteTopicOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->remove_topic(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
-}
-
// command: DELETE /topics/<topic-name>
-class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
+class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
public:
int get_params() override {
topic_name = s->object.name;
}
};
-// topics handler factory
-class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
+// ceph specifc topics handler factory
+class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
return nullptr;
}
if (s->object.empty()) {
- return new RGWPSListTopics_ObjStore_S3();
+ return new RGWPSListTopics_ObjStore();
}
- return new RGWPSGetTopic_ObjStore_S3();
+ return new RGWPSGetTopic_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateTopic_ObjStore_S3();
+ return new RGWPSCreateTopic_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteTopic_ObjStore_S3();
+ return new RGWPSDeleteTopic_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSTopic_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSTopic_S3() = default;
+ explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSTopic() = default;
};
-// create a subscription
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_sub_dest dest;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_create"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSCreateSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->subscribe(topic_name, dest);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
-}
-
// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
-class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
+class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
public:
int get_params() override {
sub_name = s->object.name;
}
};
-// get subscription information (including push-endpoint if exist)
-class RGWPSGetSubOp : public RGWOp {
-protected:
- std::string sub_name;
- std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_sub_config result;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_get"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSGetSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->get_conf(&result);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
-}
-
// command: GET /subscriptions/<sub-name>
-class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
+class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
public:
int get_params() override {
sub_name = s->object.name;
}
};
-// delete subscription
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string topic_name;
- std::unique_ptr<RGWUserPubSub> ups;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_delete"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-void RGWPSDeleteSubOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->unsubscribe(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
-}
-
// command: DELETE /subscriptions/<sub-name>
-class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
+class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
public:
int get_params() override {
sub_name = s->object.name;
}
};
-// acking of an event
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string event_id;
- std::unique_ptr<RGWUserPubSub> ups;
-
- virtual int get_params() = 0;
-
-public:
- RGWPSAckSubEventOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_ack"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-void RGWPSAckSubEventOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub_with_events(sub_name);
- op_ret = sub->remove_event(event_id);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
-}
-
// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
-class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
+class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
public:
- explicit RGWPSAckSubEvent_ObjStore_S3() {}
+ explicit RGWPSAckSubEvent_ObjStore() {}
int get_params() override {
sub_name = s->object.name;
}
};
-// fetching events from a subscription
-// dpending on whether the subscription was created via s3 compliant API or not
-// the matching events will be returned
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
- int max_entries{0};
- std::string sub_name;
- std::string marker;
- std::unique_ptr<RGWUserPubSub> ups;
- RGWUserPubSub::SubRef sub;
-
- virtual int get_params() = 0;
-
-public:
- RGWPSPullSubEventsOp() {}
-
- int verify_permission() override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute() override;
-
- const char* name() const override { return "pubsub_subscription_pull"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-void RGWPSPullSubEventsOp::execute()
-{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- sub = ups->get_sub_with_events(sub_name);
- if (!sub) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
- return;
- }
- op_ret = sub->list_events(marker, max_entries);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
-}
-
// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
-class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
+class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
public:
int get_params() override {
sub_name = s->object.name;
};
// subscriptions handler factory
-class RGWHandler_REST_PSSub_S3 : public RGWHandler_REST_S3 {
+class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
return nullptr;
}
if (s->info.args.exists("events")) {
- return new RGWPSPullSubEvents_ObjStore_S3();
+ return new RGWPSPullSubEvents_ObjStore();
}
- return new RGWPSGetSub_ObjStore_S3();
+ return new RGWPSGetSub_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateSub_ObjStore_S3();
+ return new RGWPSCreateSub_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteSub_ObjStore_S3();
+ return new RGWPSDeleteSub_ObjStore();
}
return nullptr;
}
RGWOp *op_post() override {
if (s->info.args.exists("ack")) {
- return new RGWPSAckSubEvent_ObjStore_S3();
+ return new RGWPSAckSubEvent_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSSub_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSSub_S3() = default;
+ explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSSub() = default;
};
namespace {
}
}
-// notification creation
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- string bucket_name;
- RGWBucketInfo bucket_info;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- const auto& id = s->owner.get_id();
-
- ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
- bucket_info, nullptr, null_yield, nullptr);
- if (ret < 0) {
- ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
- return ret;
- }
-
- if (bucket_info.owner != id) {
- ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
- return -EPERM;
- }
- return 0;
- }
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-
// command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
-class RGWPSCreateNotif_ObjStore_Ceph : public RGWPSCreateNotifOp {
+class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
private:
std::string topic_name;
std::set<std::string, ltstr_nocase> events;
void execute() override;
};
-void RGWPSCreateNotif_ObjStore_Ceph::execute()
+void RGWPSCreateNotif_ObjStore::execute()
{
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events);
ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
}
-namespace {
-// conversion functions between S3 and GCP style event names
-std::string s3_to_gcp_event(const std::string& event) {
- if (event == "s3:ObjectCreated:*") {
- return "OBJECT_CREATE";
- }
- if (event == "s3:ObjectRemoved:*") {
- return "OBJECT_DELETE";
- }
- return "UNKNOWN_EVENT";
-}
-std::string gcp_to_s3_event(const std::string& event) {
- if (event == "OBJECT_CREATE") {
- return "s3:ObjectCreated:";
- }
- if (event == "OBJECT_DELETE") {
- return "s3:ObjectRemoved:";
- }
- return "UNKNOWN_EVENT";
-}
-
-// return a unique topic by prefexing with the notification name: <notification>_<topic>
-std::string topic_to_unique(const std::string& topic, const std::string& notification) {
- return notification + "_" + topic;
-}
-
-// extract the topic from a unique topic of the form: <notification>_<topic>
-[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
- if (unique_topic.find(notification + "_") == string::npos) {
- return "";
- }
- return unique_topic.substr(notification.length() + 1);
-}
-}
-
-// command (S3 compliant): PUT /<bucket name>?notification
-// a "notification" and a subscription will be auto-generated
-// actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
- rgw_pubsub_s3_notifications configurations;
-
- int get_params_from_body() {
- const auto max_size = s->cct->_conf->rgw_max_put_param_size;
- int r;
- bufferlist data;
- std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
-
- if (r < 0) {
- ldout(s->cct, 1) << "failed to read XML payload" << dendl;
- return r;
- }
- if (data.length() == 0) {
- ldout(s->cct, 1) << "XML payload missing" << dendl;
- return -EINVAL;
- }
-
- RGWXMLDecoder::XMLParser parser;
-
- if (!parser.init()){
- ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
- return -EINVAL;
- }
- if (!parser.parse(data.c_str(), data.length(), 1)) {
- ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
- return -ERR_MALFORMED_XML;
- }
- try {
- // TopicConfigurations is mandatory
- RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
- } catch (RGWXMLDecoder::err& err) {
- ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
- return -ERR_MALFORMED_XML;
- }
- return 0;
- }
-
- int get_params() override {
- bool exists;
- const auto no_value = s->info.args.get("notification", &exists);
- if (!exists) {
- ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
- return -EINVAL;
- }
- if (no_value.length() > 0) {
- ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
- return -EINVAL;
- }
- if (s->bucket_name.empty()) {
- ldout(s->cct, 1) << "request must be on a bucket" << dendl;
- return -EINVAL;
- }
- bucket_name = s->bucket_name;
- return 0;
- }
-
-public:
- const char* name() const override { return "pubsub_notification_create_s3"; }
- void execute() override;
-};
-
-void RGWPSCreateNotif_ObjStore_S3::execute() {
- op_ret = get_params_from_body();
- if (op_ret < 0) {
- return;
- }
-
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
- ceph_assert(b);
- const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
- const auto& conf = psmodule->get_effective_conf();
-
- for (const auto& c : configurations.list) {
- const auto& sub_name = c.id;
- if (sub_name.empty()) {
- ldout(s->cct, 1) << "missing notification id" << dendl;
- op_ret = -EINVAL;
- return;
- }
- if (c.topic_arn.empty()) {
- ldout(s->cct, 1) << "missing topic ARN" << dendl;
- op_ret = -EINVAL;
- return;
- }
-
- const auto arn = rgw::ARN::parse(c.topic_arn);
- if (!arn || arn->resource.empty()) {
- ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
- op_ret = -EINVAL;
- return;
- }
-
- const auto topic_name = arn->resource;
-
- // get topic information. destination information is stored in the topic
- rgw_pubsub_topic_subs topic_info;
- op_ret = ups->get_topic(topic_name, &topic_info);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- // make sure that full topic configuration match
- // TODO: use ARN match function
-
- // create unique topic name. this has 2 reasons:
- // (1) topics cannot be shared between different S3 notifications because they hold the filter information
- // (2) make topic clneaup easier, when notification is removed
- const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
- // generate the internal topic, no need to store destination info
- // ARN is cached to make the "GET" method faster
- op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name <<
- "', ret=" << op_ret << dendl;
- return;
- }
- // generate the notification
- EventTypeList events;
- std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
- op_ret = b->create_notification(unique_topic_name, events);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
- "', ret=" << op_ret << dendl;
- // rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name);
- return;
- }
-
- rgw_pubsub_sub_dest dest = topic_info.topic.dest;
- dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + unique_topic_name;
- dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
- // rollback generated notification (ignore return value)
- b->remove_notification(unique_topic_name);
- // rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name);
- return;
- }
- ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
- }
-}
-
-// delete a notification
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
- std::unique_ptr<RGWUserPubSub> ups;
- std::string bucket_name;
- RGWBucketInfo bucket_info;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, null_yield, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
- return -EPERM;
- }
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
-class RGWPSDeleteNotif_ObjStore_Ceph : public RGWPSDeleteNotifOp {
+// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
+class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
private:
std::string topic_name;
const char* name() const override { return "pubsub_notification_delete"; }
};
-void RGWPSDeleteNotif_ObjStore_Ceph::execute() {
+void RGWPSDeleteNotif_ObjStore::execute() {
op_ret = get_params();
if (op_ret < 0) {
return;
}
- ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
}
-// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
- std::string sub_name;
-
- int get_params() override {
- bool exists;
- sub_name = s->info.args.get("notification", &exists);
- if (!exists) {
- ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
- return -EINVAL;
- }
- if (s->bucket_name.empty()) {
- ldout(s->cct, 1) << "request must be on a bucket" << dendl;
- return -EINVAL;
- }
- bucket_name = s->bucket_name;
- return 0;
- }
-
- void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
- auto sub = ups->get_sub(_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
- if (must_delete) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
- }
- return;
- }
- const auto& sub_topic_name = sub_conf.topic;
- op_ret = sub->unsubscribe(sub_topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
- return;
- }
- op_ret = b->remove_notification(sub_topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
- }
- op_ret = ups->remove_topic(sub_topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
- }
- return;
- }
-
-public:
- void execute() override;
- const char* name() const override { return "pubsub_notification_delete_s3"; }
-};
-
-void RGWPSDeleteNotif_ObjStore_S3::execute() {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
- ceph_assert(b);
-
- if (!sub_name.empty()) {
- // delete a specific notification
- delete_notification(sub_name, b, true);
- return;
- }
-
- // delete all notifications on a bucket
- rgw_pubsub_bucket_topics bucket_topics;
- b->get_topics(&bucket_topics);
- // loop through all topics of the bucket
- for (const auto& topic : bucket_topics.topics) {
- // for each topic get all subscriptions
- rgw_pubsub_topic_subs topic_subs;
- ups->get_topic(topic.first, &topic_subs);
- // loop through all subscriptions
- for (const auto& topic_sub_name : topic_subs.subs) {
- delete_notification(topic_sub_name, b, false);
- }
- }
-}
-
-// get topics/notifications on a bucket
-class RGWPSListNotifsOp : public RGWOp {
-protected:
- std::string bucket_name;
- RGWBucketInfo bucket_info;
- std::unique_ptr<RGWUserPubSub> ups;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- ret = store->getRados()->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, null_yield, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 1) << "user doesn't own bucket, cannot get topic list" << dendl;
- return -EPERM;
- }
-
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
// command: GET /notifications/bucket/<bucket>
-class RGWPSListNotifs_ObjStore_Ceph : public RGWPSListNotifsOp {
+class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
private:
rgw_pubsub_bucket_topics result;
const char* name() const override { return "pubsub_notifications_list"; }
};
-void RGWPSListNotifs_ObjStore_Ceph::execute()
+void RGWPSListNotifs_ObjStore::execute()
{
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->get_topics(&result);
if (op_ret < 0) {
}
}
-// command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
-private:
- std::string sub_name;
- rgw_pubsub_s3_notifications notifications;
-
- int get_params() override {
- bool exists;
- sub_name = s->info.args.get("notification", &exists);
- if (!exists) {
- ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
- return -EINVAL;
- }
- if (s->bucket_name.empty()) {
- ldout(s->cct, 1) << "request must be on a bucket" << dendl;
- return -EINVAL;
- }
- bucket_name = s->bucket_name;
- return 0;
- }
-
- void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
- const EventTypeList& events,
- const std::string& topic_arn);
-
-public:
- void execute() override;
- void send_response() override {
- if (op_ret) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- end_header(s, this, "application/xml");
-
- if (op_ret < 0) {
- return;
- }
- notifications.dump_xml(s->formatter);
- rgw_flush_formatter_and_reset(s, s->formatter);
- }
- const char* name() const override { return "pubsub_notifications_get_s3"; }
-};
-
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
- const EventTypeList& events,
- const std::string& topic_arn) {
- rgw_pubsub_s3_notification notification;
- notification.id = sub_conf.s3_id;
- notification.topic_arn = topic_arn,
- std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
- notifications.list.push_back(notification);
-}
-
-void RGWPSListNotifs_ObjStore_S3::execute() {
- ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
- ceph_assert(b);
- if (!sub_name.empty()) {
- // get info of a specific notification
- auto sub = ups->get_sub(sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
- return;
- }
- rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
- if (topic_it == bucket_topics.topics.end()) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
- return;
- }
- add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
- return;
- }
- // get info on all s3 notifications of the bucket
- rgw_pubsub_bucket_topics bucket_topics;
- b->get_topics(&bucket_topics);
- // loop through all topics of the bucket
- for (const auto& topic : bucket_topics.topics) {
- // for each topic get all subscriptions
- rgw_pubsub_topic_subs topic_subs;
- ups->get_topic(topic.first, &topic_subs);
- const auto& events = topic.second.events;
- const auto& topic_arn = topic.second.topic.arn;
- // loop through all subscriptions
- for (const auto& topic_sub_name : topic_subs.subs) {
- // get info of a specific notification
- auto sub = ups->get_sub(topic_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
- // not an s3 notification
- continue;
- }
- add_notification_to_list(sub_conf, events, topic_arn);
- }
- }
-}
-
// ceph specific notification handler factory
-class RGWHandler_REST_PSNotifs_Ceph : public RGWHandler_REST_S3 {
+class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
if (s->object.empty()) {
return nullptr;
}
- return new RGWPSListNotifs_ObjStore_Ceph();
+ return new RGWPSListNotifs_ObjStore();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateNotif_ObjStore_Ceph();
+ return new RGWPSCreateNotif_ObjStore();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteNotif_ObjStore_Ceph();
+ return new RGWPSDeleteNotif_ObjStore();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSNotifs_Ceph(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSNotifs_Ceph() = default;
+ explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSNotifs() = default;
};
-// s3 compliant notification handler factory
-class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
-protected:
- int init_permissions(RGWOp* op) override {
- return 0;
- }
-
- int read_permissions(RGWOp* op) override {
- return 0;
- }
- bool supports_quota() override {
- return false;
- }
- RGWOp *op_get() override {
- return new RGWPSListNotifs_ObjStore_S3();
- }
- RGWOp *op_put() override {
- return new RGWPSCreateNotif_ObjStore_S3();
- }
- RGWOp *op_delete() override {
- return new RGWPSDeleteNotif_ObjStore_S3();
- }
-public:
- explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSNotifs_S3() = default;
-};
-
-// factory for PubSub REST handlers
-RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
+// factory for ceph specific PubSub REST handlers
+RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(struct req_state* const s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix)
{
if (RGWHandler_REST_S3::init_from_header(s, RGW_FORMAT_JSON, true) < 0) {
return nullptr;
}
-
- RGWHandler_REST *handler = nullptr;
+
+ RGWHandler_REST* handler{nullptr};
// ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
+ // this API is available only on RGW that belong to a pubsub zone
if (s->init_state.url_bucket == "topics") {
- handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
+ handler = new RGWHandler_REST_PSTopic(auth_registry);
} else if (s->init_state.url_bucket == "subscriptions") {
- handler = new RGWHandler_REST_PSSub_S3(auth_registry);
+ handler = new RGWHandler_REST_PSSub(auth_registry);
} else if (s->init_state.url_bucket == "notifications") {
- handler = new RGWHandler_REST_PSNotifs_Ceph(auth_registry);
- } else {
- // S3 compatible answers are XML formatted, this is not necessarily in the header of the request
- if (RGWHandler_REST_S3::reallocate_formatter(s, RGW_FORMAT_XML) < 0) {
- return nullptr;
+ handler = new RGWHandler_REST_PSNotifs(auth_registry);
+ } else if (s->info.args.exists("notification")) {
+ const int ret = RGWHandler_REST::allocate_formatter(s, RGW_FORMAT_XML, true);
+ if (ret == 0) {
+ handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
}
- // s3 compliant PubSub API: uses: <bucket name>?notification
- handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
}
-
+
ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
+
return handler;
}
#include "rgw_rest.h"
-class RGWRESTMgr_PubSub_S3 : public RGWRESTMgr {
- RGWRESTMgr *next;
+class RGWRESTMgr_PubSub : public RGWRESTMgr {
public:
- explicit RGWRESTMgr_PubSub_S3(RGWRESTMgr *_next) : next(_next) {}
-
- RGWHandler_REST *get_handler(struct req_state* s,
+ virtual RGWHandler_REST* get_handler(struct req_state* s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix) override;
};
return rados_obj.operate(&op, &ibl, null_yield);
}
-int RGWSI_User_RADOS::cls_user_get_header_async(const string& user, RGWGetUserHeader_CB *cb)
+int RGWSI_User_RADOS::cls_user_get_header_async(const string& user_str, RGWGetUserHeader_CB *cb)
{
- rgw_raw_obj obj = get_buckets_obj(user);
+ rgw_raw_obj obj = get_buckets_obj(rgw_user(user_str));
auto rados_obj = svc.rados->obj(obj);
int r = rados_obj.open();
if (r < 0) {
string user_str = user.to_str();
cls_user_header header;
- int r = cls_user_get_header(user_str, &header);
+ int r = cls_user_get_header(rgw_user(user_str), &header);
if (r < 0)
return r;
gen_bucket_name, \
get_user, \
get_tenant
-from .zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
zones[0].delete_bucket(bucket_name)
+def test_ps_s3_notification_on_master():
+ """ test s3 notification set/get/delete on master """
+ zones, _ = init_env()
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ # create bucket on the first of the rados zones
+ zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # get notifications on a bucket
+ response, status = s3_notification_conf.get_config()
+ assert_equal(status/100, 2)
+ assert_equal(len(response['TopicConfigurations']), 1)
+ assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
+
+ # delete specific notifications
+ _, status = s3_notification_conf.del_config(notification=notification_name)
+ assert_equal(status/100, 2)
+
+ # cleanup
+ topic_conf.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+
+
def test_ps_topic():
""" test set/get/delete of topic """
_, ps_zones = init_env()
assert_equal(parsed_result['Code'], 'NoSuchKey')
+def test_ps_s3_topic():
+ """ test set/get/delete of s3 topic """
+ zones, _ = init_env()
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name+TOPIC_SUFFIX
+
+ # create topic
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
+ # list topics
+ result = topic_conf.get_list()
+ assert len(result['Topics']) > 0
+
+ # get topic
+ result, status = topic_conf.get_config()
+ assert_equal(status/100, 2)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+
+ # delete topic
+ topic_conf.del_config()
+
+
def test_ps_topic_with_endpoint():
""" test set topic with endpoint"""
_, ps_zones = init_env()
topic_conf.del_config()
+def test_ps_s3_topic_with_endpoint():
+ """ test set/get/delete of s3 topic with endpoint """
+ zones, _ = init_env()
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name+TOPIC_SUFFIX
+
+ # create topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args)
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
+
+ # get topic
+ result, status = topic_conf.get_config()
+ assert_equal(status/100, 2)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+ # Note that endpoint args may be ordered differently in the result
+
+ # delete topic
+ topic_conf.del_config()
+
+
def test_ps_notification():
""" test set/get/delete of notification """
zones, ps_zones = init_env()
import logging
import httplib
import urllib
+import urlparse
import hmac
import hashlib
import base64
return self.send_request('GET', get_list=True)
+class PSTopicS3:
+ """class to set/list/get/delete a topic
+ POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
+ POST ?Action=ListTopics
+ POST ?Action=GetTopic&TopicArn=<topic-arn>
+ POST ?Action=DeleteTopic&TopicArn=<topic-arn>
+ """
+ def __init__(self, conn, topic_name, region, endpoint_args=None):
+ self.conn = conn
+ self.topic_name = topic_name.strip()
+ assert self.topic_name
+ self.topic_arn = ''
+ self.attributes = {}
+ if endpoint_args is not None:
+ self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
+ self.client = boto3.client('sns',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ config=Config(signature_version='s3'))
+
+
+ def get_config(self):
+ """get topic info"""
+ parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
+ body = urllib.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest())
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+ if log.getEffectiveLevel() <= 10:
+ http_conn.set_debuglevel(5)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
+
+ def set_config(self):
+ """set topic"""
+ result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
+ self.topic_arn = result['TopicArn']
+ return self.topic_arn
+
+ def del_config(self):
+ """delete topic"""
+ self.client.delete_topic(TopicArn=self.topic_arn)
+
+ def get_list(self):
+ """list all topics"""
+ return self.client.list_topics()
+
+
class PSNotification:
"""class to set/get/delete a notification
PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
def start(self, args = None):
""" start the gateway """
assert(self.cluster)
+ env = os.environ.copy()
+ # to change frontend, set RGW_FRONTEND env variable
+ # e.g. RGW_FRONTEND=civetweb
+ # to run test under valgrind memcheck, set RGW_VALGRIND to 'yes'
+ # e.g. RGW_VALGRIND=yes
cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
if self.id:
cmd += ['-i', self.id]
cmd += ['--debug-rgw=20', '--debug-ms=1']
if args:
cmd += args
- bash(cmd)
+ bash(cmd, env=env)
def stop(self):
""" stop the gateway """
expected_output_with_attributes);
}
+static const char* expected_xml_output = "<Items xmlns=\"https://www.ceph.com/doc/\">"
+ "<Item Order=\"0\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>0</Value></Item>"
+ "<Item Order=\"1\"><NameAndStatus><Name>hello</Name><Status>False</Status></NameAndStatus><Value>1</Value></Item>"
+ "<Item Order=\"2\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>2</Value></Item>"
+ "<Item Order=\"3\"><NameAndStatus><Name>hello</Name><Status>False</Status></NameAndStatus><Value>3</Value></Item>"
+ "<Item Order=\"4\"><NameAndStatus><Name>hello</Name><Status>True</Status></NameAndStatus><Value>4</Value></Item>"
+ "</Items>";
+TEST(TestEncoder, ListWithAttrsAndNS)
+{
+ XMLFormatter f;
+ const auto array_size = 5;
+ f.open_array_section_in_ns("Items", "https://www.ceph.com/doc/");
+ for (auto i = 0; i < array_size; ++i) {
+ FormatterAttrs item_attrs("Order", std::to_string(i).c_str(), NULL);
+ f.open_object_section_with_attrs("Item", item_attrs);
+ f.open_object_section("NameAndStatus");
+ encode_xml("Name", "hello", &f);
+ encode_xml("Status", (i%2 == 0), &f);
+ f.close_section();
+ encode_xml("Value", i, &f);
+ f.close_section();
+ }
+ f.close_section();
+ std::stringstream ss;
+ f.flush(ss);
+ ASSERT_STREQ(ss.str().c_str(), expected_xml_output);
+}
+