import boto
import boto.s3.connection
import boto.iam.connection
+import boto3
def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    # NOTE(review): this hunk appears truncated -- the two fragments below
    # look like the tail of a boto connect call (port=..., is_secure=False)
    # whose opening lines were elided; confirm against the complete file.
    port = gateway.port,
    is_secure = False)
    return gateway.iam_connection
+
+
def get_gateway_s3_client(gateway, credentials, region):
    """ connect to boto3 s3 client api of the given gateway """
    # Lazily build the client once and cache it on the gateway object.
    if gateway.s3_client is None:
        endpoint = 'http://%s:%s' % (gateway.host, gateway.port)
        gateway.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=credentials.access_key,
            aws_secret_access_key=credentials.secret,
            region_name=region)
    return gateway.s3_client
+
+
def get_gateway_sns_client(gateway, credentials, region):
    """ connect to boto3 sns client api of the given gateway """
    # NOTE: docstring fixed -- it previously said "s3 client" (copy/paste
    # from get_gateway_s3_client) although this builds the SNS client.
    # Lazily build the client once and cache it on the gateway object.
    if gateway.sns_client is None:
        gateway.sns_client = boto3.client('sns',
                            endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
                            aws_access_key_id=credentials.access_key,
                            aws_secret_access_key=credentials.secret,
                            region_name=region)
    return gateway.sns_client
from boto.s3.cors import CORSConfiguration
from nose.tools import eq_ as eq
-from nose.tools import assert_not_equal, assert_equal
+from nose.tools import assert_not_equal, assert_equal, assert_true, assert_false
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
num_roles = 0
+num_topic = 0
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    # NOTE(review): this hunk looks truncated -- the body below belongs to a
    # role-name generator (increments num_roles, builds "roles-<prefix>-<n>"),
    # not a zone-connection helper; it is also missing a `global num_roles`
    # declaration in this view. Verify against the complete file.
    num_roles += 1
    return "roles" + '-' + run_prefix + '-' + str(num_roles)
+
def gen_topic_name():
    """Return a fresh topic name unique within this test run."""
    global num_topic
    num_topic += 1
    return '-'.join(('topic', run_prefix, str(num_topic)))
+
class ZonegroupConns:
    # NOTE(review): class body is elided in this hunk -- only the first line
    # of __init__ and a stray, mis-scoped "return True" fragment are visible.
    # Do not rely on this view of the class.
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
    return True
+
def get_topics(zone):
    """
    Get list of topics in cluster.
    """
    # Run `radosgw-admin topic list` scoped to this zone and parse the JSON.
    args = ['topic', 'list'] + zone.zone_args()
    out, _ = zone.cluster.admin(args, read_only=True)
    parsed = json.loads(out)
    return parsed['topics']
+
+
def create_topic_per_zone(zonegroup_conns, topics_per_zone=1):
    """Create topics_per_zone persistent push topics in every rw zone.

    Returns (all_topic_arns, list of (zone_conn, topic_arn) pairs).
    """
    topics = []
    zone_topic = []
    for zone in zonegroup_conns.rw_zones:
        for _ in range(topics_per_zone):
            name = gen_topic_name()
            log.info('create topic zone=%s name=%s', zone.name, name)
            # Fresh attribute dict per topic, matching the original behavior.
            arn = zone.create_topic(name, {
                "push-endpoint": "http://kaboom:9999",
                "persistent": "true",
            })
            topics.append(arn)
            zone_topic.append((zone, arn))

    return topics, zone_topic
+
def create_role_per_zone(zonegroup_conns, roles_per_zone = 1):
    """Create roles_per_zone roles in each zone.

    NOTE(review): the remainder of this function is not visible in this
    hunk; only the accumulator initialization survives here.
    """
    roles = []
    zone_role = []
