git-server-git.apps.pok.os.sepia.ceph.com Git - radosgw-agent.git/commitdiff
packaging: add dependency on python-requests
author: Josh Durgin <josh.durgin@inktank.com>
Wed, 19 Mar 2014 09:27:32 +0000 (02:27 -0700)
committer: Josh Durgin <josh.durgin@inktank.com>
Fri, 21 Mar 2014 23:24:10 +0000 (16:24 -0700)
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
24 files changed:
.gitignore [new file with mode: 0644]
LICENSE [new file with mode: 0644]
README.rst [new file with mode: 0644]
bootstrap [new file with mode: 0755]
debian/changelog [new file with mode: 0644]
debian/compat [new file with mode: 0644]
debian/control [new file with mode: 0644]
debian/copyright [new file with mode: 0644]
debian/rules [new file with mode: 0755]
debian/source/format [new file with mode: 0644]
radosgw-agent.spec [new file with mode: 0644]
radosgw_agent/__init__.py [new file with mode: 0644]
radosgw_agent/cli.py [new file with mode: 0644]
radosgw_agent/client.py [new file with mode: 0644]
radosgw_agent/lock.py [new file with mode: 0644]
radosgw_agent/sync.py [new file with mode: 0644]
radosgw_agent/tests/__init__.py [new file with mode: 0644]
radosgw_agent/tests/test_client.py [new file with mode: 0644]
radosgw_agent/worker.py [new file with mode: 0644]
requirements-dev.txt [new file with mode: 0644]
requirements.txt [new file with mode: 0644]
setup.cfg [new file with mode: 0644]
setup.py [new file with mode: 0644]
tox.ini [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..d2d6f36
--- /dev/null
@@ -0,0 +1,35 @@
+*.py[cod]
+
+# C extensions
+*.so
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+.project
+.pydevproject
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..6062a74
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Inktank Storage, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/README.rst b/README.rst
new file mode 100644 (file)
index 0000000..656bd71
--- /dev/null
@@ -0,0 +1,3 @@
+====================================================================
+radosgw-agent -- synchronize data and users between radosgw clusters
+====================================================================
diff --git a/bootstrap b/bootstrap
new file mode 100755 (executable)
index 0000000..6dd1a2d
--- /dev/null
+++ b/bootstrap
@@ -0,0 +1,42 @@
+#!/bin/sh
+set -e
+
+if command -v lsb_release >/dev/null 2>&1; then
+    case "$(lsb_release --id --short)" in
+       Ubuntu|Debian)
+           for package in python-virtualenv; do
+               if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+                   # add a space after old values
+                   missing="${missing:+$missing }$package"
+               fi
+           done
+           if [ -n "$missing" ]; then
+                       echo "$0: missing required packages, please install them:" 1>&2
+                       echo "  sudo apt-get install $missing"
+                       exit 1
+           fi
+           ;;
+    esac
+else
+       if [ -f /etc/redhat-release ]; then
+               case "$(cat /etc/redhat-release | awk '{print $1}')" in
+                       CentOS)
+                               for package in python-virtualenv; do
+                               if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
+                                       missing="${missing:+$missing }$package"
+                               fi
+                               done
+                               if [ -n "$missing" ]; then
+                                       echo "$0: missing required packages, please install them:" 1>&2
+                                       echo "  sudo yum install $missing"
+                               exit 1
+                               fi
+                               ;;
+               esac
+       fi
+fi
+
+test -d virtualenv || virtualenv virtualenv
+./virtualenv/bin/python setup.py develop
+./virtualenv/bin/pip install -r requirements.txt -r requirements-dev.txt
+test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent .
diff --git a/debian/changelog b/debian/changelog
new file mode 100644 (file)
index 0000000..f0099e9
--- /dev/null
@@ -0,0 +1,11 @@
+radosgw-agent (1.1-1) precise; urgency=low
+
+  * new upstream release 
+
+ -- Gary Lowell <glowell@pudgy.ops.newdream.net>  Thu, 21 Nov 2013 16:17:25 -0800
+
+radosgw-agent (1.0-1) stable; urgency=low
+
+  * Initial release 
+
+ -- Gary Lowell <gary.lowell@inktank.com>  Mon, 26 Aug 2013 09:19:47 -0700
diff --git a/debian/compat b/debian/compat
new file mode 100644 (file)
index 0000000..45a4fb7
--- /dev/null
@@ -0,0 +1 @@
+8
diff --git a/debian/control b/debian/control
new file mode 100644 (file)
index 0000000..9810f72
--- /dev/null
@@ -0,0 +1,19 @@
+Source: radosgw-agent
+Maintainer: Sage Weil <sage@newdream.net>
+Uploaders: Sage Weil <sage@newdream.net>
+Section: admin
+Priority: optional
+Build-Depends: debhelper (>= 8), python-setuptools
+X-Python-Version: >= 2.4
+Standards-Version: 3.9.2
+Homepage: http://ceph.com/
+
+Package: radosgw-agent
+Architecture: all
+Depends: python,
+         python-argparse,
+         python-setuptools,
+         python-requests,
+         ${misc:Depends},
+         ${python:Depends}
+Description:  Rados gateway agents.
diff --git a/debian/copyright b/debian/copyright
new file mode 100644 (file)
index 0000000..730861e
--- /dev/null
@@ -0,0 +1,3 @@
+Files: *
+Copyright: (c) 2013 by Inktank Storage
+License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
diff --git a/debian/rules b/debian/rules
new file mode 100755 (executable)
index 0000000..45200da
--- /dev/null
@@ -0,0 +1,8 @@
+#!/usr/bin/make -f
+
+# Uncomment this to turn on verbose mode.
+export DH_VERBOSE=1
+
+%:
+       dh $@ --buildsystem python_distutils --with python2
+
diff --git a/debian/source/format b/debian/source/format
new file mode 100644 (file)
index 0000000..d3827e7
--- /dev/null
@@ -0,0 +1 @@
+1.0
diff --git a/radosgw-agent.spec b/radosgw-agent.spec
new file mode 100644 (file)
index 0000000..15e9b8a
--- /dev/null
@@ -0,0 +1,41 @@
+%define name radosgw-agent
+%define version 1.1
+%define unmangled_version 1.1
+%define unmangled_version 1.1
+%define release 1
+
+Summary: Synchronize users and data between radosgw clusters
+Name: %{name}
+Version: %{version}
+Release: %{release}
+Source0: %{name}-%{unmangled_version}.tar.gz
+License: MIT
+Group: Development/Libraries
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
+Prefix: %{_prefix}
+BuildArch: noarch
+Vendor: Josh Durgin <josh.durgin@inktank.com>
+Requires: python-argparse
+Requires: PyYAML
+Requires: python-boto >= 2.2.2
+Requires: python-boto < 3.0.0
+Requires: python-requests
+Url: https://github.com/ceph/radosgw-agent
+
+%description
+UNKNOWN
+
+%prep
+%setup -n %{name}-%{unmangled_version} -n %{name}-%{unmangled_version}
+
+%build
+python setup.py build
+
+%install
+python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT --record=INSTALLED_FILES
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%files -f INSTALLED_FILES
+%defattr(-,root,root)
diff --git a/radosgw_agent/__init__.py b/radosgw_agent/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/radosgw_agent/cli.py b/radosgw_agent/cli.py
new file mode 100644 (file)
index 0000000..48b19e7
--- /dev/null
@@ -0,0 +1,325 @@
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import argparse
+import contextlib
+import logging
+import logging.handlers
+import yaml
+import sys
+
+from radosgw_agent import client
+from radosgw_agent import sync
+
+def check_positive_int(string):
+    value = int(string)
+    if value < 1:
+        msg = '%r is not a positive integer' % string
+        raise argparse.ArgumentTypeError(msg)
+    return value
+
+def check_endpoint(endpoint):
+    try:
+        return client.parse_endpoint(endpoint)
+    except client.InvalidProtocol as e:
+        raise argparse.ArgumentTypeError(str(e))
+    except client.InvalidHost as e:
+        raise argparse.ArgumentTypeError(str(e))
+
+def parse_args():
+    conf_parser = argparse.ArgumentParser(add_help=False)
+    conf_parser.add_argument(
+        '-c', '--conf',
+        type=file,
+        help='configuration file'
+        )
+    args, remaining = conf_parser.parse_known_args()
+    defaults = dict(
+        sync_scope='incremental',
+        log_lock_time=20,
+        )
+    if args.conf is not None:
+        with contextlib.closing(args.conf):
+            config = yaml.safe_load_all(args.conf)
+            for new in config:
+                defaults.update(new)
+
+    parser = argparse.ArgumentParser(
+        parents=[conf_parser],
+        description='Synchronize radosgw installations',
+        )
+    parser.set_defaults(**defaults)
+    verbosity = parser.add_mutually_exclusive_group(required=False)
+    verbosity.add_argument(
+        '-v', '--verbose',
+        action='store_true', dest='verbose',
+        help='be more verbose',
+        )
+    verbosity.add_argument(
+        '-q', '--quiet',
+        action='store_true', dest='quiet',
+        help='be less verbose',
+        )
+    parser.add_argument(
+        '--src-access-key',
+        required='src_access_key' not in defaults,
+        help='access key for source zone system user',
+        )
+    parser.add_argument(
+        '--src-secret-key',
+        required='src_secret_key' not in defaults,
+        help='secret key for source zone system user',
+        )
+    parser.add_argument(
+        '--dest-access-key',
+        required='dest_access_key' not in defaults,
+        help='access key for destination zone system user',
+        )
+    parser.add_argument(
+        '--dest-secret-key',
+        required='dest_secret_key' not in defaults,
+        help='secret key for destination zone system user',
+        )
+    parser.add_argument(
+        'destination',
+        type=check_endpoint,
+        nargs=None if 'destination' not in defaults else '?',
+        help='radosgw endpoint to which to sync '
+        '(e.g. http://zone2.example.org:8080)',
+        )
+    src_options = parser.add_mutually_exclusive_group(required=False)
+    src_options.add_argument(
+        '--source',
+        type=check_endpoint,
+        help='radosgw endpoint from which to sync '
+        '(e.g. http://zone1.example.org:8080)',
+        )
+    src_options.add_argument(
+        '--src-zone',
+        help='radosgw zone from which to sync',
+        )
+    parser.add_argument(
+        '--metadata-only',
+        action='store_true',
+        help='sync bucket and user metadata, but not bucket contents',
+        )
+    parser.add_argument(
+        '--num-workers',
+        default=1,
+        type=check_positive_int,
+        help='number of items to sync at once',
+        )
+    parser.add_argument(
+        '--sync-scope',
+        choices=['full', 'incremental'],
+        default='incremental',
+        help='synchronize everything (for a new region) or only things that '
+             'have changed since the last run',
+        )
+    parser.add_argument(
+        '--lock-timeout',
+        type=check_positive_int,
+        default=60,
+        help='timeout in seconds after which a log segment lock will expire if '
+             'not refreshed',
+        )
+    parser.add_argument(
+        '--log-file',
+        help='where to store log output',
+        )
+    parser.add_argument(
+        '--max-entries',
+        type=check_positive_int,
+        default=1000,
+        help='maximum number of log entries to process at once during '
+        'continuous sync',
+        )
+    parser.add_argument(
+        '--incremental-sync-delay',
+        type=check_positive_int,
+        default=30,
+        help='seconds to wait between syncs',
+        )
+    parser.add_argument(
+        '--object-sync-timeout',
+        type=check_positive_int,
+        default=60 * 60 * 60,
+        help='seconds to wait for an individual object to sync before '
+        'assuming failure',
+        )
+    parser.add_argument(
+        '--prepare-error-delay',
+        type=check_positive_int,
+        default=10,
+        help='seconds to wait before retrying when preparing '
+        'an incremental sync fails',
+        )
+    parser.add_argument(
+        '--rgw-data-log-window',
+        type=check_positive_int,
+        default=30,
+        help='period until a data log entry is valid - '
+        'must match radosgw configuration',
+        )
+    parser.add_argument(
+        '--test-server-host',
+        # host to run a simple http server for testing the sync agent on,
+        help=argparse.SUPPRESS,
+        )
+    parser.add_argument(
+        '--test-server-port',
+        # port to run a simple http server for testing the sync agent on,
+        type=check_positive_int,
+        default=8080,
+        help=argparse.SUPPRESS,
+        )
+    return parser.parse_args(remaining)
+
+class TestHandler(BaseHTTPRequestHandler):
+    """HTTP handler for testing radosgw-agent.
+
+    This should never be used outside of testing.
+    """
+    num_workers = None
+    lock_timeout = None
+    max_entries = None
+    rgw_data_log_window = 30
+    src = None
+    dest = None
+
+    def do_POST(self):
+        log = logging.getLogger(__name__)
+        status = 200
+        resp = ''
+        sync_cls = None
+        if self.path.startswith('/metadata/full'):
+            sync_cls = sync.MetaSyncerFull
+        elif self.path.startswith('/metadata/incremental'):
+            sync_cls = sync.MetaSyncerInc
+        elif self.path.startswith('/data/full'):
+            sync_cls = sync.DataSyncerFull
+        elif self.path.startswith('/data/incremental'):
+            sync_cls = sync.DataSyncerInc
+        else:
+            log.warn('invalid request, ignoring')
+            status = 400
+            resp = 'bad path'
+
+        try:
+            if sync_cls is not None:
+                syncer = sync_cls(TestHandler.src, TestHandler.dest,
+                                  TestHandler.max_entries,
+                                  rgw_data_log_window=TestHandler.rgw_data_log_window,
+                                  object_sync_timeout=TestHandler.object_sync_timeout)
+                syncer.prepare()
+                syncer.sync(
+                    TestHandler.num_workers,
+                    TestHandler.lock_timeout,
+                    )
+        except Exception as e:
+            log.exception('error during sync')
+            status = 500
+            resp = str(e)
+
+        self.log_request(status, len(resp))
+        if status >= 400:
+            self.send_error(status, resp)
+        else:
+            self.send_response(status)
+            self.end_headers()
+
+def main():
+    args = parse_args()
+    log = logging.getLogger()
+    log_level = logging.INFO
+    lib_log_level = logging.WARN
+    if args.verbose:
+        log_level = logging.DEBUG
+        lib_log_level = logging.DEBUG
+    elif args.quiet:
+        log_level = logging.WARN
+    logging.basicConfig(level=log_level)
+    logging.getLogger('boto').setLevel(lib_log_level)
+    logging.getLogger('requests').setLevel(lib_log_level)
+
+    if args.log_file is not None:
+        handler = logging.handlers.WatchedFileHandler(
+            filename=args.log_file,
+            )
+        formatter = logging.Formatter(
+            fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s',
+            datefmt='%Y-%m-%dT%H:%M:%S',
+            )
+        handler.setFormatter(formatter)
+        logging.getLogger().addHandler(handler)
+
+    dest = args.destination
+    dest.access_key = args.dest_access_key
+    dest.secret_key = args.dest_secret_key
+    src = args.source or client.Endpoint(None, None, None)
+    if args.src_zone:
+        src.zone = args.src_zone
+    dest_conn = client.connection(dest)
+
+    try:
+        region_map = client.get_region_map(dest_conn)
+    except Exception:
+        log.exception('Could not retrieve region map from destination')
+        sys.exit(1)
+
+    try:
+        client.configure_endpoints(region_map, dest, src, args.metadata_only)
+    except client.ClientException as e:
+        log.error(e)
+        sys.exit(1)
+
+    src.access_key = args.src_access_key
+    src.secret_key = args.src_secret_key
+
+    if args.test_server_host:
+        log.warn('TEST MODE - do not run unless you are testing this program')
+        TestHandler.src = src
+        TestHandler.dest = dest
+        TestHandler.num_workers = args.num_workers
+        TestHandler.lock_timeout = args.lock_timeout
+        TestHandler.max_entries = args.max_entries
+        TestHandler.rgw_data_log_window = args.rgw_data_log_window
+        TestHandler.object_sync_timeout = args.object_sync_timeout
+        server = HTTPServer((args.test_server_host, args.test_server_port),
+                            TestHandler)
+        server.serve_forever()
+        sys.exit()
+
+    if args.sync_scope == 'full':
+        meta_cls = sync.MetaSyncerFull
+        data_cls = sync.DataSyncerFull
+    else:
+        meta_cls = sync.MetaSyncerInc
+        data_cls = sync.DataSyncerInc
+
+    meta_syncer = meta_cls(src, dest, args.max_entries)
+    data_syncer = data_cls(src, dest, args.max_entries,
+                           rgw_data_log_window=args.rgw_data_log_window,
+                           object_sync_timeout=args.object_sync_timeout)
+
+    # fetch logs first since data logs need to wait before becoming usable
+    # due to rgw's window of data log updates during which the bucket index
+    # log may still be updated without the data log getting a new entry for
+    # the bucket
+    sync.prepare_sync(meta_syncer, args.prepare_error_delay)
+    if not args.metadata_only:
+        sync.prepare_sync(data_syncer, args.prepare_error_delay)
+
+    if args.sync_scope == 'full':
+        log.info('syncing all metadata')
+        meta_syncer.sync(args.num_workers, args.lock_timeout)
+        if not args.metadata_only:
+            log.info('syncing all data')
+            data_syncer.sync(args.num_workers, args.lock_timeout)
+        log.info('Finished full sync. Check logs to see any issues that '
+                 'incremental sync will retry.')
+    else:
+        sync.incremental_sync(meta_syncer, data_syncer,
+                              args.num_workers,
+                              args.lock_timeout,
+                              args.incremental_sync_delay,
+                              args.metadata_only,
+                              args.prepare_error_delay)
diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py
new file mode 100644 (file)
index 0000000..2b8f027
--- /dev/null
@@ -0,0 +1,461 @@
+import boto
+import functools
+import json
+import logging
+import random
+import requests
+import urllib
+from urlparse import urlparse
+
+from boto.connection import AWSAuthConnection
+from boto.s3.connection import S3Connection
+
+log = logging.getLogger(__name__)
+
+class Endpoint(object):
+    def __init__(self, host, port, secure,
+                 access_key=None, secret_key=None, region=None, zone=None):
+        self.host = host
+        default_port = 443 if secure else 80
+        self.port = port or default_port
+        self.secure = secure
+        self.access_key = access_key
+        self.secret_key = secret_key
+        self.region = region
+        self.zone = zone
+
+    def __eq__(self, other):
+        if self.host != other.host:
+            return False
+        if self.port == other.port:
+            return True
+        # if self and other are mixed http/https with default ports,
+        # i.e. http://example.com and https://example.com, consider
+        # them the same
+        def diff_only_default_ports(a, b):
+            return a.secure and a.port == 443 and not b.secure and b.port == 80
+        return (diff_only_default_ports(self, other) or
+                diff_only_default_ports(other, self))
+
+    def __repr__(self):
+        return 'Endpoint(host={host}, port={port}, secure={secure})'.format(
+            host=self.host,
+            port=self.port,
+            secure=self.secure)
+
+    def __str__(self):
+        scheme = 'https' if self.secure else 'http'
+        return '{scheme}://{host}:{port}'.format(scheme=scheme,
+                                                 host=self.host,
+                                                 port=self.port)
+
+class ClientException(Exception):
+    pass
+class InvalidProtocol(ClientException):
+    pass
+class InvalidHost(ClientException):
+    pass
+class InvalidZone(ClientException):
+    pass
+class ZoneNotFound(ClientException):
+    pass
+
+def parse_endpoint(endpoint):
+    url = urlparse(endpoint)
+    if url.scheme not in ['http', 'https']:
+        raise InvalidProtocol('invalid protocol %r' % url.scheme)
+    if not url.hostname:
+        raise InvalidHost('no hostname in %r' % endpoint)
+    return Endpoint(url.hostname, url.port, url.scheme == 'https')
+
+class HttpError(ClientException):
+    def __init__(self, code, body):
+        self.code = code
+        self.body = body
+        self.message = 'Http error code %s content %s' % (code, body)
+    def __str__(self):
+        return self.message
+class NotFound(HttpError):
+    pass
+code_to_exc = {
+    404: NotFound,
+    }
+
+def boto_call(func):
+    @functools.wraps(func)
+    def translate_exception(*args, **kwargs):
+        try:
+            func(*args, **kwargs)
+        except boto.exception.S3ResponseError as e:
+            raise code_to_exc.get(e.status, HttpError)(e.status, e.body)
+    return translate_exception
+
+
+"""
+Adapted from the build_request() method of boto.connection
+"""
+
+def _build_request(conn, method, basepath='', resource = '', headers=None,
+                   data=None, special_first_param=None, params=None):
+    path = conn.calling_format.build_path_base(basepath, resource)
+    auth_path = conn.calling_format.build_auth_path(basepath, resource)
+    host = conn.calling_format.build_host(conn.server_name(), '')
+
+    if special_first_param:
+        path += '?' + special_first_param
+        boto.log.debug('path=%s' % path)
+        auth_path += '?' + special_first_param
+        boto.log.debug('auth_path=%s' % auth_path)
+
+    return AWSAuthConnection.build_base_http_request(
+        conn, method, path, auth_path, params, headers, data, host)
+
+def check_result_status(result):
+    if result.status_code / 100 != 2:
+        raise code_to_exc.get(result.status_code,
+                              HttpError)(result.status_code, result.content)
+def url_safe(component):
+    if isinstance(component, basestring):
+        string = component.encode('utf8')
+    else:
+        string = str(component)
+    return urllib.quote(string)
+
+def request(connection, type_, resource, params=None, headers=None,
+            data=None, expect_json=True, special_first_param=None):
+    if headers is None:
+        headers = {}
+
+    if type_ in ['put', 'post']:
+        headers['Content-Type'] = 'application/json; charset=UTF-8'
+
+    request_data = data if data else ''
+    if params is None:
+        params = {}
+    safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()])
+    request = _build_request(connection,
+                             type_.upper(),
+                             resource=resource,
+                             special_first_param=special_first_param,
+                             headers=headers,
+                             data=request_data,
+                             params=safe_params)
+
+    url = '{protocol}://{host}{path}'.format(protocol=request.protocol,
+                                             host=request.host,
+                                             path=request.path)
+
+    request.authorize(connection=connection)
+
+    handler = getattr(requests, type_)
+    boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r',
+                   url, params, request.headers, data)
+    result = handler(url, params=params, headers=request.headers, data=data)
+
+    check_result_status(result)
+
+    if data or not expect_json:
+        return result.raw
+    return result.json()
+
+def get_metadata(connection, section, name):
+    return request(connection, 'get', 'admin/metadata/' + section,
+                   params=dict(key=name))
+
+def update_metadata(connection, section, name, metadata):
+    if not isinstance(metadata, basestring):
+        metadata = json.dumps(metadata)
+    return request(connection, 'put', 'admin/metadata/' + section,
+                   params=dict(key=name), data=metadata)
+
+def delete_metadata(connection, section, name):
+    return request(connection, 'delete', 'admin/metadata/' + section,
+                   params=dict(key=name), expect_json=False)
+
+def get_metadata_sections(connection):
+    return request(connection, 'get', 'admin/metadata')
+
+def list_metadata_keys(connection, section):
+    return request(connection, 'get', 'admin/metadata/' + section)
+
+def get_op_state(connection, client_id, op_id, bucket, obj):
+    return request(connection, 'get', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': '{0}/{1}'.format(bucket, obj),
+                       'client-id': client_id,
+                      }
+                   )
+
+def remove_op_state(connection, client_id, op_id, bucket, obj):
+    return request(connection, 'delete', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': '{0}/{1}'.format(bucket, obj),
+                       'client-id': client_id,
+                      },
+                   expect_json=False,
+                   )
+
+def get_bucket_list(connection):
+    return list_metadata_keys(connection, 'bucket')
+
+@boto_call
+def list_objects_in_bucket(connection, bucket_name):
+    # use the boto library to do this
+    bucket = connection.get_bucket(bucket_name)
+    return bucket.list()
+
+@boto_call
+def delete_object(connection, bucket_name, object_name):
+    bucket = connection.get_bucket(bucket_name)
+    bucket.delete_key(object_name)
+
+def sync_object_intra_region(connection, bucket_name, object_name, src_zone,
+                             client_id, op_id):
+    path = '{bucket}/{object}'.format(
+        bucket=url_safe(bucket_name),
+        object=url_safe(object_name),
+        )
+    return request(connection, 'put', path,
+                   params={
+                       'rgwx-source-zone': src_zone,
+                       'rgwx-client-id': client_id,
+                       'rgwx-op-id': op_id,
+                       },
+                   headers={
+                       'x-amz-copy-source': '%s/%s' % (bucket_name, object_name),
+                       },
+                   expect_json=False)
+
+def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'length': timeout,
+                       'zone-id': zone_id,
+                       'locker-id': locker_id,
+                       },
+                   special_first_param='lock',
+                   expect_json=False)
+
+def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id):
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'locker-id': locker_id,
+                       'zone-id': zone_id,
+                       },
+                   special_first_param='unlock',
+                   expect_json=False)
+
+def _id_name(type_):
+    return 'bucket-instance' if type_ == 'bucket-index' else 'id'
+
+def get_log(connection, log_type, marker, max_entries, id_):
+    key = _id_name(log_type)
+    return request(connection, 'get', 'admin/log',
+                   params={
+                       'type': log_type,
+                       key: id_,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                       },
+                   )
+
+def get_log_info(connection, log_type, id_):
+    key = _id_name(log_type)
+    return request(
+        connection, 'get', 'admin/log',
+        params={
+            'type': log_type,
+            key: id_,
+            },
+        special_first_param='info',
+        )
+
+def num_log_shards(connection, shard_type):
+    out = request(connection, 'get', 'admin/log', dict(type=shard_type))
+    return out['num_objects']
+
+def set_worker_bound(connection, type_, marker, timestamp,
+                     daemon_id, id_, data=None):
+    if data is None:
+        data = []
+    key = _id_name(type_)
+    boto.log.debug('set_worker_bound: data = %r', data)
+    return request(
+        connection, 'post', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'marker': marker,
+            'time': timestamp,
+            'daemon_id': daemon_id,
+            },
+        data=json.dumps(data),
+        special_first_param='work_bound',
+        )
+
+def del_worker_bound(connection, type_, daemon_id, id_):
+    key = _id_name(type_)
+    return request(
+        connection, 'delete', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'daemon_id': daemon_id,
+            },
+        special_first_param='work_bound',
+        expect_json=False,
+        )
+
+def get_worker_bound(connection, type_, id_):
+    key = _id_name(type_)
+    out = request(
+        connection, 'get', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            },
+        special_first_param='bounds',
+        )
+    boto.log.debug('get_worker_bound returned: %r', out)
+    retries = set()
+    for item in out['markers']:
+        names = [retry['name'] for retry in item['items_in_progress']]
+        retries = retries.union(names)
+    return out['marker'], out['oldest_time'], retries
+
class Zone(object):
    """One zone parsed from the json region map."""

    def __init__(self, zone_info):
        # booleans arrive as the strings 'true'/'false' in the region map
        self.name = zone_info['name']
        self.is_master = False
        self.endpoints = [parse_endpoint(ep) for ep in zone_info['endpoints']]
        self.log_meta = zone_info['log_meta'] == 'true'
        self.log_data = zone_info['log_data'] == 'true'

    def __str__(self):
        return self.name

    def __repr__(self):
        return str(self)
