]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge remote branch 'origin/next'
authorJosh Durgin <josh.durgin@dreamhost.com>
Wed, 18 May 2011 00:45:45 +0000 (17:45 -0700)
committerJosh Durgin <josh.durgin@dreamhost.com>
Wed, 18 May 2011 00:45:45 +0000 (17:45 -0700)
1  2 
src/Makefile.am
src/obsync/boto_tool
src/obsync/obsync
src/obsync/test-obsync.py
src/osd/PG.cc
src/osd/PG.h

diff --cc src/Makefile.am
Simple merge
index 0000000000000000000000000000000000000000,e60a040fea31079f7bf963d3c40be7e0a177d79b..04a60658ddb8a63675b369e16d0a23451a985878
mode 000000,100755..100755
--- /dev/null
@@@ -1,0 -1,287 +1,247 @@@
 -global conn
 -
 -def getenv(a):
 -    if os.environ.has_key(a):
 -        return os.environ[a]
 -    else:
 -        return None
 -
 -def strip_prefix(prefix, s):
 -    if not (s[0:len(prefix)] == prefix):
 -        return None
 -    return s[len(prefix):]
 -
 -def list_all_buckets(args):
 -    parser = OptionParser()
 -    parser.add_option("-v", "--verbose", action="store_true",
 -        dest="verbose", default=False, help="verbose output")
 -    (opts, args) = parser.parse_args(args)
+ #!/usr/bin/python
+ #
+ # Ceph - scalable distributed file system
+ #
+ # Copyright (C) 2011 New Dream Network
+ #
+ # This is free software; you can redistribute it and/or
+ # modify it under the terms of the GNU Lesser General Public
+ # License version 2.1, as published by the Free Software
+ # Foundation.  See file COPYING.
+ #
+ """
+ boto_tool.py: s3cmd-like tool for operating on s3
+ A lot of common s3 clients can't handle weird names.
+ But this little script can do it!
+ """
+ from boto.s3.connection import OrdinaryCallingFormat
+ from boto.s3.connection import S3Connection
+ from boto.s3.key import Key
+ from optparse import OptionParser
+ from sys import stderr
++import ConfigParser
+ import boto
+ import os
+ import string
+ import sys
 -def mkbucket(args):
 -    if (len(args) < 1):
 -        print "must give an argument to mkbucket"
 -        return 255
 -    bucket_name = args[0]
 -    print "creating bucket '%s' ..." % bucket_name
 -    bucket = conn.create_bucket(bucket_name)
++def list_all_buckets(conn, opts):
+     blrs = conn.get_all_buckets()
+     for b in blrs:
+         if (opts.verbose):
+             print b.__dict__
+         else:
+             print b
 -def rmbucket(args):
 -    if (len(args) < 1):
 -        print "must give an argument to rmbucket"
 -        return 255
 -    bucket = conn.get_bucket(args[0])
 -    print "deleting bucket '%s' ..." % args[0]
++def list_objects(conn, opts):
++    if opts.list_objects == True:
++        prefix = None
++    else:
++        prefix = opts.list_objects
++    bucket = conn.get_bucket(opts.bucket_name)
++    for key in bucket.list(prefix = prefix):
++        print key.name
++
++def mkbucket(conn, opts):
++    print "creating bucket '%s' ..." % opts.bucket_name
++    bucket = conn.create_bucket(opts.bucket_name)
++    print "done."
++    return 0
++
++def rmobjects(conn, opts):
++    bucket = conn.get_bucket(opts.bucket_name)
++    print "deleting all objects from bucket '%s' ..." % opts.bucket_name
++    bucket = conn.get_bucket(opts.bucket_name)
++    for key in bucket.list():
++        print key.name
++        bucket.delete_key(key)
+     print "done."
+     return 0
 -def bucket_exists(args):
 -    if (len(args) < 1):
 -        print "must give an argument to exists"
 -        return 255
 -    bucket = conn.get_bucket(opts.bucket_exists)
++def rmbucket(conn, opts):
++    bucket = conn.get_bucket(opts.bucket_name)
++    print "deleting bucket '%s' ..." % opts.bucket_name
+     bucket.delete()
+     print "done."
+     return 0
 -        print "bucket '%s' does not exist"
++def bucket_exists(conn, opts):
++    bucket = conn.get_bucket(opts.bucket_name)
+     if (bucket == None):
 -        print "found bucket '%s'."
++        print "bucket '%s' does not exist" % opts.bucket_name
+         return 1
+     else:
 -def put_obj(bucket_name, args):
 -    parser = OptionParser()
 -    parser.add_option("-f", "--filename", dest="filename",
 -                        help="file name (default stdin)")
 -    (opts, args) = parser.parse_args(args)
 -    if (len(args) < 1):
 -        print "put requires an argument: the object name"
 -        return 255
 -    obj_name = args[0]
 -    print "uploading to bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
++        print "found bucket '%s'." % opts.bucket_name
++        if (opts.verbose):
++            print bucket.__dict__
+     return 0
 -    k.key = obj_name
 -    if (opts.filename == None):
 -        print "sorry, no support for put-from-stdin yet. use -f"
 -        return 255
 -    else:
 -        k.set_contents_from_filename(opts.filename)
