From: Yehuda Sadeh Date: Thu, 9 Apr 2015 21:38:54 +0000 (-0700) Subject: radosgw-sync: handle object removal X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ff16bd45c03821ed34fdf4c52612da8fa0499af2;p=radosgw-agent.git radosgw-sync: handle object removal Signed-off-by: Yehuda Sadeh --- diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py index b65691d..846d5c0 100644 --- a/radosgw_agent/client.py +++ b/radosgw_agent/client.py @@ -248,22 +248,30 @@ def mark_delete_object(connection, bucket_name, obj, params=None): expect_json=False) @boto_call -def delete_versioned_object(connection, bucket_name, obj): +def delete_versioned_object(connection, bucket_name, obj, mtime): """ Perform a delete on a versioned object, the requirements for these types of requests is to be able to pass the ``versionID`` as a query argument """ # if obj.delete_marker is False we should not delete this and we shouldn't # have been called, so return without doing anything - if getattr(obj, 'delete_marker', False) is False: - log.info('obj: %s has `delete_marker=False`, will skip' % obj.name) - return + # if getattr(obj, 'delete_marker', False) is False: + # log.info('obj: %s has `delete_marker=False`, will skip' % obj.name) + # return params = {} - params['rgwx-version-id'] = obj.version_id - params['rgwx-versioned-epoch'] = obj.VersionedEpoch - params['versionID'] = obj.version_id + if getattr(obj, 'version_id', None) is not None: + params['rgwx-version-id'] = obj.version_id + params['versionID'] = obj.version_id + + if getattr(obj, 'VersionedEpoch', None) is not None: + params['rgwx-versioned-epoch'] = obj.VersionedEpoch + + headers = {} + if mtime is not None: + params['no-precondition-error'] = 'true' + headers['X-Amz-Delete-If-Unmodified-Since'] = mtime path = u'{bucket}/{object}'.format( bucket=bucket_name, @@ -272,18 +280,13 @@ def delete_versioned_object(connection, bucket_name, obj): return request(connection, 'delete', path, params=params, + headers=headers, expect_json=False) @boto_call -def delete_object(connection, bucket_name, obj): - if is_versioned(obj): - log.debug('performing a delete for versioned obj: %s' % obj.name) - delete_versioned_object(connection, bucket_name, obj) - else: - bucket = connection.get_bucket(bucket_name) - bucket.delete_key(obj.name) - +def delete_object(connection, bucket_name, obj, mtime=None): + delete_versioned_object(connection, bucket_name, obj, mtime) def is_versioned(obj): """ diff --git a/radosgw_agent/sync_tool.py b/radosgw_agent/sync_tool.py index 353b137..9ef1795 100644 --- a/radosgw_agent/sync_tool.py +++ b/radosgw_agent/sync_tool.py @@ -404,6 +404,10 @@ class SyncToolDataSync(object): if not self.worker.sync_object(bucket, obj): log.info('failed to sync {b}/{o}'.format(b=bucket, o=obj)) + def delete_object(self, bucket, obj, mtime): + if not self.worker.delete_object(bucket, obj, mtime): + log.info('failed to delete {b}/{o}'.format(b=bucket, o=obj)) + def get_bucket_instance(self, bucket): return self.worker.get_bucket_instance(bucket) @@ -599,7 +603,7 @@ class BILogIter(object): keys[key] = True for e in l: - yield (BILogIter.entry_to_obj(e), e.get('op_id')) + yield (BILogIter.entry_to_obj(e), e.get('op_id'), e.get('op')) def get_value_map(s): @@ -658,13 +662,13 @@ class ShardIter(object): for obj in client.list_objects_in_bucket(self.shard.sync_work.src_conn, self.shard.bucket, shard_id=self.shard.shard_id, marker=list_pos): marker = get_list_marker(obj.name, inc_pos) print 'marker=', marker - yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker) + yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker, 'write') # now continue from where we last stopped li = BILogIter(self.shard, inc_pos) - for (e, marker) in li.iterate(): - print 'marker=', marker - yield (e, marker) + for (e, marker, op) in li.iterate(): + print 'marker=', marker, 'op=', op + yield (e, marker, op) def sync_objs(self, sync, bucket, max_entries): retries = [] @@ -673,14 +677,17 @@ class ShardIter(object): count = 0 - for (obj, marker) in self.iterate_diff_objects(): + for (obj, marker, op) in self.iterate_diff_objects(): obj = Object(bucket, obj, sync) entry = obj.obj_entry log.info('sync bucket={b} object={o}'.format(b=bucket, o=entry.key)) - print 'sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime) + log.info('sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime)) - ret = obj.sync() + if op == 'del': + ret = obj.delete() + else: + ret = obj.sync() if ret is False: log.info('sync bucket={b} object={o} failed'.format(b=bucket, o=entry.key)) retries.append(entry.key) @@ -712,6 +719,13 @@ class Object(object): except: return False + def delete(self): + try: + self.sync_work.delete_object(self.bucket, self.obj_entry.key, self.obj_entry.mtime) + return True + except: + return False + def status(self): opstate_ret = client.get_op_state(self.sync_work.dest_conn, '', '', self.bucket, self.obj_entry) entries = [OpStateEntry(entry) for entry in opstate_ret] @@ -963,7 +977,7 @@ The commands are: si = ShardIter(shard) - for (obj, marker) in si.iterate_diff_objects(): + for (obj, marker, op) in si.iterate_diff_objects(): print obj, marker diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py index 86f9778..c648755 100644 --- a/radosgw_agent/worker.py +++ b/radosgw_agent/worker.py @@ -266,6 +266,22 @@ class DataWorker(Worker): return True + def delete_object(self, bucket, obj, mtime): + log.debug('syncing object %s/%s', bucket, obj.name) + self.op_id += 1 + local_op_id = self.local_lock_id + ':' + str(self.op_id) + + try: + client.delete_object(self.dest_conn, bucket, obj, mtime) + except NotFound: + log.debug('object "%s/%s" not found', bucket, obj.name) + except Exception as error: + msg = 'encountered an error during sync' + dev_log.warn(msg, exc_info=True) + log.warning('%s: %s' % (msg, error)) + + return True + def wait_for_object(self, bucket, obj, until, local_op_id): while time.time() < until: try: