From 2aaf2c60cea5d4c8aa92915fde770af0f28879cc Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 16 Mar 2011 16:33:54 -0700 Subject: [PATCH] osync: first version Signed-off-by: Colin McCabe --- src/osync/osync.py | 333 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100755 src/osync/osync.py diff --git a/src/osync/osync.py b/src/osync/osync.py new file mode 100755 index 0000000000000..29710473b459f --- /dev/null +++ b/src/osync/osync.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python + +# +# Ceph - scalable distributed file system +# +# Copyright (C) 2011 New Dream Network +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation. See file COPYING. +# + +""" +osync.py: the object synchronizer +""" + +from boto.s3.connection import OrdinaryCallingFormat +from boto.s3.connection import S3Connection +from boto.s3.key import Key +from optparse import OptionParser +from sys import stderr +import boto +import errno +import hashlib +import mimetypes +import os +import shutil +import sys +import traceback + +global opts + +###### Helper functions ####### +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno != errno.EEXIST: + raise + if (not os.path.isdir(path)): + raise + +def bytes_to_str(b): + return ''.join(["%02x"% ord(x) for x in b]).strip() + +def get_md5(f, block_size=2**20): + md5 = hashlib.md5() + while True: + data = f.read(block_size) + if not data: + break + md5.update(data) + return md5.digest() + +def strip_prefix(prefix, s): + if not (s[0:len(prefix)] == prefix): + return None + return s[len(prefix):] + +###### NonexistentStore ####### +class NonexistentStore(Exception): + pass + +###### Object ####### +class Object(object): + def __init__(self, name, md5, size): + self.name = name + self.md5 = md5 + self.size = size + def equals(self, rhs): + if (self.name != rhs.name): + return False + if (self.md5 != rhs.md5): + return False + if (self.size != rhs.size): + return False + return True + @staticmethod + def from_file(obj_name, path): + f = open(path) + try: + md5 = get_md5(f) + finally: + f.close() + size = os.path.getsize(path) + #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size) + return Object(obj_name, md5, size) + +###### Store ####### +class Store(object): + @staticmethod + def make_store(url, create): + s3_url = strip_prefix("s3://", url) + if (s3_url): + return S3Store(url, create) + file_url = strip_prefix("file://", url) + if (file_url): + return FileStore(file_url, create) + raise Exception("Failed to find s3:// or file:// prefix.") + def __init__(self, url): + self.url = url + +###### S3 store ####### +class S3StoreLocalCopy(object): + def __init__(self, path): + self.path = path + def __del__(self): + self.remove() + def remove(self): + if (self.path): + os.unlink(self.path) + self.path = None + +class S3StoreIterator(object): + """S3Store iterator""" + def __init__(self, blrs): + self.blrs = blrs + def __iter__(self): + return self + def next(self): + # This will raise StopIteration when there are no more objects to + # iterate on + key = blrs.next() + ret = Object(key.name, key.md5, key.size) + return ret + +class S3Store(Store): + def __init__(self, url, create): + # Parse the s3 url + host_end = string.find(url, "/") + if (host_end == -1): + raise Exception("S3Store URLs are of the form \ +s3://host/bucket/key_prefix. Failed to find the host.") + self.host = url[0:host_end] + bucket_end = string.find(url, host_end+1, "/") + if (host_end == -1): + self.bucket_name = url[host_end+1:] + self.key_prefix = "" + else: + self.bucket_name = url[host_end+1:bucket_end] + self.key_prefix = url[bucket_end:] + self.host = url[0:host_end] + self.conn = S3Connection(calling_format=OrdinaryCallingFormat(), + host="rgw-1.ceph.dreamhost.com", is_secure=False) + self.bucket = conn.get_bucket(self.bucket_name) + Store.__init__(self, "s3://" + url) + def __str__(self): + return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix + def make_local_copy(self, obj): + k = Key(self.bucket) + k.key = obj.name + temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False) + try: + k.get_contents_to_filename(temp_file.name) + except: + os.unlink(temp_file.name) + raise + return S3StoreLocalCopy(temp_file.name) + def all_objects(self): + blrs = self.bucket.list(prefix = self.key_prefix) + return S3StoreIterator(blrs) + def locate_object(self, name): + k = self.bucket.get_key(name) + if (k == None): + return None + return Object(name, k.md5, k.size) + def upload(self, local_copy, obj): + mime = mimetypes.guess_type(local_copy) + if (mime == None): + mime = "application/octet-stream" + k = Key(self.bucket) + k.key = Key(obj.name) + k.set_metadata("Content-Type", mime) + k.set_contents_from_filename(local_copy.path) + +###### FileStore ####### +class FileStoreIterator(object): + """FileStore iterator""" + def __init__(self, base): + self.base = base + self.generator = os.walk(base) + self.path = "" + self.files = [] + def __iter__(self): + return self + def next(self): + while True: + if (len(self.files) == 0): + self.path, dirs, self.files = self.generator.next() + continue + path = self.path + "/" + self.files[0] + self.files = self.files[1:] + obj_name = path[len(self.base)+1:] + if (not os.path.isfile(path)): + continue + return Object.from_file(obj_name, path) + +class FileStoreLocalCopy(object): + def __init__(self, path): + self.path = path + def remove(self): + self.path = None + +class FileStore(Store): + def __init__(self, url, create): + # Parse the file url + self.base = url + if (self.base[-1:] == '/'): + self.base = self.base[:-1] + if (create): + mkdir_p(self.base) + elif (not os.path.isdir(self.base)): + raise NonexistentStore() + Store.__init__(self, "file://" + url) + def __str__(self): + return "file://" + self.base + def make_local_copy(self, obj): + return FileStoreLocalCopy(self.base + "/" + obj.name) + def all_objects(self): + return FileStoreIterator(self.base) + def locate_object(self, obj): + path = self.base + "/" + obj.name + found = os.path.isfile(path) + if (opts.more_verbose): + if (found): + print "FileStore::locate_object: found object '" + \ + obj.name + "'" + else: + print "FileStore::locate_object: did not find object '" + \ + obj.name + "'" + if (not found): + return None + return Object.from_file(obj.name, path) + def upload(self, local_copy, obj): + s = local_copy.path + d = self.base + "/" + obj.name + #print "s='" + s +"', d='" + d + "'" + mkdir_p(os.path.dirname(d)) + shutil.copy(s, d) + +###### Functions ####### +USAGE = """ +osync synchronizes objects. The source and destination can both be local or +both remote. + +Examples: +# copy contents of mybucket to disk +osync -v s3://myhost/mybucket file://mydir + +# copy contents of mydir to an S3 bucket +osync -v file://mydir s3://myhost/mybucket + +# synchronize two S3 buckets +osync -v s3://myhost/mybucket1 s3://myhost/mybucket2 + +osync (options) [source] [destination]""" + +parser = OptionParser(USAGE) +#parser.add_option("-c", "--config-file", dest="config_file", \ + #metavar="FILE", help="set config file") +#parser.add_option("-n", "--dry-run", action="store_true", \ +# dest="dry_run") +parser.add_option("-c", "--create-dest", action="store_true", \ + dest="create", help="create the destination if it doesn't already exist") +parser.add_option("-v", "--verbose", action="store_true", \ + dest="verbose", help="be verbose") +parser.add_option("-V", "--more-verbose", action="store_true", \ + dest="more_verbose", help="be really, really verbose (developer mode)") +(opts, args) = parser.parse_args() +if (len(args) < 2): + print >>stderr, "Expected two positional arguments: source and destination" + print >>stderr, USAGE + sys.exit(1) +elif (len(args) > 2): + print >>stderr, "Too many positional arguments." + print >>stderr, USAGE + sys.exit(1) +if (opts.more_verbose): + opts.verbose = True + boto.set_stream_logger("stdout") + boto.log.info("Enabling verbose boto logging.") +src_name = args[0] +dst_name = args[1] +try: + if (opts.more_verbose): + print "SOURCE: " + src_name + src = Store.make_store(src_name, False) +except NonexistentStore as e: + print >>stderr, "Fatal error: Source " + src_name + " does not exist." + sys.exit(1) +except Exception as e: + print >>stderr, "error creating source: " + str(e) + sys.exit(1) +try: + if (opts.more_verbose): + print "DESTINATION: " + dst_name + dst = Store.make_store(dst_name, opts.create) +except NonexistentStore as e: + print >>stderr, "Fatal error: Destination " + dst_name + " does " +\ + "not exist. Run with -c or --create-dest to create it automatically." + sys.exit(1) +except Exception as e: + print >>stderr, "error creating destination: " + str(e) + sys.exit(1) + +for sobj in src.all_objects(): + if (opts.more_verbose): + print "handling " + sobj.name + dobj = dst.locate_object(sobj) + upload = False + if (dobj == None): + if (opts.verbose): + print "+ " + sobj.name + upload = True + elif not sobj.equals(dobj): + if (opts.verbose): + print "> " + sobj.name + upload = True + else: + if (opts.verbose): + print ". " + sobj.name + if (upload): + try: + local_copy = src.make_local_copy(sobj) + dst.upload(local_copy, sobj) + finally: + local_copy.remove() + +if (opts.more_verbose): + print "finished." + +sys.exit(0) -- 2.39.5