--- /dev/null
+import json
+import boto
+
+def append_attr_value(d, attr, attrv):
+ if attrv and len(str(attrv)) > 0:
+ d[attr] = attrv
+
+def append_attr(d, k, attr):
+ try:
+ attrv = getattr(k, attr)
+ except:
+ return
+ append_attr_value(d, attr, attrv)
+
+def get_attrs(k, attrs):
+ d = {}
+ for a in attrs:
+ append_attr(d, k, a)
+
+ return d
+
+def append_query_arg(s, n, v):
+ if not v:
+ return s
+ nv = '{n}={v}'.format(n=n, v=v)
+ if not s:
+ return nv
+ return '{s}&{nv}'.format(s=s, nv=nv)
+
+class KeyJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k, versioned=False):
+ attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control',
+ 'content_type', 'content_disposition', 'content_language',
+ 'owner', 'storage_class', 'md5', 'version_id', 'encrypted',
+ 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag']
+ d = get_attrs(k, attrs)
+ d['etag'] = k.etag[1:-1]
+ if versioned:
+ d['is_latest'] = k.is_latest
+ return d
+
+class DeleteMarkerJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'version_id', 'last_modified', 'owner']
+ d = get_attrs(k, attrs)
+ d['delete_marker'] = True
+ d['is_latest'] = k.is_latest
+ return d
+
+class UserJSONEncoder(boto.s3.user.User):
+ @staticmethod
+ def default(k):
+ attrs = ['id', 'display_name']
+ return get_attrs(k, attrs)
+
+class BucketJSONEncoder(boto.s3.bucket.Bucket):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'creation_date']
+ return get_attrs(k, attrs)
+
+class BotoJSONEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, boto.s3.key.Key):
+ return KeyJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.deletemarker.DeleteMarker):
+ return DeleteMarkerJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.user.User):
+ return UserJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.prefix.Prefix):
+ return (lambda x: {'prefix': x.name})(obj)
+ if isinstance(obj, boto.s3.bucket.Bucket):
+ return BucketJSONEncoder.default(obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+def dump_json(o, cls=BotoJSONEncoder):
+ return json.dumps(o, cls=cls, indent=4)
+
+
import boto
import boto.s3.connection
+import dateutil.parser
+
from nose.tools import eq_ as eq
try:
from itertools import izip_longest as zip_longest
from itertools import zip_longest
from rgw_multi.multisite import *
+from rgw_multi.tools import *
log = logging.getLogger(__name__)
+def get_key_ver(k):
+ if not k.version_id:
+ return 'null'
+ return k.version_id
+
def check_object_eq(k1, k2, check_extra = True):
assert k1
assert k2
# eq(k1.content_disposition, k2.content_disposition)
# eq(k1.content_language, k2.content_language)
eq(k1.etag, k2.etag)
- eq(k1.last_modified, k2.last_modified)
+ mtime1 = dateutil.parser.parse(k1.last_modified)
+ mtime2 = dateutil.parser.parse(k2.last_modified)
+ assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
if check_extra:
eq(k1.owner.id, k2.owner.id)
eq(k1.owner.display_name, k2.owner.display_name)
# eq(k1.storage_class, k2.storage_class)
eq(k1.size, k2.size)
- eq(k1.version_id, k2.version_id)
+ eq(get_key_ver(k1), get_key_ver(k2))
# eq(k1.encrypted, k2.encrypted)
def make_request(conn, method, bucket, key, query_args, headers):
raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
return result
-def dump_json(o):
- return json.dumps(o, indent=4)
-
def append_query_arg(s, n, v):
if not v:
return s
def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
self.conn = conn
self.bucket_name = bucket_name or ''
+ if bucket_name:
+ self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+ else:
+ self.bucket = None
self.query = query
self.query_args = query_args
self.max_keys = None
self.marker = marker
- def search(self):
+ def raw_search(self):
q = self.query or ''
query_args = append_query_arg(self.query_args, 'query', urllib.quote_plus(q))
if self.max_keys is not None:
headers = {}
result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
- return json.loads(result.read())
+ l = []
-class ESZoneBucket:
- def __init__(self, zone_conn, name, conn):
- self.zone_conn = zone_conn
- self.name = name
- self.conn = conn
+ result_dict = json.loads(result.read())
- self.bucket = boto.s3.bucket.Bucket(name=name)
+ for entry in result_dict['Objects']:
+ k = boto.s3.key.Key(self.bucket, entry['Key'])
- def get_all_versions(self):
+ k.version_id = entry['Instance']
+ k.etag = entry['ETag']
+ k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
+ k.last_modified = entry['LastModified']
+ k.size = entry['Size']
+ k.content_type = entry['ContentType']
+ k.versioned_epoch = entry['VersionedEpoch']
- marker = None
- is_done = False
+ k.metadata = {}
+ for e in entry['CustomMetadata']:
+ k.metadata[e['Name']] = e['Value']
+
+ l.append(k)
+ return result_dict, l
+
+ def search(self, drain = True, sort = True, sort_key = None):
l = []
+ is_done = False
+
while not is_done:
- req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker)
+ result, result_keys = self.raw_search()
+
+ l = l + result_keys
- result = req.search()
+ is_done = not (drain and (result['IsTruncated'] == "true"))
+ marker = result['Marker']
- for entry in result['Objects']:
- k = boto.s3.key.Key(self.bucket, entry['Key'])
+ if sort:
+ if not sort_key:
+ sort_key = lambda k: (k.name, -k.versioned_epoch)
+ l.sort(key = sort_key)
- k.version_id = entry['Instance']
- k.etag = entry['ETag']
- k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
- k.last_modified = entry['LastModified']
- k.size = entry['Size']
- k.content_type = entry['ContentType']
- k.versioned_epoch = entry['VersionedEpoch']
+ return l
- k.metadata = {}
- for e in entry['CustomMetadata']:
- k.metadata[e['Name']] = e['Value']
- l.append(k)
- is_done = (result['IsTruncated'] == "false")
- marker = result['Marker']
+class ESZoneBucket:
+ def __init__(self, zone_conn, name, conn):
+ self.zone_conn = zone_conn
+ self.name = name
+ self.conn = conn
+
+ self.bucket = boto.s3.bucket.Bucket(name=name)
+
+ def get_all_versions(self):
+
+ marker = None
+ is_done = False
- l.sort(key = lambda l: (l.name, -l.versioned_epoch))
+ req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker)
- for k in l:
+ for k in req.search():
yield k
for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
if k1 is None:
- log.critical('key=%s is missing from zone=%s', k2.name, self.self.name)
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
assert False
if k2 is None:
log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)