+
class Region(object):
    """One region parsed from the json region map, with its zones by name."""

    def __init__(self, region_info):
        self.name = region_info['key']
        self.is_master = region_info['val']['is_master'] == 'true'
        self.zones = {}
        master_name = region_info['val']['master_zone']
        for zone_info in region_info['val']['zones']:
            zone = Zone(zone_info)
            self.zones[zone.name] = zone
            if zone.name == master_name:
                zone.is_master = True
                self.master_zone = zone
        assert hasattr(self, 'master_zone'), \
               'No master zone found for region ' + self.name

    def __str__(self):
        return str(self.zones.keys())

    def __repr__(self):
        return str(self)
+
class RegionMap(object):
    """All regions in the cluster, indexed by name."""

    def __init__(self, region_map):
        self.regions = {}
        for region_info in region_map['regions']:
            region = Region(region_info)
            self.regions[region.name] = region
            if region.is_master:
                self.master_region = region
        assert hasattr(self, 'master_region'), \
               'No master region found in region map'

    def find_endpoint(self, endpoint):
        """Return the (region, zone) pair containing ``endpoint``.

        Matches either by url or by explicit zone name; raises
        ZoneNotFound when no zone matches.
        """
        for region in self.regions.itervalues():
            for zone in region.zones.itervalues():
                if endpoint in zone.endpoints or endpoint.zone == zone.name:
                    return region, zone
        raise ZoneNotFound('%s not found in region map' % endpoint)

    def __str__(self):
        return str(self.regions)

    def __repr__(self):
        return str(self)
+
def get_region_map(connection):
    """Fetch the cluster configuration and parse it into a RegionMap."""
    return RegionMap(request(connection, 'get', 'admin/config'))
+
def _validate_sync_dest(dest_region, dest_zone):
    """Reject syncing into the master zone of the master region."""
    if not (dest_region.is_master and dest_zone.is_master):
        return
    raise InvalidZone('destination cannot be master zone of master region')
+
def _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
                          meta_only):
    """Check that src is a usable sync source for dest.

    Raises InvalidZone describing the first violated constraint.
    """
    if not src_zone.is_master:
        raise InvalidZone('source zone %s must be a master zone' % src_zone.name)
    same_region = src_region.name == dest_region.name
    if same_region and src_zone.name == dest_zone.name:
        raise InvalidZone('source and destination must be different zones')
    if not src_zone.log_meta:
        raise InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name)
    if not meta_only:
        if not src_zone.log_data:
            raise InvalidZone('source zone %s must have data logging enabled' % src_zone.name)
        if not same_region:
            raise InvalidZone('data sync can only occur between zones in the same region')
    if not src_zone.endpoints:
        raise InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name)
+
def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only):
    """Resolve and validate the source and destination region/zone.

    :param region_map: RegionMap describing the cluster
    :param dest_endpoint: Endpoint to sync to; must not be the master
        zone of the master region
    :param src_endpoint: Endpoint to sync from; may be empty, in which
        case a suitable master zone is chosen automatically
    :param meta_only: True when only metadata will be synced
    :raises InvalidZone: when the chosen source/dest pair cannot sync
    :raises ZoneNotFound: when an endpoint is not in the region map

    On success the host/port and region/zone attributes of both
    endpoints are filled in, in place.
    """
    # was a bare print(); use the module logger like the rest of the file
    log.debug('region map is: %r', region_map)

    dest_region, dest_zone = region_map.find_endpoint(dest_endpoint)
    _validate_sync_dest(dest_region, dest_zone)

    # source may be specified by http endpoint or zone name
    if src_endpoint.host or src_endpoint.zone:
        src_region, src_zone = region_map.find_endpoint(src_endpoint)
    else:
        # try the master zone in the same region, then the master zone
        # in the master region
        try:
            _validate_sync_source(dest_region, dest_region.master_zone,
                                  dest_region, dest_zone, meta_only)
            src_region, src_zone = dest_region, dest_region.master_zone
        except InvalidZone as e:
            log.debug('source region %s zone %s unacceptable: %s',
                      dest_region.name, dest_region.master_zone.name, e)
            master_region = region_map.master_region
            src_region, src_zone = master_region, master_region.master_zone

    _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
                          meta_only)

    # choose a random source endpoint if one wasn't specified
    if not src_endpoint.host:
        endpoint = random.choice(src_zone.endpoints)
        src_endpoint.host = endpoint.host
        src_endpoint.port = endpoint.port
        src_endpoint.secure = endpoint.secure

    # fill in region and zone objects
    dest_endpoint.region = dest_region
    dest_endpoint.zone = dest_zone
    src_endpoint.region = src_region
    src_endpoint.zone = src_zone
+
def connection(endpoint, debug=None):
    """Open a boto S3Connection to the gateway at ``endpoint``.

    Uses the access/secret keys stored on the endpoint and ordinary
    (path-style) calling format.
    """
    return S3Connection(
        host=endpoint.host,
        port=endpoint.port,
        is_secure=endpoint.secure,
        aws_access_key_id=endpoint.access_key,
        aws_secret_access_key=endpoint.secret_key,
        calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        debug=debug,
        )
diff --git a/radosgw_agent/lock.py b/radosgw_agent/lock.py
new file mode 100644 (file)
index 0000000..2036f8e
--- /dev/null
@@ -0,0 +1,107 @@
+import logging
+import threading
+import time
+
+from radosgw_agent import client
+
+log = logging.getLogger(__name__)
+
class LockBroken(Exception):
    """Base class for errors that mean the shard lock was not held."""
    pass
+
class LockRenewFailed(LockBroken):
    """Renewing the lock failed, so it may have been lost."""
    pass
+
class LockExpired(LockBroken):
    """The lock was not renewed within its timeout period."""
    pass
+
class Lock(threading.Thread):
    """A lock on a shard log that automatically refreshes itself.

    It may be used to lock different shards throughout its lifetime.
    To lock a new shard, call set_shard() with the shard_num desired,
    then acquire().

    To release the lock, call release_and_clear(). This will raise an
    exception if the lock ever failed to be acquired in the timeout
    period.
    """

    def __init__(self, conn, type_, locker_id, timeout, zone_id):
        super(Lock, self).__init__()
        self.conn = conn
        self.type = type_
        self.timeout = timeout
        # guards shard_num/last_locked/failed against the renewal thread
        self.lock = threading.Lock()
        self.locker_id = locker_id
        self.zone_id = zone_id
        self.shard_num = None
        self.last_locked = None
        self.failed = False

    def set_shard(self, shard_num):
        """Select the shard to lock; must not already hold another."""
        log.debug('set_shard to %d', shard_num)
        with self.lock:
            assert self.shard_num is None, \
                'attempted to acquire new lock without releasing old one'
            self.failed = False
            self.last_locked = None
            self.shard_num = shard_num

    def unset_shard(self):
        """Forget the current shard so the renewal thread stops renewing."""
        log.debug('unset shard')
        with self.lock:
            self.shard_num = None

    def acquire(self):
        """Renew an existing lock, or acquire a new one.

        The old lock must have already been released if shard_num is specified.
        client.NotFound may be raised if the log contains no entries.
        """
        log.debug('acquire lock')
        with self.lock:
            self._acquire()

    def _acquire(self):
        # same as acquire() but assumes self.lock is held
        now = time.time()
        client.lock_shard(self.conn, self.type, self.shard_num,
                          self.zone_id, self.timeout, self.locker_id)
        # record the time *before* the request so the timeout check in
        # release_and_clear() is conservative
        self.last_locked = now

    def release_and_clear(self):
        """Release the lock currently being held.

        Prevent it from being automatically renewed, and check if there
        were any errors renewing the current lock or if it expired.
        If the lock was not sustained, raise LockRenewFailed or LockExpired.
        """
        log.debug('release and clear lock')
        with self.lock:
            shard_num = self.shard_num
            self.shard_num = None
            diff = time.time() - self.last_locked
            if diff > self.timeout:
                msg = 'lock was not renewed in over %0.2f seconds' % diff
                raise LockExpired(msg)
            if self.failed:
                raise LockRenewFailed()
            try:
                client.unlock_shard(self.conn, self.type, shard_num,
                                    self.zone_id, self.locker_id)
            except client.HttpError as e:
                # best-effort unlock; the lock will expire on its own
                log.warn('failed to unlock shard %d in zone %s: %s',
                         shard_num, self.zone_id, e)
            self.last_locked = None

    def run(self):
        # renewal loop: re-acquire at twice the rate of the lock timeout
        # so the lock never lapses while a shard is set
        while True:
            with self.lock:
                if self.shard_num is not None:
                    try:
                        self._acquire()
                    except client.HttpError as e:
                        log.error('locking shard %d in zone %s failed: %s',
                                  self.shard_num, self.zone_id, e)
                        # remembered and reported by release_and_clear()
                        self.failed = True
            time.sleep(0.5 * self.timeout)
diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py
new file mode 100644 (file)
index 0000000..67ebea6
--- /dev/null
@@ -0,0 +1,318 @@
+import logging
+import multiprocessing
+import time
+
+from radosgw_agent import worker
+from radosgw_agent import client
+
+log = logging.getLogger(__name__)
+
+# the replica log api only supports one entry, and updating it
+# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of
+# radosgw-agent, so just use a constant value for the daemon id.
+DAEMON_ID = 'radosgw-agent'
+
def prepare_sync(syncer, error_delay):
    """Call syncer.prepare() until it succeeds.

    :param error_delay: seconds to wait before retrying

    Retries forever so the sync agent keeps going while radosgws are
    temporarily unavailable.
    """
    while True:
        try:
            syncer.prepare()
        except Exception:
            log.warn('error preparing for sync, will retry. Traceback:',
                     exc_info=True)
            time.sleep(error_delay)
        else:
            return
+
def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
                     incremental_sync_delay, metadata_only, error_delay):
    """Run a continuous incremental sync.

    This runs forever, pausing incremental_sync_delay seconds between
    rounds. Errors during a round are logged and the loop continues.
    """
    while True:
        try:
            meta_syncer.sync(num_workers, lock_timeout)
            if not metadata_only:
                data_syncer.sync(num_workers, lock_timeout)
        except Exception:
            log.warn('error doing incremental sync, will try again. Traceback:',
                     exc_info=True)

        # prepare data before sleeping due to rgw_log_bucket_window
        if not metadata_only:
            prepare_sync(data_syncer, error_delay)
        log.info('waiting %d seconds until next sync',
                 incremental_sync_delay)
        time.sleep(incremental_sync_delay)
        prepare_sync(meta_syncer, error_delay)
+
class Syncer(object):
    """Base class for syncing metadata or data between two zones.

    Subclasses set self.worker_cls and self.type, and implement
    prepare()/generate_work() to describe the items to sync.
    """

    def __init__(self, src, dest, max_entries, *args, **kwargs):
        self.src = src
        self.dest = dest
        self.src_conn = client.connection(src)
        self.dest_conn = client.connection(dest)
        self.daemon_id = DAEMON_ID
        self.worker_cls = None # filled in by subclass constructor
        self.num_shards = None
        self.max_entries = max_entries
        self.object_sync_timeout = kwargs.get('object_sync_timeout')

    def init_num_shards(self):
        """Cache the number of log shards reported by the source."""
        if self.num_shards is not None:
            return
        try:
            self.num_shards = client.num_log_shards(self.src_conn, self.type)
            log.debug('%d shards to check', self.num_shards)
        except Exception:
            log.error('finding number of shards failed')
            raise

    def shard_num_for_key(self, key):
        """Map a key to its log shard number (mirrors radosgw's hash)."""
        key = key.encode('utf8')
        hash_val = 0
        for char in key:
            c = ord(char)
            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
        return hash_val % self.num_shards

    def prepare(self):
        """Setup any state required before syncing starts.

        This must be called before sync().
        """
        pass

    def generate_work(self):
        """Generate items to be placed in a queue for processing."""
        pass

    def wait_until_ready(self):
        """Hook for subclasses that must wait before syncing starts."""
        pass

    def complete_item(self, shard_num, retries):
        """Called when syncing a single item completes successfully."""
        marker = self.shard_info.get(shard_num)
        if not marker:
            return
        try:
            data = [dict(name=retry, time=worker.DEFAULT_TIME)
                    for retry in retries]
            client.set_worker_bound(self.dest_conn,
                                    self.type,
                                    marker,
                                    worker.DEFAULT_TIME,
                                    self.daemon_id,
                                    shard_num,
                                    data)
        except Exception:
            # bound not advancing is safe - the same work is retried later
            log.warn('could not set worker bounds, may repeat some work. '
                     'Traceback:', exc_info=True)

    def sync(self, num_workers, log_lock_time):
        """Sync all items from generate_work() using worker processes."""
        work_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()

        processes = [self.worker_cls(work_queue,
                                     result_queue,
                                     log_lock_time,
                                     self.src,
                                     self.dest,
                                     daemon_id=self.daemon_id,
                                     max_entries=self.max_entries,
                                     object_sync_timeout=self.object_sync_timeout,
                                     )
                     for i in xrange(num_workers)]
        for process in processes:
            process.daemon = True
            process.start()

        self.wait_until_ready()

        log.info('Starting sync')
        # enqueue the shards to be synced
        num_items = 0
        for item in self.generate_work():
            num_items += 1
            work_queue.put(item)

        # add a poison pill for each worker
        for i in xrange(num_workers):
            work_queue.put(None)

        # pull the results out as they are produced.
        # bug fix: the accumulator of failed shards was a dict named
        # 'retries' that was shadowed by each item's retry list, so
        # .append() hit the wrong object and the final report was wrong
        errors = []
        for i in xrange(num_items):
            result, item = result_queue.get()
            shard_num, retries = item
            if result == worker.RESULT_SUCCESS:
                log.debug('synced item %r successfully', item)
                self.complete_item(shard_num, retries)
            else:
                log.error('error syncing shard %d', shard_num)
                errors.append(shard_num)

            log.info('%d/%d items processed', i + 1, num_items)
        if errors:
            log.error('Encountered errors syncing these %d shards: %r',
                      len(errors), errors)
+
class IncrementalSyncer(Syncer):
    """Sync only the changes recorded in the source's logs."""

    def get_worker_bound(self, shard_num):
        """Return (marker, retries) describing where to resume a shard.

        Falls back to the beginning of the log when no bound was stored.
        """
        try:
            marker, timestamp, retries = client.get_worker_bound(
                self.dest_conn,
                self.type,
                shard_num)
            log.debug('oldest marker and time for shard %d are: %r %r',
                      shard_num, marker, timestamp)
            # typo fix: 'retrie' -> 'retry'
            log.debug('%d items to retry are: %r', len(retries), retries)
        except client.NotFound:
            # if no worker bounds have been set, start from the beginning
            marker, retries = '', []
        return marker, retries

    def get_log_entries(self, shard_num, marker):
        """Return (last_marker, entries) from the source log past marker."""
        try:
            result = client.get_log(self.src_conn, self.type,
                                    marker, self.max_entries,
                                    shard_num)
            last_marker = result['marker']
            log_entries = result['entries']
            if len(log_entries) == self.max_entries:
                log.warn('shard %d log has fallen behind - log length >= %d',
                         shard_num, self.max_entries)
        except client.NotFound:
            # no entries past this marker yet, but we may have retries
            last_marker = ''
            log_entries = []
        return last_marker, log_entries

    def prepare(self):
        """Gather the log entries and retries to process for each shard."""
        self.init_num_shards()

        self.shard_info = {}
        self.shard_work = {}
        for shard_num in xrange(self.num_shards):
            marker, retries = self.get_worker_bound(shard_num)
            last_marker, log_entries = self.get_log_entries(shard_num, marker)
            self.shard_work[shard_num] = log_entries, retries
            self.shard_info[shard_num] = last_marker

        self.prepared_at = time.time()

    def generate_work(self):
        return self.shard_work.iteritems()
+
+
class MetaSyncerInc(IncrementalSyncer):
    """Incremental syncer for the metadata log."""

    def __init__(self, *args, **kwargs):
        super(MetaSyncerInc, self).__init__(*args, **kwargs)
        self.type = 'metadata'
        self.worker_cls = worker.MetadataWorkerIncremental
+
+
class DataSyncerInc(IncrementalSyncer):
    """Incremental syncer for the data (bucket index) log."""

    def __init__(self, *args, **kwargs):
        super(DataSyncerInc, self).__init__(*args, **kwargs)
        self.type = 'data'
        self.worker_cls = worker.DataWorkerIncremental
        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)

    def wait_until_ready(self):
        """Wait out the data log window after prepare() so it is consistent."""
        log.info('waiting to make sure bucket log is consistent')
        ready_at = self.prepared_at + self.rgw_data_log_window
        while time.time() < ready_at:
            time.sleep(1)
+
+
class DataSyncerFull(Syncer):
    """Full syncer that walks every bucket in the source zone."""

    def __init__(self, *args, **kwargs):
        super(DataSyncerFull, self).__init__(*args, **kwargs)
        self.type = 'data'
        self.worker_cls = worker.DataWorkerFull
        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)

    def prepare(self):
        """Record data log markers, then group all buckets by shard."""
        self.init_num_shards()

        # save data log markers for each shard
        self.shard_info = {}
        for shard_num in xrange(self.num_shards):
            info = client.get_log_info(self.src_conn, 'data', shard_num)
            # setting an empty marker returns an error
            if info['marker']:
                self.shard_info[shard_num] = info['marker']

        # get list of buckets after getting any markers to avoid skipping
        # entries added before we got the marker info
        buckets = client.get_bucket_list(self.src_conn)

        self.prepared_at = time.time()

        self.buckets_by_shard = {}
        for bucket in buckets:
            shard = self.shard_num_for_key(bucket)
            self.buckets_by_shard.setdefault(shard, []).append(bucket)

    def generate_work(self):
        return self.buckets_by_shard.iteritems()

    def wait_until_ready(self):
        """Wait out the data log window after prepare() so it is consistent."""
        log.info('waiting to make sure bucket log is consistent')
        ready_at = self.prepared_at + self.rgw_data_log_window
        while time.time() < ready_at:
            time.sleep(1)
+
+
class MetaSyncerFull(Syncer):
    """Full syncer that walks every metadata entry in the source zone."""

    def __init__(self, *args, **kwargs):
        super(MetaSyncerFull, self).__init__(*args, **kwargs)
        self.type = 'metadata'
        self.worker_cls = worker.MetadataWorkerFull

    def prepare(self):
        """List every metadata key per section and group them by shard."""
        try:
            self.sections = client.get_metadata_sections(self.src_conn)
        except client.HttpError as e:
            log.error('Error listing metadata sections: %s', e)
            raise

        # grab the latest shard markers and timestamps before we sync
        self.shard_info = {}
        self.init_num_shards()
        for shard_num in xrange(self.num_shards):
            info = client.get_log_info(self.src_conn, 'metadata', shard_num)
            # setting an empty marker returns an error
            if info['marker']:
                self.shard_info[shard_num] = info['marker']

        self.metadata_by_shard = {}
        for section in self.sections:
            try:
                for key in client.list_metadata_keys(self.src_conn, section):
                    shard = self.shard_num_for_key(section + ':' + key)
                    self.metadata_by_shard.setdefault(shard, []).append(
                        (section, key))
            except client.NotFound:
                # no keys of this type exist
                continue
            except client.HttpError as e:
                log.error('Error listing metadata for section %s: %s',
                          section, e)
                raise

    def generate_work(self):
        return self.metadata_by_shard.iteritems()
diff --git a/radosgw_agent/tests/__init__.py b/radosgw_agent/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/radosgw_agent/tests/test_client.py b/radosgw_agent/tests/test_client.py
new file mode 100644 (file)
index 0000000..30b926a
--- /dev/null
@@ -0,0 +1,304 @@
+import py.test
+
+from radosgw_agent import client
+
# Region map fixture mimicking radosgw's admin/config output:
# 'skinny' is the master region (master zone skinny-1, full logging),
# 'swab' is a secondary region (master zone swab-1, full logging),
# 'readonly' has no logging at all, and 'metaonly' logs metadata only.
REGION_MAP = {
    "regions": [
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://vit:8001/"
                            ],
                        "log_data": "true",
                        "log_meta": "true",
                        "name": "skinny-1"
                        },
                    {
                        "endpoints": [
                            "http://vit:8002/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "skinny-2"
                        }
                    ],
                "name": "skinny",
                "default_placement": "",
                "master_zone": "skinny-1",
                "api_name": "slim",
                "placement_targets": [],
                "is_master": "true",
                "endpoints": [
                    "http://skinny:80/"
                    ]
                },
            "key": "skinny"
            },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://vit:8003/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "swab-2"
                        },
                    {
                        "endpoints": [
                            "http://vit:8004/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "swab-3"
                        },
                    {
                        "endpoints": [
                            "http://vit:8000/"
                            ],
                        "log_data": "true",
                        "log_meta": "true",
                        "name": "swab-1"
                        }
                    ],
                "name": "swab",
                "default_placement": "",
                "master_zone": "swab-1",
                "api_name": "shady",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://vit:8000/"
                    ]
                },
            "key": "swab"
            },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://ro:80/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "ro-1"
                        },
                    {
                        "endpoints": [
                            "http://ro:8080/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "ro-2"
                        },
                    ],
                "name": "readonly",
                "default_placement": "",
                "master_zone": "ro-1",
                "api_name": "readonly",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://ro:80/",
                    "http://ro:8080/"
                    ]
                },
            "key": "readonly"
            },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://meta:80/"
                            ],
                        "log_data": "false",
                        "log_meta": "true",
                        "name": "meta-1"
                        },
                    {
                        "endpoints": [
                            "http://meta:8080/"
                            ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "meta-2"
                        },
                    ],
                "name": "metaonly",
                "default_placement": "",
                "master_zone": "meta-1",
                "api_name": "metaonly",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://meta:80/",
                    "http://meta:8080/"
                    ]
                },
            "key": "metaonly"
            }
        ],
    "master_region": "skinny"
    }
+
def test_endpoint_default_port():
    """Default port follows the secure flag: 443 for https, 80 for http."""
    assert client.Endpoint('example.org', None, True).port == 443
    assert client.Endpoint('example.org', None, False).port == 80
+
def test_endpoint_port_specified():
    """An explicitly given port wins over the scheme default."""
    for port in (80, 443):
        assert client.Endpoint('example.org', port, True).port == port
+
def test_endpoint_equality():
    """Endpoints on the same host compare equal regardless of port/scheme."""
    default_port = client.Endpoint('a.org', None, True)
    secure = client.Endpoint('a.org', 443, True)
    insecure = client.Endpoint('a.org', 80, False)
    assert default_port == secure
    assert secure == insecure
    assert insecure == default_port
+
def test_endpoint_inequality():
    """A different host or port makes endpoints unequal."""
    base = client.Endpoint('a.org', 80, True)
    for other in (client.Endpoint('b.org', 80, True),
                  client.Endpoint('a.org', 81, True),
                  client.Endpoint('a.org', 8080, False)):
        assert base != other
+
def test_parse_endpoint():
    """parse_endpoint extracts host, port and scheme from a url."""
    cases = [
        ('http://example.org', 'example.org', 80, False),
        ('https://example.org', 'example.org', 443, True),
        ('https://example.org:8080', 'example.org', 8080, True),
        ('https://example.org:8080/', 'example.org', 8080, True),
        ('http://example.org:81/a/b/c?b#d', 'example.org', 81, False),
        ]
    for url, host, port, secure in cases:
        endpoint = client.parse_endpoint(url)
        assert endpoint.port == port
        assert endpoint.host == host
        assert endpoint.secure == secure
+
def test_parse_endpoint_bad_input():
    """Non-http(s) schemes and empty hosts are rejected."""
    for exc, url in ((client.InvalidProtocol, 'ftp://example.com'),
                     (client.InvalidHost, 'http://:80/')):
        with py.test.raises(exc):
            client.parse_endpoint(url)
+
def _test_configure_endpoints(dest_url, dest_region, dest_zone,
                              expected_src_url, expected_src_region,
                              expected_src_zone, specified_src_url=None,
                              meta_only=False):
    """Run configure_endpoints and verify the resolved regions/zones."""
    dest = client.parse_endpoint(dest_url)
    if specified_src_url is None:
        src = client.Endpoint(None, None, None)
    else:
        src = client.parse_endpoint(specified_src_url)
    region_map = client.RegionMap(REGION_MAP)
    client.configure_endpoints(region_map, dest, src, meta_only)
    assert (dest.region.name, dest.zone.name) == (dest_region, dest_zone)
    assert src == client.parse_endpoint(expected_src_url)
    assert (src.region.name, src.zone.name) == \
           (expected_src_region, expected_src_zone)
+
def test_configure_endpoints_2nd_region_master_zone_meta():
    """Meta-only: a 2nd region's master zone pulls from the master region."""
    _test_configure_endpoints(
        'http://vit:8000', 'swab', 'swab-1',
        'http://vit:8001', 'skinny', 'skinny-1', meta_only=True)
+
def test_configure_endpoints_2nd_region_master_zone_data():
    """Data sync across regions is rejected."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8000', 'swab', 'swab-1',
            'http://vit:8001', 'skinny', 'skinny-1', meta_only=False)
+
def test_configure_endpoints_master_region_2nd_zone():
    """A non-master zone defaults to its own region's master zone."""
    _test_configure_endpoints(
        'http://vit:8002', 'skinny', 'skinny-2',
        'http://vit:8001', 'skinny', 'skinny-1')
+
def test_configure_endpoints_2nd_region_2nd_zone():
    """A 2nd zone in a 2nd region syncs from that region's master zone."""
    _test_configure_endpoints(
        'http://vit:8003', 'swab', 'swab-2',
        'http://vit:8000', 'swab', 'swab-1')
+
def test_configure_endpoints_2nd_region_readonly_meta():
    """Meta-only: a region with no logging falls back to the master region."""
    _test_configure_endpoints(
        'http://ro:8080', 'readonly', 'ro-2',
        'http://vit:8001', 'skinny', 'skinny-1', meta_only=True)
+
def test_configure_endpoints_2nd_region_readonly_data():
    """Data sync into a no-logging region is rejected."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://ro:8080', 'readonly', 'ro-2',
            'http://vit:8001', 'skinny', 'skinny-1', meta_only=False)
+
def test_configure_endpoints_2nd_region_metaonly_meta():
    """Meta-only: a metadata-logging region can source from its own master."""
    _test_configure_endpoints(
        'http://meta:8080', 'metaonly', 'meta-2',
        'http://meta:80', 'metaonly', 'meta-1', meta_only=True)
+
def test_configure_endpoints_2nd_region_metaonly_data():
    """Data sync in a metadata-only-logging region is rejected."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://meta:8080', 'metaonly', 'meta-2',
            'http://vit:8001', 'skinny', 'skinny-1', meta_only=False)
+
def test_configure_endpoints_master_region_master_zone():
    """The master zone of the master region is not a valid destination."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8001', 'skinny', 'skinny-1',
            'http://vit:8001', 'skinny', 'skinny-1')
+
def test_configure_endpoints_specified_src_same_region():
    """An explicit source in the same region is accepted."""
    _test_configure_endpoints(
        'http://vit:8003', 'swab', 'swab-2',
        'http://vit:8000', 'swab', 'swab-1',
        'http://vit:8000')
+
def test_configure_endpoints_specified_src_master_region_meta():
    """Meta-only: an explicit source in the master region is accepted."""
    _test_configure_endpoints(
        'http://vit:8003', 'swab', 'swab-2',
        'http://vit:8001', 'skinny', 'skinny-1',
        'http://vit:8001', meta_only=True)
+
def test_configure_endpoints_specified_src_master_region_data():
    """An explicit cross-region source is rejected for data sync."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8003', 'swab', 'swab-2',
            'http://vit:8001', 'skinny', 'skinny-1',
            'http://vit:8001', meta_only=False)
+
def test_configure_endpoints_bad_src_same_region():
    """A non-master zone in the same region is not a valid source."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8003', 'swab', 'swab-2',
            'http://vit:8004', 'swab', 'swab-3',
            'http://vit:8004')
+
def test_configure_endpoints_bad_src_master_region():
    """A non-master zone in the master region is not a valid source."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8003', 'swab', 'swab-2',
            'http://vit:8002', 'skinny', 'skinny-2',
            'http://vit:8002')
+
def test_configure_endpoints_bad_src_same_zone():
    """Source and destination must not be the same zone."""
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints(
            'http://vit:8000', 'swab', 'swab-1',
            'http://vit:8000', 'swab', 'swab-1',
            'http://vit:8000')
+
def test_configure_endpoints_specified_nonexistent_src():
    """A source url that is in no zone raises ZoneNotFound."""
    with py.test.raises(client.ZoneNotFound):
        _test_configure_endpoints(
            'http://vit:8005', 'skinny', 'skinny-1',
            'http://vit:8001', 'skinny', 'skinny-1',
            'http://vit:80')
+
def test_configure_endpoints_unknown_zone():
    """A destination url that is in no zone raises ZoneNotFound."""
    with py.test.raises(client.ZoneNotFound):
        _test_configure_endpoints(
            'http://vit:8005', 'skinny', 'skinny-1',
            'http://vit:8001', 'skinny', 'skinny-1')
diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py
new file mode 100644 (file)
index 0000000..7de850b
--- /dev/null
@@ -0,0 +1,492 @@
+from collections import namedtuple
+import logging
+import multiprocessing
+import os
+import socket
+import time
+
+from radosgw_agent import client
+from radosgw_agent import lock
+
+log = logging.getLogger(__name__)
+
# Result codes workers report back on the result queue.
RESULT_SUCCESS = 0
RESULT_ERROR = 1

class SkipShard(Exception):
    """Raised when a shard's log can't be locked and should be skipped."""

class SyncError(Exception):
    """Base class for errors while syncing a single item."""

class SyncTimedOut(SyncError):
    """Waiting for an in-progress sync exceeded its deadline."""

class SyncFailed(SyncError):
    """Syncing an item failed outright."""

