]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osync: first version
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Wed, 16 Mar 2011 23:33:54 +0000 (16:33 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Thu, 17 Mar 2011 19:50:46 +0000 (12:50 -0700)
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/osync/osync.py [new file with mode: 0755]

diff --git a/src/osync/osync.py b/src/osync/osync.py
new file mode 100755 (executable)
index 0000000..2971047
--- /dev/null
@@ -0,0 +1,333 @@
+#!/usr/bin/env python
+
+#
+# Ceph - scalable distributed file system
+#
+# Copyright (C) 2011 New Dream Network
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation.  See file COPYING.
+#
+
+"""
+osync.py: the object synchronizer
+"""
+
from boto.s3.connection import OrdinaryCallingFormat
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from optparse import OptionParser
from sys import stderr
import boto
import errno
import hashlib
import mimetypes
import os
import shutil
import sys
import tempfile
import traceback
+
+global opts
+
+###### Helper functions #######
+def mkdir_p(path):
+    try:
+        os.makedirs(path)
+    except OSError as exc:
+        if exc.errno != errno.EEXIST:
+            raise
+        if (not os.path.isdir(path)):
+            raise
+
+def bytes_to_str(b):
+    return ''.join(["%02x"% ord(x) for x in b]).strip()
+
+def get_md5(f, block_size=2**20):
+    md5 = hashlib.md5()
+    while True:
+        data = f.read(block_size)
+        if not data:
+            break
+        md5.update(data)
+    return md5.digest()
+
+def strip_prefix(prefix, s):
+    if not (s[0:len(prefix)] == prefix):
+        return None
+    return s[len(prefix):]
+
+###### NonexistentStore #######
+class NonexistentStore(Exception):
+    pass
+
+###### Object #######
+class Object(object):
+    def __init__(self, name, md5, size):
+        self.name = name
+        self.md5 = md5
+        self.size = size
+    def equals(self, rhs):
+        if (self.name != rhs.name):
+            return False
+        if (self.md5 != rhs.md5):
+            return False
+        if (self.size != rhs.size):
+            return False
+        return True
+    @staticmethod
+    def from_file(obj_name, path):
+        f = open(path)
+        try:
+            md5 = get_md5(f)
+        finally:
+            f.close()
+        size = os.path.getsize(path)
+        #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size)
+        return Object(obj_name, md5, size)
+
+###### Store #######
+class Store(object):
+    @staticmethod
+    def make_store(url, create):
+        s3_url = strip_prefix("s3://", url)
+        if (s3_url):
+            return S3Store(url, create)
+        file_url = strip_prefix("file://", url)
+        if (file_url):
+            return FileStore(file_url, create)
+        raise Exception("Failed to find s3:// or file:// prefix.")
+    def __init__(self, url):
+        self.url = url
+
+###### S3 store #######
+class S3StoreLocalCopy(object):
+    def __init__(self, path):
+        self.path = path
+    def __del__(self):
+        self.remove()
+    def remove(self):
+        if (self.path):
+            os.unlink(self.path)
+            self.path = None
+
+class S3StoreIterator(object):
+    """S3Store iterator"""
+    def __init__(self, blrs):
+        self.blrs = blrs
+    def __iter__(self):
+        return self
+    def next(self):
+        # This will raise StopIteration when there are no more objects to
+        # iterate on
+        key = blrs.next()
+        ret = Object(key.name, key.md5, key.size)
+        return ret
+
+class S3Store(Store):
+    def __init__(self, url, create):
+        # Parse the s3 url
+        host_end = string.find(url, "/")
+        if (host_end == -1):
+            raise Exception("S3Store URLs are of the form \
+s3://host/bucket/key_prefix. Failed to find the host.")
+        self.host = url[0:host_end]
+        bucket_end = string.find(url, host_end+1, "/")
+        if (host_end == -1):
+            self.bucket_name = url[host_end+1:]
+            self.key_prefix = ""
+        else:
+            self.bucket_name = url[host_end+1:bucket_end]
+            self.key_prefix = url[bucket_end:]
+        self.host = url[0:host_end]
+        self.conn = S3Connection(calling_format=OrdinaryCallingFormat(),
+            host="rgw-1.ceph.dreamhost.com", is_secure=False)
+        self.bucket = conn.get_bucket(self.bucket_name)
+        Store.__init__(self, "s3://" + url)
+    def __str__(self):
+        return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix
+    def make_local_copy(self, obj):
+        k = Key(self.bucket)
+        k.key = obj.name
+        temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
+        try:
+            k.get_contents_to_filename(temp_file.name)
+        except:
+            os.unlink(temp_file.name)
+            raise
+        return S3StoreLocalCopy(temp_file.name)
+    def all_objects(self):
+        blrs = self.bucket.list(prefix = self.key_prefix)
+        return S3StoreIterator(blrs)
+    def locate_object(self, name):
+        k = self.bucket.get_key(name)
+        if (k == None):
+            return None
+        return Object(name, k.md5, k.size)
+    def upload(self, local_copy, obj):
+        mime = mimetypes.guess_type(local_copy)
+        if (mime == None):
+            mime = "application/octet-stream"
+        k = Key(self.bucket)
+        k.key = Key(obj.name)
+        k.set_metadata("Content-Type", mime)
+        k.set_contents_from_filename(local_copy.path)
+
+###### FileStore #######
+class FileStoreIterator(object):
+    """FileStore iterator"""
+    def __init__(self, base):
+        self.base = base
+        self.generator = os.walk(base)
+        self.path = ""
+        self.files = []
+    def __iter__(self):
+        return self
+    def next(self):
+        while True:
+            if (len(self.files) == 0):
+                self.path, dirs, self.files = self.generator.next()
+                continue
+            path = self.path + "/" + self.files[0]
+            self.files = self.files[1:]
+            obj_name = path[len(self.base)+1:]
+            if (not os.path.isfile(path)):
+                continue
+            return Object.from_file(obj_name, path)
+
+class FileStoreLocalCopy(object):
+    def __init__(self, path):
+        self.path = path
+    def remove(self):
+        self.path = None
+
+class FileStore(Store):
+    def __init__(self, url, create):
+        # Parse the file url
+        self.base = url
+        if (self.base[-1:] == '/'):
+            self.base = self.base[:-1]
+        if (create):
+            mkdir_p(self.base)
+        elif (not os.path.isdir(self.base)):
+            raise NonexistentStore()
+        Store.__init__(self, "file://" + url)
+    def __str__(self):
+        return "file://" + self.base
+    def make_local_copy(self, obj):
+        return FileStoreLocalCopy(self.base + "/" + obj.name)
+    def all_objects(self):
+        return FileStoreIterator(self.base)
+    def locate_object(self, obj):
+        path = self.base + "/" + obj.name
+        found = os.path.isfile(path)
+        if (opts.more_verbose):
+            if (found):
+                print "FileStore::locate_object: found object '" + \
+                    obj.name + "'"
+            else:
+                print "FileStore::locate_object: did not find object '" + \
+                    obj.name + "'"
+        if (not found):
+            return None
+        return Object.from_file(obj.name, path)
+    def upload(self, local_copy, obj):
+        s = local_copy.path
+        d = self.base + "/" + obj.name
+        #print "s='" + s +"', d='" + d + "'"
+        mkdir_p(os.path.dirname(d))
+        shutil.copy(s, d)
+
+###### Functions #######
+USAGE = """
+osync synchronizes objects. The source and destination can both be local or
+both remote.
+
+Examples:
+# copy contents of mybucket to disk
+osync -v s3://myhost/mybucket file://mydir
+
+# copy contents of mydir to an S3 bucket
+osync -v file://mydir s3://myhost/mybucket
+
+# synchronize two S3 buckets
+osync -v s3://myhost/mybucket1 s3://myhost/mybucket2
+
+osync (options) [source] [destination]"""
+
+parser = OptionParser(USAGE)
+#parser.add_option("-c", "--config-file", dest="config_file", \
+    #metavar="FILE", help="set config file")
+#parser.add_option("-n", "--dry-run", action="store_true", \
+#    dest="dry_run")
+parser.add_option("-c", "--create-dest", action="store_true", \
+    dest="create", help="create the destination if it doesn't already exist")
+parser.add_option("-v", "--verbose", action="store_true", \
+    dest="verbose", help="be verbose")
+parser.add_option("-V", "--more-verbose", action="store_true", \
+    dest="more_verbose", help="be really, really verbose (developer mode)")
+(opts, args) = parser.parse_args()
+if (len(args) < 2):
+    print >>stderr, "Expected two positional arguments: source and destination"
+    print >>stderr, USAGE
+    sys.exit(1)
+elif (len(args) > 2):
+    print >>stderr, "Too many positional arguments."
+    print >>stderr, USAGE
+    sys.exit(1)
+if (opts.more_verbose):
+    opts.verbose = True
+    boto.set_stream_logger("stdout")
+    boto.log.info("Enabling verbose boto logging.")
+src_name = args[0]
+dst_name = args[1]
# Open the source store. The source is never auto-created (-c applies
# only to the destination), so a missing source is always fatal.
try:
    if (opts.more_verbose):
        print "SOURCE: " + src_name
    src = Store.make_store(src_name, False)
except NonexistentStore as e:
    print >>stderr, "Fatal error: Source " + src_name + " does not exist."
    sys.exit(1)
except Exception as e:
    print >>stderr, "error creating source: " + str(e)
    sys.exit(1)
# Open the destination store, creating it first if -c/--create-dest was given.
try:
    if (opts.more_verbose):
        print "DESTINATION: " + dst_name
    dst = Store.make_store(dst_name, opts.create)
except NonexistentStore as e:
    print >>stderr, "Fatal error: Destination " + dst_name + " does " +\
        "not exist. Run with -c or --create-dest to create it automatically."
    sys.exit(1)
except Exception as e:
    print >>stderr, "error creating destination: " + str(e)
    sys.exit(1)
+
+for sobj in src.all_objects():
+    if (opts.more_verbose):
+        print "handling " + sobj.name
+    dobj = dst.locate_object(sobj)
+    upload = False
+    if (dobj == None):
+        if (opts.verbose):
+            print "+ " + sobj.name
+        upload = True
+    elif not sobj.equals(dobj):
+        if (opts.verbose):
+            print "> " + sobj.name
+        upload = True
+    else:
+        if (opts.verbose):
+            print ". " + sobj.name
+    if (upload):
+        try:
+            local_copy = src.make_local_copy(sobj)
+            dst.upload(local_copy, sobj)
+        finally:
+            local_copy.remove()
+
+if (opts.more_verbose):
+    print "finished."
+
+sys.exit(0)