test/rgw/test_multi: differentiate between zone and zone connection
author    Yehuda Sadeh <yehuda@redhat.com>
          Wed, 19 Apr 2017 23:25:38 +0000 (16:25 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
          Tue, 30 May 2017 20:26:56 +0000 (13:26 -0700)
Instead of having the Zone type double as the connection, create a new
ZoneConn type that represents the connection. This frees us from the
need to pass credentials around everywhere.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/test/rgw/rgw_multi/multisite.py
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/tests_es.py [new file with mode: 0644]
src/test/rgw/rgw_multi/zone_es.py
src/test/rgw/rgw_multi/zone_rados.py
src/test/rgw/test_multi.py

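In outline, the change replaces the credential-threading pattern with a connection object that binds a zone and its credentials together. A minimal sketch of the before/after call shape, assuming an initialized zone and the test suite's global user:

    # before: credentials had to reach every call site
    conn = zone.get_connection(user.credentials)
    bucket = conn.get_bucket(bucket_name)

    # after: credentials are bound once into a ZoneConn, which is then
    # passed around instead
    zone_conn = zone.get_conn(user.credentials)
    bucket = zone_conn.get_bucket(bucket_name)
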
diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py
index 278b74b6ac8e8b05fcd2b0eaf8ffeba3e09b907a..58bd98224b14a734f7ea20bc55d755be2fd79a80 100644 (file)
@@ -165,11 +165,24 @@ class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemO
     def has_buckets(self):
         return True
 
-    def get_connection(self, credentials):
-        """ connect to the zone's first gateway """
+    def get_conn(self, credentials):
+        return ZoneConn(self, credentials) # base connection type; tier-specific zones override get_conn()
+
+class ZoneConn(object):
+    def __init__(self, zone, credentials):
+        """ connect to the zone's first gateway """
+        self.zone = zone
+        self.name = zone.name
         if isinstance(credentials, list):
-            credentials = credentials[0]
-        return get_gateway_connection(self.gateways[0], credentials)
+            self.credentials = credentials[0]
+        else:
+            self.credentials = credentials
+
+        if self.zone.gateways is not None:
+            self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials)
+
+    def get_connection(self):
+        return self.conn
 
     def get_bucket(self, bucket_name, credentials):
         raise NotImplementedError
@@ -186,6 +199,7 @@ class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, Sy
         super(ZoneGroup, self).__init__(data, zonegroup_id)
         self.rw_zones = []
         self.ro_zones = []
+        self.zones_by_type = {}
         for z in self.zones:
             if z.is_read_only():
                 self.ro_zones.append(z)
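The base ZoneConn binds the zone, its name, a single credential set (the first entry if a list is given), and a boto connection to the zone's first gateway. A short usage sketch, assuming a configured zone and credentials:

    zone_conn = zone.get_conn(credentials)   # subclasses return their own Conn type
    boto_conn = zone_conn.get_connection()   # the underlying boto S3 connection
    assert zone_conn.zone is zone            # the conn remembers its zone
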
diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
index c89229fa848ecd35b709487d7efc0c9a5eac1f03..edee9ef5eec367111a501c809354961aff2aa8ba 100644 (file)
@@ -45,6 +45,9 @@ def init_multi(_realm, _user, _config=None):
     global config
     config = _config or Config()
 
+def get_realm():
+    return realm
+
 log = logging.getLogger(__name__)
 
 num_buckets = 0
@@ -328,7 +331,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn
 
     return True
 
-def zone_data_checkpoint(target_zone, source_zone):
+def zone_data_checkpoint(target_zone, source_zone_conn):
+    source_zone = source_zone_conn.zone
     if target_zone == source_zone:
         return
 
@@ -387,28 +390,44 @@ def gen_bucket_name():
     num_buckets += 1
     return run_prefix + '-' + str(num_buckets)
 
-def check_all_buckets_exist(zone, buckets):
-    if not zone.has_buckets():
+class ZonegroupConns:
+    def __init__(self, zonegroup):
+        self.zonegroup = zonegroup
+        self.zones = []
+        self.ro_zones = []
+        self.rw_zones = []
+        self.master_zone = None
+        for z in zonegroup.zones:
+            zone_conn = z.get_conn(user.credentials)
+            self.zones.append(zone_conn)
+            if z.is_read_only():
+                self.ro_zones.append(zone_conn)
+            else:
+                self.rw_zones.append(zone_conn)
+
+            if z == zonegroup.master_zone:
+                self.master_zone = zone_conn
+
+def check_all_buckets_exist(zone_conn, buckets):
+    if not zone_conn.zone.has_buckets():
         return True
 
-    conn = zone.get_connection(user.credentials)
     for b in buckets:
         try:
-            conn.get_bucket(b)
+            zone_conn.get_bucket(b)
         except:
-            log.critical('zone %s does not contain bucket %s', zone.name, b)
+            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
             return False
 
     return True
 
-def check_all_buckets_dont_exist(zone, buckets):
-    if not zone.has_buckets():
+def check_all_buckets_dont_exist(zone_conn, buckets):
+    if not zone_conn.zone.has_buckets():
         return True
 
-    conn = zone.get_connection(user.credentials)
     for b in buckets:
         try:
-            conn.get_bucket(b)
+            zone_conn.get_bucket(b)
         except:
             continue
 
@@ -417,14 +436,13 @@ def check_all_buckets_dont_exist(zone, buckets):
 
     return True
 
-def create_bucket_per_zone(zonegroup):
+def create_bucket_per_zone(zonegroup_conns):
     buckets = []
     zone_bucket = {}
-    for zone in zonegroup.rw_zones:
-        conn = zone.get_connection(user.credentials)
+    for zone in zonegroup_conns.rw_zones:
         bucket_name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
-        bucket = conn.create_bucket(bucket_name)
+        bucket = zone.create_bucket(bucket_name)
         buckets.append(bucket_name)
         zone_bucket[zone] = bucket
 
@@ -434,61 +452,63 @@ def create_bucket_per_zone_in_realm():
     buckets = []
     zone_bucket = {}
     for zonegroup in realm.current_period.zonegroups:
-        b, z = create_bucket_per_zone(zonegroup)
+        zg_conn = ZonegroupConns(zonegroup)
+        b, z = create_bucket_per_zone(zg_conn)
         buckets.extend(b)
         zone_bucket.update(z)
     return buckets, zone_bucket
 
 def test_bucket_create():
     zonegroup = realm.master_zonegroup()
-    buckets, _ = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, _ = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_exist(zone, buckets)
 
 def test_bucket_recreate():
     zonegroup = realm.master_zonegroup()
-    buckets, _ = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, _ = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_exist(zone, buckets)
 
     # recreate buckets on all zones, make sure they weren't removed
-    for zone in zonegroup.rw_zones:
+    for zone in zonegroup_conns.rw_zones:
         for bucket_name in buckets:
-            conn = zone.get_connection(user.credentials)
-            bucket = conn.create_bucket(bucket_name)
+            bucket = zone.create_bucket(bucket_name)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_exist(zone, buckets)
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_exist(zone, buckets)
 
 def test_bucket_remove():
     zonegroup = realm.master_zonegroup()
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_exist(zone, buckets)
 
     for zone, bucket_name in zone_bucket.items():
-        conn = zone.get_connection(user.credentials)
-        conn.delete_bucket(bucket_name)
+        zone.conn.delete_bucket(bucket_name)
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone in zonegroup.zones:
+    for zone in zonegroup_conns.zones:
         assert check_all_buckets_dont_exist(zone, buckets)
 
 def get_bucket(zone, bucket_name):
-    conn = zone.get_connection(user.credentials)
-    return conn.get_bucket(bucket_name)
+    return zone.conn.get_bucket(bucket_name)
 
 def get_key(zone, bucket_name, obj_name):
     b = get_bucket(zone, bucket_name)
@@ -498,12 +518,13 @@ def new_key(zone, bucket_name, obj_name):
     b = get_bucket(zone, bucket_name)
     return b.new_key(obj_name)
 
-def check_bucket_eq(zone1, zone2, bucket):
-    return zone2.check_bucket_eq(zone1, bucket.name, user.credentials)
+def check_bucket_eq(zone_conn1, zone_conn2, bucket):
+    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
 
 def test_object_sync():
     zonegroup = realm.master_zonegroup()
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     objnames = [ 'myobj', '_myobj', ':', '&' ]
     content = 'asdasd'
@@ -516,17 +537,18 @@ def test_object_sync():
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in zonegroup.zones:
-            if source_zone == target_zone:
+    for source_conn, bucket in zone_bucket.items():
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
                 continue
 
-            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-            check_bucket_eq(source_zone, target_zone, bucket)
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+            check_bucket_eq(source_conn, target_conn, bucket)
 
 def test_object_delete():
     zonegroup = realm.master_zonegroup()
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     objname = 'myobj'
     content = 'asdasd'
@@ -539,24 +561,24 @@ def test_object_delete():
     zonegroup_meta_checkpoint(zonegroup)
 
     # check object exists
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in zonegroup.zones:
-            if source_zone == target_zone:
+    for source_conn, bucket in zone_bucket.items():
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
                 continue
 
-            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-            check_bucket_eq(source_zone, target_zone, bucket)
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+            check_bucket_eq(source_conn, target_conn, bucket)
 
     # check object removal
-    for source_zone, bucket in zone_bucket.items():
-        k = get_key(source_zone, bucket, objname)
+    for source_conn, bucket in zone_bucket.items():
+        k = get_key(source_conn, bucket, objname)
         k.delete()
-        for target_zone in zonegroup.zones:
-            if source_zone == target_zone:
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
                 continue
 
-            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-            check_bucket_eq(source_zone, target_zone, bucket)
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+            check_bucket_eq(source_conn, target_conn, bucket)
 
 def get_latest_object_version(key):
     for k in key.bucket.list_versions(key.name):
@@ -566,28 +588,29 @@ def get_latest_object_version(key):
 
 def test_versioned_object_incremental_sync():
     zonegroup = realm.master_zonegroup()
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     # enable versioning
-    for zone, bucket in zone_bucket.items():
+    for _, bucket in zone_bucket.items():
         bucket.configure_versioning(True)
 
     zonegroup_meta_checkpoint(zonegroup)
 
     # upload a dummy object to each bucket and wait for sync. this forces each
     # bucket to finish a full sync and switch to incremental
-    for source_zone, bucket in zone_bucket.items():
-        new_key(source_zone, bucket, 'dummy').set_contents_from_string('')
-        for target_zone in zonegroup.zones:
-            if source_zone == target_zone:
+    for source_conn, bucket in zone_bucket.items():
+        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
                 continue
-            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
 
     for _, bucket in zone_bucket.items():
         # create and delete multiple versions of an object from each zone
-        for zone in zonegroup.rw_zones:
-            obj = 'obj-' + zone.name
-            k = new_key(zone, bucket, obj)
+        for zone_conn in zonegroup_conns.rw_zones:
+            obj = 'obj-' + zone_conn.name
+            k = new_key(zone_conn, bucket, obj)
 
             k.set_contents_from_string('version1')
             v = get_latest_object_version(k)
@@ -607,16 +630,16 @@ def test_versioned_object_incremental_sync():
             log.debug('version3 id=%s', v.version_id)
             k.bucket.delete_key(obj, version_id=v.version_id)
 
-    for source_zone, bucket in zone_bucket.items():
-        for target_zone in zonegroup.zones:
-            if source_zone == target_zone:
+    for source_conn, bucket in zone_bucket.items():
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
                 continue
-            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
-            check_bucket_eq(source_zone, target_zone, bucket)
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+            check_bucket_eq(source_conn, target_conn, bucket)
 
 def test_bucket_versioning():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for zone, bucket in zone_bucket.items():
+    for _, bucket in zone_bucket.items():
         bucket.configure_versioning(True)
         res = bucket.get_versioning_status()
         key = 'Versioning'
@@ -624,19 +647,20 @@ def test_bucket_versioning():
 
 def test_bucket_acl():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for zone, bucket in zone_bucket.items():
+    for _, bucket in zone_bucket.items():
         assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
         bucket.set_acl('public-read')
         assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
 
 def test_bucket_delete_notempty():
     zonegroup = realm.master_zonegroup()
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone, bucket_name in zone_bucket.items():
+    for zone_conn, bucket_name in zone_bucket.items():
         # upload an object to each bucket on its own zone
-        conn = zone.get_connection(user.credentials)
+        conn = zone_conn.get_connection()
         bucket = conn.get_bucket(bucket_name)
         k = bucket.new_key('foo')
         k.set_contents_from_string('bar')
@@ -649,7 +673,7 @@ def test_bucket_delete_notempty():
         assert False # expected 409 BucketNotEmpty
 
     # assert that each bucket still exists on the master
-    c1 = zonegroup.master_zone.get_connection(user.credentials)
+    c1 = zonegroup_conns.master_zone.conn
     for _, bucket_name in zone_bucket.items():
         assert c1.get_bucket(bucket_name)
 
@@ -662,13 +686,8 @@ def test_multi_period_incremental_sync():
     mdlog_periods = [realm.current_period.id]
 
     # create a bucket in each zone
-    buckets = []
-    for zone in zonegroup.zones:
-        conn = get_zone_connection(zone, user.credentials)
-        bucket_name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
-        bucket = conn.create_bucket(bucket_name)
-        buckets.append(bucket_name)
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -682,14 +701,12 @@ def test_multi_period_incremental_sync():
     set_master_zone(z2)
     mdlog_periods += [realm.current_period.id]
 
     # create another bucket in each zone, except for z3
-    for zone in zonegroup.zones:
-        if zone == z3:
+    for zone_conn, _ in zone_bucket.items():
+        if zone_conn.zone == z3:
             continue
-        conn = get_zone_connection(zone, user.credentials)
         bucket_name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
-        bucket = conn.create_bucket(bucket_name)
+        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+        bucket = zone_conn.conn.create_bucket(bucket_name)
         buckets.append(bucket_name)
 
     # wait for zone 1 to sync
@@ -699,24 +716,26 @@ def test_multi_period_incremental_sync():
     set_master_zone(z1)
     mdlog_periods += [realm.current_period.id]
 
     # create another bucket in each zone, except for z3
-    for zone in zonegroup.zones:
-        if zone == z3:
+    for zone_conn, _ in zone_bucket.items():
+        if zone_conn.zone == z3:
             continue
-        conn = get_zone_connection(zone, user.credentials)
         bucket_name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
-        bucket = conn.create_bucket(bucket_name)
+        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+        bucket = zone_conn.conn.create_bucket(bucket_name)
         buckets.append(bucket_name)
 
     # restart zone 3 gateway and wait for sync
     z3.start()
     zonegroup_meta_checkpoint(zonegroup)
 
-    # verify that we end up with the same buckets
-    for bucket_name in buckets:
-        for source_zone, target_zone in combinations(zonegroup.zones, 2):
-            check_bucket_eq(source_zone, target_zone, bucket_name)
+    # verify that we end up with the same objects
+    for source_conn, _ in zone_bucket.items():
+        for bucket_name in buckets:
+            for target_conn in zonegroup_conns.zones:
+                if source_conn.zone == target_conn.zone:
+                    continue
+
+                target_conn.check_bucket_eq(source_conn, bucket_name)
 
     # verify that mdlogs are not empty and match for each period
     for period in mdlog_periods:
@@ -745,6 +764,7 @@ def test_multi_period_incremental_sync():
 
 def test_zonegroup_remove():
     zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
     if len(zonegroup.zones) < 2:
         raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
 
diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py
new file mode 100644 (file)
index 0000000..18e159c
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tests_es.py
@@ -0,0 +1,105 @@
+import json
+import urllib
+import logging
+
+import boto
+import boto.s3.connection
+
+from nose.tools import eq_ as eq
+
+from rgw_multi.multisite import *
+from rgw_multi.tests import *
+from rgw_multi.zone_es import *
+
+log = logging.getLogger(__name__)
+
+
+def check_es_configured():
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+
+    es_zones = zonegroup.zones_by_type.get("elasticsearch")
+    if not es_zones:
+        raise SkipTest("Requires at least one ES zone")
+
+def is_es_zone(zone_conn):
+    if not zone_conn:
+        return False
+
+    return zone_conn.zone.tier_type() == "elasticsearch"
+
+def verify_search(src_keys, result_keys, f):
+    check_keys = []
+    for k in src_keys:
+        log.debug('src key: bucket=%s name=%s', k.bucket.name, k.name)
+        if f(k):
+            check_keys.append(k)
+    check_keys.sort(key = lambda l: (l.name, l.version_id))
+
+    log.debug('check keys:' + dump_json(check_keys))
+    log.debug('result keys:' + dump_json(result_keys))
+
+    for k1, k2 in zip_longest(check_keys, result_keys):
+        assert k1
+        assert k2
+        check_object_eq(k1, k2)
+
+def do_check_mdsearch(conn, bucket, src_keys, req_str, src_filter):
+    if bucket:
+        bucket_name = bucket.name
+    else:
+        bucket_name = ''
+    req = MDSearch(conn, bucket_name, req_str)
+    result_keys = req.search(sort_key = lambda k: (k.name, k.version_id))
+    verify_search(src_keys, result_keys, src_filter)
+
+def test_es_object_search_by_name():
+    check_es_configured()
+
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+    objnames = [ 'foo1', 'foo2', 'foo3', 'foo4' ]
+    content = 'asdasd'
+
+    src_keys = []
+
+    owner = None
+
+    # don't wait for meta sync just yet
+    for zone, bucket in zone_bucket.items():
+        for objname in objnames:
+            k = new_key(zone, bucket.name, objname)
+            k.set_contents_from_string(content)
+
+            if not owner:
+                for list_key in bucket.list_versions():
+                    owner = list_key.owner
+                    break
+
+            k = bucket.get_key(k.name, version_id = k.version_id)
+            k.owner = owner # owner is not set when doing get_key()
+
+            src_keys.append(k)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    for source_conn, bucket in zone_bucket.items():
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
+                continue
+            if not is_es_zone(target_conn):
+                continue
+
+            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+
+            do_check_mdsearch(target_conn.conn, None, src_keys, 'bucket == ' + bucket.name, lambda k: True)
+            do_check_mdsearch(target_conn.conn, bucket, src_keys, 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name)
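MDSearch (extended in zone_es.py below) wraps a metadata-search request against an ES zone's gateway; do_check_mdsearch() builds one per query expression and verifies the results against the locally filtered source keys. The call shape, as used above:

    # conn is the ES zone's boto connection; the query string uses the
    # metadata-search syntax exercised by the test ('bucket == <name>')
    req = MDSearch(conn, bucket_name, 'bucket == ' + bucket_name)
    result_keys = req.search(sort_key = lambda k: (k.name, k.version_id))
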
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
index c9bd6850e66bd593143fbb86caac20ff4d8d09f7..edb08cfee1a02da75500ed8e0d0f406115a19afc 100644 (file)
@@ -79,10 +79,10 @@ class MDSearch:
 
 
 class ESZoneBucket:
-    def __init__(self, zone, name, credentials):
-        self.zone = zone
+    def __init__(self, zone_conn, name, conn):
+        self.zone_conn = zone_conn
         self.name = name
-        self.conn = zone.get_connection(credentials)
+        self.conn = conn
 
         self.bucket = boto.s3.bucket.Bucket(name=name)
 
@@ -152,35 +152,48 @@ class ESZone(Zone):
     def has_buckets(self):
         return False
 
-    def get_bucket(self, bucket_name, credentials):
-        return ESZoneBucket(self, bucket_name, credentials)
+    class Conn(ZoneConn):
+        def __init__(self, zone, credentials):
+            super(ESZone.Conn, self).__init__(zone, credentials)
 
-    def check_bucket_eq(self, zone, bucket_name, credentials):
-        assert(zone.tier_type() == "rados")
+        def get_bucket(self, bucket_name):
+            return ESZoneBucket(self, bucket_name, self.conn)
 
-        log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone.name)
-        b1 = self.get_bucket(bucket_name, credentials)
-        b2 = zone.get_bucket(bucket_name, credentials)
+        def create_bucket(self, name):
+            # ES zones don't support bucket creation; reaching here is a test-suite bug
+            log.critical('Conn.create_bucket() should not be called in ES zone')
+            assert False
 
-        log.debug('bucket1 objects:')
-        for o in b1.get_all_versions():
-            log.debug('o=%s', o.name)
-        log.debug('bucket2 objects:')
-        for o in b2.get_all_versions():
-            log.debug('o=%s', o.name)
+        def check_bucket_eq(self, zone_conn, bucket_name):
+            assert(zone_conn.zone.tier_type() == "rados")
 
-        for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
-            if k1 is None:
-                log.critical('key=%s is missing from zone=%s', k2.name, self.name)
-                assert False
-            if k2 is None:
-                log.critical('key=%s is missing from zone=%s', k1.name, zone.name)
-                assert False
+            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+            b1 = self.get_bucket(bucket_name)
+            b2 = zone_conn.get_bucket(bucket_name)
 
-            check_object_eq(k1, k2)
+            log.debug('bucket1 objects:')
+            for o in b1.get_all_versions():
+                log.debug('o=%s', o.name)
+            log.debug('bucket2 objects:')
+            for o in b2.get_all_versions():
+                log.debug('o=%s', o.name)
 
+            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+                if k1 is None:
+                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+                    assert False
+                if k2 is None:
+                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+                    assert False
 
-        log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone.name)
+                check_object_eq(k1, k2)
+
+            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+            return True
+
+    def get_conn(self, credentials):
+        return self.Conn(self, credentials)
 
 
-        return True
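Both tiers now follow the same shape: the zone class gains a nested Conn(ZoneConn) carrying the tier-specific bucket operations, and get_conn() instantiates it. The skeleton shared by ESZone and RadosZone (SomeZone is a hypothetical tier for illustration):

    class SomeZone(Zone):                  # hypothetical tier
        class Conn(ZoneConn):
            def get_bucket(self, bucket_name):
                pass                       # tier-specific bucket lookup

            def check_bucket_eq(self, zone_conn, bucket_name):
                pass                       # tier-specific comparison

        def get_conn(self, credentials):
            return self.Conn(self, credentials)
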
diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py
index 675dd5b5b2fd887927d445e66c8eefac32a5b384..409804e9a85381fd8708d29036fc08890801f09c 100644 (file)
@@ -41,38 +41,49 @@ class RadosZone(Zone):
     def  tier_type(self):
         return "rados"
 
-    def get_bucket(self, name, credentials):
-        conn = self.get_connection(credentials)
-        return conn.get_bucket(name, credentials)
 
-    def check_bucket_eq(self, zone, bucket_name, credentials):
-        log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone.name)
-        b1 = self.get_bucket(bucket_name, credentials)
-        b2 = zone.get_bucket(bucket_name, credentials)
+    class Conn(ZoneConn):
+        def __init__(self, zone, credentials):
+            super(RadosZone.Conn, self).__init__(zone, credentials)
 
-        log.debug('bucket1 objects:')
-        for o in b1.get_all_versions():
-            log.debug('o=%s', o.name)
-        log.debug('bucket2 objects:')
-        for o in b2.get_all_versions():
-            log.debug('o=%s', o.name)
+        def get_bucket(self, name):
+            return self.conn.get_bucket(name)
 
-        for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
-            if k1 is None:
-                log.critical('key=%s is missing from zone=%s', k2.name, self.name)
-                assert False
-            if k2 is None:
-                log.critical('key=%s is missing from zone=%s', k1.name, zone.name)
-                assert False
+        def create_bucket(self, name):
+            return self.conn.create_bucket(name)
 
-            check_object_eq(k1, k2)
+        def check_bucket_eq(self, zone_conn, bucket_name):
+            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+            b1 = self.get_bucket(bucket_name)
+            b2 = zone_conn.get_bucket(bucket_name)
 
-            # now get the keys through a HEAD operation, verify that the available data is the same
-            k1_head = b1.get_key(k1.name)
-            k2_head = b2.get_key(k2.name)
+            log.debug('bucket1 objects:')
+            for o in b1.get_all_versions():
+                log.debug('o=%s', o.name)
+            log.debug('bucket2 objects:')
+            for o in b2.get_all_versions():
+                log.debug('o=%s', o.name)
 
-            check_object_eq(k1_head, k2_head, False)
+            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+                if k1 is None:
+                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+                    assert False
+                if k2 is None:
+                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+                    assert False
 
-        log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone.name)
+                check_object_eq(k1, k2)
 
+                # now get the keys through a HEAD operation, verify that the available data is the same
+                k1_head = b1.get_key(k1.name)
+                k2_head = b2.get_key(k2.name)
+
+                check_object_eq(k1_head, k2_head, False)
+
+            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+            return True
+
+    def get_conn(self, credentials):
+        return self.Conn(self, credentials)
 
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index 2e0870868ff171034b990d6a9b81c6c877692226..21154585ceb05f2962acafb54fc252b339c349f0 100644 (file)
@@ -18,6 +18,7 @@ from rgw_multi.zone_es  import ESZone as ESZone
 
 # make tests from rgw_multi.tests available to nose
 from rgw_multi.tests import *
+from rgw_multi.tests_es import *
 
 mstart_path = os.getenv('MSTART_PATH')
 if mstart_path is None:
@@ -287,6 +288,8 @@ def init(parse_args):
             if is_master:
                 zonegroup.master_zone = zone
 
+            zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone)
+
             if zone.is_read_only():
                 zonegroup.ro_zones.append(zone)
             else:
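The zones_by_type index populated here is what lets tier-specific tests skip themselves on unsuitable configurations; check_es_configured() in tests_es.py is its first consumer:

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    es_zones = zonegroup.zones_by_type.get("elasticsearch")
    if not es_zones:
        raise SkipTest("Requires at least one ES zone")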