#include <algorithm>
#include <boost/tokenizer.hpp>
#include <optional>
-#include "rgw_rest_pubsub_common.h"
#include "rgw_rest_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_pubsub.h"
#include "rgw_notify.h"
#include "rgw_sal_rados.h"
#include "services/svc_zone.h"
+#include "common/dout.h"
+#include "rgw_url.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-using namespace std;
-
static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
+bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
+ const auto is_secure = rgw_transport_is_secure(cct, env);
+ if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+ ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
+ return true;
+ }
+ return is_secure;
+}
+
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+ if (dest.push_endpoint.empty()) {
+ return true;
+ }
+ std::string user;
+ std::string password;
+ if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+ ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+ return false;
+ }
+ // this should be verified inside parse_url()
+ ceph_assert(user.empty() == password.empty());
+ if (!user.empty()) {
+ dest.stored_secret = true;
+ if (!verify_transport_security(cct, env)) {
+ ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
+ return false;
+ }
+ }
+ return true;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+ return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
+ for (const auto& topic : topics.topics) {
+ if (topic_has_endpoint_secret(topic.second)) return true;
+ }
+ return false;
+}
+
// command (AWS compliant):
// POST
// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
-public:
- int get_params() override {
+class RGWPSCreateTopicOp : public RGWOp {
+ private:
+ std::optional<RGWPubSub> ps;
+ std::string topic_name;
+ rgw_pubsub_sub_dest dest;
+ std::string topic_arn;
+ std::string opaque_data;
+
+ int get_params() {
topic_name = s->info.args.get("Name");
if (topic_name.empty()) {
ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
return 0;
}
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topic_create"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSCreateTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+ op_ret = ps->create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=ListTopics
-class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+class RGWPSListTopicsOp : public RGWOp {
+private:
+ std::optional<RGWPubSub> ps;
+ rgw_pubsub_topics result;
+
public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topics_list"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSListTopicsOp::execute(optional_yield y) {
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+ op_ret = ps->get_topics(&result);
+ // if there are no topics it is not considered an error
+ op_ret = op_ret == -ENOENT ? 0 : op_ret;
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+ if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully got topics" << dendl;
+}
+
// command (extension to AWS):
// POST
// Action=GetTopic&TopicArn=<topic-arn>
-class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
- int get_params() override {
+class RGWPSGetTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ std::optional<RGWPubSub> ps;
+ rgw_pubsub_topic_subs result;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSGetTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+ op_ret = ps->get_topic(topic_name, &result);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=GetTopicAttributes&TopicArn=<topic-arn>
-class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
- int get_params() override {
+class RGWPSGetTopicAttributesOp : public RGWOp {
+ private:
+ std::string topic_name;
+ std::optional<RGWPubSub> ps;
+ rgw_pubsub_topic_subs result;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+ op_ret = ps->get_topic(topic_name, &result);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
// command (AWS compliant):
// POST
// Action=DeleteTopic&TopicArn=<topic-arn>
-class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
-public:
- int get_params() override {
+class RGWPSDeleteTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ std::optional<RGWPubSub> ps;
+
+ int get_params() {
const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
if (!topic_arn || topic_arn->resource.empty()) {
return 0;
}
-
+
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_delete"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
}
};
+void RGWPSDeleteTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+ op_ret = ps->remove_topic(this, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
namespace {
// utility classes and functions for handling parameters with the following format:
// Attributes.entry.{N}.{key|value}={VALUE}
key_or_value.pop_back();
auto pos = key_or_value.find("=");
- if (pos != string::npos) {
+ if (pos != std::string::npos) {
const auto key_or_value_lhs = key_or_value.substr(0, pos);
const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
const auto map_it = map.find(idx);
if (post_body.size() > 0) {
ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
- if (post_body.find("Action") != string::npos) {
+ if (post_body.find("Action") != std::string::npos) {
const boost::char_separator<char> sep("&");
const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
AttributeMap map;
for (const auto& t : tokens) {
auto pos = t.find("=");
- if (pos != string::npos) {
+ if (pos != std::string::npos) {
const auto key = t.substr(0, pos);
if (key == "Action") {
s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
if (s->info.args.exists("Action")) {
const auto action = s->info.args.get("Action");
if (action.compare("CreateTopic") == 0)
- return new RGWPSCreateTopic_ObjStore_AWS();
+ return new RGWPSCreateTopicOp();
if (action.compare("DeleteTopic") == 0)
- return new RGWPSDeleteTopic_ObjStore_AWS;
+ return new RGWPSDeleteTopicOp;
if (action.compare("ListTopics") == 0)
- return new RGWPSListTopics_ObjStore_AWS();
+ return new RGWPSListTopicsOp();
if (action.compare("GetTopic") == 0)
- return new RGWPSGetTopic_ObjStore_AWS();
+ return new RGWPSGetTopicOp();
if (action.compare("GetTopicAttributes") == 0)
- return new RGWPSGetTopicAttributes_ObjStore_AWS();
+ return new RGWPSGetTopicAttributesOp();
}
return nullptr;
return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
}
-
namespace {
// return a unique topic by prefexing with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
// extract the topic from a unique topic of the form: <notification>_<topic>
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
- if (unique_topic.find(notification + "_") == string::npos) {
+ if (unique_topic.find(notification + "_") == std::string::npos) {
return "";
}
return unique_topic.substr(notification.length() + 1);
// command (S3 compliant): PUT /<bucket name>?notification
// a "notification" and a subscription will be auto-generated
// actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+ private:
+ std::optional<RGWPubSub> ps;
+ std::string bucket_name;
+ RGWBucketInfo bucket_info;
rgw_pubsub_s3_notifications configurations;
+ int get_params() {
+ bool exists;
+ const auto no_value = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (no_value.length() > 0) {
+ ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = s->bucket_name;
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
int get_params_from_body() {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
int r;
return 0;
}
- int get_params() override {
- bool exists;
- const auto no_value = s->info.args.get("notification", &exists);
- if (!exists) {
- ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
- return -EINVAL;
- }
- if (no_value.length() > 0) {
- ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
- return -EINVAL;
- }
- if (s->bucket_name.empty()) {
- ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
- return -EINVAL;
- }
- bucket_name = s->bucket_name;
- return 0;
- }
-
-public:
- const char* name() const override { return "pubsub_notification_create_s3"; }
void execute(optional_yield) override;
};
-void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
+void RGWPSCreateNotifOp::execute(optional_yield y) {
op_ret = get_params_from_body();
if (op_ret < 0) {
return;
}
}
+int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
+ return ret;
+ }
+ bucket_info = bucket->get_info();
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+ private:
+ std::optional<RGWPubSub> ps;
+ std::string bucket_name;
+ RGWBucketInfo bucket_info;
std::string notif_name;
+
+ public:
+ int verify_permission(optional_yield y) override;
- int get_params() override {
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_delete_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+ int get_params() {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
return 0;
}
-public:
void execute(optional_yield y) override;
- const char* name() const override { return "pubsub_notification_delete_s3"; }
};
-void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
op_ret = get_params();
if (op_ret < 0) {
return;
op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
}
+int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+ if (ret < 0) {
+ return ret;
+ }
+ bucket_info = bucket->get_info();
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+class RGWPSListNotifsOp : public RGWOp {
private:
+ std::string bucket_name;
+ RGWBucketInfo bucket_info;
+ std::optional<RGWPubSub> ps;
std::string notif_name;
rgw_pubsub_s3_notifications notifications;
- int get_params() override {
+ int get_params() {
bool exists;
notif_name = s->info.args.get("notification", &exists);
if (!exists) {
return 0;
}
-public:
+ public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notifications_get_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
void execute(optional_yield y) override;
void send_response() override {
if (op_ret) {
notifications.dump_xml(s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
- const char* name() const override { return "pubsub_notifications_get_s3"; }
};
-void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
+void RGWPSListNotifsOp::execute(optional_yield y) {
ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
}
}
+int RGWPSListNotifsOp::verify_permission(optional_yield y) {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+ if (ret < 0) {
+ return ret;
+ }
+ bucket_info = bucket->get_info();
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
+ return -EPERM;
+ }
+
+ return 0;
+}
+
RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifsOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
- return new RGWPSListNotifs_ObjStore_S3();
+ return new RGWPSListNotifsOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
- return new RGWPSCreateNotif_ObjStore_S3();
+ return new RGWPSCreateNotifOp();
}
RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
- return new RGWPSDeleteNotif_ObjStore_S3();
+ return new RGWPSDeleteNotifOp();
}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "rgw_common.h"
-#include "rgw_rest_pubsub_common.h"
-#include "common/dout.h"
-#include "rgw_url.h"
-#include "rgw_sal_rados.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
-
-bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
- const auto is_secure = rgw_transport_is_secure(cct, env);
- if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
- ldout(cct, 0) << "WARNING: bypassing endpoint validation, allow sending password over insecure transport" << dendl;
- return true;
- }
- return is_secure;
-}
-
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
- if (dest.push_endpoint.empty()) {
- return true;
- }
- std::string user;
- std::string password;
- if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
- ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
- return false;
- }
- // this should be verified inside parse_url()
- ceph_assert(user.empty() == password.empty());
- if (!user.empty()) {
- dest.stored_secret = true;
- if (!verify_transport_security(cct, env)) {
- ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl;
- return false;
- }
- }
- return true;
-}
-
-bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
- return topic.topic.dest.stored_secret;
-}
-
-bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
- for (const auto& topic : topics.topics) {
- if (topic_has_endpoint_secret(topic.second)) return true;
- }
- return false;
-}
-
-void RGWPSCreateTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- op_ret = ps->create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
-}
-
-void RGWPSListTopicsOp::execute(optional_yield y) {
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- op_ret = ps->get_topics(&result);
- // if there are no topics it is not considered an error
- op_ret = op_ret == -ENOENT ? 0 : op_ret;
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
- return;
- }
- if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
- ldpp_dout(this, 1) << "topics contain secret and cannot be sent over insecure transport" << dendl;
- op_ret = -EPERM;
- return;
- }
- ldpp_dout(this, 20) << "successfully got topics" << dendl;
-}
-
-void RGWPSGetTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- op_ret = ps->get_topic(topic_name, &result);
- if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
- ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
- op_ret = -EPERM;
- return;
- }
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
-}
-
-void RGWPSDeleteTopicOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- op_ret = ps->remove_topic(this, topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
-}
-
-int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
- std::unique_ptr<rgw::sal::Bucket> bucket;
- ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
- return ret;
- }
- bucket_info = bucket->get_info();
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
- return -EPERM;
- }
- return 0;
-}
-
-int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
- std::unique_ptr<rgw::sal::Bucket> bucket;
- ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- return ret;
- }
- bucket_info = bucket->get_info();
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
- return -EPERM;
- }
- return 0;
-}
-
-int RGWPSListNotifsOp::verify_permission(optional_yield y) {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
- std::unique_ptr<rgw::sal::Bucket> bucket;
- ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
- if (ret < 0) {
- return ret;
- }
- bucket_info = bucket->get_info();
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
- return -EPERM;
- }
-
- return 0;
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#pragma once
-#include <string>
-#include <optional>
-#include "rgw_op.h"
-#include "rgw_pubsub.h"
-
-// make sure that endpoint is a valid URL
-// make sure that if user/password are passed inside URL, it is over secure connection
-// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env);
-
-// create a topic
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
- std::optional<RGWPubSub> ps;
- std::string topic_name;
- rgw_pubsub_sub_dest dest;
- std::string topic_arn;
- std::string opaque_data;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield) override;
-
- const char* name() const override { return "pubsub_topic_create"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// list all topics
-class RGWPSListTopicsOp : public RGWOp {
-protected:
- std::optional<RGWPubSub> ps;
- rgw_pubsub_topics result;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield) override;
-
- const char* name() const override { return "pubsub_topics_list"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// get topic information
-class RGWPSGetTopicOp : public RGWOp {
-protected:
- std::string topic_name;
- std::optional<RGWPubSub> ps;
- rgw_pubsub_topic_subs result;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield y) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_topic_get"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// delete a topic
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
- std::string topic_name;
- std::optional<RGWPubSub> ps;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_topic_delete"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// notification creation
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
- std::optional<RGWPubSub> ps;
- std::string bucket_name;
- RGWBucketInfo bucket_info;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield y) override;
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// delete a notification
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
- std::optional<RGWPubSub> ps;
- std::string bucket_name;
- RGWBucketInfo bucket_info;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield y) override;
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// get topics/notifications on a bucket
-class RGWPSListNotifsOp : public RGWOp {
-protected:
- std::string bucket_name;
- RGWBucketInfo bucket_info;
- std::optional<RGWPubSub> ps;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield y) override;
-
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
-
- RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};