# Placeholder timestamp used when no meaningful time is available.
DEFAULT_TIME = '1970-01-01 00:00:00'
+
class Worker(multiprocessing.Process):
    """sync worker to run in its own process"""

    def __init__(self, work_queue, result_queue, log_lock_time,
                 src, dest, **kwargs):
        super(Worker, self).__init__()
        self.src = src
        self.dest = dest
        self.work_queue = work_queue
        self.result_queue = result_queue
        self.log_lock_time = log_lock_time
        self.lock = None

        # unique id for this worker, used for lock ownership and op ids
        self.local_lock_id = socket.gethostname() + ':' + str(os.getpid())

        # construct the two connection objects
        self.src_conn = client.connection(src)
        self.dest_conn = client.connection(dest)

    def prepare_lock(self):
        """Create and start the background shard-lock renewal thread.

        Must be called exactly once, before any shard is locked.
        """
        assert self.lock is None
        self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id,
                              self.log_lock_time, self.dest.zone.name)
        self.lock.daemon = True
        self.lock.start()

    def lock_shard(self, shard_num):
        """Lock the log for shard_num, or raise SkipShard.

        A missing log (client.NotFound) means nothing changed this shard
        yet and is reported as RESULT_SUCCESS; any other failure is
        reported as RESULT_ERROR. In both cases a result is queued and
        SkipShard is raised so the caller moves on to the next shard.
        """
        result = shard_num, []
        try:
            self.lock.set_shard(shard_num)
            self.lock.acquire()
        except client.NotFound:
            # no log means nothing changed this shard yet
            self.lock.unset_shard()
            self.result_queue.put((RESULT_SUCCESS, result))
            raise SkipShard('no log for shard')
        except Exception:
            log.warn('error locking shard %d log, '
                     ' skipping for now. Traceback: ',
                     shard_num, exc_info=True)
            self.lock.unset_shard()
            self.result_queue.put((RESULT_ERROR, result))
            raise SkipShard()

    def unlock_shard(self):
        """Release the current shard lock, tolerating failures.

        A broken lock means another process may duplicate our work; any
        other error is only logged since the lock will time out anyway.
        """
        try:
            self.lock.release_and_clear()
        except lock.LockBroken as e:
            log.warn('work may be duplicated: %s', e)
        except Exception:
            log.warn('error unlocking log, continuing anyway '
                     'since lock will timeout. Traceback:', exc_info=True)

    def set_bound(self, key, marker, retries, type_=None):
        """Record the sync position (bound) for key on the destination.

        Returns RESULT_SUCCESS or RESULT_ERROR. The api doesn't allow
        setting a bound with a blank marker, so a blank marker is a
        no-op reported as success. (Fix: this previously fell through
        and returned None, which callers comparing against the RESULT_*
        codes could misinterpret.)
        """
        if not marker:
            return RESULT_SUCCESS
        if type_ is None:
            type_ = self.type
        try:
            data = [dict(name=item, time=DEFAULT_TIME) for item in retries]
            client.set_worker_bound(self.dest_conn,
                                    type_,
                                    marker,
                                    DEFAULT_TIME,
                                    self.daemon_id,
                                    key,
                                    data=data)
            return RESULT_SUCCESS
        except Exception:
            log.warn('error setting worker bound for key "%s",'
                     ' may duplicate some work later. Traceback:', key,
                     exc_info=True)
            return RESULT_ERROR
+
+MetadataEntry = namedtuple('MetadataEntry',
+                           ['section', 'name', 'marker', 'timestamp'])
+
+def _meta_entry_from_json(entry):
+    return MetadataEntry(
+        entry['section'],
+        entry['name'],
+        entry['id'],
+        entry['timestamp'],
+        )
+
+BucketIndexEntry = namedtuple('BucketIndexEntry',
+                              ['object', 'marker', 'timestamp'])
+
+def _bi_entry_from_json(entry):
+    return BucketIndexEntry(
+        entry['object'],
+        entry['op_id'],
+        entry['timestamp'],
+        )
+
class IncrementalMixin(object):
    """Shared run() loop for incremental data and metadata sync.

    This is the same for data and metadata sync, so share the
    implementation here. Subclasses must provide sync_entries(), plus
    the prepare_lock/lock_shard/unlock_shard machinery from Worker.

    NOTE(review): the original docstring also advertised
    get_and_process_entries(), which is not defined in this class --
    confirm whether it was removed or lives elsewhere.
    """

    def run(self):
        # start the background lock-renewal thread before taking work
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                # None is the sentinel meaning no more work
                log.info('process %s is done. Exiting', self.ident)
                break

            shard_num, (log_entries, retries) = item

            log.info('%s is processing shard number %d',
                     self.ident, shard_num)

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                # lock_shard already queued a result for this shard
                continue

            result = RESULT_SUCCESS
            try:
                new_retries = self.sync_entries(log_entries, retries)
            except Exception:
                log.exception('syncing entries for shard %d failed',
                              shard_num)
                result = RESULT_ERROR
                new_retries = []

            # finally, unlock the log
            self.unlock_shard()
            self.result_queue.put((result, (shard_num, new_retries)))
            log.info('finished processing shard %d', shard_num)
+
+
class DataWorker(Worker):
    """Base class for data sync workers: syncs objects and buckets."""

    def __init__(self, *args, **kwargs):
        super(DataWorker, self).__init__(*args, **kwargs)
        self.type = 'data'
        # counter used to build a unique op id per object sync attempt
        self.op_id = 0
        # NOTE(review): 60 * 60 * 60 is 60 hours; if one hour was
        # intended this should be 60 * 60 -- confirm before changing.
        self.object_sync_timeout = kwargs.get('object_sync_timeout',
                                              60 * 60 * 60)
        self.daemon_id = kwargs['daemon_id']

    def sync_object(self, bucket, obj):
        """Sync one object from the master zone to this zone.

        If the object is gone on the master, it is deleted from the
        secondary instead. Raises SyncFailed or SyncTimedOut when the
        object could not be synced.
        """
        log.debug('sync_object %s/%s', bucket, obj)
        self.op_id += 1
        local_op_id = self.local_lock_id + ':' + str(self.op_id)
        try:
            found = True
            until = time.time() + self.object_sync_timeout
            client.sync_object_intra_region(self.dest_conn, bucket, obj,
                                            self.src.zone.name,
                                            self.daemon_id,
                                            local_op_id)
        except client.NotFound:
            found = False
            log.debug('"%s/%s" not found on master, deleting from secondary',
                      bucket, obj)
            try:
                client.delete_object(self.dest_conn, bucket, obj)
            except client.NotFound:
                # Since we were trying to delete the object, just return
                return
            except Exception:
                msg = 'could not delete "%s/%s" from secondary' % (bucket, obj)
                log.exception(msg)
                raise SyncFailed(msg)
        except SyncFailed:
            raise
        except Exception as e:
            # the sync may still be in progress on the destination; poll
            # its op state until it completes or the timeout expires
            log.debug('exception during sync: %s', e)
            if found:
                self.wait_for_object(bucket, obj, until, local_op_id)
        # TODO: clean up old op states
        try:
            if found:
                client.remove_op_state(self.dest_conn, self.daemon_id,
                                       local_op_id, bucket, obj)
        except Exception:
            log.exception('could not remove op state for daemon "%s" op_id %s',
                          self.daemon_id, local_op_id)

    def wait_for_object(self, bucket, obj, until, local_op_id):
        """Poll the op state of an in-progress sync until it completes.

        Raises SyncFailed if the op reaches an unexpected state, or
        SyncTimedOut once time.time() passes `until`.
        """
        while time.time() < until:
            try:
                state = client.get_op_state(self.dest_conn,
                                            self.daemon_id,
                                            local_op_id,
                                            bucket, obj)
                log.debug('op state is %s', state)
                state = state[0]['state']
                if state == 'complete':
                    return
                elif state != 'in-progress':
                    raise SyncFailed('state is {0}'.format(state))
                time.sleep(1)
            except SyncFailed:
                raise
            except Exception as e:
                log.debug('error geting op state: %s', e, exc_info=True)
                time.sleep(1)
        # timeout expired
        raise SyncTimedOut()

    def get_bucket_instance(self, bucket):
        """Return the bucket instance id, "<bucket>:<bucket_id>"."""
        metadata = client.get_metadata(self.src_conn, 'bucket', bucket)
        return bucket + ':' + metadata['data']['bucket']['bucket_id']

    def get_bucket(self, bucket_instance):
        """Return the bucket name portion of a bucket instance id."""
        return bucket_instance.split(':', 1)[0]

    def sync_bucket(self, bucket, objects):
        """Sync each object in `objects`; return the ones that failed."""
        log.info('syncing bucket "%s"', bucket)
        retry_objs = []
        count = 0
        for obj in objects:
            count += 1
            # sync each object
            # (fix: dropped a stray trailing comma that made this log
            # call part of a pointless one-element tuple expression)
            log.debug('syncing object "%s/%s"', bucket, obj)
            try:
                self.sync_object(bucket, obj)
            except SyncError as err:
                log.error('failed to sync object %s/%s: %s',
                          bucket, obj, err)
                retry_objs.append(obj)

        log.debug('bucket {bucket} has {num_objects} object'.format(
                  bucket=bucket, num_objects=count))
        if retry_objs:
            log.debug('these objects failed to be synced and will be during '
                      'the next incremental sync: %s', retry_objs)

        return retry_objs