test_sync_flow_symmetrical_zonegroup_all()
finally:
start_2nd_rgw(zonegroup)
+
def test_topic_notification_sync():
    """Verify topics and bucket notifications replicate across all zones.

    Flow: create topics in every rw zone and check they appear everywhere;
    attach a notification per topic to one bucket and check the
    notifications appear everywhere; verify the per-topic subscribed-bucket
    mapping tracks bucket create/delete; finally delete notifications and
    topics and check the deletions propagate.
    """
    zonegroup = realm.master_zonegroup()
    zonegroup_meta_checkpoint(zonegroup)
    # let wait for users and other settings to sync across all zones.
    time.sleep(config.checkpoint_delay)
    # create topics in each zone.
    zonegroup_conns = ZonegroupConns(zonegroup)
    topic_arns, zone_topic = create_topic_per_zone(zonegroup_conns)
    log.debug("topic_arns: %s", topic_arns)

    zonegroup_meta_checkpoint(zonegroup)

    # verify topics exists in all zones
    for conn in zonegroup_conns.zones:
        topic_list = conn.list_topics()
        log.debug("topics for zone=%s = %s", conn.name, topic_list)
        assert_equal(len(topic_list), len(topic_arns))
        for topic_arn_map in topic_list:
            assert_true(topic_arn_map['TopicArn'] in topic_arns)

    # create a bucket
    bucket = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
    log.debug('created bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # create bucket_notification in each zone.
    notification_ids = []
    num = 1
    for zone_conn, topic_arn in zone_topic:
        noti_id = "bn" + '-' + run_prefix + '-' + str(num)
        notification_ids.append(noti_id)
        topic_conf = {'Id': noti_id,
                      'TopicArn': topic_arn,
                      'Events': ['s3:ObjectCreated:*']
                      }
        num += 1
        log.info('creating bucket notification for zone=%s name=%s', zone_conn.name, noti_id)
        zone_conn.create_notification(bucket.name, [topic_conf])
    zonegroup_meta_checkpoint(zonegroup)

    # verify notifications exists in all zones
    for conn in zonegroup_conns.zones:
        notification_list = conn.list_notifications(bucket.name)
        log.debug("notifications for zone=%s = %s", conn.name, notification_list)
        assert_equal(len(notification_list), len(topic_arns))
        for notification in notification_list:
            assert_true(notification['Id'] in notification_ids)

    # verify bucket_topic mapping
    # create a new bucket and subscribe it to first topic.
    bucket_2 = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
    notif_id = "bn-2" + '-' + run_prefix
    topic_conf = {'Id': notif_id,
                  'TopicArn': topic_arns[0],
                  'Events': ['s3:ObjectCreated:*']
                  }
    zonegroup_conns.rw_zones[0].create_notification(bucket_2.name, [topic_conf])
    zonegroup_meta_checkpoint(zonegroup)
    # first topic now has two subscribed buckets; every other topic only one.
    for conn in zonegroup_conns.zones:
        topics = get_topics(conn.zone)
        for topic in topics:
            if topic['arn'] == topic_arns[0]:
                assert_equal(len(topic['subscribed_buckets']), 2)
                assert_true(bucket_2.name in topic['subscribed_buckets'])
            else:
                assert_equal(len(topic['subscribed_buckets']), 1)
                assert_true(bucket.name in topic['subscribed_buckets'])

    # delete the 2nd bucket and verify the mapping is removed.
    zonegroup_conns.rw_zones[0].delete_bucket(bucket_2.name)
    zonegroup_meta_checkpoint(zonegroup)
    for conn in zonegroup_conns.zones:
        topics = get_topics(conn.zone)
        for topic in topics:
            assert_equal(len(topic['subscribed_buckets']), 1)
        '''TODO(Remove the break once the https://tracker.ceph.com/issues/20802
        is fixed, as the secondary site bucket instance info is currently not
        getting deleted coz of the bug hence the bucket-topic mapping
        deletion is not invoked on secondary sites.)'''
        break

    # delete notifications
    zonegroup_conns.rw_zones[0].delete_notifications(bucket.name)
    log.debug('Deleting all notifications for bucket=%s', bucket.name)
    zonegroup_meta_checkpoint(zonegroup)

    # verify notification deleted in all zones
    for conn in zonegroup_conns.zones:
        notification_list = conn.list_notifications(bucket.name)
        assert_equal(len(notification_list), 0)

    # delete topics
    for zone_conn, topic_arn in zone_topic:
        log.debug('deleting topic zone=%s arn=%s', zone_conn.name, topic_arn)
        zone_conn.delete_topic(topic_arn)
    zonegroup_meta_checkpoint(zonegroup)

    # verify topics deleted in all zones
    for conn in zonegroup_conns.zones:
        topic_list = conn.list_topics()
        assert_equal(len(topic_list), 0)
return False
return True
+ def create_topic(self, topicname, attributes):
+ result = self.sns_client.create_topic(Name=topicname, Attributes=attributes)
+ self.topic_arn = result['TopicArn']
+ return self.topic_arn
+
+ def delete_topic(self, topic_arn):
+ return self.sns_client.delete_topic(TopicArn=topic_arn)
+
+ def get_topic(self, topic_arn):
+ return self.sns_client.get_topic_attributes(TopicArn=topic_arn)
+
+ def list_topics(self):
+ return self.sns_client.list_topics()['Topics']
+
+ def create_notification(self, bucket_name, topic_conf_list):
+ return self.s3_client.put_bucket_notification_configuration(
+ Bucket=bucket_name, NotificationConfiguration={'TopicConfigurations': topic_conf_list})
+
+ def delete_notifications(self, bucket_name):
+ return self.s3_client.put_bucket_notification_configuration(Bucket=bucket_name,
+ NotificationConfiguration={})
+
+ def list_notifications(self, bucket_name):
+ out = self.s3_client.get_bucket_notification_configuration(Bucket=bucket_name)
+ if 'TopicConfigurations' in out:
+ return out['TopicConfigurations']
+ return []
+
def get_conn(self, credentials):
return self.Conn(self, credentials)