From: Yuval Lifshitz Date: Wed, 18 Feb 2026 09:34:41 +0000 (+0000) Subject: test/rgw: remove depracated boto dependency X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fheads%2Fwip-yuval-bucket-notifications;p=ceph-ci.git test/rgw: remove depracated boto dependency Fixes: https://tracker.ceph.com/issues/73663 Signed-off-by: Yuval Lifshitz Co-authored-by: Bob-Shell --- diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py index 795538d223a..ce9495d8b09 100644 --- a/src/test/rgw/bucket_notification/api.py +++ b/src/test/rgw/bucket_notification/api.py @@ -16,6 +16,239 @@ import json log = logging.getLogger('bucket_notification.tests') + +# Boto3 compatibility wrapper classes to minimize code changes +class S3Key: + """Wrapper class to provide boto-like interface for S3 objects using boto3""" + def __init__(self, bucket, key_name, s3_client): + self.bucket = bucket + self.name = key_name + self.key = key_name + self._s3_client = s3_client + self._metadata = {} + self.etag = None + self.version_id = None + + def set_contents_from_string(self, content): + """Upload object content""" + kwargs = { + 'Bucket': self.bucket.name, + 'Key': self.name, + 'Body': content + } + if self._metadata: + kwargs['Metadata'] = self._metadata + + response = self._s3_client.put_object(**kwargs) + self.etag = response['ETag'] + if 'VersionId' in response: + self.version_id = response['VersionId'] + return response + + def set_metadata(self, key, value): + """Set metadata for the object""" + self._metadata[key] = value + + def delete(self): + """Delete the object""" + kwargs = {'Bucket': self.bucket.name, 'Key': self.name} + if self.version_id: + kwargs['VersionId'] = self.version_id + return self._s3_client.delete_object(**kwargs) + + +class S3Bucket: + """Wrapper class to provide boto-like interface for S3 buckets using boto3""" + def __init__(self, connection, bucket_name): + self.connection = connection + self.name = bucket_name + self._s3_client = connection._s3_client + self._s3_resource = connection._s3_resource + self._bucket = self._s3_resource.Bucket(bucket_name) + + def new_key(self, key_name): + """Create a new key object""" + return S3Key(self, key_name, self._s3_client) + + def list(self, prefix=''): + """List objects in the bucket""" + try: + if prefix: + objects = self._bucket.objects.filter(Prefix=prefix) + else: + objects = self._bucket.objects.all() + + # Convert to S3Key objects for compatibility + keys = [] + for obj in objects: + key = S3Key(self, obj.key, self._s3_client) + key.etag = obj.e_tag + if hasattr(obj, 'version_id'): + key.version_id = obj.version_id + keys.append(key) + return keys + except Exception: + # Return empty list if bucket is empty or error occurs + return [] + + def delete_key(self, key_name, version_id=None): + """Delete a specific key""" + kwargs = {'Bucket': self.name, 'Key': key_name} + if version_id: + kwargs['VersionId'] = version_id + response = self._s3_client.delete_object(**kwargs) + # Return a key-like object with version_id for compatibility + deleted_key = S3Key(self, key_name, self._s3_client) + if 'VersionId' in response: + deleted_key.version_id = response['VersionId'] + if 'DeleteMarker' in response: + deleted_key.delete_marker = response['DeleteMarker'] + return deleted_key + + def copy_key(self, new_key_name, src_bucket_name, src_key_name, src_version_id=None, metadata=None): + """Copy a key within or between buckets""" + copy_source = {'Bucket': src_bucket_name, 'Key': src_key_name} + if src_version_id: + copy_source['VersionId'] = src_version_id + + kwargs = { + 'CopySource': copy_source, + 'Bucket': self.name, + 'Key': new_key_name + } + + if metadata is not None: + kwargs['Metadata'] = metadata + kwargs['MetadataDirective'] = 'REPLACE' + + response = self._s3_client.copy_object(**kwargs) + + # Return a key object for compatibility + new_key = S3Key(self, new_key_name, self._s3_client) + if 'CopyObjectResult' in response and 'ETag' in response['CopyObjectResult']: + new_key.etag = response['CopyObjectResult']['ETag'] + if 'VersionId' in response: + new_key.version_id = response['VersionId'] + return new_key + + def configure_versioning(self, enabled): + """Enable or disable versioning on the bucket""" + status = 'Enabled' if enabled else 'Suspended' + self._s3_client.put_bucket_versioning( + Bucket=self.name, + VersioningConfiguration={'Status': status} + ) + + def initiate_multipart_upload(self, key_name, metadata=None): + """Initiate a multipart upload""" + kwargs = {'Bucket': self.name, 'Key': key_name} + if metadata: + kwargs['Metadata'] = metadata + + response = self._s3_client.create_multipart_upload(**kwargs) + return MultipartUpload(self, key_name, response['UploadId'], self._s3_client) + + +class MultipartUpload: + """Wrapper for multipart upload operations""" + def __init__(self, bucket, key_name, upload_id, s3_client): + self.bucket = bucket + self.key_name = key_name + self.upload_id = upload_id + self._s3_client = s3_client + self.parts = [] + + def upload_part_from_file(self, fp, part_number, size=None): + """Upload a part from a file object""" + if size: + data = fp.read(size) + else: + data = fp.read() + + response = self._s3_client.upload_part( + Bucket=self.bucket.name, + Key=self.key_name, + PartNumber=part_number, + UploadId=self.upload_id, + Body=data + ) + + self.parts.append({ + 'PartNumber': part_number, + 'ETag': response['ETag'] + }) + + def complete_upload(self): + """Complete the multipart upload""" + # Sort parts by part number + self.parts.sort(key=lambda x: x['PartNumber']) + + response = self._s3_client.complete_multipart_upload( + Bucket=self.bucket.name, + Key=self.key_name, + UploadId=self.upload_id, + MultipartUpload={'Parts': self.parts} + ) + return response + + +class S3Connection: + """Wrapper class to provide boto-like S3Connection interface using boto3""" + def __init__(self, aws_access_key_id, aws_secret_access_key, + is_secure=True, port=None, host=None, calling_format=None): + self.aws_access_key_id = aws_access_key_id + self.aws_secret_access_key = aws_secret_access_key + self.access_key = aws_access_key_id # Boto compatibility + self.secret_key = aws_secret_access_key # Boto compatibility + self.is_secure = is_secure + self.port = port + self.host = host + self.num_retries = 5 # Default for compatibility + + # Build endpoint URL + protocol = 'https' if is_secure else 'http' + self.endpoint_url = f'{protocol}://{host}:{port}' + + # Create boto3 client and resource + self._s3_client = boto3.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + config=boto3.session.Config(retries={'max_attempts': self.num_retries}) + ) + + self._s3_resource = boto3.resource( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key + ) + + # For SSL connections + self.secure_conn = self if is_secure else None + + def create_bucket(self, bucket_name, **kwargs): + """Create a bucket""" + acl = kwargs.get('ACL', 'private') + try: + self._s3_client.create_bucket(Bucket=bucket_name, ACL=acl) + except ClientError as e: + # Bucket might already exist + if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou': + raise + return S3Bucket(self, bucket_name) + + def get_bucket(self, bucket_name): + """Get a bucket object""" + return S3Bucket(self, bucket_name) + + def delete_bucket(self, bucket_name): + """Delete a bucket""" + return self._s3_client.delete_bucket(Bucket=bucket_name) + + + NO_HTTP_BODY = '' def put_object_tagging(conn, bucket_name, key, tags): @@ -245,7 +478,6 @@ def admin(args, cluster='noname', **kwargs): def ceph_admin(args, cluster='noname', **kwargs): """ ceph command """ cmd = [test_path + 'test-rgw-call.sh', 'call_ceph', cluster] + args - print(' '.join(cmd)) return bash(cmd, **kwargs) def delete_all_topics(conn, tenant, cluster): @@ -272,4 +504,4 @@ def delete_all_topics(conn, tenant, cluster): def set_rgw_config_option(client, option, value, cluster='noname'): """ change a config option """ print(f'Setting {option} to {value} for {client} in cluster {cluster}') - return ceph_admin(['config', 'set', client, option, str(value)], cluster) \ No newline at end of file + return ceph_admin(['config', 'set', client, option, str(value)], cluster) diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt index bb74eceedc3..d57533fbd87 100644 --- a/src/test/rgw/bucket_notification/requirements.txt +++ b/src/test/rgw/bucket_notification/requirements.txt @@ -1,5 +1,4 @@ nose-py3 >=1.0.0 -boto >=2.6.0 boto3 >=1.0.0 configparser >=5.0.0 kafka-python >=2.0.0 diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 2c82a3d5f9c..44473d2ca94 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -10,8 +10,7 @@ import time import os import io import string -# XXX this should be converted to use boto3 -import boto +import sys from botocore.exceptions import ClientError from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from random import randint @@ -19,7 +18,6 @@ import hashlib # XXX this should be converted to use pytest from nose.plugins.attrib import attr import boto3 -from boto.s3.connection import S3Connection import datetime from cloudevents.http import from_http from dateutil import parser @@ -41,11 +39,14 @@ from .api import PSTopicS3, \ put_object_tagging, \ admin, \ set_rgw_config_option, \ - bash + bash, \ + S3Connection, \ + S3Bucket, \ + S3Key, \ + MultipartUpload from nose import SkipTest from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true -import boto.s3.tagging # configure logging for the tests module class LogWrapper: @@ -568,8 +569,7 @@ def connection(no_retries=False): vstart_secret_key = get_secret_key() conn = S3Connection(aws_access_key_id=vstart_access_key, aws_secret_access_key=vstart_secret_key, - is_secure=False, port=port_no, host=hostname, - calling_format='boto.s3.connection.OrdinaryCallingFormat') + is_secure=False, port=port_no, host=hostname) if no_retries: conn.num_retries = 0 return conn @@ -582,9 +582,8 @@ def connection2(): vstart_secret_key = get_secret_key() conn = S3Connection(aws_access_key_id=vstart_access_key, - aws_secret_access_key=vstart_secret_key, - is_secure=False, port=port_no, host=hostname, - calling_format='boto.s3.connection.OrdinaryCallingFormat') + aws_secret_access_key=vstart_secret_key, + is_secure=False, port=port_no, host=hostname) return conn @@ -606,9 +605,8 @@ def another_user(user=None, tenant=None, account=None): assert_equal(rc, 0) conn = S3Connection(aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + aws_secret_access_key=secret_key, + is_secure=False, port=get_config_port(), host=get_config_host()) return conn, arn @@ -732,8 +730,7 @@ def connect_random_user(tenant=''): assert_equal(rc, 0) conn = S3Connection(aws_access_key_id=access_key, aws_secret_access_key=secret_key, - is_secure=False, port=get_config_port(), host=get_config_host(), - calling_format='boto.s3.connection.OrdinaryCallingFormat') + is_secure=False, port=get_config_port(), host=get_config_host()) return conn ############## @@ -5393,6 +5390,12 @@ def test_topic_migration_to_an_account(): """test the topic migration procedure described at https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account """ + # Initialize variables for cleanup in finally block + user1_bucket_name = None + user2_bucket_name = None + user1_id = None + account_id = None + try: # create an http server for notification delivery host = get_ip() @@ -5592,16 +5595,20 @@ def test_topic_migration_to_an_account(): ) log.info("topic migration test has completed successfully") finally: - admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster()) - admin( - ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"], - get_config_cluster(), - ) - admin( - ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"], - get_config_cluster(), - ) - admin(["account", "rm", "--account-id", account_id], get_config_cluster()) + if user1_id is not None: + admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster()) + if user1_bucket_name is not None: + admin( + ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"], + get_config_cluster(), + ) + if user2_bucket_name is not None: + admin( + ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"], + get_config_cluster(), + ) + if account_id is not None: + admin(["account", "rm", "--account-id", account_id], get_config_cluster()) def persistent_notification_shard_config_change(endpoint_type, conn, new_num_shards, old_num_shards=11): """ test persistent notification shard config change """