git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/multisite: boto3 in tests.py
authorShilpa Jagannath <smanjara@redhat.com>
Wed, 21 Jan 2026 23:54:55 +0000 (18:54 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 11 Feb 2026 20:56:36 +0000 (15:56 -0500)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
(cherry picked from commit 3438add3fc9e086da6313830e697e6a987893f3c)

Conflicts:
src/test/rgw/rgw_multi/tests.py
 - Whitespace and other drift
 - Tentacle doesn't sync object locks
 - `test_suspended_delete_marker_incremental_sync` is a new test but
   not a new behavior.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/test/rgw/rgw_multi/conn.py
src/test/rgw/rgw_multi/multisite.py
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/zone_rados.py

index 34fb4ae340dc29c44133c3412ec34db74e2ab798..7d23b1699a0444b73def14bfe8a602e89e1b747b 100644 (file)
@@ -1,19 +1,29 @@
-
-import boto.iam.connection
-import boto.sts.connection
 import boto3
 
 def get_gateway_connection(gateway, credentials, region):
     """ connect to the given gateway """
     # Always create a new connection to the gateway to ensure each set of credentials gets its own connection
-    conn = boto3.client('s3',
+    s3_client = boto3.client('s3',
                         endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
                         aws_access_key_id=credentials.access_key,
                         aws_secret_access_key=credentials.secret,
                         region_name=region)
-    if gateway.connection is None:
-        gateway.connection = conn
-    return conn
+    if gateway.s3_client is None:
+        gateway.s3_client = s3_client
+    return s3_client
+
+def get_gateway_s3_resource(gateway, credentials, region):
+    """ connect to boto3 s3 resource api of the given gateway """
+    print(f"Credentials: {credentials.access_key}")
+    s3_resource = boto3.resource('s3',
+                            endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
+                            aws_access_key_id=credentials.access_key,
+                            aws_secret_access_key=credentials.secret,
+                            region_name=region,
+        )
+    if gateway.s3_resource is None:
+        gateway.s3_resource = s3_resource
+    return s3_resource
 
 def get_gateway_secure_connection(gateway, credentials, region):
     """ secure connect to the given gateway """
index 842344abdd3d6acaecaee06b161e3235d79d1e61..77704c01ee5cc1a8b5b57bd87b80b16fd6c534e9 100644 (file)
@@ -3,7 +3,7 @@ from io import StringIO
 
 import json
 
-from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_sns_client, get_gateway_sts_connection, get_gateway_temp_s3_client
+from .conn import get_gateway_connection, get_gateway_s3_resource, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_sns_client, get_gateway_sts_connection, get_gateway_temp_s3_client
 
 class Cluster:
     """ interface to run commands against a distinct ceph cluster """
@@ -25,6 +25,8 @@ class Gateway:
         self.zone = zone
         self.connection = None
         self.secure_connection = None
+        self.s3_client = None
+        self.s3_resource = None
         self.ssl_port = ssl_port
         self.iam_connection = None
         self.sns_client = None
@@ -190,7 +192,8 @@ class ZoneConn(object):
         if self.zone.gateways is not None:
             region = "" if self.zone.zonegroup is None else self.zone.zonegroup.name
             self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials, region)
-            self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials, region)
+            self.s3_client = get_gateway_connection(self.zone.gateways[0], self.credentials, region)
+            self.s3_resource = get_gateway_s3_resource(self.zone.gateways[0], self.credentials, region)
             self.secure_conn = get_gateway_connection(self.zone.gateways[0], self.credentials, region)
             self.sns_client = get_gateway_sns_client(self.zone.gateways[0], self.credentials, region)
             self.temp_s3_client = None
index 45226a58a910cdb1c3a089f4d050cfa22e5cb0f0..cd4ba406e372c2c6357b923dfd0923e37d8eea69 100644 (file)
@@ -11,10 +11,7 @@ from itertools import combinations
 from itertools import zip_longest
 from io import StringIO
 
-import boto
-import boto.s3.connection
-from boto.s3.website import WebsiteConfiguration
-from boto.s3.cors import CORSConfiguration
+import boto3
 from botocore.exceptions import ClientError
 
 from nose.tools import eq_ as eq
@@ -489,6 +486,7 @@ class ZonegroupConns:
         self.master_zone = None
 
         for z in zonegroup.zones:
+            log.debug('=== Debug get_conn for zone=%s ===', z.name)
             zone_conn = z.get_conn(user.credentials)
             non_account_zone_conn = z.get_conn(non_account_user.credentials)
             non_account_alt_zone_conn = z.get_conn(non_account_alt_user.credentials)
@@ -603,6 +601,8 @@ def create_bucket_per_zone_in_realm():
 def test_bucket_create():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
+    zonegroup_meta_checkpoint(zonegroup)
+
     buckets, _ = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -624,30 +624,35 @@ def test_bucket_create_with_tenant():
     tenant = 'testx'
     uid = 'test'
 
-    tenant_secondary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key,
-                                aws_secret_access_key=secret_key,
-                                is_secure=False,
-                                port=secondary.zone.gateways[0].port,
-                                host=secondary.zone.gateways[0].host,
-                                calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    # create s3 clients for tenant user
+    tenant_secondary_conn = boto3.client(
+        's3',
+        endpoint_url=f'http://{secondary.zone.gateways[0].host}:{secondary.zone.gateways[0].port}',
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        region_name='default'
+    )
 
-    tenant_primary_conn = boto.s3.connection.S3Connection(aws_access_key_id=access_key,
-                                aws_secret_access_key=secret_key,
-                                is_secure=False,
-                                port=primary.zone.gateways[0].port,
-                                host=primary.zone.gateways[0].host,
-                                calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    tenant_primary_conn = boto3.client(
+        's3',
+        endpoint_url=f'http://{primary.zone.gateways[0].host}:{primary.zone.gateways[0].port}',
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        region_name='default'
+    )
 
     cmd = ['user', 'create', '--tenant', tenant, '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', 'tenanted-user']
     primary.zone.cluster.admin(cmd)
     zonegroup_meta_checkpoint(zonegroup)
     try:
-        bucket = tenant_secondary_conn.create_bucket('tenanted-bucket')
+        bucket_name = 'tenanted-bucket'
+        tenant_secondary_conn.create_bucket(Bucket=bucket_name)
         zonegroup_meta_checkpoint(zonegroup)
-        assert tenant_primary_conn.get_bucket(bucket.name)
+        tenant_primary_conn.head_bucket(Bucket=bucket_name)
         log.info("bucket exists in tenant namespace")
-        e = assert_raises(boto.exception.S3ResponseError, primary.get_bucket, bucket.name)
-        assert e.error_code == 'NoSuchBucket'
+
+        e = assert_raises(ClientError, primary.s3_client.head_bucket, Bucket=bucket_name)
+        assert e.response['Error']['Code'] == 'NoSuchBucket'
         log.info("bucket does not exist in default user namespace")
     finally:
         cmd = ['user', 'rm', '--tenant', tenant, '--uid', uid, '--purge-data']
@@ -685,25 +690,14 @@ def test_bucket_remove():
         for zone in zonegroup_conns.zones:
             assert check_all_buckets_exist(zone, buckets)
 
-        for zone, bucket_name in zone_buckets:
-            zone.conn.delete_bucket(bucket_name)
+        for zone, bucket in zone_buckets:
+            zone.s3_client.delete_bucket(Bucket=bucket.name)
 
         zonegroup_meta_checkpoint(zonegroup)
 
         for zone in zonegroup_conns.zones:
             assert check_all_buckets_dont_exist(zone, buckets)
 
-def get_bucket(zone, bucket_name):
-    return zone.conn.get_bucket(bucket_name)
-
-def get_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.get_key(obj_name)
-
-def new_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.new_key(obj_name)
-
 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
     if zone_conn2.zone.has_buckets():
         zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
@@ -839,15 +833,13 @@ def test_object_sync():
     zonegroup_conns = ZonegroupConns(zonegroup)
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
-    objnames = [ 'myobj', '_myobj', ':', '&', '.', '..', '...',  '.o', '.o.']
-
+    objnames = ['myobj', '_myobj', ':', '&', '.', '..', '...', '.o', '.o.']
     content = 'asdasd'
 
-    # don't wait for meta sync just yet
-    for zone, bucket_name in zone_bucket:
+    # upload objects
+    for zone, bucket in zone_bucket:
         for objname in objnames:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string(content)
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -867,10 +859,9 @@ def test_object_delete():
     objname = 'myobj'
     content = 'asdasd'
 
-    # don't wait for meta sync just yet
+    # upload objects
     for zone, bucket in zone_bucket:
-        k = new_key(zone, bucket, objname)
-        k.set_contents_from_string(content)
+        zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -883,10 +874,10 @@ def test_object_delete():
             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
             check_bucket_eq(source_conn, target_conn, bucket)
 
-    # check object removal
+    # delete objects
     for source_conn, bucket in zone_bucket:
-        k = get_key(source_conn, bucket, objname)
-        k.delete()
+        source_conn.s3_client.delete_object(Bucket=bucket.name, Key=objname)
+        
         for target_conn in zonegroup_conns.zones:
             if source_conn.zone == target_conn.zone:
                 continue
@@ -899,12 +890,13 @@ def test_multi_object_delete():
     zonegroup_conns = ZonegroupConns(zonegroup)
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
-    objnames = [f'obj{i}' for i in range(1,50)]
+    objnames = [f'obj{i}' for i in range(1, 50)]
     content = 'asdasd'
 
-    # don't wait for meta sync just yet
+    # upload objects
     for zone, bucket in zone_bucket:
-        create_objects(zone, bucket, objnames, content)
+        for objname in objnames:
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -917,9 +909,14 @@ def test_multi_object_delete():
             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
             check_bucket_eq(source_conn, target_conn, bucket)
 
-    # check object removal
+    # delete objects
     for source_conn, bucket in zone_bucket:
-        bucket.delete_keys(objnames)
+        objects = [{'Key': obj} for obj in objnames]
+        source_conn.s3_client.delete_objects(
+            Bucket=bucket.name,
+            Delete={'Objects': objects}
+        )
+        
         for target_conn in zonegroup_conns.zones:
             if source_conn.zone == target_conn.zone:
                 continue
@@ -927,27 +924,24 @@ def test_multi_object_delete():
             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
             check_bucket_eq(source_conn, target_conn, bucket)
 
-def get_latest_object_version(key):
-    for k in key.bucket.list_versions(key.name):
-        if k.is_latest:
-            return k
-    return None
-
 def test_versioned_object_incremental_sync():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     # enable versioning
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
+    for zone, bucket in zone_bucket:
+        zone.s3_client.put_bucket_versioning(
+            Bucket=bucket.name,
+            VersioningConfiguration={'Status': 'Enabled'}
+        )
 
     zonegroup_meta_checkpoint(zonegroup)
 
     # upload a dummy object to each bucket and wait for sync. this forces each
     # bucket to finish a full sync and switch to incremental
     for source_conn, bucket in zone_bucket:
-        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
+        source_conn.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='')
         for target_conn in zonegroup_conns.zones:
             if source_conn.zone == target_conn.zone:
                 continue
