]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
obsync: tear out rgw
authorYehuda Sadeh <yehuda.sadeh@dreamhost.com>
Tue, 22 Nov 2011 23:05:45 +0000 (15:05 -0800)
committerYehuda Sadeh <yehuda.sadeh@dreamhost.com>
Tue, 22 Nov 2011 23:06:16 +0000 (15:06 -0800)
src/obsync/obsync

index 2d5e2928827f4d0a3088544c584d086762ab4545..89751494d75c42891be6ee4c02b4f99ae1a86a2c 100755 (executable)
@@ -28,8 +28,6 @@ import hashlib
 import mimetypes
 import os
 from StringIO import StringIO
-import rados
-import rgw
 import re
 import shutil
 import string
@@ -45,13 +43,9 @@ global opts
 # Translation table mapping users in the source to users in the destination.
 global xuser
 
-# Librgw instance
-global lrgw
-lrgw = None
-
 ###### Usage #######
 USAGE = """
-obsync synchronizes S3, Rados, and local objects. The source and destination
+obsync synchronizes S3 and local objects. The source and destination
 can both be local or both remote.
 
 Examples:
@@ -84,22 +78,6 @@ defaults.
 
 obsync (options) [source] [destination]"""
 
-###### Constants #######
-ACL_XATTR = "rados.acl"
-META_XATTR_PREFIX = "rados.meta."
-CONTENT_TYPE_XATTR = "rados.content_type"
-
-RGW_META_BUCKET_NAME = ".rgw"
-RGW_USERS_UID_BUCKET_NAME = ".users.uid"
-RGW_META_ETAG = "user.rgw.etag"
-RGW_META_PREFIX = "user.x-amz-meta-"
-RGW_META_CONTENT_TYPE = "user.rgw.content_type"
-RGW_META_ACL = "user.rgw.acl"
-
-def vvprint(s):
-    if (opts.more_verbose):
-        print s
-
 ###### Exception classes #######
 class ObsyncException(Exception):
     def __init__(self, ty, e):
@@ -550,15 +528,6 @@ class Store(object):
             else:
                 is_secure = os.environ.has_key("SRC_SECURE")
             return S3Store(s3_url, create, akey, skey, is_secure)
-        rados_url = strip_prefix("rgw:", url)
-        if (rados_url):
-            dst_owner = None
-            if (is_dst):
-                if not os.environ.has_key("DST_OWNER"):
-                    raise ObsyncArgumentParsingException("You must set \
-DST_OWNER when uploading files to RgwStore.")
-                dst_owner = os.environ["DST_OWNER"]
-            return RgwStore(rados_url, create, akey, skey, dst_owner)
         file_url = strip_prefix("file://", url)
         if (file_url):
             return FileStore(file_url, create)
@@ -865,235 +834,6 @@ class FileStore(Store):
         if (opts.more_verbose):
             print "FileStore: removed %s" % obj.name
 
