import sys
import time
+import inspect
+
+def lineno():
+ return inspect.currentframe().f_back.f_lineno
+
mstart_path = os.getenv('MSTART_PATH')
if mstart_path is None:
mstart_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__)) + '/../..') + '/'
test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/'
+def log(*params):
+ s = '>>> '
+ for p in params:
+ if p:
+ s += str(p)
+
+ print s
+ sys.stdout.flush()
+
+def build_cmd(*params):
+ s = ''
+ for p in params:
+ if len(s) != 0:
+ s += ' '
+ s += p
+
+ return s
+
def mpath(bin, *params):
s = mstart_path + bin
for p in params:
return s
def bash(cmd, check_retcode = True):
- print 'cmd:', cmd
+ log('running cmd: ', cmd)
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
s = process.communicate()[0]
- print s
+ log('command returned status=', process.returncode, ' stdout=', s)
if check_retcode:
assert(process.returncode == 0)
return (s, process.returncode)
def rgw_admin(self, cmd, check_retcode = True):
(s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_num, cmd))
- print s
return (s, retcode)
def rgw_admin_ro(self, cmd, check_retcode = True):
(s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_num, '--rgw-cache-enabled=false ' + cmd), check_retcode)
- print s
return (s, retcode)
class RGWRealm:
- def __init__(self, realm, credentials, master_index):
+ def __init__(self, realm, credentials, clusters, master_index):
self.realm = realm
self.credentials = credentials
+ self.clusters = clusters
self.master_index = master_index
+ self.master_cluster = clusters[master_index]
def init_zone(self, cluster, zg, zone_name, port, first_zone_port=0):
if first_zone_port == 0:
self.credentials.access_key, self.credentials.secret))
def meta_sync_status(self, cluster):
- if cluster.cluster_num == self.master_index:
+ if cluster.cluster_num == self.master_cluster.cluster_num:
return None
while True:
assert(retcode == 2) # ENOENT
- print 'm=', meta_sync_status_json
- meta_sync_status = json.loads(meta_sync_status_json)
+ log('current meta sync status=', meta_sync_status_json)
+ sync_status = json.loads(meta_sync_status_json)
- global_sync_status=meta_sync_status['sync_status']['info']['status']
- num_shards=meta_sync_status['sync_status']['info']['num_shards']
+ global_sync_status=sync_status['sync_status']['info']['status']
+ num_shards=sync_status['sync_status']['info']['num_shards']
- sync_markers=meta_sync_status['sync_status']['markers']
- print 'sync_markers=', sync_markers
+ sync_markers=sync_status['sync_status']['markers']
+ log('sync_markers=', sync_markers)
assert(num_shards == len(sync_markers))
markers={}
markers[i] = s['marker']
i += 1
+ log('master meta markers=', markers)
+
return markers
- def meta_checkpoint(self, master_cluster, cluster):
- if cluster.cluster_num == self.master_index:
+ def compare_meta_status(self, cluster, log_status, sync_status):
+ if len(log_status) != len(sync_status):
+ log('len(log_status)=', len(log_status), ' len(sync_status=', len(sync_status))
+ return False
+
+ i = 0
+ msg = ''
+ for l, s in zip(log_status, sync_status):
+ if l > s:
+ if len(s) != 0:
+ msg += ', '
+ msg += 'shard=' + str(i) + ' master=' + ' target=' + s
+ i += 1
+
+ if len(msg) > 0:
+ log('cluster ', cluster.cluster_id, ' behind master: ', msg)
+ return False
+
+ return True
+
+ def cluster_meta_checkpoint(self, cluster):
+ if cluster.cluster_num == self.master_cluster.cluster_num:
return
+ log('starting meta checkpoint for cluster=', cluster.cluster_id)
+
while True:
- log_status = self.meta_master_log_status(master_cluster)
+ log_status = self.meta_master_log_status(self.master_cluster)
(num_shards, sync_status) = self.meta_sync_status(cluster)
- print 'log_status', log_status
- print 'sync_status', sync_status
-
- if (log_status == sync_status):
+ if self.compare_meta_status(cluster, log_status, sync_status):
break
time.sleep(5)
- bash(tpath('test-rgw-call.sh', 'wait_for_meta_sync', self.master_index + 1, cluster.cluster_num, self.realm))
+ log('finish meta checkpoint for cluster=', cluster.cluster_id)
+
+ def meta_checkpoint(self):
+ log('meta checkpoint')
+ for (index, c) in self.clusters.iteritems():
+ self.cluster_meta_checkpoint(c)
+
+
+ def create_user(self, user, wait_meta = True):
+ log('creating user uid=', user.uid)
+ cmd = build_cmd('--uid', user.uid, '--display-name', user.display_name,
+ '--access-key', user.access_key, '--secret', user.secret)
+ self.master_cluster.rgw_admin('--rgw-realm=' + self.realm + ' user create ' + cmd)
+
+ if wait_meta:
+ self.meta_checkpoint()
+
+
+
+
+class RGWUser:
+ def __init__(self, uid, display_name, access_key, secret):
+ self.uid = uid
+ self.display_name = display_name
+ self.access_key = access_key
+ self.secret = secret
def gen_access_key():
return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(16))
def setup(self, bootstrap):
credentials = RGWRealmCredentials(gen_access_key(), gen_secret())
- realm = RGWRealm('earth', credentials, 0)
+ realm = RGWRealm('earth', credentials, self.clusters, 0)
if bootstrap:
+ log('bootstapping clusters')
self.clusters[0].start()
realm.init_zone(self.clusters[0], 'us', 'us-1', self.base_port)
self.clusters[i].start()
realm.init_zone(self.clusters[i], 'us', 'us-' + str(i + 1), self.base_port + i, first_zone_port=self.base_port)
- for i in xrange(1, self.num_clusters):
- print 'meta checkpoint start on cluster #', i
- realm.meta_checkpoint(self.clusters[0], self.clusters[i])
- print 'meta checkpoint finish on cluster #', i
+ realm.meta_checkpoint()
+
+ user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret())
+ realm.create_user(user)
+
def main():