global lrgw
lrgw = None
+###### Usage #######
+USAGE = """
+obsync synchronizes S3, Rados, and local objects. The source and destination
+can both be local or both remote.
+
+Examples:
+# copy contents of mybucket to disk
+obsync -v s3://myhost/mybucket file://mydir
+
+# copy contents of mydir to an S3 bucket
+obsync -v file://mydir s3://myhost/mybucket
+
+# synchronize two S3 buckets
+SRC_AKEY=... SRC_SKEY=... \
+DST_AKEY=... DST_SKEY=... \
+obsync -v s3://myhost/mybucket1 s3://myhost2/mybucket2
+ --xuser bob=robert --xuser joe=joseph -O bob
+
+Note: You must specify an AWS access key and secret access key when accessing
+S3. obsync honors these environment variables:
+SRC_AKEY Access key for the source URL
+SRC_SKEY Secret access key for the source URL
+DST_AKEY Access key for the destination URL
+DST_SKEY Secret access key for the destination URL
+AKEY Access key for both source and dest
+SKEY Secret access key for both source and dest
+DST_CONSISTENCY Set to 'eventual' if the destination is eventually consistent
+
+If these environment variables are not given, we will fall back on libboto
+defaults.
+
+obsync (options) [source] [destination]"""
+
###### Constants #######
ACL_XATTR = "rados.acl"
META_XATTR_PREFIX = "rados.meta."
print s
###### Exception classes #######
-class InvalidLocalName(Exception):
- pass
+class ObsyncException(Exception):
+ def __init__(self, ty, e):
+ if (isinstance(e, str)):
+ # from a string
+ self.tb = "".join(traceback.format_stack())
+ self.comment = e
+ else:
+ # from another exception
+ self.tb = format_exc(100000)
+ self.comment = None
+ self.ty = ty
+
+""" A temporary obsync exception.
+The user may want to retry the operation that failed.
+We can create one of these from a string or from another exception.
+"""
+class ObsyncTemporaryException(ObsyncException):
+ def __init__(self, e):
+ ObsyncException.__init__(self, "temporary", e)
+
+""" A permanent obsync exception.
+We can create one of these from a string or from another exception.
+"""
+class ObsyncPermanentException(ObsyncException):
+ def __init__(self, e):
+ ObsyncException.__init__(self, "permanent", e)
+
+""" An exception we encountered while parsing input arguments.
+"""
+class ObsyncArgumentParsingException(ObsyncException):
+ def __init__(self, e):
+ ObsyncException.__init__(self, "argument_parsing", e)
-class NonexistentStore(Exception):
- pass
+""" Print out some exception information and exit the program.
+Normally, this function expects to see Obsync exceptions. If a random exception
+slips through, we treat it as an unknown error type.
+"""
+def print_obsync_exception_and_abort(e, origin):
+ if (isinstance(e, ObsyncException)):
+ if (isinstance(e, ObsyncArgumentParsingException)):
+ print >>stderr, USAGE
+ else:
+ print >>stderr, e.tb
+ if (e.comment != None):
+ print >>stderr, e.comment
+ print >>stderr, ("ERROR TYPE: %s, ORIGIN: %s" % (e.ty, origin))
+ sys.exit(1)
+ else:
+ traceback.print_exc(100000, stderr)
+ print >>stderr, "ERROR TYPE: unknown, ORIGIN: %s" % origin
+ sys.exit(1)
###### Extended Attributes #######
def test_xattr_support(path):
try:
xattr.set(test_file, "test", "123", namespace=xattr.NS_USER)
if xattr.get(test_file, "test", namespace=xattr.NS_USER) != "123":
- raise Exception("test_xattr_support: failed to set an xattr and " + \
+ raise ObsyncPermanentException("test_xattr_support: failed to set an xattr and " + \
"read it back.")
except IOError, e:
print >>stderr, "**** ERRROR: You do not appear to have xattr support " + \
"at %s ****" % path
- raise
+ raise ObsyncPermanentException(exc)
finally:
os.unlink(test_file)
os.makedirs(path)
except OSError, exc:
if exc.errno != errno.EEXIST:
- raise
+ raise ObsyncTemporaryException(exc)
if (not os.path.isdir(path)):
- raise
+ raise ObsyncTemporaryException(exc)
def bytes_to_str(b):
return ''.join(["%02x"% ord(x) for x in b]).strip()
local_name = re.sub(r'\$slash', "/", local_name)
mre = re.compile("[$][^$]")
if mre.match(local_name):
- raise InvalidLocalName("Local name contains a dollar sign escape \
+ raise ObsyncPermanentException("Local name contains a dollar sign escape \
sequence we don't understand.")
local_name = re.sub(r'\$\$', "$", local_name)
return local_name
for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
if utype[:len(ut)] == ut:
return ut
- raise Exception("unknown user type for user %s" % utype)
+ raise ObsyncPermanentException("unknown user type for user %s" % utype)
def strip_user_type(utype):
for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
if utype[:len(ut)] == ut:
return utype[len(ut):]
- raise Exception("unknown user type for user %s" % utype)
+ raise ObsyncPermanentException("unknown user type for user %s" % utype)
def grantee_attribute_to_user_type(utype):
if (utype == "Canonical User"):
elif (utype == "EmailUser"):
return ACL_TYPE_EMAIL_USER
else:
- raise Exception("unknown user type for user %s" % utype)
+ raise ObsyncPermanentException("unknown user type for user %s" % utype)
def user_type_to_attr(t):
if (t == ACL_TYPE_CANON_USER):
elif (t == ACL_TYPE_EMAIL_USER):
return "EmailUser"
else:
- raise Exception("unknown user type %s" % t)
+ raise ObsyncPermanentException("unknown user type %s" % t)
def add_user_type(user):
""" All users that are not specifically marked as something else
if out1 != out2:
print "out1 = %s" % out1
print "out2 = %s" % out2
- raise Exception("compare xml failed")
+ raise ObsyncPermanentException("compare xml failed")
#<?xml version="1.0" encoding="UTF-8"?>
def test_acl_policy():
if e.errno == 2:
return meta
else:
- raise
+ raise ObsyncTemporaryException(e)
for k,v in xlist:
if xattr_is_metadata(k):
meta[k] = v
dst_owner = None
if (is_dst):
if not os.environ.has_key("DST_OWNER"):
- raise Exception("You must set DST_OWNER when uploading \
-files to RgwStore.")
+ raise ObsyncArgumentParsingException("You must set \
+DST_OWNER when uploading files to RgwStore.")
dst_owner = os.environ["DST_OWNER"]
return RgwStore(rados_url, create, akey, skey, dst_owner)
file_url = strip_prefix("file://", url)
return FileStore(url, create)
if (url[0:2] == "./"):
return FileStore(url, create)
- raise Exception("Failed to find a prefix of s3://, file://, /, or ./ \
+ raise ObsyncPermanentException("Failed to find a prefix of s3://, file://, /, or ./ \
Cannot handle this URL.")
def __init__(self, url):
self.url = url
k_name = k[len(META_XATTR_PREFIX):]
key.set_metadata(k_name, v)
else:
- raise Exception("can't understand meta entry: %s" % k)
+ raise ObsyncPermanentException("can't understand meta entry: %s" % k)
class S3StoreIterator(object):
"""S3Store iterator"""
# Parse the s3 url
host_end = string.find(url, "/")
if (host_end == -1):
- raise Exception("S3Store URLs are of the form \
+ raise ObsyncPermanentException("S3Store URLs are of the form \
s3://host/bucket/key_prefix. Failed to find the host.")
self.host = url[0:host_end]
bucket_end = url.find("/", host_end+1)
self.bucket_name = url[host_end+1:bucket_end]
self.key_prefix = url[bucket_end+1:]
if (self.bucket_name == ""):
- raise Exception("S3Store URLs are of the form \
+ raise ObsyncPermanentException("S3Store URLs are of the form \
s3://host/bucket/key_prefix. Failed to find the bucket.")
if (opts.more_verbose):
print "self.host = '" + self.host + "', ",
if (self.bucket == None):
if (create):
if (opts.dry_run):
- raise Exception("logic error: this should be unreachable.")
+ raise ObsyncPermanentException("logic error: this should be unreachable.")
self.bucket = self.conn.create_bucket(bucket_name = self.bucket_name)
else:
- raise RuntimeError("%s: no such bucket as %s" % \
+ raise ObsyncPermanentException("%s: no such bucket as %s" % \
(url, self.bucket_name))
Store.__init__(self, "s3://" + url)
def __str__(self):
temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
try:
k.get_contents_to_filename(temp_file)
- except:
+ except Exception, e:
os.unlink(temp_file)
- raise
+ raise ObsyncTemporaryException(e)
return LocalCopy(obj.name, temp_file, True)
def all_objects(self):
blrs = self.bucket.list(prefix = self.key_prefix)
print >>stderr, "************* ACL: *************"
print >>stderr, str(xml)
print >>stderr, "********************************"
- raise
+ raise ObsyncTemporaryException(e)
def remove(self, obj):
if (opts.dry_run):
return
except boto.exception.S3ResponseError, e:
if (stime == -1):
- raise
+ raise ObsyncTemporaryException(e)
if (opts.verbose):
print "encountered s3 response error: ",
if (opts.more_verbose):
self.base = self.base[:-1]
if (create):
if (opts.dry_run):
- raise Exception("logic error: this should be unreachable.")
+ raise ObsyncPermanentException("logic error: this should be unreachable.")
mkdir_p(self.base)
elif (not os.path.isdir(self.base)):
- raise NonexistentStore()
+ raise ObsyncPermanentException("NonexistentStore")
Store.__init__(self, "file://" + url)
test_xattr_support(self.base)
def __str__(self):
#print "failed to get XML ACL from %s" % obj.local_name()
if e.errno == 61:
return LocalAcl.get_empty(obj.name)
- raise
+ raise ObsyncPermanentException(e)
return LocalAcl.from_xml(obj.name, xml)
def make_local_copy(self, obj):
return LocalCopy(obj.name, obj.local_path(self.base), False)
break
ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key)
if (ret == None):
- raise Exception("internal iterator error")
+ raise ObsyncPermanentException("internal iterator error")
return ret
class RgwStore(Store):
# Parse the rados url
conf_end = string.find(url, ":")
if (conf_end == -1):
- raise Exception("RgwStore URLs are of the form \
+ raise ObsyncPermanentException("RgwStore URLs are of the form \
rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
self.conf_file_path = url[0:conf_end]
bucket_end = url.find(":", conf_end+1)
self.rgw_bucket_name = url[conf_end+1:bucket_end]
self.key_prefix = url[bucket_end+1:]
if (self.rgw_bucket_name == ""):
- raise Exception("RgwStore URLs are of the form \
+ raise ObsyncPermanentException("RgwStore URLs are of the form \
rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
if (opts.more_verbose):
print "self.conf_file_path = '" + self.conf_file_path + "', ",
self.rados.conf_read_file(self.conf_file_path)
self.rados.connect()
if self.owner != None and not self.user_exists(ACL_TYPE_CANON_USER + self.owner):
- raise Exception("Unknown owner! DST_OWNER=%s" % self.owner)
+ raise ObsyncPermanentException("Unknown owner! DST_OWNER=%s" % self.owner)
if (not self.rados.pool_exists(self.rgw_bucket_name)):
if (create):
self.create_rgw_bucket(self.rgw_bucket_name)
else:
- raise NonexistentStore()
+ raise ObsyncPermanentException("NonexistentStore")
elif self.owner == None:
# Figure out what owner we should use when creating objects.
# We use the owner of the destination bucket
global lrgw
""" Create an rgw bucket named 'rgw_bucket_name' """
if (self.owner == None):
- raise Exception("Can't create a bucket without knowing who " +
- "should own it. Please set DST_OWNER")
+ raise ObsyncArgumentParsingException("Can't create a bucket \
+without knowing who should own it. Please set DST_OWNER")
self.rados.create_pool(self.rgw_bucket_name)
ioctx = None
try:
elif opts.more_verbose:
print "ignoring unknown xattr " + k
if (md5 == None):
- raise RuntimeError("error on object %s: expected to find " + \
+ raise ObsyncPermanentException("error on object %s: expected to find " + \
"extended attribute %s" % (obj_name, RGW_META_ETAG))
if (opts.more_verbose):
print "meta = " + str(meta)
break
off += 8192
temp_file_f.close()
- except:
+ except Exception, e:
if (temp_file_f):
temp_file_f.close()
if (temp_file):
os.unlink(temp_file.name)
- raise
+ raise ObsyncTemporaryException(e)
return LocalCopy(obj.name, temp_file.name, True)
def all_objects(self):
it = self.ioctx.list_objects()
self.user_exists_cache[user] = True
return True
elif user[:len(ACL_TYPE_EMAIL_USER)] == ACL_TYPE_EMAIL_USER:
- raise Exception("rgw target can't handle email users yet.")
+ raise ObsyncPermanentException("rgw target can't handle email users yet.")
elif user[:len(ACL_TYPE_GROUP)] == ACL_TYPE_GROUP:
- raise Exception("rgw target can't handle groups yet.")
+ raise ObsyncPermanentException("rgw target can't handle groups yet.")
else:
- raise Exception("can't understand user name %s" % user)
+ raise ObsyncPermanentException("can't understand user name %s" % user)
def upload(self, local_copy, src_acl, obj):
global lrgw
if (opts.more_verbose):
ap = src_acl.acl_policy
for user in ap.get_all_users():
if not self.user_exists(user):
- raise Exception("You must provide an --xuser entry to translate \
+ raise ObsyncPermanentException("You must provide an --xuser entry to translate \
user %s into something valid for the rgw destination.")
xml = ap.to_xml()
bin_ = lrgw.acl_xml2bin(xml)
self.ioctx.remove_object(obj.name)
if (opts.more_verbose):
print "RgwStore: removed %s" % obj.name
+
###### Functions #######
def delete_unreferenced(src, dst):
""" delete everything from dst that is not referenced in src """
if (opts.more_verbose):
print "handling deletes."
- for dobj in dst.all_objects():
- sobj = src.locate_object(dobj)
- if (sobj == None):
- dst.remove(dobj)
+ currently_handling = "destination"
+ try:
+ dst_all_objects = dst.all_objects()
+ while True:
+ dobj = dst_all_objects.next()
+ currently_handling = "source"
+ sobj = src.locate_object(dobj)
+ currently_handling = "destination"
+ if (sobj == None):
+ dst.remove(dobj)
+ except StopIteration:
+ pass
+ except Exception, e:
+ print_obsync_exception_and_abort(e, currently_handling)
def xuser_cb(opt, opt_str, value, parser):
""" handle an --xuser argument """
equals = value.find(r'=')
if equals == -1:
- print >>stderr, "Error parsing --xuser: You must give both a source \
-and destination user name, like so:\n\
+ raise ObsyncArgumentParsingException("Error parsing --xuser: You must \
+give both a source and destination user name, like so:\n\
--xuser SOURCE_USER=DEST_USER\n\
\n\
This will translate the user SOURCE_USER in the source to the user DEST_USER \n\
-in the destination."
- sys.exit(1)
+in the destination.")
src_user = value[:equals]
dst_user = value[equals+1:]
if ((len(src_user) == 0) or (len(dst_user) == 0)):
- print >>stderr, "Error parsing --xuser: can't have a zero-length \
-user name."
- sys.exit(1)
+ raise ObsyncArgumentParsingException("Error parsing --xuser: \
+can't have a zero-length user name.")
src_user = add_user_type(src_user)
dst_user = add_user_type(dst_user)
if (xuser.has_key(src_user)):
- print >>stderr, "Error parsing --xuser: we are already translating \
-\"%s\" to \"%s\"; we cannot translate it to \"%s\"" % \
-(src_user, xuser[src_user], dst_user)
- sys.exit(1)
+ raise ObsyncArgumentParsingException("Error parsing --xuser: \
+we are already translating \"%s\" to \"%s\"; we cannot translate it \
+to \"%s\"" % (src_user, xuser[src_user], dst_user))
xuser[src_user] = dst_user
-USAGE = """
-obsync synchronizes S3, Rados, and local objects. The source and destination
-can both be local or both remote.
-
-Examples:
-# copy contents of mybucket to disk
-obsync -v s3://myhost/mybucket file://mydir
-
-# copy contents of mydir to an S3 bucket
-obsync -v file://mydir s3://myhost/mybucket
-
-# synchronize two S3 buckets
-SRC_AKEY=... SRC_SKEY=... \
-DST_AKEY=... DST_SKEY=... \
-obsync -v s3://myhost/mybucket1 s3://myhost2/mybucket2
- --xuser bob=robert --xuser joe=joseph -O bob
-
-Note: You must specify an AWS access key and secret access key when accessing
-S3. obsync honors these environment variables:
-SRC_AKEY Access key for the source URL
-SRC_SKEY Secret access key for the source URL
-DST_AKEY Access key for the destination URL
-DST_SKEY Secret access key for the destination URL
-AKEY Access key for both source and dest
-SKEY Secret access key for both source and dest
-DST_CONSISTENCY Set to 'eventual' if the destination is eventually consistent
-
-If these environment variables are not given, we will fall back on libboto
-defaults.
-
-obsync (options) [source] [destination]"""
-
-parser = OptionParser(USAGE)
-parser.add_option("-n", "--dry-run", action="store_true", \
- dest="dry_run", default=False)
-parser.add_option("-c", "--create-dest", action="store_true", \
- dest="create", help="create the destination if it doesn't already exist")
-parser.add_option("--delete-before", action="store_true", \
- dest="delete_before", help="delete objects that aren't in SOURCE from \
-DESTINATION before transferring any objects")
-parser.add_option("--boto-retries", dest="boto_retries", type="int",
- help="set number of times we'll retry the same S3 operation")
-parser.add_option("-d", "--delete-after", action="store_true", \
- dest="delete_after", help="delete objects that aren't in SOURCE from \
-DESTINATION after doing all transfers.")
-parser.add_option("-L", "--follow-symlinks", action="store_true", \
- dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \
- "loops when using this option!)")
-parser.add_option("--no-preserve-acls", action="store_true", \
- dest="no_preserve_acls", help="don't preserve ACLs when copying objects.")
-parser.add_option("-v", "--verbose", action="store_true", \
- dest="verbose", help="be verbose")
-parser.add_option("-V", "--more-verbose", action="store_true", \
- dest="more_verbose", help="be really, really verbose (developer mode)")
-parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \
- dest="SRC=DST", callback=xuser_cb, help="set up a user tranlation. You \
-can specify multiple user translations with multiple --xuser arguments.")
-parser.add_option("--force", action="store_true", \
- dest="force", help="overwrite all destination objects, even if they \
-appear to be the same as the source objects.")
-parser.add_option("--unit", action="store_true", \
- dest="run_unit_tests", help="run unit tests and quit")
-xuser = {}
-(opts, args) = parser.parse_args()
-if (opts.run_unit_tests):
- test_acl_policy()
- sys.exit(0)
-
-if opts.boto_retries != None:
- if not boto.config.has_section('Boto'):
- boto.config.add_section('Boto')
- boto.config.set('Boto', 'num_retries', str(opts.boto_retries))
-
-opts.preserve_acls = not opts.no_preserve_acls
-if (opts.create and opts.dry_run):
- raise Exception("You can't run with both --create-dest and --dry-run! \
-By definition, a dry run never changes anything.")
-if (len(args) < 2):
- print >>stderr, "Expected two positional arguments: source and destination"
- print >>stderr, USAGE
- sys.exit(1)
-elif (len(args) > 2):
- print >>stderr, "Too many positional arguments."
- print >>stderr, USAGE
- sys.exit(1)
-if (opts.more_verbose):
- print >>stderr, "User translations:"
- for k,v in xuser.items():
- print >>stderr, "\"%s\" ==> \"%s\"" % (k, v)
- print >>stderr, ""
-if (opts.more_verbose):
- opts.verbose = True
- boto.set_stream_logger("stdout")
- boto.log.info("Enabling verbose boto logging.")
-if (opts.delete_before and opts.delete_after):
- print >>stderr, "It doesn't make sense to specify both --delete-before \
-and --delete-after."
- sys.exit(1)
-src_name = args[0]
-dst_name = args[1]
try:
+ currently_handling = "unknown"
+ parser = OptionParser(USAGE)
+ parser.add_option("-n", "--dry-run", action="store_true", \
+ dest="dry_run", default=False)
+ parser.add_option("-c", "--create-dest", action="store_true", \
+ dest="create", help="create the destination if it doesn't already exist")
+ parser.add_option("--delete-before", action="store_true", \
+ dest="delete_before", help="delete objects that aren't in SOURCE from \
+ DESTINATION before transferring any objects")
+ parser.add_option("--boto-retries", dest="boto_retries", type="int",
+ help="set number of times we'll retry the same S3 operation")
+ parser.add_option("-d", "--delete-after", action="store_true", \
+ dest="delete_after", help="delete objects that aren't in SOURCE from \
+ DESTINATION after doing all transfers.")
+ parser.add_option("-L", "--follow-symlinks", action="store_true", \
+ dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \
+ "loops when using this option!)")
+ parser.add_option("--no-preserve-acls", action="store_true", \
+ dest="no_preserve_acls", help="don't preserve ACLs when copying objects.")
+ parser.add_option("-v", "--verbose", action="store_true", \
+ dest="verbose", help="be verbose")
+ parser.add_option("-V", "--more-verbose", action="store_true", \
+ dest="more_verbose", help="be really, really verbose (developer mode)")
+ parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \
+ dest="SRC=DST", callback=xuser_cb, help="set up a user tranlation. You \
+ can specify multiple user translations with multiple --xuser arguments.")
+ parser.add_option("--force", action="store_true", \
+ dest="force", help="overwrite all destination objects, even if they \
+ appear to be the same as the source objects.")
+ parser.add_option("--unit", action="store_true", \
+ dest="run_unit_tests", help="run unit tests and quit")
+ xuser = {}
+ (opts, args) = parser.parse_args()
+ if (opts.run_unit_tests):
+ test_acl_policy()
+ sys.exit(0)
+
+ if opts.boto_retries != None:
+ if not boto.config.has_section('Boto'):
+ boto.config.add_section('Boto')
+ boto.config.set('Boto', 'num_retries', str(opts.boto_retries))
+
+ opts.preserve_acls = not opts.no_preserve_acls
+ if (opts.create and opts.dry_run):
+ raise ObsyncArgumentParsingException("You can't run with both \
+--create-dest and --dry-run! By definition, a dry run never changes anything.")
+
+ if (len(args) < 2):
+ raise ObsyncArgumentParsingException("Expected two positional \
+arguments: source and destination")
+ elif (len(args) > 2):
+ raise ObsyncArgumentParsingException("Too many positional arguments.")
+ if (opts.more_verbose):
+ print >>stderr, "User translations:"
+ for k,v in xuser.items():
+ print >>stderr, "\"%s\" ==> \"%s\"" % (k, v)
+ print >>stderr, ""
+ if (opts.more_verbose):
+ opts.verbose = True
+ boto.set_stream_logger("stdout")
+ boto.log.info("Enabling verbose boto logging.")
+ if (opts.delete_before and opts.delete_after):
+ raise ObsyncArgumentParsingException("It doesn't make sense to \
+specify both --delete-before and --delete-after.")
+ src_name = args[0]
+ dst_name = args[1]
+
+ currently_handling = "source"
if (opts.more_verbose):
print "SOURCE: " + src_name
- src = Store.make_store(src_name, False, False,
- getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"))
-except NonexistentStore, e:
- print >>stderr, "Fatal error: Source " + src_name + " does not exist."
- sys.exit(1)
-except Exception, e:
- print >>stderr, "error creating source: " + str(e)
- traceback.print_exc(100000, stderr)
- sys.exit(1)
-try:
+ try:
+ src = Store.make_store(src_name, False, False,
+ getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"))
+ except ObsyncException, e:
+ if (e.comment == "NonexistentStore"):
+ e.comment = "Fatal error: Source " + dst_name + " does " +\
+ "not appear to exist."
+ raise
+
+ currently_handling = "destination"
if (opts.more_verbose):
print "DESTINATION: " + dst_name
- dst = Store.make_store(dst_name, True, opts.create,
- getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"))
-except NonexistentStore, e:
- print >>stderr, "Fatal error: Destination " + dst_name + " does " +\
- "not exist. Run with -c or --create-dest to create it automatically."
- sys.exit(1)
-except Exception, e:
- print >>stderr, "error creating destination: " + str(e)
- traceback.print_exc(100000, stderr)
- sys.exit(1)
+ try:
+ dst = Store.make_store(dst_name, True, opts.create,
+ getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"))
+ except ObsyncException, e:
+ if (e.comment == "NonexistentStore"):
+ e.comment = "Fatal error: Destination " + dst_name + " does " +\
+ "not exist. Run with -c or --create-dest to create it automatically."
+ raise
-if (opts.delete_before):
- delete_unreferenced(src, dst)
+ if (opts.delete_before):
+ delete_unreferenced(src, dst)
-for sobj in src.all_objects():
- if (opts.more_verbose):
- print "handling " + sobj.name
- pline = ""
- dobj = dst.locate_object(sobj)
- upload = False
- src_acl = None
- dst_acl = None
- if (opts.force):
- if (opts.verbose):
- pline += "F " + sobj.name
- upload = True
- elif (dobj == None):
- if (opts.verbose):
- pline += "+ " + sobj.name
- upload = True
- elif not sobj.equals(dobj):
- if (opts.verbose):
- pline += "> " + sobj.name
- upload = True
- elif (opts.preserve_acls):
- # Do the ACLs match?
- src_acl = src.get_acl(sobj)
- dst_acl = dst.get_acl(dobj)
- src_acl.translate_users(xuser)
- #src_acl.set_owner()
- if (not src_acl.equals(dst_acl)):
+ currently_handling = "source"
+ src_all_objects = src.all_objects()
+ while True:
+ currently_handling = "source"
+ try:
+ sobj = src_all_objects.next()
+ except StopIteration:
+ break
+ if (opts.more_verbose):
+ print "handling " + sobj.name
+ pline = ""
+ currently_handling = "destination"
+ dobj = dst.locate_object(sobj)
+ upload = False
+ src_acl = None
+ dst_acl = None
+ if (opts.force):
+ if (opts.verbose):
+ pline += "F " + sobj.name
upload = True
+ elif (dobj == None):
if (opts.verbose):
- pline += "^ %s" % sobj.name
- else:
- if (opts.verbose):
- pline += ". " + sobj.name
- if (upload):
- if (not opts.preserve_acls):
- # Just default to an empty ACL
- src_acl = LocalAcl.get_empty(sobj.name)
+ pline += "+ " + sobj.name
+ upload = True
+ elif not sobj.equals(dobj):
+ if (opts.verbose):
+ pline += "> " + sobj.name
+ upload = True
+ elif (opts.preserve_acls):
+ # Do the ACLs match?
+ currently_handling = "source"
+ src_acl = src.get_acl(sobj)
+ currently_handling = "destination"
+ dst_acl = dst.get_acl(dobj)
+ currently_handling = "source"
+ src_acl.translate_users(xuser)
+ #src_acl.set_owner()
+ if (not src_acl.equals(dst_acl)):
+ upload = True
+ if (opts.verbose):
+ pline += "^ %s" % sobj.name
else:
- if (src_acl == None):
- src_acl = src.get_acl(sobj)
- src_acl.translate_users(xuser)
- #src_acl.set_owner()
- local_copy = src.make_local_copy(sobj)
- try:
- dst.upload(local_copy, src_acl, sobj)
- finally:
- local_copy.remove()
- if (pline != ""):
- print pline
+ if (opts.verbose):
+ pline += ". " + sobj.name
+ if (upload):
+ if (not opts.preserve_acls):
+ # Just default to an empty ACL
+ src_acl = LocalAcl.get_empty(sobj.name)
+ else:
+ if (src_acl == None):
+ currently_handling = "source"
+ src_acl = src.get_acl(sobj)
+ src_acl.translate_users(xuser)
+ #src_acl.set_owner()
+ currently_handling = "source"
+ local_copy = src.make_local_copy(sobj)
+ try:
+ currently_handling = "destination"
+ dst.upload(local_copy, src_acl, sobj)
+ finally:
+ local_copy.remove()
+ if (pline != ""):
+ print pline
-if (opts.delete_after):
- delete_unreferenced(src, dst)
+ if (opts.delete_after):
+ delete_unreferenced(src, dst)
+except Exception, e:
+ print_obsync_exception_and_abort(e, currently_handling)
if (opts.more_verbose):
print "finished."
-
sys.exit(0)