expect_json=False)
@boto_call
-def delete_versioned_object(connection, bucket_name, obj):
+def delete_versioned_object(connection, bucket_name, obj, mtime):
"""
Perform a delete on a versioned object, the requirements for these types
of requests is to be able to pass the ``versionID`` as a query argument
"""
# if obj.delete_marker is False we should not delete this and we shouldn't
# have been called, so return without doing anything
- if getattr(obj, 'delete_marker', False) is False:
- log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
- return
+ # if getattr(obj, 'delete_marker', False) is False:
+ # log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
+ # return
params = {}
- params['rgwx-version-id'] = obj.version_id
- params['rgwx-versioned-epoch'] = obj.VersionedEpoch
- params['versionID'] = obj.version_id
+ if getattr(obj, 'version_id', None) is not None:
+ params['rgwx-version-id'] = obj.version_id
+ params['versionID'] = obj.version_id
+
+ if getattr(obj, 'VersionedEpoch', None) is not None:
+ params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+ headers = {}
+ if mtime is not None:
+ params['no-precondition-error'] = 'true'
+ headers['X-Amz-Delete-If-Unmodified-Since'] = mtime
path = u'{bucket}/{object}'.format(
bucket=bucket_name,
return request(connection, 'delete', path,
params=params,
+ headers=headers,
expect_json=False)
@boto_call
-def delete_object(connection, bucket_name, obj):
- if is_versioned(obj):
- log.debug('performing a delete for versioned obj: %s' % obj.name)
- delete_versioned_object(connection, bucket_name, obj)
- else:
- bucket = connection.get_bucket(bucket_name)
- bucket.delete_key(obj.name)
-
+def delete_object(connection, bucket_name, obj, mtime=None):
+ delete_versioned_object(connection, bucket_name, obj, mtime)
def is_versioned(obj):
"""
if not self.worker.sync_object(bucket, obj):
log.info('failed to sync {b}/{o}'.format(b=bucket, o=obj))
+ def delete_object(self, bucket, obj, mtime):
+ if not self.worker.delete_object(bucket, obj, mtime):
+ log.info('failed to delete {b}/{o}'.format(b=bucket, o=obj))
+
def get_bucket_instance(self, bucket):
return self.worker.get_bucket_instance(bucket)
keys[key] = True
for e in l:
- yield (BILogIter.entry_to_obj(e), e.get('op_id'))
+ yield (BILogIter.entry_to_obj(e), e.get('op_id'), e.get('op'))
def get_value_map(s):
for obj in client.list_objects_in_bucket(self.shard.sync_work.src_conn, self.shard.bucket, shard_id=self.shard.shard_id, marker=list_pos):
marker = get_list_marker(obj.name, inc_pos)
print 'marker=', marker
- yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker)
+ yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker, 'write')
# now continue from where we last stopped
li = BILogIter(self.shard, inc_pos)
- for (e, marker) in li.iterate():
- print 'marker=', marker
- yield (e, marker)
+ for (e, marker, op) in li.iterate():
+ print 'marker=', marker, 'op=', op
+ yield (e, marker, op)
def sync_objs(self, sync, bucket, max_entries):
retries = []
count = 0
- for (obj, marker) in self.iterate_diff_objects():
+ for (obj, marker, op) in self.iterate_diff_objects():
obj = Object(bucket, obj, sync)
entry = obj.obj_entry
log.info('sync bucket={b} object={o}'.format(b=bucket, o=entry.key))
- print 'sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime)
+ log.info('sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime))
- ret = obj.sync()
+ if op == 'del':
+ ret = obj.delete()
+ else:
+ ret = obj.sync()
if ret is False:
log.info('sync bucket={b} object={o} failed'.format(b=bucket, o=entry.key))
retries.append(entry.key)
except:
return False
+ def delete(self):
+ try:
+ self.sync_work.delete_object(self.bucket, self.obj_entry.key, self.obj_entry.mtime)
+ return True
+ except:
+ return False
+
def status(self):
opstate_ret = client.get_op_state(self.sync_work.dest_conn, '', '', self.bucket, self.obj_entry)
entries = [OpStateEntry(entry) for entry in opstate_ret]
si = ShardIter(shard)
- for (obj, marker) in si.iterate_diff_objects():
+ for (obj, marker, op) in si.iterate_diff_objects():
print obj, marker
return True
+ def delete_object(self, bucket, obj, mtime):
+ log.debug('syncing object %s/%s', bucket, obj.name)
+ self.op_id += 1
+ local_op_id = self.local_lock_id + ':' + str(self.op_id)
+
+ try:
+ client.delete_object(self.dest_conn, bucket, obj, mtime)
+ except NotFound:
+ log.debug('object "%s/%s" not found', bucket, obj.name)
+ except Exception as error:
+ msg = 'encountered an error during sync'
+ dev_log.warn(msg, exc_info=True)
+ log.warning('%s: %s' % (msg, error))
+
+ return True
+
def wait_for_object(self, bucket, obj, until, local_op_id):
while time.time() < until:
try: