log = logging.getLogger('bucket_notification.tests')
+
+# Boto3 compatibility wrappers that mimic the legacy boto interface so the tests need minimal changes
+class S3Key:
+ """Wrapper class to provide boto-like interface for S3 objects using boto3"""
+ def __init__(self, bucket, key_name, s3_client):
+ self.bucket = bucket
+ self.name = key_name
+ self.key = key_name
+ self._s3_client = s3_client
+ self._metadata = {}
+ self.etag = None
+ self.version_id = None
+
+ def set_contents_from_string(self, content):
+ """Upload object content"""
+ kwargs = {
+ 'Bucket': self.bucket.name,
+ 'Key': self.name,
+ 'Body': content
+ }
+ if self._metadata:
+ kwargs['Metadata'] = self._metadata
+
+ response = self._s3_client.put_object(**kwargs)
+ self.etag = response['ETag']
+ if 'VersionId' in response:
+ self.version_id = response['VersionId']
+ return response
+
+ def set_metadata(self, key, value):
+ """Set metadata for the object"""
+ self._metadata[key] = value
+
+ def delete(self):
+ """Delete the object"""
+ kwargs = {'Bucket': self.bucket.name, 'Key': self.name}
+ if self.version_id:
+ kwargs['VersionId'] = self.version_id
+ return self._s3_client.delete_object(**kwargs)
+
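+# Minimal S3Key usage sketch (bucket and names are illustrative; key handles
+# are normally obtained via S3Bucket.new_key below):
+#
+#   key = bucket.new_key('obj-1')
+#   key.set_metadata('meta1', 'value1')
+#   key.set_contents_from_string('bar')
+#   key.delete()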
+
+class S3Bucket:
+ """Wrapper class to provide boto-like interface for S3 buckets using boto3"""
+ def __init__(self, connection, bucket_name):
+ self.connection = connection
+ self.name = bucket_name
+ self._s3_client = connection._s3_client
+ self._s3_resource = connection._s3_resource
+ self._bucket = self._s3_resource.Bucket(bucket_name)
+
+ def new_key(self, key_name):
+ """Return a key handle; nothing is written until set_contents_from_string() is called"""
+ return S3Key(self, key_name, self._s3_client)
+
+ def list(self, prefix=''):
+ """List objects in the bucket"""
+ try:
+ if prefix:
+ objects = self._bucket.objects.filter(Prefix=prefix)
+ else:
+ objects = self._bucket.objects.all()
+
+ # Convert to S3Key objects for compatibility
+ keys = []
+ for obj in objects:
+ key = S3Key(self, obj.key, self._s3_client)
+ key.etag = obj.e_tag
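+ # ObjectSummary does not expose version_id, so the guard below is purely defensive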
+ if hasattr(obj, 'version_id'):
+ key.version_id = obj.version_id
+ keys.append(key)
+ return keys
+ except ClientError:
+ # Return an empty list when the bucket is missing or listing fails
+ return []
+
+ def delete_key(self, key_name, version_id=None):
+ """Delete a specific key"""
+ kwargs = {'Bucket': self.name, 'Key': key_name}
+ if version_id:
+ kwargs['VersionId'] = version_id
+ response = self._s3_client.delete_object(**kwargs)
+ # Return a key-like object with version_id for compatibility
+ deleted_key = S3Key(self, key_name, self._s3_client)
+ if 'VersionId' in response:
+ deleted_key.version_id = response['VersionId']
+ if 'DeleteMarker' in response:
+ deleted_key.delete_marker = response['DeleteMarker']
+ return deleted_key
+
+ def copy_key(self, new_key_name, src_bucket_name, src_key_name, src_version_id=None, metadata=None):
+ """Copy a key within or between buckets"""
+ copy_source = {'Bucket': src_bucket_name, 'Key': src_key_name}
+ if src_version_id:
+ copy_source['VersionId'] = src_version_id
+
+ kwargs = {
+ 'CopySource': copy_source,
+ 'Bucket': self.name,
+ 'Key': new_key_name
+ }
+
+ if metadata is not None:
+ kwargs['Metadata'] = metadata
+ kwargs['MetadataDirective'] = 'REPLACE'
+
+ response = self._s3_client.copy_object(**kwargs)
+
+ # Return a key object for compatibility
+ new_key = S3Key(self, new_key_name, self._s3_client)
+ if 'CopyObjectResult' in response and 'ETag' in response['CopyObjectResult']:
+ new_key.etag = response['CopyObjectResult']['ETag']
+ if 'VersionId' in response:
+ new_key.version_id = response['VersionId']
+ return new_key
+
+ def configure_versioning(self, enabled):
+ """Enable or suspend versioning (once enabled, S3 versioning can only be suspended, not removed)"""
+ status = 'Enabled' if enabled else 'Suspended'
+ self._s3_client.put_bucket_versioning(
+ Bucket=self.name,
+ VersioningConfiguration={'Status': status}
+ )
+
+ def initiate_multipart_upload(self, key_name, metadata=None):
+ """Initiate a multipart upload"""
+ kwargs = {'Bucket': self.name, 'Key': key_name}
+ if metadata:
+ kwargs['Metadata'] = metadata
+
+ response = self._s3_client.create_multipart_upload(**kwargs)
+ return MultipartUpload(self, key_name, response['UploadId'], self._s3_client)
+
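+# Minimal S3Bucket usage sketch (conn comes from the S3Connection wrapper
+# below; bucket and key names are illustrative only):
+#
+#   bucket = conn.create_bucket('notif-test-bucket')
+#   bucket.configure_versioning(True)
+#   for key in bucket.list(prefix='logs/'):
+#       print(key.name, key.etag)
+#   copy = bucket.copy_key('copy-of-obj', bucket.name, 'obj-1')
+#   bucket.delete_key('obj-1')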
+
+class MultipartUpload:
+ """Wrapper for multipart upload operations"""
+ def __init__(self, bucket, key_name, upload_id, s3_client):
+ self.bucket = bucket
+ self.key_name = key_name
+ self.upload_id = upload_id
+ self._s3_client = s3_client
+ self.parts = []
+
+ def upload_part_from_file(self, fp, part_number, size=None):
+ """Upload a part from a file object"""
+ if size is not None:
+ data = fp.read(size)
+ else:
+ data = fp.read()
+
+ response = self._s3_client.upload_part(
+ Bucket=self.bucket.name,
+ Key=self.key_name,
+ PartNumber=part_number,
+ UploadId=self.upload_id,
+ Body=data
+ )
+
+ self.parts.append({
+ 'PartNumber': part_number,
+ 'ETag': response['ETag']
+ })
+
+ def complete_upload(self):
+ """Complete the multipart upload"""
+ # Sort parts by part number
+ self.parts.sort(key=lambda x: x['PartNumber'])
+
+ response = self._s3_client.complete_multipart_upload(
+ Bucket=self.bucket.name,
+ Key=self.key_name,
+ UploadId=self.upload_id,
+ MultipartUpload={'Parts': self.parts}
+ )
+ return response
+
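+# Multipart upload sketch (payload and key name are illustrative; every part
+# except the last must be at least 5 MiB):
+#
+#   upload = bucket.initiate_multipart_upload('big-obj')
+#   part1 = io.BytesIO(b'x' * (5 * 1024 * 1024))
+#   upload.upload_part_from_file(part1, 1)
+#   upload.complete_upload()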
+
+class S3Connection:
+ """Wrapper class to provide boto-like S3Connection interface using boto3"""
+ def __init__(self, aws_access_key_id, aws_secret_access_key,
+ is_secure=True, port=None, host=None, calling_format=None):
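+ # calling_format is accepted for backward compatibility only; boto3 has no equivalent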
+ self.aws_access_key_id = aws_access_key_id
+ self.aws_secret_access_key = aws_secret_access_key
+ self.access_key = aws_access_key_id # Boto compatibility
+ self.secret_key = aws_secret_access_key # Boto compatibility
+ self.is_secure = is_secure
+ self.port = port
+ self.host = host
+ self.num_retries = 5 # Boto compatibility; note that boto3 bakes the retry count in at client creation
+
+ # Build the endpoint URL; omit the port when none was given
+ protocol = 'https' if is_secure else 'http'
+ self.endpoint_url = f'{protocol}://{host}:{port}' if port else f'{protocol}://{host}'
+
+ # Create boto3 client and resource
+ self._s3_client = boto3.client(
+ 's3',
+ endpoint_url=self.endpoint_url,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ config=boto3.session.Config(retries={'max_attempts': self.num_retries})
+ )
+
+ self._s3_resource = boto3.resource(
+ 's3',
+ endpoint_url=self.endpoint_url,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ config=boto3.session.Config(retries={'max_attempts': self.num_retries})
+ )
+
+ # Boto compatibility: handle for TLS (secure) connections
+ self.secure_conn = self if is_secure else None
+
+ def create_bucket(self, bucket_name, **kwargs):
+ """Create a bucket"""
+ acl = kwargs.get('ACL', 'private')
+ try:
+ self._s3_client.create_bucket(Bucket=bucket_name, ACL=acl)
+ except ClientError as e:
+ # Bucket might already exist
+ if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
+ raise
+ return S3Bucket(self, bucket_name)
+
+ def get_bucket(self, bucket_name):
+ """Get a bucket object"""
+ return S3Bucket(self, bucket_name)
+
+ def delete_bucket(self, bucket_name):
+ """Delete a bucket"""
+ return self._s3_client.delete_bucket(Bucket=bucket_name)
+
+
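+# Connection sketch against a local RGW endpoint (credentials, host and port
+# are placeholders; the tests derive them from the vstart helpers):
+#
+#   conn = S3Connection(aws_access_key_id='ACCESS', aws_secret_access_key='SECRET',
+#                       is_secure=False, port=8000, host='localhost')
+#   bucket = conn.create_bucket('my-bucket')
+#   conn.delete_bucket('my-bucket')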
+
NO_HTTP_BODY = ''
def put_object_tagging(conn, bucket_name, key, tags):
def ceph_admin(args, cluster='noname', **kwargs):
""" ceph command """
cmd = [test_path + 'test-rgw-call.sh', 'call_ceph', cluster] + args
- print(' '.join(cmd))
return bash(cmd, **kwargs)
def delete_all_topics(conn, tenant, cluster):
def set_rgw_config_option(client, option, value, cluster='noname'):
""" change a config option """
print(f'Setting {option} to {value} for {client} in cluster {cluster}')
- return ceph_admin(['config', 'set', client, option, str(value)], cluster)
\ No newline at end of file
+ return ceph_admin(['config', 'set', client, option, str(value)], cluster)
import os
import io
import string
-# XXX this should be converted to use boto3
-import boto
+import sys
from botocore.exceptions import ClientError
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from random import randint
# XXX this should be converted to use pytest
from nose.plugins.attrib import attr
import boto3
-from boto.s3.connection import S3Connection
import datetime
from cloudevents.http import from_http
from dateutil import parser
put_object_tagging, \
admin, \
set_rgw_config_option, \
- bash
+ bash, \
+ S3Connection, \
+ S3Bucket, \
+ S3Key, \
+ MultipartUpload
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
-import boto.s3.tagging
# configure logging for the tests module
class LogWrapper:
vstart_secret_key = get_secret_key()
conn = S3Connection(aws_access_key_id=vstart_access_key,
aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ is_secure=False, port=port_no, host=hostname)
if no_retries:
conn.num_retries = 0
return conn
vstart_secret_key = get_secret_key()
conn = S3Connection(aws_access_key_id=vstart_access_key,
- aws_secret_access_key=vstart_secret_key,
- is_secure=False, port=port_no, host=hostname,
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ aws_secret_access_key=vstart_secret_key,
+ is_secure=False, port=port_no, host=hostname)
return conn
assert_equal(rc, 0)
conn = S3Connection(aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host())
return conn, arn
assert_equal(rc, 0)
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
- is_secure=False, port=get_config_port(), host=get_config_host(),
- calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ is_secure=False, port=get_config_port(), host=get_config_host())
return conn
##############
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
- bucket.delete_key(copy_of_key, version_id=ver3)
+ bucket.delete_key(copy_of_key.name, version_id=ver3)
bucket.delete_key(key.name, version_id=ver2)
bucket.delete_key(key.name, version_id=ver1)
#conn.delete_bucket(bucket_name)
"""test the topic migration procedure described at
https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
"""
+ # Initialize names up front so the finally-block cleanup is safe even if setup fails
+ user1_bucket_name = None
+ user2_bucket_name = None
+ user1_id = None
+ account_id = None
+
try:
# create an http server for notification delivery
host = get_ip()
)
log.info("topic migration test has completed successfully")
finally:
- admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
- admin(
- ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
- get_config_cluster(),
- )
- admin(
- ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
- get_config_cluster(),
- )
- admin(["account", "rm", "--account-id", account_id], get_config_cluster())
+ if user1_id is not None:
+ admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
+ if user1_bucket_name is not None:
+ admin(
+ ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ if user2_bucket_name is not None:
+ admin(
+ ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ if account_id is not None:
+ admin(["account", "rm", "--account-id", account_id], get_config_cluster())
def persistent_notification_shard_config_change(endpoint_type, conn, new_num_shards, old_num_shards=11):
""" test persistent notification shard config change """