import boto.s3.connection
import dateutil.parser
+import datetime
import re
return 'null'
return k.version_id
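+# strip the surrounding double quotes that S3 returns around etags,
+# e.g. unquote('"0cc175b9c0f1b6a831c399e269772661"') -> '0cc175b9c0f1b6a831c399e269772661'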
+def unquote(s):
+ if s[0] == '"' and s[-1] == '"':
+ return s[1:-1]
+ return s
+
def check_object_eq(k1, k2, check_extra = True):
assert k1
assert k2
eq(k1.metadata, k2.metadata)
# eq(k1.cache_control, k2.cache_control)
eq(k1.content_type, k2.content_type)
- # eq(k1.content_encoding, k2.content_encoding)
- # eq(k1.content_disposition, k2.content_disposition)
- # eq(k1.content_language, k2.content_language)
- eq(k1.etag, k2.etag)
- # mtime1 = dateutil.parser.parse(k1.last_modified)
- # mtime2 = dateutil.parser.parse(k2.last_modified)
- # assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+ eq(k1.content_encoding, k2.content_encoding)
+ eq(k1.content_disposition, k2.content_disposition)
+ eq(k1.content_language, k2.content_language)
+
+ eq(unquote(k1.etag), unquote(k2.etag))
+
+ mtime1 = dateutil.parser.parse(k1.last_modified)
+ mtime2 = dateutil.parser.parse(k2.last_modified)
+ log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
+ assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
# if check_extra:
# eq(k1.owner.id, k2.owner.id)
# eq(k1.owner.display_name, k2.owner.display_name)
raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
return result
-def append_query_arg(s, n, v):
- if not v:
- return s
- nv = '{n}={v}'.format(n=n, v=v)
- if not s:
- return nv
- return '{s}&{nv}'.format(s=s, nv=nv)
-
-class CloudList:
- def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
- self.conn = conn
- self.bucket_name = bucket_name or ''
- if bucket_name:
- self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+class CloudKey:
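+    # CloudKey wraps a key synced to the cloud zone and recovers the source zone's
+    # name, version id, mtime and etag from the rgwx- metadata entries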
+ def __init__(self, zone_bucket, k):
+ self.zone_bucket = zone_bucket
+
+        # we keep two keys: a bucket listing returns keys that only contain partial data,
+        # but we need the full data so that we can use all the rgwx- metadata headers
+        # needed to build a correct representation of the object
+        self.key = k
+        self.rgwx_key = k # assume k already carries the rgwx- metadata; if not, update() fetches it
+ self.update()
+
+ def update(self):
+ k = self.key
+ rk = self.rgwx_key
+
+ self.size = rk.size
+ orig_name = rk.metadata.get('rgwx-source-key')
+ if not orig_name:
+ self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
+ rk = self.rgwx_key
+ orig_name = rk.metadata.get('rgwx-source-key')
+
+ self.name = orig_name
+ self.version_id = rk.metadata.get('rgwx-source-version-id')
+
+ ve = rk.metadata.get('rgwx-versioned-epoch')
+ if ve:
+ self.versioned_epoch = int(ve)
else:
- self.bucket = None
- self.query = query
- self.query_args = query_args
- self.max_keys = None
- self.marker = marker
-
- def raw_search(self):
- q = self.query or ''
- query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
- if self.max_keys is not None:
- query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
- if self.marker:
- query_args = append_query_arg(query_args, 'marker', self.marker)
-
- query_args = append_query_arg(query_args, 'format', 'json')
-
- headers = {}
-
- result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
-
- l = []
-
- result_dict = json.loads(result.read())
+ self.versioned_epoch = 0
- for entry in result_dict['Objects']:
- bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
- k = boto.s3.key.Key(bucket, entry['Key'])
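+        # rgwx-source-mtime carries the source mtime as epoch seconds; render it in the
+        # HTTP Last-Modified format, falling back to the cloud object's own mtime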
+ mt = rk.metadata.get('rgwx-source-mtime')
+ if mt:
+ self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
+ else:
+ self.last_modified = k.last_modified
+
+ et = rk.metadata.get('rgwx-source-etag')
+        if '-' in rk.etag or (et and '-' in et):
+            # use the source etag: the object was uploaded via multipart upload in one of
+            # the zones, so there is no way to guarantee that both etags are calculated
+            # the same way. Otherwise keep the etag generated by the regular upload
+            # mechanism, which should be consistent on both ends
+ self.etag = et
+ else:
+ self.etag = rk.etag
- k.version_id = entry['Instance']
- k.etag = entry['ETag']
- k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
- k.last_modified = entry['LastModified']
- k.size = entry['Size']
- k.content_type = entry['ContentType']
- k.versioned_epoch = entry['VersionedEpoch']
+ if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
+ self.etag = '"' + self.etag + '"'
- k.metadata = {}
- for e in entry['CustomMetadata']:
- k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
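+        # expose only the user metadata; drop the internal rgwx- bookkeeping entries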
+ new_meta = {}
+ for meta_key, meta_val in k.metadata.iteritems():
+ if not meta_key.startswith('rgwx-'):
+ new_meta[meta_key] = meta_val
- l.append(k)
+ self.metadata = new_meta
- return result_dict, l
+ self.cache_control = k.cache_control
+ self.content_type = k.content_type
+ self.content_encoding = k.content_encoding
+ self.content_disposition = k.content_disposition
+ self.content_language = k.content_language
- def search(self, drain = True, sort = True, sort_key = None):
- l = []
- is_done = False
+ def get_contents_as_string(self):
+ r = self.key.get_contents_as_string()
- while not is_done:
- result, result_keys = self.raw_search()
+        # the previous call loaded the source object's metadata, so refresh the
+        # derived attributes from it
- l = l + result_keys
+ self.rgwx_key = self.key
+ self.update()
- is_done = not (drain and (result['IsTruncated'] == "true"))
- marker = result['Marker']
+ return r
- if sort:
- if not sort_key:
- sort_key = lambda k: (k.name, -k.versioned_epoch)
- l.sort(key = sort_key)
-
- return l
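+# append 'n=v' to the query string s, e.g. append_query_arg('versions', 'marker', 'foo')
+# yields 'versions&marker=foo'; a falsy value returns s unchanged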
+def append_query_arg(s, n, v):
+ if not v:
+ return s
+ nv = '{n}={v}'.format(n=n, v=v)
+ if not s:
+ return nv
+ return '{s}&{nv}'.format(s=s, nv=nv)
class CloudZoneBucket:
self.bucket = self.cloud_conn.get_bucket(self.target_bucket)
def get_all_versions(self):
- for o in self.bucket.get_all_keys(prefix=self.target_prefix):
- new_name = o.name[len(self.target_prefix):]
- log.debug('bucket=%s obj=%s new_name=', self.bucket.name, new_name)
- o.name = new_name
- yield o
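+        # wrap each key under the target prefix in a CloudKey and yield them sorted
+        # by (name, descending versioned epoch) so the newest epoch of a name comes first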
+ l = []
+
+ for k in self.bucket.get_all_keys(prefix=self.target_prefix):
+ new_key = CloudKey(self, k)
+
+ log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch)
+ l.append(new_key)
+
+ sort_key = lambda k: (k.name, -k.versioned_epoch)
+ l.sort(key = sort_key)
+
+ for new_key in l:
+ yield new_key
+
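+    # fetch a single object from the cloud bucket (optionally a specific version)
+    # and wrap it in a CloudKey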
+ def get_key(self, name, version_id=None):
+ return CloudKey(self, self.bucket.get_key(name, version_id=version_id))
def parse_endpoint(endpoint):