self.secret = secret
class RGWCluster:
- def __init__(self, cluster_num, port, num_gateways):
+ def __init__(self, zg_num, cluster_num, cluster_id, port, num_gateways):
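+ # zg_num identifies the owning zonegroup; cluster_id is now supplied by the caller so it can encode both zonegroup and cluster (e.g. 'zg1-c1')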
+ self.zg_num = zg_num
self.cluster_num = cluster_num
- self.cluster_id = 'c' + str(cluster_num)
+ self.cluster_id = cluster_id
self.port = port
self.num_gateways = num_gateways
self.needs_reset = True
mstop(self.cluster_id, 'radosgw')
def rgw_admin(self, cmd, check_retcode = True):
- (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_num, cmd), check_retcode)
+ (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.zg_num, self.cluster_num, cmd), check_retcode)
return (s, retcode)
def rgw_admin_ro(self, cmd, check_retcode = True):
- (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_num, '--rgw-cache-enabled=false ' + cmd), check_retcode)
+ (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.zg_num, self.cluster_num, '--rgw-cache-enabled=false ' + cmd), check_retcode)
return (s, retcode)
class RGWZone:
- def __init__(self, realm, cluster, zg, zone_name):
- self.realm = realm
+ def __init__(self, realm_name, cluster, zg_name, zone_name):
+ self.realm_name = realm_name
self.cluster = cluster
- self.zg = zg
+ self.zg_name = zg_name
self.zone_name = zone_name
self.connection = None
calling_format = boto.s3.connection.OrdinaryCallingFormat())
return self.connection
-class RGWRealm:
- def __init__(self, realm, credentials, clusters):
- self.realm = realm
+class RGWZonegroup:
+ def __init__(self, realm_name, credentials, clusters, is_master_zg=False):
+ self.realm_name = realm_name
+ self.is_master_zg = is_master_zg
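+ # is_master_zg marks the realm's master zonegroup: only its master zone is recorded as the realm-wide master (see add_zone); passing it here keeps add_zone safe when zones are added without init_zone (no-bootstrap runs)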
self.credentials = credentials
self.clusters = clusters
self.zones = {}
- def init_zone(self, cluster, zg, zone_name, first_zone_port):
+ def init_zone(self, cluster, zg_name, zone_name, first_zone_port, master_zg_first_zone_port):
+ self.is_master_zg = (first_zone_port == master_zg_first_zone_port)
is_master = (first_zone_port == cluster.port)
endpoints = ",".join(map(lambda x: "http://localhost:" + str(cluster.port + x), range(cluster.num_gateways)))
if is_master:
- bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.cluster_num,
- self.realm, zg, zone_name, endpoints,
+ if self.is_master_zg:
+ bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.zg_num, cluster.cluster_num,
+ self.realm_name, zg_name, zone_name, endpoints,
+ self.credentials.access_key, self.credentials.secret))
+ else:
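+ # the first zone of a slave zonegroup is set up against the master zonegroup's first zone, hence master_zg_first_zone_port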
+ bash(tpath('test-rgw-call.sh', 'init_first_zone_in_slave_zg', cluster.zg_num, cluster.cluster_num,
+ self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
self.credentials.access_key, self.credentials.secret))
else:
- bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.cluster_num,
- self.realm, zg, zone_name, first_zone_port, endpoints,
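+ # zones joining an existing zonegroup are now pointed at the master zonegroup's first zone rather than their own zonegroup's first zone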
+ bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.zg_num, cluster.cluster_num,
+ self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
self.credentials.access_key, self.credentials.secret))
- self.add_zone(cluster, zg, zone_name, is_master)
+ self.add_zone(cluster, zg_name, zone_name, is_master)
cluster.start_rgw()
- def add_zone(self, cluster, zg, zone_name, is_master):
- zone = RGWZone(self.realm, cluster, zg, zone_name)
+ def add_zone(self, cluster, zg_name, zone_name, is_master):
+ zone = RGWZone(self.realm_name, cluster, zg_name, zone_name)
self.zones[zone_name] = zone
if is_master:
self.master_zone = zone
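+ # the master zone of the master zonegroup doubles as the realm-wide master zone, tracked on the global realm object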
+ if self.is_master_zg:
+ realm.master_zone = zone
def remove_zone(self, zone_name):
del self.zones[zone_name]
yield zone
def meta_sync_status(self, zone):
- if zone.zone_name == self.master_zone.zone_name:
+ if zone.zone_name == realm.master_zone.zone_name:
return None
while True:
- (meta_sync_status_json, retcode) = zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' metadata sync status', check_retcode = False)
+ (meta_sync_status_json, retcode) = zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' metadata sync status', check_retcode = False)
if retcode == 0:
break
return (num_shards, markers)
def meta_master_log_status(self, master_zone):
- (mdlog_status_json, retcode) = master_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' mdlog status')
+ (mdlog_status_json, retcode) = master_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' mdlog status')
mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
markers={}
return True
def zone_meta_checkpoint(self, zone):
- if zone.zone_name == self.master_zone.zone_name:
+ if zone.zone_name == realm.master_zone.zone_name:
return
log(10, 'starting meta checkpoint for zone=', zone.zone_name)
while True:
- log_status = self.meta_master_log_status(self.master_zone)
+ log_status = self.meta_master_log_status(realm.master_zone)
(num_shards, sync_status) = self.meta_sync_status(zone)
log(20, 'log_status=', log_status)
return None
while True:
- (data_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' data sync status --source-zone=' + source_zone.zone_name, check_retcode = False)
+ (data_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' data sync status --source-zone=' + source_zone.zone_name, check_retcode = False)
if retcode == 0:
break
if target_zone.zone_name == source_zone.zone_name:
return None
- cmd = '--rgw-realm=' + self.realm + ' bucket sync status --source-zone=' + source_zone.zone_name + ' --bucket=' + bucket_name
+ cmd = '--rgw-realm=' + self.realm_name + ' bucket sync status --source-zone=' + source_zone.zone_name + ' --bucket=' + bucket_name
global user
if user.tenant is not None:
cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
def data_source_log_status(self, source_zone):
source_cluster = source_zone.cluster
- (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status')
+ (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' datalog status')
datalog_status = json.loads(datalog_status_json.decode('utf-8'))
markers={}
return markers
def bucket_source_log_status(self, source_zone, bucket_name):
- cmd = '--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name
+ cmd = '--rgw-realm=' + self.realm_name + ' bilog status --bucket=' + bucket_name
global user
if user.tenant is not None:
cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
'--access-key', user.access_key, '--secret', user.secret)
if user.tenant is not None:
cmd += ' --tenant ' + user.tenant
- self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' user create ' + cmd)
+ self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' user create ' + cmd)
if wait_meta:
self.meta_checkpoint()
def set_master_zone(self, zone):
- (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' --rgw-zonegroup=' + zone.zg + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1')
- (period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' period update --commit')
+ (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' --rgw-zonegroup=' + zone.zg_name + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1')
+ (period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' period update --commit')
self.master_zone = zone
+ if self.is_master_zg:
+ realm.master_zone = zone
+
+class RGWRealm:
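+ # realm-level container: tracks every zonegroup and zone in the realm and fans metadata checkpoints out to all zonegroups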
+ def __init__(self, realm_name, credentials):
+ self.realm_name = realm_name
+ self.credentials = credentials
+ self.zonegroups = {}
+ self.zones = {}
+
+ def get_zone(self, zone_name):
+ return self.zones[zone_name]
+
+ def get_zones(self):
+ for (k, zone) in self.zones.items():
+ yield zone
+
+ def add_zonegroup(self, zg_name, zonegroup, is_master_zg):
+ self.zonegroups[zg_name] = zonegroup
+
+ for zone in zonegroup.get_zones():
+ self.zones[zone.zone_name] = zone
+
+ if is_master_zg:
+ self.master_zonegroup = zonegroup
+
+ def get_zonegroup(self, zonegroup_name):
+ return self.zonegroups[zonegroup_name]
+
+ def get_zonegroups(self):
+ for (k, zonegroup) in self.zonegroups.items():
+ yield zonegroup
+
+ def meta_checkpoint(self):
+ log(5, 'meta checkpoint')
+ for zg in self.get_zonegroups():
+ zg.meta_checkpoint()
class RGWUser:
def __init__(self, uid, display_name, access_key, secret, tenant):
return run_prefix + '-' + str(num_buckets)
class RGWMulti:
- def __init__(self, num_clusters, gateways_per_cluster):
+ def __init__(self, zg_num, num_clusters, gateways_per_cluster, base_port, base_port_master_zg):
self.num_clusters = num_clusters
- self.base_port = 8000
+ self.zg_num = zg_num
+ self.zg_name = 'zg' + str(zg_num)
+
+ self.base_port = base_port
+ self.base_port_master_zg = base_port_master_zg
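+ # each zonegroup gets its own contiguous port range; a zonegroup whose base_port equals base_port_master_zg is the realm's master zonegroup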
self.clusters = {}
for i in range(num_clusters):
- self.clusters[i] = RGWCluster(i + 1, self.base_port + i * gateways_per_cluster, gateways_per_cluster)
+ self.clusters[i] = RGWCluster(self.zg_num, i + 1, 'zg' + str(zg_num) + '-c' + str(i + 1), self.base_port + i * gateways_per_cluster, gateways_per_cluster)
def setup(self, bootstrap, tenant):
global realm
+ global master_zg
global realm_credentials
global user
- realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
- realm = RGWRealm('earth', realm_credentials, self.clusters)
+ is_master_zg = (self.base_port == self.base_port_master_zg)
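+ # the realm and its credentials are created only once, by the master zonegroup; slave zonegroups reuse the module-level globals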
+ if is_master_zg:
+ realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
+ realm = RGWRealm('earth', realm_credentials)
+ rgw_zg = RGWZonegroup('earth', realm_credentials, self.clusters, is_master_zg)
if bootstrap:
log(1, 'bootstrapping clusters')
self.clusters[0].start()
- realm.init_zone(self.clusters[0], 'us', 'us-1', self.base_port)
+ rgw_zg.init_zone(self.clusters[0], self.zg_name, self.zg_name + '-1', self.base_port, self.base_port_master_zg)
for i in range(1, self.num_clusters):
self.clusters[i].start()
- realm.init_zone(self.clusters[i], 'us', 'us-' + str(i + 1), self.base_port)
+ rgw_zg.init_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), self.base_port, self.base_port_master_zg)
else:
for i in range(0, self.num_clusters):
- realm.add_zone(self.clusters[i], 'us', 'us-' + str(i + 1), (i == 0))
+ rgw_zg.add_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), (i == 0))
- realm.meta_checkpoint()
+ realm.add_zonegroup(self.zg_name, rgw_zg, is_master_zg)
+ rgw_zg.meta_checkpoint()
- user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret(), tenant)
- realm.create_user(user)
+ if is_master_zg:
+ master_zg = rgw_zg
+ user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret(), tenant)
+ master_zg.create_user(user)
def check_all_buckets_exist(zone, buckets):
conn = zone.get_connection(user)
return True
-def create_bucket_per_zone():
+def create_bucket_per_zone_in_master_zg():
buckets = []
zone_bucket = {}
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
conn = zone.get_connection(user)
bucket_name = gen_bucket_name()
log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
return buckets, zone_bucket
+def create_bucket_per_zone_in_realm():
+ buckets = []
+ zone_bucket = {}
+ for zone in realm.get_zones():
+ conn = zone.get_connection(user)
+ bucket_name = gen_bucket_name()
+ log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
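+ # the zonegroup name is passed as the bucket's location constraint so RGW places the bucket in that zone's zonegroup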
+ bucket = conn.create_bucket(bucket_name, None, zone.zg_name)
+ buckets.append(bucket_name)
+ zone_bucket[zone] = bucket
+
+ return buckets, zone_bucket
+
def test_bucket_create():
- buckets, _ = create_bucket_per_zone()
- realm.meta_checkpoint()
+ buckets, _ = create_bucket_per_zone_in_master_zg()
+ master_zg.meta_checkpoint()
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_exist(zone, buckets)
def test_bucket_recreate():
- buckets, _ = create_bucket_per_zone()
- realm.meta_checkpoint()
+ buckets, _ = create_bucket_per_zone_in_master_zg()
+ master_zg.meta_checkpoint()
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_exist(zone, buckets)
# recreate buckets on all zones, make sure they weren't removed
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
for bucket_name in buckets:
conn = zone.get_connection(user)
bucket = conn.create_bucket(bucket_name)
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_exist(zone, buckets)
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_exist(zone, buckets)
def test_bucket_remove():
- buckets, zone_bucket = create_bucket_per_zone()
- realm.meta_checkpoint()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
+ master_zg.meta_checkpoint()
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_exist(zone, buckets)
for zone, bucket_name in zone_bucket.items():
conn = zone.get_connection(user)
conn.delete_bucket(bucket_name)
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
- for zone in realm.get_zones():
+ for zone in master_zg.get_zones():
assert check_all_buckets_dont_exist(zone, buckets)
def get_bucket(zone, bucket_name):
def test_object_sync():
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
all_zones = []
for z in zone_bucket:
k = new_key(zone, bucket_name, objname)
k.set_contents_from_string(content)
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
for source_zone, bucket in zone_bucket.items():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_object_delete():
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
all_zones = []
for z in zone_bucket:
k = new_key(zone, bucket, objname)
k.set_contents_from_string(content)
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
# check object exists
for source_zone, bucket in zone_bucket.items():
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket)
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket)
return None
def test_versioned_object_incremental_sync():
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
# enable versioning
all_zones = []
bucket.configure_versioning(True)
all_zones.append(zone)
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
# upload a dummy object to each bucket and wait for sync. this forces each
# bucket to finish a full sync and switch to incremental
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
for _, bucket in zone_bucket.items():
# create and delete multiple versions of an object from each zone
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_bucket_versioning():
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
for zone, bucket in zone_bucket.items():
bucket.configure_versioning(True)
assert(key in res and res[key] == 'Enabled')
def test_bucket_acl():
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
for zone, bucket in zone_bucket.items():
assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
def test_bucket_delete_notempty():
- buckets, zone_bucket = create_bucket_per_zone()
- realm.meta_checkpoint()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
+ master_zg.meta_checkpoint()
for zone, bucket_name in zone_bucket.items():
# upload an object to each bucket on its own zone
assert False # expected 409 BucketNotEmpty
# assert that each bucket still exists on the master
- z1 = realm.get_zone('us-1')
+ z1 = master_zg.get_zone('zg1-1')
c1 = z1.get_connection(user)
for _, bucket_name in zone_bucket.items():
assert c1.get_bucket(bucket_name)
def test_multi_period_incremental_sync():
- if len(realm.clusters) < 3:
+ if len(master_zg.clusters) < 3:
from nose.plugins.skip import SkipTest
raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more clusters.")
- buckets, zone_bucket = create_bucket_per_zone()
+ buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
all_zones = []
for z in zone_bucket:
for objname in [ 'p1', '_p1' ]:
k = new_key(zone, bucket_name, objname)
k.set_contents_from_string('asdasd')
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
# kill zone 3 gateway to freeze sync status to incremental in first period
- z3 = realm.get_zone('us-3')
+ z3 = master_zg.get_zone('zg1-3')
z3.cluster.stop_rgw()
# change master to zone 2 -> period 2
- realm.set_master_zone(realm.get_zone('us-2'))
+ master_zg.set_master_zone(master_zg.get_zone('zg1-2'))
for zone, bucket_name in zone_bucket.items():
if zone == z3:
k.set_contents_from_string('qweqwe')
# wait for zone 1 to sync
- realm.zone_meta_checkpoint(realm.get_zone('us-1'))
+ master_zg.zone_meta_checkpoint(master_zg.get_zone('zg1-1'))
# change master back to zone 1 -> period 3
- realm.set_master_zone(realm.get_zone('us-1'))
+ master_zg.set_master_zone(master_zg.get_zone('zg1-1'))
for zone, bucket_name in zone_bucket.items():
if zone == z3:
# restart zone 3 gateway and wait for sync
z3.cluster.start_rgw()
- realm.meta_checkpoint()
+ master_zg.meta_checkpoint()
# verify that we end up with the same objects
for source_zone, bucket in zone_bucket.items():
if source_zone.zone_name == target_zone.zone_name:
continue
- realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket)
@attr('destructive')
def test_zonegroup_remove():
- z1 = realm.get_zone('us-1')
+ z1 = master_zg.get_zone('zg1-1')
- # try to 'zone delete' us-2 from cluster 1
+ # try to 'zone delete' zg1-2 from cluster 1
# must fail with ENOENT because the zone is local to cluster 2
- (_, retcode) = z1.cluster.rgw_admin('zone delete --rgw-zone=us-2', False)
+ (_, retcode) = z1.cluster.rgw_admin('zone delete --rgw-zone=zg1-2', False)
assert(retcode == 2) # ENOENT
# use 'zonegroup remove', expecting success
- z1.cluster.rgw_admin('zonegroup remove --rgw-zone=us-2', True)
+ z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', True)
# another 'zonegroup remove' should fail with ENOENT
- (_, retcode) = z1.cluster.rgw_admin('zonegroup remove --rgw-zone=us-2', False)
+ (_, retcode) = z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', False)
assert(retcode == 2) # ENOENT
# validate the resulting period
z1.cluster.rgw_admin('period update --commit', True)
- realm.remove_zone('us-2')
+ master_zg.remove_zone('zg1-2')
def init(parse_args):
cfg = configparser.RawConfigParser({
+ 'num_zonegroups': 1,
'num_zones': 3,
'gateways_per_zone': 2,
'no_bootstrap': 'false',
parser = argparse.ArgumentParser(
description='Run rgw multi-site tests',
- usage='test_multi [--num-zones <num>] [--no-bootstrap]')
+ usage='test_multi [--num-zonegroups <num>] [--num-zones <num>] [--no-bootstrap]')
section = 'DEFAULT'
+ parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups'))
parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones'))
parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
global log_level
log_level = args.log_level
- global rgw_multi
-
- rgw_multi = RGWMulti(int(args.num_zones), int(args.gateways_per_zone))
- rgw_multi.setup(not args.no_bootstrap, args.tenant)
+ master_zg_base_port = 8000
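+ # zonegroups are spaced 100 ports apart (zg1 at 8000, zg2 at 8100, ...); the first one acts as the master zonegroup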
+ for i in range(0, args.num_zonegroups):
+ rgw_multi = RGWMulti(i + 1, int(args.num_zones), int(args.gateways_per_zone), master_zg_base_port + 100 * i, master_zg_base_port)
+ rgw_multi.setup(not args.no_bootstrap, args.tenant)
def setup_module():