++def put_obj(conn, opts):
++    print "uploading to bucket: '%s', object name: '%s'" % \
++        (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
+     k = Key(bucket)
 -def put_obj_acl(bucket_name, args):
 -    parser = OptionParser()
 -    parser.add_option("-f", "--filename", dest="filename",
 -                        help="file name (default stdin)")
 -    (opts, args) = parser.parse_args(args)
 -    if (len(args) < 1):
 -        print "put_acl requires an argument: the object name"
 -        return 255
 -    obj_name = args[0]
++    k.key = opts.obj_name
++    k.set_contents_from_filename(opts.put_file)
 -        % (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
++def put_obj_acl(conn, opts):
+     print "uploading object ACL to bucket: '%s', object name: '%s'" \
 -    k.key = obj_name
 -    if (opts.filename == None):
 -        print "sorry, no support for put-from-stdin yet. use -f"
 -        return 255
 -    else:
 -        f = open(opts.filename, "r")
 -        try:
 -            xml = f.read()
 -        finally:
 -            f.close()
 -        k.set_xml_acl(xml)
++        % (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
+     k = Key(bucket)
 -def get_obj(bucket_name, args):
 -    parser = OptionParser()
 -    parser.add_option("-f", "--filename", dest="filename",
 -                        help="file name (default stdin)")
 -    (opts, args) = parser.parse_args(args)
 -    if (len(args) < 1):
 -        print "get requires an argument: the object name"
 -        return 255
 -    obj_name = args[0]
 -    print "downloading from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
++    k.key = opts.obj_name
++    f = open(opts.putacl_file, "r")
++    try:
++        xml = f.read()
++    finally:
++        f.close()
++    k.set_xml_acl(xml)
++    return 0
 -    k.key = obj_name
 -    if (opts.filename == None):
++def get_obj(conn, opts):
++    print "downloading from bucket: '%s', object name: '%s'" % \
++        (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
+     k = Key(bucket)
 -        k.get_contents_to_filename(opts.filename)
++    k.key = opts.obj_name
++    if (opts.get_file == "-"):
+         k.get_contents_to_file(sys.stdout)
+     else:
 -def get_obj_acl(bucket_name, args):
 -    parser = OptionParser()
 -    parser.add_option("-f", "--filename", dest="filename",
 -                        help="file name (default stdin)")
 -    (opts, args) = parser.parse_args(args)
 -    if (len(args) < 1):
 -        print "get_acl requires an argument: the object name to get the acl for"
 -        return 255
 -    obj_name = args[0]
++        k.get_contents_to_filename(opts.get_file)
++    return 0
 -        (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
++def get_obj_acl(conn, opts):
+     print "downloading object acl from bucket: '%s', object name: '%s'" % \
 -    k.key = obj_name
++        (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
+     k = Key(bucket)
 -    if (opts.filename == None):
++    k.key = opts.obj_name
+     xml = k.get_xml_acl()
 -        f = open(opts.filename, "w")
++    if (opts.getacl_file == "-"):
+         print xml
+     else:
 -def list_obj(bucket_name, args):
 -    if (len(args) < 1):
 -        prefix = None
 -    else:
 -        prefix = args[0]
 -    bucket = conn.get_bucket(bucket_name)
 -    for key in bucket.list(prefix = prefix):
 -        print key.name
 -
 -def rm_obj(bucket_name, args):
 -    if (len(args) < 1):
 -        obj_name = None
 -    else:
 -        obj_name = args[0]
 -    print "removing from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
 -    bucket.delete_key(obj_name)
++        f = open(opts.getacl_file, "w")
+         try:
+             f.write(xml)
+         finally:
+             f.close()
 -def head_obj(bucket_name, args):
 -    parser = OptionParser()
 -    parser.add_option("-f", "--filename", dest="filename",
 -                        help="file name (default stdin)")
 -    (opts, args) = parser.parse_args(args)
 -    if (len(args) < 1):
 -        print "get requires an argument: the object name"
 -        return 255
 -    obj_name = args[0]
 -    print "downloading from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
 -    bucket = conn.get_bucket(bucket_name)
 -    k = bucket.get_key(k, obj_name)
++def rm_obj(conn, opts):
++    print "removing from bucket: '%s', object name: '%s'" \
++        % (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
++    bucket.delete_key(opts.obj_name)
+     print "done."
++    return 0
 -def usage():
 -    print """
 -boto_tool.py
 -    ./boto_tool.py -h
 -    ./boto_tool.py --help
 -        Show this help
 -    ./boto_tool.py <host> ls
 -        Lists all buckets in a host
 -    ./boto_tool.py <host> <bucket> ls
 -        Lists all objects in a bucket
 -    ./boto_tool.py <host> <bucket> ls <prefix>
 -        Lists all objects in a bucket that have a given prefix
 -    ./boto_tool.py <host> mkbucket <bucket>
 -        Create a new bucket
 -    ./boto_tool.py <host> rmbucket <bucket>
 -        Remove a bucket
 -    ./boto_tool.py <host> exists <bucket>
 -        Tests if a bucket exists
 -    ./boto_tool.py <host> <bucket> put <object> [opts]
 -        Upload an object
 -        opts:
 -            -f filename            file name (default stdin)
 -    ./boto_tool.py <host> <bucket> get <object> [opts]
 -        Gets an object
 -        opts:
 -            -f filename            file name (default stdout)
 -    ./boto_tool.py <host> <bucket> head <object> [opts]
 -        Gets the headers of an object
 -"""
 -
 -if (len(sys.argv) < 3):
 -    usage()
 -    sys.exit(255)
 -
 -if (sys.argv[1] == "-h") or (sys.argv[1] == "--help"):
 -    usage()
 -    sys.exit(0)
 -
 -host = sys.argv[1]
++def head_obj(conn, opts):
++    print "downloading from bucket: '%s', object name: '%s'" \
++        % (opts.bucket_name, opts.obj_name)
++    bucket = conn.get_bucket(opts.bucket_name)
++    k = bucket.get_key(opts.obj_name)
+     print k
 -                aws_access_key_id=getenv("AKEY"),
 -                aws_secret_access_key=getenv("SKEY"))
 -
 -if (sys.argv[2] == "ls"):
 -    sys.exit(list_all_buckets(sys.argv[3:]))
 -elif (sys.argv[2] == "mkbucket"):
 -    sys.exit(mkbucket(sys.argv[3:]))
 -elif (sys.argv[2] == "rmbucket"):
 -    sys.exit(rmbucket(sys.argv[3:]))
 -elif (sys.argv[2] == "exists"):
 -    sys.exit(bucket_exists(sys.argv[3:]))
++########################## main ##########################
++host = None
++aws_access_key_id = None
++secret_key = None
++
++USAGE = """
++boto_tool.py is a simple S3 client that can be used for testing.
++It uses libboto.
++
++ENVIRONMENT VARIABLES
++S3TEST_CONF:      if this is set, we'll get the host, access key, and secret
++key from this file.
++AKEY              AWS access key
++SKEY              AWS secret access key"""
++
++parser = OptionParser(USAGE)
++parser.add_option("-b", "--bucket-name",
++    dest="bucket_name", help="specify bucket name")
++parser.add_option("-l", "--list-objects", action="store_true", \
++    dest="list_objects", help="list all objects")
++parser.add_option("-L", "--list-objects-with-prefix",
++    dest="list_objects", help="list all objects with the given prefix")
++parser.add_option("--mkbucket", action="store_true",
++    dest="mkbucket", help="create a bucket")
++parser.add_option("--rmbucket", action="store_true",
++    dest="rmbucket", help="remove a bucket")
++parser.add_option("-o", "--object-name", dest="obj_name", help="object name")
++parser.add_option("--put", dest="put_file", help="put FILE")
++parser.add_option("--get", dest="get_file", help="get to FILE")
++parser.add_option("--rm", action="store_true", dest="rm", help="remove an object")
++parser.add_option("--rmobjects", action="store_true", dest="rmobjects",
++    help="remove all objects from a bucket")
++parser.add_option("--rm_rf", action="store_true", dest="rm_rf",
++    help="remove all objects from a bucket and remove the bucket")
++parser.add_option("--head", action="store_true", dest="head",
++    help="use the HEAD operation to find out about an object")
++parser.add_option("--putacl", dest="putacl_file", help="set XML acl from FILE")
++parser.add_option("--getacl", dest="getacl_file", help="dump XML acl into FILE")
++parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
++    help="be verbose")
++(opts, args) = parser.parse_args()
++
++if os.environ.has_key("S3TEST_CONF"):
++    # use S3TEST_CONF
++    path = os.environ["S3TEST_CONF"]
++    cfg = ConfigParser.RawConfigParser()
++    with file(path) as f:
++        cfg.readfp(f)
++        host = cfg.get("DEFAULT", "host")
++        aws_access_key_id = cfg.get("s3 main", "access_key")
++        secret_key = cfg.get("s3 main", "secret_key")
++elif len(args) < 1:
++    print >>stderr, "boto_tool: You must specify an action. Try --help."
++    sys.exit(1)
++else:
++    # use environment variables and command line
++    host = args[0]
++    aws_access_key_id=os.environ["AKEY"]
++    secret_key=os.environ["SKEY"]
+ conn = S3Connection(calling_format=OrdinaryCallingFormat(), is_secure=False,
+                 host = host,
 -    # bucket operations
 -    wb = strip_prefix("bucket:", sys.argv[2])
 -    if (wb):
 -        bucket_name = wb
 -    else:
 -        bucket_name = sys.argv[2]
 -    if (len(sys.argv) < 4):
 -        print "too few arguments. -h for help."
 -        sys.exit(255)
 -    if (sys.argv[3] == "put"):
 -        sys.exit(put_obj(bucket_name, sys.argv[4:]))
 -    if (sys.argv[3] == "putacl"):
 -        sys.exit(put_obj_acl(bucket_name, sys.argv[4:]))
 -    elif (sys.argv[3] == "get"):
 -        sys.exit(get_obj(bucket_name, sys.argv[4:]))
 -    elif (sys.argv[3] == "getacl"):
 -        sys.exit(get_obj_acl(bucket_name, sys.argv[4:]))
 -    elif (sys.argv[3] == "ls"):
 -        sys.exit(list_obj(bucket_name, sys.argv[4:]))
 -    elif (sys.argv[3] == "rm"):
 -        sys.exit(rm_obj(bucket_name, sys.argv[4:]))
 -    elif (sys.argv[3] == "head"):
 -        sys.exit(head_obj(bucket_name, sys.argv[4:]))
 -    else:
 -        print "unknown operation on bucket"
 -        sys.exit(255)
 -
++                aws_access_key_id=aws_access_key_id,
++                aws_secret_access_key=secret_key)
++
++if not opts.bucket_name:
++    sys.exit(list_all_buckets(conn, opts))
++elif opts.list_objects:
++    sys.exit(list_objects(conn, opts))
++elif opts.mkbucket:
++    sys.exit(mkbucket(conn, opts))
++elif opts.rmobjects:
++    sys.exit(rmobjects(conn, opts))
++elif opts.rm_rf:
++    ret =rmobjects(conn, opts)
++    if (ret):
++        sys.exit(ret)
++    ret = rmbucket(conn, opts)
++    sys.exit(ret)
++elif opts.rmbucket:
++    sys.exit(rmbucket(conn, opts))
++elif not opts.obj_name:
++    sys.exit(bucket_exists(conn, opts))
++elif opts.put_file:
++    sys.exit(put_obj(conn, opts))
++elif opts.get_file:
++    sys.exit(get_obj(conn, opts))
++elif opts.rm:
++    sys.exit(rm_obj(conn, opts))
++elif opts.head:
++    sys.exit(head_obj(conn, opts))
++elif opts.putacl_file:
++    sys.exit(put_obj_acl(conn, opts))
++elif opts.getacl_file:
++    sys.exit(get_obj_acl(conn, opts))
++elif opts.head:
++    sys.exit(head_obj(conn, opts))
+ else:
++    print "unknown arguments. Try --help"
++    sys.exit(255)
index 0000000000000000000000000000000000000000,44abf46b5e236fbc13bf811d6e44a41233b45ad0..4d47e2260a806285d11b4343c9e246452bef48f4
mode 000000,100755..100755
--- /dev/null
@@@ -1,0 -1,1013 +1,1113 @@@
 -###### Constants classes #######
+ #!/usr/bin/env python
+ #
+ # Ceph - scalable distributed file system
+ #
+ # Copyright (C) 2011 New Dream Network
+ #
+ # This is free software; you can redistribute it and/or
+ # modify it under the terms of the GNU Lesser General Public
+ # License version 2.1, as published by the Free Software
+ # Foundation.  See file COPYING.
+ #
+ """
+ obsync.py: the object synchronizer
+ """
+ from boto.s3.connection import OrdinaryCallingFormat
+ from boto.s3.connection import S3Connection
+ from boto.s3.key import Key
+ from optparse import OptionParser
+ from sys import stderr
+ from lxml import etree
+ import base64
+ import boto
+ import errno
+ import hashlib
+ import mimetypes
+ import os
+ from StringIO import StringIO
+ import rados
+ import re
+ import shutil
+ import string
+ import sys
+ import tempfile
++import time
+ import traceback
++import xattr
++# Command-line options
+ global opts
++
++# Translation table mapping users in the source to users in the destination.
+ global xuser
 -class LocalFileIsAcl(Exception):
 -    pass
 -
++###### Constants #######
+ RGW_META_BUCKET_NAME = ".rgw"
++ACL_XATTR = "rados.acl"
++META_XATTR_PREFIX = "rados.meta."
++CONTENT_TYPE_XATTR = "rados.content_type"
+ ###### Exception classes #######
 -    if local_name.find(r'$acl') != -1:
 -        raise LocalFileIsAcl()
+ class InvalidLocalName(Exception):
+     pass
+ class NonexistentStore(Exception):
+     pass
++###### Extended Attributes #######
++def test_xattr_support(path):
++    test_file = path + "/$TEST"
++    f = open(test_file, 'w')
++    f.close
++    try:
++        xattr.set(test_file, "test", "123", namespace=xattr.NS_USER)
++        if xattr.get(test_file, "test", namespace=xattr.NS_USER) !=  "123":
++            raise Exception("test_xattr_support: failed to set an xattr and " + \
++                "read it back.")
++    except IOError, e:
++        print >>stderr, "**** ERRROR: You do not appear to have xattr support " + \
++            "at %s ****" % path
++        raise
++    finally:
++        os.unlink(test_file)
++
++def xattr_is_metadata(k):
++    # miscellaneous user-defined metadata
++    if (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
++        return True
++    # content-type
++    elif (k == CONTENT_TYPE_XATTR):
++        return True
++    return False
++
+ ###### Helper functions #######
+ def mkdir_p(path):
+     try:
+         os.makedirs(path)
+     except OSError, exc:
+         if exc.errno != errno.EEXIST:
+             raise
+         if (not os.path.isdir(path)):
+             raise
+ def bytes_to_str(b):
+     return ''.join(["%02x"% ord(x) for x in b]).strip()
+ def get_md5(f, block_size=2**20):
+     md5 = hashlib.md5()
+     while True:
+         data = f.read(block_size)
+         if not data:
+             break
+         md5.update(data)
+     return "%s" % md5.hexdigest()
+ def strip_prefix(prefix, s):
+     if not (s[0:len(prefix)] == prefix):
+         return None
+     return s[len(prefix):]
+ def etag_to_md5(etag):
+     if (etag[:1] == '"'):
+         start = 1
+     else:
+         start = 0
+     if (etag[-1:] == '"'):
+         end = -1
+     else:
+         end = None
+     return etag[start:end]
+ def getenv(a, b):
+     if os.environ.has_key(a):
+         return os.environ[a]
+     elif b and os.environ.has_key(b):
+         return os.environ[b]
+     else:
+         return None
+ # Escaping functions.
+ #
+ # Valid names for local files are a little different than valid object
+ # names for S3. So these functions are needed to translate.
+ #
+ # Basically, in local names, every sequence starting with a dollar sign is
+ # reserved as a special escape sequence. If you want to create an S3 object
+ # with a dollar sign in the name, the local file should have a double dollar
+ # sign ($$).
+ #
+ # TODO: translate local files' control characters into escape sequences.
+ # Most S3 clients (boto included) cannot handle control characters in S3 object
+ # names.
+ # TODO: check for invalid utf-8 in local file names. Ideally, escape it, but
+ # if not, just reject the local file name. S3 object names must be valid
+ # utf-8.
+ #
+ # ----------          -----------
+ # In S3                               Locally
+ # ----------          -----------
+ # foo/                                foo$slash
+ #
+ # $money                      $$money
+ #
+ # obj-with-acl                obj-with-acl
+ #                                     .obj-with-acl$acl
+ def s3_name_to_local_name(s3_name):
+     s3_name = re.sub(r'\$', "$$", s3_name)
+     if (s3_name[-1:] == "/"):
+         s3_name = s3_name[:-1] + "$slash"
+     return s3_name
+ def local_name_to_s3_name(local_name):
 -def get_local_acl_file_name(local_name):
 -    if local_name.find(r'$acl') != -1:
 -        raise LocalFileIsAcl()
 -    return os.path.dirname(local_name) + "/." + \
 -        os.path.basename(local_name) + "$acl"
 -
+     local_name = re.sub(r'\$slash', "/", local_name)
+     mre = re.compile("[$][^$]")
+     if mre.match(local_name):
+         raise InvalidLocalName("Local name contains a dollar sign escape \
+ sequence we don't understand.")
+     local_name = re.sub(r'\$\$', "$", local_name)
+     return local_name
 -            self.display_name = ""
+ ###### ACLs #######
+ # for buckets: allow list
+ # for object: allow grantee to read object data and metadata
+ READ = 1
+ # for buckets: allow create, overwrite, or deletion of any object in the bucket
+ WRITE = 2
+ # for buckets: allow grantee to read the bucket ACL
+ # for objects: allow grantee to read the object ACL
+ READ_ACP = 4
+ # for buckets: allow grantee to write the bucket ACL
+ # for objects: allow grantee to write the object ACL
+ WRITE_ACP = 8
+ # all of the above
+ FULL_CONTROL = READ | WRITE | READ_ACP | WRITE_ACP
+ ACL_TYPE_CANON_USER = "canon:"
+ ACL_TYPE_EMAIL_USER = "email:"
+ ACL_TYPE_GROUP = "group:"
+ ALL_ACL_TYPES = [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]
+ S3_GROUP_AUTH_USERS = ACL_TYPE_GROUP +  "AuthenticatedUsers"
+ S3_GROUP_ALL_USERS = ACL_TYPE_GROUP +  "AllUsers"
+ S3_GROUP_LOG_DELIVERY = ACL_TYPE_GROUP +  "LogDelivery"
+ NS = "http://s3.amazonaws.com/doc/2006-03-01/"
+ NS2 = "http://www.w3.org/2001/XMLSchema-instance"
+ def get_user_type(utype):
+     for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
+         if utype[:len(ut)] == ut:
+             return ut
+     raise Exception("unknown user type for user %s" % utype)
+ def strip_user_type(utype):
+     for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
+         if utype[:len(ut)] == ut:
+             return utype[len(ut):]
+     raise Exception("unknown user type for user %s" % utype)
+ def grantee_attribute_to_user_type(utype):
+     if (utype == "Canonical User"):
+         return ACL_TYPE_CANON_USER
+     elif (utype == "CanonicalUser"):
+         return ACL_TYPE_CANON_USER
+     elif (utype == "Group"):
+         return ACL_TYPE_GROUP
+     elif (utype == "Email User"):
+         return ACL_TYPE_EMAIL_USER
+     elif (utype == "EmailUser"):
+         return ACL_TYPE_EMAIL_USER
+     else:
+         raise Exception("unknown user type for user %s" % utype)
+ def user_type_to_attr(t):
+     if (t == ACL_TYPE_CANON_USER):
+         return "CanonicalUser"
+     elif (t == ACL_TYPE_GROUP):
+         return "Group"
+     elif (t ==  ACL_TYPE_EMAIL_USER):
+         return "EmailUser"
+     else:
+         raise Exception("unknown user type %s" % t)
+ def add_user_type(user):
+     """ All users that are not specifically marked as something else
+ are treated as canonical users"""
+     for atype in ALL_ACL_TYPES:
+         if (user[:len(atype)] == atype):
+             return user
+     return ACL_TYPE_CANON_USER + user
+ class AclGrant(object):
+     def __init__(self, user_id, display_name, permission):
+         self.user_id = user_id
+         self.display_name = display_name
+         self.permission = permission
+     def translate_users(self, xusers):
+         # Keep in mind that xusers contains user_ids of the form "type:value"
+         # So typical contents might be like { canon:XYZ => canon.123 }
+         if (xusers.has_key(self.user_id)):
+             self.user_id = xusers[self.user_id]
+             # It's not clear what the new pretty-name should be, so just leave it blank.
 -            display_name = grantee.find("{%s}DisplayName" % NS).text
++            self.display_name = None
+     def equals(self, rhs):
+         if (self.user_id != rhs.user_id):
+             return False
+         if (self.permission != rhs.permission):
+             return False
+         # ignore display_name
+         return True
+ class AclPolicy(object):
+     def __init__(self, owner_id, owner_display_name, grants):
+         self.owner_id = owner_id
+         self.owner_display_name = owner_display_name
+         self.grants = grants  # dict of { string -> ACLGrant }
+     @staticmethod
+     def from_xml(s):
+         root = etree.parse(StringIO(s))
+         owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS))
+         owner_id = owner_id_node.text
+         owner_display_name_node = root.find("{%s}Owner/{%s}DisplayName" \
+                                         % (NS,NS))
+         if (owner_display_name_node != None):
+             owner_display_name = owner_display_name_node.text
+         else:
+             owner_display_name = None
+         grantlist = root.findall("{%s}AccessControlList/{%s}Grant" \
+             % (NS,NS))
+         grants = { }
+         for g in grantlist:
+             grantee = g.find("{%s}Grantee" % NS)
+             user_id = grantee.find("{%s}ID" % NS).text
+             if (grantee.attrib.has_key("type")):
+                 user_type = grantee.attrib["type"]
+             else:
+                 user_type = grantee.attrib["{%s}type" % NS2]
 -            display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS)
 -            display_name_elem.text = g.display_name
++            display_name_node = grantee.find("{%s}DisplayName" % NS)
++            if (display_name_node != None):
++                display_name = grantee.find("{%s}DisplayName" % NS).text
++            else:
++                display_name = None
+             permission = g.find("{%s}Permission" % NS).text
+             grant_user_id = grantee_attribute_to_user_type(user_type) + user_id
+             grants[grant_user_id] = AclGrant(grant_user_id, display_name, permission)
+         return AclPolicy(owner_id, owner_display_name, grants)
+     def to_xml(self):
+         root = etree.Element("AccessControlPolicy", nsmap={None: NS})
+         owner = etree.SubElement(root, "Owner")
+         id_elem = etree.SubElement(owner, "ID")
+         id_elem.text = self.owner_id
+         if (self.owner_display_name and self.owner_display_name != ""):
+             display_name_elem = etree.SubElement(owner, "DisplayName")
+             display_name_elem.text = self.owner_display_name
+         access_control_list = etree.SubElement(root, "AccessControlList")
+         for k,g in self.grants.items():
+             grant_elem = etree.SubElement(access_control_list, "Grant")
+             grantee_elem = etree.SubElement(grant_elem, "{%s}Grantee" % NS,
+                 nsmap={None: NS, "xsi" : NS2})
+             grantee_elem.set("{%s}type" % NS2, user_type_to_attr(get_user_type(g.user_id)))
+             user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS)
+             user_id_elem.text = strip_user_type(g.user_id)
 -"xsi:type=\"CanonicalUser\"><ID>*** Owner-Canonical-User-ID ***</ID>" + \
++            if (g.display_name != None):
++                display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS)
++                display_name_elem.text = g.display_name
+             permission_elem = etree.SubElement(grant_elem, "{%s}Permission" % NS)
+             permission_elem.text = g.permission
+         return etree.tostring(root, encoding="UTF-8")
+     def translate_users(self, xusers):
+         # Owner ids are always expressed in terms of canonical user id
+         if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)):
+             self.owner_id = \
+                 strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id])
+             self.owner_display_name = ""
+         for k,g in self.grants.items():
+             g.translate_users(xusers)
+     def set_owner(self, owner_id):
+         self.owner_id = owner_id
+         self.owner_display_name = ""
+     def equals(self, rhs):
+         if (self.owner_id != rhs.owner_id):
+             return False
+         for k,g in self.grants.items():
+             if (not rhs.grants.has_key(k)):
+                 return False
+             if (not g.equals(rhs.grants[k])):
+                 return False
+         for l,r in rhs.grants.items():
+             if (not self.grants.has_key(l)):
+                 return False
+             if (not r.equals(self.grants[l])):
+                 return False
+         return True
+ def compare_xml(xml1, xml2):
+     tree1 = etree.parse(StringIO(xml1))
+     out1 = etree.tostring(tree1, encoding="UTF-8")
+     tree2 = etree.parse(StringIO(xml2))
+     out2 = etree.tostring(tree2, encoding="UTF-8")
+     out1 = out1.replace("xsi:type", "type")
+     out2 = out2.replace("xsi:type", "type")
+     if out1 != out2:
+         print "out1 = %s" % out1
+         print "out2 = %s" % out2
+         raise Exception("compare xml failed")
+ #<?xml version="1.0" encoding="UTF-8"?>
+ def test_acl_policy():
+     test1_xml = \
+ "<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" + \
+ "<Owner><ID>foo</ID><DisplayName>MrFoo</DisplayName></Owner><AccessControlList>" + \
+ "<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " + \
 -    def __init__(self, name, md5, size):
++"xsi:type=\"CanonicalUser\"><ID>bar</ID>" + \
+ "<DisplayName>display-name</DisplayName></Grantee>" + \
+ "<Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>"
+     test1 = AclPolicy.from_xml(test1_xml)
+     compare_xml(test1_xml, test1.to_xml())
###### Object #######
class Object(object):
    """An object to synchronize: a name, an MD5 digest of its contents,
    a size in bytes, and a dict of metadata entries (xattr-style keys)."""
    def __init__(self, name, md5, size, meta):
        self.name = name
        self.md5 = md5
        self.size = int(size)
        self.meta = meta
    def equals(self, rhs):
        """Return True if rhs has the same name, md5, size, and metadata."""
        if (self.name != rhs.name):
            return False
        if (self.md5 != rhs.md5):
            return False
        if (self.size != rhs.size):
            return False
        # Dict equality covers both directions of the old key-by-key scan.
        if (self.meta != rhs.meta):
            return False
        return True
    def local_name(self):
        """Return the escaped on-disk file name for this object."""
        return s3_name_to_local_name(self.name)
    def local_path(self, base):
        """Return the full on-disk path for this object under base."""
        return base + "/" + s3_name_to_local_name(self.name)
    @staticmethod
    def from_file(obj_name, path):
        """Build an Object describing the local file at path.

        Metadata is read from user-namespace extended attributes; entries
        that are not obsync metadata (per xattr_is_metadata) are skipped.
        """
        f = open(path, 'r')
        try:
            md5 = get_md5(f)
        finally:
            f.close()
        size = os.path.getsize(path)
        meta = {}
        try:
            xlist = xattr.get_all(path, namespace=xattr.NS_USER)
        except IOError as e:
            # errno 2 (ENOENT): treat as "no xattrs".
            # BUG FIX: the original did "return meta" here, handing the
            # caller a bare dict instead of an Object.
            if e.errno == 2:
                xlist = []
            else:
                raise
        for k, v in xlist:
            if xattr_is_metadata(k):
                meta[k] = v
        return Object(obj_name, md5, size, meta)
###### Store #######
class Store(object):
    """Base class for object stores.

    make_store inspects the URL's scheme prefix and constructs the
    matching concrete store type.
    """
    @staticmethod
    def make_store(url, create, akey, skey):
        """Build an S3Store, RadosStore, or FileStore from a URL.

        Raises Exception if no known scheme prefix matches.
        """
        if url.startswith("s3://") and url[len("s3://"):]:
            return S3Store(url[len("s3://"):], create, akey, skey)
        if url.startswith("rados:") and url[len("rados:"):]:
            return RadosStore(url[len("rados:"):], create, akey, skey)
        if url.startswith("file://") and url[len("file://"):]:
            return FileStore(url[len("file://"):], create)
        # Bare absolute or ./-relative paths are treated as FileStores.
        if url.startswith("/") or url.startswith("./"):
            return FileStore(url, create)
        raise Exception("Failed to find a prefix of s3://, file://, /, or ./ Cannot handle this URL.")
    def __init__(self, url):
        self.url = url
###### LocalCopy ######
class LocalCopy(object):
    """A local on-disk copy of an object's data.

    When path_is_temp is True this instance owns the file and deletes
    it on remove() (or garbage collection).
    """
    def __init__(self, obj_name, path, path_is_temp):
        self.obj_name = obj_name
        self.path = path
        self.path_is_temp = path_is_temp
    def remove(self):
        """Delete the backing file if we own it; then forget the path."""
        if (self.path_is_temp == True) and (self.path is not None):
            os.unlink(self.path)
        self.path = None
        self.path_is_temp = False
    def __del__(self):
        self.remove()
class LocalAcl(object):
    """Wraps an optional AclPolicy for a named object.

    acl_policy may be None, meaning "no ACL stored"; every operation
    treats that case as a no-op.
    """
    @staticmethod
    def from_xml(obj_name, xml):
        """Parse an AccessControlPolicy XML document into a LocalAcl."""
        return LocalAcl(obj_name, AclPolicy.from_xml(xml))
    @staticmethod
    def get_empty(obj_name):
        """Return a LocalAcl with no policy attached."""
        return LocalAcl(obj_name, None)
    def __init__(self, obj_name, acl_policy):
        self.obj_name = obj_name
        self.acl_policy = acl_policy
    def equals(self, rhs):
        """Compare two LocalAcls; two empty ACLs are equal."""
        if (self.acl_policy is None) or (rhs.acl_policy is None):
            return (self.acl_policy is None) and (rhs.acl_policy is None)
        return self.acl_policy.equals(rhs.acl_policy)
    def translate_users(self, xusers):
        """Translate the users in this ACL (no-op when empty)."""
        if self.acl_policy is not None:
            self.acl_policy.translate_users(xusers)
    def set_owner(self, owner_id):
        """Set the policy's owner (no-op when empty)."""
        if self.acl_policy is not None:
            self.acl_policy.set_owner(owner_id)
    def write_to_xattr(self, file_name):
        """Write this ACL to an extended attribute (no-op when empty)."""
        if self.acl_policy is None:
            return
        xattr.set(file_name, ACL_XATTR, self.acl_policy.to_xml(),
                  namespace=xattr.NS_USER)
###### S3 store #######
def s3_key_to_meta(k):
    """Build an xattr-style metadata dict from a boto Key.

    Content-Type (when the Key carries one) is stored under
    CONTENT_TYPE_XATTR; every user metadata entry gets the
    META_XATTR_PREFIX prefix.
    """
    meta = {}
    # content_type is only present on the Key after a HEAD/GET.
    if "content_type" in k.__dict__:
        meta[CONTENT_TYPE_XATTR] = k.content_type
    for name, val in k.metadata.items():
        meta[META_XATTR_PREFIX + name] = val
    return meta
def meta_to_s3_key(key, meta):
    """Apply an xattr-style metadata dict to a boto Key (inverse of
    s3_key_to_meta). Raises Exception on an unrecognized entry."""
    prefix_len = len(META_XATTR_PREFIX)
    for name, val in meta.items():
        if name == CONTENT_TYPE_XATTR:
            key.set_metadata("Content-Type", val)
        elif name[:prefix_len] == META_XATTR_PREFIX:
            key.set_metadata(name[prefix_len:], val)
        else:
            raise Exception("can't understand meta entry: %s" % name)
class S3StoreIterator(object):
    """Iterates over the objects of an S3 bucket listing, yielding
    obsync Object instances."""
    def __init__(self, bucket, blrs):
        self.bucket = bucket
        self.blrs = blrs
    def __iter__(self):
        return self
    def next(self):
        # Raises StopIteration when the listing is exhausted.
        key = self.blrs.next()
        # HEAD the key: bucket listings lack content-type and user
        # metadata, so fetch the full key before building the Object.
        head = self.bucket.get_key(key.name)
        return Object(key.name, etag_to_md5(key.etag), key.size,
                      s3_key_to_meta(head))
+ class S3Store(Store):
+     def __init__(self, url, create, akey, skey):
+         # Parse the s3 url
+         host_end = string.find(url, "/")
+         if (host_end == -1):
+             raise Exception("S3Store URLs are of the form \
+ s3://host/bucket/key_prefix. Failed to find the host.")
+         self.host = url[0:host_end]
+         bucket_end = url.find("/", host_end+1)
+         if (bucket_end == -1):
+             self.bucket_name = url[host_end+1:]
+             self.key_prefix = ""
+         else:
+             self.bucket_name = url[host_end+1:bucket_end]
+             self.key_prefix = url[bucket_end+1:]
+         if (self.bucket_name == ""):
+             raise Exception("S3Store URLs are of the form \
+ s3://host/bucket/key_prefix. Failed to find the bucket.")
+         if (opts.more_verbose):
+             print "self.host = '" + self.host + "', ",
+             print "self.bucket_name = '" + self.bucket_name + "' ",
+             print "self.key_prefix = '" + self.key_prefix + "'"
+         self.conn = S3Connection(calling_format=OrdinaryCallingFormat(),
+                 host=self.host, is_secure=False,
+                 aws_access_key_id=akey, aws_secret_access_key=skey)
+         self.bucket = self.conn.lookup(self.bucket_name)
+         if (self.bucket == None):
+             if (create):
+                 if (opts.dry_run):
+                     raise Exception("logic error: this should be unreachable.")
+                 self.bucket = self.conn.create_bucket(bucket_name = self.bucket_name)
+             else:
+                 raise RuntimeError("%s: no such bucket as %s" % \
+                     (url, self.bucket_name))
+         Store.__init__(self, "s3://" + url)
+     def __str__(self):
+         return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix
+     def get_acl(self, obj):
+         acl_xml = self.bucket.get_xml_acl(obj.name)
+         return LocalAcl.from_xml(obj.name, acl_xml)
+     def make_local_copy(self, obj):
+         k = Key(self.bucket)
+         k.key = obj.name
+         temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
+         try:
+             k.get_contents_to_filename(temp_file)
+         except:
+             os.unlink(temp_file)
+             raise
+         return LocalCopy(obj.name, temp_file, True)
+     def all_objects(self):
+         blrs = self.bucket.list(prefix = self.key_prefix)
 -        return Object(obj.name, etag_to_md5(k.etag), k.size)
++        return S3StoreIterator(self.bucket, blrs.__iter__())
+     def locate_object(self, obj):
+         k = self.bucket.get_key(obj.name)
+         if (k == None):
+             return None
 -#        mime = mimetypes.guess_type(local_copy.path)[0]
 -#        if (mime == NoneType):
 -#            mime = "application/octet-stream"
++        return Object(obj.name, etag_to_md5(k.etag), k.size, s3_key_to_meta(k))
+     def upload(self, local_copy, src_acl, obj):
+         if (opts.more_verbose):
+             print "S3Store.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+                 "obj='" + obj.name + "'"
+         if (opts.dry_run):
+             return
 -        #k.set_metadata("Content-Type", mime)
+         k = Key(self.bucket)
+         k.key = obj.name
 -                xml = src_acl.acl_policy.to_xml()
 -                self.bucket.set_xml_acl(xml, k)
 -            except Exception, e:
++        meta_to_s3_key(k, obj.meta)
+         k.set_contents_from_filename(local_copy.path)
+         if (src_acl.acl_policy != None):
++            xml = src_acl.acl_policy.to_xml()
+             try:
 -        acl_name = get_local_acl_file_name(obj.local_name())
 -        acl_path = self.base + "/" + acl_name
 -        if (os.path.exists(acl_path)):
 -            return LocalAcl.from_file(obj.name, acl_path)
 -        else:
 -            return LocalAcl.get_empty(obj.name)
++                def fn():
++                    self.bucket.set_xml_acl(xml, k)
++                do_with_s3_retries(fn)
++            except boto.exception.S3ResponseError, e:
+                 print >>stderr, "ERROR SETTING ACL on object '" + sobj.name + "'"
+                 print >>stderr
+                 print >>stderr, "************* ACL: *************"
+                 print >>stderr, str(xml)
+                 print >>stderr, "********************************"
+                 raise
+     def remove(self, obj):
+         if (opts.dry_run):
+             return
+         self.bucket.delete_key(obj.name)
+         if (opts.more_verbose):
+             print "S3Store: removed %s" % obj.name
# Some S3 servers offer "eventual consistency."
# What this means is that after a change has been made, like the creation of an
# object, it takes some time for this change to become visible to everyone.
# This potentially includes the client making the change.
#
# This means we need to implement a retry mechanism for certain operations.
# For example, setting the ACL on a newly created object may fail with an
# "object not found" error if the object creation hasn't yet become visible to
# us.
def do_with_s3_retries(fn):
    """Run fn(), retrying on boto S3ResponseError.

    When the DST_CONSISTENCY environment variable is "eventual", retry
    after delays of 5, 10, and 60 seconds; otherwise fail on the first
    error. The sentinel -1 marks the final attempt: its failure is
    re-raised to the caller.
    """
    if (os.environ.has_key("DST_CONSISTENCY") and
            os.environ["DST_CONSISTENCY"] == "eventual"):
        sleep_times = [5, 10, 60, -1]
    else:
        sleep_times = [-1]
    for stime in sleep_times:
        try:
            fn()
            return
        except boto.exception.S3ResponseError, e:
            # Last attempt: propagate the error instead of sleeping.
            if (stime == -1):
                raise
            if (opts.verbose):
                print "encountered s3 response error: ",
            if (opts.more_verbose):
                print str(e) + ": ",
            if (opts.verbose):
                print "retrying operation after " + str(stime) + \
                    " second delay"
            time.sleep(stime)

###### FileStore #######
class FileStoreIterator(object):
    """FileStore iterator: walks the directory tree under base and
    yields an obsync Object for each regular file found."""
    def __init__(self, base):
        self.base = base
        if (opts.follow_symlinks):
            self.generator = os.walk(base, followlinks=True)
        else:
            self.generator = os.walk(base)
        # Current directory being drained and its remaining file names.
        self.path = ""
        self.files = []
    def __iter__(self):
        return self
    def next(self):
        # Loop until we find the next regular, non-ACL file.
        while True:
            if (len(self.files) == 0):
                # Advance the walk to the next directory; this raises
                # StopIteration when the tree is exhausted.
                self.path, dirs, self.files = self.generator.next()
                continue
            path = self.path + "/" + self.files[0]
            self.files = self.files[1:]
            # Ignore non-files when iterating.
            if (not os.path.isfile(path)):
                continue
            try:
                # Convert the path relative to base into an s3 object name.
                obj_name = local_name_to_s3_name(path[len(self.base)+1:])
            except LocalFileIsAcl, e:
                # ignore ACL side files when iterating
                continue
            return Object.from_file(obj_name, path)
+ class FileStore(Store):
+     def __init__(self, url, create):
+         # Parse the file url
+         self.base = url
+         if (self.base[-1:] == '/'):
+             self.base = self.base[:-1]
+         if (create):
+             if (opts.dry_run):
+                 raise Exception("logic error: this should be unreachable.")
+             mkdir_p(self.base)
+         elif (not os.path.isdir(self.base)):
+             raise NonexistentStore()
+         Store.__init__(self, "file://" + url)
++        test_xattr_support(self.base)
+     def __str__(self):
+         return "file://" + self.base
+     def get_acl(self, obj):
 -        local_name = obj.local_name()
 -        return LocalCopy(obj.name, self.base + "/" + local_name, False)
++        try:
++            xml = xattr.get(obj.local_path(self.base), ACL_XATTR,
++                            namespace=xattr.NS_USER)
++        except IOError, e:
++            #print "failed to get XML ACL from %s" % obj.local_name()
++            if e.errno == 61:
++                return LocalAcl.get_empty(obj.name)
++            raise
++        return LocalAcl.from_xml(obj.name, xml)
+     def make_local_copy(self, obj):
 -        path = self.base + "/" + obj.local_name()
++        return LocalCopy(obj.name, obj.local_path(self.base), False)
+     def all_objects(self):
+         return FileStoreIterator(self.base)
+     def locate_object(self, obj):
 -        #print "s='" + s +"', d='" + d + "'"
++        path = obj.local_path(self.base)
+         found = os.path.isfile(path)
+         if (opts.more_verbose):
+             if (found):
+                 print "FileStore::locate_object: found object '" + \
+                     obj.name + "'"
+             else:
+                 print "FileStore::locate_object: did not find object '" + \
+                     obj.name + "'"
+         if (not found):
+             return None
+         return Object.from_file(obj.name, path)
+     def upload(self, local_copy, src_acl, obj):
+         if (opts.more_verbose):
+             print "FileStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+                 "obj='" + obj.name + "'"
+         if (opts.dry_run):
+             return
+         s = local_copy.path
+         lname = obj.local_name()
+         d = self.base + "/" + lname
 -        if (src_acl.acl_policy != None):
 -            src_acl.write_to_file(self.base + "/" + get_local_acl_file_name(lname))
+         mkdir_p(os.path.dirname(d))
+         shutil.copy(s, d)
 -        return Object(key, md5, size)
++        src_acl.write_to_xattr(d)
++        # Store metadata in extended attributes
++        for k,v in obj.meta.items():
++            xattr.set(d, k, v, namespace=xattr.NS_USER)
+     def remove(self, obj):
+         if (opts.dry_run):
+             return
+         os.unlink(self.base + "/" + obj.name)
+         if (opts.more_verbose):
+             print "FileStore: removed %s" % obj.name
###### Rados store #######
class RadosStoreIterator(object):
    """RadosStore iterator: yields obsync Objects for rados keys that
    begin with the store's key prefix; other keys are skipped."""
    def __init__(self, it, rados_store):
        self.it = it # has type rados.ObjectIterator
        self.rados_store = rados_store
        # BUG FIX: RadosStore keeps its prefix in 'key_prefix'; there is
        # no 'prefix' attribute on the store.
        self.prefix = self.rados_store.key_prefix
        self.prefix_len = len(self.prefix)
    def __iter__(self):
        return self
    def next(self):
        rados_obj = None
        while True:
            # This raises StopIteration when there are no more objects
            # to iterate on.
            rados_obj = self.it.next()
            # Only yield keys carrying our prefix.
            if rados_obj.key[:self.prefix_len] == self.prefix:
                break
        ret = self.rados_store.obsync_obj_from_rgw(rados_obj.key)
        if (ret == None):
            raise Exception("internal iterator error")
        return ret
+ class RadosStore(Store):
+     def __init__(self, url, create, akey, skey):
+         # Parse the rados url
+         conf_end = string.find(url, ":")
+         if (conf_end == -1):
+             raise Exception("RadosStore URLs are of the form \
+ rados:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
+         self.conf_file_path = url[0:conf_end]
+         bucket_end = url.find(":", conf_end+1)
+         if (bucket_end == -1):
+             self.rgw_bucket_name = url[conf_end+1:]
+             self.key_prefix = ""
+         else:
+             self.rgw_bucket_name = url[conf_end+1:bucket_end]
+             self.key_prefix = url[bucket_end+1:]
+         if (self.rgw_bucket_name == ""):
+             raise Exception("RadosStore URLs are of the form \
+ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
+         if (opts.more_verbose):
+             print "self.conf_file_path = '" + self.conf_file_path + "', ",
+             print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
+             print "self.key_prefix = '" + self.key_prefix + "'"
+         acl_hack = getenv("ACL_HACK", None)
+         if (acl_hack == None):
+             raise Exception("RadosStore error: You must specify an environment " +
+                 "variable called ACL_HACK containing the name of a file. This " +
+                 "file contains a serialized RGW ACL that you want " +
+                 "to insert into the user.rgw.acl extended attribute of all " +
+                 "the objects you create. This is a hack and yes, it will go " +
+                 "away soon.")
+         acl_hack_f = open(acl_hack, "r")
+         try:
+             self.acl_hack = acl_hack_f.read()
+         finally:
+             acl_hack_f.close()
+         self.rados = rados.Rados()
+         self.rados.conf_read_file(self.conf_file_path)
+         self.rados.connect()
+         if (not self.rados.pool_exists(self.rgw_bucket_name)):
+             if (create):
+                 self.create_rgw_bucket(self.rgw_bucket_name)
+             else:
+                 raise NonexistentStore()
+         self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
+         Store.__init__(self, "rados:" + url)
+     def create_rgw_bucket(self, rgw_bucket_name):
+         """ Create an rgw bucket named 'rgw_bucket_name' """
+         self.rados.create_pool(self.rgw_bucket_name)
+         meta_ctx = None
+         try:
+             meta_ctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
+             meta_ctx.write(rgw_bucket_name, "", 0)
+             print "meta_ctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
+                     "user.rgw.acl, self.acl_hack=" + self.acl_hack + ")"
+             meta_ctx.set_xattr(rgw_bucket_name, "user.rgw.acl", self.acl_hack)
+         finally:
+             if (meta_ctx):
+                meta_ctx.close()
+     def obsync_obj_from_rgw(self, key):
+         """Create an obsync object from a Rados object"""
+         try:
+             size, tm = self.ioctx.stat(key)
+         except rados.ObjectNotFound:
+             return None
+         md5 = self.ioctx.get_xattr(key, "user.rgw.etag")
 -            print "F " + sobj.name
++        # TODO: support meta
++        return Object(key, md5, size, {})
+     def __str__(self):
+         return "rados:" + self.conf_file_path + ":" + self.rgw_bucket_name + ":" + self.key_prefix
+     def get_acl(self, obj):
+         acl = LocalAcl(obj.name)
+         # todo: set XML ACL
+         return acl
+     def make_local_copy(self, obj):
+         temp_file = None
+         temp_file_f = None
+         try:
+             # read the object from rados in chunks
+             temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
+             temp_file_f = open(temp_file.name, 'w')
+             while True:
+                 buf = self.ioctx.read(obj.name, off, 8192)
+                 if (len(buf) == 0):
+                     break
+                 temp_file_f.write(buf)
+                 if (len(buf) < 8192):
+                     break
+                 off += 8192
+             temp_file_f.close()
+             # TODO: implement ACLs
+         except:
+             if (temp_file_f):
+                 temp_file_f.close()
+             if (temp_file):
+                 os.unlink(temp_file.name)
+             raise
+         return LocalCopy(obj.name, temp_file.name, True)
+     def all_objects(self):
+         it = self.bucket.list_objects()
+         return RadosStoreIterator(it, self.key_prefix)
+     def locate_object(self, obj):
+         return self.obsync_obj_from_rgw(obj.name)
+     def upload(self, local_copy, src_acl, obj):
+         if (opts.more_verbose):
+             print "RadosStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+                 "obj='" + obj.name + "'"
+         if (opts.dry_run):
+             return
+         local_copy_f = open(local_copy.path, 'r')
+         off = 0
+         while True:
+             buf = local_copy_f.read(8192)
+             if ((len(buf) == 0) and (off != 0)):
+                 break
+             self.ioctx.write(obj.name, buf, off)
+             if (len(buf) < 8192):
+                 break
+             off += 8192
++        # TODO: examine obj.meta
+         self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
+         self.ioctx.set_xattr(obj.name, "user.rgw.acl", self.acl_hack)
+         self.ioctx.set_xattr(obj.name, "user.rgw.content_type",
+                             "application/octet-stream")
+     def remove(self, obj):
+         if (opts.dry_run):
+             return
+         self.ioctx.remove_object(obj.name)
+         if (opts.more_verbose):
+             print "RadosStore: removed %s" % obj.name
+ ###### Functions #######
+ def delete_unreferenced(src, dst):
+     """ delete everything from dst that is not referenced in src """
+     if (opts.more_verbose):
+         print "handling deletes."
+     for dobj in dst.all_objects():
+         sobj = src.locate_object(dobj)
+         if (sobj == None):
+             dst.remove(dobj)
def xuser_cb(opt, opt_str, value, parser):
    """ handle an --xuser argument """
    # The value must look like SRC_USER=DEST_USER.
    equals = value.find(r'=')
    if equals == -1:
        print >>stderr, "Error parsing --xuser: You must give both a source \
and destination user name, like so:\n\
--xuser SOURCE_USER=DEST_USER\n\
\n\
This will translate the user SOURCE_USER in the source to the user DEST_USER \n\
in the destination."
        sys.exit(1)
    src_user = value[:equals]
    dst_user = value[equals+1:]
    if ((len(src_user) == 0) or (len(dst_user) == 0)):
        print >>stderr, "Error parsing --xuser: can't have a zero-length \
user name."
        sys.exit(1)
    # add_user_type presumably canonicalizes the name with a user-type
    # prefix -- defined elsewhere in this file; confirm there.
    src_user = add_user_type(src_user)
    dst_user = add_user_type(dst_user)
    # Refuse two different translations for the same source user.
    if (xuser.has_key(src_user)):
        print >>stderr, "Error parsing --xuser: we are already translating \
\"%s\" to \"%s\"; we cannot translate it to \"%s\"" % \
(src_user, xuser[src_user], dst_user)
        sys.exit(1)
    # Record the translation in the global xuser map.
    xuser[src_user] = dst_user
USAGE = """
obsync synchronizes S3, Rados, and local objects. The source and destination
can both be local or both remote.
Examples:
# copy contents of mybucket to disk
obsync -v s3://myhost/mybucket file://mydir
# copy contents of mydir to an S3 bucket
obsync -v file://mydir s3://myhost/mybucket
# synchronize two S3 buckets
SRC_AKEY=... SRC_SKEY=... \
DST_AKEY=... DST_SKEY=... \
obsync -v s3://myhost/mybucket1 s3://myhost2/mybucket2
   --xuser bob=robert --xuser joe=joseph -O bob
Note: You must specify an AWS access key and secret access key when accessing
S3. obsync honors these environment variables:
SRC_AKEY          Access key for the source URL
SRC_SKEY          Secret access key for the source URL
DST_AKEY          Access key for the destination URL
DST_SKEY          Secret access key for the destination URL
AKEY              Access key for both source and dest
SKEY              Secret access key for both source and dest
If these environment variables are not given, we will fall back on libboto
defaults.
obsync (options) [source] [destination]"""
# Command-line definition. Note xuser_cb fills the global 'xuser' map.
parser = OptionParser(USAGE)
parser.add_option("-n", "--dry-run", action="store_true", \
    dest="dry_run", default=False)
parser.add_option("-c", "--create-dest", action="store_true", \
    dest="create", help="create the destination if it doesn't already exist")
parser.add_option("--delete-before", action="store_true", \
    dest="delete_before", help="delete objects that aren't in SOURCE from \
DESTINATION before transferring any objects")
parser.add_option("-d", "--delete-after", action="store_true", \
    dest="delete_after", help="delete objects that aren't in SOURCE from \
DESTINATION after doing all transfers.")
parser.add_option("-L", "--follow-symlinks", action="store_true", \
    dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \
    "loops when using this option!)")
parser.add_option("--no-preserve-acls", action="store_true", \
    dest="no_preserve_acls", help="don't preserve ACLs when copying objects.")
parser.add_option("-v", "--verbose", action="store_true", \
    dest="verbose", help="be verbose")
parser.add_option("-V", "--more-verbose", action="store_true", \
    dest="more_verbose", help="be really, really verbose (developer mode)")
# Typo fix in the help text: "tranlation" -> "translation".
parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \
    dest="SRC=DST", callback=xuser_cb, help="set up a user translation. You \
can specify multiple user translations with multiple --xuser arguments.")
parser.add_option("--force", action="store_true", \
    dest="force", help="overwrite all destination objects, even if they \
appear to be the same as the source objects.")
parser.add_option("--unit", action="store_true", \
    dest="run_unit_tests", help="run unit tests and quit")
# Global map of source-user -> destination-user translations.
xuser = {}
(opts, args) = parser.parse_args()
# --unit: run the built-in self tests and exit.
if (opts.run_unit_tests):
    test_acl_policy()
    sys.exit(0)
opts.preserve_acls = not opts.no_preserve_acls
# Sanity-check mutually exclusive options.
if (opts.create and opts.dry_run):
    raise Exception("You can't run with both --create-dest and --dry-run! \
By definition, a dry run never changes anything.")
if (len(args) < 2):
    print >>stderr, "Expected two positional arguments: source and destination"
    print >>stderr, USAGE
    sys.exit(1)
elif (len(args) > 2):
    print >>stderr, "Too many positional arguments."
    print >>stderr, USAGE
    sys.exit(1)
if (opts.more_verbose):
    print >>stderr, "User translations:"
    for k,v in xuser.items():
        print >>stderr, "\"%s\" ==> \"%s\"" % (k, v)
    print >>stderr, ""
if (opts.more_verbose):
    opts.verbose = True
    boto.set_stream_logger("stdout")
    boto.log.info("Enabling verbose boto logging.")
if (opts.delete_before and opts.delete_after):
    print >>stderr, "It doesn't make sense to specify both --delete-before \
and --delete-after."
    sys.exit(1)
src_name = args[0]
dst_name = args[1]
# Construct the source store. The source is never created (hence False).
try:
    if (opts.more_verbose):
        print "SOURCE: " + src_name
    src = Store.make_store(src_name, False,
            getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"))
except NonexistentStore, e:
    print >>stderr, "Fatal error: Source " + src_name + " does not exist."
    sys.exit(1)
except Exception, e:
    print >>stderr, "error creating source: " + str(e)
    traceback.print_exc(100000, stderr)
    sys.exit(1)
# Construct the destination store; -c/--create-dest allows creating it.
try:
    if (opts.more_verbose):
        print "DESTINATION: " + dst_name
    dst = Store.make_store(dst_name, opts.create,
            getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"))
except NonexistentStore, e:
    print >>stderr, "Fatal error: Destination " + dst_name + " does " +\
        "not exist. Run with -c or --create-dest to create it automatically."
    sys.exit(1)
except Exception, e:
    print >>stderr, "error creating destination: " + str(e)
    traceback.print_exc(100000, stderr)
    sys.exit(1)
if (opts.delete_before):
    delete_unreferenced(src, dst)
# Main synchronization loop: for each source object decide whether to
# upload, and (in verbose mode) accumulate a one-line status marker:
#   F forced, + new, > content differs, ^ ACL differs, . unchanged.
for sobj in src.all_objects():
    if (opts.more_verbose):
        print "handling " + sobj.name
    pline = ""
    dobj = dst.locate_object(sobj)
    upload = False
    src_acl = None
    dst_acl = None
    if (opts.force):
        if (opts.verbose):
            pline += "F " + sobj.name
        upload = True
    elif (dobj == None):
        # Not present in the destination at all.
        if (opts.verbose):
            pline += "+ " + sobj.name
        upload = True
    elif not sobj.equals(dobj):
        # Present but with different content/size/metadata.
        if (opts.verbose):
            pline += "> " + sobj.name
        upload = True
    elif (opts.preserve_acls):
        # Content matches; re-upload only if the ACLs differ.
        src_acl = src.get_acl(sobj)
        dst_acl = dst.get_acl(dobj)
        src_acl.translate_users(xuser)
        #src_acl.set_owner()
        if (not src_acl.equals(dst_acl)):
            upload = True
            if (opts.verbose):
                pline += "^ %s" % sobj.name
    else:
        if (opts.verbose):
            pline += ". " + sobj.name
    if (upload):
        if (not opts.preserve_acls):
            # Just default to an empty ACL
            src_acl = LocalAcl.get_empty(sobj.name)
        else:
            # May already be set by the ACL-comparison branch above.
            if (src_acl == None):
                src_acl = src.get_acl(sobj)
                src_acl.translate_users(xuser)
                #src_acl.set_owner()
        local_copy = src.make_local_copy(sobj)
        try:
            dst.upload(local_copy, src_acl, sobj)
        finally:
            # Always clean up the (possibly temporary) local copy.
            local_copy.remove()
    if (pline != ""):
        print pline
if (opts.delete_after):
    delete_unreferenced(src, dst)
if (opts.more_verbose):
    print "finished."
sys.exit(0)
Simple merge
diff --cc src/osd/PG.cc
Simple merge
diff --cc src/osd/PG.h
Simple merge