From: Yehuda Sadeh
Date: Mon, 12 Mar 2018 21:34:05 +0000 (-0700)
Subject: test/rgw: initial work on cloud sync test
X-Git-Tag: v13.1.0~270^2~16
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=340005c1f1a14fef8d23069a908080afe59991b3;p=ceph-ci.git

test/rgw: initial work on cloud sync test

Signed-off-by: Yehuda Sadeh
---

diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py
new file mode 100644
index 00000000000..9065a9fcf82
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_cloud.py
@@ -0,0 +1,310 @@
+import json
+import requests.compat
+import logging
+
+import boto
+import boto.s3.connection
+
+import dateutil.parser
+
+import re
+
+from nose.tools import eq_ as eq
+try:
+    from itertools import izip_longest as zip_longest
+except ImportError:
+    from itertools import zip_longest
+
+try:
+    from urlparse import urlparse
+except ImportError: # py3
+    from urllib.parse import urlparse
+
+from .multisite import *
+from .tools import *
+
+log = logging.getLogger(__name__)
+
+def get_key_ver(k):
+    if not k.version_id:
+        return 'null'
+    return k.version_id
+
+def check_object_eq(k1, k2, check_extra = True):
+    assert k1
+    assert k2
+    log.debug('comparing key name=%s', k1.name)
+    eq(k1.name, k2.name)
+    eq(k1.metadata, k2.metadata)
+    # eq(k1.cache_control, k2.cache_control)
+    eq(k1.content_type, k2.content_type)
+    # eq(k1.content_encoding, k2.content_encoding)
+    # eq(k1.content_disposition, k2.content_disposition)
+    # eq(k1.content_language, k2.content_language)
+    eq(k1.etag, k2.etag)
+    # mtime1 = dateutil.parser.parse(k1.last_modified)
+    # mtime2 = dateutil.parser.parse(k2.last_modified)
+    # assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+    # if check_extra:
+    #     eq(k1.owner.id, k2.owner.id)
+    #     eq(k1.owner.display_name, k2.owner.display_name)
+    # eq(k1.storage_class, k2.storage_class)
+    eq(k1.size, k2.size)
+    eq(get_key_ver(k1), get_key_ver(k2))
+    # eq(k1.encrypted, k2.encrypted)
+
+def make_request(conn, method, bucket, key, query_args, headers):
+    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
+    if result.status // 100 != 2: # floor division, so the check also works under python3
+        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
+    return result
+
+def append_query_arg(s, n, v):
+    if not v:
+        return s
+    nv = '{n}={v}'.format(n=n, v=v)
+    if not s:
+        return nv
+    return '{s}&{nv}'.format(s=s, nv=nv)
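For reference, append_query_arg() chains URL query parameters and is a no-op for
empty values; its behavior follows directly from the code above:

    append_query_arg('', 'marker', 'foo')           # -> 'marker=foo'
    append_query_arg('marker=foo', 'max-keys', 10)  # -> 'marker=foo&max-keys=10'
    append_query_arg('marker=foo', 'format', None)  # -> 'marker=foo'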
+class CloudList:
+    def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
+        self.conn = conn
+        self.bucket_name = bucket_name or ''
+        if bucket_name:
+            self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+        else:
+            self.bucket = None
+        self.query = query
+        self.query_args = query_args
+        self.max_keys = None
+        self.marker = marker
+
+    def raw_search(self):
+        q = self.query or ''
+        query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
+        if self.max_keys is not None:
+            query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
+        if self.marker:
+            query_args = append_query_arg(query_args, 'marker', self.marker)
+
+        query_args = append_query_arg(query_args, 'format', 'json')
+
+        headers = {}
+
+        result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+        l = []
+
+        result_dict = json.loads(result.read())
+
+        for entry in result_dict['Objects']:
+            bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
+            k = boto.s3.key.Key(bucket, entry['Key'])
+
+            k.version_id = entry['Instance']
+            k.etag = entry['ETag']
+            k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
+            k.last_modified = entry['LastModified']
+            k.size = entry['Size']
+            k.content_type = entry['ContentType']
+            k.versioned_epoch = entry['VersionedEpoch']
+
+            k.metadata = {}
+            for e in entry['CustomMetadata']:
+                k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
+
+            l.append(k)
+
+        return result_dict, l
+
+    def search(self, drain = True, sort = True, sort_key = None):
+        l = []
+
+        is_done = False
+
+        while not is_done:
+            result, result_keys = self.raw_search()
+
+            l = l + result_keys
+
+            is_done = not (drain and (result['IsTruncated'] == "true"))
+            self.marker = result['Marker'] # advance the marker so the next raw_search() fetches the following page
+
+        if sort:
+            if not sort_key:
+                sort_key = lambda k: (k.name, -k.versioned_epoch)
+            l.sort(key = sort_key)
+
+        return l
+
+
+class CloudZoneBucket:
+    def __init__(self, zone_conn, target_path, name):
+        self.zone_conn = zone_conn
+        self.name = name
+        self.cloud_conn = zone_conn.zone.cloud_conn
+
+        target_path = target_path[:]
+        if target_path[-1] != '/':
+            target_path += '/'
+        target_path = target_path.replace('${bucket}', name)
+
+        tp = target_path.split('/', 1)
+
+        if len(tp) == 1:
+            self.target_bucket = target_path
+            self.target_prefix = ''
+        else:
+            self.target_bucket = tp[0]
+            self.target_prefix = tp[1]
+
+        log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
+        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)
+
+    def get_all_versions(self):
+        for o in self.bucket.get_all_keys(prefix=self.target_prefix):
+            new_name = o.name[len(self.target_prefix):]
+            log.debug('bucket=%s obj=%s new_name=%s', self.bucket.name, o.name, new_name)
+            o.name = new_name
+            yield o
+
+
+def parse_endpoint(endpoint):
+    o = urlparse(endpoint)
+
+    netloc = o.netloc.split(':')
+
+    host = netloc[0]
+
+    if len(netloc) > 1:
+        port = int(netloc[1])
+    else:
+        port = o.port
+
+    is_secure = (o.scheme == 'https')
+
+    if not port:
+        port = 443 if is_secure else 80
+
+    return host, port, is_secure
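A quick sanity sketch of parse_endpoint() above (the hostnames are placeholders):

    parse_endpoint('http://localhost:8000')   # -> ('localhost', 8000, False)
    parse_endpoint('https://s3.example.com')  # -> ('s3.example.com', 443, True)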
+class CloudZone(Zone):
+    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
+                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+        self.cloud_endpoint = cloud_endpoint
+        self.credentials = credentials
+        self.source_bucket = source_bucket
+        self.target_path = target_path
+
+        self.target_path = self.target_path.replace('${zone}', name)
+        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
+        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
+        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)
+
+        log.debug('target_path=%s', self.target_path)
+
+        host, port, is_secure = parse_endpoint(cloud_endpoint)
+
+        self.cloud_conn = boto.connect_s3(
+            aws_access_key_id = credentials.access_key,
+            aws_secret_access_key = credentials.secret,
+            host = host,
+            port = port,
+            is_secure = is_secure,
+            calling_format = boto.s3.connection.OrdinaryCallingFormat())
+        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+    def is_read_only(self):
+        return True
+
+    def tier_type(self):
+        return "aws"
+
+    def create(self, cluster, args = None, check_retcode = True):
+        """ create the zone with the given arguments """
+
+        if args is None:
+            args = [] # a list, not a string: the tier arguments are appended below
+
+        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
+                                 'connection.access_key=' + self.credentials.access_key,
+                                 'connection.secret=' + self.credentials.secret,
+                                 'target_path=' + re.escape(self.target_path)])
+
+        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
+
+        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
+
+    def has_buckets(self):
+        return False
+
+    class Conn(ZoneConn):
+        def __init__(self, zone, credentials):
+            super(CloudZone.Conn, self).__init__(zone, credentials)
+
+        def get_bucket(self, bucket_name):
+            return CloudZoneBucket(self, self.zone.target_path, bucket_name)
+
+        def create_bucket(self, name):
+            # should not be here, a bug in the test suite
+            log.critical('Conn.create_bucket() should not be called in cloud zone')
+            assert False
+
+        def check_bucket_eq(self, zone_conn, bucket_name):
+            assert(zone_conn.zone.tier_type() == "rados")
+
+            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+            b1 = self.get_bucket(bucket_name)
+            b2 = zone_conn.get_bucket(bucket_name)
+
+            log.debug('bucket1 objects:')
+            for o in b1.get_all_versions():
+                log.debug('o=%s', o.name)
+            log.debug('bucket2 objects:')
+            for o in b2.get_all_versions():
+                log.debug('o=%s', o.name)
+
+            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+                if k1 is None:
+                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+                    assert False
+                if k2 is None:
+                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+                    assert False
+
+                check_object_eq(k1, k2)
+
+            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+            return True
+
+    def get_conn(self, credentials):
+        return self.Conn(self, credentials)
+
+
+class CloudZoneConfig:
+    def __init__(self, cfg, section):
+        self.endpoint = cfg.get(section, 'endpoint')
+        access_key = cfg.get(section, 'access_key')
+        secret = cfg.get(section, 'secret')
+        self.credentials = Credentials(access_key, secret)
+        try:
+            self.target_path = cfg.get(section, 'target_path')
+        except:
+            self.target_path = 'rgw-${zonegroup_id}/${bucket}'
+
+        try:
+            self.source_bucket = cfg.get(section, 'source_bucket')
+        except:
+            self.source_bucket = '*'
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
index dccab4093d1..55edae72920 100644
--- a/src/test/rgw/rgw_multi/zone_es.py
+++ b/src/test/rgw/rgw_multi/zone_es.py
@@ -254,3 +254,7 @@ class ESZone(Zone):
         return self.Conn(self, credentials)
 
+
+class ESZoneConfig:
+    def __init__(self, cfg, section):
+        self.endpoint = cfg.get(section, 'endpoint')
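The two *ZoneConfig classes read their settings from the test configuration
file, and section names matter: the test_multi.py changes below create one
extra zone per section whose name starts with 'elasticsearch' or 'cloud'. A
hypothetical RGW_MULTI_TEST_CONF snippet (endpoints and credentials are
placeholders; target_path and source_bucket show the defaults hard-coded in
CloudZoneConfig):

    [cloud]
    endpoint = http://localhost:8001
    access_key = cloud_access_key
    secret = cloud_secret
    # optional, defaults shown:
    # target_path = rgw-${zonegroup_id}/${bucket}
    # source_bucket = *

    [elasticsearch]
    endpoint = http://localhost:9200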
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index bd42fe59064..d6a72258185 100644
--- a/src/test/rgw/test_multi.py
+++ b/src/test/rgw/test_multi.py
@@ -15,6 +15,9 @@ import nose.core
 from rgw_multi import multisite
 from rgw_multi.zone_rados import RadosZone as RadosZone
 from rgw_multi.zone_es import ESZone as ESZone
+from rgw_multi.zone_es import ESZoneConfig as ESZoneConfig
+from rgw_multi.zone_cloud import CloudZone as CloudZone
+from rgw_multi.zone_cloud import CloudZoneConfig as CloudZoneConfig
 
 # make tests from rgw_multi.tests available to nose
 from rgw_multi.tests import *
@@ -156,6 +159,7 @@ def init(parse_args):
         'num_zonegroups': 1,
         'num_zones': 3,
         'num_es_zones': 0,
+        'num_cloud_zones': 0,
         'gateways_per_zone': 2,
         'no_bootstrap': 'false',
         'log_level': 20,
@@ -165,7 +169,6 @@ def init(parse_args):
         'checkpoint_retries': 60,
         'checkpoint_delay': 5,
         'reconfigure_delay': 5,
-        'es_endpoint': None,
     })
     try:
         path = os.environ['RGW_MULTI_TEST_CONF']
@@ -186,7 +189,6 @@ def init(parse_args):
     section = 'DEFAULT'
     parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups'))
     parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones'))
-    parser.add_argument('--num-es-zones', type=int, default=cfg.getint(section, 'num_es_zones'))
     parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
     parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
     parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
@@ -196,7 +198,15 @@ def init(parse_args):
     parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries'))
     parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
     parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
-    parser.add_argument('--es-endpoint', type=str, default=cfg.get(section, 'es_endpoint'))
+
+    es_cfg = []
+    cloud_cfg = []
+
+    for s in cfg.sections():
+        if s.startswith('elasticsearch'):
+            es_cfg.append(ESZoneConfig(cfg, s))
+        elif s.startswith('cloud'):
+            cloud_cfg.append(CloudZoneConfig(cfg, s))
 
     argv = []
 
@@ -206,9 +216,6 @@ def init(parse_args):
     args = parser.parse_args(argv)
     bootstrap = not args.no_bootstrap
 
-    # if num_es_zones is defined, need to have es_endpoint defined too
-    assert(args.num_es_zones == 0 or args.es_endpoint)
-
     setup_logging(args.log_level, args.log_file, args.file_log_level)
 
     # start first cluster
@@ -233,7 +240,10 @@ def init(parse_args):
     period = multisite.Period(realm=realm)
     realm.current_period = period
 
-    num_zones = args.num_zones + args.num_es_zones
+    num_es_zones = len(es_cfg)
+    num_cloud_zones = len(cloud_cfg)
+
+    num_zones = args.num_zones + num_es_zones + num_cloud_zones
 
     for zg in range(0, args.num_zonegroups):
         zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
@@ -271,12 +281,19 @@ def init(parse_args):
             else:
                 zonegroup.get(cluster)
 
-            es_zone = (z >= args.num_zones)
+            es_zone = (z >= args.num_zones and z < args.num_zones + num_es_zones)
+            cloud_zone = (z >= args.num_zones + num_es_zones)
 
             # create the zone in its zonegroup
             zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
             if es_zone:
-                zone = ESZone(zone_name(zg, z), args.es_endpoint, zonegroup, cluster)
+                zone_index = z - args.num_zones
+                zone = ESZone(zone_name(zg, z), es_cfg[zone_index].endpoint, zonegroup, cluster)
+            elif cloud_zone:
+                zone_index = z - args.num_zones - num_es_zones
+                ccfg = cloud_cfg[zone_index]
+                zone = CloudZone(zone_name(zg, z), ccfg.endpoint, ccfg.credentials, ccfg.source_bucket,
+                                 ccfg.target_path, zonegroup, cluster)
             else:
                 zone = RadosZone(zone_name(zg, z), zonegroup, cluster)
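With a config like the one sketched above, the suite picks it up through the
RGW_MULTI_TEST_CONF environment variable that init() already consults; a
hypothetical invocation (the path is a placeholder, and running under nose
follows the existing test_multi.py setup rather than anything this patch adds):

    RGW_MULTI_TEST_CONF=/path/to/test_multi.conf nosetests test_multi.py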