]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
obsync: handle eventual consistency issues
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Wed, 11 May 2011 18:30:04 +0000 (11:30 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Wed, 11 May 2011 21:00:51 +0000 (14:00 -0700)
Handle eventual consistency issues so that obsync will be usable on more
S3 stores.

Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/obsync/obsync.py

index 44abf46b5e236fbc13bf811d6e44a41233b45ad0..63cbeebcd24a5ff6be6dd394bb65c91cb9b8264f 100755 (executable)
@@ -34,9 +34,13 @@ import shutil
 import string
 import sys
 import tempfile
+import time
 import traceback
 
+# Command-line options 
 global opts
+
+# Translation table mapping users in the source to users in the destination.
 global xuser
 
 ###### Constants classes #######
@@ -540,16 +544,20 @@ s3://host/bucket/key_prefix. Failed to find the bucket.")
         #k.set_metadata("Content-Type", mime)
         k.set_contents_from_filename(local_copy.path)
         if (src_acl.acl_policy != None):
-            try:
-                xml = src_acl.acl_policy.to_xml()
-                self.bucket.set_xml_acl(xml, k)
-            except Exception, e:
+            for retry_num in S3RetryIterator.create():
+                try:
+                    xml = src_acl.acl_policy.to_xml()
+                    self.bucket.set_xml_acl(xml, k)
+                    ex = None
+                except boto.exception.S3ResponseError, e:
+                    ex = e
+            if (ex):
                 print >>stderr, "ERROR SETTING ACL on object '" + sobj.name + "'"
                 print >>stderr
                 print >>stderr, "************* ACL: *************"
                 print >>stderr, str(xml)
                 print >>stderr, "********************************"
-                raise
+                raise ex
 
     def remove(self, obj):
         if (opts.dry_run):
@@ -558,6 +566,36 @@ s3://host/bucket/key_prefix. Failed to find the bucket.")
         if (opts.more_verbose):
             print "S3Store: removed %s" % obj.name
 
+# Some S3 servers offer "eventual consistency."
+# What this means is that after a change has been made, like the creation of an
+# object, it takes some time for this change to become visible to everyone.
+# This potentially includes the client making the change.
+#
+# This means we need to implement a retry mechanism for certain operations.
+# For example, setting the ACL on a newly created object may fail with an
+# "object not found" error if the object creation hasn't yet become visible to
+# us.
+class S3RetryIterator(object):
+    def __init__(self, cur_try, try_times):
+        self.cur_try = cur_try
+        self.try_times = try_times
+    def __iter__(self):
+        return self
+    def next(self):
+        t = self.cur_try
+        if (self.cur_try >= len(self.try_times)):
+            raise StopIteration
+        sleep_time = self.try_times[self.cur_try]
+        if (opts.more_verbose):
+            print "retrying operation after " + str(sleep_time) + \
+                " second delay"
+        time.sleep(sleep_time)
+        self.cur_try = self.cur_try + 1
+        return t
+    @staticmethod
+    def create():
+        return S3RetryIterator(0, [5, 15, 60])
+
 ###### FileStore #######
 class FileStoreIterator(object):
     """FileStore iterator"""
@@ -960,21 +998,22 @@ if (opts.delete_before):
 for sobj in src.all_objects():
     if (opts.more_verbose):
         print "handling " + sobj.name
+    pline = ""
     dobj = dst.locate_object(sobj)
     upload = False
     src_acl = None
     dst_acl = None
     if (opts.force):
         if (opts.verbose):
-            print "F " + sobj.name
+            pline += "F " + sobj.name
         upload = True
     elif (dobj == None):
         if (opts.verbose):
-            print "+ " + sobj.name
+            pline += "+ " + sobj.name
         upload = True
     elif not sobj.equals(dobj):
         if (opts.verbose):
-            print "> " + sobj.name
+            pline += "> " + sobj.name
         upload = True
     elif (opts.preserve_acls):
         # Do the ACLs match?
@@ -985,10 +1024,10 @@ for sobj in src.all_objects():
         if (not src_acl.equals(dst_acl)):
             upload = True
             if (opts.verbose):
-                print "^ " + sobj.name
+                pline += "^ %s" % sobj.name
     else:
         if (opts.verbose):
-            print ". " + sobj.name
+            pline += ". " + sobj.name
     if (upload):
         if (not opts.preserve_acls):
             # Just default to an empty ACL
@@ -1003,6 +1042,8 @@ for sobj in src.all_objects():
             dst.upload(local_copy, src_acl, sobj)
         finally:
             local_copy.remove()
+    if (pline != ""):
+        print pline
 
 if (opts.delete_after):
     delete_unreferenced(src, dst)