-###### Rgw store #######
-class RgwStoreIterator(object):
-    """RgwStore iterator"""
-    def __init__(self, it, rgw_store):
-        self.it = it # has type rados.ObjectIterator
-        self.rgw_store = rgw_store
-        self.prefix = self.rgw_store.key_prefix
-        self.prefix_len = len(self.rgw_store.key_prefix)
-    def __iter__(self):
-        return self
-    def next(self):
-        rados_obj = None
-        while True:
-            # This will raise StopIteration when there are no more objects to
-            # iterate on
-            rados_obj = self.it.next()
-            # do the prefixes match?
-            if rados_obj.key[:self.prefix_len] == self.prefix:
-                break
-        ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key)
-        if (ret == None):
-            raise ObsyncPermanentException("internal iterator error")
-        return ret
-
-class RgwStore(Store):
-    def __init__(self, url, create, akey, skey, owner):
-        global lrgw
-        if (lrgw == None):
-            lrgw = rgw.Rgw()
-        self.owner = owner
-        self.user_exists_cache = {}
-        self.users_uid_ioctx = None
-        # Parse the rados url
-        conf_end = string.find(url, ":")
-        if (conf_end == -1):
-            raise ObsyncPermanentException("RgwStore URLs are of the form \
-rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
-        self.conf_file_path = url[0:conf_end]
-        bucket_end = url.find(":", conf_end+1)
-        if (bucket_end == -1):
-            self.rgw_bucket_name = url[conf_end+1:]
-            self.key_prefix = ""
-        else:
-            self.rgw_bucket_name = url[conf_end+1:bucket_end]
-            self.key_prefix = url[bucket_end+1:]
-        if (self.rgw_bucket_name == ""):
-            raise ObsyncPermanentException("RgwStore URLs are of the form \
-rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
-        if (opts.more_verbose):
-            print "self.conf_file_path = '" + self.conf_file_path + "', ",
-            print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
-            print "self.key_prefix = '" + self.key_prefix + "'"
-        self.rados = rados.Rados()
-        self.rados.conf_read_file(self.conf_file_path)
-        self.rados.connect()
-        if self.owner != None and not self.user_exists(ACL_TYPE_CANON_USER + self.owner):
-            raise ObsyncPermanentException("Unknown owner! DST_OWNER=%s" % self.owner)
-        if (not self.rados.pool_exists(self.rgw_bucket_name)):
-            if (create):
-                self.create_rgw_bucket(self.rgw_bucket_name)
-            else:
-                raise ObsyncPermanentException("NonexistentStore")
-        elif self.owner == None:
-            # Figure out what owner we should use when creating objects.
-            # We use the owner of the destination bucket
-            ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
-            try:
-                bin_ = ioctx.get_xattr(self.rgw_bucket_name, RGW_META_ACL)
-                xml = lrgw.acl_bin2xml(bin_)
-                acl = AclPolicy.from_xml(xml)
-                self.owner = acl.owner_id
-                if (opts.more_verbose):
-                    print "using owner \"%s\"" % self.owner
-            finally:
-               ioctx.close()
-        self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
-        Store.__init__(self, "rgw:" + url)
-    def create_rgw_bucket(self, rgw_bucket_name):
-        global lrgw
-        """ Create an rgw bucket named 'rgw_bucket_name' """
-        if (self.owner == None):
-            raise ObsyncArgumentParsingException("Can't create a bucket \
-without knowing who should own it. Please set DST_OWNER")
-        self.rados.create_pool(self.rgw_bucket_name)
-        ioctx = None
-        try:
-            ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
-            ioctx.write(rgw_bucket_name, "", 0)
-            print "ioctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
-                    "user.rgw.acl=" + self.owner + ")"
-            new_bucket_acl = "\
-<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \
-<Owner><ID>%s</ID></Owner><AccessControlList>\
-<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
-xsi:type=\"CanonicalUser\"><ID>%s</ID> \
-<DisplayName>display-name</DisplayName></Grantee> \
-<Permission>FULL_CONTROL</Permission></Grant>\
-</AccessControlList></AccessControlPolicy>" % (self.owner, self.owner)
-            new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl)
-            ioctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin)
-        finally:
-            if (ioctx):
-               ioctx.close()
-    def obsync_obj_from_rgw(self, obj_name):
-        """Create an obsync object from a Rados object"""
-        try:
-            size, tm = self.ioctx.stat(obj_name)
-        except rados.ObjectNotFound:
-            return None
-        md5 = None
-        meta = {}
-        for k,v in self.ioctx.get_xattrs(obj_name):
-            if k == RGW_META_ETAG:
-                md5 = v
-            elif k == RGW_META_CONTENT_TYPE:
-                meta[CONTENT_TYPE_XATTR] = v
-            elif k[:len(RGW_META_PREFIX)] == RGW_META_PREFIX:
-                meta["rados.meta." + k[len(RGW_META_PREFIX):]] = v
-            elif opts.more_verbose:
-                print "ignoring unknown xattr " + k
-        if (md5 == None):
-            raise ObsyncPermanentException("error on object %s: expected to find " + \
-                "extended attribute %s" % (obj_name, RGW_META_ETAG))
-        if (opts.more_verbose):
-            print "meta = " + str(meta)
-        return Object(obj_name, md5, size, meta)
-    def __str__(self):
-        return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name
-    def get_acl(self, obj):
-        global lrgw
-        bin_ = None
-        try:
-            bin_ = self.ioctx.get_xattr(obj.name, RGW_META_ACL)
-        except rados.NoData:
-            return LocalAcl.get_empty(obj.name)
-        xml = lrgw.acl_bin2xml(bin_)
-        return LocalAcl.from_xml(obj.name, xml)
-    def make_local_copy(self, obj):
-        temp_file = None
-        temp_file_f = None
-        try:
-            # read the object from rgw in chunks
-            temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
-            temp_file_f = open(temp_file.name, 'w')
-            off = 0
-            while True:
-                buf = self.ioctx.read(obj.name, offset = off, length = 8192)
-                if (len(buf) == 0):
-                    break
-                temp_file_f.write(buf)
-                if (len(buf) < 8192):
-                    break
-                off += 8192
-            temp_file_f.close()
-        except Exception, e:
-            if (temp_file_f):
-                temp_file_f.close()
-            if (temp_file):
-                os.unlink(temp_file.name)
-            raise ObsyncTemporaryException(e)
-        return LocalCopy(obj.name, temp_file.name, True)
-    def all_objects(self):
-        it = self.ioctx.list_objects()
-        return RgwStoreIterator(it, self)
-    def locate_object(self, obj):
-        return self.obsync_obj_from_rgw(obj.name)
-    def user_exists(self, user):
-        if (self.user_exists_cache.has_key(user)):
-            return self.user_exists_cache[user]
-        if user[:len(ACL_TYPE_CANON_USER)] == ACL_TYPE_CANON_USER:
-            if (self.users_uid_ioctx == None):
-                # will be closed in __del__
-                self.users_uid_ioctx = self.rados.open_ioctx(RGW_USERS_UID_BUCKET_NAME)
-            try:
-                self.users_uid_ioctx.stat(user[len(ACL_TYPE_CANON_USER):])
-            except rados.ObjectNotFound:
-                return False
-            self.user_exists_cache[user] = True
-            return True
-        elif user[:len(ACL_TYPE_EMAIL_USER)] == ACL_TYPE_EMAIL_USER:
-            raise ObsyncPermanentException("rgw target can't handle email users yet.")
-        elif user[:len(ACL_TYPE_GROUP)] == ACL_TYPE_GROUP:
-            raise ObsyncPermanentException("rgw target can't handle groups yet.")
-        else:
-            raise ObsyncPermanentException("can't understand user name %s" % user)
-    def upload(self, local_copy, src_acl, obj):
-        global lrgw
-        if (opts.more_verbose):
-            print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
-                "obj='" + obj.name + "'"
-        if (opts.dry_run):
-            return
-        local_copy_f = open(local_copy.path, 'r')
-        off = 0
-        while True:
-            buf = local_copy_f.read(8192)
-            if ((len(buf) == 0) and (off != 0)):
-                break
-            self.ioctx.write(obj.name, buf, off)
-            if (len(buf) < 8192):
-                break
-            off += 8192
-        self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
-        if (src_acl.acl_policy == None):
-            ap = AclPolicy.create_default(self.owner)
-        else:
-            ap = src_acl.acl_policy
-        for user in ap.get_all_users():
-            if not self.user_exists(user):
-                raise ObsyncPermanentException("You must provide an --xuser entry to translate \
-user %s into something valid for the rgw destination.")
-        xml = ap.to_xml()
-        bin_ = lrgw.acl_xml2bin(xml)
-        self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_)
-        content_type = "application/octet-stream"
-        for k,v in obj.meta.items():
-            if k == CONTENT_TYPE_XATTR:
-                content_type = v
-            elif k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX:
-                self.ioctx.set_xattr(obj.name,
-                        RGW_META_PREFIX + k[len(META_XATTR_PREFIX):], v)
-        self.ioctx.set_xattr(obj.name, "user.rgw.content_type", content_type)
-    def remove(self, obj):
-        if (opts.dry_run):
-            return
-        self.ioctx.remove_object(obj.name)
-        if (opts.more_verbose):
-            print "RgwStore: removed %s" % obj.name
-
 ###### Functions #######
 def delete_unreferenced(src, dst):
     """ delete everything from dst that is not referenced in src """