import os
from StringIO import StringIO
import rados
+import rgw
import re
import shutil
import string
-global xuser
# Librgw instance
-global rgw
+global lrgw
+lrgw = None
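+# (named "lrgw" so it doesn't shadow the rgw module imported above; the
+# instance is created lazily by the first RgwStore that needs it)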
###### Constants #######
ACL_XATTR = "rados.acl"
class RgwStore(Store):
def __init__(self, url, create, akey, skey, owner):
- if (rgw == None):
- rgw = Rgw()
+ global lrgw
+ if (lrgw == None):
+ lrgw = rgw.Rgw()
self.owner = owner
# Parse the rados url
conf_end = string.find(url, ":")
# Figure out what owner we should use when creating objects.
# We use the owner of the destination bucket
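+        # The destination bucket's ACL lives in its user.rgw.acl xattr as a
+        # binary blob; librgw decodes it to XML so we can extract the owner.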
bin_ = meta_ctx.get_xattr(self.rgw_bucket_name, "user.rgw.acl")
- xml = rgw.acl_bin2xml(bin_)
+ xml = lrgw.acl_bin2xml(bin_)
acl = AclPolicy.from_xml(obj.name, xml)
self.bucket_owner = acl.owner_id
if (self.more_verbose):
self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
Store.__init__(self, "rgw:" + url)
def create_rgw_bucket(self, rgw_bucket_name):
+ global lrgw
""" Create an rgw bucket named 'rgw_bucket_name' """
if (self.bucket_owner == None):
raise Exception("Can't create a bucket without knowing who " +
meta_ctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
meta_ctx.write(rgw_bucket_name, "", 0)
print "meta_ctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
- "user.rgw.acl=" + self.acl_hack + ")"
+ "user.rgw.acl=" + self.dst_owner + ")"
        new_bucket_acl = "\
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \
<Owner><ID>%s</ID></Owner><AccessControlList>\
<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
xsi:type=\"CanonicalUser\"><ID>%s</ID>\
<DisplayName>display-name</DisplayName></Grantee> \
<Permission>FULL_CONTROL</Permission></Grant>\
</AccessControlList></AccessControlPolicy>" % (self.dst_owner, self.dst_owner)
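+        # librgw translates the XML policy into rgw's internal binary ACL
+        # encoding, which is the form the xattr actually stores.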
- new_bucket_acl_bin = rgw.acl_xml2bin(new_bucket_acl)
+ new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl)
meta_ctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin)
        finally:
            if (meta_ctx):
                meta_ctx.close()
def __str__(self):
return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name + ":" + self.key_prefix
def get_acl(self, obj):
+ global lrgw
try:
bin_ = self.ioctx.get_xattr(obj.name, ACL_XATTR)
except rados.ObjectNotFound:
return LocalAcl.get_empty(obj.name)
- xml = rgw.acl_bin2xml(bin_)
+ xml = lrgw.acl_bin2xml(bin_)
return LocalAcl.from_xml(obj.name, xml)
def make_local_copy(self, obj):
temp_file = None
def locate_object(self, obj):
return self.obsync_obj_from_rgw(obj.name)
def upload(self, local_copy, src_acl, obj):
+ global lrgw
if (opts.more_verbose):
print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
"obj='" + obj.name + "'"
self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
if (src_acl.acl_policy != None):
xml = src_acl.acl_policy.to_xml()
- bin_ = rgw.acl_xml2bin(xml)
+ bin_ = lrgw.acl_xml2bin(xml)
self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_)
for k,v in obj.meta.items():
self.ioctx.set_xattr(obj.name,
else:
return random.randint(9999, 99999)
+def read_s3_config(cfg, section, sconfig, name):
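+    # Reads one "[s3 <name>]" section of the S3TEST_CONF file into
+    # sconfig[name] and sanity-checks the credentials by opening a
+    # connection. A minimal section looks roughly like this (all values
+    # are illustrative):
+    #   [s3 main]
+    #   host = localhost
+    #   access_key = EXAMPLEACCESSKEY
+    #   secret_key = EXAMPLESECRETKEY
+    #   user_id = testid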
+ # TODO: support 'port', 'is_secure'
+ sconfig[name] = {}
+ for var in [ 'access_key', 'host', 'secret_key', 'user_id',
+ 'display_name', 'email', 'consistency', ]:
+ try:
+ sconfig[name][var] = cfg.get(section, var)
+ except ConfigParser.NoOptionError:
+ pass
+ # Make sure connection works
+ try:
+ conn = boto.s3.connection.S3Connection(
+ aws_access_key_id = sconfig[name]["access_key"],
+ aws_secret_access_key = sconfig[name]["secret_key"],
+ host = sconfig[name]["host"],
+ # TODO support & test all variations
+ calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+ )
+ except Exception, e:
+ print >>stderr, "error initializing connection!"
+ raise
+
+ # Create bucket name
+ try:
+ template = cfg.get('fixtures', 'bucket prefix')
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+ template = 'test-{random}-'
+ random.seed()
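+    # e.g. the default template 'test-{random}-' with a nonce of 12345
+    # yields the bucket name "test-12345-"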
+ try:
+ sconfig[name]["bucket_name"] = \
+ template.format(random=get_nonce())
+ except:
+ print >>stderr, "error parsing bucket prefix template"
+ raise
+
+def read_rgw_config(cfg, section, sconfig, rconfig, name):
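+    # Reads one "[rgw <name>]" section; the only option recognized so far
+    # is ceph_conf, the path to the ceph.conf of the target cluster.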
+ rconfig[name] = {}
+ for var in [ 'ceph_conf' ]:
+ try:
+ rconfig[name][var] = cfg.get(section, var)
+ except ConfigParser.NoOptionError:
+ pass
+
def read_config():
- config = {}
+ sconfig = {}
+ rconfig = {}
cfg = ConfigParser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
(type_, name) = section.split(None, 1)
except ValueError:
continue
- if type_ != 's3':
- continue
- # TODO: support 'port', 'is_secure'
-
- config[name] = {}
- for var in [ 'access_key', 'host', 'secret_key', 'user_id',
- 'display_name', 'email', 'consistency', ]:
- try:
- config[name][var] = cfg.get(section, var)
- except ConfigParser.NoOptionError:
- pass
- # Make sure connection works
- try:
- conn = boto.s3.connection.S3Connection(
- aws_access_key_id = config[name]["access_key"],
- aws_secret_access_key = config[name]["secret_key"],
- host = config[name]["host"],
- # TODO support & test all variations
- calling_format=boto.s3.connection.OrdinaryCallingFormat(),
- )
- except Exception, e:
- print >>stderr, "error initializing connection!"
- raise
-
- # Create bucket name
- try:
- template = cfg.get('fixtures', 'bucket prefix')
- except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
- template = 'test-{random}-'
- random.seed()
- try:
- config[name]["bucket_name"] = \
- template.format(random=get_nonce())
- except:
- print >>stderr, "error parsing bucket prefix template"
- raise
- return config
+ if type_ == 's3':
+ read_s3_config(cfg, section, sconfig, name)
+ elif type_ == 'rgw':
+ read_rgw_config(cfg, section, sconfig, rconfig, name)
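+    # Every "rgw <name>" section must be paired with an "s3 <name>" section
+    # describing the bucket associated with that rgw pool.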
+ for k,v in rconfig.items():
+ if (not sconfig.has_key(k)):
+            raise Exception("Can't find the S3 bucket associated with " +
+                "rgw pool %s" % k)
+ v["bucket"] = sconfig[k]
+ return sconfig, rconfig
def obsync(src, dst, misc):
env = {}
num_objects = num_objects + 1
return num_objects
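+# Builds the --xuser argument that maps the source user id onto the
+# destination user id when obsync translates ACLs between accounts.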
-def xuser(src, dst):
- return [ "--xuser", config[src]["user_id"] + "=" + config[dst]["user_id"]]
+def xuser(sconfig, src, dst):
+ return [ "--xuser", sconfig[src]["user_id"] + "=" + sconfig[dst]["user_id"]]
def get_optional(h, k):
if (h.has_key(k)):
- print "found " + str(h[k])
return h[k]
else:
- print "found nothing"
return None
def xattr_sync_impl(file_name, meta):
if (self.consistency != None):
env["DST_CONSISTENCY"] = self.consistency
+class ObSyncTestPool(object):
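+    """An rgw pool target for the obsync tests; obsync addresses it with a
+    URL of the form rgw:<ceph_conf>:<bucket_name> (see get_url below)."""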
+ def __init__(self, bucket, ceph_conf):
+ self.bucket = bucket
+ self.ceph_conf = ceph_conf
+ def to_src(self, env, args):
+ args.append(self.get_url())
+ def to_dst(self, env, args):
+ env["DST_OWNER"] = self.bucket["user_id"]
+ args.append(self.get_url())
+ def get_url(self):
+ return "rgw:%s:%s" % (self.ceph_conf, self.bucket["bucket_name"])
+
###### Main #######
# change directory to obsync directory
os.chdir(os.path.dirname(os.path.abspath(__file__)))
opts.verbose = True
# parse configuration file
-config = read_config()
+sconfig, rconfig = read_config()
opts.buckets = []
-opts.buckets.append(ObSyncTestBucket(config["main"]["bucket_name"], \
- "s3://" + config["main"]["host"] + "/" + config["main"]["bucket_name"], \
- config["main"]["access_key"], config["main"]["secret_key"],
- get_optional(config["main"], "consistency")))
-opts.buckets.append(ObSyncTestBucket(config["alt"]["bucket_name"], \
- "s3://" + config["alt"]["host"] + "/" + config["alt"]["bucket_name"], \
- config["alt"]["access_key"], config["alt"]["secret_key"],
- get_optional(config["alt"], "consistency")))
-
-if not config["main"]["user_id"]:
+opts.buckets.append(ObSyncTestBucket(sconfig["main"]["bucket_name"], \
+ "s3://" + sconfig["main"]["host"] + "/" + sconfig["main"]["bucket_name"], \
+ sconfig["main"]["access_key"], sconfig["main"]["secret_key"],
+ get_optional(sconfig["main"], "consistency")))
+opts.buckets.append(ObSyncTestBucket(sconfig["alt"]["bucket_name"], \
+ "s3://" + sconfig["alt"]["host"] + "/" + sconfig["alt"]["bucket_name"], \
+ sconfig["alt"]["access_key"], sconfig["alt"]["secret_key"],
+ get_optional(sconfig["alt"], "consistency")))
+
+opts.pools = []
+if (rconfig.has_key("main")):
+ if (opts.verbose):
+ print "running rgw target tests..."
+ opts.pools.append(ObSyncTestPool(sconfig["main"], \
+ rconfig["main"]["ceph_conf"]))
+
+if not sconfig["main"]["user_id"]:
raise Exception("You must specify a user_id for the main section.")
-if not config["alt"]["user_id"]:
+if not sconfig["alt"]["user_id"]:
raise Exception("You must specify a user_id for the alt section.")
# set up temporary directory
synthetic_xml1 = \
"<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n\
<Owner>\n\
-<ID>" + config["main"]["user_id"] + "</ID>\n\
+<ID>" + sconfig["main"]["user_id"] + "</ID>\n\
<DisplayName></DisplayName>\n\
</Owner>\n\
<AccessControlList>\n\
<Grant>\n\
<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
xsi:type=\"CanonicalUser\">\n\
- <ID>" + config["main"]["user_id"] + "</ID>\n\
+ <ID>" + sconfig["main"]["user_id"] + "</ID>\n\
<DisplayName></DisplayName>\n\
</Grantee>\n\
<Permission>FULL_CONTROL</Permission>\n\
</Grant>\n\
</AccessControlList>\n\
</AccessControlPolicy>"
xattr.set("%s/dira/a" % tdir, ACL_XATTR, synthetic_xml1,
namespace=xattr.NS_USER)
-print "set attr on %s" % ("%s/dira/a" % tdir)
+if (opts.verbose):
+ print "set attr on %s" % ("%s/dira/a" % tdir)
# test ACL transformations
# canonicalize xml by parse + write out
obsync_check("file://%s/dira" % tdir, "file://%s/dira2" % tdir,
- ["-d", "-c"] + xuser("main", "alt"))
+ ["-d", "-c"] + xuser(sconfig, "main", "alt"))
# test that ACL is preserved
obsync_check("file://%s/dira2" % tdir, "file://%s/dira3" % tdir,
["-d", "-c"])
raise Exception("xml not preserved across obsync!")
# test ACL transformation
obsync_check("file://%s/dira3" % tdir, "file://%s/dira4" % tdir,
- ["-d", "-c"] + xuser("main", "alt"))
+ ["-d", "-c"] + xuser(sconfig, "main", "alt"))
synthetic_xml4 = xattr.get("%s/dira4/a" % tdir, ACL_XATTR,
namespace=xattr.NS_USER)
if (synthetic_xml3 != synthetic_xml4):
raise Exception("xml not preserved across obsync!")
# test ACL transformation back
obsync_check("file://%s/dira5" % tdir, "file://%s/dira6" % tdir,
- ["-d", "-c"] + xuser("alt", "main"))
+ ["-d", "-c"] + xuser(sconfig, "alt", "main"))
if (synthetic_xml5 != synthetic_xml2):
raise Exception("expected to transform XML back to original form \
through a double xuser")
if (opts.verbose):
print "successfully copied a directory with --follow-symlinks"
+# rgw target tests
+if len(opts.pools) > 0:
+    if (opts.verbose):
+        print "testing rgw target"
+    os.mkdir("%s/rgw1" % tdir)
+    f = open("%s/rgw1/aaa" % tdir, 'w')
+    f.write("aaa")
+    f.close()
+    obsync_check("%s/rgw1" % tdir, opts.pools[0], [])
+    if (opts.verbose):
+        print "testing rgw source"
+    obsync_check(opts.pools[0], "%s/rgw1" % tdir, ["-c"])
+# print "testing rgw target with --create"
+# obsync_check("%s/rgw1" % tdir, opts.pools[0], ["--create"])
+
# test escaping
os.mkdir("%s/escape_dir1" % tdir)
f = open("%s/escape_dir1/$$foo" % tdir, 'w')
if (opts.verbose):
print "copying " + opts.buckets[0].name + " to " + opts.buckets[1].name
obsync_check(opts.buckets[0], opts.buckets[1], ["-c", "--delete-after"] + \
- xuser("main", "alt"))
+ xuser(sconfig, "main", "alt"))
if (opts.verbose):
print "copying bucket1 to dir4..."
obsync_check(opts.buckets[1], "file://%s/dir4" % tdir, ["-c"])
if (opts.verbose):
print "copying bucket0 to bucket1..."
obsync_check(opts.buckets[0], opts.buckets[1], ["-c", "--delete-before"] + \
- xuser("main", "alt"))
+ xuser(sconfig, "main", "alt"))
obsync_check(opts.buckets[0], "%s/bucket0_out" % tdir, ["--delete-after"])
obsync_check(opts.buckets[1], "%s/bucket1_out" % tdir, ["--delete-after"])
bucket0_count = count_obj_in_dir("/%s/bucket0_out" % tdir)