bash(cmd)
def mstop(cluster_id, entity = None):
- cmd = mpath('mstop.sh', cluseter_id)
+ cmd = mpath('mstop.sh', cluster_id)
if entity is not None:
cmd += ' ' + entity
bash(cmd)
def mrgw(cluster_id, port, extra_cmd = None):
- cmd = mpath('mrgw.sh', port)
+ cmd = mpath('mrgw.sh', cluster_id, port)
if extra_cmd is not None:
cmd += ' ' + extra_cmd
bash(cmd)
self.secret = secret
class RGWCluster:
- def __init__(self, cluster_num):
+ def __init__(self, cluster_num, port):
self.cluster_num = cluster_num
self.cluster_id = 'c' + str(cluster_num)
+ self.port = port
self.needs_reset = True
def start(self):
def stop(self):
mstop(self.cluster_id)
- def start_rgw(port):
- mrgw(self.cluster_id, port, '--debug-rgw=20 --debug-ms=1')
+ def start_rgw(self):
+ mrgw(self.cluster_id, self.port, '--debug-rgw=20 --debug-ms=1')
def stop_rgw(self):
mstop(self.cluster_id, 'radosgw')
return (s, retcode)
class RGWZone:
- def __init__(self, realm, cluster, zg, zone_name, port):
+ def __init__(self, realm, cluster, zg, zone_name):
self.realm = realm
self.cluster = cluster
self.zg = zg
self.zone_name = zone_name
- self.port = port
self.connection = None
def get_connection(self, user):
self.connection = boto.connect_s3(aws_access_key_id = user.access_key,
aws_secret_access_key = user.secret,
host = 'localhost',
- port = self.port,
+ port = self.cluster.port,
is_secure = False,
calling_format = boto.s3.connection.OrdinaryCallingFormat())
return self.connection
class RGWRealm:
- def __init__(self, realm, credentials, clusters, master_index):
+ def __init__(self, realm, credentials, clusters):
self.realm = realm
self.credentials = credentials
self.clusters = clusters
- self.master_index = master_index
- self.master_cluster = clusters[master_index]
self.zones = {}
self.total_zones = 0
- self.cluster = None
- def init_zone(self, cluster, zg, zone_name, port, first_zone_port=0):
- is_master = (first_zone_port == 0)
+ def init_zone(self, cluster, zg, zone_name, first_zone_port):
+ is_master = (first_zone_port == cluster.port)
if is_master:
bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.cluster_num,
- self.realm, zg, zone_name, port,
+ self.realm, zg, zone_name, cluster.port,
self.credentials.access_key, self.credentials.secret))
else:
bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.cluster_num,
- self.realm, zg, zone_name, first_zone_port, port,
+ self.realm, zg, zone_name, first_zone_port, cluster.port,
self.credentials.access_key, self.credentials.secret))
- self.add_zone(cluster, zg, zone_name, port, is_master)
+ self.add_zone(cluster, zg, zone_name, is_master)
- def add_zone(self, cluster, zg, zone_name, port, is_master):
- zone = RGWZone(self.realm, cluster, zg, zone_name, port)
+ def add_zone(self, cluster, zg, zone_name, is_master):
+ zone = RGWZone(self.realm, cluster, zg, zone_name)
self.zones[self.total_zones] = zone
self.total_zones += 1
log(5, 'creating user uid=', user.uid)
cmd = build_cmd('--uid', user.uid, '--display-name', user.display_name,
'--access-key', user.access_key, '--secret', user.secret)
- self.master_cluster.rgw_admin('--rgw-realm=' + self.realm + ' user create ' + cmd)
+ self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' user create ' + cmd)
if wait_meta:
self.meta_checkpoint()
-
+ def set_master_zone(self, zone):
+ (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' --rgw-zonegroup=' + zone.zg + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1')
+ (period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm + ' period update --commit')
+ self.master_zone = zone
class RGWUser:
def __init__(self, num_clusters):
self.num_clusters = num_clusters
+ self.base_port = 8000
+
self.clusters = {}
for i in xrange(num_clusters):
- self.clusters[i] = RGWCluster(i + 1)
-
- self.base_port = 8000
+ self.clusters[i] = RGWCluster(i + 1, self.base_port + i)
def setup(self, bootstrap):
global realm
global user
realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
- realm = RGWRealm('earth', realm_credentials, self.clusters, 0)
+ realm = RGWRealm('earth', realm_credentials, self.clusters)
if bootstrap:
log(1, 'bootstapping clusters')
for i in xrange(1, self.num_clusters):
self.clusters[i].start()
- realm.init_zone(self.clusters[i], 'us', 'us-' + str(i + 1), self.base_port + i, first_zone_port=self.base_port)
+ realm.init_zone(self.clusters[i], 'us', 'us-' + str(i + 1), self.base_port)
else:
for i in xrange(0, self.num_clusters):
- realm.add_zone(self.clusters[i], 'us', 'us-' + str(i + 1), self.base_port + i, (i == 0))
+ realm.add_zone(self.clusters[i], 'us', 'us-' + str(i + 1), (i == 0))
realm.meta_checkpoint()
realm.zone_data_checkpoint(target_zone, source_zone)
check_bucket_eq(source_zone, target_zone, bucket_name)
-
+
+def test_multi_period_incremental_sync():
+ if len(realm.clusters) < 3:
+ from nose.plugins.skip import SkipTest
+ raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more clusters.")
+
+ buckets, zone_bucket = create_bucket_per_zone()
+
+ all_zones = []
+ for z in zone_bucket:
+ all_zones.append(z)
+
+ for zone, bucket_name in zone_bucket.iteritems():
+ for objname in [ 'p1', '_p1' ]:
+ k = new_key(zone, bucket_name, objname)
+ k.set_contents_from_string('asdasd')
+ realm.meta_checkpoint()
+
+ # kill zone 3 gateway to freeze sync status to incremental in first period
+ z3 = realm.get_zone(2)
+ z3.cluster.stop_rgw()
+
+ # change master to zone 2 -> period 2
+ realm.set_master_zone(realm.get_zone(1))
+
+ for zone, bucket_name in zone_bucket.iteritems():
+ if zone == z3:
+ continue
+ for objname in [ 'p2', '_p2' ]:
+ k = new_key(zone, bucket_name, objname)
+ k.set_contents_from_string('qweqwe')
+
+ # wait for zone 1 to sync
+ realm.zone_meta_checkpoint(realm.get_zone(0))
+
+ # change master back to zone 1 -> period 3
+ realm.set_master_zone(realm.get_zone(0))
+
+ for zone, bucket_name in zone_bucket.iteritems():
+ if zone == z3:
+ continue
+ for objname in [ 'p3', '_p3' ]:
+ k = new_key(zone, bucket_name, objname)
+ k.set_contents_from_string('zxczxc')
+
+ # restart zone 3 gateway and wait for sync
+ z3.cluster.start_rgw()
+ realm.meta_checkpoint()
+
+ # verify that we end up with the same objects
+ for source_zone, bucket_name in zone_bucket.iteritems():
+ for target_zone in all_zones:
+ if source_zone.zone_name == target_zone.zone_name:
+ continue
+
+ realm.zone_data_checkpoint(target_zone, source_zone)
+
+ check_bucket_eq(source_zone, target_zone, bucket_name)
+
+
def init(parse_args):
cfg = ConfigParser.RawConfigParser({
- 'num_zones': 2,
+ 'num_zones': 3,
'no_bootstrap': 'false',
'log_level': 20,
})