// bucket to store events/records will be set only when subscription is created
dest.bucket_name = "";
dest.oid_prefix = "";
- // the ARN will be sent in the reply
- topic_arn = dest_to_topic_arn(dest, topic_name, s->zonegroup_name, s->account_name);
+ // the topic ARN will be sent in the reply
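+ // resulting format: arn:aws:sns:<zonegroup>:<tenant>:<topic-name>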
+ const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
+ store->svc.zone->get_zonegroup().get_name(),
+ s->user->user_id.tenant, topic_name);
+ topic_arn = arn.to_string();
return 0;
}
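+ // permission checks are a no-op and quota is not supported for the pubsub REST handlers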
int init_permissions(RGWOp* op) override {
return 0;
}
+
int read_permissions(RGWOp* op) override {
return 0;
}
+
bool supports_quota() override {
return false;
}
+
RGWOp *op_get() override {
if (s->init_state.url_bucket.empty()) {
return nullptr;
ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
bucket_info, nullptr, nullptr);
if (ret < 0) {
+ ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
return ret;
}
if (bucket_info.owner != id) {
- ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl;
+ ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
return -EPERM;
}
return 0;
return;
}
- const auto topic_name = topic_name_from_arn(c.topic_arn);
- if (topic_name.empty()) {
+ const auto arn = rgw::ARN::parse(c.topic_arn);
+ if (!arn || arn->resource.empty()) {
ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
op_ret = -EINVAL;
return;
}
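+ // the topic name is carried in the resource part of the ARN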
+ const auto topic_name = arn->resource;
+
// get topic information. destination information is stored in the topic
rgw_pubsub_topic_subs topic_info;
op_ret = ups->get_topic(topic_name, &topic_info);
self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
# allow some time for realm reconfiguration after changing master zone
self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
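+ # optional tenant for the test user; an empty string means no tenant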
+ self.tenant = kwargs.get('tenant', '')
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
config = _config or Config()
realm_meta_checkpoint(realm)
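+# helpers exposing the configured user and tenant to dependent test modules (e.g. the pubsub tests)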
+def get_user():
+ return user.id if user is not None else ''
+
+def get_tenant():
+ return config.tenant if config is not None and config.tenant is not None else ''
+
def get_realm():
return realm
def bilog_list(zone, bucket, args = None):
cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
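+ # when a tenant is configured, pass it along with the owning user id to the admin command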
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
bilog, _ = zone.cluster.admin(cmd, read_only=True)
bilog = bilog.decode('utf-8')
return json.loads(bilog)
cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
cmd += ['--source-zone', source_zone.name]
cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
while True:
bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
if retcode == 0:
def bucket_source_log_status(source_zone, bucket_name):
cmd = ['bilog', 'status'] + source_zone.zone_args()
cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
source_cluster = source_zone.cluster
bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
bilog_status = json.loads(bilog_status_json.decode('utf-8'))
import logging
import json
import tempfile
-from rgw_multi.tests import get_realm, \
+from .tests import get_realm, \
ZonegroupConns, \
zonegroup_meta_checkpoint, \
zone_meta_checkpoint, \
zone_bucket_checkpoint, \
zone_data_checkpoint, \
check_bucket_eq, \
- gen_bucket_name
-from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+ gen_bucket_name, \
+ get_user, \
+ get_tenant
+from .zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
# pubsub tests
##############
+
def test_ps_info():
""" log information for manual testing """
return SkipTest("only used in manual testing")
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
print('Zonegroup: ' + zonegroup.name)
+ print('user: ' + get_user())
+ print('tenant: ' + get_tenant())
print('Master Zone')
print_connection_info(zones[0].conn)
print('PubSub Zone')
print_connection_info(ps_zones[0].conn)
print('Bucket: ' + bucket_name)
+
def test_ps_s3_notification_low_level():
""" test low level implementation of s3 notifications """
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
# create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
- _, status = topic_conf.set_config()
+ result, status = topic_conf.set_config()
assert_equal(status/100, 2)
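+ # use the topic ARN returned by the create-topic response instead of hard-coding it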
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
generated_topic_name = notification_name+'_'+topic_name
- topic_arn = 'arn:aws:sns:::' + topic_name
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
notification_name, topic_arn, ['s3:ObjectCreated:*'])
response, status = s3_notification_conf.set_config()
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
- _, status = topic_conf.set_config()
+ result, status = topic_conf.set_config()
assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_arn = 'arn:aws:sns:::' + topic_name
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
notification_name, topic_arn, ['s3:ObjectCreated:*'])
response, status = s3_notification_conf.set_config()
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
# create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
topic_name = bucket_name + TOPIC_SUFFIX
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_arn = 'arn:aws:sns:::' + topic_name
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
response, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(response)
- assert_equal(parsed_result['arn'], topic_arn)
+ topic_arn = parsed_result['arn']
# create one s3 notification
notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name,
def test_ps_topic():
""" test set/get/delete of topic """
_, ps_zones = init_env()
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
parsed_result = json.loads(result)
assert_equal(parsed_result['topic']['name'], topic_name)
assert_equal(len(parsed_result['subs']), 0)
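+ # the topic ARN should embed the zonegroup name and the tenant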
+ assert_equal(parsed_result['topic']['arn'],
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
# delete topic
_, status = topic_conf.del_config()
assert_equal(status/100, 2)
assert_equal(parsed_result['Code'], 'NoSuchKey')
+def test_ps_topic_with_endpoint():
+ """ test set topic with endpoint"""
+ _, ps_zones = init_env()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name+TOPIC_SUFFIX
+
+ # create topic
+ dest_endpoint = 'amqp://localhost:7001'
+ dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ endpoint=dest_endpoint,
+ endpoint_args=dest_args)
+ _, status = topic_conf.set_config()
+ assert_equal(status/100, 2)
+ # get topic
+ result, _ = topic_conf.get_config()
+ # verify topic content
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic']['name'], topic_name)
+ assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
+ # cleanup
+ topic_conf.del_config()
+
+
def test_ps_notification():
""" test set/get/delete of notification """
zones, ps_zones = init_env()
# create topic
topic_conf = PSTopic(ps_zones[0].conn, topic_name,
endpoint='http://localhost:9001')
- _, status = topic_conf.set_config()
+ result, status = topic_conf.set_config()
assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_arn = 'arn:aws:sns:::' + topic_name
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
notification_name, topic_arn, ['s3:ObjectCreated:*'])
_, status = s3_notification_conf.set_config()
topic_conf = PSTopic(ps_zones[0].conn, topic_name,
endpoint='amqp://localhost',
endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
- _, status = topic_conf.set_config()
+ result, status = topic_conf.set_config()
assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
# create bucket on the first of the rados zones
bucket = zones[0].create_bucket(bucket_name)
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_arn = 'arn:aws:sns:::' + topic_name
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
notification_name, topic_arn, ['s3:ObjectCreated:*'])
_, status = s3_notification_conf.set_config()
topic_name = bucket_name + TOPIC_SUFFIX
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_arn = 'arn:aws:sns:::' + topic_name
topic_conf = PSTopic(ps_zones[0].conn, topic_name)
response, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(response)
- assert_equal(parsed_result['arn'], topic_arn)
+ topic_arn = parsed_result['arn']
# create one s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,