]> git-server-git.apps.pok.os.sepia.ceph.com Git - radosgw-agent.git/commitdiff
radosgw-sync: handle object removal
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 9 Apr 2015 21:38:54 +0000 (14:38 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 14 Apr 2015 22:40:51 +0000 (15:40 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
radosgw_agent/client.py
radosgw_agent/sync_tool.py
radosgw_agent/worker.py

index b65691da467c008fd50a3873496e493ab84fc701..846d5c0bbfe6b50a6029eafcdf76f9c51a578105 100644 (file)
@@ -248,22 +248,30 @@ def mark_delete_object(connection, bucket_name, obj, params=None):
                    expect_json=False)
 
 @boto_call
-def delete_versioned_object(connection, bucket_name, obj):
+def delete_versioned_object(connection, bucket_name, obj, mtime):
     """
     Perform a delete on a versioned object, the requirements for these types
     of requests is to be able to pass the ``versionID`` as a query argument
     """
     # if obj.delete_marker is False we should not delete this and we shouldn't
     # have been called, so return without doing anything
-    if getattr(obj, 'delete_marker', False) is False:
-        log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
-        return
+    if getattr(obj, 'delete_marker', False) is False:
+        log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
+        return
 
     params = {}
 
-    params['rgwx-version-id'] = obj.version_id
-    params['rgwx-versioned-epoch'] = obj.VersionedEpoch
-    params['versionID'] = obj.version_id
+    if getattr(obj, 'version_id', None) is not None:
+        params['rgwx-version-id'] = obj.version_id
+        params['versionID'] = obj.version_id
+
+    if getattr(obj, 'VersionedEpoch', None) is not None:
+        params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+    headers = {}
+    if mtime is not None:
+        params['no-precondition-error'] = 'true'
+        headers['X-Amz-Delete-If-Unmodified-Since'] = mtime
 
     path = u'{bucket}/{object}'.format(
         bucket=bucket_name,
@@ -272,18 +280,13 @@ def delete_versioned_object(connection, bucket_name, obj):
 
     return request(connection, 'delete', path,
                    params=params,
+                   headers=headers,
                    expect_json=False)
 
 
 @boto_call
-def delete_object(connection, bucket_name, obj):
-    if is_versioned(obj):
-        log.debug('performing a delete for versioned obj: %s' % obj.name)
-        delete_versioned_object(connection, bucket_name, obj)
-    else:
-        bucket = connection.get_bucket(bucket_name)
-        bucket.delete_key(obj.name)
-
+def delete_object(connection, bucket_name, obj, mtime=None):
+    delete_versioned_object(connection, bucket_name, obj, mtime)
 
 def is_versioned(obj):
     """
index 353b1373a32c1af35a975c86c55ac07384852a0a..9ef17955ce6172834078526c832cb730929650a1 100644 (file)
@@ -404,6 +404,10 @@ class SyncToolDataSync(object):
         if not self.worker.sync_object(bucket, obj):
             log.info('failed to sync {b}/{o}'.format(b=bucket, o=obj))
 
+    def delete_object(self, bucket, obj, mtime):
+        if not self.worker.delete_object(bucket, obj, mtime):
+            log.info('failed to delete {b}/{o}'.format(b=bucket, o=obj))
+
     def get_bucket_instance(self, bucket):
         return self.worker.get_bucket_instance(bucket)
 
@@ -599,7 +603,7 @@ class BILogIter(object):
                     keys[key] = True
 
         for e in l:
-            yield (BILogIter.entry_to_obj(e), e.get('op_id'))
+            yield (BILogIter.entry_to_obj(e), e.get('op_id'), e.get('op'))
 
 
 def get_value_map(s):
@@ -658,13 +662,13 @@ class ShardIter(object):
             for obj in client.list_objects_in_bucket(self.shard.sync_work.src_conn, self.shard.bucket, shard_id=self.shard.shard_id, marker=list_pos):
                 marker = get_list_marker(obj.name, inc_pos)
                 print 'marker=', marker
-                yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker)
+                yield (ObjectEntry(obj, obj.last_modified, obj.RgwxTag), marker, 'write')
 
         # now continue from where we last stopped
         li = BILogIter(self.shard, inc_pos)
-        for (e, marker) in li.iterate():
-            print 'marker=', marker
-            yield (e, marker)
+        for (e, marker, op) in li.iterate():
+            print 'marker=', marker, 'op=', op
+            yield (e, marker, op)
 
     def sync_objs(self, sync, bucket, max_entries):
         retries = []
@@ -673,14 +677,17 @@ class ShardIter(object):
 
         count = 0
 
-        for (obj, marker) in self.iterate_diff_objects():
+        for (obj, marker, op) in self.iterate_diff_objects():
             obj = Object(bucket, obj, sync)
             entry = obj.obj_entry
             log.info('sync bucket={b} object={o}'.format(b=bucket, o=entry.key))
 
-            print 'sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime)
+            log.info('sync obj={o}, marker={m} last-modified={l}'.format(o=entry.key, m=marker, l=entry.mtime))
 
-            ret = obj.sync()
+            if op == 'del':
+                ret = obj.delete()
+            else:
+                ret = obj.sync()
             if ret is False:
                 log.info('sync bucket={b} object={o} failed'.format(b=bucket, o=entry.key))
                 retries.append(entry.key)
@@ -712,6 +719,13 @@ class Object(object):
         except:
             return False
 
+    def delete(self):
+        try:
+            self.sync_work.delete_object(self.bucket, self.obj_entry.key, self.obj_entry.mtime)
+            return True
+        except:
+            return False
+
     def status(self):
         opstate_ret = client.get_op_state(self.sync_work.dest_conn, '', '', self.bucket, self.obj_entry)
         entries = [OpStateEntry(entry) for entry in opstate_ret]
@@ -963,7 +977,7 @@ The commands are:
 
             si = ShardIter(shard)
 
-            for (obj, marker) in si.iterate_diff_objects():
+            for (obj, marker, op) in si.iterate_diff_objects():
                 print obj, marker
 
 
index 86f97782541fffd89ca9ad53342d8dd3f3666c3a..c6487557b8b3624352a5866b7e1f85258d3f0ceb 100644 (file)
@@ -266,6 +266,22 @@ class DataWorker(Worker):
 
         return True
 
+    def delete_object(self, bucket, obj, mtime):
+        log.debug('syncing object %s/%s', bucket, obj.name)
+        self.op_id += 1
+        local_op_id = self.local_lock_id + ':' +  str(self.op_id)
+
+        try:
+            client.delete_object(self.dest_conn, bucket, obj, mtime)
+        except NotFound:
+            log.debug('object "%s/%s" not found', bucket, obj.name)
+        except Exception as error:
+            msg = 'encountered an error during sync'
+            dev_log.warn(msg, exc_info=True)
+            log.warning('%s: %s' % (msg, error))
+
+        return True
+
     def wait_for_object(self, bucket, obj, until, local_op_id):
         while time.time() < until:
             try: