test/rgw: initial work on cloud sync test
author    Yehuda Sadeh <yehuda@redhat.com>
          Mon, 12 Mar 2018 21:34:05 +0000 (14:34 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
          Thu, 12 Apr 2018 22:38:39 +0000 (15:38 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/test/rgw/rgw_multi/zone_cloud.py [new file with mode: 0644]
src/test/rgw/rgw_multi/zone_es.py
src/test/rgw/test_multi.py

diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py
new file mode 100644 (file)
index 0000000..9065a9f
--- /dev/null
@@ -0,0 +1,307 @@
+import json
+import requests.compat
+import logging
+
+import boto
+import boto.s3.connection
+
+import dateutil.parser
+
+import re
+
+from nose.tools import eq_ as eq
+try:
+    from itertools import izip_longest as zip_longest
+except ImportError:
+    from itertools import zip_longest
+
+try:
+    from urlparse import urlparse
+except ImportError:
+    from urllib.parse import urlparse
+
+from .multisite import *
+from .tools import *
+
+log = logging.getLogger(__name__)
+
+def get_key_ver(k):
+    if not k.version_id:
+        return 'null'
+    return k.version_id
+
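+# Field-by-field comparison of two boto keys. Attributes that the cloud
+# target does not reliably round-trip are left commented out for now.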
+def check_object_eq(k1, k2, check_extra = True):
+    assert k1
+    assert k2
+    log.debug('comparing key name=%s', k1.name)
+    eq(k1.name, k2.name)
+    eq(k1.metadata, k2.metadata)
+    # eq(k1.cache_control, k2.cache_control)
+    eq(k1.content_type, k2.content_type)
+    # eq(k1.content_encoding, k2.content_encoding)
+    # eq(k1.content_disposition, k2.content_disposition)
+    # eq(k1.content_language, k2.content_language)
+    eq(k1.etag, k2.etag)
+    # mtime1 = dateutil.parser.parse(k1.last_modified)
+    # mtime2 = dateutil.parser.parse(k2.last_modified)
+    # assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+    # if check_extra:
+        # eq(k1.owner.id, k2.owner.id)
+        # eq(k1.owner.display_name, k2.owner.display_name)
+    # eq(k1.storage_class, k2.storage_class)
+    eq(k1.size, k2.size)
+    eq(get_key_ver(k1), get_key_ver(k2))
+    # eq(k1.encrypted, k2.encrypted)
+
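+# Issue a raw S3 request through the boto connection, raising on any
+# non-2xx response.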
+def make_request(conn, method, bucket, key, query_args, headers):
+    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
+    if result.status // 100 != 2: # integer division, so this also works under python 3
+        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
+    return result
+
+def append_query_arg(s, n, v):
+    if not v:
+        return s
+    nv = '{n}={v}'.format(n=n, v=v)
+    if not s:
+        return nv
+    return '{s}&{nv}'.format(s=s, nv=nv)
+
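+# Paginated object listing: raw_search() issues a single GET with the
+# 'query'/'marker'/'max-keys' query args and converts the returned JSON
+# 'Objects' entries into boto keys; search() follows 'Marker' until the
+# response is no longer truncated.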
+class CloudList:
+    def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
+        self.conn = conn
+        self.bucket_name = bucket_name or ''
+        if bucket_name:
+            self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+        else:
+            self.bucket = None
+        self.query = query
+        self.query_args = query_args
+        self.max_keys = None
+        self.marker = marker
+
+    def raw_search(self):
+        q = self.query or ''
+        query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
+        if self.max_keys is not None:
+            query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
+        if self.marker:
+            query_args = append_query_arg(query_args, 'marker', self.marker)
+
+        query_args = append_query_arg(query_args, 'format', 'json')
+
+        headers = {}
+
+        result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+        l = []
+
+        result_dict = json.loads(result.read())
+
+        for entry in result_dict['Objects']:
+            bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
+            k = boto.s3.key.Key(bucket, entry['Key'])
+
+            k.version_id = entry['Instance']
+            k.etag = entry['ETag']
+            k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
+            k.last_modified = entry['LastModified']
+            k.size = entry['Size']
+            k.content_type = entry['ContentType']
+            k.versioned_epoch = entry['VersionedEpoch']
+
+            k.metadata = {}
+            for e in entry['CustomMetadata']:
+                k.metadata[e['Name']] = str(e['Value']) # int values come back as int; cast to string for compatibility with the object meta response
+
+            l.append(k)
+
+        return result_dict, l
+
+    def search(self, drain = True, sort = True, sort_key = None):
+        l = []
+
+        is_done = False
+
+        while not is_done:
+            result, result_keys = self.raw_search()
+
+            l = l + result_keys
+
+            is_done = not (drain and (result['IsTruncated'] == "true"))
+            self.marker = result['Marker'] # advance the marker so the next raw_search() call fetches the following page
+
+        if sort:
+            if not sort_key:
+                sort_key = lambda k: (k.name, -k.versioned_epoch)
+            l.sort(key = sort_key)
+
+        return l
+
+
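+# Maps a source bucket onto its location on the cloud target. For example,
+# target_path 'rgw-zg1/${bucket}' with bucket 'b1' expands to target bucket
+# 'rgw-zg1' and key prefix 'b1/'.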
+class CloudZoneBucket:
+    def __init__(self, zone_conn, target_path, name):
+        self.zone_conn = zone_conn
+        self.name = name
+        self.cloud_conn = zone_conn.zone.cloud_conn
+
+        if target_path[-1] != '/':
+            target_path += '/'
+        target_path = target_path.replace('${bucket}', name)
+
+        tp = target_path.split('/', 1)
+
+        if len(tp) == 1:
+            self.target_bucket = target_path
+            self.target_prefix = ''
+        else:
+            self.target_bucket = tp[0]
+            self.target_prefix = tp[1]
+
+        log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
+        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)
+
+    def get_all_versions(self):
+        for o in self.bucket.get_all_keys(prefix=self.target_prefix):
+            new_name = o.name[len(self.target_prefix):]
+            log.debug('bucket=%s obj=%s new_name=%s', self.bucket.name, o.name, new_name)
+            o.name = new_name
+            yield o
+
+
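+# Split an endpoint url into (host, port, is_secure), defaulting the port
+# from the scheme, e.g. 'https://s3.example.com' -> ('s3.example.com', 443, True).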
+def parse_endpoint(endpoint):
+    o = urlparse(endpoint)
+
+    netloc = o.netloc.split(':')
+
+    host = netloc[0]
+
+    if len(netloc) > 1:
+        port = int(netloc[1])
+    else:
+        port = o.port
+
+    is_secure = (o.scheme == 'https')
+
+    if not port:
+        if is_secure:
+            port = 443
+        else:
+            port = 80
+
+    return host, port, is_secure
+
+
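+# A read-only zone whose data lives on an external S3-compatible endpoint.
+# The ${zone}/${zonegroup}/${zonegroup_id} placeholders in target_path are
+# expanded here; ${bucket} is expanded later, per bucket, in CloudZoneBucket.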
+class CloudZone(Zone):
+    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
+            zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+        self.cloud_endpoint = cloud_endpoint
+        self.credentials = credentials
+        self.source_bucket = source_bucket
+        self.target_path = target_path
+
+        self.target_path = self.target_path.replace('${zone}', name)
+        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
+        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
+        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)
+
+        log.debug('target_path=%s', self.target_path)
+
+        host, port, is_secure = parse_endpoint(cloud_endpoint)
+
+        self.cloud_conn = boto.connect_s3(
+                aws_access_key_id = credentials.access_key,
+                aws_secret_access_key = credentials.secret,
+                host = host,
+                port = port,
+                is_secure = is_secure,
+                calling_format = boto.s3.connection.OrdinaryCallingFormat())
+        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+
+    def is_read_only(self):
+        return True
+
+    def tier_type(self):
+        return "aws"
+
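+    # the tier configuration is passed to the zone create command as a
+    # comma-separated list of key=value pairs via --tier-config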
+    def create(self, cluster, args = None, check_retcode = True):
+        """ create the object with the given arguments """
+
+        if args is None:
+            args = []
+
+        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
+                                 'connection.access_key=' + self.credentials.access_key,
+                                 'connection.secret=' + self.credentials.secret,
+                                 'target_path=' + re.escape(self.target_path)])
+
+        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
+
+        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
+
+    def has_buckets(self):
+        return False
+
+    class Conn(ZoneConn):
+        def __init__(self, zone, credentials):
+            super(CloudZone.Conn, self).__init__(zone, credentials)
+
+        def get_bucket(self, bucket_name):
+            return CloudZoneBucket(self, self.zone.target_path, bucket_name)
+
+        def create_bucket(self, name):
+            # we should never get here; reaching this is a bug in the test suite
+            log.critical('Conn.create_bucket() should not be called in cloud zone')
+            assert False
+
+        def check_bucket_eq(self, zone_conn, bucket_name):
+            assert(zone_conn.zone.tier_type() == "rados")
+
+            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+            b1 = self.get_bucket(bucket_name)
+            b2 = zone_conn.get_bucket(bucket_name)
+
+            log.debug('bucket1 objects:')
+            for o in b1.get_all_versions():
+                log.debug('o=%s', o.name)
+            log.debug('bucket2 objects:')
+            for o in b2.get_all_versions():
+                log.debug('o=%s', o.name)
+
+            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+                if k1 is None:
+                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+                    assert False
+                if k2 is None:
+                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+                    assert False
+
+                check_object_eq(k1, k2)
+
+
+            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+            return True
+
+    def get_conn(self, credentials):
+        return self.Conn(self, credentials)
+
+
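+# Reads one '[cloud*]' section of the test configuration. An illustrative
+# example (endpoint, access_key and secret are required; the other keys
+# fall back to the defaults below):
+#
+#   [cloud]
+#   endpoint = http://localhost:8000
+#   access_key = 1234567890
+#   secret = pencil
+#   target_path = rgw-${zonegroup_id}/${bucket}
+#   source_bucket = *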
+class CloudZoneConfig:
+    def __init__(self, cfg, section):
+        self.endpoint = cfg.get(section, 'endpoint')
+        access_key = cfg.get(section, 'access_key')
+        secret = cfg.get(section, 'secret')
+        self.credentials = Credentials(access_key, secret)
+        try:
+            self.target_path = cfg.get(section, 'target_path')
+        except Exception:
+            self.target_path = 'rgw-${zonegroup_id}/${bucket}'
+
+        try:
+            self.source_bucket = cfg.get(section, 'source_bucket')
+        except Exception:
+            self.source_bucket = '*'
+
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
index dccab4093d152835e8aecc0708a58dba280ec7d6..55edae7292017abf84300ad8b0a9cfae0b5ae7dc 100644 (file)
@@ -254,3 +254,7 @@ class ESZone(Zone):
         return self.Conn(self, credentials)
 
 
+class ESZoneConfig:
+    def __init__(self, cfg, section):
+        self.endpoint = cfg.get(section, 'endpoint')
+
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index bd42fe590649734aaf08508b9ad1e729e4251839..d6a7225818535c2673b8503c1c264fb1ddd91e7c 100644 (file)
@@ -15,6 +15,9 @@ import nose.core
 from rgw_multi import multisite
 from rgw_multi.zone_rados import RadosZone as RadosZone
 from rgw_multi.zone_es  import ESZone as ESZone
+from rgw_multi.zone_es  import ESZoneConfig as ESZoneConfig
+from rgw_multi.zone_cloud  import CloudZone as CloudZone
+from rgw_multi.zone_cloud  import CloudZoneConfig as CloudZoneConfig
 
 # make tests from rgw_multi.tests available to nose
 from rgw_multi.tests import *
@@ -156,6 +159,7 @@ def init(parse_args):
                                          'num_zonegroups': 1,
                                          'num_zones': 3,
                                          'num_es_zones': 0,
+                                         'num_cloud_zones': 0,
                                          'gateways_per_zone': 2,
                                          'no_bootstrap': 'false',
                                          'log_level': 20,
@@ -165,7 +169,6 @@ def init(parse_args):
                                          'checkpoint_retries': 60,
                                          'checkpoint_delay': 5,
                                          'reconfigure_delay': 5,
-                                         'es_endpoint': None,
                                          })
     try:
         path = os.environ['RGW_MULTI_TEST_CONF']
@@ -186,7 +189,6 @@ def init(parse_args):
     section = 'DEFAULT'
     parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups'))
     parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones'))
-    parser.add_argument('--num-es-zones', type=int, default=cfg.getint(section, 'num_es_zones'))
     parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
     parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
     parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
@@ -196,7 +198,15 @@ def init(parse_args):
     parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries'))
     parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
     parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
-    parser.add_argument('--es-endpoint', type=str, default=cfg.get(section, 'es_endpoint'))
+
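+    # every '[elasticsearch*]' or '[cloud*]' section of the test conf adds
+    # one zone of the corresponding type on top of --num-zones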
+    es_cfg = []
+    cloud_cfg = []
+
+    for s in cfg.sections():
+        if s.startswith('elasticsearch'):
+            es_cfg.append(ESZoneConfig(cfg, s))
+        elif s.startswith('cloud'):
+            cloud_cfg.append(CloudZoneConfig(cfg, s))
 
     argv = []
 
@@ -206,9 +216,6 @@ def init(parse_args):
     args = parser.parse_args(argv)
     bootstrap = not args.no_bootstrap
 
-    # if num_es_zones is defined, need to have es_endpoint defined too
-    assert(args.num_es_zones == 0 or args.es_endpoint)
-
     setup_logging(args.log_level, args.log_file, args.file_log_level)
 
     # start first cluster
@@ -233,7 +240,10 @@ def init(parse_args):
     period = multisite.Period(realm=realm)
     realm.current_period = period
 
-    num_zones = args.num_zones + args.num_es_zones
+    num_es_zones = len(es_cfg)
+    num_cloud_zones = len(cloud_cfg)
+
+    num_zones = args.num_zones + num_es_zones + num_cloud_zones
 
     for zg in range(0, args.num_zonegroups):
         zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
@@ -271,12 +281,19 @@ def init(parse_args):
                 else:
                     zonegroup.get(cluster)
 
-            es_zone = (z >= args.num_zones)
+            es_zone = (z >= args.num_zones and z < args.num_zones + num_es_zones)
+            cloud_zone = (z >= args.num_zones + num_es_zones)
 
             # create the zone in its zonegroup
             zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
             if es_zone:
-                zone = ESZone(zone_name(zg, z), args.es_endpoint, zonegroup, cluster)
+                zone_index = z - args.num_zones
+                zone = ESZone(zone_name(zg, z), es_cfg[zone_index].endpoint, zonegroup, cluster)
+            elif cloud_zone:
+                zone_index = z - args.num_zones - num_es_zones
+                ccfg = cloud_cfg[zone_index]
+                zone = CloudZone(zone_name(zg, z), ccfg.endpoint, ccfg.credentials, ccfg.source_bucket,
+                                 ccfg.target_path, zonegroup, cluster)
             else:
                 zone = RadosZone(zone_name(zg, z), zonegroup, cluster)