rgw_aio_throttle.cc
rgw_auth.cc
rgw_auth_s3.cc
+ rgw_arn.cc
rgw_basic_types.cc
rgw_bucket.cc
rgw_cache.cc
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- RGWUserPubSub::Sub::list_events_result result;
-
if (!max_entries_specified) {
- max_entries = 100;
+ max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
}
auto sub = ups.get_sub(sub_name);
- ret = sub->list_events(marker, max_entries, &result);
+ ret = sub->list_events(marker, max_entries);
if (ret < 0) {
cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- encode_json("result", result, formatter);
+ encode_json("result", *sub, formatter);
formatter->flush(cout);
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_arn.h"
+#include "rgw_common.h"
+#include <regex>
+
+namespace rgw {
+
+namespace {
+boost::optional<Partition> to_partition(const smatch::value_type& p,
+ bool wildcards) {
+ if (p == "aws") {
+ return Partition::aws;
+ } else if (p == "aws-cn") {
+ return Partition::aws_cn;
+ } else if (p == "aws-us-gov") {
+ return Partition::aws_us_gov;
+ } else if (p == "*" && wildcards) {
+ return Partition::wildcard;
+ } else {
+ return boost::none;
+ }
+
+ ceph_abort();
+}
+
+boost::optional<Service> to_service(const smatch::value_type& s,
+ bool wildcards) {
+ static const unordered_map<string, Service> services = {
+ { "acm", Service::acm },
+ { "apigateway", Service::apigateway },
+ { "appstream", Service::appstream },
+ { "artifact", Service::artifact },
+ { "autoscaling", Service::autoscaling },
+ { "aws-marketplace", Service::aws_marketplace },
+ { "aws-marketplace-management",
+ Service::aws_marketplace_management },
+ { "aws-portal", Service::aws_portal },
+ { "cloudformation", Service::cloudformation },
+ { "cloudfront", Service::cloudfront },
+ { "cloudhsm", Service::cloudhsm },
+ { "cloudsearch", Service::cloudsearch },
+ { "cloudtrail", Service::cloudtrail },
+ { "cloudwatch", Service::cloudwatch },
+ { "codebuild", Service::codebuild },
+ { "codecommit", Service::codecommit },
+ { "codedeploy", Service::codedeploy },
+ { "codepipeline", Service::codepipeline },
+ { "cognito-identity", Service::cognito_identity },
+ { "cognito-idp", Service::cognito_idp },
+ { "cognito-sync", Service::cognito_sync },
+ { "config", Service::config },
+ { "datapipeline", Service::datapipeline },
+ { "devicefarm", Service::devicefarm },
+ { "directconnect", Service::directconnect },
+ { "dms", Service::dms },
+ { "ds", Service::ds },
+ { "dynamodb", Service::dynamodb },
+ { "ec2", Service::ec2 },
+ { "ecr", Service::ecr },
+ { "ecs", Service::ecs },
+ { "elasticache", Service::elasticache },
+ { "elasticbeanstalk", Service::elasticbeanstalk },
+ { "elasticfilesystem", Service::elasticfilesystem },
+ { "elasticloadbalancing", Service::elasticloadbalancing },
+ { "elasticmapreduce", Service::elasticmapreduce },
+ { "elastictranscoder", Service::elastictranscoder },
+ { "es", Service::es },
+ { "events", Service::events },
+ { "firehose", Service::firehose },
+ { "gamelift", Service::gamelift },
+ { "glacier", Service::glacier },
+ { "health", Service::health },
+ { "iam", Service::iam },
+ { "importexport", Service::importexport },
+ { "inspector", Service::inspector },
+ { "iot", Service::iot },
+ { "kinesis", Service::kinesis },
+ { "kinesisanalytics", Service::kinesisanalytics },
+ { "kms", Service::kms },
+ { "lambda", Service::lambda },
+ { "lightsail", Service::lightsail },
+ { "logs", Service::logs },
+ { "machinelearning", Service::machinelearning },
+ { "mobileanalytics", Service::mobileanalytics },
+ { "mobilehub", Service::mobilehub },
+ { "opsworks", Service::opsworks },
+ { "opsworks-cm", Service::opsworks_cm },
+ { "polly", Service::polly },
+ { "rds", Service::rds },
+ { "redshift", Service::redshift },
+ { "route53", Service::route53 },
+ { "route53domains", Service::route53domains },
+ { "s3", Service::s3 },
+ { "sdb", Service::sdb },
+ { "servicecatalog", Service::servicecatalog },
+ { "ses", Service::ses },
+ { "sns", Service::sns },
+ { "sqs", Service::sqs },
+ { "ssm", Service::ssm },
+ { "states", Service::states },
+ { "storagegateway", Service::storagegateway },
+ { "sts", Service::sts },
+ { "support", Service::support },
+ { "swf", Service::swf },
+ { "trustedadvisor", Service::trustedadvisor },
+ { "waf", Service::waf },
+ { "workmail", Service::workmail },
+ { "workspaces", Service::workspaces }};
+
+ if (wildcards && s == "*") {
+ return Service::wildcard;
+ }
+
+ auto i = services.find(s);
+ if (i == services.end()) {
+ return boost::none;
+ } else {
+ return i->second;
+ }
+}
+}
+ARN::ARN(const rgw_obj& o)
+ : partition(Partition::aws),
+ service(Service::s3),
+ region(),
+ account(o.bucket.tenant),
+ resource(o.bucket.name)
+{
+ resource.push_back('/');
+ resource.append(o.key.name);
+}
+
+ARN::ARN(const rgw_bucket& b)
+ : partition(Partition::aws),
+ service(Service::s3),
+ region(),
+ account(b.tenant),
+ resource(b.name) { }
+
+ARN::ARN(const rgw_bucket& b, const std::string& o)
+ : partition(Partition::aws),
+ service(Service::s3),
+ region(),
+ account(b.tenant),
+ resource(b.name) {
+ resource.push_back('/');
+ resource.append(o);
+}
+
+ARN::ARN(const std::string& resource_name, const std::string& type, const std::string& tenant, bool has_path)
+ : partition(Partition::aws),
+ service(Service::iam),
+ region(),
+ account(tenant),
+ resource(type) {
+ if (! has_path)
+ resource.push_back('/');
+ resource.append(resource_name);
+}
+
+boost::optional<ARN> ARN::parse(const std::string& s, bool wildcards) {
+ static const std::regex rx_wild("arn:([^:]*):([^:]*):([^:]*):([^:]*):([^:]*)",
+ std::regex_constants::ECMAScript |
+ std::regex_constants::optimize);
+ static const std::regex rx_no_wild(
+ "arn:([^:*]*):([^:*]*):([^:*]*):([^:*]*):(.*)",
+ std::regex_constants::ECMAScript |
+ std::regex_constants::optimize);
+
+ smatch match;
+
+ if ((s == "*") && wildcards) {
+ return ARN(Partition::wildcard, Service::wildcard, "*", "*", "*");
+ } else if (regex_match(s, match, wildcards ? rx_wild : rx_no_wild) &&
+ match.size() == 6) {
+ if (auto p = to_partition(match[1], wildcards)) {
+ if (auto s = to_service(match[2], wildcards)) {
+ return ARN(*p, *s, match[3], match[4], match[5]);
+ }
+ }
+ }
+ return boost::none;
+}
+
+std::string ARN::to_string() const {
+ std::string s;
+
+ if (partition == Partition::aws) {
+ s.append("aws:");
+ } else if (partition == Partition::aws_cn) {
+ s.append("aws-cn:");
+ } else if (partition == Partition::aws_us_gov) {
+ s.append("aws-us-gov:");
+ } else {
+ s.append("*:");
+ }
+
+ static const std::unordered_map<Service, string> services = {
+ { Service::acm, "acm" },
+ { Service::apigateway, "apigateway" },
+ { Service::appstream, "appstream" },
+ { Service::artifact, "artifact" },
+ { Service::autoscaling, "autoscaling" },
+ { Service::aws_marketplace, "aws-marketplace" },
+ { Service::aws_marketplace_management, "aws-marketplace-management" },
+ { Service::aws_portal, "aws-portal" },
+ { Service::cloudformation, "cloudformation" },
+ { Service::cloudfront, "cloudfront" },
+ { Service::cloudhsm, "cloudhsm" },
+ { Service::cloudsearch, "cloudsearch" },
+ { Service::cloudtrail, "cloudtrail" },
+ { Service::cloudwatch, "cloudwatch" },
+ { Service::codebuild, "codebuild" },
+ { Service::codecommit, "codecommit" },
+ { Service::codedeploy, "codedeploy" },
+ { Service::codepipeline, "codepipeline" },
+ { Service::cognito_identity, "cognito-identity" },
+ { Service::cognito_idp, "cognito-idp" },
+ { Service::cognito_sync, "cognito-sync" },
+ { Service::config, "config" },
+ { Service::datapipeline, "datapipeline" },
+ { Service::devicefarm, "devicefarm" },
+ { Service::directconnect, "directconnect" },
+ { Service::dms, "dms" },
+ { Service::ds, "ds" },
+ { Service::dynamodb, "dynamodb" },
+ { Service::ec2, "ec2" },
+ { Service::ecr, "ecr" },
+ { Service::ecs, "ecs" },
+ { Service::elasticache, "elasticache" },
+ { Service::elasticbeanstalk, "elasticbeanstalk" },
+ { Service::elasticfilesystem, "elasticfilesystem" },
+ { Service::elasticloadbalancing, "elasticloadbalancing" },
+ { Service::elasticmapreduce, "elasticmapreduce" },
+ { Service::elastictranscoder, "elastictranscoder" },
+ { Service::es, "es" },
+ { Service::events, "events" },
+ { Service::firehose, "firehose" },
+ { Service::gamelift, "gamelift" },
+ { Service::glacier, "glacier" },
+ { Service::health, "health" },
+ { Service::iam, "iam" },
+ { Service::importexport, "importexport" },
+ { Service::inspector, "inspector" },
+ { Service::iot, "iot" },
+ { Service::kinesis, "kinesis" },
+ { Service::kinesisanalytics, "kinesisanalytics" },
+ { Service::kms, "kms" },
+ { Service::lambda, "lambda" },
+ { Service::lightsail, "lightsail" },
+ { Service::logs, "logs" },
+ { Service::machinelearning, "machinelearning" },
+ { Service::mobileanalytics, "mobileanalytics" },
+ { Service::mobilehub, "mobilehub" },
+ { Service::opsworks, "opsworks" },
+ { Service::opsworks_cm, "opsworks-cm" },
+ { Service::polly, "polly" },
+ { Service::rds, "rds" },
+ { Service::redshift, "redshift" },
+ { Service::route53, "route53" },
+ { Service::route53domains, "route53domains" },
+ { Service::s3, "s3" },
+ { Service::sdb, "sdb" },
+ { Service::servicecatalog, "servicecatalog" },
+ { Service::ses, "ses" },
+ { Service::sns, "sns" },
+ { Service::sqs, "sqs" },
+ { Service::ssm, "ssm" },
+ { Service::states, "states" },
+ { Service::storagegateway, "storagegateway" },
+ { Service::sts, "sts" },
+ { Service::support, "support" },
+ { Service::swf, "swf" },
+ { Service::trustedadvisor, "trustedadvisor" },
+ { Service::waf, "waf" },
+ { Service::workmail, "workmail" },
+ { Service::workspaces, "workspaces" }};
+
+ auto i = services.find(service);
+ if (i != services.end()) {
+ s.append(i->second);
+ } else {
+ s.push_back('*');
+ }
+ s.push_back(':');
+
+ s.append(region);
+ s.push_back(':');
+
+ s.append(account);
+ s.push_back(':');
+
+ s.append(resource);
+
+ return s;
+}
+
+bool operator ==(const ARN& l, const ARN& r) {
+ return ((l.partition == r.partition) &&
+ (l.service == r.service) &&
+ (l.region == r.region) &&
+ (l.account == r.account) &&
+ (l.resource == r.resource));
+}
+bool operator <(const ARN& l, const ARN& r) {
+ return ((l.partition < r.partition) ||
+ (l.service < r.service) ||
+ (l.region < r.region) ||
+ (l.account < r.account) ||
+ (l.resource < r.resource));
+}
+
+// The candidate is not allowed to have wildcards. The only way to
+// do that sanely would be to use unification rather than matching.
+bool ARN::match(const ARN& candidate) const {
+ if ((candidate.partition == Partition::wildcard) ||
+ (partition != candidate.partition && partition
+ != Partition::wildcard)) {
+ return false;
+ }
+
+ if ((candidate.service == Service::wildcard) ||
+ (service != candidate.service && service != Service::wildcard)) {
+ return false;
+ }
+
+ if (!match_policy(region, candidate.region, MATCH_POLICY_ARN)) {
+ return false;
+ }
+
+ if (!match_policy(account, candidate.account, MATCH_POLICY_ARN)) {
+ return false;
+ }
+
+ if (!match_policy(resource, candidate.resource, MATCH_POLICY_RESOURCE)) {
+ return false;
+ }
+
+ return true;
+}
+
+boost::optional<ARNResource> ARNResource::parse(const std::string& s) {
+ static const std::regex rx("^([^:/]*)[:/]?([^:/]*)?[:/]?(.*)$",
+ std::regex_constants::ECMAScript |
+ std::regex_constants::optimize);
+ std::smatch match;
+ if (!regex_match(s, match, rx)) {
+ return boost::none;
+ }
+ if (match[2].str().empty() && match[3].str().empty()) {
+ // only resource exist
+ return rgw::ARNResource("", match[1], "");
+ }
+
+ // resource type also exist, and cannot be wildcard
+ if (match[1] != std::string(wildcard)) {
+ // resource type cannot be wildcard
+ return rgw::ARNResource(match[1], match[2], match[3]);
+ }
+
+ return boost::none;
+}
+
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+#include <string>
+#include <boost/optional.hpp>
+
+class rgw_obj;
+class rgw_bucket;
+
+namespace rgw {
+
+enum struct Partition {
+ aws, aws_cn, aws_us_gov, wildcard
+ // If we wanted our own ARNs for principal type unique to us
+ // (maybe to integrate better with Swift) or for anything else we
+ // provide that doesn't map onto S3, we could add an 'rgw'
+ // partition type.
+};
+
+enum struct Service {
+ apigateway, appstream, artifact, autoscaling, aws_portal, acm,
+ cloudformation, cloudfront, cloudhsm, cloudsearch, cloudtrail,
+ cloudwatch, events, logs, codebuild, codecommit, codedeploy,
+ codepipeline, cognito_idp, cognito_identity, cognito_sync,
+ config, datapipeline, dms, devicefarm, directconnect,
+ ds, dynamodb, ec2, ecr, ecs, ssm, elasticbeanstalk, elasticfilesystem,
+ elasticloadbalancing, elasticmapreduce, elastictranscoder, elasticache,
+ es, gamelift, glacier, health, iam, importexport, inspector, iot,
+ kms, kinesisanalytics, firehose, kinesis, lambda, lightsail,
+ machinelearning, aws_marketplace, aws_marketplace_management,
+ mobileanalytics, mobilehub, opsworks, opsworks_cm, polly,
+ redshift, rds, route53, route53domains, sts, servicecatalog,
+ ses, sns, sqs, s3, swf, sdb, states, storagegateway, support,
+ trustedadvisor, waf, workmail, workspaces, wildcard
+};
+
+/* valid format:
+ * 'arn:partition:service:region:account-id:resource'
+ * The 'resource' part can be further broken down via ARNResource
+*/
+struct ARN {
+ Partition partition;
+ Service service;
+ std::string region;
+ // Once we refit tenant, we should probably use that instead of a
+ // string.
+ std::string account;
+ std::string resource;
+
+ ARN()
+ : partition(Partition::wildcard), service(Service::wildcard) {}
+ ARN(Partition partition, Service service, std::string region,
+ std::string account, std::string resource)
+ : partition(partition), service(service), region(std::move(region)),
+ account(std::move(account)), resource(std::move(resource)) {}
+ ARN(const rgw_obj& o);
+ ARN(const rgw_bucket& b);
+ ARN(const rgw_bucket& b, const std::string& o);
+ ARN(const std::string& resource_name, const std::string& type, const std::string& tenant, bool has_path=false);
+
+ static boost::optional<ARN> parse(const std::string& s,
+ bool wildcard = false);
+ std::string to_string() const;
+
+ // `this` is the pattern
+ bool match(const ARN& candidate) const;
+};
+
+inline std::string to_string(const ARN& a) {
+ return a.to_string();
+}
+
+inline std::ostream& operator <<(std::ostream& m, const ARN& a) {
+ return m << to_string(a);
+}
+
+bool operator ==(const ARN& l, const ARN& r);
+bool operator <(const ARN& l, const ARN& r);
+
+/* valid formats (only resource part):
+ * 'resource'
+ * 'resourcetype/resource'
+ * 'resourcetype/resource/qualifier'
+ * 'resourcetype/resource:qualifier'
+ * 'resourcetype:resource'
+ * 'resourcetype:resource:qualifier'
+ * Note that 'resourceType' cannot be wildcard
+*/
+struct ARNResource {
+ constexpr static const char* const wildcard = "*";
+ std::string resource_type;
+ std::string resource;
+ std::string qualifier;
+
+ ARNResource() : resource_type(""), resource(wildcard), qualifier("") {}
+
+ ARNResource(const std::string& _resource_type, const std::string& _resource, const std::string& _qualifier) :
+ resource_type(std::move(_resource_type)), resource(std::move(_resource)), qualifier(std::move(_qualifier)) {}
+
+ static boost::optional<ARNResource> parse(const std::string& s);
+};
+
+} // namespace rgw
+
+namespace std {
+template<>
+struct hash<::rgw::Service> {
+ size_t operator()(const ::rgw::Service& s) const noexcept {
+ // Invoke a default-constructed hash object for int.
+ return hash<int>()(static_cast<int>(s));
+ }
+};
+} // namespace std
+
#include "rgw_string.h"
#include "rgw_rados.h"
#include "rgw_http_errors.h"
+#include "rgw_arn.h"
#include "common/ceph_crypto.h"
#include "common/armor.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-using rgw::IAM::ARN;
+using rgw::ARN;
using rgw::IAM::Effect;
using rgw::IAM::op_to_perm;
using rgw::IAM::Policy;
struct req_state * const s,
RGWAccessControlPolicy * const user_acl,
const vector<rgw::IAM::Policy>& user_policies,
- const rgw::IAM::ARN& res,
+ const rgw::ARN& res,
const uint64_t op)
{
auto usr_policy_res = eval_user_policies(user_policies, s->env, boost::none, op, res);
bool verify_user_permission(const DoutPrefixProvider* dpp,
struct req_state * const s,
- const rgw::IAM::ARN& res,
+ const rgw::ARN& res,
const uint64_t op)
{
return verify_user_permission(dpp, s, s->user_acl.get(), s->iam_user_policies, res, op);
const rgw::IAM::Environment& env,
boost::optional<const rgw::auth::Identity&> id,
const uint64_t op,
- const rgw::IAM::ARN& arn);
+ const rgw::ARN& arn);
bool verify_user_permission(const DoutPrefixProvider* dpp,
struct req_state * const s,
RGWAccessControlPolicy * const user_acl,
const vector<rgw::IAM::Policy>& user_policies,
- const rgw::IAM::ARN& res,
+ const rgw::ARN& res,
const uint64_t op);
bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp,
struct req_state * const s,
const int perm);
bool verify_user_permission(const DoutPrefixProvider* dpp,
struct req_state * const s,
- const rgw::IAM::ARN& res,
+ const rgw::ARN& res,
const uint64_t op);
bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp,
struct req_state * const s,
const uint64_t bit;
};
-namespace {
-boost::optional<Partition> to_partition(const smatch::value_type& p,
- bool wildcards) {
- if (p == "aws") {
- return Partition::aws;
- } else if (p == "aws-cn") {
- return Partition::aws_cn;
- } else if (p == "aws-us-gov") {
- return Partition::aws_us_gov;
- } else if (p == "*" && wildcards) {
- return Partition::wildcard;
- } else {
- return boost::none;
- }
-
- ceph_abort();
-}
-
-boost::optional<Service> to_service(const smatch::value_type& s,
- bool wildcards) {
- static const unordered_map<string, Service> services = {
- { "acm", Service::acm },
- { "apigateway", Service::apigateway },
- { "appstream", Service::appstream },
- { "artifact", Service::artifact },
- { "autoscaling", Service::autoscaling },
- { "aws-marketplace", Service::aws_marketplace },
- { "aws-marketplace-management",
- Service::aws_marketplace_management },
- { "aws-portal", Service::aws_portal },
- { "cloudformation", Service::cloudformation },
- { "cloudfront", Service::cloudfront },
- { "cloudhsm", Service::cloudhsm },
- { "cloudsearch", Service::cloudsearch },
- { "cloudtrail", Service::cloudtrail },
- { "cloudwatch", Service::cloudwatch },
- { "codebuild", Service::codebuild },
- { "codecommit", Service::codecommit },
- { "codedeploy", Service::codedeploy },
- { "codepipeline", Service::codepipeline },
- { "cognito-identity", Service::cognito_identity },
- { "cognito-idp", Service::cognito_idp },
- { "cognito-sync", Service::cognito_sync },
- { "config", Service::config },
- { "datapipeline", Service::datapipeline },
- { "devicefarm", Service::devicefarm },
- { "directconnect", Service::directconnect },
- { "dms", Service::dms },
- { "ds", Service::ds },
- { "dynamodb", Service::dynamodb },
- { "ec2", Service::ec2 },
- { "ecr", Service::ecr },
- { "ecs", Service::ecs },
- { "elasticache", Service::elasticache },
- { "elasticbeanstalk", Service::elasticbeanstalk },
- { "elasticfilesystem", Service::elasticfilesystem },
- { "elasticloadbalancing", Service::elasticloadbalancing },
- { "elasticmapreduce", Service::elasticmapreduce },
- { "elastictranscoder", Service::elastictranscoder },
- { "es", Service::es },
- { "events", Service::events },
- { "firehose", Service::firehose },
- { "gamelift", Service::gamelift },
- { "glacier", Service::glacier },
- { "health", Service::health },
- { "iam", Service::iam },
- { "importexport", Service::importexport },
- { "inspector", Service::inspector },
- { "iot", Service::iot },
- { "kinesis", Service::kinesis },
- { "kinesisanalytics", Service::kinesisanalytics },
- { "kms", Service::kms },
- { "lambda", Service::lambda },
- { "lightsail", Service::lightsail },
- { "logs", Service::logs },
- { "machinelearning", Service::machinelearning },
- { "mobileanalytics", Service::mobileanalytics },
- { "mobilehub", Service::mobilehub },
- { "opsworks", Service::opsworks },
- { "opsworks-cm", Service::opsworks_cm },
- { "polly", Service::polly },
- { "rds", Service::rds },
- { "redshift", Service::redshift },
- { "route53", Service::route53 },
- { "route53domains", Service::route53domains },
- { "s3", Service::s3 },
- { "sdb", Service::sdb },
- { "servicecatalog", Service::servicecatalog },
- { "ses", Service::ses },
- { "sns", Service::sns },
- { "sqs", Service::sqs },
- { "ssm", Service::ssm },
- { "states", Service::states },
- { "storagegateway", Service::storagegateway },
- { "sts", Service::sts },
- { "support", Service::support },
- { "swf", Service::swf },
- { "trustedadvisor", Service::trustedadvisor },
- { "waf", Service::waf },
- { "workmail", Service::workmail },
- { "workspaces", Service::workspaces }};
-
- if (wildcards && s == "*") {
- return Service::wildcard;
- }
-
- auto i = services.find(s);
- if (i == services.end()) {
- return boost::none;
- } else {
- return i->second;
- }
-}
-}
-
-ARN::ARN(const rgw_obj& o)
- : partition(Partition::aws),
- service(Service::s3),
- region(),
- account(o.bucket.tenant),
- resource(o.bucket.name)
-{
- resource.push_back('/');
- resource.append(o.key.name);
-}
-
-ARN::ARN(const rgw_bucket& b)
- : partition(Partition::aws),
- service(Service::s3),
- region(),
- account(b.tenant),
- resource(b.name) { }
-
-ARN::ARN(const rgw_bucket& b, const string& o)
- : partition(Partition::aws),
- service(Service::s3),
- region(),
- account(b.tenant),
- resource(b.name) {
- resource.push_back('/');
- resource.append(o);
-}
-
-ARN::ARN(const string& resource_name, const string& type, const string& tenant, bool has_path)
- : partition(Partition::aws),
- service(Service::iam),
- region(),
- account(tenant),
- resource(type) {
- if (! has_path)
- resource.push_back('/');
- resource.append(resource_name);
-}
-
-boost::optional<ARN> ARN::parse(const string& s, bool wildcards) {
- static const regex rx_wild("arn:([^:]*):([^:]*):([^:]*):([^:]*):([^:]*)",
- std::regex_constants::ECMAScript |
- std::regex_constants::optimize);
- static const regex rx_no_wild(
- "arn:([^:*]*):([^:*]*):([^:*]*):([^:*]*):(.*)",
- std::regex_constants::ECMAScript |
- std::regex_constants::optimize);
-
- smatch match;
-
- if ((s == "*") && wildcards) {
- return ARN(Partition::wildcard, Service::wildcard, "*", "*", "*");
- } else if (regex_match(s, match, wildcards ? rx_wild : rx_no_wild) &&
- match.size() == 6) {
- if (auto p = to_partition(match[1], wildcards)) {
- if (auto s = to_service(match[2], wildcards)) {
- return ARN(*p, *s, match[3], match[4], match[5]);
- }
- }
- }
- return boost::none;
-}
-
-string ARN::to_string() const {
- string s;
-
- if (partition == Partition::aws) {
- s.append("aws:");
- } else if (partition == Partition::aws_cn) {
- s.append("aws-cn:");
- } else if (partition == Partition::aws_us_gov) {
- s.append("aws-us-gov:");
- } else {
- s.append("*:");
- }
-
- static const unordered_map<Service, string> services = {
- { Service::acm, "acm" },
- { Service::apigateway, "apigateway" },
- { Service::appstream, "appstream" },
- { Service::artifact, "artifact" },
- { Service::autoscaling, "autoscaling" },
- { Service::aws_marketplace, "aws-marketplace" },
- { Service::aws_marketplace_management, "aws-marketplace-management" },
- { Service::aws_portal, "aws-portal" },
- { Service::cloudformation, "cloudformation" },
- { Service::cloudfront, "cloudfront" },
- { Service::cloudhsm, "cloudhsm" },
- { Service::cloudsearch, "cloudsearch" },
- { Service::cloudtrail, "cloudtrail" },
- { Service::cloudwatch, "cloudwatch" },
- { Service::codebuild, "codebuild" },
- { Service::codecommit, "codecommit" },
- { Service::codedeploy, "codedeploy" },
- { Service::codepipeline, "codepipeline" },
- { Service::cognito_identity, "cognito-identity" },
- { Service::cognito_idp, "cognito-idp" },
- { Service::cognito_sync, "cognito-sync" },
- { Service::config, "config" },
- { Service::datapipeline, "datapipeline" },
- { Service::devicefarm, "devicefarm" },
- { Service::directconnect, "directconnect" },
- { Service::dms, "dms" },
- { Service::ds, "ds" },
- { Service::dynamodb, "dynamodb" },
- { Service::ec2, "ec2" },
- { Service::ecr, "ecr" },
- { Service::ecs, "ecs" },
- { Service::elasticache, "elasticache" },
- { Service::elasticbeanstalk, "elasticbeanstalk" },
- { Service::elasticfilesystem, "elasticfilesystem" },
- { Service::elasticloadbalancing, "elasticloadbalancing" },
- { Service::elasticmapreduce, "elasticmapreduce" },
- { Service::elastictranscoder, "elastictranscoder" },
- { Service::es, "es" },
- { Service::events, "events" },
- { Service::firehose, "firehose" },
- { Service::gamelift, "gamelift" },
- { Service::glacier, "glacier" },
- { Service::health, "health" },
- { Service::iam, "iam" },
- { Service::importexport, "importexport" },
- { Service::inspector, "inspector" },
- { Service::iot, "iot" },
- { Service::kinesis, "kinesis" },
- { Service::kinesisanalytics, "kinesisanalytics" },
- { Service::kms, "kms" },
- { Service::lambda, "lambda" },
- { Service::lightsail, "lightsail" },
- { Service::logs, "logs" },
- { Service::machinelearning, "machinelearning" },
- { Service::mobileanalytics, "mobileanalytics" },
- { Service::mobilehub, "mobilehub" },
- { Service::opsworks, "opsworks" },
- { Service::opsworks_cm, "opsworks-cm" },
- { Service::polly, "polly" },
- { Service::rds, "rds" },
- { Service::redshift, "redshift" },
- { Service::route53, "route53" },
- { Service::route53domains, "route53domains" },
- { Service::s3, "s3" },
- { Service::sdb, "sdb" },
- { Service::servicecatalog, "servicecatalog" },
- { Service::ses, "ses" },
- { Service::sns, "sns" },
- { Service::sqs, "sqs" },
- { Service::ssm, "ssm" },
- { Service::states, "states" },
- { Service::storagegateway, "storagegateway" },
- { Service::sts, "sts" },
- { Service::support, "support" },
- { Service::swf, "swf" },
- { Service::trustedadvisor, "trustedadvisor" },
- { Service::waf, "waf" },
- { Service::workmail, "workmail" },
- { Service::workspaces, "workspaces" }};
-
- auto i = services.find(service);
- if (i != services.end()) {
- s.append(i->second);
- } else {
- s.push_back('*');
- }
- s.push_back(':');
-
- s.append(region);
- s.push_back(':');
-
- s.append(account);
- s.push_back(':');
-
- s.append(resource);
-
- return s;
-}
-bool operator ==(const ARN& l, const ARN& r) {
- return ((l.partition == r.partition) &&
- (l.service == r.service) &&
- (l.region == r.region) &&
- (l.account == r.account) &&
- (l.resource == r.resource));
-}
-bool operator <(const ARN& l, const ARN& r) {
- return ((l.partition < r.partition) ||
- (l.service < r.service) ||
- (l.region < r.region) ||
- (l.account < r.account) ||
- (l.resource < r.resource));
-}
-
-// The candidate is not allowed to have wildcards. The only way to
-// do that sanely would be to use unification rather than matching.
-bool ARN::match(const ARN& candidate) const {
- if ((candidate.partition == Partition::wildcard) ||
- (partition != candidate.partition && partition
- != Partition::wildcard)) {
- return false;
- }
-
- if ((candidate.service == Service::wildcard) ||
- (service != candidate.service && service != Service::wildcard)) {
- return false;
- }
-
- if (!match_policy(region, candidate.region, MATCH_POLICY_ARN)) {
- return false;
- }
-
- if (!match_policy(account, candidate.account, MATCH_POLICY_ARN)) {
- return false;
- }
-
- if (!match_policy(resource, candidate.resource, MATCH_POLICY_RESOURCE)) {
- return false;
- }
-
- return true;
-}
static const actpair actpairs[] =
{{ "s3:AbortMultipartUpload", s3AbortMultipartUpload },
#include "rgw_basic_types.h"
#include "rgw_iam_policy_keywords.h"
#include "rgw_string.h"
+#include "rgw_arn.h"
class RGWRados;
namespace rgw {
class Identity;
}
}
-struct rgw_obj;
-struct rgw_bucket;
namespace rgw {
namespace IAM {
using Environment = boost::container::flat_map<std::string, std::string>;
-enum struct Partition {
- aws, aws_cn, aws_us_gov, wildcard
- // If we wanted our own ARNs for principal type unique to us
- // (maybe to integrate better with Swift) or for anything else we
- // provide that doesn't map onto S3, we could add an 'rgw'
- // partition type.
-};
-
-enum struct Service {
- apigateway, appstream, artifact, autoscaling, aws_portal, acm,
- cloudformation, cloudfront, cloudhsm, cloudsearch, cloudtrail,
- cloudwatch, events, logs, codebuild, codecommit, codedeploy,
- codepipeline, cognito_idp, cognito_identity, cognito_sync,
- config, datapipeline, dms, devicefarm, directconnect,
- ds, dynamodb, ec2, ecr, ecs, ssm, elasticbeanstalk, elasticfilesystem,
- elasticloadbalancing, elasticmapreduce, elastictranscoder, elasticache,
- es, gamelift, glacier, health, iam, importexport, inspector, iot,
- kms, kinesisanalytics, firehose, kinesis, lambda, lightsail,
- machinelearning, aws_marketplace, aws_marketplace_management,
- mobileanalytics, mobilehub, opsworks, opsworks_cm, polly,
- redshift, rds, route53, route53domains, sts, servicecatalog,
- ses, sns, sqs, s3, swf, sdb, states, storagegateway, support,
- trustedadvisor, waf, workmail, workspaces, wildcard
-};
-
-struct ARN {
- Partition partition;
- Service service;
- std::string region;
- // Once we refit tenant, we should probably use that instead of a
- // string.
- std::string account;
- std::string resource;
-
- ARN()
- : partition(Partition::wildcard), service(Service::wildcard) {}
- ARN(Partition partition, Service service, std::string region,
- std::string account, std::string resource)
- : partition(partition), service(service), region(std::move(region)),
- account(std::move(account)), resource(std::move(resource)) {}
- ARN(const rgw_obj& o);
- ARN(const rgw_bucket& b);
- ARN(const rgw_bucket& b, const std::string& o);
- ARN(const string& resource_name, const string& type, const string& tenant, bool has_path=false);
-
- static boost::optional<ARN> parse(const std::string& s,
- bool wildcard = false);
- std::string to_string() const;
-
- // `this` is the pattern
- bool match(const ARN& candidate) const;
-};
-
-inline std::string to_string(const ARN& a) {
- return a.to_string();
-}
-
-inline std::ostream& operator <<(std::ostream& m, const ARN& a) {
- return m << to_string(a);
-}
-
-bool operator ==(const ARN& l, const ARN& r);
-bool operator <(const ARN& l, const ARN& r);
-
using Address = std::bitset<128>;
struct MaskedIP {
bool v6;
}
}
-namespace std {
-template<>
-struct hash<::rgw::IAM::Service> {
- size_t operator()(const ::rgw::IAM::Service& s) const noexcept {
- // Invoke a default-constructed hash object for int.
- return hash<int>()(static_cast<int>(s));
- }
-};
-}
-
#endif
f->open_object_section("auth");
f->open_object_section("passwordCredentials");
encode_json("username", to_string(conf.get_admin_user()), f);
- encode_json("password", to_string(conf.get_admin_password()), f);
+ encode_json("password", ::to_string(conf.get_admin_password()), f);
f->close_section();
encode_json("tenantName", to_string(conf.get_admin_tenant()), f);
f->close_section();
encode_json("name", to_string(conf.get_admin_domain()), f);
f->close_section();
encode_json("name", to_string(conf.get_admin_user()), f);
- encode_json("password", to_string(conf.get_admin_password()), f);
+ encode_json("password", ::to_string(conf.get_admin_password()), f);
f->close_section();
f->close_section();
f->close_section();
using boost::optional;
using boost::none;
-using rgw::IAM::ARN;
+using rgw::ARN;
using rgw::IAM::Effect;
using rgw::IAM::Policy;
int RGWListBuckets::verify_permission()
{
- rgw::IAM::Partition partition = rgw::IAM::Partition::aws;
- rgw::IAM::Service service = rgw::IAM::Service::s3;
+ rgw::Partition partition = rgw::Partition::aws;
+ rgw::Service service = rgw::Service::s3;
if (!verify_user_permission(this, s, ARN(partition, service, "", s->user->user_id.tenant, "*"), rgw::IAM::s3ListAllMyBuckets)) {
return -EACCES;
cs_object.instance.empty() ?
rgw::IAM::s3GetObject :
rgw::IAM::s3GetObjectVersion,
- rgw::IAM::ARN(obj)); usr_policy_res == Effect::Deny)
+ rgw::ARN(obj)); usr_policy_res == Effect::Deny)
return -EACCES;
else if (usr_policy_res == Effect::Allow)
break;
cs_object.instance.empty() ?
rgw::IAM::s3GetObject :
rgw::IAM::s3GetObjectVersion,
- rgw::IAM::ARN(obj));
+ rgw::ARN(obj));
}
if (e == Effect::Deny) {
return -EACCES;
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include "rgw_b64.h"
#include "rgw_rados.h"
#include "rgw_pubsub.h"
#define dout_subsys ceph_subsys_rgw
+void rgw_pubsub_s3_record::dump(Formatter *f) const {
+ encode_json("eventVersion", eventVersion, f);
+ encode_json("eventSource", eventSource, f);
+ encode_json("awsRegion", awsRegion, f);
+ utime_t ut(eventTime);
+ encode_json("eventTime", ut, f);
+ if (eventName == "OBJECT_CREATE") {
+ encode_json("eventName", "ObjectCreated", f);
+ }
+ else if (eventName == "OBJECT_DELETE") {
+ encode_json("eventName", "ObjectRemoved", f);
+ } else {
+ encode_json("eventName", eventName, f);
+ }
+ {
+ Formatter::ObjectSection s(*f, "userIdentity");
+ encode_json("principalId", userIdentity, f);
+ }
+ {
+ Formatter::ObjectSection s(*f, "requestParameters");
+ encode_json("sourceIPAddress", sourceIPAddress, f);
+ }
+ {
+ Formatter::ObjectSection s(*f, "responseElements");
+ encode_json("x-amz-request-id", x_amz_request_id, f);
+ encode_json("x-amz-id-2", x_amz_id_2, f);
+ }
+ {
+ Formatter::ObjectSection s(*f, "s3");
+ encode_json("s3SchemaVersion", s3SchemaVersion, f);
+ encode_json("configurationId", configurationId, f);
+ {
+ Formatter::ObjectSection sub_s(*f, "bucket");
+ encode_json("name", bucket_name, f);
+ {
+ Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
+ encode_json("principalId", bucket_ownerIdentity, f);
+ }
+ encode_json("arn", bucket_arn, f);
+ }
+ {
+ Formatter::ObjectSection sub_s(*f, "object");
+ encode_json("key", object_key, f);
+ encode_json("size", object_size, f);
+ encode_json("etag", object_etag, f);
+ encode_json("versionId", object_versionId, f);
+ encode_json("sequencer", object_sequencer, f);
+ }
+ }
+ encode_json("eventId", id, f);
+}
void rgw_pubsub_event::dump(Formatter *f) const
{
encode_json("name", name, f);
encode_json("topic", topic, f);
encode_json("dest", dest, f);
+ encode_json("s3_id", s3_id, f);
}
-
int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
{
int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker);
{
int ret = read(user_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
return 0;
{
int ret = write(user_meta_obj, topics, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
return 0;
{
int ret = ps->read(bucket_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(ps->store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
return 0;
{
int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
if (ret < 0) {
- ldout(ps->store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
return ret;
}
rgw_pubsub_user_topics topics;
int ret = get_user_topics(&topics);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
auto iter = topics.topics.find(name);
if (iter == topics.topics.end()) {
- ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+ ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
return -ENOENT;
}
return 0;
}
-
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const set<string, ltstr_nocase>& events)
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const EventTypeList& events)
{
rgw_pubsub_topic_subs user_topic_info;
RGWRados *store = ps->store;
int ret = ps->get_topic(topic_name, &user_topic_info);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
return ret;
}
ret = read_topics(&bucket_topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
int ret = ps->get_topic(topic_name, &user_topic_info);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
return ret;
}
ret = read_topics(&bucket_topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
int ret = read_user_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
ret = write_user_topics(topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
int ret = read_user_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
ret = write_user_topics(topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
return ret;
}
{
int ret = ps->read(sub_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(ps->store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
return ret;
}
return 0;
{
int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
if (ret < 0) {
- ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
}
{
int ret = ps->remove(sub_meta_obj, objv_tracker);
if (ret < 0) {
- ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
return ret;
}
return read_sub(result, nullptr);
}
-int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest)
+int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
{
RGWObjVersionTracker user_objv_tracker;
rgw_pubsub_user_topics topics;
int ret = ps->read_user_topics(&topics, &user_objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
auto iter = topics.topics.find(topic);
if (iter == topics.topics.end()) {
- ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
+ ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
return -ENOENT;
}
sub_conf.name = sub;
sub_conf.topic = topic;
sub_conf.dest = dest;
+ sub_conf.s3_id = s3_id;
t.subs.insert(sub);
return 0;
}
-void RGWUserPubSub::Sub::list_events_result::dump(Formatter *f) const
+template<typename EventType>
+void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
{
encode_json("next_marker", next_marker, f);
encode_json("is_truncated", is_truncated, f);
- Formatter::ArraySection s(*f, "events");
+ Formatter::ArraySection s(*f, EventType::json_type_plural);
for (auto& event : events) {
- encode_json("event", event, f);
+ encode_json(EventType::json_type_single, event, f);
}
}
-int RGWUserPubSub::Sub::list_events(const string& marker, int max_events,
- list_events_result *result)
+template<typename EventType>
+int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
{
RGWRados *store = ps->store;
rgw_pubsub_sub_config sub_conf;
int ret = get_conf(&sub_conf);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
return ret;
}
RGWSysObjectCtx obj_ctx(store->svc.sysobj->init_obj_ctx());
ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
if (ret == -ENOENT) {
- result->is_truncated = false;
+ list.is_truncated = false;
return 0;
}
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
return ret;
}
list_op.params.prefix = sub_conf.dest.oid_prefix;
list_op.params.marker = marker;
- vector<rgw_bucket_dir_entry> objs;
+ std::vector<rgw_bucket_dir_entry> objs;
- ret = list_op.list_objects(max_events, &objs, nullptr, &result->is_truncated);
+ ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
return ret;
}
- if (result->is_truncated) {
- result->next_marker = list_op.get_next_marker().name;
+ if (list.is_truncated) {
+ list.next_marker = list_op.get_next_marker().name;
}
for (auto& obj : objs) {
try {
bl.decode_base64(bl64);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to event (not a valid base64)" << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl;
continue;
}
- rgw_pubsub_event event;
+ EventType event;
auto iter = bl.cbegin();
try {
decode(event, iter);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode event" << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl;
continue;
};
- result->events.push_back(event);
+ list.events.push_back(event);
}
return 0;
}
-int RGWUserPubSub::Sub::remove_event(const string& event_id)
+template<typename EventType>
+int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
{
RGWRados *store = ps->store;
rgw_pubsub_sub_config sub_conf;
int ret = get_conf(&sub_conf);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
return ret;
}
RGWSysObjectCtx sysobj_ctx(store->svc.sysobj->init_obj_ctx());
ret = store->get_bucket_info(sysobj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
return ret;
}
ret = del_op.delete_obj();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
+ ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
}
return 0;
}
+
+template<typename EventType>
+void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
+ list.dump(f);
+}
+
+// explicit instantiation for the only two possible types
+// no need to move implementation to header
+template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
+template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#ifndef CEPH_RGW_PUBSUB_H
#define CEPH_RGW_PUBSUB_H
-#include "rgw_common.h"
#include "rgw_tools.h"
#include "rgw_zone.h"
-
#include "services/svc_sys_obj.h"
+/* S3 event records structure
+ * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
+{
+"Records":[
+ {
+ "eventVersion":""
+ "eventSource":"",
+ "awsRegion":"",
+ "eventTime":"",
+ "eventName":"",
+ "userIdentity":{
+ "principalId":""
+ },
+ "requestParameters":{
+ "sourceIPAddress":""
+ },
+ "responseElements":{
+ "x-amz-request-id":"",
+ "x-amz-id-2":""
+ },
+ "s3":{
+ "s3SchemaVersion":"1.0",
+ "configurationId":"",
+ "bucket":{
+ "name":"",
+ "ownerIdentity":{
+ "principalId":""
+ },
+ "arn":""
+ },
+ "object":{
+ "key":"",
+ "size": ,
+ "eTag":"",
+ "versionId":"",
+ "sequencer": ""
+ }
+ },
+ "eventId":"",
+ }
+]
+}*/
+
+struct rgw_pubsub_s3_record {
+ constexpr static const char* const json_type_single = "Record";
+ constexpr static const char* const json_type_plural = "Records";
+ // 2.1
+ std::string eventVersion;
+ // aws:s3
+ std::string eventSource;
+ // zone?
+ std::string awsRegion;
+ // time of the request
+ ceph::real_time eventTime;
+ // type of the event
+ std::string eventName;
+ // user that sent the requet (not implemented)
+ std::string userIdentity;
+ // IP address of source of the request (not implemented)
+ std::string sourceIPAddress;
+ // request ID (not implemented)
+ std::string x_amz_request_id;
+ // radosgw that received the request
+ std::string x_amz_id_2;
+ // 1.0
+ std::string s3SchemaVersion;
+ // ID received in the notification request
+ std::string configurationId;
+ // bucket name
+ std::string bucket_name;
+ // bucket owner (not implemented)
+ std::string bucket_ownerIdentity;
+ // bucket ARN
+ std::string bucket_arn;
+ // object key
+ std::string object_key;
+ // object size
+ uint64_t object_size;
+ // object etag
+ std::string object_etag;
+ // object version id bucket is versioned (not implemented)
+ std::string object_versionId;
+ // hexadecimal value used to determine event order for specific key
+ std::string object_sequencer;
+ // this is an rgw extension (not S3 standard)
+ // used to store a globally unique identifier of the event
+ // that could be used for acking
+ std::string id;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(eventVersion, bl);
+ encode(eventSource, bl);
+ encode(awsRegion, bl);
+ encode(eventTime, bl);
+ encode(eventName, bl);
+ encode(userIdentity, bl);
+ encode(sourceIPAddress, bl);
+ encode(x_amz_request_id, bl);
+ encode(x_amz_id_2, bl);
+ encode(s3SchemaVersion, bl);
+ encode(configurationId, bl);
+ encode(bucket_name, bl);
+ encode(bucket_ownerIdentity, bl);
+ encode(bucket_arn, bl);
+ encode(object_key, bl);
+ encode(object_size, bl);
+ encode(object_etag, bl);
+ encode(object_versionId, bl);
+ encode(object_sequencer, bl);
+ encode(id, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(eventVersion, bl);
+ decode(eventSource, bl);
+ decode(awsRegion, bl);
+ decode(eventTime, bl);
+ decode(eventName, bl);
+ decode(userIdentity, bl);
+ decode(sourceIPAddress, bl);
+ decode(x_amz_request_id, bl);
+ decode(x_amz_id_2, bl);
+ decode(s3SchemaVersion, bl);
+ decode(configurationId, bl);
+ decode(bucket_name, bl);
+ decode(bucket_ownerIdentity, bl);
+ decode(bucket_arn, bl);
+ decode(object_key, bl);
+ decode(object_size, bl);
+ decode(object_etag, bl);
+ decode(object_versionId, bl);
+ decode(object_sequencer, bl);
+ decode(id, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
struct rgw_pubsub_event {
+ constexpr static const char* const json_type_single = "event";
+ constexpr static const char* const json_type_plural = "events";
string id;
string event;
string source;
struct rgw_pubsub_sub_config {
rgw_user user;
- string name;
- string topic;
+ std::string name;
+ std::string topic;
rgw_pubsub_sub_dest dest;
+ std::string s3_id;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(user, bl);
encode(name, bl);
encode(topic, bl);
encode(dest, bl);
+ encode(s3_id, bl);
ENCODE_FINISH(bl);
}
decode(name, bl);
decode(topic, bl);
decode(dest, bl);
+ if (struct_v >= 2) {
+ decode(s3_id, bl);
+ }
DECODE_FINISH(bl);
}
struct rgw_pubsub_topic_subs {
rgw_pubsub_topic topic;
- set<string> subs;
+ std::set<string> subs;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
+typedef std::set<std::string, ltstr_nocase> EventTypeList;
+
struct rgw_pubsub_topic_filter {
rgw_pubsub_topic topic;
- set<string, ltstr_nocase> events;
+ EventTypeList events;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
struct rgw_pubsub_bucket_topics {
- map<string, rgw_pubsub_topic_filter> topics;
+ std::map<string, rgw_pubsub_topic_filter> topics;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
struct rgw_pubsub_user_topics {
- map<string, rgw_pubsub_topic_subs> topics;
+ std::map<std::string, rgw_pubsub_topic_subs> topics;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
+
public:
RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
user(_user),
}
int get_topics(rgw_pubsub_bucket_topics *result);
- int create_notification(const string& topic_name, const set<string, ltstr_nocase>& events);
+ int create_notification(const string& topic_name, const EventTypeList& events);
int remove_notification(const string& topic_name);
};
+ // base class for subscription
class Sub {
friend class RGWUserPubSub;
- RGWUserPubSub *ps;
- string sub;
+ protected:
+ RGWUserPubSub* const ps;
+ const std::string sub;
rgw_raw_obj sub_meta_obj;
int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
ps->get_sub_meta_obj(sub, &sub_meta_obj);
}
- int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest);
+ virtual ~Sub() = default;
+
+ int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
int unsubscribe(const string& topic_name);
- int get_conf(rgw_pubsub_sub_config *result);
+ int get_conf(rgw_pubsub_sub_config* result);
+
+ static const int DEFAULT_MAX_EVENTS = 100;
+ // followint virtual methods should only be called in derived
+ virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
+ virtual int remove_event(const string& event_id) {ceph_assert(false);}
+ virtual void dump(Formatter* f) const {ceph_assert(false);}
+ };
+ // subscription with templated list of events to support both S3 compliant and Ceph specific events
+ template<typename EventType>
+ class SubWithEvents : public Sub {
+ private:
struct list_events_result {
- string next_marker;
+ std::string next_marker;
bool is_truncated{false};
- std::vector<rgw_pubsub_event> events;
-
void dump(Formatter *f) const;
- };
+ std::vector<EventType> events;
+ } list;
+
+ public:
+ SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
- int list_events(const string& marker, int max_events, list_events_result *result);
- int remove_event(const string& event_id);
+ virtual ~SubWithEvents() = default;
+
+ int list_events(const string& marker, int max_events) override;
+ int remove_event(const string& event_id) override;
+ void dump(Formatter* f) const override;
};
using BucketRef = std::shared_ptr<Bucket>;
SubRef get_sub(const string& sub) {
return std::make_shared<Sub>(this, sub);
}
+
+ SubRef get_sub_with_events(const string& sub) {
+ auto tmpsub = Sub(this, sub);
+ rgw_pubsub_sub_config conf;
+ if (tmpsub.get_conf(&conf) < 0) {
+ return nullptr;
+ }
+ if (conf.s3_id.empty()) {
+ return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
+ }
+ return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
+ }
void get_user_meta_obj(rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid());
int remove_topic(const string& name);
};
+
template <class T>
int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
{
using namespace rgw;
-std::string json_format_pubsub_event(const rgw_pubsub_event& event) {
+template<typename EventType>
+std::string json_format_pubsub_event(const EventType& event) {
std::stringstream ss;
JSONFormatter f(false);
- encode_json("event", event, &f);
+ encode_json(EventType::json_type_single, event, &f);
f.flush(ss);
return ss.str();
}
public:
RGWPubSubHTTPEndpoint(const std::string& _endpoint,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint) {
- bool exists;
-
- str_ack_level = args.get("http-ack-level", &exists);
- if (!exists || str_ack_level == "any") {
- // "any" is default
- ack_level = ACK_LEVEL_ANY;
- } else if (str_ack_level == "non-error") {
- ack_level = ACK_LEVEL_NON_ERROR;
- } else {
- ack_level = std::atoi(str_ack_level.c_str());
- if (ack_level < 100 || ack_level >= 600) {
- throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
- }
- }
+ const RGWHTTPArgs& args) : endpoint(_endpoint) {
+ bool exists;
- auto str_verify_ssl = args.get("verify-ssl", &exists);
- boost::algorithm::to_lower(str_verify_ssl);
- // verify server certificate by default
- if (!exists || str_verify_ssl == "true") {
- verify_ssl = true;
- } else if (str_verify_ssl == "false") {
- verify_ssl = false;
- } else {
- throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+ str_ack_level = args.get("http-ack-level", &exists);
+ if (!exists || str_ack_level == "any") {
+ // "any" is default
+ ack_level = ACK_LEVEL_ANY;
+ } else if (str_ack_level == "non-error") {
+ ack_level = ACK_LEVEL_NON_ERROR;
+ } else {
+ ack_level = std::atoi(str_ack_level.c_str());
+ if (ack_level < 100 || ack_level >= 600) {
+ throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
}
}
+ auto str_verify_ssl = args.get("verify-ssl", &exists);
+ boost::algorithm::to_lower(str_verify_ssl);
+ // verify server certificate by default
+ if (!exists || str_verify_ssl == "true") {
+ verify_ssl = true;
+ } else if (str_verify_ssl == "false") {
+ verify_ssl = false;
+ } else {
+ throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+ }
+ }
+
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
}
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+ }
+
std::string to_str() const override {
std::string str("HTTP Endpoint");
str += "\nURI: " + endpoint;
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
- private:
- enum ack_level_t {
- ACK_LEVEL_NONE,
- ACK_LEVEL_BROKER,
- ACK_LEVEL_ROUTEABLE
- };
- const std::string endpoint;
- const std::string topic;
- amqp::connection_ptr_t conn;
- ack_level_t ack_level;
- std::string str_ack_level;
-
- static std::string get_exchange(const RGWHTTPArgs& args) {
- bool exists;
- const auto exchange = args.get("amqp-exchange", &exists);
- if (!exists) {
- throw configuration_error("AMQP: missing amqp-exchange");
- }
- return exchange;
+private:
+ enum ack_level_t {
+ ACK_LEVEL_NONE,
+ ACK_LEVEL_BROKER,
+ ACK_LEVEL_ROUTEABLE
+ };
+ const std::string endpoint;
+ const std::string topic;
+ amqp::connection_ptr_t conn;
+ ack_level_t ack_level;
+ std::string str_ack_level;
+
+ static std::string get_exchange(const RGWHTTPArgs& args) {
+ bool exists;
+ const auto exchange = args.get("amqp-exchange", &exists);
+ if (!exists) {
+ throw configuration_error("AMQP: missing amqp-exchange");
}
+ return exchange;
+ }
// NoAckPublishCR implements async amqp publishing via coroutine
// This coroutine ends when it send the message and does not wait for an ack
}
};
- public:
- RGWPubSubAMQPEndpoint(const std::string& _endpoint,
- const std::string& _topic,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint),
- topic(_topic),
- conn(amqp::connect(endpoint, get_exchange(args))) {
- bool exists;
- // get ack level
- str_ack_level = args.get("amqp-ack-level", &exists);
- if (!exists || str_ack_level == "broker") {
- // "broker" is default
- ack_level = ACK_LEVEL_BROKER;
- } else if (str_ack_level == "none") {
- ack_level = ACK_LEVEL_NONE;
- } else if (str_ack_level == "routable") {
- ack_level = ACK_LEVEL_ROUTEABLE;
- } else {
- throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
- }
+public:
+ RGWPubSubAMQPEndpoint(const std::string& _endpoint,
+ const std::string& _topic,
+ const RGWHTTPArgs& args) :
+ endpoint(_endpoint),
+ topic(_topic),
+ conn(amqp::connect(endpoint, get_exchange(args))) {
+ bool exists;
+ // get ack level
+ str_ack_level = args.get("amqp-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ ack_level = ACK_LEVEL_BROKER;
+ } else if (str_ack_level == "none") {
+ ack_level = ACK_LEVEL_NONE;
+ } else if (str_ack_level == "routable") {
+ ack_level = ACK_LEVEL_ROUTEABLE;
+ } else {
+ throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
}
+ }
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
- } else {
- // TODO: currently broker and routable are the same - this will require different flags
- // but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
- }
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
}
-
- std::string to_str() const override {
- std::string str("AMQP(0.9.1) Endpoint");
- str += "\nURI: " + endpoint;
- str += "\nTopic: " + topic;
- str += "\nAck Level: " + str_ack_level;
- return str;
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level);
}
+ }
+
+ std::string to_str() const override {
+ std::string str("AMQP(0.9.1) Endpoint");
+ str += "\nURI: " + endpoint;
+ str += "\nTopic: " + topic;
+ str += "\nAck Level: " + str_ack_level;
+ return str;
+ }
};
static const std::string AMQP_0_9_1("0-9-1");
class RGWCoroutine;
class RGWHTTPArgs;
struct rgw_pubsub_event;
+struct rgw_pubsub_s3_record;
// endpoint base class all endpoint - types should derive from it
class RGWPubSubEndpoint {
// may throw a configuration_error if creation fails
static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args);
- // this method is used in order to send notification and wait for completion
+ // this method is used in order to send notification (Ceph specific) and wait for completion
// in async manner via a coroutine
virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0;
+ // this method is used in order to send notification (S3 compliant) and wait for completion
+ // in async manner via a coroutine
+ virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0;
+
// present as string
virtual std::string to_str() const { return ""; }
uint64_t op = get_op();
if (!verify_user_permission(this,
s,
- rgw::IAM::ARN(resource_name,
+ rgw::ARN(resource_name,
"role",
s->user->user_id.tenant, true),
op)) {
string resource_name = role_path + role_name;
if (!verify_user_permission(this,
s,
- rgw::IAM::ARN(resource_name,
+ rgw::ARN(resource_name,
"role",
s->user->user_id.tenant, true),
get_op())) {
string resource_name = role.get_path() + role.get_name();
if (!verify_user_permission(this,
s,
- rgw::IAM::ARN(resource_name,
+ rgw::ARN(resource_name,
"role",
s->user->user_id.tenant, true),
get_op())) {
if (!verify_user_permission(this,
s,
- rgw::IAM::ARN(),
+ rgw::ARN(),
get_op())) {
return -EACCES;
}
int RGWSTSGetSessionToken::verify_permission()
{
- rgw::IAM::Partition partition = rgw::IAM::Partition::aws;
- rgw::IAM::Service service = rgw::IAM::Service::s3;
+ rgw::Partition partition = rgw::Partition::aws;
+ rgw::Service service = rgw::Service::s3;
if (!verify_user_permission(this,
s,
- rgw::IAM::ARN(partition, service, "", s->user->user_id.tenant, ""),
+ rgw::ARN(partition, service, "", s->user->user_id.tenant, ""),
rgw::IAM::stsGetSessionToken)) {
return -EACCES;
}
uint64_t op = get_op();
string user_name = s->info.args.get("UserName");
rgw_user user_id(user_name);
- if (! verify_user_permission(this, s, rgw::IAM::ARN(rgw::IAM::ARN(user_id.id,
+ if (! verify_user_permission(this, s, rgw::ARN(rgw::ARN(user_id.id,
"user",
user_id.tenant)), op)) {
return -EACCES;
int AssumedRoleUser::generateAssumedRoleUser(CephContext* cct,
RGWRados *store,
const string& roleId,
- const rgw::IAM::ARN& roleArn,
+ const rgw::ARN& roleArn,
const string& roleSessionName)
{
string resource = std::move(roleArn.resource);
resource.append("/");
resource.append(roleSessionName);
- rgw::IAM::ARN assumed_role_arn(rgw::IAM::Partition::aws,
- rgw::IAM::Service::sts,
+ rgw::ARN assumed_role_arn(rgw::Partition::aws,
+ rgw::Service::sts,
"", roleArn.account, resource);
arn = assumed_role_arn.to_string();
std::tuple<int, RGWRole> STSService::getRoleInfo(const string& arn)
{
- if (auto r_arn = rgw::IAM::ARN::parse(arn); r_arn) {
+ if (auto r_arn = rgw::ARN::parse(arn); r_arn) {
auto pos = r_arn->resource.find_last_of('/');
string roleName = r_arn->resource.substr(pos + 1);
RGWRole role(cct, store, roleName, r_arn->account);
response.sub = req.getSub();
//Get the role info which is being assumed
- boost::optional<rgw::IAM::ARN> r_arn = rgw::IAM::ARN::parse(req.getRoleARN());
+ boost::optional<rgw::ARN> r_arn = rgw::ARN::parse(req.getRoleARN());
if (r_arn == boost::none) {
response.assumeRoleResp.retCode = -EINVAL;
return response;
response.packedPolicySize = 0;
//Get the role info which is being assumed
- boost::optional<rgw::IAM::ARN> r_arn = rgw::IAM::ARN::parse(req.getRoleARN());
+ boost::optional<rgw::ARN> r_arn = rgw::ARN::parse(req.getRoleARN());
if (r_arn == boost::none) {
response.retCode = -EINVAL;
return response;
int generateAssumedRoleUser( CephContext* cct,
RGWRados *store,
const string& roleId,
- const rgw::IAM::ARN& roleArn,
+ const rgw::ARN& roleArn,
const string& roleSessionName);
const string& getARN() const { return arn; }
const string& getAssumeRoleId() const { return assumeRoleId; }
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include "rgw_common.h"
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
+#include <boost/algorithm/hex.hpp>
#include <boost/asio/yield.hpp>
#define dout_subsys ceph_subsys_rgw
config:
-{
- "tenant": <tenant>, # default: <empty>
- "uid": <uid>, # default: "pubsub"
- "data_bucket_prefix": <prefix> # default: "pubsub-"
- "data_oid_prefix": <prefix> #
-
- "events_retention_days": <days> # default: 7
-
- # non-dynamic config
- "notifications": [
- {
- "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
- # or a prefix if it ends with a wildcard
- "topic": <topic-name>
- },
- ...
- ],
- "subscriptions": [
- {
- "name": <subscription-name>,
- "topic": <topic>,
- "push_endpoint": <endpoint>,
- "args:" <arg list>. # any push endpoint specific args (include all args)
- "data_bucket": <bucket>, # override name of bucket where subscription data will be store
- "data_oid_prefix": <prefix> # set prefix for subscription data object ids
- },
- ...
- ]
-}
-
-*/
-
-/*
-
-config:
-
{
"tenant": <tenant>, # default: <empty>
"uid": <uid>, # default: "pubsub"
"args:" <arg list>. # any push endpoint specific args (include all args)
"data_bucket": <bucket>, # override name of bucket where subscription data will be store
"data_oid_prefix": <prefix> # set prefix for subscription data object ids
+ "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
},
...
]
return args;
}
-struct PSSubConfig { /* subscription config */
- string name;
- string topic;
- string push_endpoint_name;
- string push_endpoint_args;
+struct PSSubConfig {
+ std::string name;
+ std::string topic;
+ std::string push_endpoint_name;
+ std::string push_endpoint_args;
+ std::string data_bucket_name;
+ std::string data_oid_prefix;
+ std::string s3_id;
RGWPubSubEndpoint::Ptr push_endpoint;
- string data_bucket_name;
- string data_oid_prefix;
-
void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
name = uc.name;
topic = uc.topic;
push_endpoint_name = uc.dest.push_endpoint;
data_bucket_name = uc.dest.bucket_name;
data_oid_prefix = uc.dest.oid_prefix;
+ s3_id = uc.s3_id;
if (push_endpoint_name != "") {
push_endpoint_args = uc.dest.push_endpoint_args;
try {
encode_json("args", push_endpoint_args, f);
encode_json("data_bucket_name", data_bucket_name, f);
encode_json("data_oid_prefix", data_oid_prefix, f);
+ encode_json("s3_id", s3_id, f);
}
void init(CephContext *cct, const JSONFormattable& config,
string default_bucket_name = data_bucket_prefix + name;
data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
+ s3_id = config["s3_id"];
if (!push_endpoint_name.empty()) {
push_endpoint_args = config["push_endpoint_args"];
try {
push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args));
} catch (const RGWPubSubEndpoint::configuration_error& e) {
- ldout(cct, 0) << "ERROR: failed to create push endpoint: "
+ ldout(cct, 1) << "ERROR: failed to create push endpoint: "
<< push_endpoint_name << " due to: " << e.what() << dendl;
}
}
using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
struct PSTopicConfig {
- string name;
- set<string> subs;
+ std::string name;
+ std::set<std::string> subs;
void dump(Formatter *f) const {
encode_json("name", name, f);
}
using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
-using TopicsRef = std::shared_ptr<vector<PSTopicConfigRef>>;
-
+using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
struct PSConfig {
string id{"pubsub"};
uint64_t max_id{0};
/* FIXME: no hard coded buckets, we'll have configurable topics */
- map<string, PSSubConfigRef> subs;
- map<string, PSTopicConfigRef> topics;
- multimap<string, PSNotificationConfig> notifications;
+ std::map<std::string, PSSubConfigRef> subs;
+ std::map<std::string, PSTopicConfigRef> topics;
+ std::multimap<std::string, PSNotificationConfig> notifications;
void dump(Formatter *f) const {
encode_json("id", id, f);
continue;
}
- ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
+ ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
+ " target_path=" << target.path << ", topic=" << target.topic << dendl;
(*result)->push_back(topic->second);
} while (iter != notifications.begin());
}
}
using PSConfigRef = std::shared_ptr<PSConfig>;
-using EventRef = std::shared_ptr<rgw_pubsub_event>;
+template<typename EventType>
+using EventRef = std::shared_ptr<EventType>;
struct objstore_event {
string id;
}
};
+static void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
+ char buf[64];
+ const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
+ if (len > 0) {
+ id.assign(buf, len);
+ }
+}
+
static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
const rgw_obj_key& key,
const ceph::real_time& mtime,
const std::vector<std::pair<std::string, std::string> > *attrs,
const string& event_name,
- EventRef *event) {
+ EventRef<rgw_pubsub_event> *event) {
*event = std::make_shared<rgw_pubsub_event>();
- EventRef& e = *event;
+ EventRef<rgw_pubsub_event>& e = *event;
e->event = event_name;
e->source = bucket.name + "/" + key.name;
e->timestamp = real_clock::now();
objstore_event oevent(bucket, key, mtime, attrs);
- string hash = oevent.get_hash();
- utime_t ts(e->timestamp);
- char buf[64];
- snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
- e->id = buf;
+ const utime_t ts(e->timestamp);
+ set_event_id(e->id, oevent.get_hash(), ts);
encode_json("info", oevent, &e->info);
}
+static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+ const rgw_obj_key& key,
+ const ceph::real_time& mtime,
+ const std::vector<std::pair<std::string, std::string> > *attrs,
+ const string& event_name,
+ EventRef<rgw_pubsub_s3_record> *record) {
+ *record = std::make_shared<rgw_pubsub_s3_record>();
+
+ EventRef<rgw_pubsub_s3_record>& r = *record;
+ r->eventVersion = "2.1";
+ r->eventSource = "aws:s3";
+ r->eventTime = mtime;
+ r->eventName = event_name;
+ r->userIdentity = ""; // not supported yet
+ r->sourceIPAddress = ""; // not supported yet
+ r->x_amz_request_id = ""; // not supported yet
+ r->x_amz_id_2 = ""; // TODO: get that?
+ r->s3SchemaVersion = "1.0";
+ // configurationId is filled from subscription configuration
+ r->bucket_name = bucket.name;
+ r->bucket_ownerIdentity = bucket.tenant;
+ r->bucket_arn = to_string(rgw::ARN(bucket));
+ r->object_key = key.name;
+ r->object_size = 0; // TODO: get that?
+ objstore_event oevent(bucket, key, mtime, attrs);
+ r->object_etag = oevent.get_hash();
+ r->object_versionId = ""; // not supported yet
+
+ // use timestamp as per key sequence id (hex encoded)
+ const utime_t ts(real_clock::now());
+ boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
+ std::back_inserter(r->object_sequencer));
+
+ // event ID is rgw extension (not in the S3 spec), used for acking the event
+ // same format is used in both S3 compliant and Ceph specific events
+ set_event_id(r->id, r->object_etag, ts);
+}
+
class PSManager;
using PSManagerRef = std::shared_ptr<PSManager>;
using PSEnvRef = std::shared_ptr<PSEnv>;
+template<typename EventType>
class PSEvent {
- const EventRef event;
+ const EventRef<EventType> event;
public:
- PSEvent(const EventRef& _event) : event(_event) {}
+ PSEvent(const EventRef<EventType>& _event) : event(_event) {}
void format(bufferlist *bl) const {
- bl->append(json_str("event", *event));
+ bl->append(json_str(EventType::json_type_single, *event));
}
void encode_event(bufferlist& bl) const {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
PSSubConfigRef sub_conf;
- shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
+ std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
RGWBucketInfo *bucket_info{nullptr};
RGWDataAccessRef data_access;
RGWDataAccess::BucketRef bucket;
- struct push_endpoint_info {
- shared_ptr<RGWRESTConn> conn;
- string path;
- } push;
-
InitCR *init_cr{nullptr};
class InitBucketLifecycleCR : public RGWCoroutine {
InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
PSConfigRef& _conf,
RGWBucketInfo& _bucket_info,
- map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
+ std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf) {
lc_config.bucket_info = _bucket_info;
if (old_rule.get_prefix().empty() &&
old_rule.get_expiration().get_days() == retention_days &&
old_rule.is_enabled()) {
- ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl;
+ ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
return set_cr_done();
}
}
get_bucket_info,
sub->get_bucket_info_result));
if (retcode < 0 && retcode != -ENOENT) {
- ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant="
+ ldout(sync_env->cct, 1) << "ERROR: failed to geting bucket info: " << "tenant="
<< get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
}
if (retcode == 0) {
int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
return set_cr_error(ret);
}
}
sub->get_bucket_info_result->bucket_info,
sub->get_bucket_info_result->attrs));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
return set_cr_error(retcode);
}
sync_env->store,
create_bucket));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant="
+ ldout(sync_env->cct, 1) << "ERROR: failed to create bucket: " << "tenant="
<< get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
return set_cr_error(retcode);
}
}
/* failed twice on -ENOENT, unexpected */
- ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
+ ldout(sync_env->cct, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
<< " name=" << get_bucket_info.bucket_name << dendl;
return set_cr_error(-EIO);
}
}
};
+ template<typename EventType>
class StoreEventCR : public RGWCoroutine {
RGWDataSyncEnv* const sync_env;
const PSSubscriptionRef sub;
- const PSEvent pse;
+ const PSEvent<EventType> pse;
const string oid_prefix;
public:
StoreEventCR(RGWDataSyncEnv* const _sync_env,
const PSSubscriptionRef& _sub,
- const EventRef& _event) : RGWCoroutine(_sync_env->cct),
+ const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
sub(_sub),
pse(_event),
}
int operate() override {
- // TODO: in case of "push-only" subscription no need to store event
rgw_object_simple_put_params put_obj;
reenter(this) {
sync_env->store,
put_obj));
if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
return set_cr_error(retcode);
} else {
}
};
+ template<typename EventType>
class PushEventCR : public RGWCoroutine {
RGWDataSyncEnv* const sync_env;
- const EventRef event;
+ const EventRef<EventType> event;
const PSSubConfigRef& sub_conf;
public:
PushEventCR(RGWDataSyncEnv* const _sync_env,
const PSSubscriptionRef& _sub,
- const EventRef& _event) : RGWCoroutine(_sync_env->cct),
+ const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
event(_event),
sub_conf(_sub->sub_conf) {
return init_cr->execute(caller);
}
- static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) {
- return new StoreEventCR(sync_env, sub, event);
+ template<typename EventType>
+ static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
+ return new StoreEventCR<EventType>(sync_env, sub, event);
}
- static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) {
- return new PushEventCR(sync_env, sub, event);
+ template<typename EventType>
+ static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
+ return new PushEventCR<EventType>(sync_env, sub, event);
}
friend class InitCR;
};
-
class PSManager
{
RGWDataSyncEnv *sync_env;
PSEnvRef env;
- map<string, PSSubscriptionRef> subs;
+ std::map<string, PSSubscriptionRef> subs;
class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
RGWDataSyncEnv *sync_env;
PSSubConfigRef sub_conf;
rgw_pubsub_sub_config user_sub_conf;
+
public:
GetSubCR(RGWDataSyncEnv *_sync_env,
PSManagerRef& _mgr,
ref(_ref),
conf(mgr->env->conf) {
}
- ~GetSubCR() {
- }
+ ~GetSubCR() { }
int operate() override {
reenter(this) {
if (owner.empty()) {
if (!conf->find_sub(sub_name, &sub_conf)) {
- ldout(sync_env->cct, 10) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
mgr->remove_get_sub(owner, sub_name);
return set_cr_error(-ENOENT);
}
yield (*ref)->call_init_cr(this);
if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to init subscription" << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to init subscription" << dendl;
mgr->remove_get_sub(owner, sub_name);
return set_cr_error(retcode);
}
return owner_prefix + sub_name;
}
- map<string, GetSubCR *> get_subs;
+ std::map<std::string, GetSubCR *> get_subs;
GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
return get_subs[sub_id(owner, name)];
env(_env), conf(env->conf) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
+ ldout(sync_env->cct, 5) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
/* nothing to do here right now */
create_user.user = conf->user;
create_user.generate_key = false;
yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user));
if (retcode < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+ ldout(sync_env->store->ctx(), 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
return set_cr_error(retcode);
}
get_user_info.user = conf->user;
yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
if (retcode < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+ ldout(sync_env->store->ctx(), 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
return set_cr_error(retcode);
}
RGWDataSyncEnv* const sync_env;
const PSEnvRef env;
const rgw_user& owner;
- const EventRef event;
+ const EventRef<rgw_pubsub_event> event;
+ const EventRef<rgw_pubsub_s3_record> record;
const TopicsRef topics;
const std::array<rgw_user, 2> owners;
bool has_subscriptions;
bool sub_conf_found;
PSSubscriptionRef sub;
std::array<rgw_user, 2>::const_iterator oiter;
- vector<PSTopicConfigRef>::const_iterator titer;
- set<string>::const_iterator siter;
+ std::vector<PSTopicConfigRef>::const_iterator titer;
+ std::set<string>::const_iterator siter;
int last_sub_conf_error;
public:
RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env,
const PSEnvRef _env,
const rgw_user& _owner,
- const EventRef& _event,
+ const EventRef<rgw_pubsub_event>& _event,
+ const EventRef<rgw_pubsub_s3_record>& _record,
const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
owner(_owner),
event(_event),
+ record(_record),
topics(_topics),
owners({owner, rgw_user{}}),
has_subscriptions(false),
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone
+ ldout(sync_env->cct, 20) << ": handle event: obj: z=" << sync_env->source_zone
<< " event=" << json_str("event", *event, false)
<< " owner=" << owner << dendl;
ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
-
- if (topics->empty()) {
- // if event has no topics - no further processing is needed
- return set_cr_done();
- }
+
+ // outside caller should check that
+ ceph_assert(!topics->empty());
if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
for (titer = topics->begin(); titer != topics->end(); ++titer) {
- ldout(sync_env->cct, 10) << ": notification for " << event->source << ": topic=" <<
+ ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" <<
(*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
- ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
+ ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
has_subscriptions = true;
sub_conf_found = false;
for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
continue;
}
sub_conf_found = true;
-
- ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- yield call(PSSubscription::store_event_cr(sync_env, sub, event));
- if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ if (sub->sub_conf->s3_id.empty()) {
+ // subscription was not made by S3 compatible API
+ ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::store_event_cr(sync_env, sub, event));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ event_handled = true;
+ }
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sync_env, sub, event));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ event_handled = true;
+ }
+ }
} else {
- event_handled = true;
- }
- if (sub->sub_conf->push_endpoint) {
- ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sync_env, sub, event));
+ // subscription was made by S3 compatible API
+ ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ record->configurationId = sub->sub_conf->s3_id;
+ yield call(PSSubscription::store_event_cr(sync_env, sub, record));
if (retcode < 0) {
- ldout(sync_env->cct, 10) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
event_handled = true;
}
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sync_env, sub, record));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ event_handled = true;
+ }
+ }
}
}
if (!sub_conf_found) {
// could not find conf for subscription at user or global levels
- ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter
+ ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
<< " ret=" << last_sub_conf_error << dendl;
}
}
}
};
-
+// coroutine invoked on remote object creation
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
- EventRef event;
+ EventRef<rgw_pubsub_event> event;
+ EventRef<rgw_pubsub_s3_record> record;
TopicsRef topics;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+ ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
{
k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
}
attrs.push_back(std::make_pair(k, attr.second));
- }
+ }
+ // at this point we don't know whether we need the ceph event or S3 record
+ // this is why both are created here, once we have information about the
+ // subscription, we will store/push only the relevant ones
make_event_ref(sync_env->cct,
bucket_info.bucket, key,
mtime, &attrs,
EVENT_NAME_OBJECT_CREATE, &event);
+ make_s3_record_ref(sync_env->cct,
+ bucket_info.bucket, key,
+ mtime, &attrs,
+ EVENT_NAME_OBJECT_CREATE, &record);
}
- yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, topics));
+ yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
}
};
+// coroutine invoked on remote object deletion
class RGWPSGenericObjEventCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
rgw_obj_key key;
ceph::real_time mtime;
string event_name;
- EventRef event;
+ EventRef<rgw_pubsub_event> event;
+ EventRef<rgw_pubsub_s3_record> record;
TopicsRef topics;
public:
RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
<< " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
}
if (topics->empty()) {
ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
return set_cr_done();
}
- {
- make_event_ref(sync_env->cct,
- bucket, key,
- mtime, nullptr,
- event_name, &event);
- }
-
- yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, topics));
+ // at this point we don't know whether we need the ceph event or S3 record
+ // this is why both are created here, once we have information about the
+ // subscription, we will store/push only the relevant ones
+ make_event_ref(sync_env->cct,
+ bucket, key,
+ mtime, nullptr,
+ event_name, &event);
+ make_s3_record_ref(sync_env->cct,
+ bucket, key,
+ mtime, nullptr,
+ event_name, &record);
+ yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
class RGWPSDataSyncModule : public RGWDataSyncModule {
PSEnvRef env;
PSConfigRef& conf;
+
public:
RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
env->init(cct, config);
}
+
~RGWPSDataSyncModule() override {}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
ldout(sync_env->cct, 5) << conf->id << ": start" << dendl;
return new RGWPSInitEnvCBCR(sync_env, env);
}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket <<
+ " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket <<
+ " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
- << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket <<
+ " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
}
string jconf = json_str("conf", *data_handler->get_conf());
JSONParser p;
if (!p.parse(jconf.c_str(), jconf.size())) {
- ldout(cct, 0) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
+ ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
effective_conf = config;
} else {
effective_conf.decode_json(&p);
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
#include "rgw_sync_module_pubsub.h"
#include "rgw_sync_module_pubsub_rest.h"
#include "rgw_pubsub.h"
#include "rgw_op.h"
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
+#include "rgw_arn.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
}
}
+// create a topic
+// command: PUT /topics/<topic-name>
class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp {
public:
explicit RGWPSCreateTopic_ObjStore_S3() {}
}
+// list all topics
+// command: GET /topics
class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp {
public:
explicit RGWPSListTopics_ObjStore_S3() {}
}
}
+// get topic information (including subscriptions)
+// command: GET /topics/<topic-name>
class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp {
public:
explicit RGWPSGetTopic_ObjStore_S3() {}
}
}
+// delete a topic
+// command: DELETE /topics/<topic-name>
class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp {
public:
explicit RGWPSDeleteTopic_ObjStore_S3() {}
}
};
+// topics handler factory
class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
virtual ~RGWHandler_REST_PSTopic_S3() {}
};
-
class RGWPSCreateSubOp : public RGWDefaultResponseOp {
protected:
string sub_name;
}
}
+// create a subscription
+// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]...
class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp {
public:
explicit RGWPSCreateSub_ObjStore_S3() {}
}
}
+// get subscription configuration
+// command: GET /subscriptions/<sub-name>
class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp {
public:
explicit RGWPSGetSub_ObjStore_S3() {}
}
}
+// delete subscription
+// Command: DELETE /subscriptions/<sub-name>
class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp {
public:
explicit RGWPSDeleteSub_ObjStore_S3() {}
class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
protected:
- string sub_name;
- string event_id;
+ std::string sub_name;
+ std::string event_id;
std::unique_ptr<RGWUserPubSub> ups;
public:
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ auto sub = ups->get_sub_with_events(sub_name);
op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl;
}
}
+// acking of an event
+// command POST /subscriptions/<sub-name>?ack&event-id=<event-id>
class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp {
public:
explicit RGWPSAckSubEvent_ObjStore_S3() {}
class RGWPSPullSubEventsOp : public RGWOp {
protected:
int max_entries{0};
- string sub_name;
- string marker;
+ std::string sub_name;
+ std::string marker;
std::unique_ptr<RGWUserPubSub> ups;
- RGWUserPubSub::Sub::list_events_result result;
+ RGWUserPubSub::SubRef sub;
+ //RGWUserPubSub::SubWithEvents<rgw_pubsub_event>::list_events_result result;
public:
RGWPSPullSubEventsOp() {}
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->list_events(marker, max_entries, &result);
- if (op_ret < 0) {
+ sub = ups->get_sub_with_events(sub_name);
+ if (!sub) {
+ op_ret = -ENOENT;
ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
return;
}
+ op_ret = sub->list_events(marker, max_entries);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get subscription events, ret=" << op_ret << dendl;
+ return;
+ }
}
+// fetching events from a subscription
+// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
+// dpending on whether the subscription was created via s3 compliant API or not
+// the matching events will be returned
class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp {
public:
explicit RGWPSPullSubEvents_ObjStore_S3() {}
int get_params() override {
sub_name = s->object.name;
marker = s->info.args.get("marker");
-#define DEFAULT_MAX_ENTRIES 100
- int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES);
+ const int ret = s->info.args.get_int("max-entries", &max_entries,
+ RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
if (ret < 0) {
ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
return;
}
- encode_json("result", result, s->formatter);
+ encode_json("result", *sub, s->formatter);
rgw_flush_formatter_and_reset(s, s->formatter);
}
};
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
protected:
std::unique_ptr<RGWUserPubSub> ups;
- string topic_name;
- set<string, ltstr_nocase> events;
-
string bucket_name;
RGWBucketInfo bucket_info;
+ virtual int get_params() = 0;
+
public:
- RGWPSCreateNotifOp() {}
+ RGWPSCreateNotifOp() = default;
int verify_permission() override {
int ret = get_params();
}
return 0;
}
+
void pre_exec() override {
rgw_bucket_object_pre_exec(s);
}
RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
- virtual int get_params() = 0;
};
-// GCP style notification creation
-// Command: PUT /notification/bucket/<bucket name>?topic=<topic name>
-// "topic" has to be created beforehand
-class RGWPSCreateNotif_ObjStore_GCP : public RGWPSCreateNotifOp {
-public:
- explicit RGWPSCreateNotif_ObjStore_GCP() {}
-
- const char* name() const override { return "pubsub_notification_create_gcp"; }
-
- void execute() override;
+// ceph specific notification creation
+// command: PUT /notification/bucket/<bucket name>?topic=<topic name>
+// ("topic" has to be created beforehand)
+class RGWPSCreateNotif_ObjStore_Ceph : public RGWPSCreateNotifOp {
+private:
+ string topic_name;
+ std::set<string, ltstr_nocase> events;
int get_params() override {
bool exists;
}
return notif_bucket_path(s->object.name, &bucket_name);
}
+
+public:
+ RGWPSCreateNotif_ObjStore_Ceph() = default;
+
+ const char* name() const override { return "pubsub_notification_create_gcp"; }
+
+ void execute() override;
+
};
-void RGWPSCreateNotif_ObjStore_GCP::execute()
+void RGWPSCreateNotif_ObjStore_Ceph::execute()
{
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
-
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
}
}
-// S3 compliant notification creation
-// Command: PUT /<bucket name>?notification
-// a "topic" will be auto-generated
+// s3 compliant notification creation
+// command: PUT /<bucket name>?notification
+// a "topic", a "notification" and a subscription will be auto-generated
+// actual configuration is XML encoded in the body of the message, with following schema example:
+// <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+// <TopicConfiguration>
+// <Filter>
+// <S3Key>
+// <FilterRule>
+// <Name>suffix</Name>
+// <Value>jpg</Value>
+// </FilterRule>
+// </S3Key>
+// </Filter>
+// <Id>notification1</Id>
+// <Topic>arn:aws:sns:::<endpoint-type>:<endpoint-name>:topic1</Topic>
+// <Event>s3:ObjectCreated:*</Event>
+// <Event>s3:ObjectRemoved:*</Event>
+// </TopicConfiguration>
+// </NotificationConfiguration>
class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
-protected:
- std::string topic_name;
-public:
- explicit RGWPSCreateNotif_ObjStore_S3() {}
-
- const char* name() const override { return "pubsub_notification_create_s3"; }
-
- void execute() override;
-
- int get_params() override {
- bool exists;
- auto no_value = s->info.args.get("notification", &exists);
- if (!exists) {
- ldout(s->cct, 20) << "param 'notification' not provided" << dendl;
- return -EINVAL;
- } else if (no_value.length() > 0) {
- ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl;
- return -EINVAL;
+ struct TopicConfiguration {
+ std::string id;
+ std::string endpoint_type;
+ std::string endpoint_id;
+ std::string topic;
+ std::list<std::string> events;
+
+ bool decode_xml(XMLObj *obj) {
+ const auto throw_if_missing = true;
+ RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
+
+ std::string str_arn;
+ RGWXMLDecoder::decode_xml("Topic", str_arn, obj, throw_if_missing);
+
+ // parse ARN. allow wildcards
+ const auto arn = rgw::ARN::parse(str_arn, true);
+ if (arn == boost::none || arn->resource.empty()) {
+ throw RGWXMLDecoder::err("topic ARN parsing failed. ARN = '" + str_arn + "'");
+ }
+
+ // partition and service are expected to be "aws" and "sns"
+ // but there is no need to validate ARN them
+
+ const auto arn_resource = rgw::ARNResource::parse(arn->resource);
+ if (arn_resource == boost::none || arn_resource->resource.empty()) {
+ throw RGWXMLDecoder::err("topic ARN resource parsing failed. ARNResource = '" + arn->resource + "'");
+ }
+
+ if (!arn_resource->resource_type.empty()) {
+ // endpoint exists
+ endpoint_type = arn_resource->resource_type;
+ endpoint_id = arn_resource->resource;
+ if (arn_resource->qualifier.empty()) {
+ throw RGWXMLDecoder::err("topic ARN resource parsing failed. missing qualifier for endpoint");
+ }
+ topic = arn_resource->qualifier;
+ } else {
+ // only topic
+ topic = arn_resource->resource;
+ }
+
+ do_decode_xml_obj(events, "Event", obj);
+ if (events.empty()) {
+ // if no events are provided, we assume all events
+ events.push_back("s3:ObjectCreated:*");
+ events.push_back("s3:ObjectRemoved:*");
+ }
+ return true;
+ }
+ };
+
+ struct NotificationConfiguration {
+ std::list<TopicConfiguration> list;
+ bool decode_xml(XMLObj *obj) {
+ do_decode_xml_obj(list, "TopicConfiguration", obj);
+ if (list.empty()) {
+ throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
+ }
+ return true;
}
+ } configurations;
- if (s->bucket_name.empty()) {
- ldout(s->cct, 1) << "notification must be set on a bucket" << dendl;
- return -EINVAL;
+ static std::string s3_to_gcp_event(const std::string& event) {
+ if (event == "s3:ObjectCreated:*") {
+ return "OBJECT_CREATE";
}
-
- bucket_name = s->bucket_name;
+ if (event == "s3:ObjectRemoved:*") {
+ return "OBJECT_DELETE";
+ }
+ return "";
+ }
+ int get_params_from_body() {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-
- int r = 0;
+ int r;
bufferlist data;
- std::tie(r, data) = rgw_rest_read_all_input(s, max_size);
+ std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
if (r < 0) {
ldout(s->cct, 1) << "failed to read notification parameters from payload" << dendl;
return -EINVAL;
}
- RGWXMLParser parser;
+ RGWXMLDecoder::XMLParser parser;
if (!parser.init()){
ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
return -EINVAL;
}
-
if (!parser.parse(data.c_str(), data.length(), 1)) {
- ldout(s->cct, 1) << "failed to parse XML payload or notification" << dendl;
+ ldout(s->cct, 1) << "failed to parse XML payload of notification" << dendl;
return -ERR_MALFORMED_XML;
}
-
try {
+ // TopicConfigurations is mandatory
+ RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
} catch (RGWXMLDecoder::err& err) {
- ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl;
- return -ERR_MALFORMED_XML;
+ ldout(s->cct, 1) << "failed to parse XML payload of notification. error: " << err << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ return 0;
+ }
+
+ int get_params() override {
+ bool exists;
+ const auto no_value = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldout(s->cct, 20) << "param 'notification' not provided" << dendl;
+ return -EINVAL;
+ }
+ if (no_value.length() > 0) {
+ ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl;
+ return -EINVAL;
}
+ if (s->bucket_name.empty()) {
+ ldout(s->cct, 1) << "notification must be set on a bucket" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = s->bucket_name;
return 0;
}
+
+public:
+ RGWPSCreateNotif_ObjStore_S3() = default;
+
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+
+ void execute() override;
};
void RGWPSCreateNotif_ObjStore_S3::execute()
{
- op_ret = get_params();
+ op_ret = get_params_from_body();
if (op_ret < 0) {
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
-
- op_ret = ups->create_topic(topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate topic for notification, ret=" << op_ret << dendl;
- return;
- }
auto b = ups->get_bucket(bucket_info.bucket);
- op_ret = b->create_notification(topic_name, events);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
- return;
+ const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
+ const auto& conf = psmodule->get_effective_conf();
+
+ for (const auto& c : configurations.list) {
+ const auto& topic_name = c.topic;
+ const auto& sub_name = c.id;
+ if (topic_name.empty()) {
+ ldout(s->cct, 1) << "missing topic information" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ if (sub_name.empty()) {
+ ldout(s->cct, 1) << "missing subscription information" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ // get endpoint configuration according to type
+ // no endpoint (pull mode): arn:s3:sns:::<topic>
+ // TODO: HTTP/S endpoint: arn:s3:sns:::webhook:<endpoint-name>:<topic>
+ // TODO: AMQP endpoint: arn:s3:sns:::amqp:<endpoint-name>:<topic>
+ if (!c.endpoint_type.empty()) {
+ ldout(s->cct, 1) << "endpoint type '" << c.endpoint_type <<
+ "' not supported" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ // generate the topic
+ op_ret = ups->create_topic(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate topic '" << topic_name <<
+ "' for notification, ret=" << op_ret << dendl;
+ return;
+ }
+ // generate the notification
+ std::set<std::string, ltstr_nocase> events;
+ std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
+ ceph_assert(b);
+ op_ret = b->create_notification(topic_name, events);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << topic_name <<
+ "', ret=" << op_ret << dendl;
+ // rollback generated topic
+ ups->remove_topic(topic_name);
+ return;
+ }
+
+ rgw_pubsub_sub_dest dest;
+ dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
+ dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->subscribe(topic_name, dest, c.id);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ // rollback generated topic
+ ups->remove_topic(topic_name);
+ return;
+ }
}
}
}
}
-class RGWPSDeleteNotif_ObjStore_GCP : public RGWPSDeleteNotifOp {
+class RGWPSDeleteNotif_ObjStore_Ceph : public RGWPSDeleteNotifOp {
public:
- explicit RGWPSDeleteNotif_ObjStore_GCP() {}
+ explicit RGWPSDeleteNotif_ObjStore_Ceph() {}
int get_params() override {
bool exists;
class RGWPSListNotifsOp : public RGWOp {
protected:
- string bucket_name;
+ std::string bucket_name;
RGWBucketInfo bucket_info;
std::unique_ptr<RGWUserPubSub> ups;
rgw_pubsub_bucket_topics result;
}
-class RGWPSListNotifs_ObjStore_GCP : public RGWPSListNotifsOp {
+class RGWPSListNotifs_ObjStore_Ceph : public RGWPSListNotifsOp {
public:
- explicit RGWPSListNotifs_ObjStore_GCP() {}
+ explicit RGWPSListNotifs_ObjStore_Ceph() {}
int get_params() override {
return notif_bucket_path(s->object.name, &bucket_name);
}
};
-
-// GCP-style notification handler factory
-class RGWHandler_REST_PSNotifs_GCP : public RGWHandler_REST_S3 {
+// ceph specific notification handler factory
+class RGWHandler_REST_PSNotifs_Ceph : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
return 0;
if (s->object.empty()) {
return nullptr;
}
- return new RGWPSListNotifs_ObjStore_GCP();
+ return new RGWPSListNotifs_ObjStore_Ceph();
}
RGWOp *op_put() override {
if (!s->object.empty()) {
- return new RGWPSCreateNotif_ObjStore_GCP();
+ return new RGWPSCreateNotif_ObjStore_Ceph();
}
return nullptr;
}
RGWOp *op_delete() override {
if (!s->object.empty()) {
- return new RGWPSDeleteNotif_ObjStore_GCP();
+ return new RGWPSDeleteNotif_ObjStore_Ceph();
}
return nullptr;
}
public:
- explicit RGWHandler_REST_PSNotifs_GCP(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
- virtual ~RGWHandler_REST_PSNotifs_GCP() {}
+ explicit RGWHandler_REST_PSNotifs_Ceph(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSNotifs_Ceph() {}
};
-// S3-compliant notification handler factory
+// s3 compliant notification handler factory
class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
protected:
int init_permissions(RGWOp* op) override {
RGWHandler_REST *handler = nullptr;
- // GCP-style PubSub API: topics/subscriptions/notification are reserved bucket names
+ // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
if (s->init_state.url_bucket == "topics") {
handler = new RGWHandler_REST_PSTopic_S3(auth_registry);
} else if (s->init_state.url_bucket == "subscriptions") {
handler = new RGWHandler_REST_PSSub_S3(auth_registry);
} else if (s->init_state.url_bucket == "notifications") {
- handler = new RGWHandler_REST_PSNotifs_GCP(auth_registry);
+ handler = new RGWHandler_REST_PSNotifs_Ceph(auth_registry);
} else {
- // S3-compliant PubSub API: uses: <bucket name>?notification
+ // s3 compliant PubSub API: uses: <bucket name>?notification
handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
}
target_link_libraries(unittest_rgw_xml rgw_a ${EXPAT_LIBRARIES})
+# unittest_rgw_arn
+add_executable(unittest_rgw_arn test_rgw_arn.cc)
+add_ceph_unittest(unittest_rgw_arn)
+
+target_link_libraries(unittest_rgw_arn rgw_a)
+
from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
+import boto3
# configure logging for the tests module
log = logging.getLogger('rgw_multi.tests')
realm = get_realm()
zonegroup = realm.master_zonegroup()
- es_zones = zonegroup.zones_by_type.get("pubsub")
- if not es_zones:
+ ps_zones = zonegroup.zones_by_type.get("pubsub")
+ if not ps_zones:
raise SkipTest("Requires at least one PS zone")
assert False, err
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
+ """ verify there is at least one record per element """
+ err = ''
+ for key in keys:
+ key_found = False
+ for record in records:
+ if record['s3']['bucket']['name'] == key.bucket.name and \
+ record['s3']['object']['key'] == key.name:
+ if deletions and record['eventName'] == 'ObjectRemoved':
+ key_found = True
+ break
+ elif not deletions and record['eventName'] == 'ObjectCreated':
+ key_found = True
+ break
+ if not key_found:
+ err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
+ log.error(records)
+ assert False, err
+
+ if not len(records) == len(keys):
+ err = 'superfluous records are found'
+ log.debug(err)
+ if exact_match:
+ log.error(records)
+ assert False, err
+
+
def init_env():
"""initialize the environment"""
check_ps_configured()
TOPIC_SUFFIX = "_topic"
SUB_SUFFIX = "_sub"
+NOTIFICATION_SUFFIX = "_notif"
##############
# pubsub tests
##############
+def test_ps_s3_notification():
+ zones, ps_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create bucket on the first of the rados zones
+ bucket = zones[0].create_bucket(bucket_name)
+ # wait for sync
+ zone_meta_checkpoint(ps_zones[0].zone)
+ # create boto3 client
+ client = boto3.client('s3',
+ endpoint_url='http://'+ps_zones[0].conn.host+':'+str(ps_zones[0].conn.port),
+ aws_access_key_id=ps_zones[0].conn.aws_access_key_id,
+ aws_secret_access_key=ps_zones[0].conn.aws_secret_access_key)
+ # create s3 notification
+ topic_name = bucket_name + TOPIC_SUFFIX
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_arn = 'arn:aws:sns:::' + topic_name
+ response = client.put_bucket_notification_configuration(Bucket=bucket_name,
+ NotificationConfiguration={
+ 'TopicConfigurations': [
+ {
+ 'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*'],
+ }
+ ]
+ })
+
+ status = response['ResponseMetadata']['HTTPStatusCode']
+ assert_equal(status/100, 2)
+ zone_meta_checkpoint(ps_zones[0].zone)
+ # get auto-generated topic
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ result, _ = topic_conf.get_config()
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic']['name'], topic_name)
+ # get auto-generated notification
+ notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ topic_name)
+ result, _ = notification_conf.get_config()
+ parsed_result = json.loads(result)
+ assert_equal(len(parsed_result['topics']), 1)
+ # get auto-generated subscription
+ sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+ topic_name)
+ result, _ = sub_conf.get_config()
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic'], topic_name)
+ # create objects in the bucket
+ number_of_objects = 10
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+ # get the events from the subscription
+ result, _ = sub_conf.get_events()
+ parsed_result = json.loads(result)
+ for record in parsed_result['Records']:
+ log.debug(record)
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+
+ # cleanup
+ sub_conf.del_config()
+ notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the keys
+ for key in bucket.list():
+ key.delete()
+ zones[0].delete_bucket(bucket_name)
+
+
def test_ps_topic():
""" test set/get/delete of topic """
_, ps_zones = init_env()
assert topic_name.strip()
self.resource = '/topics/'+topic_name
- def send_request(self, method):
+ def send_request(self, method, get_list=False):
"""send request to radosgw"""
+ if get_list:
+ return make_request(self.conn, method, '/topics')
return make_request(self.conn, method, self.resource)
def get_config(self):
def del_config(self):
"""delete topic"""
return self.send_request('DELETE')
+
+ def get_list(self):
+ """list all topics"""
+ return self.send_request('GET', get_list=True)
class PSNotification:
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw/rgw_arn.h"
+#include <gtest/gtest.h>
+
+using namespace rgw;
+
+const int BASIC_ENTRIES = 6;
+
+const std::string basic_str[BASIC_ENTRIES] = {"arn:aws:s3:us-east-1:12345:resource",
+ "arn:aws:s3:us-east-1:12345:resourceType/resource",
+ "arn:aws:s3:us-east-1:12345:resourceType/resource/qualifier",
+ "arn:aws:s3:us-east-1:12345:resourceType/resource:qualifier",
+ "arn:aws:s3:us-east-1:12345:resourceType:resource",
+ "arn:aws:s3:us-east-1:12345:resourceType:resource/qualifier"};
+
+const std::string expected_basic_resource[BASIC_ENTRIES] = {"resource",
+ "resourceType/resource",
+ "resourceType/resource/qualifier",
+ "resourceType/resource:qualifier",
+ "resourceType:resource",
+ "resourceType:resource/qualifier"};
+TEST(TestARN, Basic)
+{
+ for (auto i = 0; i < BASIC_ENTRIES; ++i) {
+ boost::optional<ARN> arn = ARN::parse(basic_str[i]);
+ ASSERT_TRUE(arn);
+ EXPECT_EQ(arn->partition, Partition::aws);
+ EXPECT_EQ(arn->service, Service::s3);
+ EXPECT_STREQ(arn->region.c_str(), "us-east-1");
+ EXPECT_STREQ(arn->account.c_str(), "12345");
+ EXPECT_STREQ(arn->resource.c_str(), expected_basic_resource[i].c_str());
+ }
+}
+
+const std::string expected_basic_resource_type[BASIC_ENTRIES] =
+ {"", "resourceType", "resourceType", "resourceType", "resourceType", "resourceType"};
+const std::string expected_basic_qualifier[BASIC_ENTRIES] =
+ {"", "", "qualifier", "qualifier", "", "qualifier"};
+
+TEST(TestARNResource, Basic)
+{
+ for (auto i = 0; i < BASIC_ENTRIES; ++i) {
+ boost::optional<ARN> arn = ARN::parse(basic_str[i]);
+ ASSERT_TRUE(arn);
+ ASSERT_FALSE(arn->resource.empty());
+ boost::optional<ARNResource> resource = ARNResource::parse(arn->resource);
+ ASSERT_TRUE(resource);
+ EXPECT_STREQ(resource->resource.c_str(), "resource");
+ EXPECT_STREQ(resource->resource_type.c_str(), expected_basic_resource_type[i].c_str());
+ EXPECT_STREQ(resource->qualifier.c_str(), expected_basic_qualifier[i].c_str());
+ }
+}
+
+const int EMPTY_ENTRIES = 4;
+
+const std::string empty_str[EMPTY_ENTRIES] = {"arn:aws:s3:::resource",
+ "arn:aws:s3::12345:resource",
+ "arn:aws:s3:us-east-1::resource",
+ "arn:aws:s3:us-east-1:12345:"};
+
+TEST(TestARN, Empty)
+{
+ for (auto i = 0; i < EMPTY_ENTRIES; ++i) {
+ boost::optional<ARN> arn = ARN::parse(empty_str[i]);
+ ASSERT_TRUE(arn);
+ EXPECT_EQ(arn->partition, Partition::aws);
+ EXPECT_EQ(arn->service, Service::s3);
+ EXPECT_TRUE(arn->region.empty() || arn->region == "us-east-1");
+ EXPECT_TRUE(arn->account.empty() || arn->account == "12345");
+ EXPECT_TRUE(arn->resource.empty() || arn->resource == "resource");
+ }
+}
+
+const int WILDCARD_ENTRIES = 3;
+
+const std::string wildcard_str[WILDCARD_ENTRIES] = {"arn:aws:s3:*:*:resource",
+ "arn:aws:s3:*:12345:resource",
+ "arn:aws:s3:us-east-1:*:resource"};
+
+// FIXME: currently the following: "arn:aws:s3:us-east-1:12345:*"
+// does not fail, even if "wildcard" is not set to "true"
+
+TEST(TestARN, Wildcard)
+{
+ for (auto i = 0; i < WILDCARD_ENTRIES; ++i) {
+ EXPECT_FALSE(ARN::parse(wildcard_str[i]));
+ boost::optional<ARN> arn = ARN::parse(wildcard_str[i], true);
+ ASSERT_TRUE(arn);
+ EXPECT_EQ(arn->partition, Partition::aws);
+ EXPECT_EQ(arn->service, Service::s3);
+ EXPECT_TRUE(arn->region == "*" || arn->region == "us-east-1");
+ EXPECT_TRUE(arn->account == "*" || arn->account == "12345");
+ EXPECT_TRUE(arn->resource == "*" || arn->resource == "resource");
+ }
+}
+
using rgw::auth::Identity;
using rgw::auth::Principal;
-using rgw::IAM::ARN;
+using rgw::ARN;
using rgw::IAM::Effect;
using rgw::IAM::Environment;
-using rgw::IAM::Partition;
+using rgw::Partition;
using rgw::IAM::Policy;
using rgw::IAM::s3All;
using rgw::IAM::s3Count;
using rgw::IAM::None;
using rgw::IAM::s3PutBucketAcl;
using rgw::IAM::s3PutBucketPolicy;
-using rgw::IAM::Service;
+using rgw::Service;
using rgw::IAM::TokenID;
using rgw::IAM::Version;
using rgw::IAM::Action_t;