@@ -957,33 +951,48 @@ def test_versioned_object_incremental_sync():
         # create and delete multiple versions of an object from each zone
         for zone_conn in zonegroup_conns.rw_zones:
             obj = 'obj-' + zone_conn.name
-            k = new_key(zone_conn, bucket, obj)
 
-            k.set_contents_from_string('version1')
-            log.debug('version1 id=%s', k.version_id)
+            resp1 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1')
+            version_id_1 = resp1['VersionId']
+            log.debug('version1 id=%s', version_id_1)
             # don't delete version1 - this tests that the initial version
             # doesn't get squashed into later versions
 
             # create and delete the following object versions to test that
             # the operations don't race with each other during sync
-            k.set_contents_from_string('version2')
-            log.debug('version2 id=%s', k.version_id)
-            k.bucket.delete_key(obj, version_id=k.version_id)
-
-            k.set_contents_from_string('version3')
-            log.debug('version3 id=%s', k.version_id)
-            k.bucket.delete_key(obj, version_id=k.version_id)
+            resp2 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version2')
+            version_id_2 = resp2['VersionId']
+            log.debug('version2 id=%s', version_id_2)
+            zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2)
+            
+            # Create and delete version3
+            resp3 = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version3')
+            version_id_3 = resp3['VersionId']
+            log.debug('version3 id=%s', version_id_3)
+            zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_3)
 
     for _, bucket in zone_bucket:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
+    # update ACLs to test metadata-only entries
     for _, bucket in zone_bucket:
-        # overwrite the acls to test that metadata-only entries are applied
         for zone_conn in zonegroup_conns.rw_zones:
             obj = 'obj-' + zone_conn.name
-            k = new_key(zone_conn, bucket.name, obj)
-            v = get_latest_object_version(k)
-            v.make_public()
+            
+            # get latest version
+            response = zone_conn.s3_client.list_object_versions(
+                Bucket=bucket.name,
+                Prefix=obj,
+                MaxKeys=1
+            )
+            if response.get('Versions'):
+                latest = response['Versions'][0]
+                zone_conn.s3_client.put_object_acl(
+                    Bucket=bucket.name,
+                    Key=obj,
+                    VersionId=latest['VersionId'],
+                    ACL='public-read'
+                )
 
     for _, bucket in zone_bucket:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@@ -1001,24 +1010,29 @@ def test_null_version_id_delete():
     obj = 'obj'
 
     # upload an initial object
-    key1 = new_key(zone, bucket, obj)
-    key1.set_contents_from_string('')
-    log.debug('created initial version id=%s', key1.version_id)
+    resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_1 = resp1.get('VersionId', 'null')
+    log.debug('created initial version id=%s', version_id_1)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # enable versioning
-    bucket.configure_versioning(True)
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
     zonegroup_meta_checkpoint(zonegroup)
 
     # re-upload the object as a new version
-    key2 = new_key(zone, bucket, obj)
-    key2.set_contents_from_string('')
-    log.debug('created new version id=%s', key2.version_id)
+    resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_2 = resp2['VersionId']
+    log.debug('created new version id=%s', version_id_2)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    bucket.delete_key(obj, version_id='null')
+    # delete null version
+    zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId='null')
 
-    bucket.delete_key(obj, version_id=key2.version_id)
+    # delete version 2
+    zone.s3_client.delete_object(Bucket=bucket.name, Key=obj, VersionId=version_id_2)
 
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
@@ -1030,22 +1044,24 @@ def test_concurrent_versioned_object_incremental_sync():
     # create a versioned bucket
     bucket = zone.create_bucket(gen_bucket_name())
     log.debug('created bucket=%s', bucket.name)
-    bucket.configure_versioning(True)
+    
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    # upload a dummy object and wait for sync. this forces each zone to finish
-    # a full sync and switch to incremental
-    new_key(zone, bucket, 'dummy').set_contents_from_string('')
+    # upload a dummy object and wait for sync
+    zone.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    # create several concurrent versions on each zone and let them race to sync
+    # create several concurrent versions on each zone
     obj = 'obj'
     for i in range(10):
         for zone_conn in zonegroup_conns.rw_zones:
-            k = new_key(zone_conn, bucket, obj)
-            k.set_contents_from_string('version1')
-            log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)
+            resp = zone_conn.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='version1')
+            log.debug('zone=%s version=%s', zone_conn.zone.name, resp['VersionId'])
 
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
     zonegroup_data_checkpoint(zonegroup_conns)
@@ -1053,7 +1069,6 @@ def test_concurrent_versioned_object_incremental_sync():
 def test_version_suspended_incremental_sync():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-
     zone = zonegroup_conns.rw_zones[0]
 
     # create a non-versioned bucket
@@ -1062,29 +1077,32 @@ def test_version_suspended_incremental_sync():
     zonegroup_meta_checkpoint(zonegroup)
 
     # upload an initial object
-    key1 = new_key(zone, bucket, 'obj')
-    key1.set_contents_from_string('')
-    log.debug('created initial version id=%s', key1.version_id)
+    resp1 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+    log.debug('created initial version id=%s', resp1.get('VersionId', 'null'))
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # enable versioning
-    bucket.configure_versioning(True)
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
     zonegroup_meta_checkpoint(zonegroup)
 
     # re-upload the object as a new version
-    key2 = new_key(zone, bucket, 'obj')
-    key2.set_contents_from_string('')
-    log.debug('created new version id=%s', key2.version_id)
+    resp2 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+    log.debug('created new version id=%s', resp2['VersionId'])
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # suspend versioning
-    bucket.configure_versioning(False)
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Suspended'}
+    )
     zonegroup_meta_checkpoint(zonegroup)
 
     # re-upload the object as a 'null' version
-    key3 = new_key(zone, bucket, 'obj')
-    key3.set_contents_from_string('')
-    log.debug('created null version id=%s', key3.version_id)
+    resp3 = zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
+    log.debug('created null version id=%s', resp3.get('VersionId', 'null'))
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
 def test_delete_marker_full_sync():
@@ -1093,20 +1111,21 @@ def test_delete_marker_full_sync():
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     # enable versioning
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
+    for zone, bucket in zone_bucket:
+        zone.s3_client.put_bucket_versioning(
+            Bucket=bucket.name,
+            VersioningConfiguration={'Status': 'Enabled'}
+        )
     zonegroup_meta_checkpoint(zonegroup)
 
     for zone, bucket in zone_bucket:
         # upload an initial object
-        key1 = new_key(zone, bucket, 'obj')
-        key1.set_contents_from_string('')
+        zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
 
-        # create a delete marker
-        key2 = new_key(zone, bucket, 'obj')
-        key2.delete()
-        key2.delete()
-        key2.delete()
+        # create delete markers
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
 
     # wait for full sync
     for _, bucket in zone_bucket:
@@ -1118,21 +1137,25 @@ def test_suspended_delete_marker_full_sync():
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
     # enable/suspend versioning
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
-        bucket.configure_versioning(False)
+    for zone, bucket in zone_bucket:
+        zone.s3_client.put_bucket_versioning(
+            Bucket=bucket.name,
+            VersioningConfiguration={'Status': 'Enabled'}
+        )
+        zone.s3_client.put_bucket_versioning(
+            Bucket=bucket.name,
+            VersioningConfiguration={'Status': 'Suspended'}
+        )
     zonegroup_meta_checkpoint(zonegroup)
 
     for zone, bucket in zone_bucket:
         # upload an initial object
-        key1 = new_key(zone, bucket, 'obj')
-        key1.set_contents_from_string('')
+        zone.s3_client.put_object(Bucket=bucket.name, Key='obj', Body='')
 
-        # create a delete marker
-        key2 = new_key(zone, bucket, 'obj')
-        key2.delete()
-        key2.delete()
-        key2.delete()
+        # create delete markers
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
+        zone.s3_client.delete_object(Bucket=bucket.name, Key='obj')
 
     # wait for full sync
     for _, bucket in zone_bucket:
@@ -1146,47 +1169,94 @@ def test_concurrent_delete_markers_incremental_sync():
     # create a versioned bucket
     bucket = zone.create_bucket(gen_bucket_name())
     log.debug('created bucket=%s', bucket.name)
-    bucket.configure_versioning(True)
+    
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
 
     zonegroup_meta_checkpoint(zonegroup)
 
     obj = 'obj'
 
-    # upload a dummy object and wait for sync. this forces each zone to finish
-    # a full sync and switch to incremental
-    new_key(zone, bucket, obj).set_contents_from_string('')
+    # upload a dummy object and wait for sync
+    zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    # create several concurrent delete markers on each zone and let them race to sync
+    # create several concurrent delete markers on each zone
     for i in range(2):
         for zone_conn in zonegroup_conns.rw_zones:
-            key = new_key(zone_conn, bucket, obj)
-            key.delete()
+            zone_conn.s3_client.delete_object(Bucket=bucket.name, Key=obj)
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_suspended_delete_marker_incremental_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    zone = zonegroup_conns.rw_zones[0]
+
+    # create a versioned bucket
+    bucket = zone.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zone.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Suspended'}
+    )
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    obj = 'obj'
+
+    # upload a dummy object and wait for sync
+    zone.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # create a delete marker on source zone
+    zone.s3_client.delete_object(Bucket=bucket.name, Key=obj)
 
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
 def test_bucket_versioning():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
-        res = bucket.get_versioning_status()
-        key = 'Versioning'
-        assert(key in res and res[key] == 'Enabled')
+    for zone, bucket in zone_bucket:
+        zone.s3_client.put_bucket_versioning(
+            Bucket=bucket.name,
+            VersioningConfiguration={'Status': 'Enabled'}
+        )
+        response = zone.s3_client.get_bucket_versioning(Bucket=bucket.name)
+        assert response.get('Status') == 'Enabled'
 
 def test_bucket_acl():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
-        bucket.set_acl('public-read')
-        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
+    for zone, bucket in zone_bucket:
+        acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name)
+        assert len(acl['Grants']) == 1  # single grant on owner
+        
+        zone.s3_client.put_bucket_acl(Bucket=bucket.name, ACL='public-read')
+        
+        acl = zone.s3_client.get_bucket_acl(Bucket=bucket.name)
+        assert len(acl['Grants']) == 2  # new grant on AllUsers
 
 def test_bucket_cors():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        cors_cfg = CORSConfiguration()
-        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
-        bucket.set_cors(cors_cfg)
-        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
+    for zone, bucket in zone_bucket:
+        cors_cfg = {
+            'CORSRules': [{
+                'AllowedMethods': ['DELETE'],
+                'AllowedOrigins': ['https://www.example.com'],
+                'AllowedHeaders': ['*'],
+                'MaxAgeSeconds': 3000
+            }]
+        }
+        zone.s3_client.put_bucket_cors(Bucket=bucket.name, CORSConfiguration=cors_cfg)
+        
+        response = zone.s3_client.get_bucket_cors(Bucket=bucket.name)
+        assert response['CORSRules'] == cors_cfg['CORSRules']
 
 def test_bucket_delete_notempty():
     zonegroup = realm.master_zonegroup()
@@ -1194,24 +1264,21 @@ def test_bucket_delete_notempty():
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
     zonegroup_meta_checkpoint(zonegroup)
 
-    for zone_conn, bucket_name in zone_bucket:
+    for zone_conn, bucket in zone_bucket:
         # upload an object to each bucket on its own zone
-        conn = zone_conn.get_connection()
-        bucket = conn.get_bucket(bucket_name)
-        k = bucket.new_key('foo')
-        k.set_contents_from_string('bar')
+        zone_conn.s3_client.put_object(Bucket=bucket.name, Key='foo', Body='bar')
+        
         # attempt to delete the bucket before this object can sync
         try:
-            conn.delete_bucket(bucket_name)
-        except boto.exception.S3ResponseError as e:
-            assert(e.error_code == 'BucketNotEmpty')
+            zone_conn.s3_client.delete_bucket(Bucket=bucket.name)
+        except ClientError as e:
+            assert e.response['Error']['Code'] == 'BucketNotEmpty'
             continue
-        assert False # expected 409 BucketNotEmpty
+        assert False  # expected BucketNotEmpty
 
     # assert that each bucket still exists on the master
-    c1 = zonegroup_conns.master_zone.conn
-    for _, bucket_name in zone_bucket:
-        assert c1.get_bucket(bucket_name)
+    for _, bucket in zone_bucket:
+        zonegroup_conns.master_zone.s3_client.head_bucket(Bucket=bucket.name)
 
 def test_multi_period_incremental_sync():
     zonegroup = realm.master_zonegroup()
@@ -1242,7 +1309,7 @@ def test_multi_period_incremental_sync():
             continue
         bucket_name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
-        bucket = zone_conn.conn.create_bucket(bucket_name)
+        zone_conn.s3_client.create_bucket(Bucket=bucket_name)
         buckets.append(bucket_name)
 
     # wait for zone 1 to sync
@@ -1252,12 +1319,12 @@ def test_multi_period_incremental_sync():
     set_master_zone(z1)
     mdlog_periods += [realm.current_period.id]
 
-    for zone_conn, bucket_name in zone_bucket:
+    for zone_conn, _ in zone_bucket:
         if zone_conn.zone == z3:
             continue
         bucket_name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
-        zone_conn.conn.create_bucket(bucket_name)
+        zone_conn.s3_client.create_bucket(Bucket=bucket_name)
         buckets.append(bucket_name)
 
     # restart zone 3 gateway and wait for sync
@@ -1270,7 +1337,6 @@ def test_multi_period_incremental_sync():
             for target_conn in zonegroup_conns.zones:
                 if source_conn.zone == target_conn.zone:
                     continue
-
                 if target_conn.zone.has_buckets():
                     target_conn.check_bucket_eq(source_conn, bucket_name)
 
@@ -1306,8 +1372,7 @@ def test_datalog_autotrim():
 
     # upload an object to each zone to generate a datalog entry
     for zone, bucket in zone_bucket:
-        k = new_key(zone, bucket.name, 'key')
-        k.set_contents_from_string('body')
+        zone.s3_client.put_object(Bucket=bucket.name, Key='key', Body='body')
 
     # wait for metadata and data sync to catch up
     zonegroup_meta_checkpoint(zonegroup)
@@ -1337,45 +1402,41 @@ def test_datalog_autotrim():
 def test_multi_zone_redirect():
     zonegroup = realm.master_zonegroup()
     if len(zonegroup.rw_zones) < 2:
-        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+        raise SkipTest("test_multi_zone_redirect skipped. Requires 2 or more zones in master zonegroup.")
 
     zonegroup_conns = ZonegroupConns(zonegroup)
     (zc1, zc2) = zonegroup_conns.rw_zones[0:2]
-
     z1, z2 = (zc1.zone, zc2.zone)
 
     set_sync_from_all(z2, False)
 
-    # create a bucket on the first zone
     bucket_name = gen_bucket_name()
     log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
-    bucket = zc1.conn.create_bucket(bucket_name)
+    zc1.s3_client.create_bucket(Bucket=bucket_name)
+
     obj = 'testredirect'
-
-    key = bucket.new_key(obj)
-    data = 'A'*512
-    key.set_contents_from_string(data)
+    data = 'A' * 512
+    zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
 
     zonegroup_meta_checkpoint(zonegroup)
 
     # try to read object from second zone (should fail)
-    bucket2 = get_bucket(zc2, bucket_name)
-    assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
+    e = assert_raises(ClientError, zc2.s3_client.get_object, Bucket=bucket_name, Key=obj)
+    assert e.response['Error']['Code'] == 'NoSuchKey'
 
     set_redirect_zone(z2, z1)
 
-    key2 = bucket2.get_key(obj)
-
-    eq(data, key2.get_contents_as_string(encoding='ascii'))
-
-    key = bucket.new_key(obj)
+    # Should work now with redirect
+    response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+    eq(data, response['Body'].read().decode('ascii'))
 
+    # Test updates
     for x in ['a', 'b', 'c', 'd']:
-        data = x*512
-        key.set_contents_from_string(data)
-        eq(data, key2.get_contents_as_string(encoding='ascii'))
+        data = x * 512
+        zc1.s3_client.put_object(Bucket=bucket_name, Key=obj, Body=data)
+        response = zc2.s3_client.get_object(Bucket=bucket_name, Key=obj)
+        eq(data, response['Body'].read().decode('ascii'))
 
-    # revert config changes
     set_sync_from_all(z2, True)
     set_redirect_zone(z2, None)
 
@@ -1441,14 +1502,23 @@ def test_zg_master_zone_delete():
 
 def test_set_bucket_website():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
+    for zone, bucket in zone_bucket:
+        website_cfg = {
+            'IndexDocument': {'Suffix': 'index.html'},
+            'ErrorDocument': {'Key': 'error.html'}
+        }
         try:
-            bucket.set_website_configuration(website_cfg)
-        except boto.exception.S3ResponseError as e:
-            if e.error_code == 'MethodNotAllowed':
+            zone.s3_client.put_bucket_website(
+                Bucket=bucket.name,
+                WebsiteConfiguration=website_cfg
+            )
+        except ClientError as e:
+            if e.response['Error']['Code'] == 'MethodNotAllowed':
                 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
-        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
+
+        response = zone.s3_client.get_bucket_website(Bucket=bucket.name)
+        assert response['IndexDocument']['Suffix'] == 'index.html'
+        assert response['ErrorDocument']['Key'] == 'error.html'
 
 def test_set_bucket_policy():
     policy = '''{
@@ -1459,9 +1529,10 @@ def test_set_bucket_policy():
   }]
 }'''
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        bucket.set_policy(policy)
-        assert(bucket.get_policy().decode('ascii') == policy)
+    for zone, bucket in zone_bucket:
+        zone.s3_client.put_bucket_policy(Bucket=bucket.name, Policy=policy)
+        response = zone.s3_client.get_bucket_policy(Bucket=bucket.name)
+        assert response['Policy'] == policy
 
 @attr('bucket_sync_disable')
 def test_bucket_sync_disable():
@@ -1489,8 +1560,7 @@ def test_bucket_sync_enable_right_after_disable():
 
     for zone, bucket in zone_bucket:
         for objname in objnames:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -1505,8 +1575,7 @@ def test_bucket_sync_enable_right_after_disable():
 
     for zone, bucket in zone_bucket:
         for objname in objnames_2:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     for bucket_name in buckets:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
