test/rgw: remove deprecated boto dependency
author     Yuval Lifshitz <ylifshit@ibm.com>
           Wed, 18 Feb 2026 09:34:41 +0000 (09:34 +0000)
committer  Yuval Lifshitz <ylifshit@ibm.com>
           Mon, 23 Feb 2026 17:48:47 +0000 (17:48 +0000)
Fixes: https://tracker.ceph.com/issues/73663
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
Co-authored-by: Bob-Shell <bob-shell@ai-assistant>
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/requirements.txt
src/test/rgw/bucket_notification/test_bn.py

index 795538d223a97178ef19854480b93dbfd57411d2..ce9495d8b091f17e09492ce242c96c4f992149c5 100644 (file)
@@ -16,6 +16,239 @@ import json
 
 log = logging.getLogger('bucket_notification.tests')
 
+
+# Boto3 compatibility wrapper classes to minimize code changes
+class S3Key:
+    """Wrapper class to provide boto-like interface for S3 objects using boto3"""
+    def __init__(self, bucket, key_name, s3_client):
+        self.bucket = bucket
+        self.name = key_name
+        self.key = key_name
+        self._s3_client = s3_client
+        self._metadata = {}
+        self.etag = None
+        self.version_id = None
+
+    def set_contents_from_string(self, content):
+        """Upload object content"""
+        kwargs = {
+            'Bucket': self.bucket.name,
+            'Key': self.name,
+            'Body': content
+        }
+        if self._metadata:
+            kwargs['Metadata'] = self._metadata
+
+        response = self._s3_client.put_object(**kwargs)
+        self.etag = response['ETag']
+        if 'VersionId' in response:
+            self.version_id = response['VersionId']
+        return response
+
+    def set_metadata(self, key, value):
+        """Set metadata for the object"""
+        self._metadata[key] = value
+
+    def delete(self):
+        """Delete the object"""
+        kwargs = {'Bucket': self.bucket.name, 'Key': self.name}
+        if self.version_id:
+            kwargs['VersionId'] = self.version_id
+        return self._s3_client.delete_object(**kwargs)
+
+
+class S3Bucket:
+    """Wrapper class to provide boto-like interface for S3 buckets using boto3"""
+    def __init__(self, connection, bucket_name):
+        self.connection = connection
+        self.name = bucket_name
+        self._s3_client = connection._s3_client
+        self._s3_resource = connection._s3_resource
+        self._bucket = self._s3_resource.Bucket(bucket_name)
+
+    def new_key(self, key_name):
+        """Create a new key object"""
+        return S3Key(self, key_name, self._s3_client)
+
+    def list(self, prefix=''):
+        """List objects in the bucket"""
+        try:
+            if prefix:
+                objects = self._bucket.objects.filter(Prefix=prefix)
+            else:
+                objects = self._bucket.objects.all()
+
+            # Convert to S3Key objects for compatibility
+            keys = []
+            for obj in objects:
+                key = S3Key(self, obj.key, self._s3_client)
+                key.etag = obj.e_tag
+                if hasattr(obj, 'version_id'):
+                    key.version_id = obj.version_id
+                keys.append(key)
+            return keys
+        except ClientError:
+            # An empty bucket simply yields no objects; only a failed
+            # listing (e.g. the bucket no longer exists) lands here
+            return []
+
+    def delete_key(self, key_name, version_id=None):
+        """Delete a specific key"""
+        kwargs = {'Bucket': self.name, 'Key': key_name}
+        if version_id:
+            kwargs['VersionId'] = version_id
+        response = self._s3_client.delete_object(**kwargs)
+        # Return a key-like object with version_id for compatibility
+        deleted_key = S3Key(self, key_name, self._s3_client)
+        if 'VersionId' in response:
+            deleted_key.version_id = response['VersionId']
+        if 'DeleteMarker' in response:
+            deleted_key.delete_marker = response['DeleteMarker']
+        return deleted_key
+
+    def copy_key(self, new_key_name, src_bucket_name, src_key_name, src_version_id=None, metadata=None):
+        """Copy a key within or between buckets"""
+        copy_source = {'Bucket': src_bucket_name, 'Key': src_key_name}
+        if src_version_id:
+            copy_source['VersionId'] = src_version_id
+
+        kwargs = {
+            'CopySource': copy_source,
+            'Bucket': self.name,
+            'Key': new_key_name
+        }
+
+        if metadata is not None:
+            kwargs['Metadata'] = metadata
+            kwargs['MetadataDirective'] = 'REPLACE'
+
+        response = self._s3_client.copy_object(**kwargs)
+
+        # Return a key object for compatibility
+        new_key = S3Key(self, new_key_name, self._s3_client)
+        if 'CopyObjectResult' in response and 'ETag' in response['CopyObjectResult']:
+            new_key.etag = response['CopyObjectResult']['ETag']
+        if 'VersionId' in response:
+            new_key.version_id = response['VersionId']
+        return new_key
+
+    def configure_versioning(self, enabled):
+        """Enable or disable versioning on the bucket"""
+        status = 'Enabled' if enabled else 'Suspended'
+        self._s3_client.put_bucket_versioning(
+            Bucket=self.name,
+            VersioningConfiguration={'Status': status}
+        )
+
+    def initiate_multipart_upload(self, key_name, metadata=None):
+        """Initiate a multipart upload"""
+        kwargs = {'Bucket': self.name, 'Key': key_name}
+        if metadata:
+            kwargs['Metadata'] = metadata
+
+        response = self._s3_client.create_multipart_upload(**kwargs)
+        return MultipartUpload(self, key_name, response['UploadId'], self._s3_client)
+
+
+class MultipartUpload:
+    """Wrapper for multipart upload operations"""
+    def __init__(self, bucket, key_name, upload_id, s3_client):
+        self.bucket = bucket
+        self.key_name = key_name
+        self.upload_id = upload_id
+        self._s3_client = s3_client
+        self.parts = []
+
+    def upload_part_from_file(self, fp, part_number, size=None):
+        """Upload a part from a file object"""
+        if size:
+            data = fp.read(size)
+        else:
+            data = fp.read()
+
+        response = self._s3_client.upload_part(
+            Bucket=self.bucket.name,
+            Key=self.key_name,
+            PartNumber=part_number,
+            UploadId=self.upload_id,
+            Body=data
+        )
+
+        self.parts.append({
+            'PartNumber': part_number,
+            'ETag': response['ETag']
+        })
+
+    def complete_upload(self):
+        """Complete the multipart upload"""
+        # Sort parts by part number
+        self.parts.sort(key=lambda x: x['PartNumber'])
+
+        response = self._s3_client.complete_multipart_upload(
+            Bucket=self.bucket.name,
+            Key=self.key_name,
+            UploadId=self.upload_id,
+            MultipartUpload={'Parts': self.parts}
+        )
+        return response
+
+
+class S3Connection:
+    """Wrapper class to provide boto-like S3Connection interface using boto3"""
+    def __init__(self, aws_access_key_id, aws_secret_access_key,
+                 is_secure=True, port=None, host=None, calling_format=None):
+        self.aws_access_key_id = aws_access_key_id
+        self.aws_secret_access_key = aws_secret_access_key
+        self.access_key = aws_access_key_id  # Boto compatibility
+        self.secret_key = aws_secret_access_key  # Boto compatibility
+        self.is_secure = is_secure
+        self.port = port
+        self.host = host
+        self._num_retries = 5  # default retry count, exposed via the num_retries property
+
+        # Build the endpoint URL, omitting the port when none was given
+        protocol = 'https' if is_secure else 'http'
+        self.endpoint_url = f'{protocol}://{host}:{port}' if port else f'{protocol}://{host}'
+
+        # Create boto3 client and resource
+        self._s3_client = boto3.client(
+            's3',
+            endpoint_url=self.endpoint_url,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+            config=boto3.session.Config(retries={'max_attempts': self.num_retries})
+        )
+
+        self._s3_resource = boto3.resource(
+            's3',
+            endpoint_url=self.endpoint_url,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key
+        )
+
+        # For SSL connections
+        self.secure_conn = self if is_secure else None
+
+    @property
+    def num_retries(self):
+        return self._num_retries
+
+    @num_retries.setter
+    def num_retries(self, value):
+        # Rebuild the client so a changed retry count (the tests set
+        # num_retries = 0) actually applies to subsequent requests
+        self._num_retries = value
+        self._s3_client = boto3.client(
+            's3',
+            endpoint_url=self.endpoint_url,
+            aws_access_key_id=self.aws_access_key_id,
+            aws_secret_access_key=self.aws_secret_access_key,
+            config=boto3.session.Config(retries={'max_attempts': value})
+        )
+
+    def create_bucket(self, bucket_name, **kwargs):
+        """Create a bucket"""
+        acl = kwargs.get('ACL', 'private')
+        try:
+            self._s3_client.create_bucket(Bucket=bucket_name, ACL=acl)
+        except ClientError as e:
+            # Bucket might already exist
+            if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
+                raise
+        return S3Bucket(self, bucket_name)
+
+    def get_bucket(self, bucket_name):
+        """Get a bucket object"""
+        return S3Bucket(self, bucket_name)
+
+    def delete_bucket(self, bucket_name):
+        """Delete a bucket"""
+        return self._s3_client.delete_bucket(Bucket=bucket_name)
+
+
+
 NO_HTTP_BODY = ''
 
 def put_object_tagging(conn, bucket_name, key, tags):
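
The wrapper classes above keep the old boto call shape while boto3 does the work underneath. A minimal usage sketch, with placeholder credentials and endpoint (none of these values come from the commit):

# Hedged usage sketch of the compatibility wrappers (placeholder values).
from bucket_notification.api import S3Connection  # assumes the package is importable

conn = S3Connection(aws_access_key_id='ACCESS',        # placeholder
                    aws_secret_access_key='SECRET',    # placeholder
                    is_secure=False, port=8000, host='localhost')

bucket = conn.create_bucket('notif-test')              # returns an S3Bucket wrapper
key = bucket.new_key('obj-1')                          # returns an S3Key wrapper
key.set_metadata('meta1', 'value1')
key.set_contents_from_string('bar')                    # put_object under the hood

for k in bucket.list(prefix='obj'):                    # S3Key objects, boto-style
    print(k.name, k.etag)

key.delete()
conn.delete_bucket('notif-test')
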
@@ -245,7 +478,6 @@ def admin(args, cluster='noname', **kwargs):
 def ceph_admin(args, cluster='noname', **kwargs):
     """ ceph command """
     cmd = [test_path + 'test-rgw-call.sh', 'call_ceph', cluster] + args
-    print(' '.join(cmd))
     return bash(cmd, **kwargs)
 
 def delete_all_topics(conn, tenant, cluster):
@@ -272,4 +504,4 @@ def delete_all_topics(conn, tenant, cluster):
 def set_rgw_config_option(client, option, value, cluster='noname'):
     """ change a config option """
     print(f'Setting {option} to {value} for {client} in cluster {cluster}')
-    return ceph_admin(['config', 'set', client, option, str(value)], cluster)
\ No newline at end of file
+    return ceph_admin(['config', 'set', client, option, str(value)], cluster)
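
The MultipartUpload wrapper follows the same pattern; a sketch of the multipart path (sizes and names are illustrative, and non-final parts must be at least 5 MiB per the S3 protocol):

# Illustrative multipart flow through the wrapper; 'conn' as in the sketch above.
import io

bucket = conn.create_bucket('notif-test')
upload = bucket.initiate_multipart_upload('big-obj')

part1 = io.BytesIO(b'x' * (5 * 1024 * 1024))   # non-final parts need >= 5 MiB
upload.upload_part_from_file(part1, 1)
part2 = io.BytesIO(b'tail')
upload.upload_part_from_file(part2, 2)

upload.complete_upload()                       # sorts self.parts, then completes
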
index 321d6e43f22ff72b52a6aaeeba50129cb2c79af5..d57533fbd87286c522b0e1a8a72f02932c5d017f 100644 (file)
@@ -1,5 +1,4 @@
-pynose
-boto >=2.6.0
+nose-py3 >=1.0.0
 boto3 >=1.0.0
 configparser >=5.0.0
 kafka-python >=2.0.0
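
With boto dropped and pynose replaced by the Python-3 nose fork, the import surface the tests rely on is small; a hedged smoke check of the new dependency set (the test name is illustrative):

# Illustrative smoke test that the replacement dependencies resolve.
import boto3
from nose.plugins.attrib import attr   # provided by nose-py3
from nose.tools import assert_true

@attr('smoke')
def test_dependency_imports():
    assert_true(boto3.client is not None)
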
index 2c82a3d5f9c720cb347cf0ef00b49918e7edcb53..9168841e2883e1c2b2b1e0a9091740290158bd1c 100644 (file)
@@ -10,8 +10,7 @@ import time
 import os
 import io
 import string
-# XXX this should be converted to use boto3
-import boto
+import sys
 from botocore.exceptions import ClientError
 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
 from random import randint
@@ -19,7 +18,6 @@ import hashlib
 # XXX this should be converted to use pytest
 from nose.plugins.attrib import attr
 import boto3
-from boto.s3.connection import S3Connection
 import datetime
 from cloudevents.http import from_http
 from dateutil import parser
@@ -41,11 +39,14 @@ from .api import PSTopicS3, \
     put_object_tagging, \
     admin, \
     set_rgw_config_option, \
-    bash
+    bash, \
+    S3Connection, \
+    S3Bucket, \
+    S3Key, \
+    MultipartUpload
 
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
-import boto.s3.tagging
 
 # configure logging for the tests module
 class LogWrapper:
@@ -568,8 +569,7 @@ def connection(no_retries=False):
     vstart_secret_key = get_secret_key()
     conn = S3Connection(aws_access_key_id=vstart_access_key,
                         aws_secret_access_key=vstart_secret_key,
-                        is_secure=False, port=port_no, host=hostname,
-                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
+                        is_secure=False, port=port_no, host=hostname)
     if no_retries:
         conn.num_retries = 0
     return conn
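
For reference, what conn.num_retries = 0 amounts to in raw boto3 once the num_retries property in api.py forwards it (a sketch; the endpoint and keys are placeholders):

# Raw-boto3 equivalent of a no-retry connection (illustrative values).
import boto3
from botocore.config import Config

client = boto3.client('s3',
                      endpoint_url='http://localhost:8000',  # placeholder
                      aws_access_key_id='ACCESS',
                      aws_secret_access_key='SECRET',
                      config=Config(retries={'max_attempts': 0}))  # no retries
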
@@ -582,9 +582,8 @@ def connection2():
     vstart_secret_key = get_secret_key()
 
     conn = S3Connection(aws_access_key_id=vstart_access_key,
-                  aws_secret_access_key=vstart_secret_key,
-                      is_secure=False, port=port_no, host=hostname,
-                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
+                        aws_secret_access_key=vstart_secret_key,
+                        is_secure=False, port=port_no, host=hostname)
 
     return conn
 
@@ -606,9 +605,8 @@ def another_user(user=None, tenant=None, account=None):
     assert_equal(rc, 0)
 
     conn = S3Connection(aws_access_key_id=access_key,
-                  aws_secret_access_key=secret_key,
-                      is_secure=False, port=get_config_port(), host=get_config_host(),
-                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
+                        aws_secret_access_key=secret_key,
+                        is_secure=False, port=get_config_port(), host=get_config_host())
     return conn, arn
 
 
@@ -732,8 +730,7 @@ def connect_random_user(tenant=''):
     assert_equal(rc, 0)
     conn = S3Connection(aws_access_key_id=access_key,
                         aws_secret_access_key=secret_key,
-                        is_secure=False, port=get_config_port(), host=get_config_host(),
-                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
+                        is_secure=False, port=get_config_port(), host=get_config_host())
     return conn
 
 ##############
@@ -2896,7 +2893,7 @@ def test_versioning_amqp():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    bucket.delete_key(copy_of_key, version_id=ver3)
+    bucket.delete_key(copy_of_key.name, version_id=ver3)
     bucket.delete_key(key.name, version_id=ver2)
     bucket.delete_key(key.name, version_id=ver1)
     #conn.delete_bucket(bucket_name)
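
The call-site change above is needed because the wrapper's delete_key() takes a key name string rather than a key object; a sketch of the versioned-delete pattern it supports (names are illustrative):

# Illustrative versioned-delete flow; 'bucket' is an S3Bucket wrapper.
bucket.configure_versioning(True)
key = bucket.new_key('myobj')
key.set_contents_from_string('v1')
ver1 = key.version_id                 # captured from the put_object response
key.set_contents_from_string('v2')
ver2 = key.version_id

copy_of_key = bucket.copy_key('copy-of-myobj', bucket.name, key.name,
                              src_version_id=ver1)
bucket.delete_key(copy_of_key.name, version_id=copy_of_key.version_id)
bucket.delete_key(key.name, version_id=ver2)
bucket.delete_key(key.name, version_id=ver1)
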
@@ -5393,6 +5390,12 @@ def test_topic_migration_to_an_account():
     """test the topic migration procedure described at
     https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
     """
+    # Initialize variables for cleanup in finally block
+    user1_bucket_name = None
+    user2_bucket_name = None
+    user1_id = None
+    account_id = None
+
     try:
         # create an http server for notification delivery
         host = get_ip()
@@ -5592,16 +5595,20 @@ def test_topic_migration_to_an_account():
         )
         log.info("topic migration test has completed successfully")
     finally:
-        admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
-        admin(
-            ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
-            get_config_cluster(),
-        )
-        admin(
-            ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
-            get_config_cluster(),
-        )
-        admin(["account", "rm", "--account-id", account_id], get_config_cluster())
+        if user1_id is not None:
+            admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
+        if user1_bucket_name is not None:
+            admin(
+                ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
+                get_config_cluster(),
+            )
+        if user2_bucket_name is not None:
+            admin(
+                ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
+                get_config_cluster(),
+            )
+        if account_id is not None:
+            admin(["account", "rm", "--account-id", account_id], get_config_cluster())
 
 def persistent_notification_shard_config_change(endpoint_type, conn, new_num_shards, old_num_shards=11): 
     """ test persistent notification shard config change """