+
+
class DataWorkerIncremental(IncrementalMixin, DataWorker):
    """Incremental data sync: replays per-shard bucket index logs."""

    def __init__(self, *args, **kwargs):
        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
        # page size for reading bucket index logs
        self.max_entries = kwargs['max_entries']

    def get_bucket_instance_entries(self, marker, instance):
        """Read all bucket index log entries for `instance` after `marker`.

        Returns (max_marker, entries): max_marker is the marker of the
        last entry seen, or '' when no entries were found at all.
        """
        entries = []
        while True:
            try:
                log_entries = client.get_log(self.src_conn, 'bucket-index',
                                             marker, self.max_entries, instance)
            except client.NotFound:
                log_entries = []

            log.debug('bucket instance "%s" has %d entries after "%s"',
                      instance, len(log_entries), marker)

            try:
                entries += [_bi_entry_from_json(entry) for entry in log_entries]
            except KeyError:
                log.error('log missing key is: %s', log_entries)
                raise

            if entries:
                marker = entries[-1].marker
            else:
                marker = ''

            # a short page means the log has been drained
            if len(log_entries) < self.max_entries:
                break
        return marker, entries

    def inc_sync_bucket_instance(self, instance, marker, timestamp, retries):
        """Sync the new and previously-failed objects of one bucket instance.

        Returns RESULT_SUCCESS or RESULT_ERROR.
        """
        max_marker, entries = self.get_bucket_instance_entries(marker, instance)
        objects = set([entry.object for entry in entries])
        bucket = self.get_bucket(instance)
        new_retries = self.sync_bucket(bucket, objects.union(retries))

        result = self.set_bound(instance, max_marker, new_retries,
                                'bucket-index')
        if new_retries:
            result = RESULT_ERROR
        return result

    def sync_entries(self, log_entries, retries):
        """Sync every bucket instance mentioned in the log or in retries.

        Returns the list of bucket instances to retry next pass.
        """
        try:
            bucket_instances = set([entry['key'] for entry in log_entries])
        except KeyError:
            log.error('log containing bad key is: %s', log_entries)
            raise

        new_retries = []
        for bucket_instance in bucket_instances.union(retries):
            # fix: the per-instance retry list no longer shadows the
            # `retries` parameter of this method
            try:
                marker, timestamp, bound_retries = client.get_worker_bound(
                    self.dest_conn,
                    'bucket-index',
                    bucket_instance)
            except client.NotFound:
                log.debug('no worker bound found for bucket instance "%s"',
                          bucket_instance)
                marker, timestamp, bound_retries = '', DEFAULT_TIME, []
            try:
                sync_result = self.inc_sync_bucket_instance(bucket_instance,
                                                            marker,
                                                            timestamp,
                                                            bound_retries)
            except Exception as e:
                log.warn('error syncing bucket instance "%s": %s',
                         bucket_instance, e, exc_info=True)
                sync_result = RESULT_ERROR
            if sync_result == RESULT_ERROR:
                new_retries.append(bucket_instance)

        return new_retries
+
class DataWorkerFull(DataWorker):
    """Full data sync: syncs every object of every bucket in a shard,
    recording a bound so incremental sync can take over from there."""

    def full_sync_bucket(self, bucket):
        """Sync all objects in `bucket`; return True on complete success.

        Returns False when preparation failed or any object must be
        retried during incremental sync.
        """
        try:
            instance = self.get_bucket_instance(bucket)
            try:
                marker = client.get_log_info(self.src_conn, 'bucket-index',
                                             instance)['max_marker']
            except client.NotFound:
                marker = ''
            log.debug('bucket instance is "%s" with marker %s', instance, marker)
            # nothing to do for this bucket
            if not marker:
                return True

            objects = client.list_objects_in_bucket(self.src_conn, bucket)
            if not objects:
                return True
        except Exception as e:
            log.error('error preparing for full sync of bucket "%s": %s',
                      bucket, e)
            return False

        retries = self.sync_bucket(bucket, objects)

        # record how far we got so incremental sync starts from here
        result = self.set_bound(instance, marker, retries, 'bucket-index')
        return not retries and result == RESULT_SUCCESS

    def run(self):
        """Consume (shard_num, buckets) items until a None sentinel."""
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                log.info('No more entries in queue, exiting')
                break

            shard_num, buckets = item

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                continue

            # attempt to sync each bucket, add to a list to retry
            # during incremental sync if sync fails
            retry_buckets = []
            for bucket in buckets:
                if not self.full_sync_bucket(bucket):
                    retry_buckets.append(bucket)

            # unlock shard and report buckets to retry during incremental sync
            self.unlock_shard()
            self.result_queue.put((RESULT_SUCCESS, (shard_num, retry_buckets)))
            log.info('finished syncing shard %d', shard_num)
            log.info('incremental sync will need to retry buckets: %s',
                     retry_buckets)
+
class MetadataWorker(Worker):
    """Base class for metadata sync workers."""

    def __init__(self, *args, **kwargs):
        super(MetadataWorker, self).__init__(*args, **kwargs)
        self.type = 'metadata'

    def sync_meta(self, section, name):
        """Copy one metadata item from the master to the secondary.

        If the item is missing on the master it is deleted from the
        secondary instead. Returns RESULT_SUCCESS or RESULT_ERROR.
        NOTE(review): errors from delete_metadata other than NotFound
        still propagate to the caller -- confirm this is intended.
        """
        log.debug('syncing metadata type %s key "%s"', section, name)
        try:
            metadata = client.get_metadata(self.src_conn, section, name)
        except client.NotFound:
            log.debug('%s "%s" not found on master, deleting from secondary',
                      section, name)
            try:
                client.delete_metadata(self.dest_conn, section, name)
            except client.NotFound:
                # Since this error is handled appropriately, return success
                return RESULT_SUCCESS
            # fix: a *successful* delete previously fell through and
            # returned None instead of a RESULT_* code
            return RESULT_SUCCESS
        except Exception as e:
            log.warn('error getting metadata for %s "%s": %s',
                     section, name, e, exc_info=True)
            return RESULT_ERROR
        else:
            try:
                client.update_metadata(self.dest_conn, section, name, metadata)
                return RESULT_SUCCESS
            except Exception as e:
                log.warn('error updating metadata for %s "%s": %s',
                         section, name, e, exc_info=True)
                return RESULT_ERROR
+
class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
    """Incremental metadata sync: replays per-shard metadata logs."""

    # note: a redundant __init__ that only forwarded to super() with the
    # same arguments was removed; Python falls back to the base class.

    def sync_entries(self, log_entries, retries):
        """Sync every metadata item mentioned in the log or in retries.

        Retries are encoded as "section/name" strings. Returns the list
        of items to retry next pass, in the same encoding.
        """
        try:
            entries = [_meta_entry_from_json(entry) for entry in log_entries]
        except KeyError:
            log.error('log containing bad key is: %s', log_entries)
            raise

        new_retries = []
        mentioned = set([(entry.section, entry.name) for entry in entries])
        split_retries = [tuple(entry.split('/', 1)) for entry in retries]
        for section, name in mentioned.union(split_retries):
            sync_result = self.sync_meta(section, name)
            if sync_result == RESULT_ERROR:
                new_retries.append(section + '/' + name)

        return new_retries
+
class MetadataWorkerFull(MetadataWorker):
    """Full metadata sync: syncs every metadata item in a shard."""

    def empty_result(self, shard):
        """Return the (shard, retries) result for a shard with no work."""
        return shard, []

    def run(self):
        """Consume (shard_num, metadata) items until a None sentinel.

        Each metadata item is a (section, name) pair; failed items are
        reported as retries for the incremental sync to pick up.
        """
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                log.info('No more entries in queue, exiting')
                break

            log.debug('syncing item "%s"', item)

            shard_num, metadata = item

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                continue

            # attempt to sync each item, add to a list to retry
            # during incremental sync if sync fails
            retries = []
            for section, name in metadata:
                try:
                    # fix: sync_meta reports most failures via its return
                    # value rather than raising, so check it too --
                    # previously such failures were never queued for retry
                    if self.sync_meta(section, name) == RESULT_ERROR:
                        retries.append(section + '/' + name)
                except Exception as e:
                    log.warn('could not sync %s "%s", saving for retry: %s',
                             section, name, e, exc_info=True)
                    retries.append(section + '/' + name)

            # unlock shard and report items to retry during incremental sync
            self.unlock_shard()
            self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
            log.info('finished syncing shard %d', shard_num)
            log.info('incremental sync will need to retry items: %s',
                     retries)
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644 (file)
index 0000000..43b2434
--- /dev/null
@@ -0,0 +1,3 @@
+pytest >=2.1.3
+mock >=1.0
+tox >=1.2
diff --git a/requirements.txt b/requirements.txt
new file mode 100644 (file)
index 0000000..2ac9bc1
--- /dev/null
@@ -0,0 +1,4 @@
+argparse
+boto >=2.2.2,<3.0.0
+requests >=1.2.1
+PyYAML
diff --git a/setup.cfg b/setup.cfg
new file mode 100644 (file)
index 0000000..cf24a05
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.2.2,python-boto < 3.0.0,python-requests
diff --git a/setup.py b/setup.py
new file mode 100644 (file)
index 0000000..6a9144b
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,39 @@
#!/usr/bin/python
from setuptools import setup, find_packages
import sys


# argparse joined the stdlib in python 2.7 and 3.2; pull it in as a
# dependency on older interpreters
install_requires = []
pyversion = sys.version_info[:2]
if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
    install_requires.append('argparse')

setup(
    name='radosgw-agent',
    version='1.1',
    packages=find_packages(),

    author='Josh Durgin',
    author_email='josh.durgin@inktank.com',
    description='Synchronize users and data between radosgw clusters',
    license='MIT',
    keywords='radosgw ceph radosgw-agent',
    url="https://github.com/ceph/radosgw-agent",

    # NOTE(review): requirements.txt also lists PyYAML, which is absent
    # here -- confirm whether it belongs in install_requires too.
    install_requires=[
        'setuptools',
        'boto >=2.2.2,<3.0.0',
        'requests >=1.2.1',
        ] + install_requires,

    # fix: the setuptools keyword is tests_require; the previous
    # misspelling test_requires was silently ignored
    tests_require=[
        'pytest >=2.1.3',
        'mock >=1.0',
        ],

    entry_points={
        'console_scripts': [
            'radosgw-agent = radosgw_agent.cli:main',
            ],
        },
    )
diff --git a/tox.ini b/tox.ini
new file mode 100644 (file)
index 0000000..74acee7
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,8 @@
+[tox]
+envlist = py26
+
+[testenv]
+deps=
+  pytest
+  mock
+commands=py.test -s -v {posargs:radosgw_agent/tests}