@staticmethod
def from_xml(s):
root = etree.parse(StringIO(s))
- owner_id = root.find("{%s}Owner/{%s}ID" % (NS,NS)).text
- owner_display_name = root.find("{%s}Owner/{%s}DisplayName" \
- % (NS,NS)).text
+ owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS))
+ if (owner_id_node != None):
+ owner_id = owner_id_node.text
+ else:
+ owner_id = None
+ owner_display_name_node = root.find("{%s}Owner/{%s}DisplayName" \
+ % (NS,NS))
+ if (owner_display_name_node != None):
+ owner_display_name = owner_display_name_node.text
+ else:
+ owner_display_name = None
grantlist = root.findall("{%s}AccessControlList/{%s}Grant" \
% (NS,NS))
grants = [ ]
return AclPolicy(owner_id, owner_display_name, grants)
def to_xml(self, omit_owner):
root = etree.Element("AccessControlPolicy", nsmap={None: NS})
- if (not omit_owner):
+ if ((not omit_owner) and self.owner_id):
owner = etree.SubElement(root, "Owner")
id_elem = etree.SubElement(owner, "ID")
id_elem.text = self.owner_id
- display_name_elem = etree.SubElement(owner, "DisplayName")
- display_name_elem.text = self.owner_display_name
+ if (self.owner_display_name and self.owner_display_name != ""):
+ display_name_elem = etree.SubElement(owner, "DisplayName")
+ display_name_elem.text = self.owner_display_name
access_control_list = etree.SubElement(root, "AccessControlList")
for g in self.grants:
grant_elem = etree.SubElement(access_control_list, "Grant")
grantee_elem = etree.SubElement(grant_elem, "{%s}Grantee" % NS,
nsmap={None: NS, "xsi" : NS2})
- grantee_elem.set("type", user_type_to_attr(get_user_type(g.user_id)))
+ grantee_elem.set("{%s}type" % NS2, user_type_to_attr(get_user_type(g.user_id)))
user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS)
user_id_elem.text = strip_user_type(g.user_id)
display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS)
permission_elem.text = g.permission
return etree.tostring(root, encoding="UTF-8")
def translate_users(self, xusers):
- # Translate the owner for consistency, although most of the time we
- # don't write out the owner to the ACL.
- # Owner ids are always expressed in terms of canonical user id
- if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)):
- self.owner_id = \
- strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id])
- self.owner_display_name = ""
+ if (self.owner_id != None):
+ # Translate the owner for consistency, although most of the time we
+ # don't write out the owner to the ACL.
+ # Owner ids are always expressed in terms of canonical user id
+ if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)):
+ self.owner_id = \
+ strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id])
+ self.owner_display_name = ""
for g in self.grants:
g.translate_users(xusers)
def get_acl(self, obj):
acl = LocalAcl(obj.name)
acl_name = get_local_acl_file_name(obj.local_name())
- if (os.path.exists(acl_name)):
- acl.acl_path = self.base + "/" + acl_name
+ acl_path = self.base + "/" + acl_name
+# print "get_acl(obj.name = " + obj.name + ", acl_name=" + acl_name + \
+# ", acl_path = " + acl_path + ")"
+ if (os.path.exists(acl_path)):
+ acl.acl_path = acl_path
return acl
def make_local_copy(self, obj):
local_name = obj.local_name()
src_acl.translate_users(xuser)
if (not src_acl.equals(dst_acl)):
upload = True
- if (opts.verbose):
- print "^ " + sobj.name
+ if (opts.verbose):
+ print "^ " + sobj.name
dst_acl.remove()
else:
if (opts.verbose):
print ". " + sobj.name
if (upload):
- if (opts.preserve_acls and src_acl == None):
- src_acl = src.get_acl(sobj)
- src_acl.translate_users(xuser)
- else:
+ if (not opts.preserve_acls):
# Just default to an empty ACL
src_acl = LocalAcl(sobj.name)
+ else:
+ if (src_acl == None):
+ src_acl = src.get_acl(sobj)
+ src_acl.translate_users(xuser)
local_copy = src.make_local_copy(sobj)
try:
dst.upload(local_copy, src_acl, sobj)
if (opts.more_verbose):
for f in full:
print f,
- print " ",
print
return subprocess.call(full, stderr=opts.error_out, env=e)
if tdir != None and opts.keep_tempdir == False:
shutil.rmtree(tdir)
-def compare_directories(dir_a, dir_b):
+def compare_directories(dir_a, dir_b, ignore_acl = True, expect_same = True):
if (opts.verbose):
print "comparing directories %s and %s" % (dir_a, dir_b)
- subprocess.check_call(["diff", "-x", "*$acl", "-r", dir_a, dir_b])
+ full = ["diff", "-q"]
+ if (ignore_acl):
+ full.extend(["-x", "*$acl"])
+ full.extend(["-r", dir_a, dir_b])
+ ret = subprocess.call(full)
+ if ((ret == 0) and (not expect_same)):
+ print "expected the directories %s and %s to differ, but \
+they were the same!" % (dir_a, dir_b)
+ sys.exit(1)
+ if ((ret != 0) and expect_same):
+ print "expected the directories %s and %s to be the same, but \
+they were different!" % (dir_a, dir_b)
+ sys.exit(1)
def count_obj_in_dir(d):
"""counts the number of objects in a directory (WITHOUT recursing)"""
# test ACL transformations
obsync_check("file://%s/dira" % tdir, "file://%s/dirb" % tdir,
["-d", "-c", "--xuser", user1 + "=" + user2])
+# The transformation should result in different directories
+compare_directories("%s/dira" % tdir, "%s/dirb" % tdir, \
+ ignore_acl=False, expect_same=False)
+# Test ACL syncing. It should sync the ACLs even when the object data is
+# the same!
+obsync_check("file://%s/dirb" % tdir, "file://%s/dira" % tdir, ["-d"])
+obsync_check("file://%s/dira" % tdir, "file://%s/dirb" % tdir, ["-d"])
+compare_directories("%s/dira" % tdir, "%s/dirb" % tdir, \
+ ignore_acl=False, expect_same=True)
if (len(opts.buckets) >= 1):
# first, let's empty out the S3 bucket