@@ -1519,13 +1588,12 @@ def test_bucket_sync_disable_enable():
     zonegroup_conns = ZonegroupConns(zonegroup)
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
-    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
+    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
     content = 'asdasd'
 
     for zone, bucket in zone_bucket:
         for objname in objnames:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -1537,12 +1605,11 @@ def test_bucket_sync_disable_enable():
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
+    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
 
     for zone, bucket in zone_bucket:
         for objname in objnames_2:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body=content)
 
     for bucket_name in buckets:
         enable_bucket_sync(realm.meta_master_zone(), bucket_name)
@@ -1557,19 +1624,32 @@ def test_multipart_object_sync():
     zonegroup_conns = ZonegroupConns(zonegroup)
     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
 
-    _, bucket = zone_bucket[0]
+    zone, bucket = zone_bucket[0]
 
     # initiate a multipart upload
-    upload = bucket.initiate_multipart_upload('MULTIPART')
-    mp = boto.s3.multipart.MultiPartUpload(bucket)
-    mp.key_name = upload.key_name
-    mp.id = upload.id
-    part_size = 5 * 1024 * 1024 # 5M min part size
-    mp.upload_part_from_file(StringIO('a' * part_size), 1)
-    mp.upload_part_from_file(StringIO('b' * part_size), 2)
-    mp.upload_part_from_file(StringIO('c' * part_size), 3)
-    mp.upload_part_from_file(StringIO('d' * part_size), 4)
-    mp.complete_upload()
+    key_name = 'MULTIPART'
+    response = zone.s3_client.create_multipart_upload(Bucket=bucket.name, Key=key_name)
+    upload_id = response['UploadId']
+
+    part_size = 5 * 1024 * 1024  # 5M min part size
+    parts = []
+
+    for part_num, char in enumerate(['a', 'b', 'c', 'd'], start=1):
+        part_response = zone.s3_client.upload_part(
+            Bucket=bucket.name,
+            Key=key_name,
+            PartNumber=part_num,
+            UploadId=upload_id,
+            Body=char * part_size
+        )
+        parts.append({'PartNumber': part_num, 'ETag': part_response['ETag']})
+
+    zone.s3_client.complete_multipart_upload(
+        Bucket=bucket.name,
+        Key=key_name,
+        UploadId=upload_id,
+        MultipartUpload={'Parts': parts}
+    )
 
     zonegroup_meta_checkpoint(zonegroup)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@@ -1579,45 +1659,50 @@ def test_encrypted_object_sync():
     zonegroup_conns = ZonegroupConns(zonegroup)
 
     if len(zonegroup.rw_zones) < 2:
-        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+        raise SkipTest("test_encrypted_object_sync skipped. Requires 2 or more zones in master zonegroup.")
 
     (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
 
-    # create a bucket on the first zone
     bucket_name = gen_bucket_name()
     log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
-    bucket = zone1.conn.create_bucket(bucket_name)
+    zone1.s3_client.create_bucket(Bucket=bucket_name)
 
+    data = 'A' * 512
+
     # upload an object with sse-c encryption
-    sse_c_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
-        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
-    }
-    key = bucket.new_key('testobj-sse-c')
-    data = 'A'*512
-    key.set_contents_from_string(data, headers=sse_c_headers)
+    zone1.s3_client.put_object(
+        Bucket=bucket_name,
+        Key='testobj-sse-c',
+        Body=data,
+        SSECustomerAlgorithm='AES256',
+        SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+    )
 
     # upload an object with sse-kms encryption
-    sse_kms_headers = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
-    }
-    key = bucket.new_key('testobj-sse-kms')
-    key.set_contents_from_string(data, headers=sse_kms_headers)
+    zone1.s3_client.put_object(
+        Bucket=bucket_name,
+        Key='testobj-sse-kms',
+        Body=data,
+        ServerSideEncryption='aws:kms',
+        SSEKMSKeyId='testkey-1'
+    )
 
-    # wait for the bucket metadata and data to sync
     zonegroup_meta_checkpoint(zonegroup)
     zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
 
     # read the encrypted objects from the second zone
-    bucket2 = get_bucket(zone2, bucket_name)
-    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
-    eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii'))
+    response = zone2.s3_client.get_object(
+        Bucket=bucket_name,
+        Key='testobj-sse-c',
+        SSECustomerAlgorithm='AES256',
+        SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+    )
+    eq(data, response['Body'].read().decode('ascii'))
 
-    key = bucket2.get_key('testobj-sse-kms')
-    eq(data, key.get_contents_as_string(encoding='ascii'))
+    response = zone2.s3_client.get_object(Bucket=bucket_name, Key='testobj-sse-kms')
+    eq(data, response['Body'].read().decode('ascii'))
 
 @attr('bucket_trim')
 def test_bucket_index_log_trim():
@@ -1630,10 +1715,9 @@ def test_bucket_index_log_trim():
     def make_test_bucket():
         name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone.name, name)
-        bucket = zone.conn.create_bucket(name)
+        bucket = zone.create_bucket(name)
         for objname in ('a', 'b', 'c', 'd'):
-            k = new_key(zone, name, objname)
-            k.set_contents_from_string('foo')
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
         zonegroup_meta_checkpoint(zonegroup)
         zonegroup_bucket_checkpoint(zonegroup_conns, name)
         return bucket
@@ -1688,10 +1772,9 @@ def test_bucket_reshard_index_log_trim():
     def make_test_bucket():
         name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone.name, name)
-        bucket = zone.conn.create_bucket(name)
+        bucket = zone.create_bucket(name)
         for objname in ('a', 'b', 'c', 'd'):
-            k = new_key(zone, name, objname)
-            k.set_contents_from_string('foo')
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
         zonegroup_meta_checkpoint(zonegroup)
         zonegroup_bucket_checkpoint(zonegroup_conns, name)
         return bucket
@@ -1725,8 +1808,8 @@ def test_bucket_reshard_index_log_trim():
 
     # upload more objects
     for objname in ('e', 'f', 'g', 'h'):
-        k = new_key(zone, test_bucket.name, objname)
-        k.set_contents_from_string('foo')
+        zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo')
+
     zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
 
     # Resharding the bucket again
@@ -1757,8 +1840,8 @@ def test_bucket_reshard_index_log_trim():
 
     # upload more objects
     for objname in ('i', 'j', 'k', 'l'):
-        k = new_key(zone, test_bucket.name, objname)
-        k.set_contents_from_string('foo')
+        zone.s3_client.put_object(Bucket=test_bucket.name, Key=objname, Body='foo')
+
     zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
 
     # verify the bucket has non-empty bilog
@@ -1776,11 +1859,10 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard():
     # create a test bucket, upload some objects, and wait for sync
     def make_test_bucket():
         name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', primary.name, name)
-        bucket = primary.conn.create_bucket(name)
+        log.info('create bucket zone=%s name=%s', primary.zone.name, name)
+        bucket = primary.create_bucket(name)
         for objname in ('a', 'b', 'c', 'd'):
-            k = new_key(primary, name, objname)
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
         zonegroup_meta_checkpoint(zonegroup)
         zonegroup_bucket_checkpoint(zonegroup_conns, name)
         return bucket
@@ -1801,7 +1883,7 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard():
         primary.zone.cluster.admin(cmd)
 
     # delete bucket and test bilog autotrim
-    primary.conn.delete_bucket(test_bucket.name)
+    primary.s3_client.delete_bucket(Bucket=test_bucket.name)
     zonegroup_data_checkpoint(zonegroup_conns)
 
     bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
@@ -1838,11 +1920,10 @@ def test_bucket_log_trim_after_delete_bucket_secondary_reshard():
     # create a test bucket, upload some objects, and wait for sync
     def make_test_bucket():
         name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', primary.name, name)
-        bucket = primary.conn.create_bucket(name)
+        log.info('create bucket zone=%s name=%s', primary.zone.name, name)
+        bucket = primary.create_bucket(name)
         for objname in ('a', 'b', 'c', 'd'):
-            k = new_key(primary, name, objname)
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
         zonegroup_meta_checkpoint(zonegroup)
         zonegroup_bucket_checkpoint(zonegroup_conns, name)
         return bucket
@@ -1864,7 +1945,7 @@ def test_bucket_log_trim_after_delete_bucket_secondary_reshard():
         primary.zone.cluster.admin(cmd)
 
     # delete bucket and test bilog autotrim
-    primary.conn.delete_bucket(test_bucket.name)
+    primary.s3_client.delete_bucket(Bucket=test_bucket.name)
     zonegroup_data_checkpoint(zonegroup_conns)
 
     bilog_autotrim(secondary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
@@ -1897,15 +1978,13 @@ def test_bucket_reshard_incremental():
     zonegroup_conns = ZonegroupConns(zonegroup)
     zone = zonegroup_conns.rw_zones[0]
 
-    # create a bucket
     bucket = zone.create_bucket(gen_bucket_name())
     log.debug('created bucket=%s', bucket.name)
     zonegroup_meta_checkpoint(zonegroup)
 
     # upload some objects
     for objname in ('a', 'b', 'c', 'd'):
-        k = new_key(zone, bucket.name, objname)
-        k.set_contents_from_string('foo')
+        zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # reshard in each zone
@@ -1917,8 +1996,7 @@ def test_bucket_reshard_incremental():
 
     # upload more objects
     for objname in ('e', 'f', 'g', 'h'):
-        k = new_key(zone, bucket.name, objname)
-        k.set_contents_from_string('foo')
+        zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
 @attr('bucket_reshard')
@@ -1940,8 +2018,7 @@ def test_bucket_reshard_full():
     try:
         # upload some objects
         for objname in ('a', 'b', 'c', 'd'):
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string('foo')
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
 
         # reshard on first zone
         zone.zone.cluster.admin(['bucket', 'reshard',
@@ -1951,8 +2028,7 @@ def test_bucket_reshard_full():
 
         # upload more objects
         for objname in ('e', 'f', 'g', 'h'):
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string('foo')
+            zone.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     finally:
         for z in zonegroup_conns.rw_zones[1:]:
             z.zone.start()
@@ -1962,14 +2038,17 @@ def test_bucket_reshard_full():
 def test_bucket_creation_time():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-
     zonegroup_meta_checkpoint(zonegroup)
 
-    zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
+    zone_buckets = []
+    for zone in zonegroup_conns.rw_zones:
+        response = zone.s3_client.list_buckets()
+        zone_buckets.append(response['Buckets'])
+
     for z1, z2 in combinations(zone_buckets, 2):
         for a, b in zip(z1, z2):
-            eq(a.name, b.name)
-            eq(a.creation_date, b.creation_date)
+            eq(a['Name'], b['Name'])
+            eq(a['CreationDate'], b['CreationDate'])
 
 def get_bucket_shard_objects(zone, num_shards):
     """
@@ -1990,8 +2069,7 @@ def write_most_shards(zone, bucket_name, num_shards):
     random.shuffle(objs)
     del objs[-(len(objs)//10):]
     for obj in objs:
-        k = new_key(zone, bucket_name, obj)
-        k.set_contents_from_string('foo')
+        zone.s3_client.put_object(Bucket=bucket_name, Key=obj, Body='foo')
 
 def reshard_bucket(zone, bucket_name, num_shards):
     """
@@ -2118,8 +2196,7 @@ def test_zap_init_bucket_sync_run():
 
     # Write zeroth generation
     for obj in range(1, 6):
-        k = new_key(primary, bucket.name, f'obj{obj * 11}')
-        k.set_contents_from_string('foo')
+        primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # Write several more generations
@@ -2127,8 +2204,7 @@ def test_zap_init_bucket_sync_run():
     for num_shards in generations:
         reshard_bucket(primary.zone, bucket.name, num_shards)
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
 
@@ -2168,8 +2244,7 @@ def test_list_bucket_key_marker_encoding():
 
     # test for object names with '%' character.
     for obj in range(1, 1100):
-        k = new_key(primary, bucket.name, f'obj%{obj * 11}')
-        k.set_contents_from_string('foo')
+        primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj%{obj * 11}', Body='foo')
 
     # wait for the secondary to catch up
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@@ -2185,12 +2260,11 @@ def test_list_bucket_key_marker_encoding():
     
     # write an object during incremental sync.
     objname = 'test_incremental'
-    k = new_key(primary, bucket, objname)
-    k.set_contents_from_string('foo')
+    primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
     
     # the object uploaded after bucket sync init should be replicated
-    check_object_exists(bucket, objname)
+    check_object_exists(secondary, bucket.name, objname)
     
 
 def test_role_sync():
@@ -2270,8 +2344,7 @@ def test_replication_status():
 
     bucket = zone.conn.create_bucket(gen_bucket_name())
     obj_name = "a"
-    k = new_key(zone, bucket.name, obj_name)
-    k.set_contents_from_string('foo')
+    zone.s3_client.put_object(Bucket=bucket.name, Key=obj_name, Body='foo')
     zonegroup_meta_checkpoint(zonegroup)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
@@ -2301,26 +2374,24 @@ def test_object_acl():
     log.debug('created bucket=%s', bucket.name)
 
     # upload a dummy object and wait for sync.
-    k = new_key(primary, bucket, 'dummy')
-    k.set_contents_from_string('foo')
+    objname = 'dummy'
+    primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     zonegroup_meta_checkpoint(zonegroup)
     zonegroup_data_checkpoint(zonegroup_conns)
 
-    #check object on secondary before setacl
-    bucket2 = get_bucket(secondary, bucket.name)
-    before_set_acl = bucket2.get_acl(k)
-    assert(len(before_set_acl.acl.grants) == 1)
+    # check object on secondary before setacl
+    before_acl = secondary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname)
+    assert len(before_acl['Grants']) == 1
 
-    #set object acl on primary and wait for sync.
-    bucket.set_canned_acl('public-read', key_name=k)
-    log.debug('set acl=%s', bucket.name)
+    # set object acl on secondary and wait for sync
+    secondary.s3_client.put_object_acl(Bucket=bucket.name, Key=objname, ACL='public-read')
+    log.debug('set acl on bucket=%s', bucket.name)
     zonegroup_data_checkpoint(zonegroup_conns)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    #check object secondary after setacl
-    bucket2 = get_bucket(secondary, bucket.name)
-    after_set_acl = bucket2.get_acl(k)
-    assert(len(after_set_acl.acl.grants) == 2) # read grant added on AllUsers
+    # check object on primary after setacl
+    after_acl = primary.s3_client.get_object_acl(Bucket=bucket.name, Key=objname)
+    assert len(after_acl['Grants']) == 2  # read grant added on AllUsers
 
 def test_assume_role_after_sync():
     zonegroup = realm.master_zonegroup()
@@ -2375,8 +2446,7 @@ def test_bucket_full_sync_after_data_sync_init():
 
         # write some objects that don't sync yet
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * 11}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
 
         cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
         cmd += ['--source-zone', primary.name]
@@ -2408,16 +2478,14 @@ def test_resharded_bucket_full_sync_after_data_sync_init():
 
         # Write zeroth generation
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * 11}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
 
         # Write several more generations
         generations = [17, 19, 23, 29, 31, 37]
         for num_shards in generations:
             reshard_bucket(primary.zone, bucket.name, num_shards)
             for obj in range(1, 6):
-                k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
-                k.set_contents_from_string('foo')
+                primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
 
         cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
         cmd += ['--source-zone', primary.name]
@@ -2444,8 +2512,7 @@ def test_bucket_incremental_sync_after_data_sync_init():
 
     # upload a dummy object and wait for sync. this forces each zone to finish
     # a full sync and switch to incremental
-    k = new_key(primary, bucket, 'dummy')
-    k.set_contents_from_string('foo')
+    primary.s3_client.put_object(Bucket=bucket.name, Key='dummy', Body='foo')
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     try:
@@ -2454,8 +2521,7 @@ def test_bucket_incremental_sync_after_data_sync_init():
 
         # Write more objects to primary
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * 11}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
 
         cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
         cmd += ['--source-zone', primary.name]
@@ -2483,16 +2549,15 @@ def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
 
     # Write zeroth generation to primary
     for obj in range(1, 6):
-        k = new_key(primary, bucket.name, f'obj{obj * 11}')
-        k.set_contents_from_string('foo')
+        primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
 
     # Write several more generations
     generations = [17, 19, 23, 29, 31, 37]
     for num_shards in generations:
         reshard_bucket(primary.zone, bucket.name, num_shards)
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
+
 
     # wait for the secondary to catch up to the latest gen
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@@ -2503,8 +2568,7 @@ def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
 
         # write some more objects to the last gen
         for obj in range(1, 6):
-            k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}')
-            k.set_contents_from_string('foo')
+            primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * generations[-1]}', Body='foo')
 
         cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
         cmd += ['--source-zone', primary.name]
@@ -2532,8 +2596,7 @@ def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
 
     # Write zeroth generation to primary
     for obj in range(1, 6):
-        k = new_key(primary, bucket.name, f'obj{obj * 11}')
-        k.set_contents_from_string('foo')
+        primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * 11}', Body='foo')
 
     # wait for the secondary to catch up
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
@@ -2547,8 +2610,8 @@ def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
         for num_shards in generations:
             reshard_bucket(primary.zone, bucket.name, num_shards)
             for obj in range(1, 6):
-                k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
-                k.set_contents_from_string('foo')
+                primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{obj * num_shards}', Body='foo')
+
 
         cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
         cmd += ['--source-zone', primary.name]
@@ -2699,30 +2762,49 @@ def create_zone_bucket(zone):
     return bucket
 
 def create_object(zone_conn, bucket, objname, content):
-    k = new_key(zone_conn, bucket.name, objname)
-    k.set_contents_from_string(content)
+    bucket_name = bucket.name if hasattr(bucket, 'name') else bucket
+    zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content)
 
 def create_objects(zone_conn, bucket, obj_arr, content):
+    bucket_name = bucket.name if hasattr(bucket, 'name') else bucket
     for objname in obj_arr:
-        create_object(zone_conn, bucket, objname, content)
+        zone_conn.s3_client.put_object(Bucket=bucket_name, Key=objname, Body=content)
 
-def check_object_exists(bucket, objname, content = None):
-    k = bucket.get_key(objname)
-    assert_not_equal(k, None)
-    if (content != None):
-        assert_equal(k.get_contents_as_string(encoding='ascii'), content)
+def check_object_exists(zone_or_client, bucket_name, objname, content=None):
+    # handle both zone connection and direct client request
+    client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client
 
-def check_objects_exist(bucket, obj_arr, content = None):
+    try:
+        response = client.get_object(Bucket=bucket_name, Key=objname)
+        if content is not None:
+            actual_content = response['Body'].read()
+            if isinstance(content, str):
+                actual_content = actual_content.decode('utf-8')
+            assert_equal(actual_content, content)
+    except ClientError as e:
+        if e.response['Error']['Code'] == 'NoSuchKey':
+            assert False, f"Object {objname} does not exist in bucket {bucket_name}"
+        raise
+
+def check_objects_exist(zone_or_client, bucket_name, obj_arr, content=None):
+    if isinstance(obj_arr, str):
+        obj_arr = [obj_arr]
     for objname in obj_arr:
-        check_object_exists(bucket, objname, content)
+        check_object_exists(zone_or_client, bucket_name, objname, content)
 
-def check_object_not_exists(bucket, objname):
-    k = bucket.get_key(objname)
-    assert_equal(k, None)
-
-def check_objects_not_exist(bucket, obj_arr):
+def check_object_not_exists(zone_or_client, bucket_name, objname):
+    client = zone_or_client.s3_client if hasattr(zone_or_client, 's3_client') else zone_or_client
+    try:
+        client.head_object(Bucket=bucket_name, Key=objname)
+        assert False, f"Object {objname} should not exist in bucket {bucket_name}"
+    except ClientError as e:
+        assert e.response['Error']['Code'] == '404'
+
+def check_objects_not_exist(zone_or_client, bucket_name, obj_arr):
+    if isinstance(obj_arr, str):
+        obj_arr = [obj_arr]
     for objname in obj_arr:
-        check_object_not_exists(bucket, objname)
+        check_object_not_exists(zone_or_client, bucket_name, objname)
 
 @attr('fails_with_rgw')
 @attr('sync_policy')
@@ -2825,31 +2907,22 @@ def test_sync_flow_symmetrical_zonegroup_all():
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    objnames = [ 'obj1', 'obj2' ]
+    objnames = ['obj1', 'obj2']
     content = 'asdasd'
-    buckets = []
 
     # create bucket & object in all zones
     bucketA = create_zone_bucket(zcA)
-    buckets.append(bucketA)
-    create_object(zcA, bucketA, objnames[0], content)
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnames[0], Body=content)
 
     bucketB = create_zone_bucket(zcB)
-    buckets.append(bucketB)
-    create_object(zcB, bucketB, objnames[1], content)
+    zcB.s3_client.put_object(Bucket=bucketB.name, Key=objnames[1], Body=content)
 
     zonegroup_meta_checkpoint(zonegroup)
-    # 'zonegroup_data_checkpoint' currently fails for the zones not
-    # allowed to sync. So as a workaround, data checkpoint is done
-    # for only the ones configured.
     zone_data_checkpoint(zoneB, zoneA)
 
-    # verify if objects are synced accross the zone
-    bucket = get_bucket(zcB, bucketA.name)
-    check_object_exists(bucket, objnames[0], content)
-
-    bucket = get_bucket(zcA, bucketB.name)
-    check_object_exists(bucket, objnames[1], content)
+    # verify objects synced
+    check_object_exists(zcB, bucketA.name, objnames[0], content)
+    check_object_exists(zcA, bucketB.name, objnames[1], content)
 
     remove_sync_policy_group(c1, "sync-group")
     return
@@ -2891,13 +2964,13 @@ def test_sync_flow_symmetrical_zonegroup_select():
     content = 'asdasd'
 
     # create bucketA & objects in zoneA
-    objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+    objnamesA = ['obj1', 'obj2', 'obj3']
     bucketA = create_zone_bucket(zcA)
     buckets.append(bucketA)
     create_objects(zcA, bucketA, objnamesA, content)
 
     # create bucketB & objects in zoneB
-    objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+    objnamesB = ['obj4', 'obj5', 'obj6']
     bucketB = create_zone_bucket(zcB)
     buckets.append(bucketB)
     create_objects(zcB, bucketB, objnamesB, content)
@@ -2907,18 +2980,12 @@ def test_sync_flow_symmetrical_zonegroup_select():
     zone_data_checkpoint(zoneA, zoneB)
 
     # verify if objnamesA synced to only zoneB but not zoneC
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesA, content)
-
-    bucket = get_bucket(zcC, bucketA.name)
-    check_objects_not_exist(bucket, objnamesA)
+    check_objects_exist(zcB, bucketA.name, objnamesA, content)
+    check_objects_not_exist(zcC, bucketA.name, objnamesA)
 
     # verify if objnamesB synced to only zoneA but not zoneC
-    bucket = get_bucket(zcA, bucketB.name)
-    check_objects_exist(bucket, objnamesB, content)
-
-    bucket = get_bucket(zcC, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
+    check_objects_exist(zcA, bucketB.name, objnamesB, content)
+    check_objects_not_exist(zcC, bucketB.name, objnamesB)
 
     remove_sync_policy_group(c1, "sync-group")
     return
@@ -2938,7 +3005,7 @@ def test_sync_flow_directional_zonegroup_select():
     zonegroup_conns = ZonegroupConns(zonegroup)
 
     if len(zonegroup.zones) < 3:
-        raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+        raise SkipTest("test_sync_flow_directional_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -2977,18 +3044,12 @@ def test_sync_flow_directional_zonegroup_select():
     zone_data_checkpoint(zoneB, zoneA)
 
     # verify if objnamesA synced to only zoneB but not zoneC
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesA, content)
-
-    bucket = get_bucket(zcC, bucketA.name)
-    check_objects_not_exist(bucket, objnamesA)
+    check_objects_exist(zcB, bucketA.name, objnamesA, content)
+    check_objects_not_exist(zcC, bucketA.name, objnamesA)
 
     # verify if objnamesB are not synced to either zoneA or zoneC
-    bucket = get_bucket(zcA, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
-
-    bucket = get_bucket(zcC, bucketB.name)
-    check_objects_not_exist(bucket, objnamesB)
+    check_objects_not_exist(zcA, bucketB.name, objnamesB)
+    check_objects_not_exist(zcC, bucketB.name, objnamesB)
 
     """
         verify the same at bucketA level
@@ -3030,17 +3091,16 @@ def test_sync_flow_directional_zonegroup_select():
     zone_data_checkpoint(zoneB, zoneA)
 
     # verify that objnamesC are synced to bucketA in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesC, content)
+    check_objects_exist(zcB, bucketA.name, objnamesC, content)
 
     # verify that objnamesD are not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnamesD)
+    check_objects_not_exist(zcA, bucketA.name, objnamesD)
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     remove_sync_policy_group(c1, "sync-group")
     return
 
+
 @attr('fails_with_rgw')
 @attr('sync_policy')
 def test_sync_single_bucket():
@@ -3103,12 +3163,10 @@ def test_sync_single_bucket():
     zone_data_checkpoint(zoneB, zoneA)
 
     # verify if bucketA objects are synced
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnames, content)
+    check_objects_exist(zcB, bucketA.name, objnames, content)
 
     # bucketB objects should not be synced
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_not_exist(bucket, objnames)
+    check_objects_not_exist(zcB, bucketB.name, objnames)
 
 
     """
@@ -3143,12 +3201,10 @@ def test_sync_single_bucket():
     zone_data_checkpoint(zoneB, zoneA)
 
     # verify if bucketA objects are synced
-    bucket = get_bucket(zcB, bucketA.name)
-    check_object_exists(bucket, objnames[2], content)
+    check_object_exists(zcB, bucketA.name, objnames[2], content)
 
     # bucketB objects should not be synced
-    bucket = get_bucket(zcB, bucketB.name)
-    check_object_not_exists(bucket, objnames[2])
+    check_object_not_exists(zcB, bucketB.name, objnames[2])
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     remove_sync_policy_group(c1, "sync-group")
@@ -3217,11 +3273,9 @@ def test_sync_different_buckets():
 
     # verify that objects are synced to bucketB in zoneB
     # but not to bucketA
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_not_exist(bucket, objnames)
+    check_objects_not_exist(zcB, bucketA.name, objnames)
+    check_objects_exist(zcB, bucketB.name, objnames, content)
 
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_exist(bucket, objnames, content)
     """
         Method (b): configure policy at only bucketA level with pipe
         set to bucketB in target zone
@@ -3258,11 +3312,8 @@ def test_sync_different_buckets():
     # verify that objects are synced to bucketB in zoneB
     # but not to bucketA
     """
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_not_exist(bucket, objnamesC)
-
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_exist(bucket, objnamesC, content)
+    check_objects_not_exist(zcB, bucketA.name, objnamesC)
+    check_objects_exist(zcB, bucketB.name, objnamesC, content)
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     zonegroup_meta_checkpoint(zonegroup)
@@ -3296,9 +3347,8 @@ def test_sync_different_buckets():
     # verify that objects from only bucketB are synced to
     # bucketA in zoneA
     """
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnamesD)
-    check_objects_exist(bucket, objnamesE, content)
+    check_objects_not_exist(zcA, bucketA.name, objnamesD)
+    check_objects_exist(zcA, bucketA.name, objnamesE, content)
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     remove_sync_policy_group(c1, "sync-group")
@@ -3365,13 +3415,11 @@ def test_sync_multiple_buckets_to_single():
 
     # verify that both zoneA bucketA & bucketB objects are synced to
     # bucketB in zoneB but not to bucketA
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_not_exist(bucket, objnamesA)
-    check_objects_not_exist(bucket, objnamesB)
+    check_objects_not_exist(zcB, bucketA.name, objnamesA)
+    check_objects_not_exist(zcB, bucketA.name, objnamesB)
 
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_exist(bucket, objnamesA, content)
-    check_objects_exist(bucket, objnamesB, content)
+    check_objects_exist(zcB, bucketB.name, objnamesA, content)
+    check_objects_exist(zcB, bucketB.name, objnamesB, content)
 
     """
         Method (b): configure at bucket level
@@ -3414,13 +3462,11 @@ def test_sync_multiple_buckets_to_single():
 
     # verify that both zoneA bucketA & bucketB objects are synced to
     # bucketA in zoneB but not to bucketB
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_not_exist(bucket, objnamesC)
-    check_objects_not_exist(bucket, objnamesD)
+    check_objects_not_exist(zcB, bucketB.name, objnamesC)
+    check_objects_not_exist(zcB, bucketB.name, objnamesD)
 
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesD, content)
-    check_objects_exist(bucket, objnamesD, content)
+    check_objects_exist(zcB, bucketA.name, objnamesC, content)
+    check_objects_exist(zcB, bucketA.name, objnamesD, content)
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     remove_sync_policy_group(c1, "sync-group")
@@ -3487,11 +3533,8 @@ def test_sync_single_bucket_to_multiple():
 
     # verify that objects from zoneA bucketA are synced to both
     # bucketA & bucketB in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesA, content)
-
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_exist(bucket, objnamesA, content)
+    check_objects_exist(zcB, bucketA.name, objnamesA, content)
+    check_objects_exist(zcB, bucketB.name, objnamesA, content)
 
     """
         Method (b): configure at bucket level
@@ -3528,11 +3571,8 @@ def test_sync_single_bucket_to_multiple():
 
     # verify that objects from zoneA bucketA are synced to both
     # bucketA & bucketB in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnamesB, content)
-
-    bucket = get_bucket(zcB, bucketB.name)
-    check_objects_exist(bucket, objnamesB, content)
+    check_objects_exist(zcB, bucketA.name, objnamesB, content)
+    check_objects_exist(zcB, bucketB.name, objnamesB, content)
 
     remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
     remove_sync_policy_group(c1, "sync-group")
@@ -3589,7 +3629,7 @@ def test_bucket_remove_rgw_down():
             assert check_all_buckets_exist(zone, buckets)
 
         for zone, bucket_name in zone_bucket:
-            zone.conn.delete_bucket(bucket_name)
+            zone.s3_client.delete_bucket(Bucket=bucket_name)
 
         zonegroup_meta_checkpoint(zonegroup)
 
@@ -3927,8 +3967,7 @@ def test_copy_object_same_bucket():
     objname = 'dummy'
 
     # upload a dummy object and wait for sync.
-    k = new_key(primary, bucket, objname)
-    k.set_contents_from_string('foo')
+    primary.s3_client.put_object(Bucket=bucket.name, Key=objname, Body='foo')
     zonegroup_meta_checkpoint(zonegroup)
 
     zonegroup_data_checkpoint(zonegroup_conns)
@@ -3937,16 +3976,20 @@ def test_copy_object_same_bucket():
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # copy object on primary zone
-    primary.s3_client.copy_object(Bucket=bucket.name,
-        CopySource=bucket.name + '/'+ objname,
-        Key= objname + '-copy1')
+    primary.s3_client.copy_object(
+        Bucket=bucket.name,
+        CopySource={'Bucket': bucket.name, 'Key': objname},
+        Key=objname + '-copy1'
+    )
 
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
     # copy object on secondary zone
-    secondary.s3_client.copy_object(Bucket=bucket.name,
-        Key= objname + '-copy2', 
-        CopySource=bucket.name + '/'+ objname)
+    secondary.s3_client.copy_object(
+        Bucket=bucket.name,
+        Key=objname + '-copy2',
+        CopySource={'Bucket': bucket.name, 'Key': objname}
+    )
 
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
@@ -3963,8 +4006,7 @@ def test_copy_object_different_bucket():
     objname = 'dummy'
 
     # upload a dummy object and wait for sync.
-    k = new_key(primary, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    primary.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zonegroup_meta_checkpoint(zonegroup)
     
     zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name)
@@ -3976,9 +4018,11 @@ def test_copy_object_different_bucket():
     zonegroup_meta_checkpoint(zonegroup)
             
     # copy object on primary zone
-    primary.s3_client.copy_object(Bucket = dest_bucket.name,
-        Key = objname + '-copy',
-        CopySource = source_bucket.name + '/' + objname)
+    primary.s3_client.copy_object(
+        Bucket=dest_bucket.name,
+        Key=objname + '-copy',
+        CopySource={'Bucket': source_bucket.name, 'Key': objname}
+    )
     
     zonegroup_bucket_checkpoint(zonegroup_conns, dest_bucket.name)
 
@@ -4108,14 +4152,13 @@ def test_bucket_replication_normal_delete():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     time.sleep(config.checkpoint_delay)
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -4167,13 +4210,12 @@ def test_bucket_replication_normal_deletemarker():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -4215,8 +4257,7 @@ def test_bucket_replication_alt_user_forbidden():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -4379,8 +4420,7 @@ def test_bucket_replication_non_versioned_to_versioned():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object not exists in destination bucket
@@ -4427,8 +4467,7 @@ def test_bucket_replication_versioned_to_non_versioned():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object not exists in destination bucket
@@ -4491,8 +4530,7 @@ def test_bucket_replication_lock_enabled_to_lock_disabled():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket_name, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -4555,8 +4593,7 @@ def test_bucket_replication_lock_disabled_to_lock_enabled():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket.name, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -4566,10 +4603,8 @@ def test_bucket_replication_lock_disabled_to_lock_enabled():
 @attr('sync_policy')
 @attr('fails_with_rgw')
 def test_bucket_delete_with_zonegroup_sync_policy_directional():
-
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-
     zonegroup_meta_checkpoint(zonegroup)
 
     (zoneA, zoneB) = zonegroup.zones[0:2]
@@ -4577,7 +4612,6 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional():
 
     c1 = zoneA.cluster
 
-    # configure sync policy
     zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
     create_sync_policy_group(c1, "sync-group")
@@ -4588,74 +4622,58 @@ def test_bucket_delete_with_zonegroup_sync_policy_directional():
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    # configure sync policy for only bucketA and enable it
     bucketA = create_zone_bucket(zcA)
-    buckets = []
-    buckets.append(bucketA)
 
     time.sleep(config.checkpoint_delay)
     zonegroup_meta_checkpoint(zonegroup)
 
-    # create bucketA and objects in zoneA and zoneB
     objnameA = 'a'
     objnameB = 'b'
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zonegroup_meta_checkpoint(zonegroup)
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
 
-    # verify that objnameA is synced to bucketA in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnameA)
-
-    # verify that objnameB is not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
+    # verify sync
+    check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+    check_object_not_exists(zcA, bucketA.name, objnameB)
 
     log.debug('deleting object on zone A')
-    k = get_key(zcA, bucket, objnameA)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
+    zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
 
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
-
-    # delete bucket on zoneA. it should fail to delete
+    # delete bucket - should fail
     log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+    assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
-    assert check_all_buckets_exist(zcA, buckets)
-    assert check_all_buckets_exist(zcB, buckets)
+    assert check_all_buckets_exist(zcA, [bucketA.name])
+    assert check_all_buckets_exist(zcB, [bucketA.name])
 
-    # retry deleting bucket after removing the object from zone B. should succeed
+    # retry after deleting object from zone B
     log.debug('deleting object on zone B')
-    k = get_key(zcB, bucket, objnameB)
-    k.delete()
+    zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
     time.sleep(config.checkpoint_delay)
 
     log.debug('retry deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    zcA.s3_client.delete_bucket(Bucket=bucketA.name)
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+    assert check_all_buckets_dont_exist(zcB, [bucketA.name])
 
     remove_sync_policy_group(c1, "sync-group")
-
     return
 
 @attr('sync_policy')
 @attr('fails_with_rgw')
 def test_bucket_delete_with_bucket_sync_policy_directional():
-
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-
     zonegroup_meta_checkpoint(zonegroup)
 
     (zoneA, zoneB) = zonegroup.zones[0:2]
@@ -4663,7 +4681,6 @@ def test_bucket_delete_with_bucket_sync_policy_directional():
 
     c1 = zoneA.cluster
 
-    # configure sync policy
     zones = zoneA.name + ',' + zoneB.name
     c1.admin(['sync', 'policy', 'get'])
     create_sync_policy_group(c1, "sync-group")
@@ -4674,20 +4691,9 @@ def test_bucket_delete_with_bucket_sync_policy_directional():
     zonegroup.period.update(zoneA, commit=True)
     get_sync_policy(c1)
 
-    """
-        configure policy at bucketA level with src and dest
-        zones specified to zoneA and zoneB resp.
-
-        verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
-    """
-
-    # configure sync policy for only bucketA and enable it
     bucketA = create_zone_bucket(zcA)
-    buckets = []
-    buckets.append(bucketA)
     create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
     create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow-bucket", zoneA.name, zoneB.name, bucketA.name)
-    #create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow-bucket", zones, bucketA.name)
     create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name)
     set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
 
@@ -4696,56 +4702,42 @@ def test_bucket_delete_with_bucket_sync_policy_directional():
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    # create bucketA and objects in zoneA and zoneB
     objnameA = 'a'
     objnameB = 'b'
 
-    # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zonegroup_meta_checkpoint(zonegroup)
     zone_data_checkpoint(zoneB, zoneA)
 
-    # verify that objnameA is synced to bucketA in zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnameA)
-
-    # verify that objnameB is not synced to bucketA in zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
+    check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+    check_object_not_exists(zcA, bucketA.name, objnameB)
 
     log.debug('deleting object on zone A')
-    k = get_key(zcA, bucket, objnameA)
-    k.delete()
-
-    zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
+    zone_bucket_checkpoint(zoneB, zoneA, bucketA.name)
 
-    # delete bucket on zoneA. it should fail to delete
     log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+    assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
-    assert check_all_buckets_exist(zcA, buckets)
-    assert check_all_buckets_exist(zcB, buckets)
+    assert check_all_buckets_exist(zcA, [bucketA.name])
+    assert check_all_buckets_exist(zcB, [bucketA.name])
 
     log.debug('deleting object on zone B')
-    k = get_key(zcB, bucket, objnameB)
-    k.delete()
+    zcB.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
     time.sleep(config.checkpoint_delay)
 
-    # retry deleting bucket after removing the object from zone B. should succeed
     log.debug('retry deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    zcA.s3_client.delete_bucket(Bucket=bucketA.name)
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+    assert check_all_buckets_dont_exist(zcB, [bucketA.name])
     
     remove_sync_policy_group(c1, "sync-group")
-
     return
 
 @attr('sync_policy')
@@ -4796,33 +4788,28 @@ def test_bucket_delete_with_bucket_sync_policy_symmetric():
     objnameB = 'b'
 
     # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
 
     log.debug('deleting object A')
-    k = get_key(zcA, bucketA, objnameA)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
 
     log.debug('deleting object B')
-    k = get_key(zcA, bucketA, objnameB)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
 
     # delete bucket on zoneA.
     log.debug('deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    zcA.s3_client.delete_bucket(Bucket=bucketA.name)
     zonegroup_meta_checkpoint(zonegroup)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
-    
+    assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+    assert check_all_buckets_dont_exist(zcB, [bucketA.name])
     remove_sync_policy_group(c1, "sync-group")
     return
 
@@ -4864,32 +4851,28 @@ def test_bucket_delete_with_zonegroup_sync_policy_symmetric():
     objnameB = 'b'
 
     # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zone_data_checkpoint(zoneB, zoneA)
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
 
     log.debug('deleting object A')
-    k = get_key(zcA, bucketA, objnameA)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
 
     log.debug('deleting object B')
-    k = get_key(zcA, bucketA, objnameB)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
 
     # delete bucket on zoneA.
     log.debug('deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    zcA.s3_client.delete_bucket(Bucket=bucketA.name)
     zonegroup_meta_checkpoint(zonegroup)
 
-    assert check_all_buckets_dont_exist(zcA, buckets)
-    assert check_all_buckets_dont_exist(zcB, buckets)
+    assert check_all_buckets_dont_exist(zcA, [bucketA.name])
+    assert check_all_buckets_dont_exist(zcB, [bucketA.name])
 
     remove_sync_flow(c1, "sync-group", "sync-flow", "symmetrical")
     remove_sync_policy_group(c1, "sync-group")
@@ -4941,49 +4924,40 @@ def test_delete_bucket_with_zone_opt_out():
     objnameB = 'b'
 
     # upload object in zone A and zone C
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcC, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcC.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zonegroup_meta_checkpoint(zonegroup)
     zone_data_checkpoint(zoneB, zoneA)
 
     # verify that objnameA is synced to zoneB but not zoneC
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_exist(bucket, objnameA)
-
-    bucket = get_bucket(zcC, bucketA.name)
-    check_objects_not_exist(bucket, objnameA)
+    check_object_exists(zcB, bucketA.name, objnameA, 'foo')
+    check_object_not_exists(zcC, bucketA.name, objnameA)
     
     # verify that objnameB is not synced to either zoneA or zoneB
-    bucket = get_bucket(zcA, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
-
-    bucket = get_bucket(zcB, bucketA.name)
-    check_objects_not_exist(bucket, objnameB)
+    check_object_not_exists(zcA, bucketA.name, objnameB)
+    check_object_not_exists(zcB, bucketA.name, objnameB)
 
     log.debug('deleting object on zone A')
-    k = get_key(zcA, bucket, objnameA)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
 
     # delete bucket on zoneA. it should fail to delete because zoneC still has objnameB
     log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+    assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, buckets)
     assert check_all_buckets_exist(zcC, buckets)
 
     # retry deleting bucket after removing the object from zone C. should succeed
     log.debug('deleting object on zone C')
-    k = get_key(zcC, bucket, objnameB)
-    k.delete()
+    zcC.s3_client.delete_object(Bucket=bucketA.name, Key=objnameB)
     time.sleep(config.checkpoint_delay)
 
     log.debug('retry deleting bucket')
-    zcA.delete_bucket(bucketA.name)
+    zcA.s3_client.delete_bucket(Bucket=bucketA.name)
 
     zonegroup_meta_checkpoint(zonegroup)
 
@@ -4991,7 +4965,7 @@ def test_delete_bucket_with_zone_opt_out():
     assert check_all_buckets_dont_exist(zcC, buckets)
 
     remove_sync_policy_group(c1, "sync-group")
-    
+
     return
 
 @attr('sync_policy')
@@ -5043,32 +5017,28 @@ def test_bucket_delete_with_sync_policy_object_prefix():
     objnameB = 'b'
 
     # upload object in each zone and wait for sync.
-    k = new_key(zcA, bucketA, objnameA)
-    k.set_contents_from_string('foo')
-    k = new_key(zcB, bucketA, objnameB)
-    k.set_contents_from_string('foo')
+    zcA.s3_client.put_object(Bucket=bucketA.name, Key=objnameA, Body='foo')
+    zcB.s3_client.put_object(Bucket=bucketA.name, Key=objnameB, Body='foo')
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
     
     # verify that objnameA is synced to zoneB
-    bucket = get_bucket(zcB, bucketA.name)
-    check_object_exists(bucket, objnameA)
+    check_object_exists(zcB, bucketA.name, objnameA, 'foo')
 
     # verify that objnameB is not synced to zoneA
-    bucket = get_bucket(zcA, bucketA.name)
-    check_object_not_exists(bucket, objnameB)
+    check_object_not_exists(zcA, bucketA.name, objnameB)
 
     log.debug('deleting object A')
-    k = get_key(zcA, bucketA, objnameA)
-    k.delete()
+    zcA.s3_client.delete_object(Bucket=bucketA.name, Key=objnameA)
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
 
     # delete bucket on zoneA. it should fail to delete because zoneB still has objnameB
     log.debug('deleting bucket')
-    assert_raises(boto.exception.S3ResponseError, zcA.delete_bucket, bucketA.name)
+    e = assert_raises(ClientError, zcA.s3_client.delete_bucket, Bucket=bucketA.name)
+    assert e.response['Error']['Code'] == 'BucketNotEmpty'
 
     assert check_all_buckets_exist(zcA, buckets)
     assert check_all_buckets_exist(zcB, buckets)
@@ -5097,16 +5067,20 @@ def test_copy_obj_between_zonegroups(zonegroup):
         dest_bucket = dest_zone.create_bucket(gen_bucket_name())
         realm_meta_checkpoint(realm)
 
-        # copy object
-        dest_zone.s3_client.copy_object(
-            Bucket=dest_bucket.name,
-            CopySource=f'{source_bucket.name}/{objname}',
-            Key=objname
-        )
+        obj_sizes = [4096, 8 * 1024 * 1024]
+        for size in obj_sizes:
+            source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='x' * size)
 
-        # check that object exists in destination bucket
-        k = get_key(dest_zone, dest_bucket, objname)
-        assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+            # copy object
+            dest_zone.s3_client.copy_object(
+                Bucket=dest_bucket.name,
+                CopySource=f'{source_bucket.name}/{objname}',
+                Key=objname
+            )
+
+            # verify
+            response = dest_zone.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+            assert_equal(response['Body'].read().decode('utf-8'), 'x' * size)
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_delete_forbidden():
@@ -5155,13 +5129,12 @@ def test_bucket_replication_alt_user_delete_forbidden():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -5169,8 +5142,8 @@ def test_bucket_replication_alt_user_delete_forbidden():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does exist in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_delete():
@@ -5219,13 +5192,12 @@ def test_bucket_replication_alt_user_delete():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -5293,13 +5265,12 @@ def test_bucket_replication_alt_user_deletemarker_forbidden():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -5307,8 +5278,8 @@ def test_bucket_replication_alt_user_deletemarker_forbidden():
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does exist in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
 @allow_bucket_replication
 def test_bucket_replication_alt_user_deletemarker():
@@ -5367,13 +5338,12 @@ def test_bucket_replication_alt_user_deletemarker():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
-    k = get_key(dest, dest_bucket, objname)
-    assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
+    response = dest.s3_client.get_object(Bucket=dest_bucket.name, Key=objname)
+    assert_equal(response['Body'].read().decode('utf-8'), 'foo')
 
     # delete object on source
     source.s3_client.delete_object(Bucket=source_bucket.name, Key=objname)
@@ -5491,8 +5461,7 @@ def test_bucket_replication_source_forbidden():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -5556,8 +5525,7 @@ def test_bucket_replication_source_forbidden_versioned():
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -5616,8 +5584,7 @@ def test_bucket_replication_source_allow_either_getobject_or_getobjectversionfor
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
@@ -5673,8 +5640,7 @@ def test_bucket_replication_source_allow_either_getobjectversion_or_getobjectver
 
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object exists in destination bucket
@@ -5726,8 +5692,7 @@ def test_bucket_replication_source_forbidden_objretention():
     
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket_name, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -5784,8 +5749,7 @@ def test_bucket_replication_source_forbidden_legalhold():
     
     # upload an object and wait for sync.
     objname = 'dummy'
-    k = new_key(source, source_bucket_name, objname)
-    k.set_contents_from_string('foo')
+    source.s3_client.put_object(Bucket=source_bucket_name, Key=objname, Body='foo')
     zone_data_checkpoint(dest.zone, source.zone)
 
     # check that object does not exist in destination bucket
@@ -5912,8 +5876,7 @@ def test_copy_obj_perm_check_between_zonegroups(zonegroup):
     source_bucket = source_zone.create_bucket(gen_bucket_name())
 
     objname = 'dummy'
-    k = new_key(source_zone, source_bucket.name, objname)
-    k.set_contents_from_string('foo')
+    source_zone.s3_client.put_object(Bucket=source_bucket.name, Key=objname, Body='foo')
 
     for zg in realm.current_period.zonegroups:
         if zg.name == zonegroup.name:
index 50fa15cf1fa259a1f8768829a894f0d515e7b721..c2830c96a6fc602b41c9eabad56acb1af461d2e3 100644 (file)
@@ -1,4 +1,5 @@
 import logging
+import boto3
 from botocore.exceptions import ClientError
 
 from itertools import zip_longest  # type: ignore
@@ -67,16 +68,6 @@ class RadosZone(Zone):
         def __init__(self, zone, credentials):
             super(RadosZone.Conn, self).__init__(zone, credentials)
 
-            region = "" if zone.zonegroup is None else zone.zonegroup.name
-            self.s3_resource = boto3.resource(
-                's3',
-                endpoint_url='http://' + zone.gateways[0].host + ':' + str(zone.gateways[0].port),
-                aws_access_key_id=credentials.access_key,
-                aws_secret_access_key=credentials.secret,
-                region_name=region,
-                verify=False
-            )
-
         def get_bucket(self, name):
             try:
                 bucket = self.s3_resource.Bucket(name)
@@ -92,7 +83,7 @@ class RadosZone(Zone):
                 bucket = self.s3_resource.create_bucket(Bucket=name)
                 return bucket
             except ClientError as e:
-                if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
+                if e.response['Error']['Code'] in ('409', 'BucketAlreadyOwnedByYou'):
                     return self.s3_resource.Bucket(name)
                 raise
 
@@ -194,7 +185,7 @@ class RadosZone(Zone):
         def has_role(self, role_name):
             try:
                 self.get_role(role_name)
-            except BotoServerError:
+            except ClientError:
                 return False
             return True