import subprocess
import os
-import json
import random
import string
import argparse
import sys
-import time
-try:
- from itertools import izip_longest as zip_longest
-except ImportError:
- from itertools import zip_longest
+import logging
try:
import configparser
except ImportError:
import ConfigParser as configparser
-import boto
-import boto.s3.connection
-
-import inspect
-
-from nose.tools import eq_ as eq
-from nose.plugins.attrib import attr
+import nose.core
-# test-suite for rgw multisite, the last test destroys a zone,
-# so in order to use this as a dev cluster, do
-# $nosetests -a '!destructive' /path/to/test_multi.py
-
-log_level = 20
-
-num_buckets = 0
-run_prefix=''.join(random.SystemRandom().choice(string.ascii_lowercase) for _ in range(6))
+from rgw_multi import multisite
+# make tests from rgw_multi.tests available to nose
+from rgw_multi.tests import *
mstart_path = os.getenv('MSTART_PATH')
if mstart_path is None:
test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/'
-def lineno():
- return inspect.currentframe().f_back.f_lineno
-
-def log(level, *params):
- if level > log_level:
- return
-
- s = '>>> '
- for p in params:
- if p:
- s += str(p)
-
- print(s)
- sys.stdout.flush()
-
-def build_cmd(*params):
- s = ''
- for p in params:
- if len(s) != 0:
- s += ' '
- s += p
-
- return s
-
-def mpath(bin, *params):
- s = mstart_path + bin
- for p in params:
- s += ' ' + str(p)
-
- return s
-
-def tpath(bin, *params):
- s = test_path + bin
- for p in params:
- s += ' ' + str(p)
-
- return s
+# configure logging for the tests module
+log = logging.getLogger('rgw_multi.tests')
-def bash(cmd, check_retcode = True):
- log(5, 'running cmd: ', cmd)
- process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+def bash(cmd, **kwargs):
+ log.debug('running cmd: %s', ' '.join(cmd))
+ check_retcode = kwargs.pop('check_retcode', True)
+ kwargs['stdout'] = subprocess.PIPE
+ process = subprocess.Popen(cmd, **kwargs)
s = process.communicate()[0]
- log(20, 'command returned status=', process.returncode, ' stdout=', s.decode('utf-8'))
+ log.debug('command returned status=%d stdout=%s', process.returncode, s.decode('utf-8'))
if check_retcode:
assert(process.returncode == 0)
return (s, process.returncode)
-def mstart(cluster_id, is_new):
- cmd = mpath('mstart.sh', cluster_id)
- if is_new:
- cmd += ' -n'
- cmd += ' --mds_num 0'
- bash(cmd)
-
-def mstop(cluster_id, entity = None):
- cmd = mpath('mstop.sh', cluster_id)
- if entity is not None:
- cmd += ' ' + entity
- bash(cmd)
-
-def mrgw(cluster_id, port, extra_cmd = None):
- cmd = mpath('mrgw.sh', cluster_id, port)
- if extra_cmd is not None:
- cmd += ' ' + extra_cmd
- bash(cmd)
-
-def init_multi_site(num_clusters):
- bash(tpath('test-rgw-multisite.sh', num_clusters))
-
-
-class RGWRealmCredentials:
- def __init__(self, access_key, secret):
- self.access_key = access_key
- self.secret = secret
-
-class RGWCluster:
- def __init__(self, zg_num, cluster_num, cluster_id, port, num_gateways):
- self.zg_num = zg_num
- self.cluster_num = cluster_num
+class Cluster(multisite.Cluster):
+ """ cluster implementation based on mstart/mrun scripts """
+ def __init__(self, cluster_id):
+ super(Cluster, self).__init__()
self.cluster_id = cluster_id
- self.port = port
- self.num_gateways = num_gateways
self.needs_reset = True
+ def admin(self, args = [], **kwargs):
+ """ radosgw-admin command """
+ cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', self.cluster_id] + args
+ if kwargs.pop('read_only', False):
+ cmd += ['--rgw-cache-enabled', 'false']
+ return bash(cmd, **kwargs)
+
def start(self):
- mstart(self.cluster_id, self.needs_reset)
+ cmd = [mstart_path + 'mstart.sh', self.cluster_id]
+ if self.needs_reset:
+ cmd += ['-n', '--mds_num', '0']
+ bash(cmd)
self.needs_reset = False
def stop(self):
- mstop(self.cluster_id)
-
- def start_rgw(self):
- for i in range(self.num_gateways):
- mrgw(self.cluster_id, self.port + i, '--debug-rgw=20 --debug-ms=1')
-
- def stop_rgw(self):
- mstop(self.cluster_id, 'radosgw')
-
- def rgw_admin(self, cmd, check_retcode = True):
- (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, cmd), check_retcode)
- return (s, retcode)
-
- def rgw_admin_ro(self, cmd, check_retcode = True):
- (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, '--rgw-cache-enabled=false ' + cmd), check_retcode)
- return (s, retcode)
-
-class RGWZone:
- def __init__(self, realm_name, cluster, zg_name, zone_name):
- self.realm_name = realm_name
- self.cluster = cluster
- self.zg_name = zg_name
- self.zone_name = zone_name
- self.connection = None
-
- def get_connection(self, user):
- if self.connection is None:
- self.connection = boto.connect_s3(aws_access_key_id = user.access_key,
- aws_secret_access_key = user.secret,
- host = 'localhost',
- port = self.cluster.port,
- is_secure = False,
- calling_format = boto.s3.connection.OrdinaryCallingFormat())
- return self.connection
-
-class RGWZonegroup:
- def __init__(self, realm_name, credentials, clusters):
- self.realm_name = realm_name
- self.credentials = credentials
- self.clusters = clusters
- self.zones = {}
-
- def init_zone(self, cluster, zg_name, zone_name, first_zone_port, master_zg_first_zone_port):
- self.is_master_zg = (first_zone_port == master_zg_first_zone_port)
- is_master = (first_zone_port == cluster.port)
- endpoints = ",".join(map(lambda x: "http://localhost:" + str(cluster.port + x), range(cluster.num_gateways)))
- if is_master:
- if self.is_master_zg:
- bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.cluster_id,
- self.realm_name, zg_name, zone_name, endpoints,
- self.credentials.access_key, self.credentials.secret))
- else:
- bash(tpath('test-rgw-call.sh', 'init_first_zone_in_slave_zg', cluster.cluster_id,
- self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
- self.credentials.access_key, self.credentials.secret))
- else:
- bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.cluster_id,
- self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints,
- self.credentials.access_key, self.credentials.secret))
-
- self.add_zone(cluster, zg_name, zone_name, is_master)
- cluster.start_rgw()
-
- def add_zone(self, cluster, zg_name, zone_name, is_master):
- zone = RGWZone(self.realm_name, cluster, zg_name, zone_name)
- self.zones[zone_name] = zone
-
- if is_master:
- self.master_zone = zone
- if self.is_master_zg:
- realm.master_zone = zone
-
- def remove_zone(self, zone_name):
- del self.zones[zone_name]
-
- def get_zone(self, zone_name):
- return self.zones[zone_name]
-
- def get_zones(self):
- for (k, zone) in self.zones.items():
- yield zone
-
- def meta_sync_status(self, zone):
- if zone.zone_name == realm.master_zone.zone_name:
- return None
-
- while True:
- (meta_sync_status_json, retcode) = zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' metadata sync status', check_retcode = False)
- if retcode == 0:
- break
-
- assert(retcode == 2) # ENOENT
-
- meta_sync_status_json = meta_sync_status_json.decode('utf-8')
- log(20, 'current meta sync status=', meta_sync_status_json)
- sync_status = json.loads(meta_sync_status_json)
-
- global_sync_status=sync_status['sync_status']['info']['status']
- num_shards=sync_status['sync_status']['info']['num_shards']
-
- sync_markers=sync_status['sync_status']['markers']
- log(20, 'sync_markers=', sync_markers)
- assert(num_shards == len(sync_markers))
-
- markers={}
- for i in range(num_shards):
- markers[i] = sync_markers[i]['val']['marker']
-
- return (num_shards, markers)
-
- def meta_master_log_status(self, master_zone):
- (mdlog_status_json, retcode) = master_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' mdlog status')
- mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
-
- markers={}
- i = 0
- for s in mdlog_status:
- markers[i] = s['marker']
- i += 1
-
- log(20, 'master meta markers=', markers)
-
- return markers
-
- def compare_meta_status(self, zone, log_status, sync_status):
- if len(log_status) != len(sync_status):
- log(10, 'len(log_status)=', len(log_status), ' len(sync_status=', len(sync_status))
- return False
-
- msg = ''
- for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
- if l > s:
- if len(s) != 0:
- msg += ', '
- msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
- if len(msg) > 0:
- log(1, 'zone ', zone.zone_name, ' behind master: ', msg)
- return False
-
- return True
-
- def zone_meta_checkpoint(self, zone):
- if zone.zone_name == realm.master_zone.zone_name:
- return
-
- log(10, 'starting meta checkpoint for zone=', zone.zone_name)
-
- while True:
- log_status = self.meta_master_log_status(realm.master_zone)
- (num_shards, sync_status) = self.meta_sync_status(zone)
-
- log(20, 'log_status=', log_status)
- log(20, 'sync_status=', sync_status)
-
- if self.compare_meta_status(zone, log_status, sync_status):
- break
-
- time.sleep(5)
-
-
- log(10, 'finish meta checkpoint for zone=', zone.zone_name)
-
- def meta_checkpoint(self):
- log(5, 'meta checkpoint')
- for z in self.get_zones():
- self.zone_meta_checkpoint(z)
-
- def data_sync_status(self, target_zone, source_zone):
- if target_zone.zone_name == source_zone.zone_name:
- return None
-
- while True:
- (data_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' data sync status --source-zone=' + source_zone.zone_name, check_retcode = False)
- if retcode == 0:
- break
-
- assert(retcode == 2) # ENOENT
-
- data_sync_status_json = data_sync_status_json.decode('utf-8')
- log(20, 'current data sync status=', data_sync_status_json)
- sync_status = json.loads(data_sync_status_json)
-
- global_sync_status=sync_status['sync_status']['info']['status']
- num_shards=sync_status['sync_status']['info']['num_shards']
-
- sync_markers=sync_status['sync_status']['markers']
- log(20, 'sync_markers=', sync_markers)
- assert(num_shards == len(sync_markers))
-
- markers={}
- for i in range(num_shards):
- markers[i] = sync_markers[i]['val']['marker']
-
- return (num_shards, markers)
-
- def bucket_sync_status(self, target_zone, source_zone, bucket_name):
- if target_zone.zone_name == source_zone.zone_name:
- return None
-
- cmd = '--rgw-realm=' + self.realm_name + ' bucket sync status --source-zone=' + source_zone.zone_name + ' --bucket=' + bucket_name
- global user
- if user.tenant is not None:
- cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
- while True:
- (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro(cmd, check_retcode = False)
- if retcode == 0:
- break
-
- assert(retcode == 2) # ENOENT
-
- bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
- log(20, 'current bucket sync status=', bucket_sync_status_json)
- sync_status = json.loads(bucket_sync_status_json)
-
- markers={}
- for entry in sync_status:
- val = entry['val']
- if val['status'] == 'incremental-sync':
- pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
- else:
- pos = ''
- markers[entry['key']] = pos
-
- return markers
-
- def data_source_log_status(self, source_zone):
- source_cluster = source_zone.cluster
- (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' datalog status')
- datalog_status = json.loads(datalog_status_json.decode('utf-8'))
-
- markers={}
- i = 0
- for s in datalog_status:
- markers[i] = s['marker']
- i += 1
-
- log(20, 'data markers for zone=', source_zone.zone_name, ' markers=', markers)
-
- return markers
-
- def bucket_source_log_status(self, source_zone, bucket_name):
- cmd = '--rgw-realm=' + self.realm_name + ' bilog status --bucket=' + bucket_name
- global user
- if user.tenant is not None:
- cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid
- source_cluster = source_zone.cluster
- (bilog_status_json, retcode) = source_cluster.rgw_admin_ro(cmd)
- bilog_status = json.loads(bilog_status_json.decode('utf-8'))
-
- m={}
- markers={}
- try:
- m = bilog_status['markers']
- except:
- pass
-
- for s in m:
- key = s['key']
- val = s['val']
- markers[key] = val
-
- log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
-
- return markers
-
- def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
- if len(log_status) != len(sync_status):
- log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
- return False
-
- msg = ''
- for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
- if l > s:
- if len(s) != 0:
- msg += ', '
- msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
- if len(msg) > 0:
- log(1, 'data of zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
- return False
-
- return True
-
- def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
- if len(log_status) != len(sync_status):
- log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
- return False
-
- msg = ''
- for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
- if l > s:
- if len(s) != 0:
- msg += ', '
- msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
- if len(msg) > 0:
- log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
- return False
-
- return True
-
- def zone_data_checkpoint(self, target_zone, source_zone):
- if target_zone.zone_name == source_zone.zone_name:
- return
-
- log(10, 'starting data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
-
- while True:
- log_status = self.data_source_log_status(source_zone)
- (num_shards, sync_status) = self.data_sync_status(target_zone, source_zone)
-
- log(20, 'log_status=', log_status)
- log(20, 'sync_status=', sync_status)
-
- if self.compare_data_status(target_zone, source_zone, log_status, sync_status):
- break
-
- time.sleep(5)
-
- log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
-
- def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
- if target_zone.zone_name == source_zone.zone_name:
- return
-
- log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
-
- while True:
- log_status = self.bucket_source_log_status(source_zone, bucket_name)
- sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
-
- log(20, 'log_status=', log_status)
- log(20, 'sync_status=', sync_status)
-
- if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
- break
-
- time.sleep(5)
-
- log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
-
-
- def create_user(self, user, wait_meta = True):
- log(5, 'creating user uid=', user.uid)
- cmd = build_cmd('--uid', user.uid, '--display-name', user.display_name,
- '--access-key', user.access_key, '--secret', user.secret)
- if user.tenant is not None:
- cmd += ' --tenant ' + user.tenant
- self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' user create ' + cmd)
-
- if wait_meta:
- self.meta_checkpoint()
-
- def set_master_zone(self, zone):
- (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' --rgw-zonegroup=' + zone.zg_name + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1')
- (period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' period update --commit')
- self.master_zone = zone
- if self.is_master_zg:
- realm.master_zone = zone
-
-class RGWRealm:
- def __init__(self, realm_name, credentials):
- self.realm_name = realm_name
- self.credentials = credentials
- self.zonegroups = {}
- self.zones = {}
-
- def get_zone(self, zone_name):
- return self.zones[zone_name]
-
- def get_zones(self):
- for (k, zone) in self.zones.iteritems():
- yield zone
-
- def add_zonegroup(self, zg_name, zonegroup, is_master_zg):
- self.zonegroups[zg_name] = zonegroup
-
- for zone in zonegroup.get_zones():
- self.zones[zone.zone_name] = zone
+ cmd = [mstart_path + 'mstop.sh', self.cluster_id]
+ bash(cmd)
+
+class Gateway(multisite.Gateway):
+ """ gateway implementation based on mrgw/mstop scripts """
+ def __init__(self, client_id = None, *args, **kwargs):
+ super(Gateway, self).__init__(*args, **kwargs)
+ self.id = client_id
+
+ def start(self, args = []):
+ """ start the gateway """
+ assert(self.cluster)
+ cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)]
+ if self.id:
+ cmd += ['-i', self.id]
+ cmd += ['--debug-rgw=20', '--debug-ms=1']
+ cmd += args
+ bash(cmd)
- if is_master_zg:
- self.master_zonegroup = zonegroup
-
- def get_zonegroup(self, zonegroup_name):
- return self.zonegroups[zonegroup_name]
-
- def get_zonegroups(self):
- for (k, zonegroup) in self.zonegroups.iteritems():
- yield zonegroup
-
- def meta_checkpoint(self):
- log(5, 'meta checkpoint')
- for zg in self.get_zonegroups():
- zg.meta_checkpoint()
-
-class RGWUser:
- def __init__(self, uid, display_name, access_key, secret, tenant):
- self.uid = uid
- self.display_name = display_name
- self.access_key = access_key
- self.secret = secret
- self.tenant = tenant
+ def stop(self):
+ """ stop the gateway """
+ assert(self.cluster)
+ cmd = [mstart_path + 'mstop.sh', self.cluster.cluster_id, 'radosgw', self.id]
+ bash(cmd)
def gen_access_key():
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(16))
+ return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
def gen_secret():
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))
-
-def gen_bucket_name():
- global num_buckets
-
- num_buckets += 1
- return run_prefix + '-' + str(num_buckets)
-
-class RGWMulti:
- def __init__(self, zg_num, num_clusters, gateways_per_cluster, base_port, base_port_master_zg):
- self.num_clusters = num_clusters
-
- self.zg_num = zg_num
- self.zg_name = 'zg' + str(zg_num)
-
- self.base_port = base_port
- self.base_port_master_zg = base_port_master_zg
-
- self.clusters = {}
- for i in range(num_clusters):
- self.clusters[i] = RGWCluster(self.zg_num, i + 1, 'zg' + str(zg_num) + '-c' + str(i + 1), self.base_port + i * gateways_per_cluster, gateways_per_cluster)
-
- def setup(self, bootstrap, tenant):
- global realm
- global master_zg
- global realm_credentials
- global user
-
- is_master_zg = (self.base_port == self.base_port_master_zg)
- if is_master_zg:
- realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
- realm = RGWRealm('earth', realm_credentials)
- rgw_zg = RGWZonegroup('earth', realm_credentials, self.clusters)
-
- if bootstrap:
- log(1, 'bootstrapping clusters')
- self.clusters[0].start()
- rgw_zg.init_zone(self.clusters[0], self.zg_name, self.zg_name + '-1', self.base_port, self.base_port_master_zg)
-
- for i in range(1, self.num_clusters):
- self.clusters[i].start()
- rgw_zg.init_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), self.base_port, self.base_port_master_zg)
- else:
- for i in range(0, self.num_clusters):
- rgw_zg.add_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), (i == 0))
-
- realm.add_zonegroup(self.zg_name, rgw_zg, is_master_zg)
- rgw_zg.meta_checkpoint()
-
- if is_master_zg:
- master_zg = rgw_zg
- user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret(), tenant)
- master_zg.create_user(user)
-
-def check_all_buckets_exist(zone, buckets):
- conn = zone.get_connection(user)
-
- for b in buckets:
- try:
- conn.get_bucket(b)
- except:
- log(0, 'zone ', zone.zone_name, ' does not contain bucket ', b)
- return False
-
- return True
-
-def check_all_buckets_dont_exist(zone, buckets):
- conn = zone.get_connection(user)
-
- for b in buckets:
- try:
- conn.get_bucket(b)
- except:
- continue
-
- log(0, 'zone ', zone.zone_name, ' contains bucket ', b)
- return False
-
- return True
-
-def create_bucket_per_zone_in_master_zg():
- buckets = []
- zone_bucket = {}
- for zone in master_zg.get_zones():
- conn = zone.get_connection(user)
- bucket_name = gen_bucket_name()
- log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
- bucket = conn.create_bucket(bucket_name)
- buckets.append(bucket_name)
- zone_bucket[zone] = bucket
-
- return buckets, zone_bucket
-
-def create_bucket_per_zone_in_realm():
- buckets = []
- zone_bucket = {}
- for zone in realm.get_zones():
- conn = zone.get_connection(user)
- bucket_name = gen_bucket_name()
- log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name)
- bucket = conn.create_bucket(bucket_name, None, zone.zg_name)
- buckets.append(bucket_name)
- zone_bucket[zone] = bucket
-
- return buckets, zone_bucket
-
-def test_bucket_create():
- buckets, _ = create_bucket_per_zone_in_master_zg()
- master_zg.meta_checkpoint()
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_recreate():
- buckets, _ = create_bucket_per_zone_in_master_zg()
- master_zg.meta_checkpoint()
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_exist(zone, buckets)
-
- # recreate buckets on all zones, make sure they weren't removed
- for zone in master_zg.get_zones():
- for bucket_name in buckets:
- conn = zone.get_connection(user)
- bucket = conn.create_bucket(bucket_name)
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_exist(zone, buckets)
-
- master_zg.meta_checkpoint()
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_remove():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
- master_zg.meta_checkpoint()
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_exist(zone, buckets)
-
- for zone, bucket_name in zone_bucket.items():
- conn = zone.get_connection(user)
- conn.delete_bucket(bucket_name)
-
- master_zg.meta_checkpoint()
-
- for zone in master_zg.get_zones():
- assert check_all_buckets_dont_exist(zone, buckets)
-
-def get_bucket(zone, bucket_name):
- conn = zone.get_connection(user)
- return conn.get_bucket(bucket_name)
-
-def get_key(zone, bucket_name, obj_name):
- b = get_bucket(zone, bucket_name)
- return b.get_key(obj_name)
-
-def new_key(zone, bucket_name, obj_name):
- b = get_bucket(zone, bucket_name)
- return b.new_key(obj_name)
-
-def check_object_eq(k1, k2, check_extra = True):
- assert k1
- assert k2
- log(10, 'comparing key name=', k1.name)
- eq(k1.name, k2.name)
- eq(k1.get_contents_as_string(), k2.get_contents_as_string())
- eq(k1.metadata, k2.metadata)
- eq(k1.cache_control, k2.cache_control)
- eq(k1.content_type, k2.content_type)
- eq(k1.content_encoding, k2.content_encoding)
- eq(k1.content_disposition, k2.content_disposition)
- eq(k1.content_language, k2.content_language)
- eq(k1.etag, k2.etag)
- eq(k1.last_modified, k2.last_modified)
- if check_extra:
- eq(k1.owner.id, k2.owner.id)
- eq(k1.owner.display_name, k2.owner.display_name)
- eq(k1.storage_class, k2.storage_class)
- eq(k1.size, k2.size)
- eq(k1.version_id, k2.version_id)
- eq(k1.encrypted, k2.encrypted)
-
-def check_bucket_eq(zone1, zone2, bucket_name):
- log(10, 'comparing bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}')
- b1 = get_bucket(zone1, bucket_name)
- b2 = get_bucket(zone2, bucket_name)
-
- log(20, 'bucket1 objects:')
- for o in b1.get_all_versions():
- log(20, 'o=', o.name)
- log(20, 'bucket2 objects:')
- for o in b2.get_all_versions():
- log(20, 'o=', o.name)
-
- for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
- if k1 is None:
- log(0, 'failure: key=', k2.name, ' is missing from zone=', zone1.zone_name)
- assert False
- if k2 is None:
- log(0, 'failure: key=', k1.name, ' is missing from zone=', zone2.zone_name)
- assert False
-
- check_object_eq(k1, k2)
-
- # now get the keys through a HEAD operation, verify that the available data is the same
- k1_head = b1.get_key(k1.name)
- k2_head = b2.get_key(k2.name)
-
- check_object_eq(k1_head, k2_head, False)
-
- log(5, 'success, bucket identical: bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}')
-
-
-def test_object_sync():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
- all_zones = []
- for z in zone_bucket:
- all_zones.append(z)
-
- objnames = [ 'myobj', '_myobj', ':', '&' ]
- content = 'asdasd'
-
- # don't wait for meta sync just yet
- for zone, bucket_name in zone_bucket.items():
- for objname in objnames:
- k = new_key(zone, bucket_name, objname)
- k.set_contents_from_string(content)
-
- master_zg.meta_checkpoint()
-
- for source_zone, bucket in zone_bucket.items():
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
-
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
- check_bucket_eq(source_zone, target_zone, bucket)
-
-def test_object_delete():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
- all_zones = []
- for z in zone_bucket:
- all_zones.append(z)
-
- objname = 'myobj'
- content = 'asdasd'
-
- # don't wait for meta sync just yet
- for zone, bucket in zone_bucket.items():
- k = new_key(zone, bucket, objname)
- k.set_contents_from_string(content)
-
- master_zg.meta_checkpoint()
-
- # check object exists
- for source_zone, bucket in zone_bucket.items():
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
-
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
- check_bucket_eq(source_zone, target_zone, bucket)
-
- # check object removal
- for source_zone, bucket in zone_bucket.items():
- k = get_key(source_zone, bucket, objname)
- k.delete()
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
-
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
- check_bucket_eq(source_zone, target_zone, bucket)
-
-def get_latest_object_version(key):
- for k in key.bucket.list_versions(key.name):
- if k.is_latest:
- return k
- return None
-
-def test_versioned_object_incremental_sync():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
- # enable versioning
- all_zones = []
- for zone, bucket in zone_bucket.items():
- bucket.configure_versioning(True)
- all_zones.append(zone)
-
- master_zg.meta_checkpoint()
-
- # upload a dummy object to each bucket and wait for sync. this forces each
- # bucket to finish a full sync and switch to incremental
- for source_zone, bucket in zone_bucket.items():
- new_key(source_zone, bucket, 'dummy').set_contents_from_string('')
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
- for _, bucket in zone_bucket.items():
- # create and delete multiple versions of an object from each zone
- for zone in all_zones:
- obj = 'obj-' + zone.zone_name
- k = new_key(zone, bucket, obj)
-
- k.set_contents_from_string('version1')
- v = get_latest_object_version(k)
- log(10, 'version1 id=', v.version_id)
- # don't delete version1 - this tests that the initial version
- # doesn't get squashed into later versions
-
- # create and delete the following object versions to test that
- # the operations don't race with each other during sync
- k.set_contents_from_string('version2')
- v = get_latest_object_version(k)
- log(10, 'version2 id=', v.version_id)
- k.bucket.delete_key(obj, version_id=v.version_id)
-
- k.set_contents_from_string('version3')
- v = get_latest_object_version(k)
- log(10, 'version3 id=', v.version_id)
- k.bucket.delete_key(obj, version_id=v.version_id)
-
- for source_zone, bucket in zone_bucket.items():
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket)
-
-def test_bucket_versioning():
- buckets, zone_bucket = create_bucket_per_zone_in_realm()
-
- for zone, bucket in zone_bucket.items():
- bucket.configure_versioning(True)
- res = bucket.get_versioning_status()
- key = 'Versioning'
- assert(key in res and res[key] == 'Enabled')
-
-def test_bucket_acl():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
- for zone, bucket in zone_bucket.items():
- assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
- bucket.set_acl('public-read')
- assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
-
-def test_bucket_delete_notempty():
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
- master_zg.meta_checkpoint()
-
- for zone, bucket_name in zone_bucket.items():
- # upload an object to each bucket on its own zone
- conn = zone.get_connection(user)
- bucket = conn.get_bucket(bucket_name)
- k = bucket.new_key('foo')
- k.set_contents_from_string('bar')
- # attempt to delete the bucket before this object can sync
- try:
- conn.delete_bucket(bucket_name)
- except boto.exception.S3ResponseError, e:
- assert(e.error_code == 'BucketNotEmpty')
- continue
- assert False # expected 409 BucketNotEmpty
-
- # assert that each bucket still exists on the master
- z1 = master_zg.get_zone('zg1-1')
- c1 = z1.get_connection(user)
- for _, bucket_name in zone_bucket.items():
- assert c1.get_bucket(bucket_name)
-
-def test_multi_period_incremental_sync():
- if len(master_zg.clusters) < 3:
- from nose.plugins.skip import SkipTest
- raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more clusters.")
-
- buckets, zone_bucket = create_bucket_per_zone_in_master_zg()
-
- all_zones = []
- for z in zone_bucket:
- all_zones.append(z)
-
- for zone, bucket_name in zone_bucket.items():
- for objname in [ 'p1', '_p1' ]:
- k = new_key(zone, bucket_name, objname)
- k.set_contents_from_string('asdasd')
- master_zg.meta_checkpoint()
-
- # kill zone 3 gateway to freeze sync status to incremental in first period
- z3 = master_zg.get_zone('zg1-3')
- z3.cluster.stop_rgw()
-
- # change master to zone 2 -> period 2
- master_zg.set_master_zone(master_zg.get_zone('zg1-2'))
-
- for zone, bucket_name in zone_bucket.items():
- if zone == z3:
- continue
- for objname in [ 'p2', '_p2' ]:
- k = new_key(zone, bucket_name, objname)
- k.set_contents_from_string('qweqwe')
-
- # wait for zone 1 to sync
- master_zg.zone_meta_checkpoint(master_zg.get_zone('zg1-1'))
-
- # change master back to zone 1 -> period 3
- master_zg.set_master_zone(master_zg.get_zone('zg1-1'))
-
- for zone, bucket_name in zone_bucket.items():
- if zone == z3:
- continue
- for objname in [ 'p3', '_p3' ]:
- k = new_key(zone, bucket_name, objname)
- k.set_contents_from_string('zxczxc')
-
- # restart zone 3 gateway and wait for sync
- z3.cluster.start_rgw()
- master_zg.meta_checkpoint()
-
- # verify that we end up with the same objects
- for source_zone, bucket in zone_bucket.items():
- for target_zone in all_zones:
- if source_zone.zone_name == target_zone.zone_name:
- continue
-
- master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-
- check_bucket_eq(source_zone, target_zone, bucket)
-
-@attr('destructive')
-def test_zonegroup_remove():
- z1 = master_zg.get_zone('zg1-1')
-
- # try to 'zone delete' zg1-2 from cluster 1
- # must fail with ENOENT because the zone is local to cluster 2
- (_, retcode) = z1.cluster.rgw_admin('zone delete --rgw-zone=zg1-2', False)
- assert(retcode == 2) # ENOENT
-
- # use 'zonegroup remove', expecting success
- z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', True)
-
- # another 'zonegroup remove' should fail with ENOENT
- (_, retcode) = z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', False)
- assert(retcode == 2) # ENOENT
-
- # validate the resulting period
- z1.cluster.rgw_admin('period update --commit', True)
- master_zg.remove_zone('zg1-2')
+ return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))
+
+def gen_credentials():
+ return multisite.Credentials(gen_access_key(), gen_secret())
+
+def cluster_name(cluster_num):
+ return 'c' + str(cluster_num)
+
+def zonegroup_name(zonegroup_num):
+ return string.ascii_lowercase[zonegroup_num]
+
+def zone_name(zonegroup_num, zone_num):
+ return zonegroup_name(zonegroup_num) + str(zone_num + 1)
+
+def gateway_port(zonegroup_num, gateway_num):
+ return 8000 + 100 * zonegroup_num + gateway_num
+
+def gateway_name(zonegroup_num, zone_num, gateway_num):
+ return zone_name(zonegroup_num, zone_num) + '-' + str(gateway_num + 1)
+
+def zone_endpoints(zonegroup_num, zone_num, gateways_per_zone):
+ endpoints = []
+ base = gateway_port(zonegroup_num, zone_num * gateways_per_zone)
+ for i in range(0, gateways_per_zone):
+ endpoints.append('http://localhost:' + str(base + i))
+ return endpoints
+
+def get_log_level(log_level):
+ if log_level >= 20:
+ return logging.DEBUG
+ if log_level >= 10:
+ return logging.INFO
+ if log_level >= 5:
+ return logging.WARN
+ if log_level >= 1:
+ return logging.ERROR
+ return logging.CRITICAL
+
+def setup_logging(log_level_console, log_file, log_level_file):
+ if log_file:
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+ fh = logging.FileHandler(log_file)
+ fh.setFormatter(formatter)
+ fh.setLevel(get_log_level(log_level_file))
+ log.addHandler(fh)
+
+ formatter = logging.Formatter('%(levelname)s %(message)s')
+ ch = logging.StreamHandler()
+ ch.setFormatter(formatter)
+ ch.setLevel(get_log_level(log_level_console))
+ log.addHandler(ch)
def init(parse_args):
cfg = configparser.RawConfigParser({
'gateways_per_zone': 2,
'no_bootstrap': 'false',
'log_level': 20,
+ 'log_file': None,
+ 'file_log_level': 20,
'tenant': None,
})
try:
parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
+ parser.add_argument('--log-file', type=str, default=cfg.get(section, 'log_file'))
+ parser.add_argument('--file-log-level', type=int, default=cfg.getint(section, 'file_log_level'))
parser.add_argument('--tenant', type=str, default=cfg.get(section, 'tenant'))
argv = []
argv = sys.argv[1:]
args = parser.parse_args(argv)
+ bootstrap = not args.no_bootstrap
+
+ setup_logging(args.log_level, args.log_file, args.file_log_level)
+
+ # start first cluster
+ c1 = Cluster(cluster_name(1))
+ if bootstrap:
+ c1.start()
+ clusters = []
+ clusters.append(c1)
+
+ admin_creds = gen_credentials()
+ admin_user = multisite.User('zone.user')
- global log_level
- log_level = args.log_level
+ user_creds = gen_credentials()
+ user = multisite.User('tester')
- master_zg_base_port = 8000
+ realm = multisite.Realm('r')
+ if bootstrap:
+ # create the realm on c1
+ realm.create(c1)
+ else:
+ realm.get(c1)
+ period = multisite.Period(realm=realm)
+ realm.current_period = period
- for i in range(0, args.num_zonegroups):
- rgw_multi = RGWMulti(i + 1, int(args.num_zones), int(args.gateways_per_zone), master_zg_base_port + 100 * i, master_zg_base_port)
- rgw_multi.setup(not args.no_bootstrap, args.tenant)
+ for zg in range(0, args.num_zonegroups):
+ zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
+ period.zonegroups.append(zonegroup)
+ is_master_zg = zg == 0
+ if is_master_zg:
+ period.master_zonegroup = zonegroup
+
+ for z in range(0, args.num_zones):
+ is_master = z == 0
+ # start a cluster, or use c1 for first zone
+ cluster = None
+ if is_master_zg and is_master:
+ cluster = c1
+ else:
+ cluster = Cluster(cluster_name(len(clusters) + 1))
+ clusters.append(cluster)
+ if bootstrap:
+ cluster.start()
+ # pull realm configuration from the master's gateway
+ gateway = realm.meta_master_zone().gateways[0]
+ realm.pull(cluster, gateway, admin_creds)
+
+ endpoints = zone_endpoints(zg, z, args.gateways_per_zone)
+ if is_master:
+ if bootstrap:
+ # create the zonegroup on its first zone's cluster
+ arg = []
+ if is_master_zg:
+ arg += ['--master']
+ if len(endpoints): # use master zone's endpoints
+ arg += ['--endpoints', ','.join(endpoints)]
+ zonegroup.create(cluster, arg)
+ else:
+ zonegroup.get(cluster)
+
+ # create the zone in its zonegroup
+ zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
+ if bootstrap:
+ arg = admin_creds.credential_args()
+ if is_master:
+ arg += ['--master']
+ if len(endpoints):
+ arg += ['--endpoints', ','.join(endpoints)]
+ zone.create(cluster, arg)
+ else:
+ zone.get(cluster)
+ zonegroup.zones.append(zone)
+ if is_master:
+ zonegroup.master_zone = zone
+
+ # update/commit the period
+ if bootstrap:
+ period.update(zone, commit=True)
+
+ # start the gateways
+ for g in range(0, args.gateways_per_zone):
+ port = gateway_port(zg, g + z * args.gateways_per_zone)
+ client_id = gateway_name(zg, z, g)
+ gateway = Gateway(client_id, 'localhost', port, cluster, zone)
+ if bootstrap:
+ gateway.start()
+ zone.gateways.append(gateway)
+
+ if is_master_zg and is_master:
+ if bootstrap:
+ # create admin user
+ arg = ['--display-name', '"Zone User"', '--system']
+ arg += admin_creds.credential_args()
+ admin_user.create(zone, arg)
+ # create test user
+ arg = ['--display-name', '"Test User"']
+ arg += user_creds.credential_args()
+ if args.tenant:
+ cmd += ['--tenant', args.tenant]
+ user.create(zone, arg)
+ else:
+ # read users and update keys
+ admin_user.info(zone)
+ admin_creds = admin_user.credentials[0]
+ user.info(zone)
+ user_creds = user.credentials[0]
+
+ if not bootstrap:
+ period.get(c1)
+
+ init_multi(realm, user)
def setup_module():
init(False)