--- /dev/null
+*.py[cod]
+
+# C extensions
+*.so
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+*.log
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+.project
+.pydevproject
--- /dev/null
+Changelog
+=========
+
+1.2.6
+-----
+9-Jun-2016
+* Improve logging when the op state is empty during sync
+* Do not spit out a traceback when there is no op state during sync
+
+1.2.5
+-----
+30-Mar-2016
+* Bump the minimum version of boto required to 2.10.0
+* Fix configuration not found error when using init script to restart
+ radosgw-agent
+
+1.2.4
+-----
+12-Aug-2015
+* Fix invalid references for HttpError in lock.py
+* Fix an issue where pinning of the mock library would make installation fail
+
+1.2.3
+-----
+15-Jul-2015
+* suppress override of config settings by argparse defaults
+* properly detect ipv6 endpoints
+* add Python 2.7 testing
+
+
+1.2.2
+-----
+27-Apr-2015
+* Improve terminal logging to better report the actual sync state
+* Catch all exceptions to create better error reporting at the terminal
+* If log location is not available fall back to current working directory
+* Add a flag to indicate versioning support of endpoints
+* support object versioning operations
+* ensure logging is fully configured before any parsing to display errors
+ regardless of failure
+* set the version in ``__init__.py`` and display it when using help
+* log all initial settings and flags of the agent when it starts
+
+1.2.1
+-----
+* Parity in version release for DEB/RPMs to PyPI. Previous 1.2 release had
+ fixes available only for the Python package.
+
+1.2
+---
+* Improve log file handling (works better with logrotate)
+* Fixes for racing threads when shard number changes
+* Better logging of exceptions
+* Retry sync when transient errors are returned by the gateway.
+* Drops dependency on Python's ``requests`` library (in favour of ``boto``)
+* Better handling of objects that are not found.
+* When there are buckets with no logs, process them as a full sync.
+* Fix mishandling of reserved characters in URLs.
--- /dev/null
+Copyright (c) 2013 Inktank Storage, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
--- /dev/null
+include LICENSE
+include scripts/radosgw-agent
+include init-radosgw-agent
+include logrotate.conf
+prune radosgw_agent/tests
--- /dev/null
+====================================================================
+radosgw-agent -- synchronize data and users between radosgw clusters
+====================================================================
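+
+Usage
+-----
+
+A minimal sketch of how the agent can be invoked, assuming a YAML
+configuration file whose keys mirror the command line option names in
+``radosgw_agent/cli.py``; the hosts, zone names and keys below are
+illustrative placeholders, and the config path matches the default used by
+the init script::
+
+    # /etc/ceph/radosgw-agent/default.conf
+    src_zone: us-east
+    source: http://zone1.example.org:8080
+    src_access_key: SRC_ACCESS_KEY
+    src_secret_key: SRC_SECRET_KEY
+    destination: http://zone2.example.org:8080
+    dest_access_key: DEST_ACCESS_KEY
+    dest_secret_key: DEST_SECRET_KEY
+    log_file: /var/log/ceph/radosgw-agent/default.log
+
+The agent can then be run directly, or via the init script::
+
+    radosgw-agent -c /etc/ceph/radosgw-agent/default.conf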
--- /dev/null
+#!/bin/sh
+set -e
+
+if command -v lsb_release >/dev/null 2>&1; then
+ case "$(lsb_release --id --short)" in
+ Ubuntu|Debian)
+ for package in python-virtualenv; do
+ if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+ # add a space after old values
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required packages, please install them:" 1>&2
+ echo " sudo apt-get install $missing"
+ exit 1
+ fi
+ ;;
+ esac
+else
+ if [ -f /etc/redhat-release ]; then
+ case "$(cat /etc/redhat-release | awk '{print $1}')" in
+ CentOS)
+ for package in python-virtualenv; do
+ if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required packages, please install them:" 1>&2
+ echo " sudo yum install $missing"
+ exit 1
+ fi
+ ;;
+ esac
+ fi
+fi
+
+test -d virtualenv || virtualenv virtualenv
+./virtualenv/bin/python setup.py develop
+test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent .
--- /dev/null
+radosgw-agent (1.2.6) stable; urgency=medium
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Thu, 09 Jun 2016 12:46:44 +0000
+
+radosgw-agent (1.2.5) stable; urgency=medium
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Wed, 30 Mar 2016 22:52:21 +0000
+
+radosgw-agent (1.2.4) stable; urgency=low
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Wed, 12 Aug 2015 12:46:22 -0700
+
+radosgw-agent (1.2.3) stable; urgency=low
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Wed, 15 Jul 2015 09:08:11 -0700
+
+radosgw-agent (1.2.2) stable; urgency=low
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Mon, 27 Apr 2015 13:00:08 -0700
+
+radosgw-agent (1.2.1) stable; urgency=low
+
+ * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com> Mon, 09 Feb 2015 12:52:46 -0800
+
+radosgw-agent (1.2-1) precise; urgency=low
+
+ * new upstream release
+
+ -- Sandon Van Ness <sandon@inktank.com> Wed, 02 Apr 2014 11:25:54 -0800
+
+radosgw-agent (1.1-1) precise; urgency=low
+
+ * new upstream release
+
+ -- Gary Lowell <glowell@pudgy.ops.newdream.net> Thu, 21 Nov 2013 16:17:25 -0800
+
+radosgw-agent (1.0-1) stable; urgency=low
+
+ * Initial release
+
+ -- Gary Lowell <gary.lowell@inktank.com> Mon, 26 Aug 2013 09:19:47 -0700
--- /dev/null
+Source: radosgw-agent
+Maintainer: Sage Weil <sage@newdream.net>
+Uploaders: Sage Weil <sage@newdream.net>
+Section: admin
+Priority: optional
+Build-Depends: debhelper (>= 8), python-setuptools
+X-Python-Version: >= 2.6
+Standards-Version: 3.9.2
+Homepage: http://ceph.com/
+
+Package: radosgw-agent
+Architecture: all
+Depends: python,
+ python-argparse,
+ python-setuptools,
+ ${misc:Depends},
+ ${python:Depends}
+Description: synchronize users and data between radosgw clusters
--- /dev/null
+Files: *
+Copyright: (c) 2013 by Inktank Storage
+License: MIT (see the LICENSE file in the source distribution)
--- /dev/null
+/etc/ceph/radosgw-agent
+/var/log/ceph/radosgw-agent
+/var/run/ceph/radosgw-agent
--- /dev/null
+#!/usr/bin/make -f
+
+# Uncomment this to turn on verbose mode.
+export DH_VERBOSE=1
+
+%:
+ dh $@ --buildsystem python_distutils --with python2
+
+override_dh_installlogrotate:
+ cp logrotate.conf debian/radosgw-agent.logrotate
+ dh_installlogrotate
+
+override_dh_installinit:
+ install -m0644 init-radosgw-agent debian/radosgw-agent.init
+ dh_installinit --no-start
--- /dev/null
+#!/bin/sh
+# Start/stop radosgw-agent daemons
+# chkconfig: 2345 60 80
+
+### BEGIN INIT INFO
+# Provides: radosgw-agent
+# Required-Start: $remote_fs $named $network
+# Required-Stop: $remote_fs $named $network
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: Start radosgw-agent at boot time
+# Description: Enable radosgw-agent services
+### END INIT INFO
+
+dir="/"
+config_path="/etc/ceph/radosgw-agent/default.conf"
+
+if [ $2 ]; then
+ config_path=$2
+fi
+
+if [ ! -f "$config_path" ]; then
+ echo "$0: configuration file $config_path not found"
+ exit 0
+fi
+
+cmd="/usr/bin/radosgw-agent -c $config_path"
+
+name=`basename $config_path`
+pid_file="/var/run/ceph/radosgw-agent/$name.pid"
+
+is_running() {
+ [ -e "$pid_file" ] || return 1
+ pid=`cat "$pid_file"`
+ [ -e "/proc/$pid" ] && grep -q "/usr/bin/radosgw-agent.-c.$config_path" "/proc/$pid/cmdline" && return 0
+ return 1
+}
+
+case "$1" in
+ start)
+ if is_running; then
+ echo "Already started"
+ exit 0
+ fi
+ echo "Starting radosgw-agent $name"
+ cd "$dir"
+ $cmd > /dev/null 2>&1 &
+ echo $! > "$pid_file"
+ if ! is_running; then
+ echo "Unable to start, see /var/log/ceph/radosgw-agent/"
+ exit 1
+ fi
+ ;;
+
+ stop)
+ if is_running; then
+ echo -n "Stopping radosgw-agent $name.."
+ pid=`cat "$pid_file"`
+ kill $pid
+        for i in 1 2 3 4 5 6 7 8 9 10
+ do
+ if ! is_running; then
+ break
+ fi
+
+ echo -n "."
+ sleep 1
+ done
+
+ if is_running; then
+ echo "Not stopped; may still be shutting down or shutdown may have failed"
+ exit 1
+ else
+ echo "Stopped"
+ rm "$pid_file"
+ fi
+ else
+ echo "Not running"
+ fi
+ ;;
+ restart)
+ $0 stop $config_path
+ if is_running; then
+ echo "Unable to stop, will not attempt to start"
+ exit 1
+ fi
+ $0 start $config_path
+ ;;
+ status)
+ if is_running; then
+ echo "Running"
+ else
+ echo "Stopped"
+ exit 1
+ fi
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|restart|status} [config-file]"
+ exit 1
+ ;;
+esac
+exit 0
--- /dev/null
+/var/log/ceph/radosgw-agent/*.log {
+ rotate 7
+ daily
+ compress
+ missingok
+ notifempty
+}
--- /dev/null
+Summary: Synchronize users and data between radosgw clusters
+Name: radosgw-agent
+Version: 1.2.6
+Release: 0%{?dist}
+Source0: https://pypi.python.org/packages/source/r/%{name}/%{name}-%{version}.tar.gz
+License: MIT
+Group: Development/Libraries
+BuildArch: noarch
+Requires: python-argparse
+Requires: PyYAML
+Requires: python-boto >= 2.10.0
+Requires: python-boto < 3.0.0
+BuildRequires: python-devel
+BuildRequires: python-setuptools
+URL: https://github.com/ceph/radosgw-agent
+
+%description
+The Ceph RADOS Gateway agent replicates the data of a master zone to a
+secondary zone.
+
+%prep
+%setup -q
+
+%build
+python setup.py build
+
+%install
+python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT
+install -m 0755 -D scripts/radosgw-agent $RPM_BUILD_ROOT%{_bindir}/radosgw-agent
+install -m 0644 -D logrotate.conf $RPM_BUILD_ROOT%{_sysconfdir}/logrotate.d/radosgw-agent
+install -m 0755 -D init-radosgw-agent $RPM_BUILD_ROOT%{_initrddir}/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/ceph/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/log/ceph/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/run/ceph/radosgw-agent
+
+%files
+%doc LICENSE
+%dir %{_sysconfdir}/ceph/radosgw-agent
+%dir %{_localstatedir}/log/ceph/radosgw-agent
+%dir %{_localstatedir}/run/ceph/radosgw-agent
+%config(noreplace) %{_sysconfdir}/logrotate.d/radosgw-agent
+%{_bindir}/radosgw-agent
+%{_initrddir}/radosgw-agent
+%{python_sitelib}/radosgw_agent*/
--- /dev/null
+from radosgw_agent.util import configuration as _configuration
+
+config = _configuration.Configuration()
+
+__version__ = '1.2.6'
--- /dev/null
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import argparse
+import contextlib
+import logging
+import logging.handlers
+import os.path
+import yaml
+import sys
+import time
+import base64
+import hmac
+import sha
+import urllib2
+from urllib2 import URLError, HTTPError
+
+import radosgw_agent
+from radosgw_agent import client
+from radosgw_agent import util
+from radosgw_agent.util import string
+from radosgw_agent.util.decorators import catches
+from radosgw_agent.exceptions import AgentError, RegionMapError, InvalidProtocol
+from radosgw_agent import sync, config
+
+log = logging.getLogger('radosgw_agent')
+
+
+def check_positive_int(string):
+ value = int(string)
+ if value < 1:
+ msg = '%r is not a positive integer' % string
+ raise argparse.ArgumentTypeError(msg)
+ return value
+
+
+def check_endpoint(endpoint):
+ try:
+ return client.parse_endpoint(endpoint)
+ except InvalidProtocol as e:
+ raise argparse.ArgumentTypeError(str(e))
+ except client.InvalidHost as e:
+ raise argparse.ArgumentTypeError(str(e))
+
+
+def parse_args():
+ conf_parser = argparse.ArgumentParser(add_help=False)
+ conf_parser.add_argument(
+ '-c', '--conf',
+ type=file,
+ help='configuration file'
+ )
+ args, remaining = conf_parser.parse_known_args()
+ log_dir = '/var/log/ceph/radosgw-agent/'
+ log_file = 'radosgw-agent.log'
+ if args.conf is not None:
+ log_file = os.path.basename(args.conf.name)
+ defaults = dict(
+ sync_scope='incremental',
+ log_lock_time=20,
+ log_file=os.path.join(log_dir, log_file),
+ )
+ if args.conf is not None:
+ with contextlib.closing(args.conf):
+ config = yaml.safe_load_all(args.conf)
+ for new in config:
+ defaults.update(new)
+
+ parser = argparse.ArgumentParser(
+ parents=[conf_parser],
+ description='Synchronize radosgw installations',
+ )
+ verbosity = parser.add_mutually_exclusive_group(required=False)
+ verbosity.add_argument(
+ '-v', '--verbose',
+ action='store_true', dest='verbose',
+ help='be more verbose',
+ )
+ verbosity.add_argument(
+ '-q', '--quiet',
+ action='store_true', dest='quiet',
+ help='be less verbose',
+ )
+ parser.add_argument(
+ '--src-access-key',
+ required='src_access_key' not in defaults,
+ help='access key for source zone system user',
+ )
+ parser.add_argument(
+ '--src-secret-key',
+ required='src_secret_key' not in defaults,
+ help='secret key for source zone system user',
+ )
+ parser.add_argument(
+ '--dest-access-key',
+ required='dest_access_key' not in defaults,
+ help='access key for destination zone system user',
+ )
+ parser.add_argument(
+ '--dest-secret-key',
+ required='dest_secret_key' not in defaults,
+ help='secret key for destination zone system user',
+ )
+ parser.add_argument(
+ 'destination',
+ type=check_endpoint,
+ nargs=None if 'destination' not in defaults else '?',
+ help='radosgw endpoint to which to sync '
+ '(e.g. http://zone2.example.org:8080)',
+ )
+ src_options = parser.add_mutually_exclusive_group(required=False)
+ src_options.add_argument(
+ '--source',
+ type=check_endpoint,
+ help='radosgw endpoint from which to sync '
+ '(e.g. http://zone1.example.org:8080)',
+ )
+ src_options.add_argument(
+ '--src-zone',
+ help='radosgw zone from which to sync',
+ )
+ parser.add_argument(
+ '--metadata-only',
+ action='store_true',
+ help='sync bucket and user metadata, but not bucket contents',
+ )
+ parser.add_argument(
+ '--versioned',
+ action='store_true',
+ help='indicates that radosgw endpoints have object versioning enabled',
+ )
+ parser.add_argument(
+ '--num-workers',
+ default=1,
+ type=check_positive_int,
+ help='number of items to sync at once',
+ )
+ parser.add_argument(
+ '--sync-scope',
+ choices=['full', 'incremental'],
+ default='incremental',
+ help='synchronize everything (for a new region) or only things that '
+ 'have changed since the last run',
+ )
+ parser.add_argument(
+ '--lock-timeout',
+ type=check_positive_int,
+ default=60,
+ help='timeout in seconds after which a log segment lock will expire if '
+ 'not refreshed',
+ )
+ parser.add_argument(
+ '--log-file',
+ help='where to store log output',
+ )
+ parser.add_argument(
+ '--max-entries',
+ type=check_positive_int,
+ default=1000,
+ help='maximum number of log entries to process at once during '
+ 'continuous sync',
+ )
+ parser.add_argument(
+ '--incremental-sync-delay',
+ type=check_positive_int,
+ default=30,
+ help='seconds to wait between syncs',
+ )
+ parser.add_argument(
+ '--object-sync-timeout',
+ type=check_positive_int,
+ default=60 * 60 * 60,
+ help='seconds to wait for an individual object to sync before '
+ 'assuming failure',
+ )
+ parser.add_argument(
+ '--prepare-error-delay',
+ type=check_positive_int,
+ default=10,
+ help='seconds to wait before retrying when preparing '
+ 'an incremental sync fails',
+ )
+ parser.add_argument(
+ '--rgw-data-log-window',
+ type=check_positive_int,
+ default=30,
+ help='period until a data log entry is valid - '
+ 'must match radosgw configuration',
+ )
+ parser.add_argument(
+ '--test-server-host',
+ # host to run a simple http server for testing the sync agent on,
+ help=argparse.SUPPRESS,
+ )
+ parser.add_argument(
+ '--test-server-port',
+ # port to run a simple http server for testing the sync agent on,
+ type=check_positive_int,
+ default=8080,
+ help=argparse.SUPPRESS,
+ )
+ parser.set_defaults(**defaults)
+ return parser.parse_args(remaining)
+
+
+def sign_string(
+ secret_key,
+ verb="GET",
+ content_md5="",
+ content_type="",
+ date=None,
+ canonical_amz_headers="",
+ canonical_resource="/?versions"
+ ):
+
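+    # build an AWS v2-style string-to-sign, HMAC-SHA1 it with the secret key
+    # and base64-encode the result; check_versioning() below uses this to
+    # sign its probe of the '/?versions' resource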
+ date = date or time.asctime(time.gmtime())
+ to_sign = string.concatenate(verb, content_md5, content_type, date)
+    to_sign += string.concatenate(
+ canonical_amz_headers,
+ canonical_resource,
+ newline=False
+ )
+ return base64.b64encode(hmac.new(secret_key, to_sign, sha).digest())
+
+
+def check_versioning(endpoint):
+ date = time.asctime(time.gmtime())
+ signed_string = sign_string(endpoint.secret_key, date=date)
+
+ url = str(endpoint) + '/?versions'
+ headers = {
+ 'Authorization': 'AWS ' + endpoint.access_key + ':' + signed_string,
+ 'Date': date
+ }
+
+ data = None
+ req = urllib2.Request(url, data, headers)
+ try:
+ response = urllib2.urlopen(req)
+ response.read()
+ log.debug('%s endpoint supports versioning' % endpoint)
+ return True
+ except HTTPError as error:
+ if error.code == 403:
+ log.info('%s endpoint does not support versioning' % endpoint)
+        log.warning('encountered issues reaching endpoint %s' % endpoint)
+ log.warning(error)
+ except URLError as error:
+ log.error("was unable to connect to %s" % url)
+ log.error(error)
+ return False
+
+
+class TestHandler(BaseHTTPRequestHandler):
+ """HTTP handler for testing radosgw-agent.
+
+ This should never be used outside of testing.
+ """
+ num_workers = None
+ lock_timeout = None
+ max_entries = None
+ rgw_data_log_window = 30
+ src = None
+ dest = None
+
+ def do_POST(self):
+ log = logging.getLogger(__name__)
+ status = 200
+ resp = ''
+ sync_cls = None
+ if self.path.startswith('/metadata/full'):
+ sync_cls = sync.MetaSyncerFull
+ elif self.path.startswith('/metadata/incremental'):
+ sync_cls = sync.MetaSyncerInc
+ elif self.path.startswith('/data/full'):
+ sync_cls = sync.DataSyncerFull
+ elif self.path.startswith('/data/incremental'):
+ sync_cls = sync.DataSyncerInc
+ else:
+ log.warn('invalid request, ignoring')
+ status = 400
+ resp = 'bad path'
+
+ try:
+ if sync_cls is not None:
+ syncer = sync_cls(TestHandler.src, TestHandler.dest,
+ TestHandler.max_entries,
+ rgw_data_log_window=TestHandler.rgw_data_log_window,
+ object_sync_timeout=TestHandler.object_sync_timeout)
+ syncer.prepare()
+ syncer.sync(
+ TestHandler.num_workers,
+ TestHandler.lock_timeout,
+ )
+ except Exception as e:
+ log.exception('error during sync')
+ status = 500
+ resp = str(e)
+
+ self.log_request(status, len(resp))
+ if status >= 400:
+ self.send_error(status, resp)
+ else:
+ self.send_response(status)
+ self.end_headers()
+
+
+def set_args_to_config(args):
+ """
+    Ensure that the arguments passed in to the CLI are slapped onto the config
+    object so that it can be referenced throughout the agent
+ """
+ if 'args' not in config:
+ config['args'] = args.__dict__
+
+
+def log_header():
+ version = radosgw_agent.__version__
+ lines = [
+ ' __ __ __ ___ ___ ',
+ '/__` \ / |\ | / ` /\ / _` |__ |\ | | ',
+ '.__/ | | \| \__, /~~\ \__> |___ | \| | ',
+ ' v%s' % version,
+ ]
+ for line in lines:
+ log.info(line)
+ log.info('agent options:')
+
+ secrets = [
+ 'src_secret_key', 'dest_secret_key',
+ 'src_access_key', 'dest_access_key',
+ ]
+
+ def log_dict(k, _dict, level=1):
+ padding = ' ' * level
+ log.info('%s%s:' % (padding, k))
+ for key, value in sorted(_dict.items()):
+ if hasattr(value, '_dict'):
+ level += 1
+ return log_dict(key, value, level)
+ if key in secrets:
+ value = '*' * 16
+ log.info('%s%-30s: %s' % (padding+' ', key, value))
+
+ for k, v in config.items():
+ if hasattr(v, '_dict'):
+ log_dict(k, v)
+ else:
+ log.info(' %-30s: %s' % (k, v))
+
+
+@catches((KeyboardInterrupt, RuntimeError, AgentError,), handle_all=True)
+def main():
+ # root (a.k.a. 'parent') and agent loggers
+ root_logger = logging.getLogger()
+
+ # allow all levels at root_logger, handlers control individual levels
+ root_logger.setLevel(logging.DEBUG)
+
+ # Console handler, meant only for user-facing information
+ console_loglevel = logging.INFO
+
+ sh = logging.StreamHandler()
+ sh.setFormatter(util.log.color_format())
+    # the console level is set here, before options are read from the
+    # arguments, so that errors that pop up earlier are still reported
+ sh.setLevel(console_loglevel)
+
+ agent_logger = logging.getLogger('radosgw_agent')
+ agent_logger.addHandler(sh)
+
+ # After initial logging is configured, now parse args
+ args = parse_args()
+
+ # File handler
+ log_file = args.log_file or 'radosgw-agent.log'
+ try:
+ fh = logging.handlers.WatchedFileHandler(log_file)
+ except IOError as err:
+ agent_logger.warning('unable to use log location: %s' % log_file)
+ agent_logger.warning(err)
+ agent_logger.warning('will fallback to ./radosgw-agent.log')
+ # if the location is not present, fallback to cwd
+ fh = logging.handlers.WatchedFileHandler('radosgw-agent.log')
+
+ fh.setLevel(logging.DEBUG)
+ fh.setFormatter(logging.Formatter(util.log.BASE_FORMAT))
+
+ root_logger.addHandler(fh)
+
+ if args.verbose:
+ console_loglevel = logging.DEBUG
+ elif args.quiet:
+ console_loglevel = logging.WARN
+
+    # now that we have parsed the actual log level, reset it in the handler
+ sh.setLevel(console_loglevel)
+
+    # after logging is set up, ensure that the arguments are present in the
+    # config object
+ set_args_to_config(args)
+
+ log_header()
+ dest = args.destination
+ dest.access_key = args.dest_access_key
+ dest.secret_key = args.dest_secret_key
+ src = args.source or client.Endpoint(None, None, None)
+ if args.src_zone:
+ src.zone = args.src_zone
+ dest_conn = client.connection(dest)
+
+ try:
+ region_map = client.get_region_map(dest_conn)
+ except AgentError:
+ # anything that we know about and are correctly raising should
+ # just get raised so that the decorator can handle it
+ raise
+ except Exception as error:
+ # otherwise, we have the below exception that will nicely deal with
+ # explaining what happened
+ raise RegionMapError(error)
+
+ client.configure_endpoints(region_map, dest, src, args.metadata_only)
+
+ src.access_key = args.src_access_key
+ src.secret_key = args.src_secret_key
+
+ if config['args']['versioned']:
+ log.debug('versioned flag enabled, overriding versioning check')
+ config['use_versioning'] = True
+ else:
+ config['use_versioning'] = check_versioning(src)
+
+ if args.test_server_host:
+ log.warn('TEST MODE - do not run unless you are testing this program')
+ TestHandler.src = src
+ TestHandler.dest = dest
+ TestHandler.num_workers = args.num_workers
+ TestHandler.lock_timeout = args.lock_timeout
+ TestHandler.max_entries = args.max_entries
+ TestHandler.rgw_data_log_window = args.rgw_data_log_window
+ TestHandler.object_sync_timeout = args.object_sync_timeout
+ server = HTTPServer((args.test_server_host, args.test_server_port),
+ TestHandler)
+ server.serve_forever()
+ sys.exit()
+
+ if args.sync_scope == 'full':
+ meta_cls = sync.MetaSyncerFull
+ data_cls = sync.DataSyncerFull
+ else:
+ meta_cls = sync.MetaSyncerInc
+ data_cls = sync.DataSyncerInc
+
+ meta_syncer = meta_cls(src, dest, args.max_entries)
+ data_syncer = data_cls(src, dest, args.max_entries,
+ rgw_data_log_window=args.rgw_data_log_window,
+ object_sync_timeout=args.object_sync_timeout)
+
+ # fetch logs first since data logs need to wait before becoming usable
+ # due to rgw's window of data log updates during which the bucket index
+ # log may still be updated without the data log getting a new entry for
+ # the bucket
+ sync.prepare_sync(meta_syncer, args.prepare_error_delay)
+ if not args.metadata_only:
+ sync.prepare_sync(data_syncer, args.prepare_error_delay)
+
+ if args.sync_scope == 'full':
+ log.info('syncing all metadata')
+ meta_syncer.sync(args.num_workers, args.lock_timeout)
+ if not args.metadata_only:
+ log.info('syncing all data')
+ data_syncer.sync(args.num_workers, args.lock_timeout)
+ log.info('Finished full sync. Check logs to see any issues that '
+ 'incremental sync will retry.')
+ else:
+ sync.incremental_sync(meta_syncer, data_syncer,
+ args.num_workers,
+ args.lock_timeout,
+ args.incremental_sync_delay,
+ args.metadata_only,
+ args.prepare_error_delay)
--- /dev/null
+import boto
+import functools
+import json
+import logging
+import os
+import random
+import socket
+import sys
+import urllib
+from urlparse import urlparse
+
+from boto.exception import BotoServerError
+from boto.s3.connection import S3Connection
+
+from radosgw_agent import request as aws_request
+from radosgw_agent import config
+from radosgw_agent import exceptions as exc
+from radosgw_agent.util import get_dev_logger, network
+from radosgw_agent.constants import DEFAULT_TIME
+from radosgw_agent.exceptions import NetworkError
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class Endpoint(object):
+ def __init__(self, host, port, secure,
+ access_key=None, secret_key=None, region=None, zone=None):
+ self.host = host
+ default_port = 443 if secure else 80
+ self.port = port or default_port
+ self.secure = secure
+ self.access_key = access_key
+ self.secret_key = secret_key
+ self.region = region
+ self.zone = zone
+
+ def __eq__(self, other):
+ if self.host != other.host:
+ return False
+ if self.port == other.port:
+ return True
+ # if self and other are mixed http/https with default ports,
+ # i.e. http://example.com and https://example.com, consider
+ # them the same
+
+ def diff_only_default_ports(a, b):
+ return a.secure and a.port == 443 and not b.secure and b.port == 80
+ return (diff_only_default_ports(self, other) or
+ diff_only_default_ports(other, self))
+
+ def __repr__(self):
+ return 'Endpoint(host={host}, port={port}, secure={secure})'.format(
+ host=self.host,
+ port=self.port,
+ secure=self.secure)
+
+ def __str__(self):
+ scheme = 'https' if self.secure else 'http'
+ return '{scheme}://{host}:{port}'.format(scheme=scheme,
+ host=self.host,
+ port=self.port)
+
+
+def normalize_netloc(url_obj, port):
+ """
+ Only needed for IPV6 addresses because ``urlparse`` is so very
+ inconsistent with parsing::
+
+ In [5]: print urlparse('http://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]:8080').hostname
+ e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922
+
+ In [6]: print urlparse('http://e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922').hostname
+ e40
+
+    In Python 2.6 this situation is even worse: urlparse is completely
+    unreliable for things like looking up a port on an IPV6 url.
+ """
+ netloc = url_obj.netloc
+ if port is not None:
+ # we need to split because we don't want it as part of the URL
+ netloc = url_obj.netloc.split(':%s' % port)[0]
+    if not netloc.startswith('[') and not netloc.endswith(']'):
+        netloc = '[%s]' % netloc
+ return netloc
+
+
+def detect_ipv6_port(url_obj):
+ netloc = url_obj.netloc
+ try:
+ port = url_obj.port
+ except ValueError:
+ port = None
+
+ # insist on checking the port because urlparse may be lying to us
+ netloc_parts = netloc.split(']:')
+ if len(netloc_parts) == 2:
+ _, port = netloc_parts
+ if port:
+ return int(port)
+ return port
+
+
+def parse_endpoint(endpoint):
+ url = urlparse(endpoint)
+ # IPV6 addresses will not work correctly with urlparse and Endpoint if we
+ # just use netloc as default. IPV4 works with .hostname while IPV6 works
+ # with .netloc
+ # for example an IPV6 address like e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922
+ # would evaluate to 'e40' if we used .hostname
+
+ # this looks repetitive but we don't know yet if we are IPV6
+ try:
+ port = url.port
+ except ValueError:
+ port = None
+
+ if network.is_ipv6(url.netloc):
+ port = detect_ipv6_port(url)
+ if (sys.version_info[0], sys.version_info[1]) <= (2, 6):
+ log.error(
+ 'Python 2.6 does not support IPV6 addresses, cannot continue'
+ )
+ log.error('host can be used instead of raw IPV6 addresses')
+ if os.environ.get('PYTEST') is None: # don't bail on tests
+ raise RuntimeError(
+ 'IPV6 address was used for endpoint: %s' % endpoint
+ )
+ host = normalize_netloc(url, port)
+ else:
+ host = url.hostname
+ if url.scheme not in ['http', 'https']:
+ raise exc.InvalidProtocol('invalid protocol %r' % url.scheme)
+ if not url.hostname:
+ raise exc.InvalidHost('no hostname in %r' % endpoint)
+ return Endpoint(host, port, url.scheme == 'https')
+
+code_to_exc = {
+ 404: exc.NotFound,
+ }
+
+
+def boto_call(func):
+ @functools.wraps(func)
+ def translate_exception(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except boto.exception.S3ResponseError as e:
+ raise code_to_exc.get(e.status, exc.HttpError)(e.status, e.body)
+ return translate_exception
+
+
+def check_result_status(result):
+ if result.status / 100 != 2:
+ raise code_to_exc.get(result.status,
+ exc.HttpError)(result.status, result.reason)
+
+
+def url_safe(component):
+ if isinstance(component, basestring):
+ string = component.encode('utf8')
+ else:
+ string = str(component)
+ return urllib.quote(string)
+
+
+def request(connection, type_, resource, params=None, headers=None,
+ data=None, expect_json=True, special_first_param=None, _retries=3):
+ if headers is None:
+ headers = {}
+
+ if type_ in ['put', 'post']:
+ headers['Content-Type'] = 'application/json; charset=UTF-8'
+
+ request_data = data if data else ''
+ if params is None:
+ params = {}
+ safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()])
+ connection.count_request()
+ request = aws_request.base_http_request(connection.s3_connection,
+ type_.upper(),
+ resource=resource,
+ special_first_param=special_first_param,
+ headers=headers,
+ data=request_data,
+ params=safe_params)
+
+ url = '{protocol}://{host}{path}'.format(protocol=request.protocol,
+ host=request.host,
+ path=request.path)
+
+ request.authorize(connection=connection)
+
+ boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r',
+ url, params, request.headers, data)
+
+ try:
+ result = aws_request.make_request(
+ connection.s3_connection,
+ type_.upper(),
+ resource=resource,
+ special_first_param=special_first_param,
+ headers=headers,
+ data=request_data,
+ params=safe_params,
+ _retries=_retries)
+ except socket.error as error:
+ msg = 'unable to connect to %s %s' % (request.host, error)
+ raise NetworkError(msg)
+
+ except BotoServerError as error:
+ check_result_status(error)
+
+ check_result_status(result)
+
+ if data or not expect_json:
+ return result
+
+ return json.loads(result.read())
+
+
+def get_metadata(connection, section, name):
+ return request(connection, 'get', 'admin/metadata/' + section,
+ params=dict(key=name))
+
+
+def update_metadata(connection, section, name, metadata):
+ if not isinstance(metadata, basestring):
+ metadata = json.dumps(metadata)
+ return request(connection, 'put', 'admin/metadata/' + section,
+ params=dict(key=name), data=metadata)
+
+
+def delete_metadata(connection, section, name):
+ return request(connection, 'delete', 'admin/metadata/' + section,
+ params=dict(key=name), expect_json=False)
+
+
+def get_metadata_sections(connection):
+ return request(connection, 'get', 'admin/metadata')
+
+
+def list_metadata_keys(connection, section):
+ return request(connection, 'get', 'admin/metadata/' + section)
+
+
+def get_op_state(connection, client_id, op_id, bucket, obj):
+ return request(connection, 'get', 'admin/opstate',
+ params={
+ 'op-id': op_id,
+ 'object': u'{0}/{1}'.format(bucket, obj.name),
+ 'client-id': client_id,
+ }
+ )
+
+
+def remove_op_state(connection, client_id, op_id, bucket, obj):
+ return request(connection, 'delete', 'admin/opstate',
+ params={
+ 'op-id': op_id,
+ 'object': u'{0}/{1}'.format(bucket, obj.name),
+ 'client-id': client_id,
+ },
+ expect_json=False,
+ )
+
+
+def get_bucket_list(connection):
+ return list_metadata_keys(connection, 'bucket')
+
+
+@boto_call
+def list_objects_in_bucket(connection, bucket_name):
+ versioned = config['use_versioning']
+
+ # use the boto library to do this
+ bucket = connection.get_bucket(bucket_name)
+ list_call = bucket.list_versions if versioned else bucket.list
+ try:
+ for key in list_call():
+ yield key
+ except boto.exception.S3ResponseError as e:
+ # since this is a generator, the exception will be raised when
+ # it's read, rather than when this call returns, so raise a
+ # unique exception to distinguish this from client errors from
+ # other calls
+ if e.status == 404:
+ raise exc.BucketEmpty()
+ else:
+ raise
+
+
+@boto_call
+def mark_delete_object(connection, bucket_name, obj, params=None):
+ """
+    Marking an object for deletion is only necessary for versioned objects;
+    we should not attempt these calls for non-versioned ones.
+
+    Usually, only full-sync operations will use this call; incremental sync
+    should perform actual delete operations with ``delete_versioned_object``.
+ """
+ params = params or {}
+
+ params['rgwx-version-id'] = obj.version_id
+ params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+ path = u'{bucket}/{object}'.format(
+ bucket=bucket_name,
+ object=obj.name,
+ )
+
+ return request(connection, 'delete', path,
+ params=params,
+ expect_json=False)
+
+
+@boto_call
+def delete_versioned_object(connection, bucket_name, obj):
+ """
+    Perform a delete on a versioned object; the requirement for this type of
+    request is to pass the ``versionID`` as a query argument.
+ """
+ # if obj.delete_marker is False we should not delete this and we shouldn't
+ # have been called, so return without doing anything
+ if getattr(obj, 'delete_marker', False) is False:
+ log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
+ return
+
+ params = {}
+
+ params['rgwx-version-id'] = obj.version_id
+ params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+ params['versionID'] = obj.version_id
+
+ path = u'{bucket}/{object}'.format(
+ bucket=bucket_name,
+ object=obj.name,
+ )
+
+ return request(connection, 'delete', path,
+ params=params,
+ expect_json=False)
+
+
+@boto_call
+def delete_object(connection, bucket_name, obj):
+ if is_versioned(obj):
+ log.debug('performing a delete for versioned obj: %s' % obj.name)
+ delete_versioned_object(connection, bucket_name, obj)
+ else:
+ bucket = connection.get_bucket(bucket_name)
+ bucket.delete_key(obj.name)
+
+
+def is_versioned(obj):
+ """
+ Check if a given object is versioned by inspecting some of its attributes.
+ """
+ # before any heuristic, newer versions of RGW will tell if an obj is
+ # versioned so try that first
+ if hasattr(obj, 'versioned'):
+ return obj.versioned
+
+ if not hasattr(obj, 'VersionedEpoch'):
+ # overly paranoid here, an object that is not versioned should *never*
+ # have a `VersionedEpoch` attribute
+ if getattr(obj, 'version_id', None):
+ if obj.version_id is None:
+ return False
+ return True # probably will never get here
+ return False
+ return True
+
+
+def sync_object_intra_region(connection, bucket_name, obj, src_zone,
+ client_id, op_id):
+
+ params = {
+ 'rgwx-source-zone': src_zone,
+ 'rgwx-client-id': client_id,
+ 'rgwx-op-id': op_id,
+ }
+
+ if is_versioned(obj):
+ log.debug('detected obj as versioned: %s' % obj.name)
+ log.debug('obj attributes are:')
+ for k in dir(obj):
+ if not k.startswith('_'):
+ v = getattr(obj, k, None)
+ log.debug('%s.%s = %s' % (obj.name, k, v))
+
+ # set the extra params to support versioned operations
+ params['rgwx-version-id'] = obj.version_id
+ params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+ # delete_marker may not exist in the obj
+ if getattr(obj, 'delete_marker', None) is True:
+ log.debug('obj %s has a delete_marker, marking for deletion' % obj.name)
+ # when the object has a delete marker we need to create it with
+ # a delete marker on the destination rather than copying
+ return mark_delete_object(connection, bucket_name, obj, params=params)
+
+ path = u'{bucket}/{object}'.format(
+ bucket=bucket_name,
+ object=obj.name,
+ )
+
+ return request(connection, 'put', path,
+ params=params,
+ headers={
+ 'x-amz-copy-source': url_safe('%s/%s' % (bucket_name, obj.name)),
+ },
+ expect_json=False)
+
+
+def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
+ return request(connection, 'post', 'admin/log',
+ params={
+ 'type': lock_type,
+ 'id': shard_num,
+ 'length': timeout,
+ 'zone-id': zone_id,
+ 'locker-id': locker_id,
+ },
+ special_first_param='lock',
+ expect_json=False)
+
+
+def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id):
+ return request(connection, 'post', 'admin/log',
+ params={
+ 'type': lock_type,
+ 'id': shard_num,
+ 'locker-id': locker_id,
+ 'zone-id': zone_id,
+ },
+ special_first_param='unlock',
+ expect_json=False)
+
+
+def _id_name(type_):
+ return 'bucket-instance' if type_ == 'bucket-index' else 'id'
+
+
+def get_log(connection, log_type, marker, max_entries, id_):
+ key = _id_name(log_type)
+ return request(connection, 'get', 'admin/log',
+ params={
+ 'type': log_type,
+ key: id_,
+ 'marker': marker,
+ 'max-entries': max_entries,
+ },
+ )
+
+
+def get_log_info(connection, log_type, id_):
+ key = _id_name(log_type)
+ return request(
+ connection, 'get', 'admin/log',
+ params={
+ 'type': log_type,
+ key: id_,
+ },
+ special_first_param='info',
+ )
+
+
+def num_log_shards(connection, shard_type):
+ out = request(connection, 'get', 'admin/log', dict(type=shard_type))
+ return out['num_objects']
+
+
+def set_worker_bound(connection, type_, marker, timestamp,
+ daemon_id, id_, data=None, sync_type='incremental'):
+ """
+
+ :param sync_type: The type of synchronization that should be attempted by
+ the agent, defaulting to "incremental" but can also be "full".
+ """
+ if data is None:
+ data = []
+ key = _id_name(type_)
+ boto.log.debug('set_worker_bound: data = %r', data)
+ return request(
+ connection, 'post', 'admin/replica_log',
+ params={
+ 'type': type_,
+ key: id_,
+ 'marker': marker,
+ 'time': timestamp,
+ 'daemon_id': daemon_id,
+ 'sync-type': sync_type,
+ },
+ data=json.dumps(data),
+ special_first_param='work_bound',
+ )
+
+
+def del_worker_bound(connection, type_, daemon_id, id_):
+ key = _id_name(type_)
+ return request(
+ connection, 'delete', 'admin/replica_log',
+ params={
+ 'type': type_,
+ key: id_,
+ 'daemon_id': daemon_id,
+ },
+ special_first_param='work_bound',
+ expect_json=False,
+ )
+
+
+def get_worker_bound(connection, type_, id_):
+ key = _id_name(type_)
+ try:
+ out = request(
+ connection, 'get', 'admin/replica_log',
+ params={
+ 'type': type_,
+ key: id_,
+ },
+ special_first_param='bounds',
+ )
+ dev_log.debug('get_worker_bound returned: %r', out)
+ except exc.NotFound:
+ dev_log.debug('no worker bound found for bucket instance "%s"',
+ id_)
+ # if no worker bounds have been set, start from the beginning
+ # returning fallback, default values
+ return dict(
+ marker=' ',
+ oldest_time=DEFAULT_TIME,
+ retries=[]
+ )
+
+ retries = set()
+ for item in out['markers']:
+ names = [retry['name'] for retry in item['items_in_progress']]
+ retries = retries.union(names)
+ out['retries'] = retries
+ return out
+
+
+class Zone(object):
+ def __init__(self, zone_info):
+ self.name = zone_info['name']
+ self.is_master = False
+ self.endpoints = [parse_endpoint(e) for e in zone_info['endpoints']]
+ self.log_meta = zone_info['log_meta'] == 'true'
+ self.log_data = zone_info['log_data'] == 'true'
+
+ def __repr__(self):
+ return str(self)
+
+ def __str__(self):
+ return self.name
+
+
+class Region(object):
+ def __init__(self, region_info):
+ self.name = region_info['key']
+ self.is_master = region_info['val']['is_master'] == 'true'
+ self.zones = {}
+ for zone_info in region_info['val']['zones']:
+ zone = Zone(zone_info)
+ self.zones[zone.name] = zone
+ if zone.name == region_info['val']['master_zone']:
+ zone.is_master = True
+ self.master_zone = zone
+ assert hasattr(self, 'master_zone'), \
+ 'No master zone found for region ' + self.name
+
+ def __repr__(self):
+ return str(self)
+
+ def __str__(self):
+ return str(self.zones.keys())
+
+
+class RegionMap(object):
+ def __init__(self, region_map):
+ self.regions = {}
+ for region_info in region_map['regions']:
+ region = Region(region_info)
+ self.regions[region.name] = region
+ if region.is_master:
+ self.master_region = region
+ assert hasattr(self, 'master_region'), \
+ 'No master region found in region map'
+
+ def __repr__(self):
+ return str(self)
+
+ def __str__(self):
+ return str(self.regions)
+
+ def find_endpoint(self, endpoint):
+ for region in self.regions.itervalues():
+ for zone in region.zones.itervalues():
+ if endpoint in zone.endpoints or endpoint.zone == zone.name:
+ return region, zone
+ raise exc.ZoneNotFound('%s not found in region map' % endpoint)
+
+
+def get_region_map(connection):
+ region_map = request(connection, 'get', 'admin/config')
+ return RegionMap(region_map)
+
+
+def _validate_sync_dest(dest_region, dest_zone):
+ if dest_region.is_master and dest_zone.is_master:
+ raise exc.InvalidZone('destination cannot be master zone of master region')
+
+
+def _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+ meta_only):
+ if not src_zone.is_master:
+ raise exc.InvalidZone('source zone %s must be a master zone' % src_zone.name)
+ if (src_region.name == dest_region.name and
+ src_zone.name == dest_zone.name):
+ raise exc.InvalidZone('source and destination must be different zones')
+ if not src_zone.log_meta:
+ raise exc.InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name)
+ if not meta_only and not src_zone.log_data:
+ raise exc.InvalidZone('source zone %s must have data logging enabled' % src_zone.name)
+ if not meta_only and src_region.name != dest_region.name:
+ raise exc.InvalidZone('data sync can only occur between zones in the same region')
+ if not src_zone.endpoints:
+ raise exc.InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name)
+
+
+def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only):
+    log.debug('region map is: %r', region_map)
+
+ dest_region, dest_zone = region_map.find_endpoint(dest_endpoint)
+ _validate_sync_dest(dest_region, dest_zone)
+
+ # source may be specified by http endpoint or zone name
+ if src_endpoint.host or src_endpoint.zone:
+ src_region, src_zone = region_map.find_endpoint(src_endpoint)
+ else:
+ # try the master zone in the same region, then the master zone
+ # in the master region
+ try:
+ _validate_sync_source(dest_region, dest_region.master_zone,
+ dest_region, dest_zone, meta_only)
+ src_region, src_zone = dest_region, dest_region.master_zone
+ except exc.InvalidZone as e:
+            log.debug('source region %s zone %s unacceptable: %s',
+ dest_region.name, dest_region.master_zone.name, e)
+ master_region = region_map.master_region
+ src_region, src_zone = master_region, master_region.master_zone
+
+ _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+ meta_only)
+
+ # choose a random source endpoint if one wasn't specified
+ if not src_endpoint.host:
+ endpoint = random.choice(src_zone.endpoints)
+ src_endpoint.host = endpoint.host
+ src_endpoint.port = endpoint.port
+ src_endpoint.secure = endpoint.secure
+
+ # fill in region and zone names
+ dest_endpoint.region = dest_region
+ dest_endpoint.zone = dest_zone
+ src_endpoint.region = src_region
+ src_endpoint.zone = src_zone
+
+
+class S3ConnectionWrapper(object):
+ def __init__(self, endpoint, debug):
+ self.endpoint = endpoint
+ self.debug = debug
+ self.s3_connection = None
+ self.reqs_before_reset = 512
+ self._recreate_s3_connection()
+
+ def count_request(self):
+ self.num_requests += 1
+ if self.num_requests > self.reqs_before_reset:
+ self._recreate_s3_connection()
+
+ def _recreate_s3_connection(self):
+ self.num_requests = 0
+ self.s3_connection = S3Connection(
+ aws_access_key_id=self.endpoint.access_key,
+ aws_secret_access_key=self.endpoint.secret_key,
+ is_secure=self.endpoint.secure,
+ host=self.endpoint.host,
+ port=self.endpoint.port,
+ calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+ debug=self.debug,
+ )
+
+ def __getattr__(self, attrib):
+ return getattr(self.s3_connection, attrib)
+
+
+def connection(endpoint, debug=None):
+ log.info('creating connection to endpoint: %s' % endpoint)
+ return S3ConnectionWrapper(endpoint, debug)
--- /dev/null
+
+RESULT_SUCCESS = 0
+RESULT_ERROR = 1
+
+DEFAULT_TIME = '1970-01-01 00:00:00'
+
--- /dev/null
+
+class AgentError(Exception):
+ """
+ The actual base exception for the agent
+ """
+
+
+class ClientException(AgentError):
+ """
+ Historical base radosgw_agent client exception.
+ """
+ pass
+
+
+class NetworkError(AgentError):
+ pass
+
+
+class RegionMapError(AgentError):
+
+ def __init__(self, error):
+ self.error = error
+
+ def __str__(self):
+ msg = 'Could not retrieve region map from destination: %s'
+ return msg % self.error
+
+
+class InvalidProtocol(ClientException):
+ pass
+
+
+class InvalidHost(ClientException):
+ pass
+
+
+class InvalidZone(ClientException):
+ pass
+
+
+class ZoneNotFound(ClientException):
+ pass
+
+
+class BucketEmpty(ClientException):
+ pass
+
+
+class HttpError(ClientException):
+ def __init__(self, code, body):
+ self.code = code
+ self.str_code = str(code)
+ self.body = body
+ self.message = 'Http error code %s content %s' % (code, body)
+
+ def __str__(self):
+ return self.message
+
+
+class NotFound(HttpError):
+ pass
+
+
+class SkipShard(Exception):
+ pass
+
+
+class SyncError(Exception):
+ pass
+
+
+class SyncTimedOut(SyncError):
+ pass
+
+
+class SyncFailed(SyncError):
+ pass
--- /dev/null
+import logging
+import threading
+import time
+
+from radosgw_agent import client
+from radosgw_agent import exceptions as exc
+from radosgw_agent.util import get_dev_logger
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class LockBroken(Exception):
+ pass
+
+
+class LockRenewFailed(LockBroken):
+ pass
+
+
+class LockExpired(LockBroken):
+ pass
+
+
+class Lock(threading.Thread):
+ """A lock on a shard log that automatically refreshes itself.
+
+ It may be used to lock different shards throughout its lifetime.
+    To lock a new shard, call set_shard() with the desired shard_num and
+    then acquire().
+
+ To release the lock, call release_and_clear(). This will raise an
+ exception if the lock ever failed to be acquired in the timeout
+ period.
+ """
+
+ def __init__(self, conn, type_, locker_id, timeout, zone_id):
+ super(Lock, self).__init__()
+ self.conn = conn
+ self.type = type_
+ self.timeout = timeout
+ self.lock = threading.Lock()
+ self.locker_id = locker_id
+ self.zone_id = zone_id
+ self.shard_num = None
+ self.last_locked = None
+ self.failed = False
+
+ def set_shard(self, shard_num):
+ dev_log.debug('set_shard to %d', shard_num)
+ with self.lock:
+ assert self.shard_num is None, \
+ 'attempted to acquire new lock without releasing old one'
+ self.failed = False
+ self.last_locked = None
+ self.shard_num = shard_num
+
+ def unset_shard(self):
+ dev_log.debug('unset shard')
+ with self.lock:
+ self.shard_num = None
+
+ def acquire(self):
+ """Renew an existing lock, or acquire a new one.
+
+ The old lock must have already been released if shard_num is specified.
+ client.NotFound may be raised if the log contains no entries.
+ """
+ dev_log.debug('acquire lock')
+ with self.lock:
+ self._acquire()
+
+ def _acquire(self):
+        # same as acquire() but assumes self.lock is held
+ now = time.time()
+ client.lock_shard(self.conn, self.type, self.shard_num,
+ self.zone_id, self.timeout, self.locker_id)
+ self.last_locked = now
+
+ def release_and_clear(self):
+ """Release the lock currently being held.
+
+ Prevent it from being automatically renewed, and check if there
+ were any errors renewing the current lock or if it expired.
+        If the lock was not sustained, raise LockRenewFailed or LockExpired.
+ """
+ dev_log.debug('release and clear lock')
+ with self.lock:
+ shard_num = self.shard_num
+ self.shard_num = None
+ diff = time.time() - self.last_locked
+ if diff > self.timeout:
+ msg = 'lock was not renewed in over %0.2f seconds' % diff
+ raise LockExpired(msg)
+ if self.failed:
+ raise LockRenewFailed()
+ try:
+ client.unlock_shard(self.conn, self.type, shard_num,
+ self.zone_id, self.locker_id)
+ except exc.HttpError as e:
+ log.warn('failed to unlock shard %d in zone %s: %s',
+ shard_num, self.zone_id, e)
+ self.last_locked = None
+
+ def run(self):
+ while True:
+ with self.lock:
+ if self.shard_num is not None:
+ try:
+ self._acquire()
+ except exc.HttpError as e:
+ log.error('locking shard %d in zone %s failed: %s',
+ self.shard_num, self.zone_id, e)
+ self.failed = True
+ time.sleep(0.5 * self.timeout)
--- /dev/null
+import boto
+import logging
+import sys
+from boto.connection import AWSAuthConnection
+
+log = logging.getLogger(__name__)
+
+
+def urlencode(query, doseq=0):
+ """
+ Note: ported from urllib.urlencode, but with the ability to craft the query
+ string without quoting the params again.
+
+ Encode a sequence of two-element tuples or dictionary into a URL query
+ string.
+
+ If any values in the query arg are sequences and doseq is true, each
+ sequence element is converted to a separate parameter.
+
+ If the query arg is a sequence of two-element tuples, the order of the
+ parameters in the output will match the order of parameters in the
+ input.
+ """
+
+ if hasattr(query, "items"):
+ # mapping objects
+ query = query.items()
+ else:
+ # it's a bother at times that strings and string-like objects are
+ # sequences...
+ try:
+ # non-sequence items should not work with len()
+ # non-empty strings will fail this
+ if len(query) and not isinstance(query[0], tuple):
+ raise TypeError
+ # zero-length sequences of all types will get here and succeed,
+ # but that's a minor nit - since the original implementation
+ # allowed empty dicts that type of behavior probably should be
+ # preserved for consistency
+ except TypeError:
+ ty, va, tb = sys.exc_info()
+ raise TypeError, "not a valid non-string sequence or mapping object", tb
+
+ l = []
+ if not doseq:
+ # preserve old behavior
+ for k, v in query:
+ k = str(k)
+ v = str(v)
+ l.append(k + '=' + v)
+ else:
+ for k, v in query:
+ k = str(k)
+ if isinstance(v, str):
+ l.append(k + '=' + v)
+ elif isinstance(v, unicode):
+ # is there a reasonable way to convert to ASCII?
+ # encode generates a string, but "replace" or "ignore"
+ # lose information and "strict" can raise UnicodeError
+ v = v.encode("ASCII", "replace")
+ l.append(k + '=' + v)
+ else:
+ try:
+ # is this a sufficient test for sequence-ness?
+ len(v)
+ except TypeError:
+ # not a sequence
+ v = str(v)
+ l.append(k + '=' + v)
+ else:
+ # loop over the sequence
+ for elt in v:
+ l.append(k + '=' + str(elt))
+ return '&'.join(l)
+
+
+class MetaData(object):
+ """
+    A basic container class that, aside from storing the method, simply
+    registers all the keyword arguments passed in so that the values are
+    easier to re-use.
+ """
+
+ def __init__(self, conn, method, **kw):
+ self.conn = conn
+ self.method = method
+ for k, v in kw.items():
+ setattr(self, k, v)
+
+
+def base_http_request(conn, method, basepath='', resource='', headers=None,
+ data=None, special_first_param=None, params=None):
+ """
+    Returns an ``AWSAuthConnection.build_base_http_request`` call, preserving
+    the special params handled by ``build``.
+ """
+
+ # request meta data
+ md = build(
+ conn,
+ method,
+ basepath=basepath,
+ resource=resource,
+ headers=headers,
+ data=data,
+ special_first_param=special_first_param,
+ params=params,
+ )
+
+ return AWSAuthConnection.build_base_http_request(
+ md.conn, md.method, md.path,
+ md.auth_path, md.params, md.headers,
+ md.data, md.host)
+
+
+def make_request(conn, method, basepath='', resource='', headers=None,
+ data=None, special_first_param=None, params=None, _retries=3):
+ """
+    Returns an ``AWSAuthConnection.make_request`` call, preserving the
+    special params handled by ``build``.
+ """
+ # request meta data
+ md = build(
+ conn,
+ method,
+ basepath=basepath,
+ resource=resource,
+ headers=headers,
+ data=data,
+ special_first_param=special_first_param,
+ params=params,
+ )
+
+ if params:
+ # we basically need to do this ourselves now. BOTO doesn't do it for us
+ # in make_request
+ result = []
+ for k, vs in params.items():
+ if isinstance(vs, basestring) or not hasattr(vs, '__iter__'):
+ vs = [vs]
+ for v in vs:
+ if v is not None:
+ result.append(
+ (k.encode('utf-8') if isinstance(k, str) else k,
+ v.encode('utf-8') if isinstance(v, str) else v))
+ appending_char = '&' if md.special_first_param else '?'
+ md.path = '%s%s%s' % (md.path, appending_char, urlencode(result, doseq=True))
+
+ return AWSAuthConnection.make_request(
+ md.conn, md.method, md.path,
+ headers=md.headers,
+ data=md.data,
+ host=md.host,
+ auth_path=md.auth_path,
+ params=md.params,
+ override_num_retries=_retries
+ )
+
+
+def build(conn, method, basepath='', resource='', headers=None,
+ data=None, special_first_param=None, params=None):
+ """
+ Adapted from the build_request() method of boto.connection
+ """
+
+ path = conn.calling_format.build_path_base(basepath, resource)
+ auth_path = conn.calling_format.build_auth_path(basepath, resource)
+ host = conn.calling_format.build_host(conn.server_name(), '')
+
+ if special_first_param:
+ path += '?' + special_first_param
+ boto.log.debug('path=%s' % path)
+ auth_path += '?' + special_first_param
+ boto.log.debug('auth_path=%s' % auth_path)
+
+ return MetaData(
+ conn,
+ method,
+ path=path,
+ auth_path=auth_path,
+ basepath=basepath,
+ resource=resource,
+ headers=headers,
+ data=data,
+ special_first_param=special_first_param,
+ params=params,
+ host=host,
+ )
--- /dev/null
+import logging
+import multiprocessing
+import time
+
+from radosgw_agent import worker
+from radosgw_agent import client
+from radosgw_agent.util import get_dev_logger
+from radosgw_agent.exceptions import NotFound, HttpError
+
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+# the replica log api only supports one entry, and updating it
+# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of
+# radosgw-agent, so just use a constant value for the daemon id.
+DAEMON_ID = 'radosgw-agent'
+
+def prepare_sync(syncer, error_delay):
+ """Attempt to prepare a syncer for running a sync.
+
+ :param error_delay: seconds to wait before retrying
+
+ This will retry forever so the sync agent continues if radosgws
+ are unavailable temporarily.
+ """
+ while True:
+ try:
+ syncer.prepare()
+ break
+ except Exception:
+ log.warn('error preparing for sync, will retry. Traceback:',
+ exc_info=True)
+ time.sleep(error_delay)
+
+def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
+ incremental_sync_delay, metadata_only, error_delay):
+ """Run a continuous incremental sync.
+
+    This will run forever, pausing for incremental_sync_delay seconds
+    between syncs.
+ """
+ while True:
+ try:
+ meta_syncer.sync(num_workers, lock_timeout)
+ if not metadata_only:
+ data_syncer.sync(num_workers, lock_timeout)
+ except Exception:
+ log.warn('error doing incremental sync, will try again. Traceback:',
+ exc_info=True)
+
+ # prepare data before sleeping due to rgw_log_bucket_window
+ if not metadata_only:
+ prepare_sync(data_syncer, error_delay)
+ log.info('waiting %d seconds until next sync',
+ incremental_sync_delay)
+ time.sleep(incremental_sync_delay)
+ prepare_sync(meta_syncer, error_delay)
+
+class Syncer(object):
+ def __init__(self, src, dest, max_entries, *args, **kwargs):
+ self.src = src
+ self.dest = dest
+ self.src_conn = client.connection(src)
+ self.dest_conn = client.connection(dest)
+ self.daemon_id = DAEMON_ID
+ self.worker_cls = None # filled in by subclass constructor
+ self.num_shards = None
+ self.max_entries = max_entries
+ self.object_sync_timeout = kwargs.get('object_sync_timeout')
+
+ def init_num_shards(self):
+ if self.num_shards is not None:
+ return
+ try:
+ self.num_shards = client.num_log_shards(self.src_conn, self.type)
+ log.debug('%d shards to check', self.num_shards)
+ except Exception:
+ log.error('finding number of shards failed')
+ raise
+
+ def shard_num_for_key(self, key):
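+        # simple rolling hash over the utf-8 bytes of the key, reduced modulo
+        # the number of log shards; presumably the same hash radosgw uses to
+        # assign entries to shards, so the agent checks the matching shard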
+ key = key.encode('utf8')
+ hash_val = 0
+ for char in key:
+ c = ord(char)
+ hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
+ return hash_val % self.num_shards
+
+ def prepare(self):
+ """Setup any state required before syncing starts.
+
+ This must be called before sync().
+ """
+ pass
+
+ def generate_work(self):
+        """Generate items to be placed in a queue for processing"""
+ pass
+
+ def wait_until_ready(self):
+ pass
+
+ def complete_item(self, shard_num, retries):
+ """Called when syncing a single item completes successfully"""
+ marker = self.shard_info.get(shard_num)
+ if not marker:
+ return
+ try:
+ data = [dict(name=retry, time=worker.DEFAULT_TIME)
+ for retry in retries]
+ client.set_worker_bound(self.dest_conn,
+ self.type,
+ marker,
+ worker.DEFAULT_TIME,
+ self.daemon_id,
+ shard_num,
+ data)
+ except Exception:
+ log.warn('could not set worker bounds, may repeat some work. '
+ 'Traceback:', exc_info=True)
+
+ def sync(self, num_workers, log_lock_time):
+ workQueue = multiprocessing.Queue()
+ resultQueue = multiprocessing.Queue()
+
+ processes = [self.worker_cls(workQueue,
+ resultQueue,
+ log_lock_time,
+ self.src,
+ self.dest,
+ daemon_id=self.daemon_id,
+ max_entries=self.max_entries,
+ object_sync_timeout=self.object_sync_timeout,
+ )
+ for i in xrange(num_workers)]
+ for process in processes:
+ process.daemon = True
+ process.start()
+
+ self.wait_until_ready()
+
+ log.info('Starting sync')
+ # enqueue the shards to be synced
+ num_items = 0
+ for item in self.generate_work():
+ num_items += 1
+ workQueue.put(item)
+
+ # add a poison pill for each worker
+ for i in xrange(num_workers):
+ workQueue.put(None)
+
+ # pull the results out as they are produced
+ errors = []
+ for i in xrange(num_items):
+ result, item = resultQueue.get()
+ shard_num, retries = item
+ if result == worker.RESULT_SUCCESS:
+ log.debug('synced item %r successfully', item)
+ self.complete_item(shard_num, retries)
+ else:
+ log.error('error syncing shard %d', shard_num)
+ errors.append(shard_num)
+
+ log.info('%d/%d items processed', i + 1, num_items)
+ if errors:
+ log.error('Encountered errors syncing these %d shards: %r',
+ len(errors), errors)
+
+
+class IncrementalSyncer(Syncer):
+
+ def get_worker_bound(self, shard_num):
+ bound = client.get_worker_bound(
+ self.dest_conn,
+ self.type,
+ shard_num)
+
+ marker = bound['marker']
+ retries = bound['retries']
+
+ dev_log.debug('oldest marker and time for shard %d are: %r %r',
+ shard_num, marker, bound['oldest_time'])
+ dev_log.debug('%d items to retry are: %r', len(retries), retries)
+
+ return marker, retries
+
+
+ def get_log_entries(self, shard_num, marker):
+ try:
+ result = client.get_log(self.src_conn, self.type,
+ marker, self.max_entries,
+ shard_num)
+ last_marker = result['marker']
+ log_entries = result['entries']
+ if len(log_entries) == self.max_entries:
+ log.warn('shard %d log has fallen behind - log length >= %d',
+ shard_num, self.max_entries)
+ except NotFound:
+ # no entries past this marker yet, but we may have retries
+ last_marker = ' '
+ log_entries = []
+ return last_marker, log_entries
+
+ def prepare(self):
+ self.init_num_shards()
+
+ self.shard_info = {}
+ self.shard_work = {}
+ for shard_num in xrange(self.num_shards):
+ marker, retries = self.get_worker_bound(shard_num)
+ last_marker, log_entries = self.get_log_entries(shard_num, marker)
+ self.shard_work[shard_num] = log_entries, retries
+ self.shard_info[shard_num] = last_marker
+
+ self.prepared_at = time.time()
+
+ def generate_work(self):
+ return self.shard_work.iteritems()
+
+
+class MetaSyncerInc(IncrementalSyncer):
+
+ def __init__(self, *args, **kwargs):
+ super(MetaSyncerInc, self).__init__(*args, **kwargs)
+ self.worker_cls = worker.MetadataWorkerIncremental
+ self.type = 'metadata'
+
+
+class DataSyncerInc(IncrementalSyncer):
+
+ def __init__(self, *args, **kwargs):
+ super(DataSyncerInc, self).__init__(*args, **kwargs)
+ self.worker_cls = worker.DataWorkerIncremental
+ self.type = 'data'
+ self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+ def wait_until_ready(self):
+ log.info('waiting to make sure bucket log is consistent')
+ while time.time() < self.prepared_at + self.rgw_data_log_window:
+ time.sleep(1)
+
+
+class DataSyncerFull(Syncer):
+
+ def __init__(self, *args, **kwargs):
+ super(DataSyncerFull, self).__init__(*args, **kwargs)
+ self.worker_cls = worker.DataWorkerFull
+ self.type = 'data'
+ self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+ def prepare(self):
+ log.info('preparing to do a full data sync')
+ self.init_num_shards()
+
+ # save data log markers for each shard
+ self.shard_info = {}
+ for shard in xrange(self.num_shards):
+ info = client.get_log_info(self.src_conn, 'data', shard)
+ # setting an empty marker returns an error
+ if info['marker']:
+ self.shard_info[shard] = info['marker']
+ else:
+ self.shard_info[shard] = ' '
+
+ # get list of buckets after getting any markers to avoid skipping
+ # entries added before we got the marker info
+ log.debug('getting bucket list')
+ buckets = client.get_bucket_list(self.src_conn)
+
+ self.prepared_at = time.time()
+
+ self.buckets_by_shard = {}
+ for bucket in buckets:
+ shard = self.shard_num_for_key(bucket)
+ self.buckets_by_shard.setdefault(shard, [])
+ self.buckets_by_shard[shard].append(bucket)
+
+ def generate_work(self):
+ return self.buckets_by_shard.iteritems()
+
+ def wait_until_ready(self):
+ log.info('waiting to make sure bucket log is consistent')
+ while time.time() < self.prepared_at + self.rgw_data_log_window:
+ time.sleep(1)
+
+
+class MetaSyncerFull(Syncer):
+ def __init__(self, *args, **kwargs):
+ super(MetaSyncerFull, self).__init__(*args, **kwargs)
+ self.worker_cls = worker.MetadataWorkerFull
+ self.type = 'metadata'
+
+ def prepare(self):
+ try:
+ self.sections = client.get_metadata_sections(self.src_conn)
+ except HttpError as e:
+ log.error('Error listing metadata sections: %s', e)
+ raise
+
+ # grab the latest shard markers and timestamps before we sync
+ self.shard_info = {}
+ self.init_num_shards()
+ for shard_num in xrange(self.num_shards):
+ info = client.get_log_info(self.src_conn, 'metadata', shard_num)
+ # setting an empty marker returns an error
+ if info['marker']:
+ self.shard_info[shard_num] = info['marker']
+ else:
+ self.shard_info[shard_num] = ' '
+
+ self.metadata_by_shard = {}
+ for section in self.sections:
+ try:
+ for key in client.list_metadata_keys(self.src_conn, section):
+ shard = self.shard_num_for_key(section + ':' + key)
+ self.metadata_by_shard.setdefault(shard, [])
+ self.metadata_by_shard[shard].append((section, key))
+ except NotFound:
+ # no keys of this type exist
+ continue
+ except HttpError as e:
+ log.error('Error listing metadata for section %s: %s',
+ section, e)
+ raise
+
+ def generate_work(self):
+ return self.metadata_by_shard.iteritems()
--- /dev/null
+import logging
+import sys
+import os
+
+# set an environ variable that tells us that we are really testing
+os.environ['PYTEST'] = '1'
+
+# this console logging configuration exists so that output is visible while
+# running tests; py.test executes this file automatically when it runs, so we
+# get the configuration for free.
+
+# Console Logger
+sh = logging.StreamHandler()
+sh.setLevel(logging.WARNING)
+
+formatter = logging.Formatter(
+ fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s',
+ datefmt='%Y-%m-%dT%H:%M:%S',
+ )
+sh.setFormatter(formatter)
+
+
+# because we're in a module already, __name__ is not the ancestor of
+# the rest of the package; use the root as the logger for everyone
+root_logger = logging.getLogger()
+
+# allow all levels at root_logger, handlers control individual levels
+root_logger.setLevel(logging.DEBUG)
+root_logger.addHandler(sh)
+
+console_loglevel = logging.DEBUG # start at DEBUG for now
+
+# Console Logger
+sh.setLevel(console_loglevel)
+
+
+
+
--- /dev/null
+import boto
+import py.test
+from mock import Mock
+import httpretty
+import re
+
+from radosgw_agent import client
+from radosgw_agent import exceptions as exc
+from radosgw_agent.constants import DEFAULT_TIME
+
+# parametrization helpers
+
+def endpoints():
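+ # each tuple is (input url, expected host, expected port, expected secure flag)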
+ return [
+ ('http://example.org', 'example.org', 80, False),
+ ('https://example.org', 'example.org', 443, True),
+ ('https://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 443, True),
+ ('http://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 80, False),
+ ('http://[e39:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]:8080', '[e39:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 8080, False),
+ ('http://e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 80, False),
+ ('https://example.org:8080', 'example.org', 8080, True),
+ ('https://example.org:8080/', 'example.org', 8080, True),
+ ('http://example.org:81/a/b/c?b#d', 'example.org', 81, False),
+ ]
+
+
+REGION_MAP = {
+ "regions": [
+ {
+ "val": {
+ "zones": [
+ {
+ "endpoints": [
+ "http://vit:8001/"
+ ],
+ "log_data": "true",
+ "log_meta": "true",
+ "name": "skinny-1"
+ },
+ {
+ "endpoints": [
+ "http://vit:8002/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "skinny-2"
+ }
+ ],
+ "name": "skinny",
+ "default_placement": "",
+ "master_zone": "skinny-1",
+ "api_name": "slim",
+ "placement_targets": [],
+ "is_master": "true",
+ "endpoints": [
+ "http://skinny:80/"
+ ]
+ },
+ "key": "skinny"
+ },
+ {
+ "val": {
+ "zones": [
+ {
+ "endpoints": [
+ "http://vit:8003/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "swab-2"
+ },
+ {
+ "endpoints": [
+ "http://vit:8004/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "swab-3"
+ },
+ {
+ "endpoints": [
+ "http://vit:8000/"
+ ],
+ "log_data": "true",
+ "log_meta": "true",
+ "name": "swab-1"
+ }
+ ],
+ "name": "swab",
+ "default_placement": "",
+ "master_zone": "swab-1",
+ "api_name": "shady",
+ "placement_targets": [],
+ "is_master": "false",
+ "endpoints": [
+ "http://vit:8000/"
+ ]
+ },
+ "key": "swab"
+ },
+ {
+ "val": {
+ "zones": [
+ {
+ "endpoints": [
+ "http://ro:80/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "ro-1"
+ },
+ {
+ "endpoints": [
+ "http://ro:8080/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "ro-2"
+ },
+ ],
+ "name": "readonly",
+ "default_placement": "",
+ "master_zone": "ro-1",
+ "api_name": "readonly",
+ "placement_targets": [],
+ "is_master": "false",
+ "endpoints": [
+ "http://ro:80/",
+ "http://ro:8080/"
+ ]
+ },
+ "key": "readonly"
+ },
+ {
+ "val": {
+ "zones": [
+ {
+ "endpoints": [
+ "http://meta:80/"
+ ],
+ "log_data": "false",
+ "log_meta": "true",
+ "name": "meta-1"
+ },
+ {
+ "endpoints": [
+ "http://meta:8080/"
+ ],
+ "log_data": "false",
+ "log_meta": "false",
+ "name": "meta-2"
+ },
+ ],
+ "name": "metaonly",
+ "default_placement": "",
+ "master_zone": "meta-1",
+ "api_name": "metaonly",
+ "placement_targets": [],
+ "is_master": "false",
+ "endpoints": [
+ "http://meta:80/",
+ "http://meta:8080/"
+ ]
+ },
+ "key": "metaonly"
+ }
+ ],
+ "master_region": "skinny"
+ }
+
+def test_endpoint_default_port():
+ endpoint = client.Endpoint('example.org', None, True)
+ assert endpoint.port == 443
+ endpoint = client.Endpoint('example.org', None, False)
+ assert endpoint.port == 80
+
+def test_endpoint_port_specified():
+ endpoint = client.Endpoint('example.org', 80, True)
+ assert endpoint.port == 80
+ endpoint = client.Endpoint('example.org', 443, True)
+ assert endpoint.port == 443
+
+
+def test_endpoint_equality():
+ default_port = client.Endpoint('a.org', None, True)
+ secure = client.Endpoint('a.org', 443, True)
+ insecure = client.Endpoint('a.org', 80, False)
+ assert default_port == secure
+ assert secure == insecure
+ assert insecure == default_port
+
+
+def test_endpoint_inequality():
+ base = client.Endpoint('a.org', 80, True)
+ diff_host = client.Endpoint('b.org', 80, True)
+ diff_port = client.Endpoint('a.org', 81, True)
+ insecure = client.Endpoint('a.org', 8080, False)
+ assert base != diff_host
+ assert base != diff_port
+ assert base != insecure
+
+
+@py.test.mark.parametrize('url, host, port, secure', endpoints())
+def test_parse_endpoint(url, host, port, secure):
+ endpoint = client.parse_endpoint(url)
+ assert endpoint.port == port
+ assert endpoint.host == host
+ assert endpoint.secure == secure
+
+
+@py.test.mark.parametrize('url, host, port, secure', endpoints())
+def test_parse_repr(url, host, port, secure):
+ endpoint = repr(client.parse_endpoint(url))
+ assert str(secure) in endpoint
+ assert str(host) in endpoint
+ assert str(port) in endpoint
+
+
+def test_parse_endpoint_bad_input():
+ with py.test.raises(exc.InvalidProtocol):
+ client.parse_endpoint('ftp://example.com')
+ with py.test.raises(exc.InvalidHost):
+ client.parse_endpoint('http://:80/')
+
+def _test_configure_endpoints(dest_url, dest_region, dest_zone,
+ expected_src_url, expected_src_region,
+ expected_src_zone, specified_src_url=None,
+ meta_only=False):
+ dest = client.parse_endpoint(dest_url)
+ if specified_src_url is not None:
+ src = client.parse_endpoint(specified_src_url)
+ else:
+ src = client.Endpoint(None, None, None)
+ region_map = client.RegionMap(REGION_MAP)
+ client.configure_endpoints(region_map, dest, src, meta_only)
+ assert dest.region.name == dest_region
+ assert dest.zone.name == dest_zone
+ assert src == client.parse_endpoint(expected_src_url)
+ assert src.region.name == expected_src_region
+ assert src.zone.name == expected_src_zone
+
+def test_configure_endpoints_2nd_region_master_zone_meta():
+ _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ meta_only=True)
+
+def test_configure_endpoints_2nd_region_master_zone_data():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ meta_only=False)
+
+def test_configure_endpoints_master_region_2nd_zone():
+ _test_configure_endpoints('http://vit:8002', 'skinny', 'skinny-2',
+ 'http://vit:8001', 'skinny', 'skinny-1')
+
+def test_configure_endpoints_2nd_region_2nd_zone():
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8000', 'swab', 'swab-1')
+
+def test_configure_endpoints_2nd_region_readonly_meta():
+ _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ meta_only=True)
+
+def test_configure_endpoints_2nd_region_readonly_data():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ meta_only=False)
+
+def test_configure_endpoints_2nd_region_metaonly_meta():
+ _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
+ 'http://meta:80', 'metaonly', 'meta-1',
+ meta_only=True)
+
+def test_configure_endpoints_2nd_region_metaonly_data():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ meta_only=False)
+
+def test_configure_endpoints_master_region_master_zone():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8001', 'skinny', 'skinny-1',
+ 'http://vit:8001', 'skinny', 'skinny-1')
+
+def test_configure_endpoints_specified_src_same_region():
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8000', 'swab', 'swab-1',
+ 'http://vit:8000')
+
+def test_configure_endpoints_specified_src_master_region_meta():
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ 'http://vit:8001', meta_only=True)
+
+def test_configure_endpoints_specified_src_master_region_data():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ 'http://vit:8001', meta_only=False)
+
+def test_configure_endpoints_bad_src_same_region():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8004', 'swab', 'swab-3',
+ 'http://vit:8004')
+
+def test_configure_endpoints_bad_src_master_region():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+ 'http://vit:8002', 'skinny', 'skinny-2',
+ 'http://vit:8002')
+
+def test_configure_endpoints_bad_src_same_zone():
+ with py.test.raises(exc.InvalidZone):
+ _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+ 'http://vit:8000', 'swab', 'swab-1',
+ 'http://vit:8000')
+
+def test_configure_endpoints_specified_nonexistent_src():
+ with py.test.raises(exc.ZoneNotFound):
+ _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
+ 'http://vit:8001', 'skinny', 'skinny-1',
+ 'http://vit:80')
+
+def test_configure_endpoints_unknown_zone():
+ with py.test.raises(exc.ZoneNotFound):
+ _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
+ 'http://vit:8001', 'skinny', 'skinny-1')
+
+def http_invalid_status_codes():
+ return [
+ 101, 102, 300, 301, 302, 303, 304, 305, 306, 307, 308,
+ ]
+
+def http_valid_status_codes():
+ return [
+ 200, 201, 202, 203, 204, 205, 207, 208, 226,
+ ]
+
+class TestCheckResultStatus(object):
+
+ @py.test.mark.parametrize('code', http_invalid_status_codes())
+ def test_check_raises_http_error(self, code):
+ response = Mock()
+ response.status = code
+ with py.test.raises(exc.HttpError):
+ client.check_result_status(response)
+
+ @py.test.mark.parametrize('code', http_valid_status_codes())
+ def test_check_does_not_raise_http_error(self, code):
+ response = Mock()
+ response.status = code
+ assert client.check_result_status(response) is None
+
+
+ def test_check_raises_not_found(self):
+ response = Mock()
+ response.status = 404
+ with py.test.raises(exc.NotFound):
+ client.check_result_status(response)
+
+
+class TestBotoCall(object):
+
+ def test_return_val(self):
+ @client.boto_call
+ def foo(*args, **kwargs):
+ return (args, kwargs)
+ assert foo(1) == ((1,), {})
+ assert foo(b=2) == (tuple(), {'b': 2})
+
+ def test_boto_exception_not_found(self):
+ @client.boto_call
+ def foo():
+ raise boto.exception.S3ResponseError(404, '')
+
+ with py.test.raises(exc.NotFound):
+ foo()
+
+ def test_non_boto_exception(self):
+ @client.boto_call
+ def foo():
+ raise ValueError('')
+
+ with py.test.raises(ValueError):
+ foo()
+
+
+class TestRequest(object):
+
+ @httpretty.activate
+ def test_url(self):
+
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body='{}',
+ content_type="application/json",
+ )
+ connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+
+ client.request(connection, 'get', '/%7E~', _retries=0)
+ server_request = httpretty.last_request()
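+ # the pre-encoded '%7E' is escaped again ('%' -> '%25') while the bare
+ # '~' is encoded once, hence '/%257E%7E'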
+ assert server_request.path == '/%257E%7E'
+
+ @httpretty.activate
+ def test_url_response(self):
+
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body='{"msg": "ok"}',
+ content_type="application/json",
+ )
+ connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+
+ result = client.request(connection, 'get', '/%7E~', _retries=0)
+ assert result == {'msg': 'ok'}
+
+ @httpretty.activate
+ def test_url_bad(self):
+
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body='{}',
+ content_type="application/json",
+ status=500,
+ )
+ connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+
+ with py.test.raises(exc.HttpError):
+ client.request(connection, 'get', '/%7E~', _retries=0)
+
+
+class TestGETClientRequestsPaths(object):
+
+ def setup(self):
+ self.connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+
+ def register(self, body=None):
+ body = body or '{}'
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body=body,
+ content_type="application/json",
+ )
+
+ @httpretty.activate
+ def test_get_metadata(self):
+ self.register()
+ client.get_metadata(self.connection, 'bucket.instance', 'foo')
+ server_request = httpretty.last_request()
+ assert server_request.path == '/admin/metadata/bucket.instance?key=foo'
+
+ @httpretty.activate
+ def test_get_metadata_no_re_encoding(self):
+ self.register()
+ client.get_metadata(self.connection, 'bucket.instance', 'mybar:r0z0.4140.1')
+ server_request = httpretty.last_request()
+ assert server_request.path == '/admin/metadata/bucket.instance?key=mybar%3Ar0z0.4140.1'
+
+ @httpretty.activate
+ def test_get_metadata_sections(self):
+ self.register()
+ client.get_metadata_sections(self.connection)
+ server_request = httpretty.last_request()
+ assert server_request.path == '/admin/metadata'
+
+ @httpretty.activate
+ def test_list_metadata_keys(self):
+ self.register()
+ client.list_metadata_keys(self.connection, 'foo')
+ server_request = httpretty.last_request()
+ assert server_request.path == '/admin/metadata/foo'
+
+ @httpretty.activate
+ def test_get_bucket_list(self):
+ self.register()
+ client.get_bucket_list(self.connection)
+ server_request = httpretty.last_request()
+ assert server_request.path == '/admin/metadata/bucket'
+
+ @httpretty.activate
+ def test_url_response(self):
+
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body='{"msg": "ok"}',
+ content_type="application/json",
+ )
+ result = client.request(self.connection, 'get', '/%7E~')
+ assert result == {'msg': 'ok'}
+
+
+class TestClientListObjectsInBucket(object):
+
+ def setup(self):
+ self.connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+ self.body = """
+ [
+ {
+ "name": "mahobject/",
+ "etag": "d41d8cd98f00b204e9800998ecf8427e",
+ "content_type": "application/octet-stream",
+ "last_modified": "2015-01-15T15:24:42.000Z",
+ "storage_class": "STANDARD",
+ "owner": {
+ "display_name": "client1-system-user",
+ "id": "client1-system-user"
+ }
+ }
+ ]
+ """
+
+ def register(self, body=None):
+ body = body or self.body
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body=body,
+ content_type="application/json",
+ )
+
+ @httpretty.activate
+ def test_get_bucket_is_a_single_item(self):
+ self.register()
+ result = client.get_bucket_list(self.connection)
+ assert len(result) == 1
+
+ @httpretty.activate
+ def test_get_bucket_has_right_metadata(self):
+ self.register()
+ result = client.get_bucket_list(self.connection)
+ obj = result[0]
+ owner = {
+ "display_name": "client1-system-user",
+ "id": "client1-system-user"
+ }
+ assert obj['name'] == 'mahobject/'
+ assert obj['etag'] == 'd41d8cd98f00b204e9800998ecf8427e'
+ assert obj['content_type'] == 'application/octet-stream'
+ assert obj['last_modified'] == '2015-01-15T15:24:42.000Z'
+ assert obj['storage_class'] == 'STANDARD'
+ assert obj['owner'] == owner
+
+
+class TestClientGetWorkerBound(object):
+
+ def setup(self):
+ self.connection = client.connection(
+ client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+ True,
+ )
+ self.body = """
+ {"marker": "00000000002.2.3",
+ "markers": [
+ {
+ "entity": "radosgw-agent",
+ "items_in_progress": [
+ {
+ "name": "hello",
+ "timestamp": "0.000000"
+ }
+ ],
+ "position_marker": "00000000002.2.3",
+ "position_time": "0.000000"
+ }
+ ],
+ "oldest_time": "0.000000"
+ }
+ """
+
+ def register(self, body=None, status=200):
+ body = body or self.body
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/(.*)"),
+ body=body,
+ content_type="application/json",
+ status=status
+ )
+
+ @httpretty.activate
+ def test_get_bound_has_right_metadata(self):
+ self.register()
+ result = client.get_worker_bound(
+ self.connection,
+ 'bucket-index',
+ 'beast:us-east'
+ )
+ assert result['marker'] == "00000000002.2.3"
+ assert result['retries'] == set(['hello'])
+ assert result['oldest_time'] == "0.000000"
+
+ @httpretty.activate
+ def test_get_bound_fails_fallsback_to_defaults(self):
+ self.register(status=404)
+ result = client.get_worker_bound(
+ self.connection,
+ 'bucket-index',
+ 'beast:us-east'
+ )
+ assert result['marker'] == " "
+ assert result['retries'] == []
+ assert result['oldest_time'] == DEFAULT_TIME
+
+
+class TestIsVersioned(object):
+
+ def setup(self):
+ # set strict attributes in the mock
+ self.obj = Mock(spec=object)
+
+ def test_is_in_fact_versioned(self):
+ self.obj.VersionedEpoch = u'1'
+ self.obj.version_id = 'somehashvalue'
+ assert client.is_versioned(self.obj) is True
+
+ def test_is_not_versioned_no_attr_versioned_epoch(self):
+ assert client.is_versioned(self.obj) is False
+
+ def test_is_not_versioned_no_attr_version_id(self):
+ assert client.is_versioned(self.obj) is False
+
+ def test_is_versioned_version_id(self):
+ self.obj.version_id = 1
+ assert client.is_versioned(self.obj) is True
+
+ def test_is_not_versioned_versioned_id_is_none(self):
+ self.obj.version_id = None
+ assert client.is_versioned(self.obj) is False
--- /dev/null
+from mock import Mock, patch
+import json
+import py.test
+import time
+import httpretty
+import re
+
+from radosgw_agent import worker, client
+from radosgw_agent.exceptions import HttpError, NotFound, BucketEmpty, SyncTimedOut
+
+
+class TestSyncObject(object):
+
+ def setup(self):
+ # setup the fake client
+ self.client = Mock()
+
+ self.src = Mock()
+ self.src.zone.name = 'Zone Name'
+ self.src.host = 'example.com'
+ self.obj = Mock()
+ self.obj.name = 'mah-object'
+
+ def test_syncs_correctly(self):
+ with patch('radosgw_agent.worker.client'):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ assert w.sync_object('mah-bucket', self.obj) is True
+
+ def test_syncs_not_found_on_master_deleting_from_secondary(self):
+ self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+ assert w.sync_object('mah-bucket', self.obj) is True
+
+ def test_syncs_deletes_from_secondary(self):
+ self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+ self.client.delete_object = Mock(side_effect=NotFound(404, ''))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+ assert w.sync_object('mah-bucket', self.obj) is False
+
+ def test_syncs_could_not_delete_from_secondary(self):
+ self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+ self.client.delete_object = Mock(side_effect=ValueError('unexpected error'))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+
+ with py.test.raises(worker.SyncFailed):
+ w.sync_object('mah-bucket', self.obj)
+
+ def test_syncs_encounters_a_http_error(self):
+ self.client.sync_object_intra_region = Mock(side_effect=HttpError(400, ''))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+ w.sync_object('mah-bucket', self.obj)
+
+ def test_sync_client_raises_sync_failed(self):
+ self.client.sync_object_intra_region = Mock(side_effect=worker.SyncFailed('failed intra region'))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+
+ with py.test.raises(worker.SyncFailed) as exc:
+ w.sync_object('mah-bucket', self.obj)
+
+ exc_message = exc.value[0]
+ assert 'failed intra region' in exc_message
+
+ def test_fails_to_remove_op_state(self, capsys):
+ # this one is tricky to test: we are forced to use py.test's `capsys`
+ # fixture, which lets us inspect the stderr logging output and check
+ # that the agent emits what we expect.
+ self.client.remove_op_state = Mock(side_effect=ValueError('could not remove op'))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+ assert w.sync_object('mah-bucket', self.obj) is True
+ # logging does not play nicely here, so the assertions below are commented
+ # out. The test exercises the right code path, but a clean assertion isn't
+ # possible until the logging code itself improves. For now, this just
+ # gives us the coverage.
+ # out, err = capsys.readouterr()
+ # assert 'could not remove op state' in out
+ # assert 'could not remove op state' in err
+
+ def test_fails_to_do_anything_fallsback_to_wait_for_object(self):
+ self.client.sync_object_intra_region = Mock(side_effect=ValueError('severe error'))
+
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.wait_for_object = lambda *a: None
+ assert w.sync_object('mah-bucket', self.obj) is True
+
+ def test_wait_for_object_state_not_found_raises_sync_failed(self):
+ self.client.get_op_state = Mock(side_effect=NotFound(404, ''))
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ with py.test.raises(worker.SyncFailed) as exc:
+ w.wait_for_object(None, None, time.time() + 1000, None)
+
+ exc_message = exc.exconly()
+ assert 'state not found' in exc_message
+
+ def test_wait_for_object_state_is_empty_sync_timesout(self):
+ self.client.get_op_state = lambda *a: []
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ with py.test.raises(SyncTimedOut) as exc:
+ w.wait_for_object(None, None, time.time() + 1, None)
+
+ def test_wait_for_object_timeout(self):
+ msg = 'should not have called get_op_state'
+ self.client.get_op_state = Mock(side_effect=AssertionError(msg))
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ with py.test.raises(worker.SyncTimedOut) as exc:
+ w.wait_for_object(None, None, time.time() - 1, None)
+
+ def test_wait_for_object_state_complete(self):
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ self.client.get_op_state = lambda *a: [{'state': 'complete'}]
+ assert w.wait_for_object(None, None, time.time() + 1, None) is None
+
+ def test_wait_for_object_state_error(self):
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ self.client.get_op_state = lambda *a: [{'state': 'error'}]
+ with py.test.raises(worker.SyncFailed) as exc:
+ w.wait_for_object(None, None, time.time() + 1, None)
+
+ exc_message = exc.exconly()
+ assert 'state is error' in exc_message
+
+ def test_sync_bucket_delayed_not_found(self):
+ class fake_iterable(object):
+ def __iter__(self):
+ raise BucketEmpty
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ w.sync_object = lambda *a: None
+ objects = fake_iterable()
+ with py.test.raises(BucketEmpty):
+ w.sync_bucket('foo', objects)
+
+
+
+def create_fake_endpoint(name='source', **kw):
+ ep = Mock()
+ ep.zone.name = name
+ ep.secret_key = kw.get('secret', 'secret')
+ ep.access_key = kw.get('access', 'access')
+ ep.port = kw.get('port', 7777)
+ ep.host = kw.get('host', 'localhost')
+ ep.debug = kw.get('debug', True)
+ return ep
+
+def create_log_entry(name='foo', **kw):
+ return {
+ "op_id": kw.get('op_id',"00000000006.3741.3"),
+ "op_tag": kw.get('op_tag', "default.21007.10"),
+ "op": kw.get('op', "link_olh"),
+ "object": name,
+ "instance": kw.get('instance',"uWueo6N+Hm6Sp86OdxfnfDHfUKRy\/gOu"),
+ "state": kw.get('state', "complete"),
+ "index_ver": kw.get('index_ver', 6),
+ "timestamp": kw.get('timestamp', "2015-01-07 00:21:41.000000Z"),
+ "ver": kw.get('ver', { "pool": 12, "epoch": 20818}),
+ "versioned": kw.get('versioned', True),
+ }
+
+
+class TestDataWorkerIncremental(object):
+
+ def setup(self):
+ self.w = worker.DataWorkerIncremental(
+ None, None, None, create_fake_endpoint(),
+ create_fake_endpoint('dest'), daemon_id=1, max_entries=10
+ )
+
+ def register(self, src_body=None, dest_body=None, status=200):
+
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:7777/admin/log(.*)"),
+ body=src_body or "{}",
+ content_type="application/json",
+ status=status
+ )
+ httpretty.register_uri(
+ httpretty.GET,
+ re.compile("http://localhost:8888/admin/log(.*)"),
+ body=dest_body or "{}",
+ content_type="application/json",
+ status=status
+ )
+
+ @httpretty.activate
+ def test_items_from_source_only(self):
+ src_body = json.dumps([create_log_entry('foo_1')])
+ self.register(src_body=src_body)
+ marker, entries = self.w.get_bucket_instance_entries(2, 'bucket')
+ assert marker == '00000000006.3741.3'
+ assert len(entries) == 1
--- /dev/null
+import pytest
+from radosgw_agent.util import configuration
+
+
+@pytest.fixture
+def conf():
+ return configuration.Configuration()
+
+
+class TestConfiguration(object):
+
+ def test_set_new_keys(self, conf):
+ conf['key'] = 1
+ assert conf['key'] == 1
+
+ def test_not_allowed_to_change_value(self, conf):
+ conf['key'] = 1
+ with pytest.raises(TypeError):
+ conf['key'] = 2
+
+ def test_not_allowed_to_pop_existing_key(self, conf):
+ conf['key'] = 1
+ with pytest.raises(TypeError):
+ conf.pop('key')
+
+ def test_keyerror_when_popping(self, conf):
+ with pytest.raises(KeyError):
+ conf.pop('key')
+
+ def test_adding_nested_values(self, conf):
+ conf['key'] = {}
+ conf['key']['bar'] = 1
+ assert conf['key']['bar'] == 1
+
+ def test_modifiying_nested_values_fails(self, conf):
+ conf['key'] = {}
+ conf['key']['bar'] = 1
+ with pytest.raises(TypeError):
+ conf['key']['bar'] = 2
+
+ def test_initial_dict_seeding(self):
+ my_dict = {'a': 1}
+ conf = configuration.Configuration(my_dict)
+ assert conf['a'] == 1
+
+ def test_initial_dict_seeding_doesnt_allow_updates(self):
+ my_dict = {'a': 1}
+ conf = configuration.Configuration(my_dict)
+ with pytest.raises(TypeError):
+ conf['a'] = 2
+
+ def test_assign_a_new_key_to_a_dict(self, conf):
+ my_dict = {'a': 1}
+ conf['args'] = my_dict
+ assert conf['args']['a'] == 1
+
+ def test_contains_element(self, conf):
+ exists = False
+ try:
+ if 'key' in conf:
+ exists = True
+ except KeyError:
+ assert False, "dict object should support 'contains' operations"
+ assert exists is False
--- /dev/null
+import pytest
+from radosgw_agent.util import network
+import random
+
+
+def valid_ipv6_addr(ports=False, brackets=False, addresses=20):
+ # randint is inclusive, so cap at 0xffff to keep each group a valid hextet
+ max_rand_int = 16**4 - 1
+
+ def generate(brackets, ports):
+ address = ":".join(
+ ("%x" % random.randint(0, max_rand_int) for i in range(8))
+ )
+ if brackets:
+ address = '[%s]' % address
+ if ports:
+ address = '%s:8080' % address
+ return address
+ return [generate(brackets, ports) for i in range(addresses)]
+
+
+def invalid_ipv6_addr():
+ return [
+ '',
+ 1,
+ 'some address',
+ '192.1.1.1',
+ '::!',
+ ]
+
+
+class TestIsIPV6(object):
+
+ @pytest.mark.parametrize('address', valid_ipv6_addr())
+ def test_passes_valid_addresses(self, address):
+ assert network.is_ipv6(address) is True
+
+ @pytest.mark.parametrize('address', valid_ipv6_addr(brackets=True))
+ def test_passes_valid_addresses_with_brackets(self, address):
+ assert network.is_ipv6(address) is True
+
+ @pytest.mark.parametrize('address', invalid_ipv6_addr())
+ def test_catches_invalid_addresses(self, address):
+ assert network.is_ipv6(address) is False
+
+ @pytest.mark.parametrize('address', valid_ipv6_addr(ports=True, brackets=True))
+ def test_passes_valid_addresses_with_brackets_and_ports(self, address):
+ assert network.is_ipv6(address) is True
+
--- /dev/null
+from radosgw_agent.util import obj
+
+
+class Empty(object):
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ setattr(self, k, v)
+
+
+class TestToDict(object):
+
+ def test_underscores_are_ignored(self):
+ fake = Empty(a=1, _b=2)
+ result = obj.to_dict(fake)
+ assert result.get('_b') is None
+ assert result.get('a') == 1
+
+ def test_overrides_are_respected(self):
+ fake = Empty(a=1, b=2)
+ result = obj.to_dict(fake, b=3)
+ assert result.get('b') == 3
+
+ def test_overrides_dont_mess_up_other_keys(self):
+ fake = Empty(a=1, b=2)
+ result = obj.to_dict(fake, b=3)
+ assert result.get('a') == 1
+
+ def test_extra_keys_are_set(self):
+ result = obj.to_dict(Empty(), a=1, b=2)
+ assert result['a'] == 1
+ assert result['b'] == 2
+
+
+class TestKeysToAttribute(object):
+
+ def test_replace_dashes(self):
+ dictionary = {'dashed-word': 1}
+ result = obj.to_obj(dictionary)
+ assert result.dashed_word == 1
--- /dev/null
+import log
+from log import get_dev_logger
--- /dev/null
+
+
+class Configuration(object):
+ """
+ An immutable dictionary: setting a value for a new key is allowed, but
+ assigning to an existing key raises a TypeError.
+
+ Even though an effort is made to keep it immutable, a consumer can force
+ its way through via the private ``_dict`` attribute that holds the values;
+ although that defeats the purpose, it is exposed that way in case
+ something really does need to change.
+
+ All normal methods and operations should be supported.
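+
+ A minimal usage sketch (illustrative, mirroring what the unit tests
+ exercise)::
+
+ conf = Configuration()
+ conf['key'] = 1 # setting a new key is allowed
+ conf['key'] = 2 # raises TypeError, existing keys cannot change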
+ """
+
+ def __init__(self, seed=None):
+ if seed and isinstance(seed, dict):
+ self._dict = seed
+ else:
+ self._dict = {}
+
+ def __str__(self):
+ return str(self._dict)
+
+ def pop(self, key, default=None):
+ try:
+ self._dict[key]
+ except KeyError:
+ raise
+ else:
+ self._default_error()
+
+ def popitem(self):
+ self._default_error()
+
+ def update(self):
+ self._default_error()
+
+ def clear(self):
+ self._default_error()
+
+ def values(self):
+ return self._dict.values()
+
+ def keys(self):
+ return self._dict.keys()
+
+ def items(self):
+ return self._dict.items()
+
+ def get(self, key, default=None):
+ return self._dict.get(key, default)
+
+ def _default_error(self):
+ msg = 'config object does not allow key changes'
+ raise TypeError(msg)
+
+ def __setitem__(self, key, value):
+ try:
+ self._dict[key]
+ except KeyError:
+ if isinstance(value, dict):
+ self._dict[key] = Configuration(value)
+ else:
+ self._dict[key] = value
+ else:
+ self._default_error()
+
+ def __getitem__(self, key):
+ return self._dict[key]
+
+ def __contains__(self, key):
+ try:
+ self._dict[key]
+ return True
+ except KeyError:
+ return False
--- /dev/null
+import logging
+import sys
+import traceback
+from functools import wraps
+
+
+def catches(catch=None, handler=None, exit=True, handle_all=False):
+ """
+ Very simple decorator that catches the exception(s) passed in, either as
+ a single exception class or a tuple containing several, logs the
+ exception message, and optionally hands the problem to ``handler`` if one
+ is provided.
+
+ So instead of doing something like this::
+
+ def bar():
+ try:
+ some_call()
+ print "Success!"
+ except TypeError, exc:
+ print "Error while handling some call: %s" % exc
+ sys.exit(1)
+
+ You would need to decorate it like this to have the same effect::
+
+ @catches(TypeError)
+ def bar():
+ some_call()
+ print "Success!"
+
+ If multiple exceptions need to be caught they need to be provided as a
+ tuple::
+
+ @catches((TypeError, AttributeError))
+ def bar():
+ some_call()
+ print "Success!"
+
+ If adding a handler, it should accept a single argument: the exception
+ that was raised. It would look like::
+
+ def my_handler(exc):
+ print 'Handling exception %s' % str(exc)
+ raise SystemExit
+
+ @catches(KeyboardInterrupt, handler=my_handler)
+ def bar():
+ some_call()
+
+ Note that the handler needs to raise its SystemExit if it wants to halt
+ execution, otherwise the decorator would continue as a normal try/except
+ block.
+
+
+ :param catch: A tuple with one (or more) Exceptions to catch
+ :param handler: Optional handler to have custom handling of exceptions
+ :param exit: Raise a ``SystemExit`` after handling exceptions
+ :param handle_all: Handle all other exceptions via logging.
+ """
+ catch = catch or Exception
+ logger = logging.getLogger('radosgw_agent')
+
+ def decorate(f):
+
+ @wraps(f)
+ def newfunc(*a, **kw):
+ exit_from_catch = False
+ try:
+ return f(*a, **kw)
+ except catch as e:
+ if handler:
+ return handler(e)
+ else:
+ logger.error(make_exception_message(e))
+
+ if exit:
+ exit_from_catch = True
+ sys.exit(1)
+ except Exception: # anything else, no need to save the exception as a variable
+ if handle_all is False: # re-raise if we are not supposed to handle everything
+ raise
+ # Make sure we don't spit double tracebacks if we are raising
+ # SystemExit from the `except catch` block
+
+ if exit_from_catch:
+ sys.exit(1)
+
+ str_failure = traceback.format_exc()
+ for line in str_failure.split('\n'):
+ logger.error(line)
+ sys.exit(1)
+
+ return newfunc
+
+ return decorate
+
+#
+# Decorator helpers
+#
+
+
+def make_exception_message(exc):
+ """
+ Takes an exception and returns a readable string for it: the exception
+ class name, followed by the exception message when there is one.
+ """
+ if str(exc):
+ return '%s: %s\n' % (exc.__class__.__name__, exc)
+ else:
+ return '%s\n' % (exc.__class__.__name__)
+
--- /dev/null
+import logging
+import sys
+
+
+def get_dev_logger(name='dev.radosgw_agent'):
+ """
+ A simple utility for logging things that are meant for developer eyes
+ and not for end users.
+
+ All developer logger names must be prefixed with ``dev``, so this utility
+ ensures that is the case. To use it::
+
+ dev_log = get_dev_logger(__name__)
+
+ Or::
+
+ dev_log = get_dev_logger('dev.custom_name')
+ """
+ if not name.startswith('dev'):
+ return logging.getLogger('%s.%s' % ('dev', name))
+ return logging.getLogger(name)
+
+
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+COLORS = {
+ 'WARNING': YELLOW,
+ 'INFO': WHITE,
+ 'DEBUG': BLUE,
+ 'CRITICAL': RED,
+ 'ERROR': RED,
+ 'FATAL': RED,
+}
+
+RESET_SEQ = "\033[0m"
+COLOR_SEQ = "\033[1;%dm"
+BOLD_SEQ = "\033[1m"
+
+BASE_COLOR_FORMAT = "%(asctime)s %(process)d\
+ [$BOLD%(name)s$RESET][%(color_levelname)-17s] %(message)s"
+
+BASE_FORMAT = "%(asctime)s %(process)d [%(name)s][%(levelname)-6s] %(message)s"
+
+
+def supports_color():
+ """
+ Returns True if the running system's terminal supports color, and False
+ otherwise.
+ """
+ unsupported_platform = (sys.platform in ('win32', 'Pocket PC'))
+ # isatty is not always implemented
+ is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+ if unsupported_platform or not is_a_tty:
+ return False
+ return True
+
+
+def color_message(message):
+ message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ)
+ return message
+
+
+class ColoredFormatter(logging.Formatter):
+ """
+ A very basic logging formatter that not only applies color to the levels of
+ the output but also truncates the level names so that they do not alter
+ the visual alignment of log lines on the terminal.
+ """
+
+ def __init__(self, msg):
+ logging.Formatter.__init__(self, msg)
+
+ def format(self, record):
+ levelname = record.levelname
+ truncated_level = record.levelname[:6]
+ levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) + truncated_level + RESET_SEQ
+ record.color_levelname = levelname_color
+ return logging.Formatter.format(self, record)
+
+
+def color_format():
+ """
+ Main entry point to get a colored formatter: it uses BASE_COLOR_FORMAT
+ when the terminal supports color and falls back to the plain BASE_FORMAT
+ when it does not.
+ """
+ str_format = BASE_COLOR_FORMAT if supports_color() else BASE_FORMAT
+ color_format = color_message(str_format)
+ return ColoredFormatter(color_format)
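+
+# A minimal usage sketch (illustrative only, not part of the original
+# module): attach the colored formatter to a console handler so terminal
+# output is colorized when supported.
+#
+# sh = logging.StreamHandler()
+# sh.setFormatter(color_format())
+# logging.getLogger('radosgw_agent').addHandler(sh)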
--- /dev/null
+import socket
+
+
+def is_ipv6(address):
+ """
+ Check if an address is an IPv6 one, but trim commonly used brackets as the
+ ``socket`` module complains about them.
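+
+ For example (illustrative)::
+
+ is_ipv6('[::1]:8080') # True
+ is_ipv6('192.168.0.1') # False -- not a valid IPv6 address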
+ """
+ if not isinstance(address, str):
+ return False
+
+ if address.startswith('['): # assume we need to split on possible port
+ address = address.split(']:')[0]
+ # strip leading/trailing brackets so inet_pton understands the address
+ address = address.strip('[]')
+ try:
+ socket.inet_pton(socket.AF_INET6, address)
+ except socket.error: # not a valid address
+ return False
+ return True
--- /dev/null
+
+
+def to_dict(_object, **extra_keys):
+ """
+ A utility to convert an object with attributes to a dictionary, with the
+ option of adding extra keys. Because ``extra_keys`` is applied last, any
+ keys that clash with existing attributes are overwritten by it.
+
+ Private attributes (anything that starts with ``_``) are ignored.
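+
+ A small sketch (illustrative, the names are made up)::
+
+ class Entry(object):
+ pass
+
+ entry = Entry()
+ entry.name = 'obj'
+ entry._internal = 1
+ to_dict(entry, time='0.000000')
+ # -> {'name': 'obj', 'time': '0.000000'}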
+ """
+ dictified_obj = {}
+ for k, v in _object.__dict__.items():
+ if not k.startswith('_'):
+ # get key
+ value = extra_keys.pop(k, v)
+ dictified_obj[k] = value
+ if extra_keys:
+ for k, v in extra_keys.items():
+ dictified_obj[k] = v
+
+ return dictified_obj
+
+
+def to_obj(dictionary, name="BucketEntry"):
+ """
+ Because some objects are dynamic we cannot use namedtuples here; instead
+ the dictionary keys are set as attributes on a plain object, so that
+ accessing them is easier and compatible with code that expects regular
+ objects.
+
+ .. note: dashes are converted to underscores
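+
+ For example (illustrative, matching the behaviour in the unit tests)::
+
+ entry = to_obj({'dashed-word': 1})
+ entry.dashed_word # 1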
+ """
+ class Meta(object):
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ k = k.replace('-', '_')
+ setattr(self, k, v)
+
+ obj_ = Meta(**dictionary)
+ obj_.__class__.__name__ = name
+ return obj_
--- /dev/null
+
+def concatenate(*a, **kw):
+ """
+ helper function to concatenate all arguments with added (optional)
+ newlines
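+
+ For example (illustrative)::
+
+ concatenate('a', 'b') # 'ab'
+ concatenate('a', 'b', newline=True) # 'a\nb\n'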
+ """
+ newline = kw.get('newline', False)
+ string = ''
+ for item in a:
+ if newline:
+ string += item + '\n'
+ else:
+ string += item
+ return string
--- /dev/null
+from collections import namedtuple
+from itertools import ifilter
+import logging
+import multiprocessing
+import os
+import socket
+import time
+
+from radosgw_agent import client
+from radosgw_agent import lock
+from radosgw_agent.util import obj as obj_, get_dev_logger
+from radosgw_agent.exceptions import SkipShard, SyncError, SyncTimedOut, SyncFailed, NotFound, BucketEmpty
+from radosgw_agent.constants import DEFAULT_TIME, RESULT_SUCCESS, RESULT_ERROR
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class Worker(multiprocessing.Process):
+ """sync worker to run in its own process"""
+
+ def __init__(self, work_queue, result_queue, log_lock_time,
+ src, dest, **kwargs):
+ super(Worker, self).__init__()
+ self.src = src
+ self.dest = dest
+ self.work_queue = work_queue
+ self.result_queue = result_queue
+ self.log_lock_time = log_lock_time
+ self.lock = None
+
+ self.local_lock_id = socket.gethostname() + ':' + str(os.getpid())
+
+ # construct the two connection objects
+ self.src_conn = client.connection(src)
+ self.dest_conn = client.connection(dest)
+
+ def prepare_lock(self):
+ assert self.lock is None
+ self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id,
+ self.log_lock_time, self.dest.zone.name)
+ self.lock.daemon = True
+ self.lock.start()
+
+ def lock_shard(self, shard_num):
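+ # take this shard's log lock on the destination before processing it:
+ # a missing log means nothing has changed on the shard yet (skip it as
+ # a success), while any other failure is reported as an error and skipped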
+ result = shard_num, []
+ try:
+ self.lock.set_shard(shard_num)
+ self.lock.acquire()
+ except NotFound:
+ # no log means nothing changed this shard yet
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_SUCCESS, result))
+ raise SkipShard('no log for shard')
+ except Exception:
+ log.warn('error locking shard %d log, '
+ 'skipping for now. Traceback:',
+ shard_num, exc_info=True)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_ERROR, result))
+ raise SkipShard()
+
+ def unlock_shard(self):
+ try:
+ self.lock.release_and_clear()
+ except lock.LockBroken as e:
+ log.warn('work may be duplicated: %s', e)
+ except Exception as e:
+ log.warn('error unlocking log, continuing anyway '
+ 'since lock will timeout. Traceback:', exc_info=True)
+
+ def set_bound(self, key, marker, retries, type_=None):
+ # api doesn't allow setting a bound with a blank marker
+ if marker:
+ if type_ is None:
+ type_ = self.type
+ try:
+ data = [
+ obj_.to_dict(item, time=DEFAULT_TIME) for item in retries
+ ]
+ client.set_worker_bound(self.dest_conn,
+ type_,
+ marker,
+ DEFAULT_TIME,
+ self.daemon_id,
+ key,
+ data=data)
+ return RESULT_SUCCESS
+ except Exception:
+ log.warn('error setting worker bound for key "%s",'
+ ' may duplicate some work later. Traceback:', key,
+ exc_info=True)
+ return RESULT_ERROR
+
+MetadataEntry = namedtuple('MetadataEntry',
+ ['section', 'name', 'marker', 'timestamp'])
+
+
+def _meta_entry_from_json(entry):
+ return MetadataEntry(
+ entry['section'],
+ entry['name'],
+ entry['id'],
+ entry['timestamp'],
+ )
+
+BucketIndexEntry = namedtuple('BucketIndexEntry',
+ [
+ 'object',
+ 'marker',
+ 'timestamp',
+ 'op',
+ 'versioned',
+ 'ver',
+ 'name',
+ # compatibility with boto objects:
+ 'VersionedEpoch',
+ 'version_id',
+ ])
+
+BucketVer = namedtuple('BucketVer',
+ [
+ 'epoch',
+ 'pool',
+ ])
+
+
+def _bi_entry_from_json(entry):
+ ver = entry.get('ver', {})
+ entry_ver = BucketVer(
+ ver.get('epoch'),
+ ver.get('pool')
+ )
+
+ # compatibility with boto objects:
+ VersionedEpoch = ver.get('epoch')
+ version_id = entry.get('instance', 'null')
+
+ return BucketIndexEntry(
+ entry['object'],
+ entry['op_id'],
+ entry['timestamp'],
+ entry.get('op', ''),
+ entry.get('versioned', False),
+ entry_ver,
+ entry['object'],
+ VersionedEpoch,
+ version_id,
+ )
+
+
+def filter_versioned_objects(entry):
+ """
+ On incremental sync operations, the log may contain entries for versioned
+ ('olh') objects that should be ignored. This filter function checks the
+ attributes present in an ``entry`` and returns only the valid ones:
+ versioned entries whose op is a plain ``write`` or ``delete`` are dropped,
+ while the ``link_olh`` style operations are kept.
+
+ This should be backwards compatible with older gateways that return log
+ entries that don't support versioning.
+ """
+ # do not attempt filtering on non-versioned entries
+ if not entry.versioned:
+ return entry
+
+ # writes or delete 'op' values should be ignored
+ if entry.op not in ['write', 'delete']:
+ # allowed op states are `link_olh` and `link_olh_del`
+ return entry
+
+
+class IncrementalMixin(object):
+ """This defines run() and get_and_process_entries() for incremental sync.
+
+ These are the same for data and metadata sync, so share their
+ implementation here.
+ """
+
+ def run(self):
+ self.prepare_lock()
+ while True:
+ item = self.work_queue.get()
+ if item is None:
+ dev_log.info('process %s is done. Exiting', self.ident)
+ break
+
+ shard_num, (log_entries, retries) = item
+
+ log.info('%s is processing shard number %d',
+ self.ident, shard_num)
+
+ # first, lock the log
+ try:
+ self.lock_shard(shard_num)
+ except SkipShard:
+ continue
+
+ result = RESULT_SUCCESS
+ try:
+ new_retries = self.sync_entries(log_entries, retries)
+ except Exception:
+ log.exception('syncing entries for shard %d failed',
+ shard_num)
+ result = RESULT_ERROR
+ new_retries = []
+
+ # finally, unlock the log
+ self.unlock_shard()
+ self.result_queue.put((result, (shard_num, new_retries)))
+ log.info('finished processing shard %d', shard_num)
+
+
+class DataWorker(Worker):
+
+ def __init__(self, *args, **kwargs):
+ super(DataWorker, self).__init__(*args, **kwargs)
+ self.type = 'data'
+ self.op_id = 0
+ self.object_sync_timeout = kwargs.get('object_sync_timeout', 60 * 60 * 60)
+ self.daemon_id = kwargs['daemon_id']
+
+ def sync_object(self, bucket, obj):
+ log.debug('syncing object %s/%s', bucket, obj.name)
+ self.op_id += 1
+ local_op_id = self.local_lock_id + ':' + str(self.op_id)
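+ # hostname:pid:counter uniquely identifies this copy operation so its
+ # op state can be polled (and later removed) on the destination gateway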
+ found = False
+
+ try:
+ until = time.time() + self.object_sync_timeout
+ client.sync_object_intra_region(self.dest_conn, bucket, obj,
+ self.src.zone.name,
+ self.daemon_id,
+ local_op_id)
+ found = True
+ except NotFound:
+ log.debug('object "%s/%s" not found on master, deleting from secondary',
+ bucket, obj.name)
+ try:
+ client.delete_object(self.dest_conn, bucket, obj)
+ except NotFound:
+ # Since we were trying to delete the object, just return
+ return False
+ except Exception:
+ msg = 'could not delete "%s/%s" from secondary' % (bucket, obj.name)
+ log.exception(msg)
+ raise SyncFailed(msg)
+ except SyncFailed:
+ raise
+ except Exception as error:
+ msg = 'encountered an error during sync'
+ dev_log.warn(msg, exc_info=True)
+ log.warning('%s: %s' % (msg, error))
+ # wait for it if the op state is in-progress
+ self.wait_for_object(bucket, obj, until, local_op_id)
+ # TODO: clean up old op states
+ try:
+ if found:
+ client.remove_op_state(self.dest_conn, self.daemon_id,
+ local_op_id, bucket, obj)
+ except NotFound:
+ log.debug('op state already gone')
+ except Exception:
+ log.exception('could not remove op state for daemon "%s" op_id %s',
+ self.daemon_id, local_op_id)
+
+ return True
+
+ def wait_for_object(self, bucket, obj, until, local_op_id):
+ while time.time() < until:
+ try:
+ state = client.get_op_state(self.dest_conn,
+ self.daemon_id,
+ local_op_id,
+ bucket, obj)
+ log.debug('op state is %s', state)
+ if not state:
+ time.sleep(1)
+ continue
+ state = state[0]['state']
+ if state == 'complete':
+ return
+ elif state != 'in-progress':
+ raise SyncFailed('state is {0}'.format(state))
+ time.sleep(1)
+ except SyncFailed:
+ raise
+ except NotFound:
+ raise SyncFailed('object copy state not found')
+ except Exception as e:
+ log.debug('error getting op state: %s', e, exc_info=True)
+ log.info('will try to get op state again')
+ time.sleep(1)
+ # timeout expired
+ raise SyncTimedOut()
+
+ def get_bucket_instance(self, bucket):
+ metadata = client.get_metadata(self.src_conn, 'bucket', bucket)
+ return bucket + ':' + metadata['data']['bucket']['bucket_id']
+
+ def get_bucket(self, bucket_instance):
+ return bucket_instance.split(':', 1)[0]
+
+ def sync_bucket(self, bucket, objects):
+ log.info('*'*80)
+ log.info('syncing bucket "%s"', bucket)
+ retry_objs = []
+ count = 0
+ for obj in objects:
+ try:
+ self.sync_object(bucket, obj)
+ count += 1
+ except SyncError as err:
+ log.error('failed to sync object %s/%s: %s',
+ bucket, obj.name, err)
+ log.warning(
+ 'will retry sync of failed object at next incremental sync'
+ )
+ retry_objs.append(obj)
+ log.info('synced %s objects' % count)
+ log.info('completed syncing bucket "%s"', bucket)
+ log.info('*'*80)
+
+ return retry_objs
+
+
+class DataWorkerIncremental(IncrementalMixin, DataWorker):
+
+ def __init__(self, *args, **kwargs):
+ super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+ self.max_entries = kwargs['max_entries']
+
+ def get_bucket_instance_entries(self, marker, instance):
+ entries = []
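+ # page through the bucket index log; a batch smaller than max_entries
+ # means we have reached the end of the log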
+ while True:
+ try:
+ log_entries = client.get_log(self.src_conn, 'bucket-index',
+ marker, self.max_entries, instance)
+ except NotFound:
+ log_entries = []
+
+ log.debug('bucket instance "%s" has %d entries after "%s"', instance,
+ len(log_entries), marker)
+
+ try:
+ entries += [_bi_entry_from_json(entry) for entry in log_entries]
+ except KeyError:
+ log.error('log with missing key is: %s', log_entries)
+ raise
+
+ if entries:
+ marker = entries[-1].marker
+ else:
+ marker = ' '
+
+ if len(log_entries) < self.max_entries:
+ break
+ return marker, entries
+
+ def inc_sync_bucket_instance(self, instance, marker, timestamp, retries):
+ max_marker, entries = self.get_bucket_instance_entries(marker, instance)
+
+ # regardless if entries are versioned, make sure we filter them
+ entries = [i for i in ifilter(filter_versioned_objects, entries)]
+
+ objects = set([entry for entry in entries])
+ bucket = self.get_bucket(instance)
+ new_retries = self.sync_bucket(bucket, objects.union(retries))
+
+ result = self.set_bound(instance, max_marker, new_retries,
+ 'bucket-index')
+ if new_retries:
+ result = RESULT_ERROR
+ return result
+
+ def sync_entries(self, log_entries, retries):
+ try:
+ bucket_instances = set([entry['key'] for entry in log_entries])
+ except KeyError:
+ log.error('log containing bad key is: %s', log_entries)
+ raise
+
+ new_retries = []
+ for bucket_instance in bucket_instances.union(retries):
+ if ':' not in bucket_instance:
+ # it's just a plain bucket from an old version of the agent
+ bucket_instance = self.get_bucket_instance(bucket_instance)
+
+ bound = client.get_worker_bound(
+ self.dest_conn,
+ 'bucket-index',
+ bucket_instance)
+
+ marker = bound['marker']
+ # remap dictionaries to object-like
+ retries = [obj_.to_obj(i) for i in bound['retries']]
+ timestamp = bound['oldest_time']
+
+ try:
+ sync_result = self.inc_sync_bucket_instance(bucket_instance,
+ marker,
+ timestamp,
+ retries)
+ except Exception as e:
+ log.warn('error syncing bucket instance "%s": %s',
+ bucket_instance, e, exc_info=True)
+ sync_result = RESULT_ERROR
+ if sync_result == RESULT_ERROR:
+ new_retries.append(bucket_instance)
+
+ return new_retries
+
+
+class DataWorkerFull(DataWorker):
+
+ def full_sync_bucket(self, bucket):
+ try:
+ instance = self.get_bucket_instance(bucket)
+ try:
+ marker = client.get_log_info(self.src_conn, 'bucket-index',
+ instance)['max_marker']
+ except NotFound:
+ marker = ' '
+ log.debug('bucket instance is "%s" with marker %s', instance, marker)
+
+ objects = client.list_objects_in_bucket(self.src_conn, bucket)
+ retries = self.sync_bucket(bucket, objects)
+
+ result = self.set_bound(instance, marker, retries, 'bucket-index')
+ return not retries and result == RESULT_SUCCESS
+ except BucketEmpty:
+ log.debug('no objects in bucket %s', bucket)
+ return True
+ except Exception:
+ log.exception('error preparing for full sync of bucket "%s"',
+ bucket)
+ return False
+
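+ # pull (shard, buckets) work items off the queue until a None sentinel
+ # signals there is no more work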
+ def run(self):
+ self.prepare_lock()
+ while True:
+ item = self.work_queue.get()
+ if item is None:
+ log.info('No more entries in queue, exiting')
+ break
+
+ shard_num, buckets = item
+
+ # first, lock the log
+ try:
+ self.lock_shard(shard_num)
+ except SkipShard:
+ continue
+
+ # attempt to sync each bucket, add to a list to retry
+ # during incremental sync if sync fails
+ retry_buckets = []
+ for bucket in buckets:
+ if not self.full_sync_bucket(bucket):
+ retry_buckets.append(bucket)
+
+ # unlock shard and report buckets to retry during incremental sync
+ self.unlock_shard()
+ self.result_queue.put((RESULT_SUCCESS, (shard_num, retry_buckets)))
+ log.info('finished syncing shard %d', shard_num)
+ if retry_buckets:
+ log.info('incremental sync will need to retry buckets: %s',
+ retry_buckets)
+
+
+class MetadataWorker(Worker):
+
+ def __init__(self, *args, **kwargs):
+ super(MetadataWorker, self).__init__(*args, **kwargs)
+ self.type = 'metadata'
+
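+ # copy a single metadata entry from the master to the secondary; if the
+ # entry no longer exists on the master, delete it from the secondary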
+ def sync_meta(self, section, name):
+ log.debug('syncing metadata type %s key "%s"', section, name)
+ try:
+ metadata = client.get_metadata(self.src_conn, section, name)
+ except NotFound:
+ log.debug('%s "%s" not found on master, deleting from secondary',
+ section, name)
+ try:
+ client.delete_metadata(self.dest_conn, section, name)
+ except NotFound:
+ # Since this error is handled appropriately, return success
+ return RESULT_SUCCESS
+ except Exception as e:
+ log.warn('error getting metadata for %s "%s": %s',
+ section, name, e, exc_info=True)
+ return RESULT_ERROR
+ else:
+ try:
+ client.update_metadata(self.dest_conn, section, name, metadata)
+ return RESULT_SUCCESS
+ except Exception as e:
+ log.warn('error updating metadata for %s "%s": %s',
+ section, name, e, exc_info=True)
+ return RESULT_ERROR
+
+class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
+
+ def __init__(self, *args, **kwargs):
+ super(MetadataWorkerIncremental, self).__init__(*args, **kwargs)
+
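+ # sync every (section, name) pair mentioned in the metadata log plus any
+ # previously failed entries, returning the entries to retry next time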
+ def sync_entries(self, log_entries, retries):
+ try:
+ entries = [_meta_entry_from_json(entry) for entry in log_entries]
+ except KeyError:
+ log.error('log containing bad key is: %s', log_entries)
+ raise
+
+ new_retries = []
+ mentioned = set([(entry.section, entry.name) for entry in entries])
+ split_retries = [tuple(entry.split('/', 1)) for entry in retries]
+ for section, name in mentioned.union(split_retries):
+ sync_result = self.sync_meta(section, name)
+ if sync_result == RESULT_ERROR:
+ new_retries.append(section + '/' + name)
+
+ return new_retries
+
+class MetadataWorkerFull(MetadataWorker):
+
+ def empty_result(self, shard):
+ return shard, []
+
+ def run(self):
+ self.prepare_lock()
+ while True:
+ item = self.work_queue.get()
+ if item is None:
+ log.info('No more entries in queue, exiting')
+ break
+
+ log.debug('syncing item "%s"', item)
+
+ shard_num, metadata = item
+
+ # first, lock the log
+ try:
+ self.lock_shard(shard_num)
+ except SkipShard:
+ continue
+
+ # attempt to sync each metadata entry, adding it to a list to retry
+ # during incremental sync if the sync fails
+ retries = []
+ for section, name in metadata:
+ try:
+ self.sync_meta(section, name)
+ except Exception as e:
+ log.warn('could not sync %s "%s", saving for retry: %s',
+ section, name, e, exc_info=True)
+ retries.append(section + '/' + name)
+
+ # unlock shard and report entries to retry during incremental sync
+ self.unlock_shard()
+ self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
+ log.info('finished syncing shard %d', shard_num)
+ if retries:
+ log.info('incremental sync will need to retry items: %s',
+ retries)
--- /dev/null
+#!/usr/bin/env python
+"""
+radosgw-agent - admin tool for ceph
+"""
+import os
+import platform
+import sys
+
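+# make the radosgw_agent package importable from the distro-specific install
+# locations that may not already be on sys.path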
+if os.path.exists('/usr/share/pyshared/radosgw_agent'):
+ sys.path.insert(0,'/usr/share/pyshared/radosgw_agent')
+elif os.path.exists('/usr/share/radosgw-agent'):
+ sys.path.insert(0,'/usr/share/radosgw-agent')
+elif os.path.exists('/usr/share/pyshared/radosgw-agent'):
+ sys.path.insert(0,'/usr/share/pyshared/radosgw-agent')
+elif os.path.exists('/usr/lib/python2.6/site-packages/radosgw_agent'):
+ sys.path.insert(0,'/usr/lib/python2.6/site-packages/radosgw_agent')
+
+from radosgw_agent.cli import main
+
+if __name__ == '__main__':
+ sys.exit(main())
--- /dev/null
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.10.0,python-boto < 3.0.0
--- /dev/null
+from setuptools import setup, find_packages
+import sys
+import re
+
+
+module_file = open("radosgw_agent/__init__.py").read()
+metadata = dict(
+ re.findall(r"__([a-z]+)__\s*=\s*['\"]([^'\"]*)['\"]", module_file))
+
+
+install_requires = [
+ 'boto >=2.10.0,<3.0.0',
+ 'PyYAML',
+]
+
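+# argparse is part of the standard library from Python 2.7 / 3.2 onward;
+# older interpreters need the backport from PyPI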
+pyversion = sys.version_info[:2]
+if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
+ install_requires.append('argparse')
+
+setup(
+ name='radosgw-agent',
+ version=metadata['version'],
+ packages=find_packages(),
+
+ author='Josh Durgin',
+ author_email='jdurgin@redhat.com',
+ description='Synchronize users and data between radosgw clusters',
+ license='MIT',
+ keywords='radosgw ceph radosgw-agent',
+ url="https://github.com/ceph/radosgw-agent",
+ install_requires=install_requires,
+ tests_require=[
+ 'pytest',
+ 'mock',
+ 'tox',
+ 'httpretty',
+ ],
+ entry_points={
+ 'console_scripts': [
+ 'radosgw-agent = radosgw_agent.cli:main',
+ ],
+ },
+ )
--- /dev/null
+[tox]
+envlist = py26, py27
+
+[testenv]
+deps=
+ pytest
+ mock
+ httpretty
+commands=py.test -s -v {posargs:radosgw_agent/tests}