From d9896b3c3148fdd0852ed0fbd5229f1690b8bda2 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 11 May 2011 11:30:04 -0700 Subject: [PATCH] obsync: handle eventual consistency issues Handle eventual consistency issues so that obsync will be usable on more S3 stores. Signed-off-by: Colin McCabe --- src/obsync/obsync.py | 61 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/src/obsync/obsync.py b/src/obsync/obsync.py index 44abf46b5e236..63cbeebcd24a5 100755 --- a/src/obsync/obsync.py +++ b/src/obsync/obsync.py @@ -34,9 +34,13 @@ import shutil import string import sys import tempfile +import time import traceback +# Command-line options global opts + +# Translation table mapping users in the source to users in the destination. global xuser ###### Constants classes ####### @@ -540,16 +544,20 @@ s3://host/bucket/key_prefix. Failed to find the bucket.") #k.set_metadata("Content-Type", mime) k.set_contents_from_filename(local_copy.path) if (src_acl.acl_policy != None): - try: - xml = src_acl.acl_policy.to_xml() - self.bucket.set_xml_acl(xml, k) - except Exception, e: + for retry_num in S3RetryIterator.create(): + try: + xml = src_acl.acl_policy.to_xml() + self.bucket.set_xml_acl(xml, k) + ex = None + except boto.exception.S3ResponseError, e: + ex = e + if (ex): print >>stderr, "ERROR SETTING ACL on object '" + sobj.name + "'" print >>stderr print >>stderr, "************* ACL: *************" print >>stderr, str(xml) print >>stderr, "********************************" - raise + raise ex def remove(self, obj): if (opts.dry_run): @@ -558,6 +566,36 @@ s3://host/bucket/key_prefix. Failed to find the bucket.") if (opts.more_verbose): print "S3Store: removed %s" % obj.name +# Some S3 servers offer "eventual consistency." +# What this means is that after a change has been made, like the creation of an +# object, it takes some time for this change to become visible to everyone. +# This potentially includes the client making the change. +# +# This means we need to implement a retry mechanism for certain operations. +# For example, setting the ACL on a newly created object may fail with an +# "object not found" error if the object creation hasn't yet become visible to +# us. +class S3RetryIterator(object): + def __init__(self, cur_try, try_times): + self.cur_try = cur_try + self.try_times = try_times + def __iter__(self): + return self + def next(self): + t = self.cur_try + if (self.cur_try >= len(self.try_times)): + raise StopIteration + sleep_time = self.try_times[self.cur_try] + if (opts.more_verbose): + print "retrying operation after " + str(sleep_time) + \ + " second delay" + time.sleep(sleep_time) + self.cur_try = self.cur_try + 1 + return t + @staticmethod + def create(): + return S3RetryIterator(0, [5, 15, 60]) + ###### FileStore ####### class FileStoreIterator(object): """FileStore iterator""" @@ -960,21 +998,22 @@ if (opts.delete_before): for sobj in src.all_objects(): if (opts.more_verbose): print "handling " + sobj.name + pline = "" dobj = dst.locate_object(sobj) upload = False src_acl = None dst_acl = None if (opts.force): if (opts.verbose): - print "F " + sobj.name + pline += "F " + sobj.name upload = True elif (dobj == None): if (opts.verbose): - print "+ " + sobj.name + pline += "+ " + sobj.name upload = True elif not sobj.equals(dobj): if (opts.verbose): - print "> " + sobj.name + pline += "> " + sobj.name upload = True elif (opts.preserve_acls): # Do the ACLs match? @@ -985,10 +1024,10 @@ for sobj in src.all_objects(): if (not src_acl.equals(dst_acl)): upload = True if (opts.verbose): - print "^ " + sobj.name + pline += "^ %s" % sobj.name else: if (opts.verbose): - print ". " + sobj.name + pline += ". " + sobj.name if (upload): if (not opts.preserve_acls): # Just default to an empty ACL @@ -1003,6 +1042,8 @@ for sobj in src.all_objects(): dst.upload(local_copy, src_acl, sobj) finally: local_copy.remove() + if (pline != ""): + print pline if (opts.delete_after): delete_unreferenced(src, dst) -- 2.39.5