From be64adf916e0f318b22b57cedd02c2b0b2a503f2 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Thu, 18 Apr 2019 11:21:26 +0300 Subject: [PATCH] rgw/pubsub: fix topic arn. tenant support to multisite tests Signed-off-by: Yuval Lifshitz --- doc/radosgw/pubsub-module.rst | 8 ++- src/rgw/rgw_pubsub.cc | 34 ------------- src/rgw/rgw_pubsub.h | 13 +---- src/rgw/rgw_pubsub_push.cc | 2 +- src/rgw/rgw_pubsub_push.h | 2 - src/rgw/rgw_sync_module_pubsub_rest.cc | 19 +++++-- src/test/rgw/rgw_multi/tests.py | 10 ++++ src/test/rgw/rgw_multi/tests_ps.py | 70 +++++++++++++++++++------- src/test/rgw/test_multi.py | 10 ++-- 9 files changed, 89 insertions(+), 79 deletions(-) diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index 26a0fb98f8d..5185ef06edf 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -157,13 +157,11 @@ Request parameters: - "none" - message is considered "delivered" if sent to broker - "broker" message is considered "delivered" if acked by broker -Response: -The ARN will have one of the following format (depending with whether a push-endpoint was defined): +The topic ARN in the response will have the following format: :: arn:aws:sns::: - arn:aws:sns::::: Get Topic Information ````````````````````` @@ -522,7 +520,7 @@ the events will have an S3-compatible record format (JSON): - s3.object.version: object version in case of versioned bucket - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format) -In case that the subscription was not created via an S3-compatible notification, +In case that the subscription was not created via a non S3-compatible notification, the events will have the following event format (JSON): :: @@ -549,7 +547,7 @@ the events will have the following event format (JSON): } ]} -- id: unique ID of the event, that could be used for acking (an extension to the S3 notification API) +- id: unique ID of the event, that could be used for acking - event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE`` - timestamp: timestamp indicating when the event was sent - info.attrs.mtime: timestamp indicating when the event was triggered diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 0d276df6164..d797efa9e82 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -160,40 +160,6 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const encode_json("s3_id", s3_id, f); } -std::string dest_to_topic_arn(const rgw_pubsub_sub_dest& dest, - const std::string& topic_name, - const std::string& zonegroup_name, - const std::string& user_name) { - rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, zonegroup_name, user_name, ""); - rgw::ARNResource arn_resource; - const auto endpoint_type = RGWPubSubEndpoint::get_schema(dest.push_endpoint); - if (endpoint_type.empty()) { - arn_resource.resource = topic_name; - } else { - // add endpoint info as resource - arn_resource.resource_type = endpoint_type; - arn_resource.resource = dest.push_endpoint; - arn_resource.qualifier = topic_name; - } - arn.resource = arn_resource.to_string(); - return arn.to_string(); -} - -std::string topic_name_from_arn(const std::string& topic_arn) { - const auto arn = rgw::ARN::parse(topic_arn); - if (!arn || arn->resource.empty()) { - return ""; - } - const auto arn_resource = rgw::ARNResource::parse(arn->resource); - if (!arn_resource) { - return ""; - } - if (arn_resource->resource_type.empty()) { - return arn_resource->resource; - } - return arn_resource->qualifier; -} - int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) { int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker); diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 56c737ce63d..4dbaf5023a5 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -23,7 +23,7 @@ class XMLObj; notification1 - arn:aws:sns:::[:]: + arn:aws:sns::: s3:ObjectCreated:* s3:ObjectRemoved:* @@ -411,17 +411,6 @@ WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) static std::string pubsub_user_oid_prefix = "pubsub.user."; -// generate a topic ARN string -// no endpoint (pull mode only): arn:s3:sns::: -// with endpoint : arn:s3:sns::::: -std::string dest_to_topic_arn(const rgw_pubsub_sub_dest& dest, - const std::string& topic_name, - const std::string& zonegroup_name, - const std::string& user_name); - -// extract the name of the topic from the string representation of the topic ARN -std::string topic_name_from_arn(const std::string& topic_arn); - class RGWUserPubSub { friend class Bucket; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 81c9aefa1a6..2435f9f0e75 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -312,7 +312,7 @@ static const std::string WEBHOOK_SCHEMA("webhook"); static const std::string UNKNOWN_SCHEMA("unknown"); static const std::string NO_SCHEMA(""); -const std::string& RGWPubSubEndpoint::get_schema(const std::string& endpoint) { +const std::string& get_schema(const std::string& endpoint) { if (endpoint.empty()) { return NO_SCHEMA; } diff --git a/src/rgw/rgw_pubsub_push.h b/src/rgw/rgw_pubsub_push.h index 96e3499aa60..3a0dc9f54b9 100644 --- a/src/rgw/rgw_pubsub_push.h +++ b/src/rgw/rgw_pubsub_push.h @@ -25,8 +25,6 @@ public: typedef std::unique_ptr Ptr; - static const std::string& get_schema(const std::string& endpoint); - // factory method for the actual notification endpoint // derived class specific arguments are passed in http args format // may throw a configuration_error if creation fails diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index 1c52d159136..f80c05e0b1c 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -66,8 +66,11 @@ public: // bucket to store events/records will be set only when subscription is created dest.bucket_name = ""; dest.oid_prefix = ""; - // the ARN will be sent in the reply - topic_arn = dest_to_topic_arn(dest, topic_name, s->zonegroup_name, s->account_name); + // the topic ARN will be sent in the reply + const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, + store->svc.zone->get_zonegroup().get_name(), + s->user->user_id.tenant, topic_name); + topic_arn = arn.to_string(); return 0; } @@ -252,12 +255,15 @@ protected: int init_permissions(RGWOp* op) override { return 0; } + int read_permissions(RGWOp* op) override { return 0; } + bool supports_quota() override { return false; } + RGWOp *op_get() override { if (s->init_state.url_bucket.empty()) { return nullptr; @@ -687,11 +693,12 @@ public: ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name, bucket_info, nullptr, nullptr); if (ret < 0) { + ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl; return ret; } if (bucket_info.owner != id) { - ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl; + ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl; return -EPERM; } return 0; @@ -869,13 +876,15 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { return; } - const auto topic_name = topic_name_from_arn(c.topic_arn); - if (topic_name.empty()) { + const auto arn = rgw::ARN::parse(c.topic_arn); + if (!arn || arn->resource.empty()) { ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl; op_ret = -EINVAL; return; } + const auto topic_name = arn->resource; + // get topic information. destination information is stored in the topic rgw_pubsub_topic_subs topic_info; op_ret = ups->get_topic(topic_name, &topic_info); diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index cb77985bc4b..ab52be26694 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -35,6 +35,7 @@ class Config: self.checkpoint_delay = kwargs.get('checkpoint_delay', 5) # allow some time for realm reconfiguration after changing master zone self.reconfigure_delay = kwargs.get('reconfigure_delay', 5) + self.tenant = kwargs.get('tenant', '') # rgw multisite tests, written against the interfaces provided in rgw_multi. # these tests must be initialized and run by another module that provides @@ -51,6 +52,12 @@ def init_multi(_realm, _user, _config=None): config = _config or Config() realm_meta_checkpoint(realm) +def get_user(): + return user.id if user is not None else '' + +def get_tenant(): + return config.tenant if config is not None and config.tenant is not None else '' + def get_realm(): return realm @@ -108,6 +115,7 @@ def datalog_autotrim(zone): def bilog_list(zone, bucket, args = None): cmd = ['bilog', 'list', '--bucket', bucket] + (args or []) + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] bilog, _ = zone.cluster.admin(cmd, read_only=True) bilog = bilog.decode('utf-8') return json.loads(bilog) @@ -265,6 +273,7 @@ def bucket_sync_status(target_zone, source_zone, bucket_name): cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args() cmd += ['--source-zone', source_zone.name] cmd += ['--bucket', bucket_name] + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] while True: bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) if retcode == 0: @@ -300,6 +309,7 @@ def data_source_log_status(source_zone): def bucket_source_log_status(source_zone, bucket_name): cmd = ['bilog', 'status'] + source_zone.zone_args() cmd += ['--bucket', bucket_name] + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] source_cluster = source_zone.cluster bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True) bilog_status = json.loads(bilog_status_json.decode('utf-8')) diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 8f95523d3a4..8846f5a8008 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -1,15 +1,18 @@ import logging import json import tempfile -from rgw_multi.tests import get_realm, \ +from .tests import get_realm, \ ZonegroupConns, \ zonegroup_meta_checkpoint, \ zone_meta_checkpoint, \ zone_bucket_checkpoint, \ zone_data_checkpoint, \ check_bucket_eq, \ - gen_bucket_name -from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info + gen_bucket_name, \ + get_user, \ + get_tenant +from .zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info +from multisite import User from nose import SkipTest from nose.tools import assert_not_equal, assert_equal @@ -126,6 +129,7 @@ NOTIFICATION_SUFFIX = "_notif" # pubsub tests ############## + def test_ps_info(): """ log information for manual testing """ return SkipTest("only used in manual testing") @@ -141,29 +145,33 @@ def test_ps_info(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') print('Zonegroup: ' + zonegroup.name) + print('user: ' + get_user()) + print('tenant: ' + get_tenant()) print('Master Zone') print_connection_info(zones[0].conn) print('PubSub Zone') print_connection_info(ps_zones[0].conn) print('Bucket: ' + bucket_name) + def test_ps_s3_notification_low_level(): """ test low level implementation of s3 notifications """ zones, ps_zones = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zones[0].conn, topic_name) - _, status = topic_conf.set_config() + result, status = topic_conf.set_config() assert_equal(status/100, 2) + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX generated_topic_name = notification_name+'_'+topic_name - topic_arn = 'arn:aws:sns:::' + topic_name s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, notification_name, topic_arn, ['s3:ObjectCreated:*']) response, status = s3_notification_conf.set_config() @@ -227,11 +235,12 @@ def test_ps_s3_notification_records(): # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zones[0].conn, topic_name) - _, status = topic_conf.set_config() + result, status = topic_conf.set_config() assert_equal(status/100, 2) + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_arn = 'arn:aws:sns:::' + topic_name s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, notification_name, topic_arn, ['s3:ObjectCreated:*']) response, status = s3_notification_conf.set_config() @@ -273,18 +282,17 @@ def test_ps_s3_notification(): zones, ps_zones = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) topic_name = bucket_name + TOPIC_SUFFIX # create topic topic_name = bucket_name + TOPIC_SUFFIX - topic_arn = 'arn:aws:sns:::' + topic_name topic_conf = PSTopic(ps_zones[0].conn, topic_name) response, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(response) - assert_equal(parsed_result['arn'], topic_arn) + topic_arn = parsed_result['arn'] # create one s3 notification notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, @@ -332,6 +340,8 @@ def test_ps_s3_notification(): def test_ps_topic(): """ test set/get/delete of topic """ _, ps_zones = init_env() + realm = get_realm() + zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -345,6 +355,8 @@ def test_ps_topic(): parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) assert_equal(len(parsed_result['subs']), 0) + assert_equal(parsed_result['topic']['arn'], + 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name) # delete topic _, status = topic_conf.del_config() assert_equal(status/100, 2) @@ -354,6 +366,30 @@ def test_ps_topic(): assert_equal(parsed_result['Code'], 'NoSuchKey') +def test_ps_topic_with_endpoint(): + """ test set topic with endpoint""" + _, ps_zones = init_env() + bucket_name = gen_bucket_name() + topic_name = bucket_name+TOPIC_SUFFIX + + # create topic + dest_endpoint = 'amqp://localhost:7001' + dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none' + topic_conf = PSTopic(ps_zones[0].conn, topic_name, + endpoint=dest_endpoint, + endpoint_args=dest_args) + _, status = topic_conf.set_config() + assert_equal(status/100, 2) + # get topic + result, _ = topic_conf.get_config() + # verify topic content + parsed_result = json.loads(result) + assert_equal(parsed_result['topic']['name'], topic_name) + assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint) + # cleanup + topic_conf.del_config() + + def test_ps_notification(): """ test set/get/delete of notification """ zones, ps_zones = init_env() @@ -930,15 +966,16 @@ def test_ps_s3_push_http(): # create topic topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint='http://localhost:9001') - _, status = topic_conf.set_config() + result, status = topic_conf.set_config() assert_equal(status/100, 2) + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_arn = 'arn:aws:sns:::' + topic_name s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, notification_name, topic_arn, ['s3:ObjectCreated:*']) _, status = s3_notification_conf.set_config() @@ -1027,15 +1064,15 @@ def test_ps_s3_push_amqp(): topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint='amqp://localhost', endpoint_args='amqp-exchange=ex1&amqp-ack-level=none') - _, status = topic_conf.set_config() + result, status = topic_conf.set_config() assert_equal(status/100, 2) + topic_arn = 'arn:aws:sns:::' + topic_name # create bucket on the first of the rados zones bucket = zones[0].create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zones[0].zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_arn = 'arn:aws:sns:::' + topic_name s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, notification_name, topic_arn, ['s3:ObjectCreated:*']) _, status = s3_notification_conf.set_config() @@ -1074,12 +1111,11 @@ def test_ps_delete_bucket(): topic_name = bucket_name + TOPIC_SUFFIX # create topic topic_name = bucket_name + TOPIC_SUFFIX - topic_arn = 'arn:aws:sns:::' + topic_name topic_conf = PSTopic(ps_zones[0].conn, topic_name) response, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(response) - assert_equal(parsed_result['arn'], topic_arn) + topic_arn = parsed_result['arn'] # create one s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index 6b12a6c0e0e..ebfc5b505a1 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -347,13 +347,16 @@ def init(parse_args): arg = ['--display-name', '"Test User"'] arg += user_creds.credential_args() if args.tenant: - cmd += ['--tenant', args.tenant] + arg += ['--tenant', args.tenant] user.create(zone, arg) else: # read users and update keys admin_user.info(zone) admin_creds = admin_user.credentials[0] - user.info(zone) + arg = [] + if args.tenant: + arg += ['--tenant', args.tenant] + user.info(zone, arg) user_creds = user.credentials[0] if not bootstrap: @@ -361,7 +364,8 @@ def init(parse_args): config = Config(checkpoint_retries=args.checkpoint_retries, checkpoint_delay=args.checkpoint_delay, - reconfigure_delay=args.reconfigure_delay) + reconfigure_delay=args.reconfigure_delay, + tenant=args.tenant) init_multi(realm, user, config) def setup_module(): -- 2.39.5