From: Yuval Lifshitz Date: Wed, 13 Mar 2019 17:54:16 +0000 (+0200) Subject: rgw: pubsub support s3 records. refactor ARN X-Git-Tag: v15.1.0~3002^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2bd353233112ead32181b94a2af1b04e3fa9e1de;p=ceph.git rgw: pubsub support s3 records. refactor ARN Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index ccc4f74cfe4..4499a127453 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -37,6 +37,7 @@ set(librgw_common_srcs rgw_aio_throttle.cc rgw_auth.cc rgw_auth_s3.cc + rgw_arn.cc rgw_basic_types.cc rgw_bucket.cc rgw_cache.cc diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 4b457741c31..4cc7abe5193 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -8155,18 +8155,16 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - RGWUserPubSub::Sub::list_events_result result; - if (!max_entries_specified) { - max_entries = 100; + max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS; } auto sub = ups.get_sub(sub_name); - ret = sub->list_events(marker, max_entries, &result); + ret = sub->list_events(marker, max_entries); if (ret < 0) { cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl; return -ret; } - encode_json("result", result, formatter); + encode_json("result", *sub, formatter); formatter->flush(cout); } diff --git a/src/rgw/rgw_arn.cc b/src/rgw/rgw_arn.cc new file mode 100644 index 00000000000..8dc736002a3 --- /dev/null +++ b/src/rgw/rgw_arn.cc @@ -0,0 +1,367 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw_arn.h" +#include "rgw_common.h" +#include + +namespace rgw { + +namespace { +boost::optional to_partition(const smatch::value_type& p, + bool wildcards) { + if (p == "aws") { + return Partition::aws; + } else if (p == "aws-cn") { + return Partition::aws_cn; + } else if (p == "aws-us-gov") { + return Partition::aws_us_gov; + } else if (p == "*" && wildcards) { + return Partition::wildcard; + } else { + return boost::none; + } + + ceph_abort(); +} + +boost::optional to_service(const smatch::value_type& s, + bool wildcards) { + static const unordered_map services = { + { "acm", Service::acm }, + { "apigateway", Service::apigateway }, + { "appstream", Service::appstream }, + { "artifact", Service::artifact }, + { "autoscaling", Service::autoscaling }, + { "aws-marketplace", Service::aws_marketplace }, + { "aws-marketplace-management", + Service::aws_marketplace_management }, + { "aws-portal", Service::aws_portal }, + { "cloudformation", Service::cloudformation }, + { "cloudfront", Service::cloudfront }, + { "cloudhsm", Service::cloudhsm }, + { "cloudsearch", Service::cloudsearch }, + { "cloudtrail", Service::cloudtrail }, + { "cloudwatch", Service::cloudwatch }, + { "codebuild", Service::codebuild }, + { "codecommit", Service::codecommit }, + { "codedeploy", Service::codedeploy }, + { "codepipeline", Service::codepipeline }, + { "cognito-identity", Service::cognito_identity }, + { "cognito-idp", Service::cognito_idp }, + { "cognito-sync", Service::cognito_sync }, + { "config", Service::config }, + { "datapipeline", Service::datapipeline }, + { "devicefarm", Service::devicefarm }, + { "directconnect", Service::directconnect }, + { "dms", Service::dms }, + { "ds", Service::ds }, + { "dynamodb", Service::dynamodb }, + { "ec2", Service::ec2 }, + { "ecr", Service::ecr }, + { "ecs", Service::ecs }, + { "elasticache", Service::elasticache }, + { "elasticbeanstalk", Service::elasticbeanstalk }, + { "elasticfilesystem", Service::elasticfilesystem }, + { "elasticloadbalancing", Service::elasticloadbalancing }, + { "elasticmapreduce", Service::elasticmapreduce }, + { "elastictranscoder", Service::elastictranscoder }, + { "es", Service::es }, + { "events", Service::events }, + { "firehose", Service::firehose }, + { "gamelift", Service::gamelift }, + { "glacier", Service::glacier }, + { "health", Service::health }, + { "iam", Service::iam }, + { "importexport", Service::importexport }, + { "inspector", Service::inspector }, + { "iot", Service::iot }, + { "kinesis", Service::kinesis }, + { "kinesisanalytics", Service::kinesisanalytics }, + { "kms", Service::kms }, + { "lambda", Service::lambda }, + { "lightsail", Service::lightsail }, + { "logs", Service::logs }, + { "machinelearning", Service::machinelearning }, + { "mobileanalytics", Service::mobileanalytics }, + { "mobilehub", Service::mobilehub }, + { "opsworks", Service::opsworks }, + { "opsworks-cm", Service::opsworks_cm }, + { "polly", Service::polly }, + { "rds", Service::rds }, + { "redshift", Service::redshift }, + { "route53", Service::route53 }, + { "route53domains", Service::route53domains }, + { "s3", Service::s3 }, + { "sdb", Service::sdb }, + { "servicecatalog", Service::servicecatalog }, + { "ses", Service::ses }, + { "sns", Service::sns }, + { "sqs", Service::sqs }, + { "ssm", Service::ssm }, + { "states", Service::states }, + { "storagegateway", Service::storagegateway }, + { "sts", Service::sts }, + { "support", Service::support }, + { "swf", Service::swf }, + { "trustedadvisor", Service::trustedadvisor }, + { "waf", Service::waf }, + { "workmail", Service::workmail }, + { "workspaces", Service::workspaces }}; + + if (wildcards && s == "*") { + return Service::wildcard; + } + + auto i = services.find(s); + if (i == services.end()) { + return boost::none; + } else { + return i->second; + } +} +} +ARN::ARN(const rgw_obj& o) + : partition(Partition::aws), + service(Service::s3), + region(), + account(o.bucket.tenant), + resource(o.bucket.name) +{ + resource.push_back('/'); + resource.append(o.key.name); +} + +ARN::ARN(const rgw_bucket& b) + : partition(Partition::aws), + service(Service::s3), + region(), + account(b.tenant), + resource(b.name) { } + +ARN::ARN(const rgw_bucket& b, const std::string& o) + : partition(Partition::aws), + service(Service::s3), + region(), + account(b.tenant), + resource(b.name) { + resource.push_back('/'); + resource.append(o); +} + +ARN::ARN(const std::string& resource_name, const std::string& type, const std::string& tenant, bool has_path) + : partition(Partition::aws), + service(Service::iam), + region(), + account(tenant), + resource(type) { + if (! has_path) + resource.push_back('/'); + resource.append(resource_name); +} + +boost::optional ARN::parse(const std::string& s, bool wildcards) { + static const std::regex rx_wild("arn:([^:]*):([^:]*):([^:]*):([^:]*):([^:]*)", + std::regex_constants::ECMAScript | + std::regex_constants::optimize); + static const std::regex rx_no_wild( + "arn:([^:*]*):([^:*]*):([^:*]*):([^:*]*):(.*)", + std::regex_constants::ECMAScript | + std::regex_constants::optimize); + + smatch match; + + if ((s == "*") && wildcards) { + return ARN(Partition::wildcard, Service::wildcard, "*", "*", "*"); + } else if (regex_match(s, match, wildcards ? rx_wild : rx_no_wild) && + match.size() == 6) { + if (auto p = to_partition(match[1], wildcards)) { + if (auto s = to_service(match[2], wildcards)) { + return ARN(*p, *s, match[3], match[4], match[5]); + } + } + } + return boost::none; +} + +std::string ARN::to_string() const { + std::string s; + + if (partition == Partition::aws) { + s.append("aws:"); + } else if (partition == Partition::aws_cn) { + s.append("aws-cn:"); + } else if (partition == Partition::aws_us_gov) { + s.append("aws-us-gov:"); + } else { + s.append("*:"); + } + + static const std::unordered_map services = { + { Service::acm, "acm" }, + { Service::apigateway, "apigateway" }, + { Service::appstream, "appstream" }, + { Service::artifact, "artifact" }, + { Service::autoscaling, "autoscaling" }, + { Service::aws_marketplace, "aws-marketplace" }, + { Service::aws_marketplace_management, "aws-marketplace-management" }, + { Service::aws_portal, "aws-portal" }, + { Service::cloudformation, "cloudformation" }, + { Service::cloudfront, "cloudfront" }, + { Service::cloudhsm, "cloudhsm" }, + { Service::cloudsearch, "cloudsearch" }, + { Service::cloudtrail, "cloudtrail" }, + { Service::cloudwatch, "cloudwatch" }, + { Service::codebuild, "codebuild" }, + { Service::codecommit, "codecommit" }, + { Service::codedeploy, "codedeploy" }, + { Service::codepipeline, "codepipeline" }, + { Service::cognito_identity, "cognito-identity" }, + { Service::cognito_idp, "cognito-idp" }, + { Service::cognito_sync, "cognito-sync" }, + { Service::config, "config" }, + { Service::datapipeline, "datapipeline" }, + { Service::devicefarm, "devicefarm" }, + { Service::directconnect, "directconnect" }, + { Service::dms, "dms" }, + { Service::ds, "ds" }, + { Service::dynamodb, "dynamodb" }, + { Service::ec2, "ec2" }, + { Service::ecr, "ecr" }, + { Service::ecs, "ecs" }, + { Service::elasticache, "elasticache" }, + { Service::elasticbeanstalk, "elasticbeanstalk" }, + { Service::elasticfilesystem, "elasticfilesystem" }, + { Service::elasticloadbalancing, "elasticloadbalancing" }, + { Service::elasticmapreduce, "elasticmapreduce" }, + { Service::elastictranscoder, "elastictranscoder" }, + { Service::es, "es" }, + { Service::events, "events" }, + { Service::firehose, "firehose" }, + { Service::gamelift, "gamelift" }, + { Service::glacier, "glacier" }, + { Service::health, "health" }, + { Service::iam, "iam" }, + { Service::importexport, "importexport" }, + { Service::inspector, "inspector" }, + { Service::iot, "iot" }, + { Service::kinesis, "kinesis" }, + { Service::kinesisanalytics, "kinesisanalytics" }, + { Service::kms, "kms" }, + { Service::lambda, "lambda" }, + { Service::lightsail, "lightsail" }, + { Service::logs, "logs" }, + { Service::machinelearning, "machinelearning" }, + { Service::mobileanalytics, "mobileanalytics" }, + { Service::mobilehub, "mobilehub" }, + { Service::opsworks, "opsworks" }, + { Service::opsworks_cm, "opsworks-cm" }, + { Service::polly, "polly" }, + { Service::rds, "rds" }, + { Service::redshift, "redshift" }, + { Service::route53, "route53" }, + { Service::route53domains, "route53domains" }, + { Service::s3, "s3" }, + { Service::sdb, "sdb" }, + { Service::servicecatalog, "servicecatalog" }, + { Service::ses, "ses" }, + { Service::sns, "sns" }, + { Service::sqs, "sqs" }, + { Service::ssm, "ssm" }, + { Service::states, "states" }, + { Service::storagegateway, "storagegateway" }, + { Service::sts, "sts" }, + { Service::support, "support" }, + { Service::swf, "swf" }, + { Service::trustedadvisor, "trustedadvisor" }, + { Service::waf, "waf" }, + { Service::workmail, "workmail" }, + { Service::workspaces, "workspaces" }}; + + auto i = services.find(service); + if (i != services.end()) { + s.append(i->second); + } else { + s.push_back('*'); + } + s.push_back(':'); + + s.append(region); + s.push_back(':'); + + s.append(account); + s.push_back(':'); + + s.append(resource); + + return s; +} + +bool operator ==(const ARN& l, const ARN& r) { + return ((l.partition == r.partition) && + (l.service == r.service) && + (l.region == r.region) && + (l.account == r.account) && + (l.resource == r.resource)); +} +bool operator <(const ARN& l, const ARN& r) { + return ((l.partition < r.partition) || + (l.service < r.service) || + (l.region < r.region) || + (l.account < r.account) || + (l.resource < r.resource)); +} + +// The candidate is not allowed to have wildcards. The only way to +// do that sanely would be to use unification rather than matching. +bool ARN::match(const ARN& candidate) const { + if ((candidate.partition == Partition::wildcard) || + (partition != candidate.partition && partition + != Partition::wildcard)) { + return false; + } + + if ((candidate.service == Service::wildcard) || + (service != candidate.service && service != Service::wildcard)) { + return false; + } + + if (!match_policy(region, candidate.region, MATCH_POLICY_ARN)) { + return false; + } + + if (!match_policy(account, candidate.account, MATCH_POLICY_ARN)) { + return false; + } + + if (!match_policy(resource, candidate.resource, MATCH_POLICY_RESOURCE)) { + return false; + } + + return true; +} + +boost::optional ARNResource::parse(const std::string& s) { + static const std::regex rx("^([^:/]*)[:/]?([^:/]*)?[:/]?(.*)$", + std::regex_constants::ECMAScript | + std::regex_constants::optimize); + std::smatch match; + if (!regex_match(s, match, rx)) { + return boost::none; + } + if (match[2].str().empty() && match[3].str().empty()) { + // only resource exist + return rgw::ARNResource("", match[1], ""); + } + + // resource type also exist, and cannot be wildcard + if (match[1] != std::string(wildcard)) { + // resource type cannot be wildcard + return rgw::ARNResource(match[1], match[2], match[3]); + } + + return boost::none; +} + +} + diff --git a/src/rgw/rgw_arn.h b/src/rgw/rgw_arn.h new file mode 100644 index 00000000000..f1dd10cbdf0 --- /dev/null +++ b/src/rgw/rgw_arn.h @@ -0,0 +1,115 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once +#include +#include + +class rgw_obj; +class rgw_bucket; + +namespace rgw { + +enum struct Partition { + aws, aws_cn, aws_us_gov, wildcard + // If we wanted our own ARNs for principal type unique to us + // (maybe to integrate better with Swift) or for anything else we + // provide that doesn't map onto S3, we could add an 'rgw' + // partition type. +}; + +enum struct Service { + apigateway, appstream, artifact, autoscaling, aws_portal, acm, + cloudformation, cloudfront, cloudhsm, cloudsearch, cloudtrail, + cloudwatch, events, logs, codebuild, codecommit, codedeploy, + codepipeline, cognito_idp, cognito_identity, cognito_sync, + config, datapipeline, dms, devicefarm, directconnect, + ds, dynamodb, ec2, ecr, ecs, ssm, elasticbeanstalk, elasticfilesystem, + elasticloadbalancing, elasticmapreduce, elastictranscoder, elasticache, + es, gamelift, glacier, health, iam, importexport, inspector, iot, + kms, kinesisanalytics, firehose, kinesis, lambda, lightsail, + machinelearning, aws_marketplace, aws_marketplace_management, + mobileanalytics, mobilehub, opsworks, opsworks_cm, polly, + redshift, rds, route53, route53domains, sts, servicecatalog, + ses, sns, sqs, s3, swf, sdb, states, storagegateway, support, + trustedadvisor, waf, workmail, workspaces, wildcard +}; + +/* valid format: + * 'arn:partition:service:region:account-id:resource' + * The 'resource' part can be further broken down via ARNResource +*/ +struct ARN { + Partition partition; + Service service; + std::string region; + // Once we refit tenant, we should probably use that instead of a + // string. + std::string account; + std::string resource; + + ARN() + : partition(Partition::wildcard), service(Service::wildcard) {} + ARN(Partition partition, Service service, std::string region, + std::string account, std::string resource) + : partition(partition), service(service), region(std::move(region)), + account(std::move(account)), resource(std::move(resource)) {} + ARN(const rgw_obj& o); + ARN(const rgw_bucket& b); + ARN(const rgw_bucket& b, const std::string& o); + ARN(const std::string& resource_name, const std::string& type, const std::string& tenant, bool has_path=false); + + static boost::optional parse(const std::string& s, + bool wildcard = false); + std::string to_string() const; + + // `this` is the pattern + bool match(const ARN& candidate) const; +}; + +inline std::string to_string(const ARN& a) { + return a.to_string(); +} + +inline std::ostream& operator <<(std::ostream& m, const ARN& a) { + return m << to_string(a); +} + +bool operator ==(const ARN& l, const ARN& r); +bool operator <(const ARN& l, const ARN& r); + +/* valid formats (only resource part): + * 'resource' + * 'resourcetype/resource' + * 'resourcetype/resource/qualifier' + * 'resourcetype/resource:qualifier' + * 'resourcetype:resource' + * 'resourcetype:resource:qualifier' + * Note that 'resourceType' cannot be wildcard +*/ +struct ARNResource { + constexpr static const char* const wildcard = "*"; + std::string resource_type; + std::string resource; + std::string qualifier; + + ARNResource() : resource_type(""), resource(wildcard), qualifier("") {} + + ARNResource(const std::string& _resource_type, const std::string& _resource, const std::string& _qualifier) : + resource_type(std::move(_resource_type)), resource(std::move(_resource)), qualifier(std::move(_qualifier)) {} + + static boost::optional parse(const std::string& s); +}; + +} // namespace rgw + +namespace std { +template<> +struct hash<::rgw::Service> { + size_t operator()(const ::rgw::Service& s) const noexcept { + // Invoke a default-constructed hash object for int. + return hash()(static_cast(s)); + } +}; +} // namespace std + diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index d35a97eaf79..d44efee2186 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -16,6 +16,7 @@ #include "rgw_string.h" #include "rgw_rados.h" #include "rgw_http_errors.h" +#include "rgw_arn.h" #include "common/ceph_crypto.h" #include "common/armor.h" @@ -33,7 +34,7 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw -using rgw::IAM::ARN; +using rgw::ARN; using rgw::IAM::Effect; using rgw::IAM::op_to_perm; using rgw::IAM::Policy; @@ -1088,7 +1089,7 @@ bool verify_user_permission(const DoutPrefixProvider* dpp, struct req_state * const s, RGWAccessControlPolicy * const user_acl, const vector& user_policies, - const rgw::IAM::ARN& res, + const rgw::ARN& res, const uint64_t op) { auto usr_policy_res = eval_user_policies(user_policies, s->env, boost::none, op, res); @@ -1128,7 +1129,7 @@ bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp, struct req_ bool verify_user_permission(const DoutPrefixProvider* dpp, struct req_state * const s, - const rgw::IAM::ARN& res, + const rgw::ARN& res, const uint64_t op) { return verify_user_permission(dpp, s, s->user_acl.get(), s->iam_user_policies, res, op); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 6dc8ef5f565..3a785e5fe8c 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -2450,12 +2450,12 @@ rgw::IAM::Effect eval_user_policies(const vector& user_policie const rgw::IAM::Environment& env, boost::optional id, const uint64_t op, - const rgw::IAM::ARN& arn); + const rgw::ARN& arn); bool verify_user_permission(const DoutPrefixProvider* dpp, struct req_state * const s, RGWAccessControlPolicy * const user_acl, const vector& user_policies, - const rgw::IAM::ARN& res, + const rgw::ARN& res, const uint64_t op); bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp, struct req_state * const s, @@ -2463,7 +2463,7 @@ bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp, const int perm); bool verify_user_permission(const DoutPrefixProvider* dpp, struct req_state * const s, - const rgw::IAM::ARN& res, + const rgw::ARN& res, const uint64_t op); bool verify_user_permission_no_policy(const DoutPrefixProvider* dpp, struct req_state * const s, diff --git a/src/rgw/rgw_iam_policy.cc b/src/rgw/rgw_iam_policy.cc index 05357917f07..d3e1cf05a6b 100644 --- a/src/rgw/rgw_iam_policy.cc +++ b/src/rgw/rgw_iam_policy.cc @@ -61,340 +61,7 @@ struct actpair { const uint64_t bit; }; -namespace { -boost::optional to_partition(const smatch::value_type& p, - bool wildcards) { - if (p == "aws") { - return Partition::aws; - } else if (p == "aws-cn") { - return Partition::aws_cn; - } else if (p == "aws-us-gov") { - return Partition::aws_us_gov; - } else if (p == "*" && wildcards) { - return Partition::wildcard; - } else { - return boost::none; - } - - ceph_abort(); -} - -boost::optional to_service(const smatch::value_type& s, - bool wildcards) { - static const unordered_map services = { - { "acm", Service::acm }, - { "apigateway", Service::apigateway }, - { "appstream", Service::appstream }, - { "artifact", Service::artifact }, - { "autoscaling", Service::autoscaling }, - { "aws-marketplace", Service::aws_marketplace }, - { "aws-marketplace-management", - Service::aws_marketplace_management }, - { "aws-portal", Service::aws_portal }, - { "cloudformation", Service::cloudformation }, - { "cloudfront", Service::cloudfront }, - { "cloudhsm", Service::cloudhsm }, - { "cloudsearch", Service::cloudsearch }, - { "cloudtrail", Service::cloudtrail }, - { "cloudwatch", Service::cloudwatch }, - { "codebuild", Service::codebuild }, - { "codecommit", Service::codecommit }, - { "codedeploy", Service::codedeploy }, - { "codepipeline", Service::codepipeline }, - { "cognito-identity", Service::cognito_identity }, - { "cognito-idp", Service::cognito_idp }, - { "cognito-sync", Service::cognito_sync }, - { "config", Service::config }, - { "datapipeline", Service::datapipeline }, - { "devicefarm", Service::devicefarm }, - { "directconnect", Service::directconnect }, - { "dms", Service::dms }, - { "ds", Service::ds }, - { "dynamodb", Service::dynamodb }, - { "ec2", Service::ec2 }, - { "ecr", Service::ecr }, - { "ecs", Service::ecs }, - { "elasticache", Service::elasticache }, - { "elasticbeanstalk", Service::elasticbeanstalk }, - { "elasticfilesystem", Service::elasticfilesystem }, - { "elasticloadbalancing", Service::elasticloadbalancing }, - { "elasticmapreduce", Service::elasticmapreduce }, - { "elastictranscoder", Service::elastictranscoder }, - { "es", Service::es }, - { "events", Service::events }, - { "firehose", Service::firehose }, - { "gamelift", Service::gamelift }, - { "glacier", Service::glacier }, - { "health", Service::health }, - { "iam", Service::iam }, - { "importexport", Service::importexport }, - { "inspector", Service::inspector }, - { "iot", Service::iot }, - { "kinesis", Service::kinesis }, - { "kinesisanalytics", Service::kinesisanalytics }, - { "kms", Service::kms }, - { "lambda", Service::lambda }, - { "lightsail", Service::lightsail }, - { "logs", Service::logs }, - { "machinelearning", Service::machinelearning }, - { "mobileanalytics", Service::mobileanalytics }, - { "mobilehub", Service::mobilehub }, - { "opsworks", Service::opsworks }, - { "opsworks-cm", Service::opsworks_cm }, - { "polly", Service::polly }, - { "rds", Service::rds }, - { "redshift", Service::redshift }, - { "route53", Service::route53 }, - { "route53domains", Service::route53domains }, - { "s3", Service::s3 }, - { "sdb", Service::sdb }, - { "servicecatalog", Service::servicecatalog }, - { "ses", Service::ses }, - { "sns", Service::sns }, - { "sqs", Service::sqs }, - { "ssm", Service::ssm }, - { "states", Service::states }, - { "storagegateway", Service::storagegateway }, - { "sts", Service::sts }, - { "support", Service::support }, - { "swf", Service::swf }, - { "trustedadvisor", Service::trustedadvisor }, - { "waf", Service::waf }, - { "workmail", Service::workmail }, - { "workspaces", Service::workspaces }}; - - if (wildcards && s == "*") { - return Service::wildcard; - } - - auto i = services.find(s); - if (i == services.end()) { - return boost::none; - } else { - return i->second; - } -} -} - -ARN::ARN(const rgw_obj& o) - : partition(Partition::aws), - service(Service::s3), - region(), - account(o.bucket.tenant), - resource(o.bucket.name) -{ - resource.push_back('/'); - resource.append(o.key.name); -} - -ARN::ARN(const rgw_bucket& b) - : partition(Partition::aws), - service(Service::s3), - region(), - account(b.tenant), - resource(b.name) { } - -ARN::ARN(const rgw_bucket& b, const string& o) - : partition(Partition::aws), - service(Service::s3), - region(), - account(b.tenant), - resource(b.name) { - resource.push_back('/'); - resource.append(o); -} - -ARN::ARN(const string& resource_name, const string& type, const string& tenant, bool has_path) - : partition(Partition::aws), - service(Service::iam), - region(), - account(tenant), - resource(type) { - if (! has_path) - resource.push_back('/'); - resource.append(resource_name); -} - -boost::optional ARN::parse(const string& s, bool wildcards) { - static const regex rx_wild("arn:([^:]*):([^:]*):([^:]*):([^:]*):([^:]*)", - std::regex_constants::ECMAScript | - std::regex_constants::optimize); - static const regex rx_no_wild( - "arn:([^:*]*):([^:*]*):([^:*]*):([^:*]*):(.*)", - std::regex_constants::ECMAScript | - std::regex_constants::optimize); - - smatch match; - - if ((s == "*") && wildcards) { - return ARN(Partition::wildcard, Service::wildcard, "*", "*", "*"); - } else if (regex_match(s, match, wildcards ? rx_wild : rx_no_wild) && - match.size() == 6) { - if (auto p = to_partition(match[1], wildcards)) { - if (auto s = to_service(match[2], wildcards)) { - return ARN(*p, *s, match[3], match[4], match[5]); - } - } - } - return boost::none; -} - -string ARN::to_string() const { - string s; - - if (partition == Partition::aws) { - s.append("aws:"); - } else if (partition == Partition::aws_cn) { - s.append("aws-cn:"); - } else if (partition == Partition::aws_us_gov) { - s.append("aws-us-gov:"); - } else { - s.append("*:"); - } - - static const unordered_map services = { - { Service::acm, "acm" }, - { Service::apigateway, "apigateway" }, - { Service::appstream, "appstream" }, - { Service::artifact, "artifact" }, - { Service::autoscaling, "autoscaling" }, - { Service::aws_marketplace, "aws-marketplace" }, - { Service::aws_marketplace_management, "aws-marketplace-management" }, - { Service::aws_portal, "aws-portal" }, - { Service::cloudformation, "cloudformation" }, - { Service::cloudfront, "cloudfront" }, - { Service::cloudhsm, "cloudhsm" }, - { Service::cloudsearch, "cloudsearch" }, - { Service::cloudtrail, "cloudtrail" }, - { Service::cloudwatch, "cloudwatch" }, - { Service::codebuild, "codebuild" }, - { Service::codecommit, "codecommit" }, - { Service::codedeploy, "codedeploy" }, - { Service::codepipeline, "codepipeline" }, - { Service::cognito_identity, "cognito-identity" }, - { Service::cognito_idp, "cognito-idp" }, - { Service::cognito_sync, "cognito-sync" }, - { Service::config, "config" }, - { Service::datapipeline, "datapipeline" }, - { Service::devicefarm, "devicefarm" }, - { Service::directconnect, "directconnect" }, - { Service::dms, "dms" }, - { Service::ds, "ds" }, - { Service::dynamodb, "dynamodb" }, - { Service::ec2, "ec2" }, - { Service::ecr, "ecr" }, - { Service::ecs, "ecs" }, - { Service::elasticache, "elasticache" }, - { Service::elasticbeanstalk, "elasticbeanstalk" }, - { Service::elasticfilesystem, "elasticfilesystem" }, - { Service::elasticloadbalancing, "elasticloadbalancing" }, - { Service::elasticmapreduce, "elasticmapreduce" }, - { Service::elastictranscoder, "elastictranscoder" }, - { Service::es, "es" }, - { Service::events, "events" }, - { Service::firehose, "firehose" }, - { Service::gamelift, "gamelift" }, - { Service::glacier, "glacier" }, - { Service::health, "health" }, - { Service::iam, "iam" }, - { Service::importexport, "importexport" }, - { Service::inspector, "inspector" }, - { Service::iot, "iot" }, - { Service::kinesis, "kinesis" }, - { Service::kinesisanalytics, "kinesisanalytics" }, - { Service::kms, "kms" }, - { Service::lambda, "lambda" }, - { Service::lightsail, "lightsail" }, - { Service::logs, "logs" }, - { Service::machinelearning, "machinelearning" }, - { Service::mobileanalytics, "mobileanalytics" }, - { Service::mobilehub, "mobilehub" }, - { Service::opsworks, "opsworks" }, - { Service::opsworks_cm, "opsworks-cm" }, - { Service::polly, "polly" }, - { Service::rds, "rds" }, - { Service::redshift, "redshift" }, - { Service::route53, "route53" }, - { Service::route53domains, "route53domains" }, - { Service::s3, "s3" }, - { Service::sdb, "sdb" }, - { Service::servicecatalog, "servicecatalog" }, - { Service::ses, "ses" }, - { Service::sns, "sns" }, - { Service::sqs, "sqs" }, - { Service::ssm, "ssm" }, - { Service::states, "states" }, - { Service::storagegateway, "storagegateway" }, - { Service::sts, "sts" }, - { Service::support, "support" }, - { Service::swf, "swf" }, - { Service::trustedadvisor, "trustedadvisor" }, - { Service::waf, "waf" }, - { Service::workmail, "workmail" }, - { Service::workspaces, "workspaces" }}; - - auto i = services.find(service); - if (i != services.end()) { - s.append(i->second); - } else { - s.push_back('*'); - } - s.push_back(':'); - - s.append(region); - s.push_back(':'); - - s.append(account); - s.push_back(':'); - - s.append(resource); - - return s; -} -bool operator ==(const ARN& l, const ARN& r) { - return ((l.partition == r.partition) && - (l.service == r.service) && - (l.region == r.region) && - (l.account == r.account) && - (l.resource == r.resource)); -} -bool operator <(const ARN& l, const ARN& r) { - return ((l.partition < r.partition) || - (l.service < r.service) || - (l.region < r.region) || - (l.account < r.account) || - (l.resource < r.resource)); -} - -// The candidate is not allowed to have wildcards. The only way to -// do that sanely would be to use unification rather than matching. -bool ARN::match(const ARN& candidate) const { - if ((candidate.partition == Partition::wildcard) || - (partition != candidate.partition && partition - != Partition::wildcard)) { - return false; - } - - if ((candidate.service == Service::wildcard) || - (service != candidate.service && service != Service::wildcard)) { - return false; - } - - if (!match_policy(region, candidate.region, MATCH_POLICY_ARN)) { - return false; - } - - if (!match_policy(account, candidate.account, MATCH_POLICY_ARN)) { - return false; - } - - if (!match_policy(resource, candidate.resource, MATCH_POLICY_RESOURCE)) { - return false; - } - - return true; -} static const actpair actpairs[] = {{ "s3:AbortMultipartUpload", s3AbortMultipartUpload }, diff --git a/src/rgw/rgw_iam_policy.h b/src/rgw/rgw_iam_policy.h index 0a325c5e1dd..e34aca0eea4 100644 --- a/src/rgw/rgw_iam_policy.h +++ b/src/rgw/rgw_iam_policy.h @@ -29,6 +29,7 @@ #include "rgw_basic_types.h" #include "rgw_iam_policy_keywords.h" #include "rgw_string.h" +#include "rgw_arn.h" class RGWRados; namespace rgw { @@ -36,8 +37,6 @@ namespace auth { class Identity; } } -struct rgw_obj; -struct rgw_bucket; namespace rgw { namespace IAM { @@ -206,70 +205,6 @@ inline int op_to_perm(std::uint64_t op) { using Environment = boost::container::flat_map; -enum struct Partition { - aws, aws_cn, aws_us_gov, wildcard - // If we wanted our own ARNs for principal type unique to us - // (maybe to integrate better with Swift) or for anything else we - // provide that doesn't map onto S3, we could add an 'rgw' - // partition type. -}; - -enum struct Service { - apigateway, appstream, artifact, autoscaling, aws_portal, acm, - cloudformation, cloudfront, cloudhsm, cloudsearch, cloudtrail, - cloudwatch, events, logs, codebuild, codecommit, codedeploy, - codepipeline, cognito_idp, cognito_identity, cognito_sync, - config, datapipeline, dms, devicefarm, directconnect, - ds, dynamodb, ec2, ecr, ecs, ssm, elasticbeanstalk, elasticfilesystem, - elasticloadbalancing, elasticmapreduce, elastictranscoder, elasticache, - es, gamelift, glacier, health, iam, importexport, inspector, iot, - kms, kinesisanalytics, firehose, kinesis, lambda, lightsail, - machinelearning, aws_marketplace, aws_marketplace_management, - mobileanalytics, mobilehub, opsworks, opsworks_cm, polly, - redshift, rds, route53, route53domains, sts, servicecatalog, - ses, sns, sqs, s3, swf, sdb, states, storagegateway, support, - trustedadvisor, waf, workmail, workspaces, wildcard -}; - -struct ARN { - Partition partition; - Service service; - std::string region; - // Once we refit tenant, we should probably use that instead of a - // string. - std::string account; - std::string resource; - - ARN() - : partition(Partition::wildcard), service(Service::wildcard) {} - ARN(Partition partition, Service service, std::string region, - std::string account, std::string resource) - : partition(partition), service(service), region(std::move(region)), - account(std::move(account)), resource(std::move(resource)) {} - ARN(const rgw_obj& o); - ARN(const rgw_bucket& b); - ARN(const rgw_bucket& b, const std::string& o); - ARN(const string& resource_name, const string& type, const string& tenant, bool has_path=false); - - static boost::optional parse(const std::string& s, - bool wildcard = false); - std::string to_string() const; - - // `this` is the pattern - bool match(const ARN& candidate) const; -}; - -inline std::string to_string(const ARN& a) { - return a.to_string(); -} - -inline std::ostream& operator <<(std::ostream& m, const ARN& a) { - return m << to_string(a); -} - -bool operator ==(const ARN& l, const ARN& r); -bool operator <(const ARN& l, const ARN& r); - using Address = std::bitset<128>; struct MaskedIP { bool v6; @@ -527,14 +462,4 @@ std::ostream& operator <<(ostream& m, const Policy& p); } } -namespace std { -template<> -struct hash<::rgw::IAM::Service> { - size_t operator()(const ::rgw::IAM::Service& s) const noexcept { - // Invoke a default-constructed hash object for int. - return hash()(static_cast(s)); - } -}; -} - #endif diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index e9030ad8308..58df9b199dc 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -1527,7 +1527,7 @@ void rgw::keystone::AdminTokenRequestVer2::dump(Formatter* const f) const f->open_object_section("auth"); f->open_object_section("passwordCredentials"); encode_json("username", to_string(conf.get_admin_user()), f); - encode_json("password", to_string(conf.get_admin_password()), f); + encode_json("password", ::to_string(conf.get_admin_password()), f); f->close_section(); encode_json("tenantName", to_string(conf.get_admin_tenant()), f); f->close_section(); @@ -1548,7 +1548,7 @@ void rgw::keystone::AdminTokenRequestVer3::dump(Formatter* const f) const encode_json("name", to_string(conf.get_admin_domain()), f); f->close_section(); encode_json("name", to_string(conf.get_admin_user()), f); - encode_json("password", to_string(conf.get_admin_password()), f); + encode_json("password", ::to_string(conf.get_admin_password()), f); f->close_section(); f->close_section(); f->close_section(); diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index e0d339e6946..dc04bffdb37 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -79,7 +79,7 @@ using ceph::crypto::MD5; using boost::optional; using boost::none; -using rgw::IAM::ARN; +using rgw::ARN; using rgw::IAM::Effect; using rgw::IAM::Policy; @@ -2147,8 +2147,8 @@ int RGWGetObj::init_common() int RGWListBuckets::verify_permission() { - rgw::IAM::Partition partition = rgw::IAM::Partition::aws; - rgw::IAM::Service service = rgw::IAM::Service::s3; + rgw::Partition partition = rgw::Partition::aws; + rgw::Service service = rgw::Service::s3; if (!verify_user_permission(this, s, ARN(partition, service, "", s->user->user_id.tenant, "*"), rgw::IAM::s3ListAllMyBuckets)) { return -EACCES; @@ -3329,7 +3329,7 @@ int RGWPutObj::verify_permission() cs_object.instance.empty() ? rgw::IAM::s3GetObject : rgw::IAM::s3GetObjectVersion, - rgw::IAM::ARN(obj)); usr_policy_res == Effect::Deny) + rgw::ARN(obj)); usr_policy_res == Effect::Deny) return -EACCES; else if (usr_policy_res == Effect::Allow) break; @@ -3340,7 +3340,7 @@ int RGWPutObj::verify_permission() cs_object.instance.empty() ? rgw::IAM::s3GetObject : rgw::IAM::s3GetObjectVersion, - rgw::IAM::ARN(obj)); + rgw::ARN(obj)); } if (e == Effect::Deny) { return -EACCES; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 5922760e212..665a2f8b657 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #include "rgw_b64.h" #include "rgw_rados.h" #include "rgw_pubsub.h" @@ -5,6 +8,57 @@ #define dout_subsys ceph_subsys_rgw +void rgw_pubsub_s3_record::dump(Formatter *f) const { + encode_json("eventVersion", eventVersion, f); + encode_json("eventSource", eventSource, f); + encode_json("awsRegion", awsRegion, f); + utime_t ut(eventTime); + encode_json("eventTime", ut, f); + if (eventName == "OBJECT_CREATE") { + encode_json("eventName", "ObjectCreated", f); + } + else if (eventName == "OBJECT_DELETE") { + encode_json("eventName", "ObjectRemoved", f); + } else { + encode_json("eventName", eventName, f); + } + { + Formatter::ObjectSection s(*f, "userIdentity"); + encode_json("principalId", userIdentity, f); + } + { + Formatter::ObjectSection s(*f, "requestParameters"); + encode_json("sourceIPAddress", sourceIPAddress, f); + } + { + Formatter::ObjectSection s(*f, "responseElements"); + encode_json("x-amz-request-id", x_amz_request_id, f); + encode_json("x-amz-id-2", x_amz_id_2, f); + } + { + Formatter::ObjectSection s(*f, "s3"); + encode_json("s3SchemaVersion", s3SchemaVersion, f); + encode_json("configurationId", configurationId, f); + { + Formatter::ObjectSection sub_s(*f, "bucket"); + encode_json("name", bucket_name, f); + { + Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); + encode_json("principalId", bucket_ownerIdentity, f); + } + encode_json("arn", bucket_arn, f); + } + { + Formatter::ObjectSection sub_s(*f, "object"); + encode_json("key", object_key, f); + encode_json("size", object_size, f); + encode_json("etag", object_etag, f); + encode_json("versionId", object_versionId, f); + encode_json("sequencer", object_sequencer, f); + } + } + encode_json("eventId", id, f); +} void rgw_pubsub_event::dump(Formatter *f) const { @@ -62,9 +116,9 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const encode_json("name", name, f); encode_json("topic", topic, f); encode_json("dest", dest, f); + encode_json("s3_id", s3_id, f); } - int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) { int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker); @@ -79,7 +133,7 @@ int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersio { int ret = read(user_meta_obj, result, objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } return 0; @@ -89,7 +143,7 @@ int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWOb { int ret = write(user_meta_obj, topics, objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } return 0; @@ -104,7 +158,7 @@ int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjV { int ret = ps->read(bucket_meta_obj, result, objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(ps->store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } return 0; @@ -114,7 +168,7 @@ int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, { int ret = ps->write(bucket_meta_obj, topics, objv_tracker); if (ret < 0) { - ldout(ps->store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; return ret; } @@ -131,13 +185,13 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) rgw_pubsub_user_topics topics; int ret = get_user_topics(&topics); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } auto iter = topics.topics.find(name); if (iter == topics.topics.end()) { - ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; return -ENOENT; } @@ -145,15 +199,14 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) return 0; } - -int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const set& events) +int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const EventTypeList& events) { rgw_pubsub_topic_subs user_topic_info; RGWRados *store = ps->store; int ret = ps->get_topic(topic_name, &user_topic_info); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; return ret; } @@ -162,7 +215,7 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const s ret = read_topics(&bucket_topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } @@ -172,7 +225,7 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const s ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } @@ -186,7 +239,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) int ret = ps->get_topic(topic_name, &user_topic_info); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; return ret; } @@ -195,7 +248,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) ret = read_topics(&bucket_topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } @@ -203,7 +256,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } @@ -217,7 +270,7 @@ int RGWUserPubSub::create_topic(const string& name) int ret = read_user_topics(&topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } @@ -227,7 +280,7 @@ int RGWUserPubSub::create_topic(const string& name) ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } @@ -241,7 +294,7 @@ int RGWUserPubSub::remove_topic(const string& name) int ret = read_user_topics(&topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } @@ -249,7 +302,7 @@ int RGWUserPubSub::remove_topic(const string& name) ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; return ret; } @@ -260,7 +313,7 @@ int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTra { int ret = ps->read(sub_meta_obj, result, objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(ps->store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; return ret; } return 0; @@ -270,7 +323,7 @@ int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjV { int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker); if (ret < 0) { - ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; } @@ -281,7 +334,7 @@ int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker) { int ret = ps->remove(sub_meta_obj, objv_tracker); if (ret < 0) { - ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; return ret; } @@ -293,7 +346,7 @@ int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) return read_sub(result, nullptr); } -int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest) +int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id) { RGWObjVersionTracker user_objv_tracker; rgw_pubsub_user_topics topics; @@ -301,13 +354,13 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest int ret = ps->read_user_topics(&topics, &user_objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } auto iter = topics.topics.find(topic); if (iter == topics.topics.end()) { - ldout(store->ctx(), 0) << "ERROR: cannot add subscription to topic: topic not found" << dendl; + ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; return -ENOENT; } @@ -319,6 +372,7 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest sub_conf.name = sub; sub_conf.topic = topic; sub_conf.dest = dest; + sub_conf.s3_id = s3_id; t.subs.insert(sub); @@ -383,25 +437,26 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic) return 0; } -void RGWUserPubSub::Sub::list_events_result::dump(Formatter *f) const +template +void RGWUserPubSub::SubWithEvents::list_events_result::dump(Formatter *f) const { encode_json("next_marker", next_marker, f); encode_json("is_truncated", is_truncated, f); - Formatter::ArraySection s(*f, "events"); + Formatter::ArraySection s(*f, EventType::json_type_plural); for (auto& event : events) { - encode_json("event", event, f); + encode_json(EventType::json_type_single, event, f); } } -int RGWUserPubSub::Sub::list_events(const string& marker, int max_events, - list_events_result *result) +template +int RGWUserPubSub::SubWithEvents::list_events(const string& marker, int max_events) { RGWRados *store = ps->store; rgw_pubsub_sub_config sub_conf; int ret = get_conf(&sub_conf); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; return ret; } @@ -410,11 +465,11 @@ int RGWUserPubSub::Sub::list_events(const string& marker, int max_events, RGWSysObjectCtx obj_ctx(store->svc.sysobj->init_obj_ctx()); ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); if (ret == -ENOENT) { - result->is_truncated = false; + list.is_truncated = false; return 0; } if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; return ret; } @@ -424,15 +479,15 @@ int RGWUserPubSub::Sub::list_events(const string& marker, int max_events, list_op.params.prefix = sub_conf.dest.oid_prefix; list_op.params.marker = marker; - vector objs; + std::vector objs; - ret = list_op.list_objects(max_events, &objs, nullptr, &result->is_truncated); + ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; return ret; } - if (result->is_truncated) { - result->next_marker = list_op.get_next_marker().name; + if (list.is_truncated) { + list.next_marker = list_op.get_next_marker().name; } for (auto& obj : objs) { @@ -442,31 +497,32 @@ int RGWUserPubSub::Sub::list_events(const string& marker, int max_events, try { bl.decode_base64(bl64); } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to event (not a valid base64)" << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl; continue; } - rgw_pubsub_event event; + EventType event; auto iter = bl.cbegin(); try { decode(event, iter); } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to decode event" << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl; continue; }; - result->events.push_back(event); + list.events.push_back(event); } return 0; } -int RGWUserPubSub::Sub::remove_event(const string& event_id) +template +int RGWUserPubSub::SubWithEvents::remove_event(const string& event_id) { RGWRados *store = ps->store; rgw_pubsub_sub_config sub_conf; int ret = get_conf(&sub_conf); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; return ret; } @@ -475,7 +531,7 @@ int RGWUserPubSub::Sub::remove_event(const string& event_id) RGWSysObjectCtx sysobj_ctx(store->svc.sysobj->init_obj_ctx()); ret = store->get_bucket_info(sysobj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; return ret; } @@ -494,7 +550,18 @@ int RGWUserPubSub::Sub::remove_event(const string& event_id) ret = del_op.delete_obj(); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; + ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; } return 0; } + +template +void RGWUserPubSub::SubWithEvents::dump(Formatter* f) const { + list.dump(f); +} + +// explicit instantiation for the only two possible types +// no need to move implementation to header +template class RGWUserPubSub::SubWithEvents; +template class RGWUserPubSub::SubWithEvents; + diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 7c65fe28335..8c5e7074c4b 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -1,14 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #ifndef CEPH_RGW_PUBSUB_H #define CEPH_RGW_PUBSUB_H -#include "rgw_common.h" #include "rgw_tools.h" #include "rgw_zone.h" - #include "services/svc_sys_obj.h" +/* S3 event records structure + * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html +{ +"Records":[ + { + "eventVersion":"" + "eventSource":"", + "awsRegion":"", + "eventTime":"", + "eventName":"", + "userIdentity":{ + "principalId":"" + }, + "requestParameters":{ + "sourceIPAddress":"" + }, + "responseElements":{ + "x-amz-request-id":"", + "x-amz-id-2":"" + }, + "s3":{ + "s3SchemaVersion":"1.0", + "configurationId":"", + "bucket":{ + "name":"", + "ownerIdentity":{ + "principalId":"" + }, + "arn":"" + }, + "object":{ + "key":"", + "size": , + "eTag":"", + "versionId":"", + "sequencer": "" + } + }, + "eventId":"", + } +] +}*/ + +struct rgw_pubsub_s3_record { + constexpr static const char* const json_type_single = "Record"; + constexpr static const char* const json_type_plural = "Records"; + // 2.1 + std::string eventVersion; + // aws:s3 + std::string eventSource; + // zone? + std::string awsRegion; + // time of the request + ceph::real_time eventTime; + // type of the event + std::string eventName; + // user that sent the requet (not implemented) + std::string userIdentity; + // IP address of source of the request (not implemented) + std::string sourceIPAddress; + // request ID (not implemented) + std::string x_amz_request_id; + // radosgw that received the request + std::string x_amz_id_2; + // 1.0 + std::string s3SchemaVersion; + // ID received in the notification request + std::string configurationId; + // bucket name + std::string bucket_name; + // bucket owner (not implemented) + std::string bucket_ownerIdentity; + // bucket ARN + std::string bucket_arn; + // object key + std::string object_key; + // object size + uint64_t object_size; + // object etag + std::string object_etag; + // object version id bucket is versioned (not implemented) + std::string object_versionId; + // hexadecimal value used to determine event order for specific key + std::string object_sequencer; + // this is an rgw extension (not S3 standard) + // used to store a globally unique identifier of the event + // that could be used for acking + std::string id; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(eventVersion, bl); + encode(eventSource, bl); + encode(awsRegion, bl); + encode(eventTime, bl); + encode(eventName, bl); + encode(userIdentity, bl); + encode(sourceIPAddress, bl); + encode(x_amz_request_id, bl); + encode(x_amz_id_2, bl); + encode(s3SchemaVersion, bl); + encode(configurationId, bl); + encode(bucket_name, bl); + encode(bucket_ownerIdentity, bl); + encode(bucket_arn, bl); + encode(object_key, bl); + encode(object_size, bl); + encode(object_etag, bl); + encode(object_versionId, bl); + encode(object_sequencer, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(eventVersion, bl); + decode(eventSource, bl); + decode(awsRegion, bl); + decode(eventTime, bl); + decode(eventName, bl); + decode(userIdentity, bl); + decode(sourceIPAddress, bl); + decode(x_amz_request_id, bl); + decode(x_amz_id_2, bl); + decode(s3SchemaVersion, bl); + decode(configurationId, bl); + decode(bucket_name, bl); + decode(bucket_ownerIdentity, bl); + decode(bucket_arn, bl); + decode(object_key, bl); + decode(object_size, bl); + decode(object_etag, bl); + decode(object_versionId, bl); + decode(object_sequencer, bl); + decode(id, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) struct rgw_pubsub_event { + constexpr static const char* const json_type_single = "event"; + constexpr static const char* const json_type_plural = "events"; string id; string event; string source; @@ -71,16 +216,18 @@ WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) struct rgw_pubsub_sub_config { rgw_user user; - string name; - string topic; + std::string name; + std::string topic; rgw_pubsub_sub_dest dest; + std::string s3_id; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(user, bl); encode(name, bl); encode(topic, bl); encode(dest, bl); + encode(s3_id, bl); ENCODE_FINISH(bl); } @@ -90,6 +237,9 @@ struct rgw_pubsub_sub_config { decode(name, bl); decode(topic, bl); decode(dest, bl); + if (struct_v >= 2) { + decode(s3_id, bl); + } DECODE_FINISH(bl); } @@ -129,7 +279,7 @@ WRITE_CLASS_ENCODER(rgw_pubsub_topic) struct rgw_pubsub_topic_subs { rgw_pubsub_topic topic; - set subs; + std::set subs; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -149,9 +299,11 @@ struct rgw_pubsub_topic_subs { }; WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) +typedef std::set EventTypeList; + struct rgw_pubsub_topic_filter { rgw_pubsub_topic topic; - set events; + EventTypeList events; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -172,7 +324,7 @@ struct rgw_pubsub_topic_filter { WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) struct rgw_pubsub_bucket_topics { - map topics; + std::map topics; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -191,7 +343,7 @@ struct rgw_pubsub_bucket_topics { WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) struct rgw_pubsub_user_topics { - map topics; + std::map topics; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -243,6 +395,7 @@ class RGWUserPubSub int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker); int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker); + public: RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), user(_user), @@ -264,14 +417,16 @@ public: } int get_topics(rgw_pubsub_bucket_topics *result); - int create_notification(const string& topic_name, const set& events); + int create_notification(const string& topic_name, const EventTypeList& events); int remove_notification(const string& topic_name); }; + // base class for subscription class Sub { friend class RGWUserPubSub; - RGWUserPubSub *ps; - string sub; + protected: + RGWUserPubSub* const ps; + const std::string sub; rgw_raw_obj sub_meta_obj; int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker); @@ -282,20 +437,38 @@ public: ps->get_sub_meta_obj(sub, &sub_meta_obj); } - int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest); + virtual ~Sub() = default; + + int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id=""); int unsubscribe(const string& topic_name); - int get_conf(rgw_pubsub_sub_config *result); + int get_conf(rgw_pubsub_sub_config* result); + + static const int DEFAULT_MAX_EVENTS = 100; + // followint virtual methods should only be called in derived + virtual int list_events(const string& marker, int max_events) {ceph_assert(false);} + virtual int remove_event(const string& event_id) {ceph_assert(false);} + virtual void dump(Formatter* f) const {ceph_assert(false);} + }; + // subscription with templated list of events to support both S3 compliant and Ceph specific events + template + class SubWithEvents : public Sub { + private: struct list_events_result { - string next_marker; + std::string next_marker; bool is_truncated{false}; - std::vector events; - void dump(Formatter *f) const; - }; + std::vector events; + } list; + + public: + SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {} - int list_events(const string& marker, int max_events, list_events_result *result); - int remove_event(const string& event_id); + virtual ~SubWithEvents() = default; + + int list_events(const string& marker, int max_events) override; + int remove_event(const string& event_id) override; + void dump(Formatter* f) const override; }; using BucketRef = std::shared_ptr; @@ -308,6 +481,18 @@ public: SubRef get_sub(const string& sub) { return std::make_shared(this, sub); } + + SubRef get_sub_with_events(const string& sub) { + auto tmpsub = Sub(this, sub); + rgw_pubsub_sub_config conf; + if (tmpsub.get_conf(&conf) < 0) { + return nullptr; + } + if (conf.s3_id.empty()) { + return std::make_shared>(this, sub); + } + return std::make_shared>(this, sub); + } void get_user_meta_obj(rgw_raw_obj *obj) const { *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid()); @@ -327,6 +512,7 @@ public: int remove_topic(const string& name); }; + template int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) { diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index efc1823e748..5c4ca84db9f 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -21,10 +21,11 @@ using namespace rgw; -std::string json_format_pubsub_event(const rgw_pubsub_event& event) { +template +std::string json_format_pubsub_event(const EventType& event) { std::stringstream ss; JSONFormatter f(false); - encode_json("event", event, &f); + encode_json(EventType::json_type_single, event, &f); f.flush(ss); return ss.str(); } @@ -88,39 +89,42 @@ private: public: RGWPubSubHTTPEndpoint(const std::string& _endpoint, - const RGWHTTPArgs& args) : - endpoint(_endpoint) { - bool exists; - - str_ack_level = args.get("http-ack-level", &exists); - if (!exists || str_ack_level == "any") { - // "any" is default - ack_level = ACK_LEVEL_ANY; - } else if (str_ack_level == "non-error") { - ack_level = ACK_LEVEL_NON_ERROR; - } else { - ack_level = std::atoi(str_ack_level.c_str()); - if (ack_level < 100 || ack_level >= 600) { - throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level); - } - } + const RGWHTTPArgs& args) : endpoint(_endpoint) { + bool exists; - auto str_verify_ssl = args.get("verify-ssl", &exists); - boost::algorithm::to_lower(str_verify_ssl); - // verify server certificate by default - if (!exists || str_verify_ssl == "true") { - verify_ssl = true; - } else if (str_verify_ssl == "false") { - verify_ssl = false; - } else { - throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl); + str_ack_level = args.get("http-ack-level", &exists); + if (!exists || str_ack_level == "any") { + // "any" is default + ack_level = ACK_LEVEL_ANY; + } else if (str_ack_level == "non-error") { + ack_level = ACK_LEVEL_NON_ERROR; + } else { + ack_level = std::atoi(str_ack_level.c_str()); + if (ack_level < 100 || ack_level >= 600) { + throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level); } } + auto str_verify_ssl = args.get("verify-ssl", &exists); + boost::algorithm::to_lower(str_verify_ssl); + // verify server certificate by default + if (!exists || str_verify_ssl == "true") { + verify_ssl = true; + } else if (str_verify_ssl == "false") { + verify_ssl = false; + } else { + throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl); + } + } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); + } + std::string to_str() const override { std::string str("HTTP Endpoint"); str += "\nURI: " + endpoint; @@ -133,26 +137,26 @@ public: #ifdef WITH_RADOSGW_AMQP_ENDPOINT class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { - private: - enum ack_level_t { - ACK_LEVEL_NONE, - ACK_LEVEL_BROKER, - ACK_LEVEL_ROUTEABLE - }; - const std::string endpoint; - const std::string topic; - amqp::connection_ptr_t conn; - ack_level_t ack_level; - std::string str_ack_level; - - static std::string get_exchange(const RGWHTTPArgs& args) { - bool exists; - const auto exchange = args.get("amqp-exchange", &exists); - if (!exists) { - throw configuration_error("AMQP: missing amqp-exchange"); - } - return exchange; +private: + enum ack_level_t { + ACK_LEVEL_NONE, + ACK_LEVEL_BROKER, + ACK_LEVEL_ROUTEABLE + }; + const std::string endpoint; + const std::string topic; + amqp::connection_ptr_t conn; + ack_level_t ack_level; + std::string str_ack_level; + + static std::string get_exchange(const RGWHTTPArgs& args) { + bool exists; + const auto exchange = args.get("amqp-exchange", &exists); + if (!exists) { + throw configuration_error("AMQP: missing amqp-exchange"); } + return exchange; + } // NoAckPublishCR implements async amqp publishing via coroutine // This coroutine ends when it send the message and does not wait for an ack @@ -247,45 +251,55 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { } }; - public: - RGWPubSubAMQPEndpoint(const std::string& _endpoint, - const std::string& _topic, - const RGWHTTPArgs& args) : - endpoint(_endpoint), - topic(_topic), - conn(amqp::connect(endpoint, get_exchange(args))) { - bool exists; - // get ack level - str_ack_level = args.get("amqp-ack-level", &exists); - if (!exists || str_ack_level == "broker") { - // "broker" is default - ack_level = ACK_LEVEL_BROKER; - } else if (str_ack_level == "none") { - ack_level = ACK_LEVEL_NONE; - } else if (str_ack_level == "routable") { - ack_level = ACK_LEVEL_ROUTEABLE; - } else { - throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level); - } +public: + RGWPubSubAMQPEndpoint(const std::string& _endpoint, + const std::string& _topic, + const RGWHTTPArgs& args) : + endpoint(_endpoint), + topic(_topic), + conn(amqp::connect(endpoint, get_exchange(args))) { + bool exists; + // get ack level + str_ack_level = args.get("amqp-ack-level", &exists); + if (!exists || str_ack_level == "broker") { + // "broker" is default + ack_level = ACK_LEVEL_BROKER; + } else if (str_ack_level == "none") { + ack_level = ACK_LEVEL_NONE; + } else if (str_ack_level == "routable") { + ack_level = ACK_LEVEL_ROUTEABLE; + } else { + throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level); } + } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { - if (ack_level == ACK_LEVEL_NONE) { - return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event)); - } else { - // TODO: currently broker and routable are the same - this will require different flags - // but the same mechanism - return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level); - } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { + if (ack_level == ACK_LEVEL_NONE) { + return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event)); + } else { + // TODO: currently broker and routable are the same - this will require different flags + // but the same mechanism + return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level); } - - std::string to_str() const override { - std::string str("AMQP(0.9.1) Endpoint"); - str += "\nURI: " + endpoint; - str += "\nTopic: " + topic; - str += "\nAck Level: " + str_ack_level; - return str; + } + + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + if (ack_level == ACK_LEVEL_NONE) { + return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record)); + } else { + // TODO: currently broker and routable are the same - this will require different flags + // but the same mechanism + return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level); } + } + + std::string to_str() const override { + std::string str("AMQP(0.9.1) Endpoint"); + str += "\nURI: " + endpoint; + str += "\nTopic: " + topic; + str += "\nAck Level: " + str_ack_level; + return str; + } }; static const std::string AMQP_0_9_1("0-9-1"); diff --git a/src/rgw/rgw_pubsub_push.h b/src/rgw/rgw_pubsub_push.h index 1a6c0d85f7c..40838e78a31 100644 --- a/src/rgw/rgw_pubsub_push.h +++ b/src/rgw/rgw_pubsub_push.h @@ -14,6 +14,7 @@ class RGWDataSyncEnv; class RGWCoroutine; class RGWHTTPArgs; struct rgw_pubsub_event; +struct rgw_pubsub_s3_record; // endpoint base class all endpoint - types should derive from it class RGWPubSubEndpoint { @@ -30,10 +31,14 @@ public: // may throw a configuration_error if creation fails static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args); - // this method is used in order to send notification and wait for completion + // this method is used in order to send notification (Ceph specific) and wait for completion // in async manner via a coroutine virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0; + // this method is used in order to send notification (S3 compliant) and wait for completion + // in async manner via a coroutine + virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0; + // present as string virtual std::string to_str() const { return ""; } diff --git a/src/rgw/rgw_rest_role.cc b/src/rgw/rgw_rest_role.cc index 7c36a032538..300aebbe551 100644 --- a/src/rgw/rgw_rest_role.cc +++ b/src/rgw/rgw_rest_role.cc @@ -42,7 +42,7 @@ int RGWRestRole::verify_permission() uint64_t op = get_op(); if (!verify_user_permission(this, s, - rgw::IAM::ARN(resource_name, + rgw::ARN(resource_name, "role", s->user->user_id.tenant, true), op)) { @@ -89,7 +89,7 @@ int RGWCreateRole::verify_permission() string resource_name = role_path + role_name; if (!verify_user_permission(this, s, - rgw::IAM::ARN(resource_name, + rgw::ARN(resource_name, "role", s->user->user_id.tenant, true), get_op())) { @@ -188,7 +188,7 @@ int RGWGetRole::_verify_permission(const RGWRole& role) string resource_name = role.get_path() + role.get_name(); if (!verify_user_permission(this, s, - rgw::IAM::ARN(resource_name, + rgw::ARN(resource_name, "role", s->user->user_id.tenant, true), get_op())) { @@ -274,7 +274,7 @@ int RGWListRoles::verify_permission() if (!verify_user_permission(this, s, - rgw::IAM::ARN(), + rgw::ARN(), get_op())) { return -EACCES; } diff --git a/src/rgw/rgw_rest_sts.cc b/src/rgw/rgw_rest_sts.cc index 072ee9c25c5..19c97dfc046 100644 --- a/src/rgw/rgw_rest_sts.cc +++ b/src/rgw/rgw_rest_sts.cc @@ -169,11 +169,11 @@ void RGWREST_STS::send_response() int RGWSTSGetSessionToken::verify_permission() { - rgw::IAM::Partition partition = rgw::IAM::Partition::aws; - rgw::IAM::Service service = rgw::IAM::Service::s3; + rgw::Partition partition = rgw::Partition::aws; + rgw::Service service = rgw::Service::s3; if (!verify_user_permission(this, s, - rgw::IAM::ARN(partition, service, "", s->user->user_id.tenant, ""), + rgw::ARN(partition, service, "", s->user->user_id.tenant, ""), rgw::IAM::stsGetSessionToken)) { return -EACCES; } diff --git a/src/rgw/rgw_rest_user_policy.cc b/src/rgw/rgw_rest_user_policy.cc index 31323f34038..3808933e757 100644 --- a/src/rgw/rgw_rest_user_policy.cc +++ b/src/rgw/rgw_rest_user_policy.cc @@ -49,7 +49,7 @@ int RGWRestUserPolicy::verify_permission() uint64_t op = get_op(); string user_name = s->info.args.get("UserName"); rgw_user user_id(user_name); - if (! verify_user_permission(this, s, rgw::IAM::ARN(rgw::IAM::ARN(user_id.id, + if (! verify_user_permission(this, s, rgw::ARN(rgw::ARN(user_id.id, "user", user_id.tenant)), op)) { return -EACCES; diff --git a/src/rgw/rgw_sts.cc b/src/rgw/rgw_sts.cc index a06ae29a235..6ccd3a7e447 100644 --- a/src/rgw/rgw_sts.cc +++ b/src/rgw/rgw_sts.cc @@ -142,7 +142,7 @@ void AssumedRoleUser::dump(Formatter *f) const int AssumedRoleUser::generateAssumedRoleUser(CephContext* cct, RGWRados *store, const string& roleId, - const rgw::IAM::ARN& roleArn, + const rgw::ARN& roleArn, const string& roleSessionName) { string resource = std::move(roleArn.resource); @@ -150,8 +150,8 @@ int AssumedRoleUser::generateAssumedRoleUser(CephContext* cct, resource.append("/"); resource.append(roleSessionName); - rgw::IAM::ARN assumed_role_arn(rgw::IAM::Partition::aws, - rgw::IAM::Service::sts, + rgw::ARN assumed_role_arn(rgw::Partition::aws, + rgw::Service::sts, "", roleArn.account, resource); arn = assumed_role_arn.to_string(); @@ -248,7 +248,7 @@ int AssumeRoleRequest::validate_input() const std::tuple STSService::getRoleInfo(const string& arn) { - if (auto r_arn = rgw::IAM::ARN::parse(arn); r_arn) { + if (auto r_arn = rgw::ARN::parse(arn); r_arn) { auto pos = r_arn->resource.find_last_of('/'); string roleName = r_arn->resource.substr(pos + 1); RGWRole role(cct, store, roleName, r_arn->account); @@ -296,7 +296,7 @@ AssumeRoleWithWebIdentityResponse STSService::assumeRoleWithWebIdentity(AssumeRo response.sub = req.getSub(); //Get the role info which is being assumed - boost::optional r_arn = rgw::IAM::ARN::parse(req.getRoleARN()); + boost::optional r_arn = rgw::ARN::parse(req.getRoleARN()); if (r_arn == boost::none) { response.assumeRoleResp.retCode = -EINVAL; return response; @@ -345,7 +345,7 @@ AssumeRoleResponse STSService::assumeRole(AssumeRoleRequest& req) response.packedPolicySize = 0; //Get the role info which is being assumed - boost::optional r_arn = rgw::IAM::ARN::parse(req.getRoleARN()); + boost::optional r_arn = rgw::ARN::parse(req.getRoleARN()); if (r_arn == boost::none) { response.retCode = -EINVAL; return response; diff --git a/src/rgw/rgw_sts.h b/src/rgw/rgw_sts.h index 89d66e2d15e..68187ba1996 100644 --- a/src/rgw/rgw_sts.h +++ b/src/rgw/rgw_sts.h @@ -109,7 +109,7 @@ public: int generateAssumedRoleUser( CephContext* cct, RGWRados *store, const string& roleId, - const rgw::IAM::ARN& roleArn, + const rgw::ARN& roleArn, const string& roleSessionName); const string& getARN() const { return arn; } const string& getAssumeRoleId() const { return assumeRoleId; } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 8e977ff4b85..4c5ae0593bf 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #include "rgw_common.h" #include "rgw_coroutine.h" #include "rgw_sync_module.h" @@ -13,6 +16,7 @@ #include "rgw_pubsub_push.h" #include "rgw_perf_counters.h" +#include #include #define dout_subsys ceph_subsys_rgw @@ -24,42 +28,6 @@ config: -{ - "tenant": , # default: - "uid": , # default: "pubsub" - "data_bucket_prefix": # default: "pubsub-" - "data_oid_prefix": # - - "events_retention_days": # default: 7 - - # non-dynamic config - "notifications": [ - { - "path": , # this can be either an explicit path: , or /, - # or a prefix if it ends with a wildcard - "topic": - }, - ... - ], - "subscriptions": [ - { - "name": , - "topic": , - "push_endpoint": , - "args:" . # any push endpoint specific args (include all args) - "data_bucket": , # override name of bucket where subscription data will be store - "data_oid_prefix": # set prefix for subscription data object ids - }, - ... - ] -} - -*/ - -/* - -config: - { "tenant": , # default: "uid": , # default: "pubsub" @@ -83,6 +51,7 @@ config: "args:" . # any push endpoint specific args (include all args) "data_bucket": , # override name of bucket where subscription data will be store "data_oid_prefix": # set prefix for subscription data object ids + "s3_id": # in case of S3 compatible notifications, the notification ID will be set here }, ... ] @@ -99,22 +68,23 @@ RGWHTTPArgs string_to_args(const std::string& str_args) { return args; } -struct PSSubConfig { /* subscription config */ - string name; - string topic; - string push_endpoint_name; - string push_endpoint_args; +struct PSSubConfig { + std::string name; + std::string topic; + std::string push_endpoint_name; + std::string push_endpoint_args; + std::string data_bucket_name; + std::string data_oid_prefix; + std::string s3_id; RGWPubSubEndpoint::Ptr push_endpoint; - string data_bucket_name; - string data_oid_prefix; - void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) { name = uc.name; topic = uc.topic; push_endpoint_name = uc.dest.push_endpoint; data_bucket_name = uc.dest.bucket_name; data_oid_prefix = uc.dest.oid_prefix; + s3_id = uc.s3_id; if (push_endpoint_name != "") { push_endpoint_args = uc.dest.push_endpoint_args; try { @@ -133,6 +103,7 @@ struct PSSubConfig { /* subscription config */ encode_json("args", push_endpoint_args, f); encode_json("data_bucket_name", data_bucket_name, f); encode_json("data_oid_prefix", data_oid_prefix, f); + encode_json("s3_id", s3_id, f); } void init(CephContext *cct, const JSONFormattable& config, @@ -144,12 +115,13 @@ struct PSSubConfig { /* subscription config */ string default_bucket_name = data_bucket_prefix + name; data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); + s3_id = config["s3_id"]; if (!push_endpoint_name.empty()) { push_endpoint_args = config["push_endpoint_args"]; try { push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, topic, string_to_args(push_endpoint_args)); } catch (const RGWPubSubEndpoint::configuration_error& e) { - ldout(cct, 0) << "ERROR: failed to create push endpoint: " + ldout(cct, 1) << "ERROR: failed to create push endpoint: " << push_endpoint_name << " due to: " << e.what() << dendl; } } @@ -159,8 +131,8 @@ struct PSSubConfig { /* subscription config */ using PSSubConfigRef = std::shared_ptr; struct PSTopicConfig { - string name; - set subs; + std::string name; + std::set subs; void dump(Formatter *f) const { encode_json("name", name, f); @@ -205,8 +177,7 @@ static string json_str(const char *name, const T& obj, bool pretty = false) } using PSTopicConfigRef = std::shared_ptr; -using TopicsRef = std::shared_ptr>; - +using TopicsRef = std::shared_ptr>; struct PSConfig { string id{"pubsub"}; @@ -220,9 +191,9 @@ struct PSConfig { uint64_t max_id{0}; /* FIXME: no hard coded buckets, we'll have configurable topics */ - map subs; - map topics; - multimap notifications; + std::map subs; + std::map topics; + std::multimap notifications; void dump(Formatter *f) const { encode_json("id", id, f); @@ -326,7 +297,8 @@ struct PSConfig { continue; } - ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; + ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << + " target_path=" << target.path << ", topic=" << target.topic << dendl; (*result)->push_back(topic->second); } while (iter != notifications.begin()); } @@ -368,7 +340,8 @@ static const char *get_event_name(const RGWPubSubEventType& val) } using PSConfigRef = std::shared_ptr; -using EventRef = std::shared_ptr; +template +using EventRef = std::shared_ptr; struct objstore_event { string id; @@ -421,30 +394,73 @@ struct objstore_event { } }; +static void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { + char buf[64]; + const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); + if (len > 0) { + id.assign(buf, len); + } +} + static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, const ceph::real_time& mtime, const std::vector > *attrs, const string& event_name, - EventRef *event) { + EventRef *event) { *event = std::make_shared(); - EventRef& e = *event; + EventRef& e = *event; e->event = event_name; e->source = bucket.name + "/" + key.name; e->timestamp = real_clock::now(); objstore_event oevent(bucket, key, mtime, attrs); - string hash = oevent.get_hash(); - utime_t ts(e->timestamp); - char buf[64]; - snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); - e->id = buf; + const utime_t ts(e->timestamp); + set_event_id(e->id, oevent.get_hash(), ts); encode_json("info", oevent, &e->info); } +static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, + const rgw_obj_key& key, + const ceph::real_time& mtime, + const std::vector > *attrs, + const string& event_name, + EventRef *record) { + *record = std::make_shared(); + + EventRef& r = *record; + r->eventVersion = "2.1"; + r->eventSource = "aws:s3"; + r->eventTime = mtime; + r->eventName = event_name; + r->userIdentity = ""; // not supported yet + r->sourceIPAddress = ""; // not supported yet + r->x_amz_request_id = ""; // not supported yet + r->x_amz_id_2 = ""; // TODO: get that? + r->s3SchemaVersion = "1.0"; + // configurationId is filled from subscription configuration + r->bucket_name = bucket.name; + r->bucket_ownerIdentity = bucket.tenant; + r->bucket_arn = to_string(rgw::ARN(bucket)); + r->object_key = key.name; + r->object_size = 0; // TODO: get that? + objstore_event oevent(bucket, key, mtime, attrs); + r->object_etag = oevent.get_hash(); + r->object_versionId = ""; // not supported yet + + // use timestamp as per key sequence id (hex encoded) + const utime_t ts(real_clock::now()); + boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), + std::back_inserter(r->object_sequencer)); + + // event ID is rgw extension (not in the S3 spec), used for acking the event + // same format is used in both S3 compliant and Ceph specific events + set_event_id(r->id, r->object_etag, ts); +} + class PSManager; using PSManagerRef = std::shared_ptr; @@ -465,14 +481,15 @@ struct PSEnv { using PSEnvRef = std::shared_ptr; +template class PSEvent { - const EventRef event; + const EventRef event; public: - PSEvent(const EventRef& _event) : event(_event) {} + PSEvent(const EventRef& _event) : event(_event) {} void format(bufferlist *bl) const { - bl->append(json_str("event", *event)); + bl->append(json_str(EventType::json_type_single, *event)); } void encode_event(bufferlist& bl) const { @@ -587,16 +604,11 @@ class PSSubscription { RGWDataSyncEnv *sync_env; PSEnvRef env; PSSubConfigRef sub_conf; - shared_ptr get_bucket_info_result; + std::shared_ptr get_bucket_info_result; RGWBucketInfo *bucket_info{nullptr}; RGWDataAccessRef data_access; RGWDataAccess::BucketRef bucket; - struct push_endpoint_info { - shared_ptr conn; - string path; - } push; - InitCR *init_cr{nullptr}; class InitBucketLifecycleCR : public RGWCoroutine { @@ -612,7 +624,7 @@ class PSSubscription { InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env, PSConfigRef& _conf, RGWBucketInfo& _bucket_info, - map& _bucket_attrs) : RGWCoroutine(_sync_env->cct), + std::map& _bucket_attrs) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), conf(_conf) { lc_config.bucket_info = _bucket_info; @@ -645,7 +657,7 @@ class PSSubscription { if (old_rule.get_prefix().empty() && old_rule.get_expiration().get_days() == retention_days && old_rule.is_enabled()) { - ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl; + ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl; return set_cr_done(); } } @@ -695,7 +707,7 @@ class PSSubscription { get_bucket_info, sub->get_bucket_info_result)); if (retcode < 0 && retcode != -ENOENT) { - ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant=" + ldout(sync_env->cct, 1) << "ERROR: failed to geting bucket info: " << "tenant=" << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; } if (retcode == 0) { @@ -705,7 +717,7 @@ class PSSubscription { int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; + ldout(sync_env->cct, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; return set_cr_error(ret); } } @@ -714,7 +726,7 @@ class PSSubscription { sub->get_bucket_info_result->bucket_info, sub->get_bucket_info_result->attrs)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; return set_cr_error(retcode); } @@ -728,7 +740,7 @@ class PSSubscription { sync_env->store, create_bucket)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant=" + ldout(sync_env->cct, 1) << "ERROR: failed to create bucket: " << "tenant=" << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; return set_cr_error(retcode); } @@ -737,7 +749,7 @@ class PSSubscription { } /* failed twice on -ENOENT, unexpected */ - ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant + ldout(sync_env->cct, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << dendl; return set_cr_error(-EIO); } @@ -745,16 +757,17 @@ class PSSubscription { } }; + template class StoreEventCR : public RGWCoroutine { RGWDataSyncEnv* const sync_env; const PSSubscriptionRef sub; - const PSEvent pse; + const PSEvent pse; const string oid_prefix; public: StoreEventCR(RGWDataSyncEnv* const _sync_env, const PSSubscriptionRef& _sub, - const EventRef& _event) : RGWCoroutine(_sync_env->cct), + const EventRef& _event) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sub(_sub), pse(_event), @@ -762,7 +775,6 @@ class PSSubscription { } int operate() override { - // TODO: in case of "push-only" subscription no need to store event rgw_object_simple_put_params put_obj; reenter(this) { @@ -783,7 +795,7 @@ class PSSubscription { sync_env->store, put_obj)); if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); return set_cr_error(retcode); } else { @@ -797,15 +809,16 @@ class PSSubscription { } }; + template class PushEventCR : public RGWCoroutine { RGWDataSyncEnv* const sync_env; - const EventRef event; + const EventRef event; const PSSubConfigRef& sub_conf; public: PushEventCR(RGWDataSyncEnv* const _sync_env, const PSSubscriptionRef& _sub, - const EventRef& _event) : RGWCoroutine(_sync_env->cct), + const EventRef& _event) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), event(_event), sub_conf(_sub->sub_conf) { @@ -868,23 +881,24 @@ public: return init_cr->execute(caller); } - static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) { - return new StoreEventCR(sync_env, sub, event); + template + static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) { + return new StoreEventCR(sync_env, sub, event); } - static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) { - return new PushEventCR(sync_env, sub, event); + template + static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef& event) { + return new PushEventCR(sync_env, sub, event); } friend class InitCR; }; - class PSManager { RGWDataSyncEnv *sync_env; PSEnvRef env; - map subs; + std::map subs; class GetSubCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; @@ -898,6 +912,7 @@ class PSManager PSSubConfigRef sub_conf; rgw_pubsub_sub_config user_sub_conf; + public: GetSubCR(RGWDataSyncEnv *_sync_env, PSManagerRef& _mgr, @@ -911,14 +926,13 @@ class PSManager ref(_ref), conf(mgr->env->conf) { } - ~GetSubCR() { - } + ~GetSubCR() { } int operate() override { reenter(this) { if (owner.empty()) { if (!conf->find_sub(sub_name, &sub_conf)) { - ldout(sync_env->cct, 10) << "ERROR: could not find subscription config: name=" << sub_name << dendl; + ldout(sync_env->cct, 1) << "ERROR: could not find subscription config: name=" << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(-ENOENT); } @@ -945,7 +959,7 @@ class PSManager yield (*ref)->call_init_cr(this); if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to init subscription" << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to init subscription" << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } @@ -977,7 +991,7 @@ class PSManager return owner_prefix + sub_name; } - map get_subs; + std::map get_subs; GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) { return get_subs[sub_id(owner, name)]; @@ -1043,7 +1057,7 @@ public: env(_env), conf(env->conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl; + ldout(sync_env->cct, 5) << ": init pubsub config zone=" << sync_env->source_zone << dendl; /* nothing to do here right now */ create_user.user = conf->user; @@ -1052,14 +1066,14 @@ public: create_user.generate_key = false; yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user)); if (retcode < 0) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; + ldout(sync_env->store->ctx(), 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; return set_cr_error(retcode); } get_user_info.user = conf->user; yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info)); if (retcode < 0) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; + ldout(sync_env->store->ctx(), 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; return set_cr_error(retcode); } @@ -1160,7 +1174,8 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { RGWDataSyncEnv* const sync_env; const PSEnvRef env; const rgw_user& owner; - const EventRef event; + const EventRef event; + const EventRef record; const TopicsRef topics; const std::array owners; bool has_subscriptions; @@ -1168,20 +1183,22 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { bool sub_conf_found; PSSubscriptionRef sub; std::array::const_iterator oiter; - vector::const_iterator titer; - set::const_iterator siter; + std::vector::const_iterator titer; + std::set::const_iterator siter; int last_sub_conf_error; public: RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env, const PSEnvRef _env, const rgw_user& _owner, - const EventRef& _event, + const EventRef& _event, + const EventRef& _record, const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), owner(_owner), event(_event), + record(_record), topics(_topics), owners({owner, rgw_user{}}), has_subscriptions(false), @@ -1189,25 +1206,23 @@ public: int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 20) << ": handle event: obj: z=" << sync_env->source_zone << " event=" << json_str("event", *event, false) << " owner=" << owner << dendl; ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; - - if (topics->empty()) { - // if event has no topics - no further processing is needed - return set_cr_done(); - } + + // outside caller should check that + ceph_assert(!topics->empty()); if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); for (titer = topics->begin(); titer != topics->end(); ++titer) { - ldout(sync_env->cct, 10) << ": notification for " << event->source << ": topic=" << + ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { - ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; + ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl; has_subscriptions = true; sub_conf_found = false; for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { @@ -1220,27 +1235,48 @@ public: continue; } sub_conf_found = true; - - ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::store_event_cr(sync_env, sub, event)); - if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + if (sub->sub_conf->s3_id.empty()) { + // subscription was not made by S3 compatible API + ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::store_event_cr(sync_env, sub, event)); + if (retcode < 0) { + ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + } else { + event_handled = true; + } + if (sub->sub_conf->push_endpoint) { + ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sync_env, sub, event)); + if (retcode < 0) { + ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; + } else { + event_handled = true; + } + } } else { - event_handled = true; - } - if (sub->sub_conf->push_endpoint) { - ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; - yield call(PSSubscription::push_event_cr(sync_env, sub, event)); + // subscription was made by S3 compatible API + ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + record->configurationId = sub->sub_conf->s3_id; + yield call(PSSubscription::store_event_cr(sync_env, sub, record)); if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; } else { event_handled = true; } + if (sub->sub_conf->push_endpoint) { + ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; + yield call(PSSubscription::push_event_cr(sync_env, sub, record)); + if (retcode < 0) { + ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; + } else { + event_handled = true; + } + } } } if (!sub_conf_found) { // could not find conf for subscription at user or global levels - ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter + ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << last_sub_conf_error << dendl; } } @@ -1259,12 +1295,13 @@ public: } }; - +// coroutine invoked on remote object creation class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { RGWDataSyncEnv *sync_env; PSEnvRef env; std::optional versioned_epoch; - EventRef event; + EventRef event; + EventRef record; TopicsRef topics; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, @@ -1278,7 +1315,7 @@ public: } int operate() override { reenter(this) { - ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone + ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; { @@ -1289,14 +1326,21 @@ public: k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); } attrs.push_back(std::make_pair(k, attr.second)); - } + } + // at this point we don't know whether we need the ceph event or S3 record + // this is why both are created here, once we have information about the + // subscription, we will store/push only the relevant ones make_event_ref(sync_env->cct, bucket_info.bucket, key, mtime, &attrs, EVENT_NAME_OBJECT_CREATE, &event); + make_s3_record_ref(sync_env->cct, + bucket_info.bucket, key, + mtime, &attrs, + EVENT_NAME_OBJECT_CREATE, &record); } - yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, topics)); + yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1371,6 +1415,7 @@ public: } }; +// coroutine invoked on remote object deletion class RGWPSGenericObjEventCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; @@ -1379,7 +1424,8 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine { rgw_obj_key key; ceph::real_time mtime; string event_name; - EventRef event; + EventRef event; + EventRef record; TopicsRef topics; public: RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env, @@ -1398,21 +1444,25 @@ public: << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics)); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } - { - make_event_ref(sync_env->cct, - bucket, key, - mtime, nullptr, - event_name, &event); - } - - yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, topics)); + // at this point we don't know whether we need the ceph event or S3 record + // this is why both are created here, once we have information about the + // subscription, we will store/push only the relevant ones + make_event_ref(sync_env->cct, + bucket, key, + mtime, nullptr, + event_name, &event); + make_s3_record_ref(sync_env->cct, + bucket, key, + mtime, nullptr, + event_name, &record); + yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1426,10 +1476,12 @@ public: class RGWPSDataSyncModule : public RGWDataSyncModule { PSEnvRef env; PSConfigRef& conf; + public: RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared()), conf(env->conf) { env->init(cct, config); } + ~RGWPSDataSyncModule() override {} void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { @@ -1441,18 +1493,25 @@ public: ldout(sync_env->cct, 5) << conf->id << ": start" << dendl; return new RGWPSInitEnvCBCR(sync_env, env); } - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << + " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << + " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, - rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime - << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << + " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE); } @@ -1465,7 +1524,7 @@ RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFor string jconf = json_str("conf", *data_handler->get_conf()); JSONParser p; if (!p.parse(jconf.c_str(), jconf.size())) { - ldout(cct, 0) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; + ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; effective_conf = config; } else { effective_conf.decode_json(&p); diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index eaa8c003e3a..b8865906361 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -1,9 +1,14 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include #include "rgw_sync_module_pubsub.h" #include "rgw_sync_module_pubsub_rest.h" #include "rgw_pubsub.h" #include "rgw_op.h" #include "rgw_rest.h" #include "rgw_rest_s3.h" +#include "rgw_arn.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw @@ -46,6 +51,8 @@ void RGWPSCreateTopicOp::execute() } } +// create a topic +// command: PUT /topics/ class RGWPSCreateTopic_ObjStore_S3 : public RGWPSCreateTopicOp { public: explicit RGWPSCreateTopic_ObjStore_S3() {} @@ -89,6 +96,8 @@ void RGWPSListTopicsOp::execute() } +// list all topics +// command: GET /topics class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp { public: explicit RGWPSListTopics_ObjStore_S3() {} @@ -146,6 +155,8 @@ void RGWPSGetTopicOp::execute() } } +// get topic information (including subscriptions) +// command: GET /topics/ class RGWPSGetTopic_ObjStore_S3 : public RGWPSGetTopicOp { public: explicit RGWPSGetTopic_ObjStore_S3() {} @@ -208,6 +219,8 @@ void RGWPSDeleteTopicOp::execute() } } +// delete a topic +// command: DELETE /topics/ class RGWPSDeleteTopic_ObjStore_S3 : public RGWPSDeleteTopicOp { public: explicit RGWPSDeleteTopic_ObjStore_S3() {} @@ -218,6 +231,7 @@ public: } }; +// topics handler factory class RGWHandler_REST_PSTopic_S3 : public RGWHandler_REST_S3 { protected: int init_permissions(RGWOp* op) override { @@ -255,7 +269,6 @@ public: virtual ~RGWHandler_REST_PSTopic_S3() {} }; - class RGWPSCreateSubOp : public RGWDefaultResponseOp { protected: string sub_name; @@ -295,6 +308,8 @@ void RGWPSCreateSubOp::execute() } } +// create a subscription +// command: PUT /subscriptions/?topic=[&push-endpoint=[&=]... class RGWPSCreateSub_ObjStore_S3 : public RGWPSCreateSubOp { public: explicit RGWPSCreateSub_ObjStore_S3() {} @@ -360,6 +375,8 @@ void RGWPSGetSubOp::execute() } } +// get subscription configuration +// command: GET /subscriptions/ class RGWPSGetSub_ObjStore_S3 : public RGWPSGetSubOp { public: explicit RGWPSGetSub_ObjStore_S3() {} @@ -428,6 +445,8 @@ void RGWPSDeleteSubOp::execute() } } +// delete subscription +// Command: DELETE /subscriptions/ class RGWPSDeleteSub_ObjStore_S3 : public RGWPSDeleteSubOp { public: explicit RGWPSDeleteSub_ObjStore_S3() {} @@ -441,8 +460,8 @@ public: class RGWPSAckSubEventOp : public RGWDefaultResponseOp { protected: - string sub_name; - string event_id; + std::string sub_name; + std::string event_id; std::unique_ptr ups; public: @@ -469,7 +488,7 @@ void RGWPSAckSubEventOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - auto sub = ups->get_sub(sub_name); + auto sub = ups->get_sub_with_events(sub_name); op_ret = sub->remove_event(event_id); if (op_ret < 0) { ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl; @@ -477,6 +496,8 @@ void RGWPSAckSubEventOp::execute() } } +// acking of an event +// command POST /subscriptions/?ack&event-id= class RGWPSAckSubEvent_ObjStore_S3 : public RGWPSAckSubEventOp { public: explicit RGWPSAckSubEvent_ObjStore_S3() {} @@ -498,10 +519,11 @@ public: class RGWPSPullSubEventsOp : public RGWOp { protected: int max_entries{0}; - string sub_name; - string marker; + std::string sub_name; + std::string marker; std::unique_ptr ups; - RGWUserPubSub::Sub::list_events_result result; + RGWUserPubSub::SubRef sub; + //RGWUserPubSub::SubWithEvents::list_events_result result; public: RGWPSPullSubEventsOp() {} @@ -527,14 +549,23 @@ void RGWPSPullSubEventsOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - auto sub = ups->get_sub(sub_name); - op_ret = sub->list_events(marker, max_entries, &result); - if (op_ret < 0) { + sub = ups->get_sub_with_events(sub_name); + if (!sub) { + op_ret = -ENOENT; ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl; return; } + op_ret = sub->list_events(marker, max_entries); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to get subscription events, ret=" << op_ret << dendl; + return; + } } +// fetching events from a subscription +// command: GET /subscriptions/?events[&max-entries=][&marker=] +// dpending on whether the subscription was created via s3 compliant API or not +// the matching events will be returned class RGWPSPullSubEvents_ObjStore_S3 : public RGWPSPullSubEventsOp { public: explicit RGWPSPullSubEvents_ObjStore_S3() {} @@ -542,8 +573,8 @@ public: int get_params() override { sub_name = s->object.name; marker = s->info.args.get("marker"); -#define DEFAULT_MAX_ENTRIES 100 - int ret = s->info.args.get_int("max-entries", &max_entries, DEFAULT_MAX_ENTRIES); + const int ret = s->info.args.get_int("max-entries", &max_entries, + RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS); if (ret < 0) { ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl; return -EINVAL; @@ -562,7 +593,7 @@ public: return; } - encode_json("result", result, s->formatter); + encode_json("result", *sub, s->formatter); rgw_flush_formatter_and_reset(s, s->formatter); } }; @@ -637,14 +668,13 @@ static int notif_bucket_path(const string& path, string *bucket_name) class RGWPSCreateNotifOp : public RGWDefaultResponseOp { protected: std::unique_ptr ups; - string topic_name; - set events; - string bucket_name; RGWBucketInfo bucket_info; + virtual int get_params() = 0; + public: - RGWPSCreateNotifOp() {} + RGWPSCreateNotifOp() = default; int verify_permission() override { int ret = get_params(); @@ -666,26 +696,23 @@ public: } return 0; } + void pre_exec() override { rgw_bucket_object_pre_exec(s); } RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; } uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } - virtual int get_params() = 0; }; -// GCP style notification creation -// Command: PUT /notification/bucket/?topic= -// "topic" has to be created beforehand -class RGWPSCreateNotif_ObjStore_GCP : public RGWPSCreateNotifOp { -public: - explicit RGWPSCreateNotif_ObjStore_GCP() {} - - const char* name() const override { return "pubsub_notification_create_gcp"; } - - void execute() override; +// ceph specific notification creation +// command: PUT /notification/bucket/?topic= +// ("topic" has to be created beforehand) +class RGWPSCreateNotif_ObjStore_Ceph : public RGWPSCreateNotifOp { +private: + string topic_name; + std::set events; int get_params() override { bool exists; @@ -701,15 +728,18 @@ public: } return notif_bucket_path(s->object.name, &bucket_name); } + +public: + RGWPSCreateNotif_ObjStore_Ceph() = default; + + const char* name() const override { return "pubsub_notification_create_gcp"; } + + void execute() override; + }; -void RGWPSCreateNotif_ObjStore_GCP::execute() +void RGWPSCreateNotif_ObjStore_Ceph::execute() { - op_ret = get_params(); - if (op_ret < 0) { - return; - } - ups = make_unique(store, s->owner.get_id()); auto b = ups->get_bucket(bucket_info.bucket); @@ -720,43 +750,105 @@ void RGWPSCreateNotif_ObjStore_GCP::execute() } } -// S3 compliant notification creation -// Command: PUT /?notification -// a "topic" will be auto-generated +// s3 compliant notification creation +// command: PUT /?notification +// a "topic", a "notification" and a subscription will be auto-generated +// actual configuration is XML encoded in the body of the message, with following schema example: +// +// +// +// +// +// suffix +// jpg +// +// +// +// notification1 +// arn:aws:sns:::::topic1 +// s3:ObjectCreated:* +// s3:ObjectRemoved:* +// +// class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { -protected: - std::string topic_name; -public: - explicit RGWPSCreateNotif_ObjStore_S3() {} - - const char* name() const override { return "pubsub_notification_create_s3"; } - - void execute() override; - - int get_params() override { - bool exists; - auto no_value = s->info.args.get("notification", &exists); - if (!exists) { - ldout(s->cct, 20) << "param 'notification' not provided" << dendl; - return -EINVAL; - } else if (no_value.length() > 0) { - ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl; - return -EINVAL; + struct TopicConfiguration { + std::string id; + std::string endpoint_type; + std::string endpoint_id; + std::string topic; + std::list events; + + bool decode_xml(XMLObj *obj) { + const auto throw_if_missing = true; + RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); + + std::string str_arn; + RGWXMLDecoder::decode_xml("Topic", str_arn, obj, throw_if_missing); + + // parse ARN. allow wildcards + const auto arn = rgw::ARN::parse(str_arn, true); + if (arn == boost::none || arn->resource.empty()) { + throw RGWXMLDecoder::err("topic ARN parsing failed. ARN = '" + str_arn + "'"); + } + + // partition and service are expected to be "aws" and "sns" + // but there is no need to validate ARN them + + const auto arn_resource = rgw::ARNResource::parse(arn->resource); + if (arn_resource == boost::none || arn_resource->resource.empty()) { + throw RGWXMLDecoder::err("topic ARN resource parsing failed. ARNResource = '" + arn->resource + "'"); + } + + if (!arn_resource->resource_type.empty()) { + // endpoint exists + endpoint_type = arn_resource->resource_type; + endpoint_id = arn_resource->resource; + if (arn_resource->qualifier.empty()) { + throw RGWXMLDecoder::err("topic ARN resource parsing failed. missing qualifier for endpoint"); + } + topic = arn_resource->qualifier; + } else { + // only topic + topic = arn_resource->resource; + } + + do_decode_xml_obj(events, "Event", obj); + if (events.empty()) { + // if no events are provided, we assume all events + events.push_back("s3:ObjectCreated:*"); + events.push_back("s3:ObjectRemoved:*"); + } + return true; + } + }; + + struct NotificationConfiguration { + std::list list; + bool decode_xml(XMLObj *obj) { + do_decode_xml_obj(list, "TopicConfiguration", obj); + if (list.empty()) { + throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist"); + } + return true; } + } configurations; - if (s->bucket_name.empty()) { - ldout(s->cct, 1) << "notification must be set on a bucket" << dendl; - return -EINVAL; + static std::string s3_to_gcp_event(const std::string& event) { + if (event == "s3:ObjectCreated:*") { + return "OBJECT_CREATE"; } - - bucket_name = s->bucket_name; + if (event == "s3:ObjectRemoved:*") { + return "OBJECT_DELETE"; + } + return ""; + } + int get_params_from_body() { const auto max_size = s->cct->_conf->rgw_max_put_param_size; - - int r = 0; + int r; bufferlist data; - std::tie(r, data) = rgw_rest_read_all_input(s, max_size); + std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false); if (r < 0) { ldout(s->cct, 1) << "failed to read notification parameters from payload" << dendl; @@ -767,46 +859,119 @@ public: return -EINVAL; } - RGWXMLParser parser; + RGWXMLDecoder::XMLParser parser; if (!parser.init()){ ldout(s->cct, 1) << "failed to initialize XML parser" << dendl; return -EINVAL; } - if (!parser.parse(data.c_str(), data.length(), 1)) { - ldout(s->cct, 1) << "failed to parse XML payload or notification" << dendl; + ldout(s->cct, 1) << "failed to parse XML payload of notification" << dendl; return -ERR_MALFORMED_XML; } - try { + // TopicConfigurations is mandatory + RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true); } catch (RGWXMLDecoder::err& err) { - ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl; - return -ERR_MALFORMED_XML; + ldout(s->cct, 1) << "failed to parse XML payload of notification. error: " << err << dendl; + return -ERR_MALFORMED_XML; + } + return 0; + } + + int get_params() override { + bool exists; + const auto no_value = s->info.args.get("notification", &exists); + if (!exists) { + ldout(s->cct, 20) << "param 'notification' not provided" << dendl; + return -EINVAL; + } + if (no_value.length() > 0) { + ldout(s->cct, 1) << "param 'notification' should not be set with value" << dendl; + return -EINVAL; } + if (s->bucket_name.empty()) { + ldout(s->cct, 1) << "notification must be set on a bucket" << dendl; + return -EINVAL; + } + bucket_name = s->bucket_name; return 0; } + +public: + RGWPSCreateNotif_ObjStore_S3() = default; + + const char* name() const override { return "pubsub_notification_create_s3"; } + + void execute() override; }; void RGWPSCreateNotif_ObjStore_S3::execute() { - op_ret = get_params(); + op_ret = get_params_from_body(); if (op_ret < 0) { return; } ups = make_unique(store, s->owner.get_id()); - - op_ret = ups->create_topic(topic_name); - if (op_ret < 0) { - ldout(s->cct, 1) << "failed to auto-generate topic for notification, ret=" << op_ret << dendl; - return; - } auto b = ups->get_bucket(bucket_info.bucket); - op_ret = b->create_notification(topic_name, events); - if (op_ret < 0) { - ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl; - return; + const auto psmodule = static_cast(store->get_sync_module().get()); + const auto& conf = psmodule->get_effective_conf(); + + for (const auto& c : configurations.list) { + const auto& topic_name = c.topic; + const auto& sub_name = c.id; + if (topic_name.empty()) { + ldout(s->cct, 1) << "missing topic information" << dendl; + op_ret = -EINVAL; + return; + } + if (sub_name.empty()) { + ldout(s->cct, 1) << "missing subscription information" << dendl; + op_ret = -EINVAL; + return; + } + // get endpoint configuration according to type + // no endpoint (pull mode): arn:s3:sns::: + // TODO: HTTP/S endpoint: arn:s3:sns:::webhook:: + // TODO: AMQP endpoint: arn:s3:sns:::amqp:: + if (!c.endpoint_type.empty()) { + ldout(s->cct, 1) << "endpoint type '" << c.endpoint_type << + "' not supported" << dendl; + op_ret = -EINVAL; + return; + } + // generate the topic + op_ret = ups->create_topic(topic_name); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to auto-generate topic '" << topic_name << + "' for notification, ret=" << op_ret << dendl; + return; + } + // generate the notification + std::set events; + std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event); + ceph_assert(b); + op_ret = b->create_notification(topic_name, events); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << topic_name << + "', ret=" << op_ret << dendl; + // rollback generated topic + ups->remove_topic(topic_name); + return; + } + + rgw_pubsub_sub_dest dest; + dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name; + dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/"; + auto sub = ups->get_sub(sub_name); + op_ret = sub->subscribe(topic_name, dest, c.id); + if (op_ret < 0) { + ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl; + // rollback generated topic + ups->remove_topic(topic_name); + return; + } } } @@ -865,9 +1030,9 @@ void RGWPSDeleteNotifOp::execute() } } -class RGWPSDeleteNotif_ObjStore_GCP : public RGWPSDeleteNotifOp { +class RGWPSDeleteNotif_ObjStore_Ceph : public RGWPSDeleteNotifOp { public: - explicit RGWPSDeleteNotif_ObjStore_GCP() {} + explicit RGWPSDeleteNotif_ObjStore_Ceph() {} int get_params() override { bool exists; @@ -882,7 +1047,7 @@ public: class RGWPSListNotifsOp : public RGWOp { protected: - string bucket_name; + std::string bucket_name; RGWBucketInfo bucket_info; std::unique_ptr ups; rgw_pubsub_bucket_topics result; @@ -933,9 +1098,9 @@ void RGWPSListNotifsOp::execute() } -class RGWPSListNotifs_ObjStore_GCP : public RGWPSListNotifsOp { +class RGWPSListNotifs_ObjStore_Ceph : public RGWPSListNotifsOp { public: - explicit RGWPSListNotifs_ObjStore_GCP() {} + explicit RGWPSListNotifs_ObjStore_Ceph() {} int get_params() override { return notif_bucket_path(s->object.name, &bucket_name); @@ -957,9 +1122,8 @@ public: } }; - -// GCP-style notification handler factory -class RGWHandler_REST_PSNotifs_GCP : public RGWHandler_REST_S3 { +// ceph specific notification handler factory +class RGWHandler_REST_PSNotifs_Ceph : public RGWHandler_REST_S3 { protected: int init_permissions(RGWOp* op) override { return 0; @@ -975,26 +1139,26 @@ protected: if (s->object.empty()) { return nullptr; } - return new RGWPSListNotifs_ObjStore_GCP(); + return new RGWPSListNotifs_ObjStore_Ceph(); } RGWOp *op_put() override { if (!s->object.empty()) { - return new RGWPSCreateNotif_ObjStore_GCP(); + return new RGWPSCreateNotif_ObjStore_Ceph(); } return nullptr; } RGWOp *op_delete() override { if (!s->object.empty()) { - return new RGWPSDeleteNotif_ObjStore_GCP(); + return new RGWPSDeleteNotif_ObjStore_Ceph(); } return nullptr; } public: - explicit RGWHandler_REST_PSNotifs_GCP(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} - virtual ~RGWHandler_REST_PSNotifs_GCP() {} + explicit RGWHandler_REST_PSNotifs_Ceph(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_PSNotifs_Ceph() {} }; -// S3-compliant notification handler factory +// s3 compliant notification handler factory class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 { protected: int init_permissions(RGWOp* op) override { @@ -1035,15 +1199,15 @@ RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, RGWHandler_REST *handler = nullptr; - // GCP-style PubSub API: topics/subscriptions/notification are reserved bucket names + // ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names if (s->init_state.url_bucket == "topics") { handler = new RGWHandler_REST_PSTopic_S3(auth_registry); } else if (s->init_state.url_bucket == "subscriptions") { handler = new RGWHandler_REST_PSSub_S3(auth_registry); } else if (s->init_state.url_bucket == "notifications") { - handler = new RGWHandler_REST_PSNotifs_GCP(auth_registry); + handler = new RGWHandler_REST_PSNotifs_Ceph(auth_registry); } else { - // S3-compliant PubSub API: uses: ?notification + // s3 compliant PubSub API: uses: ?notification handler = new RGWHandler_REST_PSNotifs_S3(auth_registry); } diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 9b2c6b63217..6d39dea37d9 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -158,3 +158,9 @@ add_ceph_unittest(unittest_rgw_xml) target_link_libraries(unittest_rgw_xml rgw_a ${EXPAT_LIBRARIES}) +# unittest_rgw_arn +add_executable(unittest_rgw_arn test_rgw_arn.cc) +add_ceph_unittest(unittest_rgw_arn) + +target_link_libraries(unittest_rgw_arn rgw_a) + diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 38d7088182f..1c4e5eeeadc 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -12,6 +12,7 @@ from rgw_multi.tests import get_realm, \ from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription from nose import SkipTest from nose.tools import assert_not_equal, assert_equal +import boto3 # configure logging for the tests module log = logging.getLogger('rgw_multi.tests') @@ -26,8 +27,8 @@ def check_ps_configured(): realm = get_realm() zonegroup = realm.master_zonegroup() - es_zones = zonegroup.zones_by_type.get("pubsub") - if not es_zones: + ps_zones = zonegroup.zones_by_type.get("pubsub") + if not ps_zones: raise SkipTest("Requires at least one PS zone") @@ -65,6 +66,33 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False): assert False, err +def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False): + """ verify there is at least one record per element """ + err = '' + for key in keys: + key_found = False + for record in records: + if record['s3']['bucket']['name'] == key.bucket.name and \ + record['s3']['object']['key'] == key.name: + if deletions and record['eventName'] == 'ObjectRemoved': + key_found = True + break + elif not deletions and record['eventName'] == 'ObjectCreated': + key_found = True + break + if not key_found: + err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) + log.error(records) + assert False, err + + if not len(records) == len(keys): + err = 'superfluous records are found' + log.debug(err) + if exact_match: + log.error(records) + assert False, err + + def init_env(): """initialize the environment""" check_ps_configured() @@ -91,12 +119,87 @@ def init_env(): TOPIC_SUFFIX = "_topic" SUB_SUFFIX = "_sub" +NOTIFICATION_SUFFIX = "_notif" ############## # pubsub tests ############## +def test_ps_s3_notification(): + zones, ps_zones = init_env() + bucket_name = gen_bucket_name() + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + # create boto3 client + client = boto3.client('s3', + endpoint_url='http://'+ps_zones[0].conn.host+':'+str(ps_zones[0].conn.port), + aws_access_key_id=ps_zones[0].conn.aws_access_key_id, + aws_secret_access_key=ps_zones[0].conn.aws_secret_access_key) + # create s3 notification + topic_name = bucket_name + TOPIC_SUFFIX + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_arn = 'arn:aws:sns:::' + topic_name + response = client.put_bucket_notification_configuration(Bucket=bucket_name, + NotificationConfiguration={ + 'TopicConfigurations': [ + { + 'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'], + } + ] + }) + + status = response['ResponseMetadata']['HTTPStatusCode'] + assert_equal(status/100, 2) + zone_meta_checkpoint(ps_zones[0].zone) + # get auto-generated topic + topic_conf = PSTopic(ps_zones[0].conn, topic_name) + result, _ = topic_conf.get_config() + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + # get auto-generated notification + notification_conf = PSNotification(ps_zones[0].conn, bucket_name, + topic_name) + result, _ = notification_conf.get_config() + parsed_result = json.loads(result) + assert_equal(len(parsed_result['topics']), 1) + # get auto-generated subscription + sub_conf = PSSubscription(ps_zones[0].conn, notification_name, + topic_name) + result, _ = sub_conf.get_config() + parsed_result = json.loads(result) + assert_equal(parsed_result['topic'], topic_name) + # create objects in the bucket + number_of_objects = 10 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # get the events from the subscription + result, _ = sub_conf.get_events() + parsed_result = json.loads(result) + for record in parsed_result['Records']: + log.debug(record) + keys = list(bucket.list()) + # TODO: set exact_match to true + verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False) + + # cleanup + sub_conf.del_config() + notification_conf.del_config() + topic_conf.del_config() + # delete the keys + for key in bucket.list(): + key.delete() + zones[0].delete_bucket(bucket_name) + + def test_ps_topic(): """ test set/get/delete of topic """ _, ps_zones = init_env() diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index 31a7d4726b1..e59d5ce7025 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -71,8 +71,10 @@ class PSTopic: assert topic_name.strip() self.resource = '/topics/'+topic_name - def send_request(self, method): + def send_request(self, method, get_list=False): """send request to radosgw""" + if get_list: + return make_request(self.conn, method, '/topics') return make_request(self.conn, method, self.resource) def get_config(self): @@ -86,6 +88,10 @@ class PSTopic: def del_config(self): """delete topic""" return self.send_request('DELETE') + + def get_list(self): + """list all topics""" + return self.send_request('GET', get_list=True) class PSNotification: diff --git a/src/test/rgw/test_rgw_arn.cc b/src/test/rgw/test_rgw_arn.cc new file mode 100644 index 00000000000..a685a006285 --- /dev/null +++ b/src/test/rgw/test_rgw_arn.cc @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw/rgw_arn.h" +#include + +using namespace rgw; + +const int BASIC_ENTRIES = 6; + +const std::string basic_str[BASIC_ENTRIES] = {"arn:aws:s3:us-east-1:12345:resource", + "arn:aws:s3:us-east-1:12345:resourceType/resource", + "arn:aws:s3:us-east-1:12345:resourceType/resource/qualifier", + "arn:aws:s3:us-east-1:12345:resourceType/resource:qualifier", + "arn:aws:s3:us-east-1:12345:resourceType:resource", + "arn:aws:s3:us-east-1:12345:resourceType:resource/qualifier"}; + +const std::string expected_basic_resource[BASIC_ENTRIES] = {"resource", + "resourceType/resource", + "resourceType/resource/qualifier", + "resourceType/resource:qualifier", + "resourceType:resource", + "resourceType:resource/qualifier"}; +TEST(TestARN, Basic) +{ + for (auto i = 0; i < BASIC_ENTRIES; ++i) { + boost::optional arn = ARN::parse(basic_str[i]); + ASSERT_TRUE(arn); + EXPECT_EQ(arn->partition, Partition::aws); + EXPECT_EQ(arn->service, Service::s3); + EXPECT_STREQ(arn->region.c_str(), "us-east-1"); + EXPECT_STREQ(arn->account.c_str(), "12345"); + EXPECT_STREQ(arn->resource.c_str(), expected_basic_resource[i].c_str()); + } +} + +const std::string expected_basic_resource_type[BASIC_ENTRIES] = + {"", "resourceType", "resourceType", "resourceType", "resourceType", "resourceType"}; +const std::string expected_basic_qualifier[BASIC_ENTRIES] = + {"", "", "qualifier", "qualifier", "", "qualifier"}; + +TEST(TestARNResource, Basic) +{ + for (auto i = 0; i < BASIC_ENTRIES; ++i) { + boost::optional arn = ARN::parse(basic_str[i]); + ASSERT_TRUE(arn); + ASSERT_FALSE(arn->resource.empty()); + boost::optional resource = ARNResource::parse(arn->resource); + ASSERT_TRUE(resource); + EXPECT_STREQ(resource->resource.c_str(), "resource"); + EXPECT_STREQ(resource->resource_type.c_str(), expected_basic_resource_type[i].c_str()); + EXPECT_STREQ(resource->qualifier.c_str(), expected_basic_qualifier[i].c_str()); + } +} + +const int EMPTY_ENTRIES = 4; + +const std::string empty_str[EMPTY_ENTRIES] = {"arn:aws:s3:::resource", + "arn:aws:s3::12345:resource", + "arn:aws:s3:us-east-1::resource", + "arn:aws:s3:us-east-1:12345:"}; + +TEST(TestARN, Empty) +{ + for (auto i = 0; i < EMPTY_ENTRIES; ++i) { + boost::optional arn = ARN::parse(empty_str[i]); + ASSERT_TRUE(arn); + EXPECT_EQ(arn->partition, Partition::aws); + EXPECT_EQ(arn->service, Service::s3); + EXPECT_TRUE(arn->region.empty() || arn->region == "us-east-1"); + EXPECT_TRUE(arn->account.empty() || arn->account == "12345"); + EXPECT_TRUE(arn->resource.empty() || arn->resource == "resource"); + } +} + +const int WILDCARD_ENTRIES = 3; + +const std::string wildcard_str[WILDCARD_ENTRIES] = {"arn:aws:s3:*:*:resource", + "arn:aws:s3:*:12345:resource", + "arn:aws:s3:us-east-1:*:resource"}; + +// FIXME: currently the following: "arn:aws:s3:us-east-1:12345:*" +// does not fail, even if "wildcard" is not set to "true" + +TEST(TestARN, Wildcard) +{ + for (auto i = 0; i < WILDCARD_ENTRIES; ++i) { + EXPECT_FALSE(ARN::parse(wildcard_str[i])); + boost::optional arn = ARN::parse(wildcard_str[i], true); + ASSERT_TRUE(arn); + EXPECT_EQ(arn->partition, Partition::aws); + EXPECT_EQ(arn->service, Service::s3); + EXPECT_TRUE(arn->region == "*" || arn->region == "us-east-1"); + EXPECT_TRUE(arn->account == "*" || arn->account == "12345"); + EXPECT_TRUE(arn->resource == "*" || arn->resource == "resource"); + } +} + diff --git a/src/test/rgw/test_rgw_iam_policy.cc b/src/test/rgw/test_rgw_iam_policy.cc index a42360030ab..9cb2c29e38e 100644 --- a/src/test/rgw/test_rgw_iam_policy.cc +++ b/src/test/rgw/test_rgw_iam_policy.cc @@ -39,10 +39,10 @@ using boost::none; using rgw::auth::Identity; using rgw::auth::Principal; -using rgw::IAM::ARN; +using rgw::ARN; using rgw::IAM::Effect; using rgw::IAM::Environment; -using rgw::IAM::Partition; +using rgw::Partition; using rgw::IAM::Policy; using rgw::IAM::s3All; using rgw::IAM::s3Count; @@ -76,7 +76,7 @@ using rgw::IAM::s3ListMultipartUploadParts; using rgw::IAM::None; using rgw::IAM::s3PutBucketAcl; using rgw::IAM::s3PutBucketPolicy; -using rgw::IAM::Service; +using rgw::Service; using rgw::IAM::TokenID; using rgw::IAM::Version; using rgw::IAM::Action_t;