test/rgw: rewrite test_multi.py in terms of rgw_multi (14433/head)
author     Casey Bodley <cbodley@redhat.com>
           Wed, 12 Apr 2017 17:43:32 +0000 (13:43 -0400)
committer  Casey Bodley <cbodley@redhat.com>
           Tue, 18 Apr 2017 23:13:40 +0000 (19:13 -0400)
logging changes inspired by Abhishek Lekshmanan <abhishek@suse.com>

Signed-off-by: Casey Bodley <cbodley@redhat.com>
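
The substance of the rewrite: the multisite object model (Realm/Period/ZoneGroup/Zone/User) and the test cases themselves now live in the rgw_multi package, while test_multi.py keeps only the mstart/mrgw/mstop glue (the Cluster and Gateway subclasses in the diff) plus an init() that builds the topology and hands it off via init_multi(). The logging change replaces the ad-hoc log(level, ...) helper with the standard logging module, configured by setup_logging() with a console handler and an optional --log-file handler. Below is a condensed, hedged sketch of the wiring init() performs for a single zonegroup with a single zone, using only calls that appear in this diff; the literal names ('c1', 'a', 'a1', 'r') mirror the naming helpers, and bootstrap steps such as realm.create(), zone.create() and gateway startup are omitted for brevity:

    from rgw_multi import multisite
    from rgw_multi.tests import *      # also brings init_multi into scope

    cluster = Cluster('c1')            # the mstart-backed subclass defined in this file

    realm = multisite.Realm('r')
    period = multisite.Period(realm=realm)
    realm.current_period = period

    zonegroup = multisite.ZoneGroup('a', period)
    period.zonegroups.append(zonegroup)
    period.master_zonegroup = zonegroup

    zone = multisite.Zone('a1', zonegroup, cluster)
    zonegroup.zones.append(zone)
    zonegroup.master_zone = zone

    user = multisite.User('tester')
    init_multi(realm, user)            # hand the topology to the rgw_multi tests
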
src/test/rgw/test_multi.py

index 18a44a07aa5acfb9b3b187fe74095f1c135d0aba..9f289861ebf3a4bb559b9453a39873711129532f 100644
@@ -1,36 +1,20 @@
 import subprocess
 import os
-import json
 import random
 import string
 import argparse
 import sys
-import time
-try:
-    from itertools import izip_longest as zip_longest
-except ImportError:
-    from itertools import zip_longest
+import logging
 try:
     import configparser
 except ImportError:
     import ConfigParser as configparser
 
-import boto
-import boto.s3.connection
-
-import inspect
-
-from nose.tools import eq_ as eq
-from nose.plugins.attrib import attr
+import nose.core
 
-# test-suite for rgw multisite, the last test destroys a zone,
-# so in order to use this as a dev cluster, do
-# $nosetests -a '!destructive' /path/to/test_multi.py
-
-log_level = 20
-
-num_buckets = 0
-run_prefix=''.join(random.SystemRandom().choice(string.ascii_lowercase) for _ in range(6))
+from rgw_multi import multisite
+# make tests from rgw_multi.tests available to nose
+from rgw_multi.tests import *
 
 mstart_path = os.getenv('MSTART_PATH')
 if mstart_path is None:
@@ -38,959 +22,122 @@ if mstart_path is None:
 
 test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/'
 
-def lineno():
-    return inspect.currentframe().f_back.f_lineno
-
-def log(level, *params):
-    if level > log_level:
-        return
-
-    s = '>>> '
-    for p in params:
-        if p:
-            s += str(p)
-
-    print(s)
-    sys.stdout.flush()
-
-def build_cmd(*params):
-    s = ''
-    for p in params:
-        if len(s) != 0:
-            s += ' '
-        s += p
-
-    return s
-
-def mpath(bin, *params):
-    s = mstart_path + bin
-    for p in params:
-        s += ' ' + str(p)
-
-    return s
-
-def tpath(bin, *params):
-    s = test_path + bin
-    for p in params:
-        s += ' ' + str(p)
-
-    return s
+# configure logging for the tests module
+log = logging.getLogger('rgw_multi.tests')
 
-def bash(cmd, check_retcode = True):
-    log(5, 'running cmd: ', cmd)
-    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+def bash(cmd, **kwargs):
+    log.debug('running cmd: %s', ' '.join(cmd))
+    check_retcode = kwargs.pop('check_retcode', True)
+    kwargs['stdout'] = subprocess.PIPE
+    process = subprocess.Popen(cmd, **kwargs)
     s = process.communicate()[0]
-    log(20, 'command returned status=', process.returncode, ' stdout=', s.decode('utf-8'))
+    log.debug('command returned status=%d stdout=%s', process.returncode, s.decode('utf-8'))
     if check_retcode:
         assert(process.returncode == 0)
     return (s, process.returncode)
 
-def mstart(cluster_id, is_new):
-    cmd = mpath('mstart.sh', cluster_id)
-    if is_new:
-        cmd += ' -n'
-        cmd += ' --mds_num 0'
-    bash(cmd)
-
-def mstop(cluster_id, entity = None):
-    cmd  = mpath('mstop.sh', cluster_id)
-    if entity is not None:
-        cmd += ' ' + entity
-    bash(cmd)
-
-def mrgw(cluster_id, port, extra_cmd = None):
-    cmd  = mpath('mrgw.sh', cluster_id, port)
-    if extra_cmd is not None:
-        cmd += ' ' + extra_cmd
-    bash(cmd)
-
-def init_multi_site(num_clusters):
-    bash(tpath('test-rgw-multisite.sh', num_clusters))
-
-
-class RGWRealmCredentials:
-    def __init__(self, access_key, secret):
-        self.access_key = access_key
-        self.secret = secret
-
-class RGWCluster:
-    def __init__(self, zg_num, cluster_num, cluster_id, port, num_gateways):
-        self.zg_num = zg_num
-        self.cluster_num = cluster_num
+class Cluster(multisite.Cluster):
+    """ cluster implementation based on mstart/mrun scripts """
+    def __init__(self, cluster_id):
+        super(Cluster, self).__init__()
         self.cluster_id = cluster_id
-        self.port = port
-        self.num_gateways = num_gateways
         self.needs_reset = True
 
+    def admin(self, args = [], **kwargs):
+        """ radosgw-admin command """
+        cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', self.cluster_id] + args
+        if kwargs.pop('read_only', False):
+            cmd += ['--rgw-cache-enabled', 'false']
+        return bash(cmd, **kwargs)
+
     def start(self):
-        mstart(self.cluster_id, self.needs_reset)
+        cmd = [mstart_path + 'mstart.sh', self.cluster_id]
+        if self.needs_reset:
+            cmd += ['-n', '--mds_num', '0']
+        bash(cmd)
         self.needs_reset = False
 
     def stop(self):
-        mstop(self.cluster_id)
-
-    def start_rgw(self):
-        for i in range(self.num_gateways):
-            mrgw(self.cluster_id, self.port + i, '--debug-rgw=20 --debug-ms=1')
-
-    def stop_rgw(self):
-        mstop(self.cluster_id, 'radosgw')
-
-    def rgw_admin(self, cmd, check_retcode = True):
-        (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, cmd), check_retcode)
-        return (s, retcode)
-
-    def rgw_admin_ro(self, cmd, check_retcode = True):
-        (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, '--rgw-cache-enabled=false ' + cmd), check_retcode)
-        return (s, retcode)
-
-class RGWZone:
-    def __init__(self, realm_name, cluster, zg_name, zone_name):
-        self.realm_name = realm_name
-        self.cluster = cluster
-        self.zg_name = zg_name
-        self.zone_name = zone_name
-        self.connection = None
-
-    def get_connection(self, user):
-        if self.connection is None:
-            self.connection = boto.connect_s3(aws_access_key_id = user.access_key,
-                                          aws_secret_access_key = user.secret,
-                                          host = 'localhost',
-                                          port = self.cluster.port,
-                                          is_secure = False,
-                                          calling_format = boto.s3.connection.OrdinaryCallingFormat())
-        return self.connection
-
-class RGWZonegroup:
-    def __init__(self, realm_name, credentials, clusters):
-        self.realm_name = realm_name
-        self.credentials = credentials
-        self.clusters = clusters
-        self.zones = {}
-
-    def init_zone(self, cluster, zg_name, zone_name, first_zone_port, master_zg_first_zone_port):
-        self.is_master_zg = (first_zone_port == master_zg_first_zone_port)
-        is_master = (first_zone_port == cluster.port)
-        endpoints = ",".join(map(lambda x: "http://localhost:" + str(cluster.port + x), range(cluster.num_gateways)))
-        if is_master:
-            if self.is_master_zg:
-                bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.cluster_id,
-                    self.realm_name, zg_name, zone_name, endpoints,
-                    self.credentials.access_key, self.credentials.secret))
-            else:
-                bash(tpath('test-rgw-call.sh', 'init_first_zone_in_slave_zg', cluster.cluster_id,
-                       self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
-                       self.credentials.access_key, self.credentials.secret))
-        else:
-            bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.cluster_id,
-                       self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
-                       self.credentials.access_key, self.credentials.secret))
-
-        self.add_zone(cluster, zg_name, zone_name, is_master)
-        cluster.start_rgw()
-
-    def add_zone(self, cluster, zg_name, zone_name, is_master):
-        zone = RGWZone(self.realm_name, cluster, zg_name, zone_name)
-        self.zones[zone_name] = zone
-
-        if is_master:
-            self.master_zone = zone
-            if self.is_master_zg:
-                realm.master_zone = zone
-
-    def remove_zone(self, zone_name):
-        del self.zones[zone_name]
-
-    def get_zone(self, zone_name):
-        return self.zones[zone_name]
-
-    def get_zones(self):
-        for (k, zone) in self.zones.items():
-            yield zone
-
-    def meta_sync_status(self, zone):
-        if zone.zone_name == realm.master_zone.zone_name:
-            return None
-
-        while True:
-            (meta_sync_status_json, retcode) = zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' metadata sync status', check_retcode = False)
-            if retcode == 0:
-                break
-
-            assert(retcode == 2) # ENOENT
-
-        meta_sync_status_json = meta_sync_status_json.decode('utf-8')
-        log(20, 'current meta sync status=', meta_sync_status_json)
-        sync_status = json.loads(meta_sync_status_json)
-
-        global_sync_status=sync_status['sync_status']['info']['status']
-        num_shards=sync_status['sync_status']['info']['num_shards']
-
-        sync_markers=sync_status['sync_status']['markers']
-        log(20, 'sync_markers=', sync_markers)
-        assert(num_shards == len(sync_markers))
-
-        markers={}
-        for i in range(num_shards):
-            markers[i] = sync_markers[i]['val']['marker']
-
-        return (num_shards, markers)
-
-    def meta_master_log_status(self, master_zone):
-        (mdlog_status_json, retcode) = master_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' mdlog status')
-        mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
-
-        markers={}
-        i = 0
-        for s in mdlog_status:
-            markers[i] = s['marker']
-            i += 1
-
-        log(20, 'master meta markers=', markers)
-
-        return markers
-
-    def compare_meta_status(self, zone, log_status, sync_status):
-        if len(log_status) != len(sync_status):
-            log(10, 'len(log_status)=', len(log_status), ' len(sync_status=', len(sync_status))
-            return False
-
-        msg =  ''
-        for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-            if l > s:
-                if len(s) != 0:
-                    msg += ', '
-                msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-        if len(msg) > 0:
-            log(1, 'zone ', zone.zone_name, ' behind master: ', msg)
-            return False
-
-        return True
-
-    def zone_meta_checkpoint(self, zone):
-        if zone.zone_name == realm.master_zone.zone_name:
-            return
-
-        log(10, 'starting meta checkpoint for zone=', zone.zone_name)
-
-        while True:
-            log_status = self.meta_master_log_status(realm.master_zone)
-            (num_shards, sync_status) = self.meta_sync_status(zone)
-
-            log(20, 'log_status=', log_status)
-            log(20, 'sync_status=', sync_status)
-
-            if self.compare_meta_status(zone, log_status, sync_status):
-                break
-
-            time.sleep(5)
-
-
-        log(10, 'finish meta checkpoint for zone=', zone.zone_name)
-
-    def meta_checkpoint(self):
-        log(5, 'meta checkpoint')
-        for z in self.get_zones():
-            self.zone_meta_checkpoint(z)
-
-    def data_sync_status(self, target_zone, source_zone):
-        if target_zone.zone_name == source_zone.zone_name:
-            return None
-
-        while True:
-            (data_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' data sync status --source-zone=' + source_zone.zone_name, check_retcode = False)
-            if retcode == 0:
-                break
-
-            assert(retcode == 2) # ENOENT
-
-        data_sync_status_json = data_sync_status_json.decode('utf-8')
-        log(20, 'current data sync status=', data_sync_status_json)
-        sync_status = json.loads(data_sync_status_json)
-
-        global_sync_status=sync_status['sync_status']['info']['status']
-        num_shards=sync_status['sync_status']['info']['num_shards']
-
-        sync_markers=sync_status['sync_status']['markers']
-        log(20, 'sync_markers=', sync_markers)
-        assert(num_shards == len(sync_markers))
-
-        markers={}
-        for i in range(num_shards):
-            markers[i] = sync_markers[i]['val']['marker']
-
-        return (num_shards, markers)
-
-    def bucket_sync_status(self, target_zone, source_zone, bucket_name):
-        if target_zone.zone_name == source_zone.zone_name:
-            return None
-
-        cmd = '--rgw-realm=' + self.realm_name + ' bucket sync status --source-zone=' + source_zone.zone_name + ' --bucket=' + bucket_name
-        global user
-        if user.tenant is not None:
-            cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
-        while True:
-            (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro(cmd, check_retcode = False)
-            if retcode == 0:
-                break
-
-            assert(retcode == 2) # ENOENT
-
-        bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
-        log(20, 'current bucket sync status=', bucket_sync_status_json)
-        sync_status = json.loads(bucket_sync_status_json)
-
-        markers={}
-        for entry in sync_status:
-            val = entry['val']
-            if val['status'] == 'incremental-sync':
-                pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
-            else:
-                pos = ''
-            markers[entry['key']] = pos
-
-        return markers
-
-    def data_source_log_status(self, source_zone):
-        source_cluster = source_zone.cluster
-        (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' datalog status')
-        datalog_status = json.loads(datalog_status_json.decode('utf-8'))
-
-        markers={}
-        i = 0
-        for s in datalog_status:
-            markers[i] = s['marker']
-            i += 1
-
-        log(20, 'data markers for zone=', source_zone.zone_name, ' markers=', markers)
-
-        return markers
-
-    def bucket_source_log_status(self, source_zone, bucket_name):
-        cmd = '--rgw-realm=' + self.realm_name + ' bilog status --bucket=' + bucket_name
-        global user
-        if user.tenant is not None:
-            cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
-        source_cluster = source_zone.cluster
-        (bilog_status_json, retcode) = source_cluster.rgw_admin_ro(cmd)
-        bilog_status = json.loads(bilog_status_json.decode('utf-8'))
-
-        m={}
-        markers={}
-        try:
-            m = bilog_status['markers']
-        except:
-            pass
-
-        for s in m:
-            key = s['key']
-            val = s['val']
-            markers[key] = val
-
-        log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
-
-        return markers
-
-    def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
-        if len(log_status) != len(sync_status):
-            log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
-            return False
-
-        msg =  ''
-        for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-            if l > s:
-                if len(s) != 0:
-                    msg += ', '
-                msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-        if len(msg) > 0:
-            log(1, 'data of zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
-            return False
-
-        return True
-
-    def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
-        if len(log_status) != len(sync_status):
-            log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
-            return False
-
-        msg =  ''
-        for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-            if l > s:
-                if len(s) != 0:
-                    msg += ', '
-                msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-        if len(msg) > 0:
-            log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
-            return False
-
-        return True
-
-    def zone_data_checkpoint(self, target_zone, source_zone):
-        if target_zone.zone_name == source_zone.zone_name:
-            return
-
-        log(10, 'starting data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
-
-        while True:
-            log_status = self.data_source_log_status(source_zone)
-            (num_shards, sync_status) = self.data_sync_status(target_zone, source_zone)
-
-            log(20, 'log_status=', log_status)
-            log(20, 'sync_status=', sync_status)
-
-            if self.compare_data_status(target_zone, source_zone, log_status, sync_status):
-                break
-
-            time.sleep(5)
-
-        log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
-
-    def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
-        if target_zone.zone_name == source_zone.zone_name:
-            return
-
-        log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
-
-        while True:
-            log_status = self.bucket_source_log_status(source_zone, bucket_name)
-            sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
-
-            log(20, 'log_status=', log_status)
-            log(20, 'sync_status=', sync_status)
-
-            if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
-                break
-
-            time.sleep(5)
-
-        log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
-
-
-    def create_user(self, user, wait_meta = True):
-        log(5, 'creating user uid=', user.uid)
-        cmd = build_cmd('--uid', user.uid, '--display-name', user.display_name,
-                        '--access-key', user.access_key, '--secret', user.secret)
-        if user.tenant is not None:
-            cmd += ' --tenant ' + user.tenant
-        self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' user create ' + cmd)
-
-        if wait_meta:
-            self.meta_checkpoint()
-
-    def set_master_zone(self, zone):
-        (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' --rgw-zonegroup=' + zone.zg_name + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1')
-        (period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' period update --commit')
-        self.master_zone = zone
-       if self.is_master_zg:
-           realm.master_zone = zone
-
-class RGWRealm:
-    def __init__(self, realm_name, credentials):
-        self.realm_name = realm_name
-        self.credentials = credentials
-        self.zonegroups = {}
-        self.zones = {}
-
-    def get_zone(self, zone_name):
-        return self.zones[zone_name]
-
-    def get_zones(self):
-        for (k, zone) in self.zones.iteritems():
-            yield zone
-
-    def add_zonegroup(self, zg_name, zonegroup, is_master_zg):
-        self.zonegroups[zg_name] = zonegroup
-
-        for zone in zonegroup.get_zones():
-           self.zones[zone.zone_name] = zone
+        cmd = [mstart_path + 'mstop.sh', self.cluster_id]
+        bash(cmd)
+
+class Gateway(multisite.Gateway):
+    """ gateway implementation based on mrgw/mstop scripts """
+    def __init__(self, client_id = None, *args, **kwargs):
+        super(Gateway, self).__init__(*args, **kwargs)
+        self.id = client_id
+
+    def start(self, args = []):
+        """ start the gateway """
+        assert(self.cluster)
+        cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
+        if self.id:
+            cmd += ['-i', self.id]
+        cmd += ['--debug-rgw=20', '--debug-ms=1']
+        cmd += args
+        bash(cmd)
 
-        if is_master_zg:
-            self.master_zonegroup = zonegroup
-
-    def get_zonegroup(self, zonegroup_name):
-        return self.zonegroups[zonegroup_name]
-
-    def get_zonegroups(self):
-        for (k, zonegroup) in self.zonegroups.iteritems():
-            yield zonegroup
-
-    def meta_checkpoint(self):
-        log(5, 'meta checkpoint')
-        for zg in self.get_zonegroups():
-            zg.meta_checkpoint()
-
-class RGWUser:
-    def __init__(self, uid, display_name, access_key, secret, tenant):
-        self.uid = uid
-        self.display_name = display_name
-        self.access_key = access_key
-        self.secret = secret
-        self.tenant = tenant
+    def stop(self):
+        """ stop the gateway """
+        assert(self.cluster)
+        cmd = [mstart_path + 'mstop.sh', self.cluster.cluster_id, 'radosgw', self.id]
+        bash(cmd)
 
 def gen_access_key():
-     return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(16))
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
 
 def gen_secret():
-     return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))
-
-def gen_bucket_name():
-    global num_buckets
-
-    num_buckets += 1
-    return run_prefix + '-' + str(num_buckets)
-
-class RGWMulti:
-    def __init__(self, zg_num, num_clusters, gateways_per_cluster, base_port, base_port_master_zg):
-        self.num_clusters = num_clusters
-
-        self.zg_num = zg_num
-        self.zg_name = 'zg' + str(zg_num)
-
-        self.base_port = base_port
-        self.base_port_master_zg = base_port_master_zg
-
-        self.clusters = {}
-        for i in range(num_clusters):
-            self.clusters[i] = RGWCluster(self.zg_num, i + 1, 'zg' + str(zg_num) + '-c' + str(i + 1), self.base_port + i * gateways_per_cluster, gateways_per_cluster)
-
-    def setup(self, bootstrap, tenant):
-        global realm
-        global master_zg
-        global realm_credentials
-        global user
-
-        is_master_zg = (self.base_port == self.base_port_master_zg)
-        if is_master_zg:
-            realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
-            realm = RGWRealm('earth', realm_credentials)
-        rgw_zg = RGWZonegroup('earth', realm_credentials, self.clusters)
-
-        if bootstrap:
-            log(1, 'bootstrapping clusters')
-            self.clusters[0].start()
-            rgw_zg.init_zone(self.clusters[0], self.zg_name, self.zg_name + '-1', self.base_port, self.base_port_master_zg)
-
-            for i in range(1, self.num_clusters):
-                self.clusters[i].start()
-                rgw_zg.init_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), self.base_port, self.base_port_master_zg)
-        else:
-            for i in range(0, self.num_clusters):
-                rgw_zg.add_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), (i == 0))
-
-        realm.add_zonegroup(self.zg_name, rgw_zg, is_master_zg)
-        rgw_zg.meta_checkpoint()
-
-        if is_master_zg:
-            master_zg = rgw_zg
-            user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret(), tenant)
-            master_zg.create_user(user)
-
-def check_all_buckets_exist(zone, buckets):
-    conn = zone.get_connection(user)
-
-    for b in buckets:
-        try:
-            conn.get_bucket(b)
-        except:
-            log(0, 'zone ', zone.zone_name, ' does not contain bucket ', b)
-            return False
-
-    return True
-
-def check_all_buckets_dont_exist(zone, buckets):
-    conn = zone.get_connection(user)
-
-    for b in buckets:
-        try:
-            conn.get_bucket(b)
-        except:
-            continue
-
-        log(0, 'zone ', zone.zone_name, ' contains bucket ', b)
-        return False
-
-    return True
-
-def create_bucket_per_zone_in_master_zg():
-    buckets = []
-    zone_bucket = {}
-    for zone in master_zg.get_zones():
-        conn = zone.get_connection(user)
-        bucket_name = gen_bucket_name()
-        log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
-        bucket = conn.create_bucket(bucket_name)
-        buckets.append(bucket_name)
-        zone_bucket[zone] = bucket
-
-    return buckets, zone_bucket
-
-def create_bucket_per_zone_in_realm():
-    buckets = []
-    zone_bucket = {}
-    for zone in realm.get_zones():
-        conn = zone.get_connection(user)
-        bucket_name = gen_bucket_name()
-        log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
-        bucket = conn.create_bucket(bucket_name, None, zone.zg_name)
-        buckets.append(bucket_name)
-        zone_bucket[zone] = bucket
-
-    return buckets, zone_bucket
-
-def test_bucket_create():
-    buckets, _ = create_bucket_per_zone_in_master_zg()
-    master_zg.meta_checkpoint()
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_recreate():
-    buckets, _ = create_bucket_per_zone_in_master_zg()
-    master_zg.meta_checkpoint()
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_exist(zone, buckets)
-
-    # recreate buckets on all zones, make sure they weren't removed
-    for zone in master_zg.get_zones():
-        for bucket_name in buckets:
-            conn = zone.get_connection(user)
-            bucket = conn.create_bucket(bucket_name)
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_exist(zone, buckets)
-
-    master_zg.meta_checkpoint()
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_remove():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-    master_zg.meta_checkpoint()
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_exist(zone, buckets)
-
-    for zone, bucket_name in zone_bucket.items():
-        conn = zone.get_connection(user)
-        conn.delete_bucket(bucket_name)
-
-    master_zg.meta_checkpoint()
-
-    for zone in master_zg.get_zones():
-        assert check_all_buckets_dont_exist(zone, buckets)
-
-def get_bucket(zone, bucket_name):
-    conn = zone.get_connection(user)
-    return conn.get_bucket(bucket_name)
-
-def get_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.get_key(obj_name)
-
-def new_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.new_key(obj_name)
-
-def check_object_eq(k1, k2, check_extra = True):
-    assert k1
-    assert k2
-    log(10, 'comparing key name=', k1.name)
-    eq(k1.name, k2.name)
-    eq(k1.get_contents_as_string(), k2.get_contents_as_string())
-    eq(k1.metadata, k2.metadata)
-    eq(k1.cache_control, k2.cache_control)
-    eq(k1.content_type, k2.content_type)
-    eq(k1.content_encoding, k2.content_encoding)
-    eq(k1.content_disposition, k2.content_disposition)
-    eq(k1.content_language, k2.content_language)
-    eq(k1.etag, k2.etag)
-    eq(k1.last_modified, k2.last_modified)
-    if check_extra:
-        eq(k1.owner.id, k2.owner.id)
-        eq(k1.owner.display_name, k2.owner.display_name)
-    eq(k1.storage_class, k2.storage_class)
-    eq(k1.size, k2.size)
-    eq(k1.version_id, k2.version_id)
-    eq(k1.encrypted, k2.encrypted)
-
-def check_bucket_eq(zone1, zone2, bucket_name):
-    log(10, 'comparing bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}')
-    b1 = get_bucket(zone1, bucket_name)
-    b2 = get_bucket(zone2, bucket_name)
-
-    log(20, 'bucket1 objects:')
-    for o in b1.get_all_versions():
-        log(20, 'o=', o.name)
-    log(20, 'bucket2 objects:')
-    for o in b2.get_all_versions():
-        log(20, 'o=', o.name)
-
-    for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
-        if k1 is None:
-            log(0, 'failure: key=', k2.name, ' is missing from zone=', zone1.zone_name)
-            assert False
-        if k2 is None:
-            log(0, 'failure: key=', k1.name, ' is missing from zone=', zone2.zone_name)
-            assert False
-
-        check_object_eq(k1, k2)
-
-        # now get the keys through a HEAD operation, verify that the available data is the same
-        k1_head = b1.get_key(k1.name)
-        k2_head = b2.get_key(k2.name)
-
-        check_object_eq(k1_head, k2_head, False)
-
-    log(5, 'success, bucket identical: bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}')
-
-
-def test_object_sync():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
-    all_zones = []
-    for z in zone_bucket:
-        all_zones.append(z)
-
-    objnames = [ 'myobj', '_myobj', ':', '&' ]
-    content = 'asdasd'
-
-    # don't wait for meta sync just yet
-    for zone, bucket_name in zone_bucket.items():
-        for objname in objnames:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string(content)
-
-    master_zg.meta_checkpoint()
-
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
-            check_bucket_eq(source_zone, target_zone, bucket)
-
-def test_object_delete():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
-    all_zones = []
-    for z in zone_bucket:
-        all_zones.append(z)
-
-    objname = 'myobj'
-    content = 'asdasd'
-
-    # don't wait for meta sync just yet
-    for zone, bucket in zone_bucket.items():
-        k = new_key(zone, bucket, objname)
-        k.set_contents_from_string(content)
-
-    master_zg.meta_checkpoint()
-
-    # check object exists
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
-            check_bucket_eq(source_zone, target_zone, bucket)
-
-    # check object removal
-    for source_zone, bucket in zone_bucket.items():
-        k = get_key(source_zone, bucket, objname)
-        k.delete()
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
-            check_bucket_eq(source_zone, target_zone, bucket)
-
-def get_latest_object_version(key):
-    for k in key.bucket.list_versions(key.name):
-        if k.is_latest:
-            return k
-    return None
-
-def test_versioned_object_incremental_sync():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
-    # enable versioning
-    all_zones = []
-    for zone, bucket in zone_bucket.items():
-        bucket.configure_versioning(True)
-        all_zones.append(zone)
-
-    master_zg.meta_checkpoint()
-
-    # upload a dummy object to each bucket and wait for sync. this forces each
-    # bucket to finish a full sync and switch to incremental
-    for source_zone, bucket in zone_bucket.items():
-        new_key(source_zone, bucket, 'dummy').set_contents_from_string('')
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
-    for _, bucket in zone_bucket.items():
-        # create and delete multiple versions of an object from each zone
-        for zone in all_zones:
-            obj = 'obj-' + zone.zone_name
-            k = new_key(zone, bucket, obj)
-
-            k.set_contents_from_string('version1')
-            v = get_latest_object_version(k)
-            log(10, 'version1 id=', v.version_id)
-            # don't delete version1 - this tests that the initial version
-            # doesn't get squashed into later versions
-
-            # create and delete the following object versions to test that
-            # the operations don't race with each other during sync
-            k.set_contents_from_string('version2')
-            v = get_latest_object_version(k)
-            log(10, 'version2 id=', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
-
-            k.set_contents_from_string('version3')
-            v = get_latest_object_version(k)
-            log(10, 'version3 id=', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
-
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-            check_bucket_eq(source_zone, target_zone, bucket)
-
-def test_bucket_versioning():
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-
-    for zone, bucket in zone_bucket.items():
-        bucket.configure_versioning(True)
-        res = bucket.get_versioning_status()
-        key = 'Versioning'
-        assert(key in res and res[key] == 'Enabled')
-
-def test_bucket_acl():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
-    for zone, bucket in zone_bucket.items():
-        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
-        bucket.set_acl('public-read')
-        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
-
-def test_bucket_delete_notempty():
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-    master_zg.meta_checkpoint()
-
-    for zone, bucket_name in zone_bucket.items():
-        # upload an object to each bucket on its own zone
-        conn = zone.get_connection(user)
-        bucket = conn.get_bucket(bucket_name)
-        k = bucket.new_key('foo')
-        k.set_contents_from_string('bar')
-        # attempt to delete the bucket before this object can sync
-        try:
-            conn.delete_bucket(bucket_name)
-        except boto.exception.S3ResponseError, e:
-            assert(e.error_code == 'BucketNotEmpty')
-            continue
-        assert False # expected 409 BucketNotEmpty
-
-    # assert that each bucket still exists on the master
-    z1 = master_zg.get_zone('zg1-1')
-    c1 = z1.get_connection(user)
-    for _, bucket_name in zone_bucket.items():
-        assert c1.get_bucket(bucket_name)
-
-def test_multi_period_incremental_sync():
-    if len(master_zg.clusters) < 3:
-        from nose.plugins.skip import SkipTest
-        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more clusters.")
-
-    buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
-    all_zones = []
-    for z in zone_bucket:
-        all_zones.append(z)
-
-    for zone, bucket_name in zone_bucket.items():
-        for objname in [ 'p1', '_p1' ]:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string('asdasd')
-    master_zg.meta_checkpoint()
-
-    # kill zone 3 gateway to freeze sync status to incremental in first period
-    z3 = master_zg.get_zone('zg1-3')
-    z3.cluster.stop_rgw()
-
-    # change master to zone 2 -> period 2
-    master_zg.set_master_zone(master_zg.get_zone('zg1-2'))
-
-    for zone, bucket_name in zone_bucket.items():
-        if zone == z3:
-            continue
-        for objname in [ 'p2', '_p2' ]:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string('qweqwe')
-
-    # wait for zone 1 to sync
-    master_zg.zone_meta_checkpoint(master_zg.get_zone('zg1-1'))
-
-    # change master back to zone 1 -> period 3
-    master_zg.set_master_zone(master_zg.get_zone('zg1-1'))
-
-    for zone, bucket_name in zone_bucket.items():
-        if zone == z3:
-            continue
-        for objname in [ 'p3', '_p3' ]:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string('zxczxc')
-
-    # restart zone 3 gateway and wait for sync
-    z3.cluster.start_rgw()
-    master_zg.meta_checkpoint()
-
-    # verify that we end up with the same objects
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in all_zones:
-            if source_zone.zone_name == target_zone.zone_name:
-                continue
-
-            master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
-            check_bucket_eq(source_zone, target_zone, bucket)
-
-@attr('destructive')
-def test_zonegroup_remove():
-    z1 = master_zg.get_zone('zg1-1')
-
-    # try to 'zone delete' zg1-2 from cluster 1
-    # must fail with ENOENT because the zone is local to cluster 2
-    (_, retcode) = z1.cluster.rgw_admin('zone delete --rgw-zone=zg1-2', False)
-    assert(retcode == 2) # ENOENT
-
-    # use 'zonegroup remove', expecting success
-    z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', True)
-
-    # another 'zonegroup remove' should fail with ENOENT
-    (_, retcode) = z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', False)
-    assert(retcode == 2) # ENOENT
-
-    # validate the resulting period
-    z1.cluster.rgw_admin('period update --commit', True)
-    master_zg.remove_zone('zg1-2')
+    return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))
+
+def gen_credentials():
+    return multisite.Credentials(gen_access_key(), gen_secret())
+
+def cluster_name(cluster_num):
+    return 'c' + str(cluster_num)
+
+def zonegroup_name(zonegroup_num):
+    return string.ascii_lowercase[zonegroup_num]
+
+def zone_name(zonegroup_num, zone_num):
+    return zonegroup_name(zonegroup_num) + str(zone_num + 1)
+
+def gateway_port(zonegroup_num, gateway_num):
+    return 8000 + 100 * zonegroup_num + gateway_num
+
+def gateway_name(zonegroup_num, zone_num, gateway_num):
+    return zone_name(zonegroup_num, zone_num) + '-' + str(gateway_num + 1)
+
+def zone_endpoints(zonegroup_num, zone_num, gateways_per_zone):
+    endpoints = []
+    base = gateway_port(zonegroup_num, zone_num * gateways_per_zone)
+    for i in range(0, gateways_per_zone):
+        endpoints.append('http://localhost:' + str(base + i))
+    return endpoints
+
+def get_log_level(log_level):
+    if log_level >= 20:
+        return logging.DEBUG
+    if log_level >= 10:
+        return logging.INFO
+    if log_level >= 5:
+        return logging.WARN
+    if log_level >= 1:
+        return logging.ERROR
+    return logging.CRITICAL
+
+def setup_logging(log_level_console, log_file, log_level_file):
+    if log_file:
+        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+        fh = logging.FileHandler(log_file)
+        fh.setFormatter(formatter)
+        fh.setLevel(get_log_level(log_level_file))
+        log.addHandler(fh)
+
+    formatter = logging.Formatter('%(levelname)s %(message)s')
+    ch = logging.StreamHandler()
+    ch.setFormatter(formatter)
+    ch.setLevel(get_log_level(log_level_console))
+    log.addHandler(ch)
 
 def init(parse_args):
     cfg = configparser.RawConfigParser({
@@ -999,6 +146,8 @@ def init(parse_args):
                                          'gateways_per_zone': 2,
                                          'no_bootstrap': 'false',
                                          'log_level': 20,
+                                         'log_file': None,
+                                         'file_log_level': 20,
                                          'tenant': None,
                                          })
     try:
@@ -1023,6 +172,8 @@ def init(parse_args):
     parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
     parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
     parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
+    parser.add_argument('--log-file', type=str, default=cfg.get(section, 'log_file'))
+    parser.add_argument('--file-log-level', type=int, default=cfg.getint(section, 'file_log_level'))
     parser.add_argument('--tenant', type=str, default=cfg.get(section, 'tenant'))
 
     argv = []
@@ -1031,16 +182,119 @@ def init(parse_args):
         argv = sys.argv[1:]
 
     args = parser.parse_args(argv)
+    bootstrap = not args.no_bootstrap
+
+    setup_logging(args.log_level, args.log_file, args.file_log_level)
+
+    # start first cluster
+    c1 = Cluster(cluster_name(1))
+    if bootstrap:
+        c1.start()
+    clusters = []
+    clusters.append(c1)
+
+    admin_creds = gen_credentials()
+    admin_user = multisite.User('zone.user')
 
-    global log_level
-    log_level = args.log_level
+    user_creds = gen_credentials()
+    user = multisite.User('tester')
 
-    master_zg_base_port = 8000
+    realm = multisite.Realm('r')
+    if bootstrap:
+        # create the realm on c1
+        realm.create(c1)
+    else:
+        realm.get(c1)
+    period = multisite.Period(realm=realm)
+    realm.current_period = period
 
-    for i in range(0, args.num_zonegroups):
-        rgw_multi = RGWMulti(i + 1, int(args.num_zones), int(args.gateways_per_zone), master_zg_base_port + 100 * i, master_zg_base_port)
-        rgw_multi.setup(not args.no_bootstrap, args.tenant)
+    for zg in range(0, args.num_zonegroups):
+        zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
+        period.zonegroups.append(zonegroup)
 
+        is_master_zg = zg == 0
+        if is_master_zg:
+            period.master_zonegroup = zonegroup
+
+        for z in range(0, args.num_zones):
+            is_master = z == 0
+            # start a cluster, or use c1 for first zone
+            cluster = None
+            if is_master_zg and is_master:
+                cluster = c1
+            else:
+                cluster = Cluster(cluster_name(len(clusters) + 1))
+                clusters.append(cluster)
+                if bootstrap:
+                    cluster.start()
+                    # pull realm configuration from the master's gateway
+                    gateway = realm.meta_master_zone().gateways[0]
+                    realm.pull(cluster, gateway, admin_creds)
+
+            endpoints = zone_endpoints(zg, z, args.gateways_per_zone)
+            if is_master:
+                if bootstrap:
+                    # create the zonegroup on its first zone's cluster
+                    arg = []
+                    if is_master_zg:
+                        arg += ['--master']
+                    if len(endpoints): # use master zone's endpoints
+                        arg += ['--endpoints', ','.join(endpoints)]
+                    zonegroup.create(cluster, arg)
+                else:
+                    zonegroup.get(cluster)
+
+            # create the zone in its zonegroup
+            zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
+            if bootstrap:
+                arg = admin_creds.credential_args()
+                if is_master:
+                    arg += ['--master']
+                if len(endpoints):
+                    arg += ['--endpoints', ','.join(endpoints)]
+                zone.create(cluster, arg)
+            else:
+                zone.get(cluster)
+            zonegroup.zones.append(zone)
+            if is_master:
+                zonegroup.master_zone = zone
+
+            # update/commit the period
+            if bootstrap:
+                period.update(zone, commit=True)
+
+            # start the gateways
+            for g in range(0, args.gateways_per_zone):
+                port = gateway_port(zg, g + z * args.gateways_per_zone)
+                client_id = gateway_name(zg, z, g)
+                gateway = Gateway(client_id, 'localhost', port, cluster, zone)
+                if bootstrap:
+                    gateway.start()
+                zone.gateways.append(gateway)
+
+            if is_master_zg and is_master:
+                if bootstrap:
+                    # create admin user
+                    arg = ['--display-name', '"Zone User"', '--system']
+                    arg += admin_creds.credential_args()
+                    admin_user.create(zone, arg)
+                    # create test user
+                    arg = ['--display-name', '"Test User"']
+                    arg += user_creds.credential_args()
+                    if args.tenant:
+                        arg += ['--tenant', args.tenant]
+                    user.create(zone, arg)
+                else:
+                    # read users and update keys
+                    admin_user.info(zone)
+                    admin_creds = admin_user.credentials[0]
+                    user.info(zone)
+                    user_creds = user.credentials[0]
+
+    if not bootstrap:
+        period.get(c1)
+
+    init_multi(realm, user)
 
 def setup_module():
     init(False)
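
A side note on the helpers added above, kept hedged since it goes beyond what the diff itself exercises: commands are now passed to bash() as argument lists rather than concatenated strings (the old build_cmd()/mpath()/tpath() trio goes away), and rgw_admin()/rgw_admin_ro() collapse into Cluster.admin() with a read_only flag. An illustrative call, where 'realm list' stands in for any radosgw-admin subcommand:

    c1 = Cluster(cluster_name(1))
    # read_only=True appends '--rgw-cache-enabled false', as rgw_admin_ro() used to do;
    # check_retcode=False is forwarded to bash(), returning the exit status instead of asserting on it
    out, retcode = c1.admin(['realm', 'list'], read_only=True, check_retcode=False)
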