From 65121e7093fbba610496f618e942516fa4557777 Mon Sep 17 00:00:00 2001 From: Alfredo Deza Date: Thu, 26 Mar 2015 11:26:15 -0400 Subject: [PATCH] inform the user about data sync operations Signed-off-by: Alfredo Deza --- .gitignore | 36 + CHANGELOG.rst | 18 + LICENSE | 19 + MANIFEST.in | 5 + README.rst | 3 + bootstrap | 42 ++ debian/changelog | 23 + debian/compat | 1 + debian/control | 18 + debian/copyright | 3 + debian/dirs | 3 + debian/rules | 15 + debian/source/format | 1 + init-radosgw-agent | 102 +++ logrotate.conf | 7 + radosgw-agent.spec | 44 ++ radosgw_agent/__init__.py | 5 + radosgw_agent/cli.py | 481 +++++++++++++ radosgw_agent/client.py | 634 +++++++++++++++++ radosgw_agent/constants.py | 6 + radosgw_agent/exceptions.py | 77 ++ radosgw_agent/lock.py | 107 +++ radosgw_agent/request.py | 189 +++++ radosgw_agent/sync.py | 328 +++++++++ radosgw_agent/tests/__init__.py | 0 radosgw_agent/tests/conftest.py | 33 + radosgw_agent/tests/test_client.py | 672 ++++++++++++++++++ radosgw_agent/tests/test_worker.py | 209 ++++++ .../tests/util/test_configuration.py | 64 ++ radosgw_agent/tests/util/test_obj.py | 40 ++ radosgw_agent/util/__init__.py | 2 + radosgw_agent/util/configuration.py | 77 ++ radosgw_agent/util/decorators.py | 112 +++ radosgw_agent/util/log.py | 89 +++ radosgw_agent/util/obj.py | 42 ++ radosgw_agent/util/string.py | 14 + radosgw_agent/worker.py | 549 ++++++++++++++ requirements-dev.txt | 3 + requirements.txt | 3 + scripts/radosgw-agent | 21 + setup.cfg | 2 + setup.py | 44 ++ tox.ini | 9 + 43 files changed, 4152 insertions(+) create mode 100644 .gitignore create mode 100644 CHANGELOG.rst create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 README.rst create mode 100755 bootstrap create mode 100644 debian/changelog create mode 100644 debian/compat create mode 100644 debian/control create mode 100644 debian/copyright create mode 100644 debian/dirs create mode 100755 debian/rules create mode 100644 debian/source/format create mode 100644 init-radosgw-agent create mode 100644 logrotate.conf create mode 100644 radosgw-agent.spec create mode 100644 radosgw_agent/__init__.py create mode 100644 radosgw_agent/cli.py create mode 100644 radosgw_agent/client.py create mode 100644 radosgw_agent/constants.py create mode 100644 radosgw_agent/exceptions.py create mode 100644 radosgw_agent/lock.py create mode 100644 radosgw_agent/request.py create mode 100644 radosgw_agent/sync.py create mode 100644 radosgw_agent/tests/__init__.py create mode 100644 radosgw_agent/tests/conftest.py create mode 100644 radosgw_agent/tests/test_client.py create mode 100644 radosgw_agent/tests/test_worker.py create mode 100644 radosgw_agent/tests/util/test_configuration.py create mode 100644 radosgw_agent/tests/util/test_obj.py create mode 100644 radosgw_agent/util/__init__.py create mode 100644 radosgw_agent/util/configuration.py create mode 100644 radosgw_agent/util/decorators.py create mode 100644 radosgw_agent/util/log.py create mode 100644 radosgw_agent/util/obj.py create mode 100644 radosgw_agent/util/string.py create mode 100644 radosgw_agent/worker.py create mode 100644 requirements-dev.txt create mode 100644 requirements.txt create mode 100644 scripts/radosgw-agent create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2907320 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +*.py[cod] + +# C extensions +*.so + +# Packages +*.egg +*.egg-info +dist +build +eggs +parts +bin +var +sdist 
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+*.log
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+.project
+.pydevproject
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
new file mode 100644
index 0000000..115b0a6
--- /dev/null
+++ b/CHANGELOG.rst
@@ -0,0 +1,18 @@
+Changelog
+=========
+
+1.2.1
+-----
+* Parity in version release for DEB/RPMs to PyPI. Previous 1.2 release had
+  fixes available only for the Python package.
+
+1.2
+---
+* Improve log handling (works better with logrotate)
+* Fix racing threads when the shard number changes
+* Better logging of exceptions
+* Retry sync when transient errors are returned by the gateway.
+* Drop dependency on Python's ``requests`` library (in favour of ``boto``)
+* Better handling of objects that are not found.
+* When there are buckets with no logs, process them as a full sync.
+* Fix mishandling of reserved characters in URLs.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..6062a74
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Inktank Storage, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
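The 1.2 changelog entry about reserved characters refers to percent-encoding request parameters before they are signed and sent. A minimal sketch of that behavior, modeled on the url_safe() helper that client.py introduces later in this patch (Python 2, like the rest of the agent; the sample value is illustrative only):

    import urllib

    def url_safe(component):
        # unicode values are UTF-8 encoded first, then percent-quoted;
        # anything else is stringified, mirroring client.url_safe()
        if isinstance(component, basestring):
            component = component.encode('utf8')
        else:
            component = str(component)
        return urllib.quote(component)

    # url_safe(u'my bucket/naive obj') -> 'my%20bucket/naive%20obj'

Without quoting of this kind, markers or object names containing spaces, '+', or non-ASCII characters could corrupt the query string the agent sends to the gateway.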
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..03f1191
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,5 @@
+include LICENSE
+include scripts/radosgw-agent
+include init-radosgw-agent
+include logrotate.conf
+prune radosgw_agent/tests
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..656bd71
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,3 @@
+====================================================================
+radosgw-agent -- synchronize data and users between radosgw clusters
+====================================================================
diff --git a/bootstrap b/bootstrap
new file mode 100755
index 0000000..6dd1a2d
--- /dev/null
+++ b/bootstrap
@@ -0,0 +1,42 @@
+#!/bin/sh
+set -e
+
+if command -v lsb_release >/dev/null 2>&1; then
+    case "$(lsb_release --id --short)" in
+    Ubuntu|Debian)
+        for package in python-virtualenv; do
+            if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+                # add a space after old values
+                missing="${missing:+$missing }$package"
+            fi
+        done
+        if [ -n "$missing" ]; then
+            echo "$0: missing required packages, please install them:" 1>&2
+            echo "  sudo apt-get install $missing"
+            exit 1
+        fi
+        ;;
+    esac
+else
+    if [ -f /etc/redhat-release ]; then
+        case "$(awk '{print $1}' /etc/redhat-release)" in
+        CentOS)
+            for package in python-virtualenv; do
+                if [ "$(rpm -qa $package 2>/dev/null)" = "" ]; then
+                    missing="${missing:+$missing }$package"
+                fi
+            done
+            if [ -n "$missing" ]; then
+                echo "$0: missing required packages, please install them:" 1>&2
+                echo "  sudo yum install $missing"
+                exit 1
+            fi
+            ;;
+        esac
+    fi
+fi
+
+test -d virtualenv || virtualenv virtualenv
+./virtualenv/bin/python setup.py develop
+./virtualenv/bin/pip install -r requirements.txt -r requirements-dev.txt
+test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent .
diff --git a/debian/changelog b/debian/changelog
new file mode 100644
index 0000000..fc5a180
--- /dev/null
+++ b/debian/changelog
@@ -0,0 +1,23 @@
+radosgw-agent (1.2.1) stable; urgency=low
+
+  * New upstream release
+
+ -- Alfredo Deza  Mon, 09 Feb 2015 12:52:46 -0800
+
+radosgw-agent (1.2-1) precise; urgency=low
+
+  * new upstream release
+
+ -- Sandon Van Ness  Wed, 02 Apr 2014 11:25:54 -0800
+
+radosgw-agent (1.1-1) precise; urgency=low
+
+  * new upstream release
+
+ -- Gary Lowell  Thu, 21 Nov 2013 16:17:25 -0800
+
+radosgw-agent (1.0-1) stable; urgency=low
+
+  * Initial release
+
+ -- Gary Lowell  Mon, 26 Aug 2013 09:19:47 -0700
diff --git a/debian/compat b/debian/compat
new file mode 100644
index 0000000..45a4fb7
--- /dev/null
+++ b/debian/compat
@@ -0,0 +1 @@
+8
diff --git a/debian/control b/debian/control
new file mode 100644
index 0000000..6acbfba
--- /dev/null
+++ b/debian/control
@@ -0,0 +1,18 @@
+Source: radosgw-agent
+Maintainer: Sage Weil
+Uploaders: Sage Weil
+Section: admin
+Priority: optional
+Build-Depends: debhelper (>= 8), python-setuptools
+X-Python-Version: >= 2.6
+Standards-Version: 3.9.2
+Homepage: http://ceph.com/
+
+Package: radosgw-agent
+Architecture: all
+Depends: python,
+         python-argparse,
+         python-setuptools,
+         ${misc:Depends},
+         ${python:Depends}
+Description: Rados gateway agents.
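The packaging above ships an agent that is normally driven by a YAML configuration file: the init script added below defaults to /etc/ceph/radosgw-agent/default.conf, and cli.py (later in this patch) layers that file underneath the argparse defaults. A minimal sketch of that mechanism, with hypothetical keys and values; key names mirror the long CLI options with dashes turned into underscores:

    import yaml

    # hypothetical configuration; every key mirrors a parse_args() option
    sample_conf = """
    src_access_key: EXAMPLE_SRC_ACCESS
    src_secret_key: EXAMPLE_SRC_SECRET
    dest_access_key: EXAMPLE_DEST_ACCESS
    dest_secret_key: EXAMPLE_DEST_SECRET
    destination: http://zone2.example.org:8080
    log_file: /var/log/ceph/radosgw-agent/sync.log
    """

    defaults = dict(sync_scope='incremental', log_lock_time=20)
    for doc in yaml.safe_load_all(sample_conf):
        defaults.update(doc)  # later YAML documents win, as in parse_args()

Because the destination can come from the file, parse_args() relaxes the positional endpoint argument to nargs='?' when it appears in the defaults; anything not present in the file must still be passed on the command line.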
diff --git a/debian/copyright b/debian/copyright
new file mode 100644
index 0000000..730861e
--- /dev/null
+++ b/debian/copyright
@@ -0,0 +1,3 @@
+Files: *
+Copyright: (c) 2013 by Inktank Storage
+License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
diff --git a/debian/dirs b/debian/dirs
new file mode 100644
index 0000000..b1f058b
--- /dev/null
+++ b/debian/dirs
@@ -0,0 +1,3 @@
+/etc/ceph/radosgw-agent
+/var/log/ceph/radosgw-agent
+/var/run/ceph/radosgw-agent
diff --git a/debian/rules b/debian/rules
new file mode 100755
index 0000000..8b612b7
--- /dev/null
+++ b/debian/rules
@@ -0,0 +1,15 @@
+#!/usr/bin/make -f
+
+# Turn on verbose mode.
+export DH_VERBOSE=1
+
+%:
+	dh $@ --buildsystem python_distutils --with python2
+
+override_dh_installlogrotate:
+	cp logrotate.conf debian/radosgw-agent.logrotate
+	dh_installlogrotate
+
+override_dh_installinit:
+	install -m0644 init-radosgw-agent debian/radosgw-agent.init
+	dh_installinit --no-start
diff --git a/debian/source/format b/debian/source/format
new file mode 100644
index 0000000..d3827e7
--- /dev/null
+++ b/debian/source/format
@@ -0,0 +1 @@
+1.0
diff --git a/init-radosgw-agent b/init-radosgw-agent
new file mode 100644
index 0000000..ace91ae
--- /dev/null
+++ b/init-radosgw-agent
@@ -0,0 +1,102 @@
+#!/bin/sh
+# Start/stop radosgw-agent daemons
+# chkconfig: 2345 60 80
+
+### BEGIN INIT INFO
+# Provides:          radosgw-agent
+# Required-Start:    $remote_fs $named $network
+# Required-Stop:     $remote_fs $named $network
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: Start radosgw-agent at boot time
+# Description:       Enable radosgw-agent services
+### END INIT INFO
+
+dir="/"
+config_path="/etc/ceph/radosgw-agent/default.conf"
+
+if [ -n "$2" ]; then
+    config_path="$2"
+fi
+
+if [ ! -f "$config_path" ]; then
+    echo "$0: configuration file $config_path not found"
+    exit 0
+fi
+
+cmd="/usr/bin/radosgw-agent -c $config_path"
+
+name=`basename $config_path`
+pid_file="/var/run/ceph/radosgw-agent/$name.pid"
+
+is_running() {
+    [ -e "$pid_file" ] || return 1
+    pid=`cat "$pid_file"`
+    [ -e "/proc/$pid" ] && grep -q "/usr/bin/radosgw-agent.-c.$config_path" "/proc/$pid/cmdline" && return 0
+    return 1
+}
+
+case "$1" in
+    start)
+        if is_running; then
+            echo "Already started"
+            exit 0
+        fi
+        echo "Starting radosgw-agent $name"
+        cd "$dir"
+        $cmd > /dev/null 2>&1 &
+        echo $! > "$pid_file"
+        if ! is_running; then
+            echo "Unable to start, see /var/log/ceph/radosgw-agent/"
+            exit 1
+        fi
+        ;;
+
+    stop)
+        if is_running; then
+            echo -n "Stopping radosgw-agent $name.."
+            pid=`cat "$pid_file"`
+            kill $pid
+            for i in 1 2 3 4 5 6 7 8 9 10
+            do
+                if ! is_running; then
+                    break
+                fi
+
+                echo -n "."
+ sleep 1 + done + + if is_running; then + echo "Not stopped; may still be shutting down or shutdown may have failed" + exit 1 + else + echo "Stopped" + rm "$pid_file" + fi + else + echo "Not running" + fi + ;; + restart) + $0 stop $name + if is_running; then + echo "Unable to stop, will not attempt to start" + exit 1 + fi + $0 start $name + ;; + status) + if is_running; then + echo "Running" + else + echo "Stopped" + exit 1 + fi + ;; + *) + echo "Usage: $0 {start|stop|restart|status} [config-file]" + exit 1 + ;; +esac +exit 0 diff --git a/logrotate.conf b/logrotate.conf new file mode 100644 index 0000000..745f1cb --- /dev/null +++ b/logrotate.conf @@ -0,0 +1,7 @@ +/var/log/ceph/radosgw-agent/*.log { + rotate 7 + daily + compress + missingok + notifempty +} diff --git a/radosgw-agent.spec b/radosgw-agent.spec new file mode 100644 index 0000000..e1d5a51 --- /dev/null +++ b/radosgw-agent.spec @@ -0,0 +1,44 @@ +Summary: Synchronize users and data between radosgw clusters +Name: radosgw-agent +Version: 1.2.1 +Release: 0%{?dist} +Source0: https://pypi.python.org/packages/source/r/%{name}/%{name}-%{version}.tar.gz +License: MIT +Group: Development/Libraries +BuildArch: noarch +Requires: python-argparse +Requires: PyYAML +Requires: python-boto >= 2.2.2 +Requires: python-boto < 3.0.0 +BuildRequires: python-devel +BuildRequires: python-setuptools +URL: https://github.com/ceph/radosgw-agent + +%description +The Ceph RADOS Gateway agent replicates the data of a master zone to a +secondary zone. + +%prep +%setup -q + +%build +python setup.py build + +%install +python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT +install -m 0755 -D scripts/radosgw-agent $RPM_BUILD_ROOT%{_bindir}/radosgw-agent +install -m 0644 -D logrotate.conf $RPM_BUILD_ROOT%{_sysconfdir}/logrotate.d/radosgw-agent +install -m 0755 -D init-radosgw-agent $RPM_BUILD_ROOT%{_initrddir}/radosgw-agent +mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/ceph/radosgw-agent +mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/log/ceph/radosgw-agent +mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/run/ceph/radosgw-agent + +%files +%doc LICENSE +%dir %{_sysconfdir}/ceph/radosgw-agent +%dir %{_localstatedir}/log/ceph/radosgw-agent +%dir %{_localstatedir}/run/ceph/radosgw-agent +%config(noreplace) %{_sysconfdir}/logrotate.d/radosgw-agent +%{_bindir}/radosgw-agent +%{_initrddir}/radosgw-agent +%{python_sitelib}/radosgw_agent*/ diff --git a/radosgw_agent/__init__.py b/radosgw_agent/__init__.py new file mode 100644 index 0000000..61759fc --- /dev/null +++ b/radosgw_agent/__init__.py @@ -0,0 +1,5 @@ +from radosgw_agent.util import configuration as _configuration + +config = _configuration.Configuration() + +__version__ = '1.2.1' diff --git a/radosgw_agent/cli.py b/radosgw_agent/cli.py new file mode 100644 index 0000000..9072cbf --- /dev/null +++ b/radosgw_agent/cli.py @@ -0,0 +1,481 @@ +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import argparse +import contextlib +import logging +import logging.handlers +import os.path +import yaml +import sys +import time +import base64 +import hmac +import sha +import urllib2 +from urllib2 import URLError, HTTPError + +import radosgw_agent +from radosgw_agent import client +from radosgw_agent import util +from radosgw_agent.util import string +from radosgw_agent.util.decorators import catches +from radosgw_agent.exceptions import AgentError, RegionMapError, InvalidProtocol +from radosgw_agent import sync, config + +log = logging.getLogger('radosgw_agent') + + +def 
check_positive_int(string): + value = int(string) + if value < 1: + msg = '%r is not a positive integer' % string + raise argparse.ArgumentTypeError(msg) + return value + + +def check_endpoint(endpoint): + try: + return client.parse_endpoint(endpoint) + except InvalidProtocol as e: + raise argparse.ArgumentTypeError(str(e)) + except client.InvalidHost as e: + raise argparse.ArgumentTypeError(str(e)) + + +def parse_args(): + conf_parser = argparse.ArgumentParser(add_help=False) + conf_parser.add_argument( + '-c', '--conf', + type=file, + help='configuration file' + ) + args, remaining = conf_parser.parse_known_args() + log_dir = '/var/log/ceph/radosgw-agent/' + log_file = 'radosgw-agent.log' + if args.conf is not None: + log_file = os.path.basename(args.conf.name) + defaults = dict( + sync_scope='incremental', + log_lock_time=20, + log_file=os.path.join(log_dir, log_file), + ) + if args.conf is not None: + with contextlib.closing(args.conf): + config = yaml.safe_load_all(args.conf) + for new in config: + defaults.update(new) + + parser = argparse.ArgumentParser( + parents=[conf_parser], + description='Synchronize radosgw installations', + ) + parser.set_defaults(**defaults) + verbosity = parser.add_mutually_exclusive_group(required=False) + verbosity.add_argument( + '-v', '--verbose', + action='store_true', dest='verbose', + help='be more verbose', + ) + verbosity.add_argument( + '-q', '--quiet', + action='store_true', dest='quiet', + help='be less verbose', + ) + parser.add_argument( + '--src-access-key', + required='src_access_key' not in defaults, + help='access key for source zone system user', + ) + parser.add_argument( + '--src-secret-key', + required='src_secret_key' not in defaults, + help='secret key for source zone system user', + ) + parser.add_argument( + '--dest-access-key', + required='dest_access_key' not in defaults, + help='access key for destination zone system user', + ) + parser.add_argument( + '--dest-secret-key', + required='dest_secret_key' not in defaults, + help='secret key for destination zone system user', + ) + parser.add_argument( + 'destination', + type=check_endpoint, + nargs=None if 'destination' not in defaults else '?', + help='radosgw endpoint to which to sync ' + '(e.g. http://zone2.example.org:8080)', + ) + src_options = parser.add_mutually_exclusive_group(required=False) + src_options.add_argument( + '--source', + type=check_endpoint, + help='radosgw endpoint from which to sync ' + '(e.g. 
http://zone1.example.org:8080)',
+    )
+    src_options.add_argument(
+        '--src-zone',
+        help='radosgw zone from which to sync',
+    )
+    parser.add_argument(
+        '--metadata-only',
+        action='store_true',
+        help='sync bucket and user metadata, but not bucket contents',
+    )
+    parser.add_argument(
+        '--versioned',
+        action='store_true',
+        help='indicates that radosgw endpoints have object versioning enabled',
+    )
+    parser.add_argument(
+        '--num-workers',
+        default=1,
+        type=check_positive_int,
+        help='number of items to sync at once',
+    )
+    parser.add_argument(
+        '--sync-scope',
+        choices=['full', 'incremental'],
+        default='incremental',
+        help='synchronize everything (for a new region) or only things that '
+             'have changed since the last run',
+    )
+    parser.add_argument(
+        '--lock-timeout',
+        type=check_positive_int,
+        default=60,
+        help='timeout in seconds after which a log segment lock will expire if '
+             'not refreshed',
+    )
+    parser.add_argument(
+        '--log-file',
+        help='where to store log output',
+    )
+    parser.add_argument(
+        '--max-entries',
+        type=check_positive_int,
+        default=1000,
+        help='maximum number of log entries to process at once during '
+             'continuous sync',
+    )
+    parser.add_argument(
+        '--incremental-sync-delay',
+        type=check_positive_int,
+        default=30,
+        help='seconds to wait between syncs',
+    )
+    parser.add_argument(
+        '--object-sync-timeout',
+        type=check_positive_int,
+        default=60 * 60 * 60,
+        help='seconds to wait for an individual object to sync before '
+             'assuming failure',
+    )
+    parser.add_argument(
+        '--prepare-error-delay',
+        type=check_positive_int,
+        default=10,
+        help='seconds to wait before retrying when preparing '
+             'an incremental sync fails',
+    )
+    parser.add_argument(
+        '--rgw-data-log-window',
+        type=check_positive_int,
+        default=30,
+        help='period until a data log entry is valid - '
+             'must match radosgw configuration',
+    )
+    parser.add_argument(
+        '--test-server-host',
+        # host to run a simple http server for testing the sync agent on,
+        help=argparse.SUPPRESS,
+    )
+    parser.add_argument(
+        '--test-server-port',
+        # port to run a simple http server for testing the sync agent on,
+        type=check_positive_int,
+        default=8080,
+        help=argparse.SUPPRESS,
+    )
+    return parser.parse_args(remaining)
+
+
+def sign_string(
+        secret_key,
+        verb="GET",
+        content_md5="",
+        content_type="",
+        date=None,
+        canonical_amz_headers="",
+        canonical_resource="/?versions"
+        ):
+
+    date = date or time.asctime(time.gmtime())
+    to_sign = string.concatenate(verb, content_md5, content_type, date)
+    to_sign = string.concatenate(
+        to_sign, canonical_amz_headers,
+        canonical_resource,
+        newline=False
+    )
+    return base64.b64encode(hmac.new(secret_key, to_sign, sha).digest())
+
+
+def check_versioning(endpoint):
+    date = time.asctime(time.gmtime())
+    signed_string = sign_string(endpoint.secret_key, date=date)
+
+    url = str(endpoint) + '/?versions'
+    headers = {
+        'Authorization': 'AWS ' + endpoint.access_key + ':' + signed_string,
+        'Date': date
+    }
+
+    data = None
+    req = urllib2.Request(url, data, headers)
+    try:
+        response = urllib2.urlopen(req)
+        response.read()
+        log.debug('%s endpoint supports versioning' % endpoint)
+        return True
+    except HTTPError as error:
+        if error.code == 403:
+            log.info('%s endpoint does not support versioning' % endpoint)
+        else:
+            log.warning('encountered issues reaching endpoint %s: %s' % (endpoint, error))
+    except URLError as error:
+        log.error("was unable to connect to %s" % url)
+        log.error(error)
+    return False
+
+
+class TestHandler(BaseHTTPRequestHandler):
+    """HTTP handler for testing radosgw-agent.
+
+    This should never be used outside of testing.
+    """
+    num_workers = None
+    lock_timeout = None
+    max_entries = None
+    rgw_data_log_window = 30
+    src = None
+    dest = None
+
+    def do_POST(self):
+        log = logging.getLogger(__name__)
+        status = 200
+        resp = ''
+        sync_cls = None
+        if self.path.startswith('/metadata/full'):
+            sync_cls = sync.MetaSyncerFull
+        elif self.path.startswith('/metadata/incremental'):
+            sync_cls = sync.MetaSyncerInc
+        elif self.path.startswith('/data/full'):
+            sync_cls = sync.DataSyncerFull
+        elif self.path.startswith('/data/incremental'):
+            sync_cls = sync.DataSyncerInc
+        else:
+            log.warn('invalid request, ignoring')
+            status = 400
+            resp = 'bad path'
+
+        try:
+            if sync_cls is not None:
+                syncer = sync_cls(TestHandler.src, TestHandler.dest,
+                                  TestHandler.max_entries,
+                                  rgw_data_log_window=TestHandler.rgw_data_log_window,
+                                  object_sync_timeout=TestHandler.object_sync_timeout)
+                syncer.prepare()
+                syncer.sync(
+                    TestHandler.num_workers,
+                    TestHandler.lock_timeout,
+                )
+        except Exception as e:
+            log.exception('error during sync')
+            status = 500
+            resp = str(e)
+
+        self.log_request(status, len(resp))
+        if status >= 400:
+            self.send_error(status, resp)
+        else:
+            self.send_response(status)
+            self.end_headers()
+
+
+def set_args_to_config(args):
+    """
+    Ensure that the arguments passed in to the CLI are slapped onto the config
+    object so that it can be referenced throughout the agent
+    """
+    if 'args' not in config:
+        config['args'] = args.__dict__
+
+
+def log_header():
+    version = radosgw_agent.__version__
+    lines = [
+        ' __ __ __ ___ ___ ',
+        '/__` \ / |\ | / ` /\ / _` |__ |\ | | ',
+        '.__/ | | \| \__, /~~\ \__> |___ | \| | ',
+        ' v%s' % version,
+    ]
+    for line in lines:
+        log.info(line)
+    log.info('agent options:')
+
+    secrets = [
+        'src_secret_key', 'dest_secret_key',
+        'src_access_key', 'dest_access_key',
+    ]
+
+    def log_dict(k, _dict, level=1):
+        padding = ' ' * level
+        log.info('%s%s:' % (padding, k))
+        for key, value in sorted(_dict.items()):
+            if hasattr(value, '_dict'):
+                log_dict(key, value, level + 1)
+                continue
+            if key in secrets:
+                value = '*' * 16
+            log.info('%s%-30s: %s' % (padding+' ', key, value))
+
+    for k, v in config.items():
+        if hasattr(v, '_dict'):
+            log_dict(k, v)
+        else:
+            log.info('  %-30s: %s' % (k, v))
+
+
+@catches((KeyboardInterrupt, RuntimeError, AgentError,), handle_all=True)
+def main():
+    # root (a.k.a. 'parent') and agent loggers
+    root_logger = logging.getLogger()
+
+    # allow all levels at root_logger, handlers control individual levels
+    root_logger.setLevel(logging.DEBUG)
+
+    # Console handler, meant only for user-facing information
+    console_loglevel = logging.INFO
+
+    sh = logging.StreamHandler()
+    sh.setFormatter(util.log.color_format())
+    # the console level is set here, before options are parsed, so that
+    # errors popping up earlier can still be reported
+    sh.setLevel(console_loglevel)
+
+    agent_logger = logging.getLogger('radosgw_agent')
+    agent_logger.addHandler(sh)
+
+    # After initial logging is configured, now parse args
+    args = parse_args()
+
+    # File handler
+    log_file = args.log_file or 'radosgw-agent.log'
+    try:
+        fh = logging.handlers.WatchedFileHandler(log_file)
+    except IOError as err:
+        agent_logger.warning('unable to use log location: %s' % log_file)
+        agent_logger.warning(err)
+        agent_logger.warning('will fallback to ./radosgw-agent.log')
+        # if the location is not present, fallback to cwd
+        fh = logging.handlers.WatchedFileHandler('radosgw-agent.log')
+
+    fh.setLevel(logging.DEBUG)
+    fh.setFormatter(logging.Formatter(util.log.BASE_FORMAT))
+
+    root_logger.addHandler(fh)
+
+    if args.verbose:
+        console_loglevel = logging.DEBUG
+    elif args.quiet:
+        console_loglevel = logging.WARN
+
+    # now that we have parsed the actual log level we need
+    # to reset it in the handler
+    sh.setLevel(console_loglevel)
+
+    # after logging is set, ensure that the arguments are present in the
+    # config object
+    set_args_to_config(args)
+
+    log_header()
+    dest = args.destination
+    dest.access_key = args.dest_access_key
+    dest.secret_key = args.dest_secret_key
+    src = args.source or client.Endpoint(None, None, None)
+    if args.src_zone:
+        src.zone = args.src_zone
+    dest_conn = client.connection(dest)
+
+    try:
+        region_map = client.get_region_map(dest_conn)
+    except AgentError:
+        # anything that we know about and are correctly raising should
+        # just get raised so that the decorator can handle it
+        raise
+    except Exception as error:
+        # otherwise, we have the below exception that will nicely deal with
+        # explaining what happened
+        raise RegionMapError(error)
+
+    client.configure_endpoints(region_map, dest, src, args.metadata_only)
+
+    src.access_key = args.src_access_key
+    src.secret_key = args.src_secret_key
+
+    if config['args']['versioned']:
+        log.debug('versioned flag enabled, overriding versioning check')
+        config['use_versioning'] = True
+    else:
+        config['use_versioning'] = check_versioning(src)
+
+    if args.test_server_host:
+        log.warn('TEST MODE - do not run unless you are testing this program')
+        TestHandler.src = src
+        TestHandler.dest = dest
+        TestHandler.num_workers = args.num_workers
+        TestHandler.lock_timeout = args.lock_timeout
+        TestHandler.max_entries = args.max_entries
+        TestHandler.rgw_data_log_window = args.rgw_data_log_window
+        TestHandler.object_sync_timeout = args.object_sync_timeout
+        server = HTTPServer((args.test_server_host, args.test_server_port),
+                            TestHandler)
+        server.serve_forever()
+        sys.exit()
+
+    if args.sync_scope == 'full':
+        meta_cls = sync.MetaSyncerFull
+        data_cls = sync.DataSyncerFull
+    else:
+        meta_cls = sync.MetaSyncerInc
+        data_cls = sync.DataSyncerInc
+
+    meta_syncer = meta_cls(src, dest, args.max_entries)
+    data_syncer = data_cls(src, dest, args.max_entries,
+                           rgw_data_log_window=args.rgw_data_log_window,
+                           object_sync_timeout=args.object_sync_timeout)
+
+    # fetch logs first since data logs need to wait before becoming usable
+    # due to rgw's
window of data log updates during which the bucket index + # log may still be updated without the data log getting a new entry for + # the bucket + sync.prepare_sync(meta_syncer, args.prepare_error_delay) + if not args.metadata_only: + sync.prepare_sync(data_syncer, args.prepare_error_delay) + + if args.sync_scope == 'full': + log.info('syncing all metadata') + meta_syncer.sync(args.num_workers, args.lock_timeout) + if not args.metadata_only: + log.info('syncing all data') + data_syncer.sync(args.num_workers, args.lock_timeout) + log.info('Finished full sync. Check logs to see any issues that ' + 'incremental sync will retry.') + else: + sync.incremental_sync(meta_syncer, data_syncer, + args.num_workers, + args.lock_timeout, + args.incremental_sync_delay, + args.metadata_only, + args.prepare_error_delay) diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py new file mode 100644 index 0000000..2677b5b --- /dev/null +++ b/radosgw_agent/client.py @@ -0,0 +1,634 @@ +import boto +import functools +import json +import logging +import random +import socket +import urllib +from urlparse import urlparse + +from boto.exception import BotoServerError +from boto.s3.connection import S3Connection + +from radosgw_agent import request as aws_request +from radosgw_agent import config +from radosgw_agent import exceptions as exc +from radosgw_agent.constants import DEFAULT_TIME +from radosgw_agent.exceptions import NetworkError + +log = logging.getLogger(__name__) + + +class Endpoint(object): + def __init__(self, host, port, secure, + access_key=None, secret_key=None, region=None, zone=None): + self.host = host + default_port = 443 if secure else 80 + self.port = port or default_port + self.secure = secure + self.access_key = access_key + self.secret_key = secret_key + self.region = region + self.zone = zone + + def __eq__(self, other): + if self.host != other.host: + return False + if self.port == other.port: + return True + # if self and other are mixed http/https with default ports, + # i.e. 
http://example.com and https://example.com, consider + # them the same + + def diff_only_default_ports(a, b): + return a.secure and a.port == 443 and not b.secure and b.port == 80 + return (diff_only_default_ports(self, other) or + diff_only_default_ports(other, self)) + + def __repr__(self): + return 'Endpoint(host={host}, port={port}, secure={secure})'.format( + host=self.host, + port=self.port, + secure=self.secure) + + def __str__(self): + scheme = 'https' if self.secure else 'http' + return '{scheme}://{host}:{port}'.format(scheme=scheme, + host=self.host, + port=self.port) + + +def parse_endpoint(endpoint): + url = urlparse(endpoint) + if url.scheme not in ['http', 'https']: + raise exc.InvalidProtocol('invalid protocol %r' % url.scheme) + if not url.hostname: + raise exc.InvalidHost('no hostname in %r' % endpoint) + return Endpoint(url.hostname, url.port, url.scheme == 'https') + +code_to_exc = { + 404: exc.NotFound, + } + + +def boto_call(func): + @functools.wraps(func) + def translate_exception(*args, **kwargs): + try: + return func(*args, **kwargs) + except boto.exception.S3ResponseError as e: + raise code_to_exc.get(e.status, exc.HttpError)(e.status, e.body) + return translate_exception + + +def check_result_status(result): + if result.status / 100 != 2: + raise code_to_exc.get(result.status, + exc.HttpError)(result.status, result.reason) + + +def url_safe(component): + if isinstance(component, basestring): + string = component.encode('utf8') + else: + string = str(component) + return urllib.quote(string) + + +def request(connection, type_, resource, params=None, headers=None, + data=None, expect_json=True, special_first_param=None, _retries=3): + if headers is None: + headers = {} + + if type_ in ['put', 'post']: + headers['Content-Type'] = 'application/json; charset=UTF-8' + + request_data = data if data else '' + if params is None: + params = {} + safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()]) + connection.count_request() + request = aws_request.base_http_request(connection.s3_connection, + type_.upper(), + resource=resource, + special_first_param=special_first_param, + headers=headers, + data=request_data, + params=safe_params) + + url = '{protocol}://{host}{path}'.format(protocol=request.protocol, + host=request.host, + path=request.path) + + request.authorize(connection=connection) + + boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r', + url, params, request.headers, data) + + try: + result = aws_request.make_request( + connection.s3_connection, + type_.upper(), + resource=resource, + special_first_param=special_first_param, + headers=headers, + data=request_data, + params=safe_params, + _retries=_retries) + except socket.error as error: + msg = 'unable to connect to %s %s' % (request.host, error) + raise NetworkError(msg) + + except BotoServerError as error: + check_result_status(error) + + check_result_status(result) + + if data or not expect_json: + return result + + return json.loads(result.read()) + + +def get_metadata(connection, section, name): + return request(connection, 'get', 'admin/metadata/' + section, + params=dict(key=name)) + + +def update_metadata(connection, section, name, metadata): + if not isinstance(metadata, basestring): + metadata = json.dumps(metadata) + return request(connection, 'put', 'admin/metadata/' + section, + params=dict(key=name), data=metadata) + + +def delete_metadata(connection, section, name): + return request(connection, 'delete', 'admin/metadata/' + section, + params=dict(key=name), expect_json=False) 
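The three admin/metadata helpers above are the primitives that metadata sync is built from. A hedged sketch of moving one user entry between zones, assuming src_conn and dest_conn come from connection() at the bottom of this module and that 'sync-user' is a purely hypothetical metadata key:

    # read the full metadata entry from the source zone, then write it to
    # the destination; update_metadata() JSON-encodes dict payloads itself
    entry = get_metadata(src_conn, 'user', 'sync-user')
    update_metadata(dest_conn, 'user', 'sync-user', entry)

    # deletion goes through the same admin endpoint
    delete_metadata(dest_conn, 'user', 'sync-user')

The worker processes drive these helpers from the metadata log rather than copying entries ad hoc; the sketch only shows the request shapes.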
+ + +def get_metadata_sections(connection): + return request(connection, 'get', 'admin/metadata') + + +def list_metadata_keys(connection, section): + return request(connection, 'get', 'admin/metadata/' + section) + + +def get_op_state(connection, client_id, op_id, bucket, obj): + return request(connection, 'get', 'admin/opstate', + params={ + 'op-id': op_id, + 'object': u'{0}/{1}'.format(bucket, obj.name), + 'client-id': client_id, + } + ) + + +def remove_op_state(connection, client_id, op_id, bucket, obj): + return request(connection, 'delete', 'admin/opstate', + params={ + 'op-id': op_id, + 'object': u'{0}/{1}'.format(bucket, obj.name), + 'client-id': client_id, + }, + expect_json=False, + ) + + +def get_bucket_list(connection): + return list_metadata_keys(connection, 'bucket') + + +@boto_call +def list_objects_in_bucket(connection, bucket_name): + versioned = config['use_versioning'] + + # use the boto library to do this + bucket = connection.get_bucket(bucket_name) + list_call = bucket.list_versions if versioned else bucket.list + try: + for key in list_call(): + yield key + except boto.exception.S3ResponseError as e: + # since this is a generator, the exception will be raised when + # it's read, rather than when this call returns, so raise a + # unique exception to distinguish this from client errors from + # other calls + if e.status == 404: + raise exc.BucketEmpty() + else: + raise + + +@boto_call +def mark_delete_object(connection, bucket_name, obj, params=None): + """ + Marking an object for deletion is only necessary for versioned objects, we + should not try these calls for non-versioned ones. + + Usually, only full-sync operations will use this call, incremental should + perform actual delete operations with ``delete_versioned_object`` + """ + params = params or {} + + params['rgwx-version-id'] = obj.version_id + params['rgwx-versioned-epoch'] = obj.VersionedEpoch + + path = u'{bucket}/{object}'.format( + bucket=bucket_name, + object=obj.name, + ) + + return request(connection, 'delete', path, + params=params, + expect_json=False) + + +@boto_call +def delete_versioned_object(connection, bucket_name, obj): + """ + Perform a delete on a versioned object, the requirements for these types + of requests is to be able to pass the ``versionID`` as a query argument + """ + # if obj.delete_marker is False we should not delete this and we shouldn't + # have been called, so return without doing anything + if getattr(obj, 'delete_marker', False) is False: + log.info('obj: %s has `delete_marker=False`, will skip' % obj.name) + return + + params = {} + + params['rgwx-version-id'] = obj.version_id + params['rgwx-versioned-epoch'] = obj.VersionedEpoch + params['versionID'] = obj.version_id + + path = u'{bucket}/{object}'.format( + bucket=bucket_name, + object=obj.name, + ) + + return request(connection, 'delete', path, + params=params, + expect_json=False) + + +@boto_call +def delete_object(connection, bucket_name, obj): + if is_versioned(obj): + log.debug('performing a delete for versioned obj: %s' % obj.name) + delete_versioned_object(connection, bucket_name, obj) + else: + bucket = connection.get_bucket(bucket_name) + bucket.delete_key(obj.name) + + +def is_versioned(obj): + """ + Check if a given object is versioned by inspecting some of its attributes. 
+ """ + # before any heuristic, newer versions of RGW will tell if an obj is + # versioned so try that first + if hasattr(obj, 'versioned'): + return obj.versioned + + if not hasattr(obj, 'VersionedEpoch'): + # overly paranoid here, an object that is not versioned should *never* + # have a `VersionedEpoch` attribute + if getattr(obj, 'version_id', None): + if obj.version_id is None: + return False + return True # probably will never get here + return False + return True + + +def sync_object_intra_region(connection, bucket_name, obj, src_zone, + client_id, op_id): + + params = { + 'rgwx-source-zone': src_zone, + 'rgwx-client-id': client_id, + 'rgwx-op-id': op_id, + } + + if is_versioned(obj): + log.debug('detected obj as versioned: %s' % obj.name) + log.debug('obj attributes are:') + for k in dir(obj): + if not k.startswith('_'): + v = getattr(obj, k, None) + log.debug('%s.%s = %s' % (obj.name, k, v)) + + # set the extra params to support versioned operations + params['rgwx-version-id'] = obj.version_id + params['rgwx-versioned-epoch'] = obj.VersionedEpoch + + # delete_marker may not exist in the obj + if getattr(obj, 'delete_marker', None) is True: + log.debug('obj %s has a delete_marker, marking for deletion' % obj.name) + # when the object has a delete marker we need to create it with + # a delete marker on the destination rather than copying + return mark_delete_object(connection, bucket_name, obj, params=params) + + path = u'{bucket}/{object}'.format( + bucket=bucket_name, + object=obj.name, + ) + + return request(connection, 'put', path, + params=params, + headers={ + 'x-amz-copy-source': url_safe('%s/%s' % (bucket_name, obj.name)), + }, + expect_json=False) + + +def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id): + return request(connection, 'post', 'admin/log', + params={ + 'type': lock_type, + 'id': shard_num, + 'length': timeout, + 'zone-id': zone_id, + 'locker-id': locker_id, + }, + special_first_param='lock', + expect_json=False) + + +def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id): + return request(connection, 'post', 'admin/log', + params={ + 'type': lock_type, + 'id': shard_num, + 'locker-id': locker_id, + 'zone-id': zone_id, + }, + special_first_param='unlock', + expect_json=False) + + +def _id_name(type_): + return 'bucket-instance' if type_ == 'bucket-index' else 'id' + + +def get_log(connection, log_type, marker, max_entries, id_): + key = _id_name(log_type) + return request(connection, 'get', 'admin/log', + params={ + 'type': log_type, + key: id_, + 'marker': marker, + 'max-entries': max_entries, + }, + ) + + +def get_log_info(connection, log_type, id_): + key = _id_name(log_type) + return request( + connection, 'get', 'admin/log', + params={ + 'type': log_type, + key: id_, + }, + special_first_param='info', + ) + + +def num_log_shards(connection, shard_type): + out = request(connection, 'get', 'admin/log', dict(type=shard_type)) + return out['num_objects'] + + +def set_worker_bound(connection, type_, marker, timestamp, + daemon_id, id_, data=None, sync_type='incremental'): + """ + + :param sync_type: The type of synchronization that should be attempted by + the agent, defaulting to "incremental" but can also be "full". 
+ """ + if data is None: + data = [] + key = _id_name(type_) + boto.log.debug('set_worker_bound: data = %r', data) + return request( + connection, 'post', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + 'marker': marker, + 'time': timestamp, + 'daemon_id': daemon_id, + 'sync-type': sync_type, + }, + data=json.dumps(data), + special_first_param='work_bound', + ) + + +def del_worker_bound(connection, type_, daemon_id, id_): + key = _id_name(type_) + return request( + connection, 'delete', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + 'daemon_id': daemon_id, + }, + special_first_param='work_bound', + expect_json=False, + ) + + +def get_worker_bound(connection, type_, id_): + key = _id_name(type_) + try: + out = request( + connection, 'get', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + }, + special_first_param='bounds', + ) + boto.log.debug('get_worker_bound returned: %r', out) + except exc.NotFound: + log.debug('no worker bound found for bucket instance "%s"', + id_) + # if no worker bounds have been set, start from the beginning + # returning fallback, default values + return dict( + marker=' ', + oldest_time=DEFAULT_TIME, + retries=[] + ) + + retries = set() + for item in out['markers']: + names = [retry['name'] for retry in item['items_in_progress']] + retries = retries.union(names) + out['retries'] = retries + return out + + +class Zone(object): + def __init__(self, zone_info): + self.name = zone_info['name'] + self.is_master = False + self.endpoints = [parse_endpoint(e) for e in zone_info['endpoints']] + self.log_meta = zone_info['log_meta'] == 'true' + self.log_data = zone_info['log_data'] == 'true' + + def __repr__(self): + return str(self) + + def __str__(self): + return self.name + + +class Region(object): + def __init__(self, region_info): + self.name = region_info['key'] + self.is_master = region_info['val']['is_master'] == 'true' + self.zones = {} + for zone_info in region_info['val']['zones']: + zone = Zone(zone_info) + self.zones[zone.name] = zone + if zone.name == region_info['val']['master_zone']: + zone.is_master = True + self.master_zone = zone + assert hasattr(self, 'master_zone'), \ + 'No master zone found for region ' + self.name + + def __repr__(self): + return str(self) + + def __str__(self): + return str(self.zones.keys()) + + +class RegionMap(object): + def __init__(self, region_map): + self.regions = {} + for region_info in region_map['regions']: + region = Region(region_info) + self.regions[region.name] = region + if region.is_master: + self.master_region = region + assert hasattr(self, 'master_region'), \ + 'No master region found in region map' + + def __repr__(self): + return str(self) + + def __str__(self): + return str(self.regions) + + def find_endpoint(self, endpoint): + for region in self.regions.itervalues(): + for zone in region.zones.itervalues(): + if endpoint in zone.endpoints or endpoint.zone == zone.name: + return region, zone + raise exc.ZoneNotFound('%s not found in region map' % endpoint) + + +def get_region_map(connection): + region_map = request(connection, 'get', 'admin/config') + return RegionMap(region_map) + + +def _validate_sync_dest(dest_region, dest_zone): + if dest_region.is_master and dest_zone.is_master: + raise exc.InvalidZone('destination cannot be master zone of master region') + + +def _validate_sync_source(src_region, src_zone, dest_region, dest_zone, + meta_only): + if not src_zone.is_master: + raise exc.InvalidZone('source zone %s must be a master zone' % src_zone.name) + if 
(src_region.name == dest_region.name and
+            src_zone.name == dest_zone.name):
+        raise exc.InvalidZone('source and destination must be different zones')
+    if not src_zone.log_meta:
+        raise exc.InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name)
+    if not meta_only and not src_zone.log_data:
+        raise exc.InvalidZone('source zone %s must have data logging enabled' % src_zone.name)
+    if not meta_only and src_region.name != dest_region.name:
+        raise exc.InvalidZone('data sync can only occur between zones in the same region')
+    if not src_zone.endpoints:
+        raise exc.InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name)
+
+
+def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only):
+    log.info('region map is: %r' % region_map)
+
+    dest_region, dest_zone = region_map.find_endpoint(dest_endpoint)
+    _validate_sync_dest(dest_region, dest_zone)
+
+    # source may be specified by http endpoint or zone name
+    if src_endpoint.host or src_endpoint.zone:
+        src_region, src_zone = region_map.find_endpoint(src_endpoint)
+    else:
+        # try the master zone in the same region, then the master zone
+        # in the master region
+        try:
+            _validate_sync_source(dest_region, dest_region.master_zone,
+                                  dest_region, dest_zone, meta_only)
+            src_region, src_zone = dest_region, dest_region.master_zone
+        except exc.InvalidZone as e:
+            log.debug('source region %s zone %s unacceptable: %s',
+                      dest_region.name, dest_region.master_zone.name, e)
+            master_region = region_map.master_region
+            src_region, src_zone = master_region, master_region.master_zone
+
+    _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+                          meta_only)
+
+    # choose a random source endpoint if one wasn't specified
+    if not src_endpoint.host:
+        endpoint = random.choice(src_zone.endpoints)
+        src_endpoint.host = endpoint.host
+        src_endpoint.port = endpoint.port
+        src_endpoint.secure = endpoint.secure
+
+    # fill in region and zone names
+    dest_endpoint.region = dest_region
+    dest_endpoint.zone = dest_zone
+    src_endpoint.region = src_region
+    src_endpoint.zone = src_zone
+
+
+class S3ConnectionWrapper(object):
+    def __init__(self, endpoint, debug):
+        self.endpoint = endpoint
+        self.debug = debug
+        self.s3_connection = None
+        self.reqs_before_reset = 512
+        self._recreate_s3_connection()
+
+    def count_request(self):
+        self.num_requests += 1
+        if self.num_requests > self.reqs_before_reset:
+            self._recreate_s3_connection()
+
+    def _recreate_s3_connection(self):
+        self.num_requests = 0
+        self.s3_connection = S3Connection(
+            aws_access_key_id=self.endpoint.access_key,
+            aws_secret_access_key=self.endpoint.secret_key,
+            is_secure=self.endpoint.secure,
+            host=self.endpoint.host,
+            port=self.endpoint.port,
+            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+            debug=self.debug,
+        )
+
+    def __getattr__(self, attrib):
+        return getattr(self.s3_connection, attrib)
+
+
+def connection(endpoint, debug=None):
+    log.info('creating connection to endpoint: %s' % endpoint)
+    return S3ConnectionWrapper(endpoint, debug)
diff --git a/radosgw_agent/constants.py b/radosgw_agent/constants.py
new file mode 100644
index 0000000..0492ff6
--- /dev/null
+++ b/radosgw_agent/constants.py
@@ -0,0 +1,6 @@
+
+RESULT_SUCCESS = 0
+RESULT_ERROR = 1
+
+DEFAULT_TIME = '1970-01-01 00:00:00'
+
diff --git a/radosgw_agent/exceptions.py b/radosgw_agent/exceptions.py
new file mode 100644
index 0000000..f461c40
--- /dev/null
+++ b/radosgw_agent/exceptions.py
@@ -0,0 +1,77 @@
+
+class AgentError(Exception):
+    """
+    The actual base exception for the agent
+    """
+
+
+class ClientException(AgentError):
+    """
+    Historical base radosgw_agent client exception.
+    """
+    pass
+
+
+class NetworkError(AgentError):
+    pass
+
+
+class RegionMapError(AgentError):
+
+    def __init__(self, error):
+        self.error = error
+
+    def __str__(self):
+        msg = 'Could not retrieve region map from destination: %s'
+        return msg % self.error
+
+
+class InvalidProtocol(ClientException):
+    pass
+
+
+class InvalidHost(ClientException):
+    pass
+
+
+class InvalidZone(ClientException):
+    pass
+
+
+class ZoneNotFound(ClientException):
+    pass
+
+
+class BucketEmpty(ClientException):
+    pass
+
+
+class HttpError(ClientException):
+    def __init__(self, code, body):
+        self.code = code
+        self.str_code = str(code)
+        self.body = body
+        self.message = 'Http error code %s content %s' % (code, body)
+
+    def __str__(self):
+        return self.message
+
+
+class NotFound(HttpError):
+    pass
+
+
+class SkipShard(Exception):
+    pass
+
+
+class SyncError(Exception):
+    pass
+
+
+class SyncTimedOut(SyncError):
+    pass
+
+
+class SyncFailed(SyncError):
+    pass
diff --git a/radosgw_agent/lock.py b/radosgw_agent/lock.py
new file mode 100644
index 0000000..2036f8e
--- /dev/null
+++ b/radosgw_agent/lock.py
@@ -0,0 +1,108 @@
+import logging
+import threading
+import time
+
+from radosgw_agent import client
+from radosgw_agent import exceptions as exc
+
+log = logging.getLogger(__name__)
+
+class LockBroken(Exception):
+    pass
+
+class LockRenewFailed(LockBroken):
+    pass
+
+class LockExpired(LockBroken):
+    pass
+
+class Lock(threading.Thread):
+    """A lock on a shard log that automatically refreshes itself.
+
+    It may be used to lock different shards throughout its lifetime.
+    To lock a new shard, call acquire() with the shard_num desired.
+
+    To release the lock, call release_and_clear(). This will raise an
+    exception if the lock ever failed to be acquired in the timeout
+    period.
+    """
+
+    def __init__(self, conn, type_, locker_id, timeout, zone_id):
+        super(Lock, self).__init__()
+        self.conn = conn
+        self.type = type_
+        self.timeout = timeout
+        self.lock = threading.Lock()
+        self.locker_id = locker_id
+        self.zone_id = zone_id
+        self.shard_num = None
+        self.last_locked = None
+        self.failed = False
+
+    def set_shard(self, shard_num):
+        log.debug('set_shard to %d', shard_num)
+        with self.lock:
+            assert self.shard_num is None, \
+                'attempted to acquire new lock without releasing old one'
+            self.failed = False
+            self.last_locked = None
+            self.shard_num = shard_num
+
+    def unset_shard(self):
+        log.debug('unset shard')
+        with self.lock:
+            self.shard_num = None
+
+    def acquire(self):
+        """Renew an existing lock, or acquire a new one.
+
+        The old lock must have already been released if shard_num is specified.
+        exceptions.NotFound may be raised if the log contains no entries.
+        """
+        log.debug('acquire lock')
+        with self.lock:
+            self._acquire()
+
+    def _acquire(self):
+        # same as acquire() but assumes self.lock is held
+        now = time.time()
+        client.lock_shard(self.conn, self.type, self.shard_num,
+                          self.zone_id, self.timeout, self.locker_id)
+        self.last_locked = now
+
+    def release_and_clear(self):
+        """Release the lock currently being held.
+
+        Prevent it from being automatically renewed, and check if there
+        were any errors renewing the current lock or if it expired.
+        If the lock was not sustained, raise LockRenewFailed or LockExpired.
+        """
+        log.debug('release and clear lock')
+        with self.lock:
+            shard_num = self.shard_num
+            self.shard_num = None
+            diff = time.time() - self.last_locked
+            if diff > self.timeout:
+                msg = 'lock was not renewed in over %0.2f seconds' % diff
+                raise LockExpired(msg)
+            if self.failed:
+                raise LockRenewFailed()
+            try:
+                client.unlock_shard(self.conn, self.type, shard_num,
+                                    self.zone_id, self.locker_id)
+            except exc.HttpError as e:
+                log.warn('failed to unlock shard %d in zone %s: %s',
+                         shard_num, self.zone_id, e)
+            self.last_locked = None
+
+    def run(self):
+        while True:
+            with self.lock:
+                if self.shard_num is not None:
+                    try:
+                        self._acquire()
+                    except exc.HttpError as e:
+                        log.error('locking shard %d in zone %s failed: %s',
+                                  self.shard_num, self.zone_id, e)
+                        self.failed = True
+            time.sleep(0.5 * self.timeout)
diff --git a/radosgw_agent/request.py b/radosgw_agent/request.py
new file mode 100644
index 0000000..86cc066
--- /dev/null
+++ b/radosgw_agent/request.py
@@ -0,0 +1,188 @@
+import boto
+import logging
+import sys
+from boto.connection import AWSAuthConnection
+
+log = logging.getLogger(__name__)
+
+
+def urlencode(query, doseq=0):
+    """
+    Note: ported from urllib.urlencode, but with the ability to craft the query
+    string without quoting the params again.
+
+    Encode a sequence of two-element tuples or dictionary into a URL query
+    string.
+
+    If any values in the query arg are sequences and doseq is true, each
+    sequence element is converted to a separate parameter.
+
+    If the query arg is a sequence of two-element tuples, the order of the
+    parameters in the output will match the order of parameters in the
+    input.
+    """
+
+    if hasattr(query, "items"):
+        # mapping objects
+        query = query.items()
+    else:
+        # it's a bother at times that strings and string-like objects are
+        # sequences...
+        try:
+            # non-sequence items should not work with len()
+            # non-empty strings will fail this
+            if len(query) and not isinstance(query[0], tuple):
+                raise TypeError
+            # zero-length sequences of all types will get here and succeed,
+            # but that's a minor nit - since the original implementation
+            # allowed empty dicts that type of behavior probably should be
+            # preserved for consistency
+        except TypeError:
+            ty, va, tb = sys.exc_info()
+            raise TypeError, "not a valid non-string sequence or mapping object", tb
+
+    l = []
+    if not doseq:
+        # preserve old behavior
+        for k, v in query:
+            k = str(k)
+            v = str(v)
+            l.append(k + '=' + v)
+    else:
+        for k, v in query:
+            k = str(k)
+            if isinstance(v, str):
+                l.append(k + '=' + v)
+            elif isinstance(v, unicode):
+                # is there a reasonable way to convert to ASCII?
+                # encode generates a string, but "replace" or "ignore"
+                # lose information and "strict" can raise UnicodeError
+                v = v.encode("ASCII", "replace")
+                l.append(k + '=' + v)
+            else:
+                try:
+                    # is this a sufficient test for sequence-ness?
+                    len(v)
+                except TypeError:
+                    # not a sequence
+                    v = str(v)
+                    l.append(k + '=' + v)
+                else:
+                    # loop over the sequence
+                    for elt in v:
+                        l.append(k + '=' + str(elt))
+    return '&'.join(l)
+
+
+class MetaData(object):
+    """
+    A basic container class that, apart from recording the method, simply
+    registers all the keyword arguments passed in, so that it is easier
+    to re-use the values
+    """
+
+    def __init__(self, conn, method, **kw):
+        self.conn = conn
+        self.method = method
+        for k, v in kw.items():
+            setattr(self, k, v)
+
+
+def base_http_request(conn, method, basepath='', resource='', headers=None,
+                      data=None, special_first_param=None, params=None):
+    """
+    Returns an ``AWSAuthConnection.build_base_http_request`` call with the
+    preserving of the special params done by ``build``.
+    """
+
+    # request meta data
+    md = build(
+        conn,
+        method,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+    )
+
+    return AWSAuthConnection.build_base_http_request(
+        md.conn, md.method, md.path,
+        md.auth_path, md.params, md.headers,
+        md.data, md.host)
+
+
+def make_request(conn, method, basepath='', resource='', headers=None,
+                 data=None, special_first_param=None, params=None, _retries=3):
+    """
+    Returns an ``AWSAuthConnection.make_request`` call with the preserving
+    of the special params done by ``build``.
+    """
+    # request meta data
+    md = build(
+        conn,
+        method,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+    )
+
+    if params:
+        # we basically need to do this ourselves now. BOTO doesn't do it for us
+        # in make_request
+        result = []
+        for k, vs in params.items():
+            if isinstance(vs, basestring) or not hasattr(vs, '__iter__'):
+                vs = [vs]
+            for v in vs:
+                if v is not None:
+                    result.append(
+                        (k.encode('utf-8') if isinstance(k, str) else k,
+                         v.encode('utf-8') if isinstance(v, str) else v))
+        appending_char = '&' if md.special_first_param else '?'
+        md.path = '%s%s%s' % (md.path, appending_char, urlencode(result, doseq=True))
+
+    return AWSAuthConnection.make_request(
+        md.conn, md.method, md.path,
+        headers=md.headers,
+        data=md.data,
+        host=md.host,
+        auth_path=md.auth_path,
+        params=md.params,
+        override_num_retries=_retries
+    )
+
+
+def build(conn, method, basepath='', resource='', headers=None,
+          data=None, special_first_param=None, params=None):
+    """
+    Adapted from the build_request() method of boto.connection
+    """
+
+    path = conn.calling_format.build_path_base(basepath, resource)
+    auth_path = conn.calling_format.build_auth_path(basepath, resource)
+    host = conn.calling_format.build_host(conn.server_name(), '')
+
+    if special_first_param:
+        path += '?' + special_first_param
+        boto.log.debug('path=%s' % path)
+        auth_path += '?' + special_first_param
+ special_first_param
+        boto.log.debug('auth_path=%s' % auth_path)
+
+    return MetaData(
+        conn,
+        method,
+        path=path,
+        auth_path=auth_path,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+        host=host,
+    )
diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py
new file mode 100644
index 0000000..39797ed
--- /dev/null
+++ b/radosgw_agent/sync.py
@@ -0,0 +1,328 @@
+import logging
+import multiprocessing
+import time
+
+from radosgw_agent import worker
+from radosgw_agent import client
+from radosgw_agent.exceptions import NotFound, HttpError
+
+
+log = logging.getLogger(__name__)
+
+# the replica log api only supports one entry, and updating it
+# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of
+# radosgw-agent, so just use a constant value for the daemon id.
+DAEMON_ID = 'radosgw-agent'
+
+def prepare_sync(syncer, error_delay):
+    """Attempt to prepare a syncer for running a sync.
+
+    :param error_delay: seconds to wait before retrying
+
+    This will retry forever so the sync agent continues if radosgws
+    are unavailable temporarily.
+    """
+    while True:
+        try:
+            syncer.prepare()
+            break
+        except Exception:
+            log.warn('error preparing for sync, will retry. Traceback:',
+                     exc_info=True)
+            time.sleep(error_delay)
+
+def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
+                     incremental_sync_delay, metadata_only, error_delay):
+    """Run a continuous incremental sync.
+
+    This will run forever, pausing for incremental_sync_delay
+    seconds between syncs.
+    """
+    while True:
+        try:
+            meta_syncer.sync(num_workers, lock_timeout)
+            if not metadata_only:
+                data_syncer.sync(num_workers, lock_timeout)
+        except Exception:
+            log.warn('error doing incremental sync, will try again. Traceback:',
+                     exc_info=True)
+
+        # prepare data before sleeping due to rgw_log_bucket_window
+        if not metadata_only:
+            prepare_sync(data_syncer, error_delay)
+        log.info('waiting %d seconds until next sync',
+                 incremental_sync_delay)
+        time.sleep(incremental_sync_delay)
+        prepare_sync(meta_syncer, error_delay)
+
+class Syncer(object):
+    def __init__(self, src, dest, max_entries, *args, **kwargs):
+        self.src = src
+        self.dest = dest
+        self.src_conn = client.connection(src)
+        self.dest_conn = client.connection(dest)
+        self.daemon_id = DAEMON_ID
+        self.worker_cls = None  # filled in by subclass constructor
+        self.num_shards = None
+        self.max_entries = max_entries
+        self.object_sync_timeout = kwargs.get('object_sync_timeout')
+
+    def init_num_shards(self):
+        if self.num_shards is not None:
+            return
+        try:
+            self.num_shards = client.num_log_shards(self.src_conn, self.type)
+            log.debug('%d shards to check', self.num_shards)
+        except Exception:
+            log.error('finding number of shards failed')
+            raise
+
+    def shard_num_for_key(self, key):
+        key = key.encode('utf8')
+        hash_val = 0
+        for char in key:
+            c = ord(char)
+            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
+        return hash_val % self.num_shards
+
+    def prepare(self):
+        """Set up any state required before syncing starts.
+
+        This must be called before sync().
+        """
+        pass
+
+    def generate_work(self):
+        """Generate items to be placed in a queue for processing"""
+        pass
+
+    def wait_until_ready(self):
+        pass
+
+    def complete_item(self, shard_num, retries):
+        """Called when syncing a single item completes successfully"""
+        marker = self.shard_info.get(shard_num)
+        if not marker:
+            return
+        try:
+            data = [dict(name=retry, time=worker.DEFAULT_TIME)
+                    for retry in retries]
+            client.set_worker_bound(self.dest_conn,
+                                    self.type,
+                                    marker,
+                                    worker.DEFAULT_TIME,
+                                    self.daemon_id,
+                                    shard_num,
+                                    data)
+        except Exception:
+            log.warn('could not set worker bounds, may repeat some work. '
+                     'Traceback:', exc_info=True)
+
+    def sync(self, num_workers, log_lock_time):
+        workQueue = multiprocessing.Queue()
+        resultQueue = multiprocessing.Queue()
+
+        processes = [self.worker_cls(workQueue,
+                                     resultQueue,
+                                     log_lock_time,
+                                     self.src,
+                                     self.dest,
+                                     daemon_id=self.daemon_id,
+                                     max_entries=self.max_entries,
+                                     object_sync_timeout=self.object_sync_timeout,
+                                     )
+                     for i in xrange(num_workers)]
+        for process in processes:
+            process.daemon = True
+            process.start()
+
+        self.wait_until_ready()
+
+        log.info('Starting sync')
+        # enqueue the shards to be synced
+        num_items = 0
+        for item in self.generate_work():
+            num_items += 1
+            workQueue.put(item)
+
+        # add a poison pill for each worker
+        for i in xrange(num_workers):
+            workQueue.put(None)
+
+        # pull the results out as they are produced
+        errored_shards = []
+        for i in xrange(num_items):
+            result, item = resultQueue.get()
+            shard_num, retries = item
+            if result == worker.RESULT_SUCCESS:
+                log.debug('synced item %r successfully', item)
+                self.complete_item(shard_num, retries)
+            else:
+                log.error('error syncing shard %d', shard_num)
+                errored_shards.append(shard_num)
+
+            log.info('%d/%d items processed', i + 1, num_items)
+        if errored_shards:
+            log.error('Encountered errors syncing these %d shards: %r',
+                      len(errored_shards), errored_shards)
+
+
+class IncrementalSyncer(Syncer):
+
+    def get_worker_bound(self, shard_num):
+        bound = client.get_worker_bound(
+            self.dest_conn,
+            self.type,
+            shard_num)
+
+        marker = bound['marker']
+        retries = bound['retries']
+
+        log.debug('oldest marker and time for shard %d are: %r %r',
+                  shard_num, marker, bound['oldest_time'])
+        log.debug('%d items to retry are: %r', len(retries), retries)
+
+        return marker, retries
+
+
+    def get_log_entries(self, shard_num, marker):
+        try:
+            result = client.get_log(self.src_conn, self.type,
+                                    marker, self.max_entries,
+                                    shard_num)
+            last_marker = result['marker']
+            log_entries = result['entries']
+            if len(log_entries) == self.max_entries:
+                log.warn('shard %d log has fallen behind - log length >= %d',
+                         shard_num, self.max_entries)
+        except NotFound:
+            # no entries past this marker yet, but we may have retries
+            last_marker = ' '
+            log_entries = []
+        return last_marker, log_entries
+
+    def prepare(self):
+        self.init_num_shards()
+
+        self.shard_info = {}
+        self.shard_work = {}
+        for shard_num in xrange(self.num_shards):
+            marker, retries = self.get_worker_bound(shard_num)
+            last_marker, log_entries = self.get_log_entries(shard_num, marker)
+            self.shard_work[shard_num] = log_entries, retries
+            self.shard_info[shard_num] = last_marker
+
+        self.prepared_at = time.time()
+
+    def generate_work(self):
+        return self.shard_work.iteritems()
+
+
+class MetaSyncerInc(IncrementalSyncer):
+
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerInc, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerIncremental
+        self.type = 'metadata'
+
+
+class DataSyncerInc(IncrementalSyncer):
+
+    def __init__(self, *args, **kwargs):
+        super(DataSyncerInc, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.DataWorkerIncremental
+        self.type = 'data'
+        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+    def wait_until_ready(self):
+        log.info('waiting to make sure bucket log is consistent')
+        while time.time() < self.prepared_at + self.rgw_data_log_window:
+            time.sleep(1)
+
+
+class DataSyncerFull(Syncer):
+
+    def __init__(self, *args, **kwargs):
+        super(DataSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.DataWorkerFull
+        self.type = 'data'
+        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+    def prepare(self):
+        log.info('preparing to do a full data sync')
+        self.init_num_shards()
+
+        # save data log markers for each shard
+        self.shard_info = {}
+        for shard in xrange(self.num_shards):
+            info = client.get_log_info(self.src_conn, 'data', shard)
+            # setting an empty marker returns an error
+            if info['marker']:
+                self.shard_info[shard] = info['marker']
+            else:
+                self.shard_info[shard] = ' '
+
+        # get list of buckets after getting any markers to avoid skipping
+        # entries added before we got the marker info
+        log.debug('getting bucket list')
+        buckets = client.get_bucket_list(self.src_conn)
+
+        self.prepared_at = time.time()
+
+        self.buckets_by_shard = {}
+        for bucket in buckets:
+            shard = self.shard_num_for_key(bucket)
+            self.buckets_by_shard.setdefault(shard, [])
+            self.buckets_by_shard[shard].append(bucket)
+
+    def generate_work(self):
+        return self.buckets_by_shard.iteritems()
+
+    def wait_until_ready(self):
+        log.info('waiting to make sure bucket log is consistent')
+        while time.time() < self.prepared_at + self.rgw_data_log_window:
+            time.sleep(1)
+
+
+class MetaSyncerFull(Syncer):
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerFull
+        self.type = 'metadata'
+
+    def prepare(self):
+        try:
+            self.sections = client.get_metadata_sections(self.src_conn)
+        except HttpError as e:
+            log.error('Error listing metadata sections: %s', e)
+            raise
+
+        # grab the latest shard markers and timestamps before we sync
+        self.shard_info = {}
+        self.init_num_shards()
+        for shard_num in xrange(self.num_shards):
+            info = client.get_log_info(self.src_conn, 'metadata', shard_num)
+            # setting an empty marker returns an error
+            if info['marker']:
+                self.shard_info[shard_num] = info['marker']
+            else:
+                self.shard_info[shard_num] = ' '
+
+        self.metadata_by_shard = {}
+        for section in self.sections:
+            try:
+                for key in client.list_metadata_keys(self.src_conn, section):
+                    shard = self.shard_num_for_key(section + ':' + key)
+                    self.metadata_by_shard.setdefault(shard, [])
+                    self.metadata_by_shard[shard].append((section, key))
+            except NotFound:
+                # no keys of this type exist
+                continue
+            except HttpError as e:
+                log.error('Error listing metadata for section %s: %s',
+                          section, e)
+                raise
+
+    def generate_work(self):
+        return self.metadata_by_shard.iteritems()
diff --git a/radosgw_agent/tests/__init__.py b/radosgw_agent/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/radosgw_agent/tests/conftest.py b/radosgw_agent/tests/conftest.py
new file mode 100644
index 0000000..8e9caf6
--- /dev/null
+++ b/radosgw_agent/tests/conftest.py
@@ -0,0 +1,33 @@
+import logging
+import sys
+
+# this console logging configuration is basically just to be able to see output in
+# tests, and this file gets executed by py.test when it runs, so we get that for free.
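+# (note: modules under radosgw_agent create their loggers with
+# logging.getLogger(__name__), so their records propagate to the root
+# logger configured below and show up in test output automatically)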
+ +# Console Logger +sh = logging.StreamHandler() +sh.setLevel(logging.WARNING) + +formatter = logging.Formatter( + fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + ) +sh.setFormatter(formatter) + + +# because we're in a module already, __name__ is not the ancestor of +# the rest of the package; use the root as the logger for everyone +root_logger = logging.getLogger() + +# allow all levels at root_logger, handlers control individual levels +root_logger.setLevel(logging.DEBUG) +root_logger.addHandler(sh) + +console_loglevel = logging.DEBUG # start at DEBUG for now + +# Console Logger +sh.setLevel(console_loglevel) + + + + diff --git a/radosgw_agent/tests/test_client.py b/radosgw_agent/tests/test_client.py new file mode 100644 index 0000000..7e24dc1 --- /dev/null +++ b/radosgw_agent/tests/test_client.py @@ -0,0 +1,672 @@ +import boto +import py.test +from mock import Mock +import httpretty +import re + +from radosgw_agent import client +from radosgw_agent import exceptions as exc +from radosgw_agent.constants import DEFAULT_TIME + +# parametrization helpers + +def endpoints(): + return [ + ('http://example.org', 'example.org', 80, False), + ('https://example.org', 'example.org', 443, True), + ('https://example.org:8080', 'example.org', 8080, True), + ('https://example.org:8080/', 'example.org', 8080, True), + ('http://example.org:81/a/b/c?b#d', 'example.org', 81, False), + ] + + +REGION_MAP = { + "regions": [ + { + "val": { + "zones": [ + { + "endpoints": [ + "http://vit:8001/" + ], + "log_data": "true", + "log_meta": "true", + "name": "skinny-1" + }, + { + "endpoints": [ + "http://vit:8002/" + ], + "log_data": "false", + "log_meta": "false", + "name": "skinny-2" + } + ], + "name": "skinny", + "default_placement": "", + "master_zone": "skinny-1", + "api_name": "slim", + "placement_targets": [], + "is_master": "true", + "endpoints": [ + "http://skinny:80/" + ] + }, + "key": "skinny" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://vit:8003/" + ], + "log_data": "false", + "log_meta": "false", + "name": "swab-2" + }, + { + "endpoints": [ + "http://vit:8004/" + ], + "log_data": "false", + "log_meta": "false", + "name": "swab-3" + }, + { + "endpoints": [ + "http://vit:8000/" + ], + "log_data": "true", + "log_meta": "true", + "name": "swab-1" + } + ], + "name": "swab", + "default_placement": "", + "master_zone": "swab-1", + "api_name": "shady", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://vit:8000/" + ] + }, + "key": "swab" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://ro:80/" + ], + "log_data": "false", + "log_meta": "false", + "name": "ro-1" + }, + { + "endpoints": [ + "http://ro:8080/" + ], + "log_data": "false", + "log_meta": "false", + "name": "ro-2" + }, + ], + "name": "readonly", + "default_placement": "", + "master_zone": "ro-1", + "api_name": "readonly", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://ro:80/", + "http://ro:8080/" + ] + }, + "key": "readonly" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://meta:80/" + ], + "log_data": "false", + "log_meta": "true", + "name": "meta-1" + }, + { + "endpoints": [ + "http://meta:8080/" + ], + "log_data": "false", + "log_meta": "false", + "name": "meta-2" + }, + ], + "name": "metaonly", + "default_placement": "", + "master_zone": "meta-1", + "api_name": "metaonly", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://meta:80/", + "http://meta:8080/" 
+ ] + }, + "key": "metaonly" + } + ], + "master_region": "skinny" + } + +def test_endpoint_default_port(): + endpoint = client.Endpoint('example.org', None, True) + assert endpoint.port == 443 + endpoint = client.Endpoint('example.org', None, False) + assert endpoint.port == 80 + +def test_endpoint_port_specified(): + endpoint = client.Endpoint('example.org', 80, True) + assert endpoint.port == 80 + endpoint = client.Endpoint('example.org', 443, True) + assert endpoint.port == 443 + +def test_endpoint_equality(): + default_port = client.Endpoint('a.org', None, True) + secure = client.Endpoint('a.org', 443, True) + insecure = client.Endpoint('a.org', 80, False) + assert default_port == secure + assert secure == insecure + assert insecure == default_port + +def test_endpoint_inequality(): + base = client.Endpoint('a.org', 80, True) + diff_host = client.Endpoint('b.org', 80, True) + diff_port = client.Endpoint('a.org', 81, True) + insecure = client.Endpoint('a.org', 8080, False) + assert base != diff_host + assert base != diff_port + assert base != insecure + + +@py.test.mark.parametrize('url, host, port, secure', endpoints()) +def test_parse_endpoint(url, host, port, secure): + endpoint = client.parse_endpoint(url) + assert endpoint.port == port + assert endpoint.host == host + assert endpoint.secure == secure + + +@py.test.mark.parametrize('url, host, port, secure', endpoints()) +def test_parse_repr(url, host, port, secure): + endpoint = repr(client.parse_endpoint(url)) + assert str(secure) in endpoint + assert str(host) in endpoint + assert str(port) in endpoint + + +def test_parse_endpoint_bad_input(): + with py.test.raises(exc.InvalidProtocol): + client.parse_endpoint('ftp://example.com') + with py.test.raises(exc.InvalidHost): + client.parse_endpoint('http://:80/') + +def _test_configure_endpoints(dest_url, dest_region, dest_zone, + expected_src_url, expected_src_region, + expected_src_zone, specified_src_url=None, + meta_only=False): + dest = client.parse_endpoint(dest_url) + if specified_src_url is not None: + src = client.parse_endpoint(specified_src_url) + else: + src = client.Endpoint(None, None, None) + region_map = client.RegionMap(REGION_MAP) + client.configure_endpoints(region_map, dest, src, meta_only) + assert dest.region.name == dest_region + assert dest.zone.name == dest_zone + assert src == client.parse_endpoint(expected_src_url) + assert src.region.name == expected_src_region + assert src.zone.name == expected_src_zone + +def test_configure_endpoints_2nd_region_master_zone_meta(): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_master_zone_data(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_master_region_2nd_zone(): + _test_configure_endpoints('http://vit:8002', 'skinny', 'skinny-2', + 'http://vit:8001', 'skinny', 'skinny-1') + +def test_configure_endpoints_2nd_region_2nd_zone(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8000', 'swab', 'swab-1') + +def test_configure_endpoints_2nd_region_readonly_meta(): + _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_readonly_data(): + with py.test.raises(exc.InvalidZone): + 
_test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_2nd_region_metaonly_meta(): + _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2', + 'http://meta:80', 'metaonly', 'meta-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_metaonly_data(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_master_region_master_zone(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', 'skinny', 'skinny-1') + +def test_configure_endpoints_specified_src_same_region(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000') + +def test_configure_endpoints_specified_src_master_region_meta(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', meta_only=True) + +def test_configure_endpoints_specified_src_master_region_data(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', meta_only=False) + +def test_configure_endpoints_bad_src_same_region(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8004', 'swab', 'swab-3', + 'http://vit:8004') + +def test_configure_endpoints_bad_src_master_region(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8002', 'skinny', 'skinny-2', + 'http://vit:8002') + +def test_configure_endpoints_bad_src_same_zone(): + with py.test.raises(exc.InvalidZone): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000') + +def test_configure_endpoints_specified_nonexistent_src(): + with py.test.raises(exc.ZoneNotFound): + _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1', + 'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:80') + +def test_configure_endpoints_unknown_zone(): + with py.test.raises(exc.ZoneNotFound): + _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1', + 'http://vit:8001', 'skinny', 'skinny-1') + +def http_invalid_status_codes(): + return [ + 101, 102, 300, 301, 302, 303, 304, 305, 306, 307, 308, + ] + +def http_valid_status_codes(): + return [ + 200, 201, 202, 203, 204, 205, 207, 208, 226, + ] + +class TestCheckResultStatus(object): + + @py.test.mark.parametrize('code', http_invalid_status_codes()) + def test_check_raises_http_error(self, code): + response = Mock() + response.status = code + with py.test.raises(exc.HttpError): + client.check_result_status(response) + + @py.test.mark.parametrize('code', http_valid_status_codes()) + def test_check_does_not_raise_http_error(self, code): + response = Mock() + response.status = code + assert client.check_result_status(response) is None + + + def test_check_raises_not_found(self): + response = Mock() + response.status = 404 + with py.test.raises(exc.NotFound): + client.check_result_status(response) + + +class TestBotoCall(object): + + def test_return_val(self): + @client.boto_call + def foo(*args, **kwargs): + return (args, kwargs) + assert foo(1) == ((1,), {}) + assert 
foo(b=2) == (tuple(), {'b': 2})
+
+    def test_boto_exception_not_found(self):
+        @client.boto_call
+        def foo():
+            raise boto.exception.S3ResponseError(404, '')
+
+        with py.test.raises(exc.NotFound):
+            foo()
+
+    def test_non_boto_exception(self):
+        @client.boto_call
+        def foo():
+            raise ValueError('')
+
+        with py.test.raises(ValueError):
+            foo()
+
+
+class TestRequest(object):
+
+    @httpretty.activate
+    def test_url(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{}',
+            content_type="application/json",
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        client.request(connection, 'get', '/%7E~', _retries=0)
+        server_request = httpretty.last_request()
+        assert server_request.path == '/%257E%7E'
+
+    @httpretty.activate
+    def test_url_response(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{"msg": "ok"}',
+            content_type="application/json",
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        result = client.request(connection, 'get', '/%7E~', _retries=0)
+        assert result == {'msg': 'ok'}
+
+    @httpretty.activate
+    def test_url_bad(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{}',
+            content_type="application/json",
+            status=500,
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        with py.test.raises(exc.HttpError):
+            client.request(connection, 'get', '/%7E~', _retries=0)
+
+
+class TestGETClientRequestsPaths(object):
+
+    def setup(self):
+        self.connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+    def register(self, body=None):
+        body = body or '{}'
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body=body,
+            content_type="application/json",
+        )
+
+    @httpretty.activate
+    def test_get_metadata(self):
+        self.register()
+        client.get_metadata(self.connection, 'bucket.instance', 'foo')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/bucket.instance?key=foo'
+
+    @httpretty.activate
+    def test_get_metadata_no_re_encoding(self):
+        self.register()
+        client.get_metadata(self.connection, 'bucket.instance', 'mybar:r0z0.4140.1')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/bucket.instance?key=mybar%3Ar0z0.4140.1'
+
+    @httpretty.activate
+    def test_get_metadata_sections(self):
+        self.register()
+        client.get_metadata_sections(self.connection)
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata'
+
+    @httpretty.activate
+    def test_list_metadata_keys(self):
+        self.register()
+        client.list_metadata_keys(self.connection, 'foo')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/foo'
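+
+    # A hedged sketch, not in the original patch: the raw request helper
+    # exercised in TestRequest works against this class' register()
+    # fixture too, and plain ASCII paths pass through without the
+    # percent re-encoding seen for '/%7E~' above.
+    @httpretty.activate
+    def test_request_plain_admin_path(self):
+        self.register()
+        client.request(self.connection, 'get', '/admin/log', _retries=0)
+        server_request = httpretty.last_request()
+        assert server_request.path.startswith('/admin/log')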
+ @httpretty.activate + def test_get_bucket_list(self): + self.register() + client.get_bucket_list(self.connection) + server_request = httpretty.last_request() + assert server_request.path == '/admin/metadata/bucket' + + @httpretty.activate + def test_url_response(self): + + httpretty.register_uri( + httpretty.GET, + re.compile("http://localhost:8888/(.*)"), + body='{"msg": "ok"}', + content_type="application/json", + ) + result = client.request(self.connection, 'get', '/%7E~') + assert result == {'msg': 'ok'} + + +class TestClientListObjectsInBucket(object): + + def setup(self): + self.connection = client.connection( + client.Endpoint('localhost', 8888, False, 'key', 'secret'), + True, + ) + self.body = """ + [ + { + "name": "mahobject/", + "etag": "d41d8cd98f00b204e9800998ecf8427e", + "content_type": "application/octet-stream", + "last_modified": "2015-01-15T15:24:42.000Z", + "storage_class": "STANDARD", + "owner": { + "display_name": "client1-system-user", + "id": "client1-system-user" + } + } + ] + """ + + def register(self, body=None): + body = body or self.body + httpretty.register_uri( + httpretty.GET, + re.compile("http://localhost:8888/(.*)"), + body=body, + content_type="application/json", + ) + + @httpretty.activate + def test_get_bucket_is_a_single_item(self): + self.register() + result = client.get_bucket_list(self.connection) + assert len(result) == 1 + + @httpretty.activate + def test_get_bucket_has_right_metadata(self): + self.register() + result = client.get_bucket_list(self.connection) + obj = result[0] + owner = { + "display_name": "client1-system-user", + "id": "client1-system-user" + } + assert obj['name'] == 'mahobject/' + assert obj['etag'] == 'd41d8cd98f00b204e9800998ecf8427e' + assert obj['content_type'] == 'application/octet-stream' + assert obj['last_modified'] == '2015-01-15T15:24:42.000Z' + assert obj['storage_class'] == 'STANDARD' + assert obj['owner'] == owner + + +class TestClientGetWorkerBound(object): + + def setup(self): + self.connection = client.connection( + client.Endpoint('localhost', 8888, False, 'key', 'secret'), + True, + ) + self.body = """ + {"marker": "00000000002.2.3", + "markers": [ + { + "entity": "radosgw-agent", + "items_in_progress": [ + { + "name": "hello", + "timestamp": "0.000000" + } + ], + "position_marker": "00000000002.2.3", + "position_time": "0.000000" + } + ], + "oldest_time": "0.000000" + } + """ + + def register(self, body=None, status=200): + body = body or self.body + httpretty.register_uri( + httpretty.GET, + re.compile("http://localhost:8888/(.*)"), + body=body, + content_type="application/json", + status=status + ) + + @httpretty.activate + def test_get_bound_has_right_metadata(self): + self.register() + result = client.get_worker_bound( + self.connection, + 'bucket-index', + 'beast:us-east' + ) + assert result['marker'] == "00000000002.2.3" + assert result['retries'] == set(['hello']) + assert result['oldest_time'] == "0.000000" + + @httpretty.activate + def test_get_bound_fails_fallsback_to_defaults(self): + self.register(status=404) + result = client.get_worker_bound( + self.connection, + 'bucket-index', + 'beast:us-east' + ) + assert result['marker'] == " " + assert result['retries'] == [] + assert result['oldest_time'] == DEFAULT_TIME + + +class TestIsVersioned(object): + + def setup(self): + # set strict attributes in the mock + self.obj = Mock(spec=object) + + def test_is_in_fact_versioned(self): + self.obj.VersionedEpoch = u'1' + self.obj.version_id = 'somehashvalue' + assert client.is_versioned(self.obj) is True 
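+
+    # Note: Mock(spec=object) makes attribute access strict, so reading
+    # an attribute that was never set raises AttributeError. The "no attr"
+    # tests below therefore exercise the missing-attribute path in
+    # client.is_versioned instead of getting auto-created Mock attributes.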
+ + def test_is_not_versioned_no_attr_versioned_epoch(self): + assert client.is_versioned(self.obj) is False + + def test_is_not_versioned_no_attr_version_id(self): + assert client.is_versioned(self.obj) is False + + def test_is_versioned_version_id(self): + self.obj.version_id = 1 + assert client.is_versioned(self.obj) is True + + def test_is_not_versioned_versioned_id_is_none(self): + self.obj.version_id = None + assert client.is_versioned(self.obj) is False diff --git a/radosgw_agent/tests/test_worker.py b/radosgw_agent/tests/test_worker.py new file mode 100644 index 0000000..9194ce6 --- /dev/null +++ b/radosgw_agent/tests/test_worker.py @@ -0,0 +1,209 @@ +from mock import Mock, patch +import json +import py.test +import time +import httpretty +import re + +from radosgw_agent import worker, client +from radosgw_agent.exceptions import HttpError, NotFound, BucketEmpty + + +class TestSyncObject(object): + + def setup(self): + # setup the fake client, but get the exceptions back into place + self.client = Mock() + self.client.HttpError = HttpError + self.client.NotFound = NotFound + + self.src = Mock() + self.src.zone.name = 'Zone Name' + self.src.host = 'example.com' + self.obj = Mock() + self.obj.name = 'mah-object' + + def test_syncs_correctly(self): + with patch('radosgw_agent.worker.client'): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + assert w.sync_object('mah-bucket', self.obj) is True + + def test_syncs_not_found_on_master_deleting_from_secondary(self): + self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, '')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + assert w.sync_object('mah-bucket', self.obj) is True + + def test_syncs_deletes_from_secondary(self): + self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, '')) + self.client.delete_object = Mock(side_effect=NotFound(404, '')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + assert w.sync_object('mah-bucket', self.obj) is False + + def test_syncs_could_not_delete_from_secondary(self): + self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, '')) + self.client.delete_object = Mock(side_effect=ValueError('unexpected error')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + + with py.test.raises(worker.SyncFailed): + w.sync_object('mah-bucket', self.obj) + + def test_syncs_encounters_a_http_error(self): + self.client.sync_object_intra_region = Mock(side_effect=HttpError(400, '')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + w.sync_object('mah-bucket', self.obj) + + def test_sync_client_raises_sync_failed(self): + self.client.sync_object_intra_region = Mock(side_effect=worker.SyncFailed('failed intra region')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + + with py.test.raises(worker.SyncFailed) as exc: + w.sync_object('mah-bucket', self.obj) + + exc_message = exc.value[0] + assert 'failed intra region' in exc_message + + def test_fails_to_remove_op_state(self, capsys): + # really 
tricky to test this one, we are forced to just use `capsys` from py.test + # which will allow us to check into the stderr logging output and see if the agent + # was spitting what we are expecting. + self.client.remove_op_state = Mock(side_effect=ValueError('could not remove op')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + assert w.sync_object('mah-bucket', self.obj) is True + # logging does not play nice and so we are forced to comment this out. + # this test does test the right thing, but we are unable to have a nice + # assertion, the fix here is not the test it is the code that needs to + # improve. For now, this just + # gives us the coverage. + # out, err = capsys.readouterr() + # assert 'could not remove op state' in out + # assert 'could not remove op state' in err + + def test_fails_to_do_anything_fallsback_to_wait_for_object(self): + self.client.sync_object_intra_region = Mock(side_effect=ValueError('severe error')) + + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.wait_for_object = lambda *a: None + assert w.sync_object('mah-bucket', self.obj) is True + + def test_wait_for_object_state_not_found_raises_sync_failed(self): + self.client.get_op_state = Mock(side_effect=NotFound(404, '')) + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + with py.test.raises(worker.SyncFailed) as exc: + w.wait_for_object(None, None, time.time() + 1000, None) + + exc_message = exc.exconly() + assert 'state not found' in exc_message + + def test_wait_for_object_timeout(self): + msg = 'should not have called get_op_state' + self.client.get_op_state = Mock(side_effect=AssertionError(msg)) + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + with py.test.raises(worker.SyncTimedOut) as exc: + w.wait_for_object(None, None, time.time() - 1, None) + + def test_wait_for_object_state_complete(self): + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + self.client.get_op_state = lambda *a: [{'state': 'complete'}] + assert w.wait_for_object(None, None, time.time() + 1, None) is None + + def test_wait_for_object_state_error(self): + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + self.client.get_op_state = lambda *a: [{'state': 'error'}] + with py.test.raises(worker.SyncFailed) as exc: + w.wait_for_object(None, None, time.time() + 1, None) + + exc_message = exc.exconly() + assert 'state is error' in exc_message + + def test_sync_bucket_delayed_not_found(self): + class fake_iterable(object): + def __iter__(self): + raise BucketEmpty + with patch('radosgw_agent.worker.client', self.client): + w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1) + w.sync_object = lambda *a: None + objects = fake_iterable() + with py.test.raises(BucketEmpty): + w.sync_bucket('foo', objects) + + + +def create_fake_endpoint(name='source', **kw): + ep = Mock() + ep.zone.name = name + ep.secret_key = kw.get('secret', 'secret') + ep.access_key = kw.get('access', 'access') + ep.port = kw.get('port', 7777) + ep.host = kw.get('host', 'localhost') + ep.debug = kw.get('debug', True) + return ep + +def 
create_log_entry(name='foo', **kw): + return { + "op_id": kw.get('op_id',"00000000006.3741.3"), + "op_tag": kw.get('op_tag', "default.21007.10"), + "op": kw.get('op', "link_olh"), + "object": name, + "instance": kw.get('instance',"uWueo6N+Hm6Sp86OdxfnfDHfUKRy\/gOu"), + "state": kw.get('state', "complete"), + "index_ver": kw.get('index_ver', 6), + "timestamp": kw.get('timestamp', "2015-01-07 00:21:41.000000Z"), + "ver": kw.get('ver', { "pool": 12, "epoch": 20818}), + "versioned": kw.get('versioned', True), + } + + +class TestDataWorkerIncremental(object): + + def setup(self): + self.w = worker.DataWorkerIncremental( + None, None, None, create_fake_endpoint(), + create_fake_endpoint('dest'), daemon_id=1, max_entries=10 + ) + + def register(self, src_body=None, dest_body=None, status=200): + + httpretty.register_uri( + httpretty.GET, + re.compile("http://localhost:7777/admin/log(.*)"), + body=src_body or "{}", + content_type="application/json", + status=status + ) + httpretty.register_uri( + httpretty.GET, + re.compile("http://localhost:8888/admin/log(.*)"), + body=dest_body or "{}", + content_type="application/json", + status=status + ) + + @httpretty.activate + def test_items_from_source_only(self): + src_body = json.dumps([create_log_entry('foo_1')]) + self.register(src_body=src_body) + marker, entries = self.w.get_bucket_instance_entries(2, 'bucket') + assert marker == '00000000006.3741.3' + assert len(entries) == 1 diff --git a/radosgw_agent/tests/util/test_configuration.py b/radosgw_agent/tests/util/test_configuration.py new file mode 100644 index 0000000..ca5dc88 --- /dev/null +++ b/radosgw_agent/tests/util/test_configuration.py @@ -0,0 +1,64 @@ +import pytest +from radosgw_agent.util import configuration + + +@pytest.fixture +def conf(): + return configuration.Configuration() + + +class TestConfiguration(object): + + def test_set_new_keys(self, conf): + conf['key'] = 1 + assert conf['key'] == 1 + + def test_not_allowed_to_change_value(self, conf): + conf['key'] = 1 + with pytest.raises(TypeError): + conf['key'] = 2 + + def test_not_allowed_to_pop_existing_key(self, conf): + conf['key'] = 1 + with pytest.raises(TypeError): + conf.pop('key') + + def test_keyerror_when_popping(self, conf): + with pytest.raises(KeyError): + conf.pop('key') + + def test_adding_nested_values(self, conf): + conf['key'] = {} + conf['key']['bar'] = 1 + assert conf['key']['bar'] == 1 + + def test_modifiying_nested_values_fails(self, conf): + conf['key'] = {} + conf['key']['bar'] = 1 + with pytest.raises(TypeError): + conf['key']['bar'] = 2 + + def test_initial_dict_seeding(self): + my_dict = {'a': 1} + conf = configuration.Configuration(my_dict) + assert conf['a'] == 1 + + def test_initial_dict_seeding_doesnt_allow_updates(self): + my_dict = {'a': 1} + conf = configuration.Configuration(my_dict) + with pytest.raises(TypeError): + conf['a'] = 2 + + def test_assign_a_new_key_to_a_dict(self, conf): + my_dict = {'a': 1} + conf['args'] = my_dict + assert conf['args']['a'] == 1 + + def test_contains_element(self, conf): + exists = False + try: + if 'key' in conf: + exists = True + except KeyError: + assert False, "dict object should support 'contains' operations" + assert exists is False diff --git a/radosgw_agent/tests/util/test_obj.py b/radosgw_agent/tests/util/test_obj.py new file mode 100644 index 0000000..e3e3575 --- /dev/null +++ b/radosgw_agent/tests/util/test_obj.py @@ -0,0 +1,40 @@ +from radosgw_agent.util import obj + + +class Empty(object): + + def __init__(self, **kw): + for k, v in kw.items(): + 
setattr(self, k, v) + + +class TestToDict(object): + + def test_underscores_are_ignored(self): + fake = Empty(a=1, _b=2) + result = obj.to_dict(fake) + assert result.get('_b') is None + assert result.get('a') == 1 + + def test_overrides_are_respected(self): + fake = Empty(a=1, b=2) + result = obj.to_dict(fake, b=3) + assert result.get('b') == 3 + + def test_overrides_dont_mess_up_other_keys(self): + fake = Empty(a=1, b=2) + result = obj.to_dict(fake, b=3) + assert result.get('a') == 1 + + def test_extra_keys_are_set(self): + result = obj.to_dict(Empty(), a=1, b=2) + assert result['a'] == 1 + assert result['b'] == 2 + + +class TestKeysToAttribute(object): + + def test_replace_dashes(self): + dictionary = {'dashed-word': 1} + result = obj.to_obj(dictionary) + assert result.dashed_word == 1 diff --git a/radosgw_agent/util/__init__.py b/radosgw_agent/util/__init__.py new file mode 100644 index 0000000..64b7d5f --- /dev/null +++ b/radosgw_agent/util/__init__.py @@ -0,0 +1,2 @@ +import log +from log import get_dev_logger diff --git a/radosgw_agent/util/configuration.py b/radosgw_agent/util/configuration.py new file mode 100644 index 0000000..d0661f4 --- /dev/null +++ b/radosgw_agent/util/configuration.py @@ -0,0 +1,77 @@ + + +class Configuration(object): + """ + An immutable dictionary where values set for the first time are allowed and + existing keys raise a TypeError exception + + Even though there is effort made into making it immutable, a consumer can + force its way through by accessing the private `dict` method that contains + the values, although it defeats the purpose it is exposed in that way in + the event that something needs to change. + + All normal methods and operations should be supported. + """ + + def __init__(self, seed=None): + if seed and isinstance(seed, dict): + self._dict = seed + else: + self._dict = {} + + def __str__(self): + return str(self._dict) + + def pop(self, key, default=None): + try: + self._dict[key] + except KeyError: + raise + else: + self._default_error() + + def popitem(self): + self._default_error() + + def update(self): + self._default_error() + + def clear(self): + self._default_error() + + def values(self): + return self._dict.values() + + def keys(self): + return self._dict.keys() + + def items(self): + return self._dict.items() + + def get(self, key, default=None): + return self._dict.get(key, default) + + def _default_error(self): + msg = 'config object does not allow key changes' + raise TypeError(msg) + + def __setitem__(self, key, value): + try: + self._dict[key] + except KeyError: + if isinstance(value, dict): + self._dict[key] = Configuration(value) + else: + self._dict[key] = value + else: + self._default_error() + + def __getitem__(self, key): + return self._dict[key] + + def __contains__(self, key): + try: + self._dict[key] + return True + except KeyError: + return False diff --git a/radosgw_agent/util/decorators.py b/radosgw_agent/util/decorators.py new file mode 100644 index 0000000..2923cf2 --- /dev/null +++ b/radosgw_agent/util/decorators.py @@ -0,0 +1,112 @@ +import logging +import sys +import traceback +from functools import wraps + + +def catches(catch=None, handler=None, exit=True, handle_all=False): + """ + Very simple decorator that tries any of the exception(s) passed in as + a single exception class or tuple (containing multiple ones) returning the + exception message and optionally handling the problem if it raises with the + handler if it is provided. 
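+    With ``handle_all=True`` any exception type not listed in ``catch`` is
+    also handled: its traceback is logged line by line and the process
+    exits, instead of the exception being re-raised. For example::
+
+        @catches(TypeError, handle_all=True)
+        def bar():
+            some_call()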
+ + So instead of doing something like this:: + + def bar(): + try: + some_call() + print "Success!" + except TypeError, exc: + print "Error while handling some call: %s" % exc + sys.exit(1) + + You would need to decorate it like this to have the same effect:: + + @catches(TypeError) + def bar(): + some_call() + print "Success!" + + If multiple exceptions need to be caught they need to be provided as a + tuple:: + + @catches((TypeError, AttributeError)) + def bar(): + some_call() + print "Success!" + + If adding a handler, it should accept a single argument, which would be the + exception that was raised, it would look like:: + + def my_handler(exc): + print 'Handling exception %s' % str(exc) + raise SystemExit + + @catches(KeyboardInterrupt, handler=my_handler) + def bar(): + some_call() + + Note that the handler needs to raise its SystemExit if it wants to halt + execution, otherwise the decorator would continue as a normal try/except + block. + + + :param catch: A tuple with one (or more) Exceptions to catch + :param handler: Optional handler to have custom handling of exceptions + :param exit: Raise a ``SystemExit`` after handling exceptions + :param handle_all: Handle all other exceptions via logging. + """ + catch = catch or Exception + logger = logging.getLogger('radosgw_agent') + + def decorate(f): + + @wraps(f) + def newfunc(*a, **kw): + exit_from_catch = False + try: + return f(*a, **kw) + except catch as e: + if handler: + return handler(e) + else: + logger.error(make_exception_message(e)) + + if exit: + exit_from_catch = True + sys.exit(1) + except Exception: # anything else, no need to save the exception as a variable + if handle_all is False: # re-raise if we are not supposed to handle everything + raise + # Make sure we don't spit double tracebacks if we are raising + # SystemExit from the `except catch` block + + if exit_from_catch: + sys.exit(1) + + str_failure = traceback.format_exc() + for line in str_failure.split('\n'): + logger.error(line) + sys.exit(1) + + return newfunc + + return decorate + +# +# Decorator helpers +# + + +def make_exception_message(exc): + """ + An exception is passed in and this function + returns the proper string depending on the result + so it is readable enough. + """ + if str(exc): + return '%s: %s\n' % (exc.__class__.__name__, exc) + else: + return '%s\n' % (exc.__class__.__name__) + diff --git a/radosgw_agent/util/log.py b/radosgw_agent/util/log.py new file mode 100644 index 0000000..32094ea --- /dev/null +++ b/radosgw_agent/util/log.py @@ -0,0 +1,89 @@ +import logging +import sys + + +def get_dev_logger(name='dev.radosgw_agent'): + """ + A simple utility to be able to log things that are meant for developer-eyes + and not for user facing. + + All developer logs must be prepended with `dev` so this utility ensures + that is the case. 
To use it:: + + dev_log = get_dev_logger(__name__) + + Or:: + + dev_log = get_dev_logger('dev.custom_name') + """ + if not name.startswith('dev'): + return logging.getLogger('%s.%s' % ('dev', name)) + return logging.getLogger(name) + + +BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) + +COLORS = { + 'WARNING': YELLOW, + 'INFO': WHITE, + 'DEBUG': BLUE, + 'CRITICAL': RED, + 'ERROR': RED, + 'FATAL': RED, +} + +RESET_SEQ = "\033[0m" +COLOR_SEQ = "\033[1;%dm" +BOLD_SEQ = "\033[1m" + +BASE_COLOR_FORMAT = "%(asctime)s %(process)d\ + [$BOLD%(name)s$RESET][%(color_levelname)-17s] %(message)s" + +BASE_FORMAT = "%(asctime)s %(process)d [%(name)s][%(levelname)-6s] %(message)s" + + +def supports_color(): + """ + Returns True if the running system's terminal supports color, and False + otherwise. + """ + unsupported_platform = (sys.platform in ('win32', 'Pocket PC')) + # isatty is not always implemented + is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() + if unsupported_platform or not is_a_tty: + return False + return True + + +def color_message(message): + message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) + return message + + +class ColoredFormatter(logging.Formatter): + """ + A very basic logging formatter that not only applies color to the levels of + the ouput but will also truncate the level names so that they do not alter + the visuals of logging when presented on the terminal. + """ + + def __init__(self, msg): + logging.Formatter.__init__(self, msg) + + def format(self, record): + levelname = record.levelname + truncated_level = record.levelname[:6] + levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) + truncated_level + RESET_SEQ + record.color_levelname = levelname_color + return logging.Formatter.format(self, record) + + +def color_format(): + """ + Main entry point to get a colored formatter, it will use the + BASE_FORMAT by default and fall back to no colors if the system + does not support it + """ + str_format = BASE_COLOR_FORMAT if supports_color() else BASE_FORMAT + color_format = color_message(str_format) + return ColoredFormatter(color_format) diff --git a/radosgw_agent/util/obj.py b/radosgw_agent/util/obj.py new file mode 100644 index 0000000..e170fb6 --- /dev/null +++ b/radosgw_agent/util/obj.py @@ -0,0 +1,42 @@ + + +def to_dict(_object, **extra_keys): + """ + A utility to convert an object with attributes to a dictionary with the + optional feature of slapping extra_keys. Because extra_keys can be + optionally set, it is assumed that any keys that clash will get + overwritten. + + Private methods (anything that starts with `_`) are ignored. + """ + dictified_obj = {} + for k, v in _object.__dict__.items(): + if not k.startswith('_'): + # get key + value = extra_keys.pop(k, v) + dictified_obj[k] = value + if extra_keys: + for k, v in extra_keys.items(): + dictified_obj[k] = v + + return dictified_obj + + +def to_obj(dictionary, name="BucketEntry"): + """ + Because some objects are dynamic, we are forced to skip namedtuples + and set the attributes from keys in dictionaries so that accessing them + is easier and compatible with code that accesses them as regular objects. + + .. 
note: dashes are converted to underscores + """ + class Meta(object): + + def __init__(self, **kw): + for k, v in kw.items(): + k = k.replace('-', '_') + setattr(self, k, v) + + obj_ = Meta(**dictionary) + obj_.__class__.__name__ = name + return obj_ diff --git a/radosgw_agent/util/string.py b/radosgw_agent/util/string.py new file mode 100644 index 0000000..53787d6 --- /dev/null +++ b/radosgw_agent/util/string.py @@ -0,0 +1,14 @@ + +def concatenate(*a, **kw): + """ + helper function to concatenate all arguments with added (optional) + newlines + """ + newline = kw.get('newline', False) + string = '' + for item in a: + if newline: + string += item + '\n' + else: + string += item + return string diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py new file mode 100644 index 0000000..b92361c --- /dev/null +++ b/radosgw_agent/worker.py @@ -0,0 +1,549 @@ +from collections import namedtuple +from itertools import ifilter +import logging +import multiprocessing +import os +import socket +import time + +from radosgw_agent import client +from radosgw_agent import lock +from radosgw_agent.util import obj as obj_ +from radosgw_agent.exceptions import SkipShard, SyncError, SyncTimedOut, SyncFailed, NotFound, BucketEmpty +from radosgw_agent.constants import DEFAULT_TIME, RESULT_SUCCESS, RESULT_ERROR + +log = logging.getLogger(__name__) + + +class Worker(multiprocessing.Process): + """sync worker to run in its own process""" + + def __init__(self, work_queue, result_queue, log_lock_time, + src, dest, **kwargs): + super(Worker, self).__init__() + self.src = src + self.dest = dest + self.work_queue = work_queue + self.result_queue = result_queue + self.log_lock_time = log_lock_time + self.lock = None + + self.local_lock_id = socket.gethostname() + ':' + str(os.getpid()) + + # construct the two connection objects + self.src_conn = client.connection(src) + self.dest_conn = client.connection(dest) + + def prepare_lock(self): + assert self.lock is None + self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id, + self.log_lock_time, self.dest.zone.name) + self.lock.daemon = True + self.lock.start() + + def lock_shard(self, shard_num): + result = shard_num, [] + try: + self.lock.set_shard(shard_num) + self.lock.acquire() + except NotFound: + # no log means nothing changed this shard yet + self.lock.unset_shard() + self.result_queue.put((RESULT_SUCCESS, result)) + raise SkipShard('no log for shard') + except Exception: + log.warn('error locking shard %d log, ' + ' skipping for now. Traceback: ', + shard_num, exc_info=True) + self.lock.unset_shard() + self.result_queue.put((RESULT_ERROR, result)) + raise SkipShard() + + def unlock_shard(self): + try: + self.lock.release_and_clear() + except lock.LockBroken as e: + log.warn('work may be duplicated: %s', e) + except Exception as e: + log.warn('error unlocking log, continuing anyway ' + 'since lock will timeout. Traceback:', exc_info=True) + + def set_bound(self, key, marker, retries, type_=None): + # api doesn't allow setting a bound with a blank marker + if marker: + if type_ is None: + type_ = self.type + try: + data = [ + obj_.to_dict(item, time=DEFAULT_TIME) for item in retries + ] + client.set_worker_bound(self.dest_conn, + type_, + marker, + DEFAULT_TIME, + self.daemon_id, + key, + data=data) + return RESULT_SUCCESS + except Exception: + log.warn('error setting worker bound for key "%s",' + ' may duplicate some work later. 
Traceback:', key, + exc_info=True) + return RESULT_ERROR + +MetadataEntry = namedtuple('MetadataEntry', + ['section', 'name', 'marker', 'timestamp']) + + +def _meta_entry_from_json(entry): + return MetadataEntry( + entry['section'], + entry['name'], + entry['id'], + entry['timestamp'], + ) + +BucketIndexEntry = namedtuple('BucketIndexEntry', + [ + 'object', + 'marker', + 'timestamp', + 'op', + 'versioned', + 'ver', + 'name', + # compatibility with boto objects: + 'VersionedEpoch', + 'version_id', + ]) + +BucketVer = namedtuple('BucketVer', + [ + 'epoch', + 'pool', + ]) + + +def _bi_entry_from_json(entry): + ver = entry.get('ver', {}) + entry_ver = BucketVer( + ver.get('epoch'), + ver.get('pool') + ) + + # compatibility with boto objects: + VersionedEpoch = ver.get('epoch') + version_id = entry.get('instance', 'null') + + return BucketIndexEntry( + entry['object'], + entry['op_id'], + entry['timestamp'], + entry.get('op', ''), + entry.get('versioned', False), + entry_ver, + entry['object'], + VersionedEpoch, + version_id, + ) + + +def filter_versioned_objects(entry): + """ + On incremental sync operations, the log may indicate that 'olh' entries, + which should be ignored. So this filter function will check for the + different attributes present in an ``entry`` and return only valid ones. + + This should be backwards compatible with older gateways that return log + entries that don't support versioning. + """ + # do not attempt filtering on non-versioned entries + if not entry.versioned: + return entry + + # writes or delete 'op' values should be ignored + if entry.op not in ['write', 'delete']: + # allowed op states are `link_olh` and `link_olh_del` + return entry + + +class IncrementalMixin(object): + """This defines run() and get_and_process_entries() for incremental sync. + + These are the same for data and metadata sync, so share their + implementation here. + """ + + def run(self): + self.prepare_lock() + while True: + item = self.work_queue.get() + if item is None: + log.info('process %s is done. 
Exiting', self.ident) + break + + shard_num, (log_entries, retries) = item + + log.info('%s is processing shard number %d', + self.ident, shard_num) + + # first, lock the log + try: + self.lock_shard(shard_num) + except SkipShard: + continue + + result = RESULT_SUCCESS + try: + new_retries = self.sync_entries(log_entries, retries) + except Exception: + log.exception('syncing entries for shard %d failed', + shard_num) + result = RESULT_ERROR + new_retries = [] + + # finally, unlock the log + self.unlock_shard() + self.result_queue.put((result, (shard_num, new_retries))) + log.info('finished processing shard %d', shard_num) + + +class DataWorker(Worker): + + def __init__(self, *args, **kwargs): + super(DataWorker, self).__init__(*args, **kwargs) + self.type = 'data' + self.op_id = 0 + self.object_sync_timeout = kwargs.get('object_sync_timeout', 60 * 60 * 60) + self.daemon_id = kwargs['daemon_id'] + + def sync_object(self, bucket, obj): + log.debug('sync_object %s/%s', bucket, obj.name) + self.op_id += 1 + local_op_id = self.local_lock_id + ':' + str(self.op_id) + found = False + + try: + until = time.time() + self.object_sync_timeout + client.sync_object_intra_region(self.dest_conn, bucket, obj, + self.src.zone.name, + self.daemon_id, + local_op_id) + found = True + except NotFound: + log.debug('"%s/%s" not found on master, deleting from secondary', + bucket, obj) + try: + client.delete_object(self.dest_conn, bucket, obj) + except NotFound: + # Since we were trying to delete the object, just return + return False + except Exception: + msg = 'could not delete "%s/%s" from secondary' % (bucket, obj.name) + log.exception(msg) + raise SyncFailed(msg) + except SyncFailed: + raise + except Exception: + log.warn('encountered an exception during sync', exc_info=True) + # wait for it if the op state is in-progress + self.wait_for_object(bucket, obj, until, local_op_id) + # TODO: clean up old op states + try: + if found: + client.remove_op_state(self.dest_conn, self.daemon_id, + local_op_id, bucket, obj) + except NotFound: + log.debug('op state already gone') + except Exception: + log.exception('could not remove op state for daemon "%s" op_id %s', + self.daemon_id, local_op_id) + + return True + + def wait_for_object(self, bucket, obj, until, local_op_id): + while time.time() < until: + try: + state = client.get_op_state(self.dest_conn, + self.daemon_id, + local_op_id, + bucket, obj) + log.debug('op state is %s', state) + state = state[0]['state'] + if state == 'complete': + return + elif state != 'in-progress': + raise SyncFailed('state is {0}'.format(state)) + time.sleep(1) + except SyncFailed: + raise + except NotFound: + raise SyncFailed('object copy state not found') + except Exception as e: + log.debug('error geting op state: %s', e, exc_info=True) + time.sleep(1) + # timeout expired + raise SyncTimedOut() + + def get_bucket_instance(self, bucket): + metadata = client.get_metadata(self.src_conn, 'bucket', bucket) + return bucket + ':' + metadata['data']['bucket']['bucket_id'] + + def get_bucket(self, bucket_instance): + return bucket_instance.split(':', 1)[0] + + def sync_bucket(self, bucket, objects): + log.info('syncing bucket "%s"', bucket) + retry_objs = [] + count = 0 + for obj in objects: + count += 1 + # sync each object + log.debug('syncing object "%s/%s"', bucket, obj.name), + try: + self.sync_object(bucket, obj) + except SyncError as err: + log.error('failed to sync object %s/%s: %s', + bucket, obj.name, err) + retry_objs.append(obj) + + log.debug('bucket {bucket} has {num_objects} 
object'.format( + bucket=bucket, num_objects=count)) + if retry_objs: + log.debug('these objects failed to be synced and will be during ' + 'the next incremental sync: %s', retry_objs) + + return retry_objs + + +class DataWorkerIncremental(IncrementalMixin, DataWorker): + + def __init__(self, *args, **kwargs): + super(DataWorkerIncremental, self).__init__(*args, **kwargs) + self.max_entries = kwargs['max_entries'] + + def get_bucket_instance_entries(self, marker, instance): + entries = [] + while True: + try: + log_entries = client.get_log(self.src_conn, 'bucket-index', + marker, self.max_entries, instance) + except NotFound: + log_entries = [] + + log.debug('bucket instance "%s" has %d entries after "%s"', instance, + len(log_entries), marker) + + try: + entries += [_bi_entry_from_json(entry) for entry in log_entries] + except KeyError: + log.error('log missing key is: %s', log_entries) + raise + + if entries: + marker = entries[-1].marker + else: + marker = ' ' + + if len(log_entries) < self.max_entries: + break + return marker, entries + + def inc_sync_bucket_instance(self, instance, marker, timestamp, retries): + max_marker, entries = self.get_bucket_instance_entries(marker, instance) + + # regardless if entries are versioned, make sure we filter them + entries = [i for i in ifilter(filter_versioned_objects, entries)] + + objects = set([entry for entry in entries]) + bucket = self.get_bucket(instance) + new_retries = self.sync_bucket(bucket, objects.union(retries)) + + result = self.set_bound(instance, max_marker, new_retries, + 'bucket-index') + if new_retries: + result = RESULT_ERROR + return result + + def sync_entries(self, log_entries, retries): + try: + bucket_instances = set([entry['key'] for entry in log_entries]) + except KeyError: + log.error('log containing bad key is: %s', log_entries) + raise + + new_retries = [] + for bucket_instance in bucket_instances.union(retries): + bound = client.get_worker_bound( + self.dest_conn, + 'bucket-index', + bucket_instance) + + marker = bound['marker'] + # remap dictionaries to object-like + retries = [obj_.to_obj(i) for i in bound['retries']] + timestamp = bound['oldest_time'] + + try: + sync_result = self.inc_sync_bucket_instance(bucket_instance, + marker, + timestamp, + retries) + except Exception as e: + log.warn('error syncing bucket instance "%s": %s', + bucket_instance, e, exc_info=True) + sync_result = RESULT_ERROR + if sync_result == RESULT_ERROR: + new_retries.append(bucket_instance) + + return new_retries + + +class DataWorkerFull(DataWorker): + + def full_sync_bucket(self, bucket): + try: + instance = self.get_bucket_instance(bucket) + try: + marker = client.get_log_info(self.src_conn, 'bucket-index', + instance)['max_marker'] + except NotFound: + marker = ' ' + log.debug('bucket instance is "%s" with marker %s', instance, marker) + + objects = client.list_objects_in_bucket(self.src_conn, bucket) + retries = self.sync_bucket(bucket, objects) + + result = self.set_bound(instance, marker, retries, 'bucket-index') + return not retries and result == RESULT_SUCCESS + except BucketEmpty: + log.debug('no objects in bucket %s', bucket) + return True + except Exception: + log.exception('error preparing for full sync of bucket "%s"', + bucket) + return False + + def run(self): + self.prepare_lock() + while True: + item = self.work_queue.get() + if item is None: + log.info('No more entries in queue, exiting') + break + + shard_num, buckets = item + + # first, lock the log + try: + self.lock_shard(shard_num) + except SkipShard: + continue 
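+                # lock_shard() has already pushed a result for this shard
+                # onto result_queue before raising SkipShard, so it is
+                # safe to just move on to the next work item here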
+class MetadataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorker, self).__init__(*args, **kwargs)
+        self.type = 'metadata'
+
+    def sync_meta(self, section, name):
+        log.debug('syncing metadata type %s key "%s"', section, name)
+        try:
+            metadata = client.get_metadata(self.src_conn, section, name)
+        except NotFound:
+            log.debug('%s "%s" not found on master, deleting from secondary',
+                      section, name)
+            try:
+                client.delete_metadata(self.dest_conn, section, name)
+            except NotFound:
+                # already gone from the secondary, which is the desired
+                # state, so this still counts as success
+                pass
+            return RESULT_SUCCESS
+        except Exception as e:
+            log.warn('error getting metadata for %s "%s": %s',
+                     section, name, e, exc_info=True)
+            return RESULT_ERROR
+        else:
+            try:
+                client.update_metadata(self.dest_conn, section, name,
+                                       metadata)
+                return RESULT_SUCCESS
+            except Exception as e:
+                log.warn('error updating metadata for %s "%s": %s',
+                         section, name, e, exc_info=True)
+                return RESULT_ERROR
+
+
+class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def sync_entries(self, log_entries, retries):
+        try:
+            entries = [_meta_entry_from_json(entry) for entry in log_entries]
+        except KeyError:
+            log.error('log containing bad key is: %s', log_entries)
+            raise
+
+        new_retries = []
+        mentioned = set([(entry.section, entry.name) for entry in entries])
+        split_retries = [tuple(entry.split('/', 1)) for entry in retries]
+        for section, name in mentioned.union(split_retries):
+            sync_result = self.sync_meta(section, name)
+            if sync_result == RESULT_ERROR:
+                new_retries.append(section + '/' + name)
+
+        return new_retries
+
+
+class MetadataWorkerFull(MetadataWorker):
+
+    def empty_result(self, shard):
+        return shard, []
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.debug('syncing item "%s"', item)
+
+            shard_num, metadata = item
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            # attempt to sync each metadata entry, adding failures to a
+            # list so they can be retried during incremental sync
+            retries = []
+            for section, name in metadata:
+                try:
+                    sync_result = self.sync_meta(section, name)
+                except Exception as e:
+                    log.warn('could not sync %s "%s", saving for retry: %s',
+                             section, name, e, exc_info=True)
+                    sync_result = RESULT_ERROR
+                if sync_result == RESULT_ERROR:
+                    retries.append(section + '/' + name)
+
+            # unlock the shard and report entries to retry during
+            # incremental sync
+            self.unlock_shard()
+            self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
+            log.info('finished syncing shard %d', shard_num)
+            log.info('incremental sync will need to retry items: %s',
+                     retries)
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..43b2434
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,3 @@
+pytest >=2.1.3
+mock >=1.0
+tox >=1.2
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6b955e0
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+argparse
+boto >=2.2.2,<3.0.0
+PyYAML
diff --git a/scripts/radosgw-agent b/scripts/radosgw-agent
new file mode 100644
index 0000000..e59d143
--- /dev/null
+++ b/scripts/radosgw-agent
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""
+radosgw-agent - synchronize users and data between radosgw clusters
+"""
+import os
+import sys
+
+# prefer a distribution-installed copy of radosgw_agent when present
+if os.path.exists('/usr/share/pyshared/radosgw_agent'):
+    sys.path.insert(0, '/usr/share/pyshared/radosgw_agent')
+elif os.path.exists('/usr/share/radosgw-agent'):
+    sys.path.insert(0, '/usr/share/radosgw-agent')
+elif os.path.exists('/usr/share/pyshared/radosgw-agent'):
+    sys.path.insert(0, '/usr/share/pyshared/radosgw-agent')
+elif os.path.exists('/usr/lib/python2.6/site-packages/radosgw_agent'):
+    sys.path.insert(0, '/usr/lib/python2.6/site-packages/radosgw_agent')
+
+from radosgw_agent.cli import main
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..44432e4
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.2.2,python-boto < 3.0.0
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..b633f43
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,44 @@
+from setuptools import setup, find_packages
+import sys
+import re
+
+
+module_file = open("radosgw_agent/__init__.py").read()
+metadata = dict(
+    re.findall(r"__([a-z]+)__\s*=\s*['\"]([^'\"]*)['\"]", module_file))
+
+
+install_requires = []
+pyversion = sys.version_info[:2]
+if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
+    install_requires.append('argparse')
+
+setup(
+    name='radosgw-agent',
+    version=metadata['version'],
+    packages=find_packages(),
+
+    author='Josh Durgin',
+    author_email='jdurgin@redhat.com',
+    description='Synchronize users and data between radosgw clusters',
+    license='MIT',
+    keywords='radosgw ceph radosgw-agent',
+    url="https://github.com/ceph/radosgw-agent",
+
+    install_requires=[
+        'setuptools',
+        'boto >=2.2.2,<3.0.0',
+    ] + install_requires,
+
+    tests_require=[
+        'pytest >=2.1.3',
+        'mock >=1.0',
+        'httpretty',
+    ],
+
+    entry_points={
+        'console_scripts': [
+            'radosgw-agent = radosgw_agent.cli:main',
+        ],
+    },
+)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..4126a3b
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,9 @@
+[tox]
+envlist = py26
+
+[testenv]
+deps=
+    pytest
+    mock
+    httpretty
+commands=py.test -s -v {posargs:radosgw_agent/tests}
-- 
2.47.3
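
Note on the retry bookkeeping in worker.py above: every worker reports a
(shard, retries) pair through the result queue, and metadata failures
travel as flat 'section/name' strings that the incremental worker splits
back apart with split('/', 1). A minimal standalone sketch of that round
trip (illustrative only -- sync_one below is a hypothetical stand-in for
sync_meta, and the dict-shaped log entries are an assumption):

    # Sketch of the metadata retry round trip; sync_one is hypothetical.
    def replay(log_entries, retries, sync_one):
        # keys named in the log, plus anything a previous pass failed on
        mentioned = set((e['section'], e['name']) for e in log_entries)
        carried = [tuple(r.split('/', 1)) for r in retries]
        new_retries = []
        for section, name in mentioned.union(carried):
            if not sync_one(section, name):
                # failures are stored flat so they serialize easily
                new_retries.append(section + '/' + name)
        return new_retries

    # a failure recorded on one pass is fed back in on the next
    failed = replay([{'section': 'bucket', 'name': 'foo'}], [],
                    lambda s, n: False)
    assert failed == ['bucket/foo']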