1.2.6 v1.2.6
author Jenkins Build Slave User <jenkins-build@jenkins.ceph.com>
Thu, 9 Jun 2016 12:46:45 +0000 (12:46 +0000)
committer Jenkins Build Slave User <jenkins-build@jenkins.ceph.com>
Thu, 9 Jun 2016 12:46:45 +0000 (12:46 +0000)
43 files changed:
.gitignore [new file with mode: 0644]
CHANGELOG.rst [new file with mode: 0644]
LICENSE [new file with mode: 0644]
MANIFEST.in [new file with mode: 0644]
README.rst [new file with mode: 0644]
bootstrap [new file with mode: 0755]
debian/changelog [new file with mode: 0644]
debian/compat [new file with mode: 0644]
debian/control [new file with mode: 0644]
debian/copyright [new file with mode: 0644]
debian/dirs [new file with mode: 0644]
debian/rules [new file with mode: 0755]
debian/source/format [new file with mode: 0644]
init-radosgw-agent [new file with mode: 0644]
logrotate.conf [new file with mode: 0644]
radosgw-agent.spec [new file with mode: 0644]
radosgw_agent/__init__.py [new file with mode: 0644]
radosgw_agent/cli.py [new file with mode: 0644]
radosgw_agent/client.py [new file with mode: 0644]
radosgw_agent/constants.py [new file with mode: 0644]
radosgw_agent/exceptions.py [new file with mode: 0644]
radosgw_agent/lock.py [new file with mode: 0644]
radosgw_agent/request.py [new file with mode: 0644]
radosgw_agent/sync.py [new file with mode: 0644]
radosgw_agent/tests/__init__.py [new file with mode: 0644]
radosgw_agent/tests/conftest.py [new file with mode: 0644]
radosgw_agent/tests/test_client.py [new file with mode: 0644]
radosgw_agent/tests/test_worker.py [new file with mode: 0644]
radosgw_agent/tests/util/test_configuration.py [new file with mode: 0644]
radosgw_agent/tests/util/test_network.py [new file with mode: 0644]
radosgw_agent/tests/util/test_obj.py [new file with mode: 0644]
radosgw_agent/util/__init__.py [new file with mode: 0644]
radosgw_agent/util/configuration.py [new file with mode: 0644]
radosgw_agent/util/decorators.py [new file with mode: 0644]
radosgw_agent/util/log.py [new file with mode: 0644]
radosgw_agent/util/network.py [new file with mode: 0644]
radosgw_agent/util/obj.py [new file with mode: 0644]
radosgw_agent/util/string.py [new file with mode: 0644]
radosgw_agent/worker.py [new file with mode: 0644]
scripts/radosgw-agent [new file with mode: 0644]
setup.cfg [new file with mode: 0644]
setup.py [new file with mode: 0644]
tox.ini [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..2907320
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,36 @@
+*.py[cod]
+
+# C extensions
+*.so
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+*.log
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+.project
+.pydevproject
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
new file mode 100644 (file)
index 0000000..52433d4
--- /dev/null
+++ b/CHANGELOG.rst
@@ -0,0 +1,58 @@
+Changelog
+=========
+
+1.2.6
+-----
+9-Jun-2016
+* Improve logging when the op state is empty during a sync
+* Do not emit a traceback when there is no op state during a sync
+
+1.2.5
+-----
+30-Mar-2016
+* Bump the minimum version of boto required to 2.10.0
+* Fix a configuration-not-found error when using the init script to restart
+  radosgw-agent
+
+1.2.4
+-----
+12-Aug-2015
+* Fix invalid references for HttpError in lock.py
+* Fix an issue where pinning of the mock library would make installation fail
+
+1.2.3
+-----
+15-Jul-2015
+* suppress override of config settings by argparse defaults
+* properly detect ipv6 endpoints
+* add Python 2.7 testing
+
+
+1.2.2
+-----
+27-Apr-2015
+* Improve terminal logging to better report the actual sync state
+* Catch all exceptions to provide better error reporting at the terminal
+* If the log location is not available, fall back to the current working directory
+* Add a flag to indicate versioning support of endpoints
+* support object versioning operations
+* ensure logging is fully configured before any parsing so that errors are
+  displayed regardless of failure
+* set the version in ``__init__.py`` and display it when using help
+* log all initial settings and flags of the agent when it starts
+
+1.2.1
+-----
+* Bring DEB/RPM releases to version parity with PyPI. The previous 1.2 release
+  had fixes available only for the Python package.
+
+1.2
+---
+* Improve log handling (works better with logrotate)
+* Fixes for racing threads when the shard number changes
+* Better logging of exceptions
+* Retry sync when transient errors are returned by the gateway.
+* Drops the dependency on Python's ``requests`` library (in favour of ``boto``)
+* Better handling of objects that are not found.
+* When there are buckets with no logs, process them as a full sync.
+* Fix mishandling of reserved characters in URLs.
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..6062a74
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Inktank Storage, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644 (file)
index 0000000..03f1191
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,5 @@
+include LICENSE
+include scripts/radosgw-agent
+include init-radosgw-agent
+include logrotate.conf
+prune radosgw_agent/tests
diff --git a/README.rst b/README.rst
new file mode 100644 (file)
index 0000000..656bd71
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,3 @@
+====================================================================
+radosgw-agent -- synchronize data and users between radosgw clusters
+====================================================================
diff --git a/bootstrap b/bootstrap
new file mode 100755 (executable)
index 0000000..65cd46a
--- /dev/null
+++ b/bootstrap
@@ -0,0 +1,41 @@
+#!/bin/sh
+set -e
+
+if command -v lsb_release >/dev/null 2>&1; then
+    case "$(lsb_release --id --short)" in
+       Ubuntu|Debian)
+           for package in python-virtualenv; do
+               if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+                   # add a space after old values
+                   missing="${missing:+$missing }$package"
+               fi
+           done
+           if [ -n "$missing" ]; then
+                       echo "$0: missing required packages, please install them:" 1>&2
+                       echo "  sudo apt-get install $missing"
+                       exit 1
+           fi
+           ;;
+    esac
+else
+       if [ -f /etc/redhat-release ]; then
+               case "$(cat /etc/redhat-release | awk '{print $1}')" in
+                       CentOS)
+                               for package in python-virtualenv; do
+                               if [ "$(rpm -qa $package 2>/dev/null)" = "" ]; then
+                                       missing="${missing:+$missing }$package"
+                               fi
+                               done
+                               if [ -n "$missing" ]; then
+                                       echo "$0: missing required packages, please install them:" 1>&2
+                                       echo "  sudo yum install $missing"
+                               exit 1
+                               fi
+                               ;;
+               esac
+       fi
+fi
+
+test -d virtualenv || virtualenv virtualenv
+./virtualenv/bin/python setup.py develop
+test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent .
diff --git a/debian/changelog b/debian/changelog
new file mode 100644 (file)
index 0000000..e33e769
--- /dev/null
+++ b/debian/changelog
@@ -0,0 +1,53 @@
+radosgw-agent (1.2.6) stable; urgency=medium
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Thu, 09 Jun 2016 12:46:44 +0000
+
+radosgw-agent (1.2.5) stable; urgency=medium
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Wed, 30 Mar 2016 22:52:21 +0000
+
+radosgw-agent (1.2.4) stable; urgency=low
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Wed, 12 Aug 2015 12:46:22 -0700
+
+radosgw-agent (1.2.3) stable; urgency=low
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Wed, 15 Jul 2015 09:08:11 -0700
+
+radosgw-agent (1.2.2) stable; urgency=low
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Mon, 27 Apr 2015 13:00:08 -0700
+
+radosgw-agent (1.2.1) stable; urgency=low
+
+  * New upstream release
+
+ -- Alfredo Deza <adeza@redhat.com>  Mon, 09 Feb 2015 12:52:46 -0800
+
+radosgw-agent (1.2-1) precise; urgency=low
+
+  * new upstream release
+
+ -- Sandon Van Ness <sandon@inktank.com>  Wed, 02 Apr 2014 11:25:54 -0800
+
+radosgw-agent (1.1-1) precise; urgency=low
+
+  * new upstream release 
+
+ -- Gary Lowell <glowell@pudgy.ops.newdream.net>  Thu, 21 Nov 2013 16:17:25 -0800
+
+radosgw-agent (1.0-1) stable; urgency=low
+
+  * Initial release 
+
+ -- Gary Lowell <gary.lowell@inktank.com>  Mon, 26 Aug 2013 09:19:47 -0700
diff --git a/debian/compat b/debian/compat
new file mode 100644 (file)
index 0000000..45a4fb7
--- /dev/null
+++ b/debian/compat
@@ -0,0 +1 @@
+8
diff --git a/debian/control b/debian/control
new file mode 100644 (file)
index 0000000..6acbfba
--- /dev/null
+++ b/debian/control
@@ -0,0 +1,18 @@
+Source: radosgw-agent
+Maintainer: Sage Weil <sage@newdream.net>
+Uploaders: Sage Weil <sage@newdream.net>
+Section: admin
+Priority: optional
+Build-Depends: debhelper (>= 8), python-setuptools
+X-Python-Version: >= 2.6
+Standards-Version: 3.9.2
+Homepage: http://ceph.com/
+
+Package: radosgw-agent
+Architecture: all
+Depends: python,
+         python-argparse,
+         python-setuptools,
+         ${misc:Depends},
+         ${python:Depends}
+Description: Rados gateway agent
diff --git a/debian/copyright b/debian/copyright
new file mode 100644 (file)
index 0000000..730861e
--- /dev/null
+++ b/debian/copyright
@@ -0,0 +1,3 @@
+Files: *
+Copyright: (c) 2013 by Inktank Storage
+License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
diff --git a/debian/dirs b/debian/dirs
new file mode 100644 (file)
index 0000000..b1f058b
--- /dev/null
+++ b/debian/dirs
@@ -0,0 +1,3 @@
+/etc/ceph/radosgw-agent
+/var/log/ceph/radosgw-agent
+/var/run/ceph/radosgw-agent
diff --git a/debian/rules b/debian/rules
new file mode 100755 (executable)
index 0000000..8b612b7
--- /dev/null
+++ b/debian/rules
@@ -0,0 +1,15 @@
+#!/usr/bin/make -f
+
+# Uncomment this to turn on verbose mode.
+export DH_VERBOSE=1
+
+%:
+       dh $@ --buildsystem python_distutils --with python2
+
+override_dh_installlogrotate:
+       cp logrotate.conf debian/radosgw-agent.logrotate
+       dh_installlogrotate
+
+override_dh_installinit:
+       install -m0644 init-radosgw-agent debian/radosgw-agent.init
+       dh_installinit --no-start
diff --git a/debian/source/format b/debian/source/format
new file mode 100644 (file)
index 0000000..d3827e7
--- /dev/null
+++ b/debian/source/format
@@ -0,0 +1 @@
+1.0
diff --git a/init-radosgw-agent b/init-radosgw-agent
new file mode 100644 (file)
index 0000000..f43970e
--- /dev/null
+++ b/init-radosgw-agent
@@ -0,0 +1,102 @@
+#!/bin/sh
+# Start/stop radosgw-agent daemons
+# chkconfig: 2345 60 80
+
+### BEGIN INIT INFO
+# Provides:          radosgw-agent
+# Required-Start:    $remote_fs $named $network
+# Required-Stop:     $remote_fs $named $network
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: Start radosgw-agent at boot time
+# Description:       Enable radosgw-agent services
+### END INIT INFO
+
+dir="/"
+config_path="/etc/ceph/radosgw-agent/default.conf"
+
+if [ -n "$2" ]; then
+        config_path=$2
+fi
+
+if [ ! -f "$config_path" ]; then
+        echo "$0: configuration file $config_path not found"
+        exit 0
+fi
+
+cmd="/usr/bin/radosgw-agent -c $config_path"
+
+name=`basename $config_path`
+pid_file="/var/run/ceph/radosgw-agent/$name.pid"
+
+is_running() {
+        [ -e "$pid_file" ] || return 1
+        pid=`cat "$pid_file"`
+        [ -e "/proc/$pid" ] && grep -q "/usr/bin/radosgw-agent.-c.$config_path" "/proc/$pid/cmdline" && return 0
+        return 1
+}
+
+case "$1" in
+        start)
+        if is_running; then
+                echo "Already started"
+                exit 0
+        fi
+        echo "Starting radosgw-agent $name"
+        cd "$dir"
+        $cmd > /dev/null 2>&1 &
+        echo $! > "$pid_file"
+        if ! is_running; then
+               echo "Unable to start, see /var/log/ceph/radosgw-agent/"
+               exit 1
+        fi
+        ;;
+
+        stop)
+        if is_running; then
+                echo -n "Stopping radosgw-agent $name.."
+                pid=`cat "$pid_file"`
+                kill $pid
+                for i in 1 2 3 4 5 6 7 8 9 10
+                do
+                        if ! is_running; then
+                                break
+                        fi
+
+                        echo -n "."
+                        sleep 1
+                done
+
+                if is_running; then
+                        echo "Not stopped; may still be shutting down or shutdown may have failed"
+                        exit 1
+                else
+                        echo "Stopped"
+                        rm "$pid_file"
+                fi
+        else
+                echo "Not running"
+        fi
+        ;;
+        restart)
+                $0 stop $config_path
+                if is_running; then
+                        echo "Unable to stop, will not attempt to start"
+                        exit 1
+                fi
+                $0 start $config_path
+        ;;
+        status)
+                if is_running; then
+                        echo "Running"
+                else
+                        echo "Stopped"
+                        exit 1
+                fi
+        ;;
+        *)
+                echo "Usage: $0 {start|stop|restart|status} [config-file]"
+                exit 1
+        ;;
+esac
+exit 0
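
The init script above looks for a YAML configuration file (by default
/etc/ceph/radosgw-agent/default.conf, or the path given as the second
argument) and hands it to radosgw-agent with -c. A minimal sketch of
generating such a file, assuming the YAML keys simply mirror the underscored
long-option names consumed by parse_args() in radosgw_agent/cli.py later in
this change; all key values are placeholders:

    # sketch: write a minimal radosgw-agent configuration (placeholder values)
    import yaml

    conf = {
        'src_access_key': 'SRC_ACCESS_KEY',
        'src_secret_key': 'SRC_SECRET_KEY',
        'dest_access_key': 'DEST_ACCESS_KEY',
        'dest_secret_key': 'DEST_SECRET_KEY',
        # the destination endpoint string is parsed by cli.check_endpoint()
        'destination': 'http://zone2.example.org:8080',
    }

    with open('/etc/ceph/radosgw-agent/default.conf', 'w') as f:
        yaml.safe_dump(conf, f, default_flow_style=False)
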
diff --git a/logrotate.conf b/logrotate.conf
new file mode 100644 (file)
index 0000000..745f1cb
--- /dev/null
+++ b/logrotate.conf
@@ -0,0 +1,7 @@
+/var/log/ceph/radosgw-agent/*.log {
+    rotate 7
+    daily
+    compress
+    missingok
+    notifempty
+}
diff --git a/radosgw-agent.spec b/radosgw-agent.spec
new file mode 100644 (file)
index 0000000..0c636fb
--- /dev/null
+++ b/radosgw-agent.spec
@@ -0,0 +1,44 @@
+Summary: Synchronize users and data between radosgw clusters
+Name: radosgw-agent
+Version:       1.2.6
+Release: 0%{?dist}
+Source0: https://pypi.python.org/packages/source/r/%{name}/%{name}-%{version}.tar.gz
+License: MIT
+Group: Development/Libraries
+BuildArch: noarch
+Requires: python-argparse
+Requires: PyYAML
+Requires: python-boto >= 2.10.0
+Requires: python-boto < 3.0.0
+BuildRequires: python-devel
+BuildRequires: python-setuptools
+URL: https://github.com/ceph/radosgw-agent
+
+%description
+The Ceph RADOS Gateway agent replicates the data of a master zone to a
+secondary zone.
+
+%prep
+%setup -q
+
+%build
+python setup.py build
+
+%install
+python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT
+install -m 0755 -D scripts/radosgw-agent $RPM_BUILD_ROOT%{_bindir}/radosgw-agent
+install -m 0644 -D logrotate.conf $RPM_BUILD_ROOT%{_sysconfdir}/logrotate.d/radosgw-agent
+install -m 0755 -D init-radosgw-agent $RPM_BUILD_ROOT%{_initrddir}/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/ceph/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/log/ceph/radosgw-agent
+mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/run/ceph/radosgw-agent
+
+%files
+%doc LICENSE
+%dir %{_sysconfdir}/ceph/radosgw-agent
+%dir %{_localstatedir}/log/ceph/radosgw-agent
+%dir %{_localstatedir}/run/ceph/radosgw-agent
+%config(noreplace) %{_sysconfdir}/logrotate.d/radosgw-agent
+%{_bindir}/radosgw-agent
+%{_initrddir}/radosgw-agent
+%{python_sitelib}/radosgw_agent*/
diff --git a/radosgw_agent/__init__.py b/radosgw_agent/__init__.py
new file mode 100644 (file)
index 0000000..760a1b8
--- /dev/null
+++ b/radosgw_agent/__init__.py
@@ -0,0 +1,5 @@
+from radosgw_agent.util import configuration as _configuration
+
+config = _configuration.Configuration()
+
+__version__ = '1.2.6'
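
The package-level ``config`` object created here is a single shared
Configuration instance; the rest of the agent imports it directly (as cli.py
and client.py do below) and treats it like a dictionary. A small sketch of
that pattern, assuming only the dict-style access already used in this change:

    from radosgw_agent import config

    # cli.set_args_to_config() stores the parsed CLI arguments here ...
    if 'args' not in config:
        config['args'] = {'versioned': False}

    # ... and client.list_objects_in_bucket() later reads feature flags from it
    config['use_versioning'] = config['args']['versioned']
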
diff --git a/radosgw_agent/cli.py b/radosgw_agent/cli.py
new file mode 100644 (file)
index 0000000..24df8c6
--- /dev/null
+++ b/radosgw_agent/cli.py
@@ -0,0 +1,481 @@
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import argparse
+import contextlib
+import logging
+import logging.handlers
+import os.path
+import yaml
+import sys
+import time
+import base64
+import hmac
+import sha
+import urllib2
+from urllib2 import URLError, HTTPError
+
+import radosgw_agent
+from radosgw_agent import client
+from radosgw_agent import util
+from radosgw_agent.util import string
+from radosgw_agent.util.decorators import catches
+from radosgw_agent.exceptions import AgentError, RegionMapError, InvalidProtocol
+from radosgw_agent import sync, config
+
+log = logging.getLogger('radosgw_agent')
+
+
+def check_positive_int(string):
+    value = int(string)
+    if value < 1:
+        msg = '%r is not a positive integer' % string
+        raise argparse.ArgumentTypeError(msg)
+    return value
+
+
+def check_endpoint(endpoint):
+    try:
+        return client.parse_endpoint(endpoint)
+    except InvalidProtocol as e:
+        raise argparse.ArgumentTypeError(str(e))
+    except client.InvalidHost as e:
+        raise argparse.ArgumentTypeError(str(e))
+
+
+def parse_args():
+    conf_parser = argparse.ArgumentParser(add_help=False)
+    conf_parser.add_argument(
+        '-c', '--conf',
+        type=file,
+        help='configuration file'
+        )
+    args, remaining = conf_parser.parse_known_args()
+    log_dir = '/var/log/ceph/radosgw-agent/'
+    log_file = 'radosgw-agent.log'
+    if args.conf is not None:
+        log_file = os.path.basename(args.conf.name)
+    defaults = dict(
+        sync_scope='incremental',
+        log_lock_time=20,
+        log_file=os.path.join(log_dir, log_file),
+        )
+    if args.conf is not None:
+        with contextlib.closing(args.conf):
+            config = yaml.safe_load_all(args.conf)
+            for new in config:
+                defaults.update(new)
+
+    parser = argparse.ArgumentParser(
+        parents=[conf_parser],
+        description='Synchronize radosgw installations',
+        )
+    verbosity = parser.add_mutually_exclusive_group(required=False)
+    verbosity.add_argument(
+        '-v', '--verbose',
+        action='store_true', dest='verbose',
+        help='be more verbose',
+        )
+    verbosity.add_argument(
+        '-q', '--quiet',
+        action='store_true', dest='quiet',
+        help='be less verbose',
+        )
+    parser.add_argument(
+        '--src-access-key',
+        required='src_access_key' not in defaults,
+        help='access key for source zone system user',
+        )
+    parser.add_argument(
+        '--src-secret-key',
+        required='src_secret_key' not in defaults,
+        help='secret key for source zone system user',
+        )
+    parser.add_argument(
+        '--dest-access-key',
+        required='dest_access_key' not in defaults,
+        help='access key for destination zone system user',
+        )
+    parser.add_argument(
+        '--dest-secret-key',
+        required='dest_secret_key' not in defaults,
+        help='secret key for destination zone system user',
+        )
+    parser.add_argument(
+        'destination',
+        type=check_endpoint,
+        nargs=None if 'destination' not in defaults else '?',
+        help='radosgw endpoint to which to sync '
+        '(e.g. http://zone2.example.org:8080)',
+        )
+    src_options = parser.add_mutually_exclusive_group(required=False)
+    src_options.add_argument(
+        '--source',
+        type=check_endpoint,
+        help='radosgw endpoint from which to sync '
+        '(e.g. http://zone1.example.org:8080)',
+        )
+    src_options.add_argument(
+        '--src-zone',
+        help='radosgw zone from which to sync',
+        )
+    parser.add_argument(
+        '--metadata-only',
+        action='store_true',
+        help='sync bucket and user metadata, but not bucket contents',
+        )
+    parser.add_argument(
+        '--versioned',
+        action='store_true',
+        help='indicates that radosgw endpoints have object versioning enabled',
+        )
+    parser.add_argument(
+        '--num-workers',
+        default=1,
+        type=check_positive_int,
+        help='number of items to sync at once',
+        )
+    parser.add_argument(
+        '--sync-scope',
+        choices=['full', 'incremental'],
+        default='incremental',
+        help='synchronize everything (for a new region) or only things that '
+             'have changed since the last run',
+        )
+    parser.add_argument(
+        '--lock-timeout',
+        type=check_positive_int,
+        default=60,
+        help='timeout in seconds after which a log segment lock will expire if '
+             'not refreshed',
+        )
+    parser.add_argument(
+        '--log-file',
+        help='where to store log output',
+        )
+    parser.add_argument(
+        '--max-entries',
+        type=check_positive_int,
+        default=1000,
+        help='maximum number of log entries to process at once during '
+        'continuous sync',
+        )
+    parser.add_argument(
+        '--incremental-sync-delay',
+        type=check_positive_int,
+        default=30,
+        help='seconds to wait between syncs',
+        )
+    parser.add_argument(
+        '--object-sync-timeout',
+        type=check_positive_int,
+        default=60 * 60 * 60,
+        help='seconds to wait for an individual object to sync before '
+        'assuming failure',
+        )
+    parser.add_argument(
+        '--prepare-error-delay',
+        type=check_positive_int,
+        default=10,
+        help='seconds to wait before retrying when preparing '
+        'an incremental sync fails',
+        )
+    parser.add_argument(
+        '--rgw-data-log-window',
+        type=check_positive_int,
+        default=30,
+        help='period until a data log entry is valid - '
+        'must match radosgw configuration',
+        )
+    parser.add_argument(
+        '--test-server-host',
+        # host to run a simple http server for testing the sync agent on,
+        help=argparse.SUPPRESS,
+        )
+    parser.add_argument(
+        '--test-server-port',
+        # port to run a simple http server for testing the sync agent on,
+        type=check_positive_int,
+        default=8080,
+        help=argparse.SUPPRESS,
+        )
+    parser.set_defaults(**defaults)
+    return parser.parse_args(remaining)
+
+
+def sign_string(
+        secret_key,
+        verb="GET",
+        content_md5="",
+        content_type="",
+        date=None,
+        canonical_amz_headers="",
+        canonical_resource="/?versions"
+        ):
+
+    date = date or time.asctime(time.gmtime())
+    to_sign = string.concatenate(verb, content_md5, content_type, date)
+    to_sign += string.concatenate(
+        canonical_amz_headers,
+        canonical_resource,
+        newline=False
+    )
+    return base64.b64encode(hmac.new(secret_key, to_sign, sha).digest())
+
+
+def check_versioning(endpoint):
+    date = time.asctime(time.gmtime())
+    signed_string = sign_string(endpoint.secret_key, date=date)
+
+    url = str(endpoint) + '/?versions'
+    headers = {
+        'Authorization': 'AWS ' + endpoint.access_key + ':' + signed_string,
+        'Date': date
+    }
+
+    data = None
+    req = urllib2.Request(url, data, headers)
+    try:
+        response = urllib2.urlopen(req)
+        response.read()
+        log.debug('%s endpoint supports versioning' % endpoint)
+        return True
+    except HTTPError as error:
+        if error.code == 403:
+            log.info('%s endpoint does not support versioning' % endpoint)
+        log.warning('encountered issues reaching endpoint %s' % endpoint)
+        log.warning(error)
+    except URLError as error:
+        log.error("was unable to connect to %s" % url)
+        log.error(error)
+    return False
+
+
+class TestHandler(BaseHTTPRequestHandler):
+    """HTTP handler for testing radosgw-agent.
+
+    This should never be used outside of testing.
+    """
+    num_workers = None
+    lock_timeout = None
+    max_entries = None
+    rgw_data_log_window = 30
+    src = None
+    dest = None
+
+    def do_POST(self):
+        log = logging.getLogger(__name__)
+        status = 200
+        resp = ''
+        sync_cls = None
+        if self.path.startswith('/metadata/full'):
+            sync_cls = sync.MetaSyncerFull
+        elif self.path.startswith('/metadata/incremental'):
+            sync_cls = sync.MetaSyncerInc
+        elif self.path.startswith('/data/full'):
+            sync_cls = sync.DataSyncerFull
+        elif self.path.startswith('/data/incremental'):
+            sync_cls = sync.DataSyncerInc
+        else:
+            log.warn('invalid request, ignoring')
+            status = 400
+            resp = 'bad path'
+
+        try:
+            if sync_cls is not None:
+                syncer = sync_cls(TestHandler.src, TestHandler.dest,
+                                  TestHandler.max_entries,
+                                  rgw_data_log_window=TestHandler.rgw_data_log_window,
+                                  object_sync_timeout=TestHandler.object_sync_timeout)
+                syncer.prepare()
+                syncer.sync(
+                    TestHandler.num_workers,
+                    TestHandler.lock_timeout,
+                    )
+        except Exception as e:
+            log.exception('error during sync')
+            status = 500
+            resp = str(e)
+
+        self.log_request(status, len(resp))
+        if status >= 400:
+            self.send_error(status, resp)
+        else:
+            self.send_response(status)
+            self.end_headers()
+
+
+def set_args_to_config(args):
+    """
+    Ensure that the arguments passed in to the CLI are slapped onto the config
+    object so that it can be referenced throughout the agent
+    """
+    if 'args' not in config:
+        config['args'] = args.__dict__
+
+
+def log_header():
+    version = radosgw_agent.__version__
+    lines = [
+        ' __            __           __   ___      ___ ',
+        '/__` \ / |\ | /  `     /\  / _` |__  |\ |  |  ',
+        '.__/  |  | \| \__,    /~~\ \__> |___ | \|  |  ',
+        '                                     v%s' % version,
+    ]
+    for line in lines:
+        log.info(line)
+    log.info('agent options:')
+
+    secrets = [
+        'src_secret_key', 'dest_secret_key',
+        'src_access_key', 'dest_access_key',
+    ]
+
+    def log_dict(k, _dict, level=1):
+        padding = ' ' * level
+        log.info('%s%s:' % (padding, k))
+        for key, value in sorted(_dict.items()):
+            if hasattr(value, '_dict'):
+                level += 1
+                return log_dict(key, value, level)
+            if key in secrets:
+                value = '*' * 16
+            log.info('%s%-30s: %s' % (padding+'  ', key, value))
+
+    for k, v in config.items():
+        if hasattr(v, '_dict'):
+            log_dict(k, v)
+        else:
+            log.info(' %-30s: %s' % (k, v))
+
+
+@catches((KeyboardInterrupt, RuntimeError, AgentError,), handle_all=True)
+def main():
+    # root (a.k.a. 'parent') and agent loggers
+    root_logger = logging.getLogger()
+
+    # allow all levels at root_logger, handlers control individual levels
+    root_logger.setLevel(logging.DEBUG)
+
+    # Console handler, meant only for user-facing information
+    console_loglevel = logging.INFO
+
+    sh = logging.StreamHandler()
+    sh.setFormatter(util.log.color_format())
+    # the console level is set here, before options are parsed, so that any
+    # errors that pop up earlier are still reported
+    sh.setLevel(console_loglevel)
+
+    agent_logger = logging.getLogger('radosgw_agent')
+    agent_logger.addHandler(sh)
+
+    # After initial logging is configured, now parse args
+    args = parse_args()
+
+    # File handler
+    log_file = args.log_file or 'radosgw-agent.log'
+    try:
+        fh = logging.handlers.WatchedFileHandler(log_file)
+    except IOError as err:
+        agent_logger.warning('unable to use log location: %s' % log_file)
+        agent_logger.warning(err)
+        agent_logger.warning('will fallback to ./radosgw-agent.log')
+        # if the location is not present, fallback to cwd
+        fh = logging.handlers.WatchedFileHandler('radosgw-agent.log')
+
+    fh.setLevel(logging.DEBUG)
+    fh.setFormatter(logging.Formatter(util.log.BASE_FORMAT))
+
+    root_logger.addHandler(fh)
+
+    if args.verbose:
+        console_loglevel = logging.DEBUG
+    elif args.quiet:
+        console_loglevel = logging.WARN
+
+    # now that we have parsed the actual log level we need to
+    # reset it in the handler
+    sh.setLevel(console_loglevel)
+
+    # after logging is set up, ensure that the arguments are present in the
+    # config object
+    set_args_to_config(args)
+
+    log_header()
+    dest = args.destination
+    dest.access_key = args.dest_access_key
+    dest.secret_key = args.dest_secret_key
+    src = args.source or client.Endpoint(None, None, None)
+    if args.src_zone:
+        src.zone = args.src_zone
+    dest_conn = client.connection(dest)
+
+    try:
+        region_map = client.get_region_map(dest_conn)
+    except AgentError:
+        # anything that we know about and are correctly raising should
+        # just get raised so that the decorator can handle it
+        raise
+    except Exception as error:
+        # otherwise, we have the below exception that will nicely deal with
+        # explaining what happened
+        raise RegionMapError(error)
+
+    client.configure_endpoints(region_map, dest, src, args.metadata_only)
+
+    src.access_key = args.src_access_key
+    src.secret_key = args.src_secret_key
+
+    if config['args']['versioned']:
+        log.debug('versioned flag enabled, overriding versioning check')
+        config['use_versioning'] = True
+    else:
+        config['use_versioning'] = check_versioning(src)
+
+    if args.test_server_host:
+        log.warn('TEST MODE - do not run unless you are testing this program')
+        TestHandler.src = src
+        TestHandler.dest = dest
+        TestHandler.num_workers = args.num_workers
+        TestHandler.lock_timeout = args.lock_timeout
+        TestHandler.max_entries = args.max_entries
+        TestHandler.rgw_data_log_window = args.rgw_data_log_window
+        TestHandler.object_sync_timeout = args.object_sync_timeout
+        server = HTTPServer((args.test_server_host, args.test_server_port),
+                            TestHandler)
+        server.serve_forever()
+        sys.exit()
+
+    if args.sync_scope == 'full':
+        meta_cls = sync.MetaSyncerFull
+        data_cls = sync.DataSyncerFull
+    else:
+        meta_cls = sync.MetaSyncerInc
+        data_cls = sync.DataSyncerInc
+
+    meta_syncer = meta_cls(src, dest, args.max_entries)
+    data_syncer = data_cls(src, dest, args.max_entries,
+                           rgw_data_log_window=args.rgw_data_log_window,
+                           object_sync_timeout=args.object_sync_timeout)
+
+    # fetch logs first since data logs need to wait before becoming usable
+    # due to rgw's window of data log updates during which the bucket index
+    # log may still be updated without the data log getting a new entry for
+    # the bucket
+    sync.prepare_sync(meta_syncer, args.prepare_error_delay)
+    if not args.metadata_only:
+        sync.prepare_sync(data_syncer, args.prepare_error_delay)
+
+    if args.sync_scope == 'full':
+        log.info('syncing all metadata')
+        meta_syncer.sync(args.num_workers, args.lock_timeout)
+        if not args.metadata_only:
+            log.info('syncing all data')
+            data_syncer.sync(args.num_workers, args.lock_timeout)
+        log.info('Finished full sync. Check logs to see any issues that '
+                 'incremental sync will retry.')
+    else:
+        sync.incremental_sync(meta_syncer, data_syncer,
+                              args.num_workers,
+                              args.lock_timeout,
+                              args.incremental_sync_delay,
+                              args.metadata_only,
+                              args.prepare_error_delay)
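
check_versioning() above probes the endpoint with a signed GET against
'/?versions' and treats a 403 as "versioning not supported". For reference, a
standalone sketch of the same signature, assuming string.concatenate()
(defined in radosgw_agent/util/string.py, not shown in this hunk)
newline-joins its arguments as in AWS signature v2; the access and secret
keys are placeholders:

    import base64
    import hmac
    import sha    # Python 2 module, as used by cli.py above
    import time

    def sign_v2(secret_key, date, resource='/?versions'):
        # AWS signature v2 string-to-sign for a GET with no body,
        # no Content-Type and no x-amz-* headers:
        # VERB \n Content-MD5 \n Content-Type \n Date \n CanonicalizedResource
        to_sign = 'GET\n\n\n%s\n%s' % (date, resource)
        return base64.b64encode(hmac.new(secret_key, to_sign, sha).digest())

    date = time.asctime(time.gmtime())
    headers = {
        'Authorization': 'AWS ' + 'ACCESS_KEY' + ':' + sign_v2('SECRET_KEY', date),
        'Date': date,
    }
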
diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py
new file mode 100644 (file)
index 0000000..54e81c0
--- /dev/null
+++ b/radosgw_agent/client.py
@@ -0,0 +1,701 @@
+import boto
+import functools
+import json
+import logging
+import os
+import random
+import socket
+import sys
+import urllib
+from urlparse import urlparse
+
+from boto.exception import BotoServerError
+from boto.s3.connection import S3Connection
+
+from radosgw_agent import request as aws_request
+from radosgw_agent import config
+from radosgw_agent import exceptions as exc
+from radosgw_agent.util import get_dev_logger, network
+from radosgw_agent.constants import DEFAULT_TIME
+from radosgw_agent.exceptions import NetworkError
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class Endpoint(object):
+    def __init__(self, host, port, secure,
+                 access_key=None, secret_key=None, region=None, zone=None):
+        self.host = host
+        default_port = 443 if secure else 80
+        self.port = port or default_port
+        self.secure = secure
+        self.access_key = access_key
+        self.secret_key = secret_key
+        self.region = region
+        self.zone = zone
+
+    def __eq__(self, other):
+        if self.host != other.host:
+            return False
+        if self.port == other.port:
+            return True
+        # if self and other are mixed http/https with default ports,
+        # i.e. http://example.com and https://example.com, consider
+        # them the same
+
+        def diff_only_default_ports(a, b):
+            return a.secure and a.port == 443 and not b.secure and b.port == 80
+        return (diff_only_default_ports(self, other) or
+                diff_only_default_ports(other, self))
+
+    def __repr__(self):
+        return 'Endpoint(host={host}, port={port}, secure={secure})'.format(
+            host=self.host,
+            port=self.port,
+            secure=self.secure)
+
+    def __str__(self):
+        scheme = 'https' if self.secure else 'http'
+        return '{scheme}://{host}:{port}'.format(scheme=scheme,
+                                                 host=self.host,
+                                                 port=self.port)
+def normalize_netloc(url_obj, port):
+    """
+    Only needed for IPV6 addresses because ``urlparse`` is so very
+    inconsistent with parsing::
+
+        In [5]: print urlparse('http://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]:8080').hostname
+        e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922
+
+        In [6]: print urlparse('http://e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922').hostname
+        e40
+
+    In Python 2.6 this situation is even worse, urlparse is completely unreliable for things
+    like looking up a port on an IPV6 url.
+    """
+    netloc = url_obj.netloc
+    if port is not None:
+        # we need to split because we don't want it as part of the URL
+        netloc = url_obj.netloc.split(':%s' % port)[0]
+    if not url_obj.netloc.startswith('[') and not url_obj.netloc.endswith(']'):
+        netloc = '[%s]' % netloc
+    return netloc
+
+
+def detect_ipv6_port(url_obj):
+    netloc = url_obj.netloc
+    try:
+        port = url_obj.port
+    except ValueError:
+        port = None
+
+    # insist on checking the port because urlparse may be lying to us
+    netloc_parts = netloc.split(']:')
+    if len(netloc_parts) == 2:
+        _, port = netloc_parts
+    if port:
+        return int(port)
+    return port
+
+
+def parse_endpoint(endpoint):
+    url = urlparse(endpoint)
+    # IPV6 addresses will not work correctly with urlparse and Endpoint if we
+    # just use netloc as default. IPV4 works with .hostname while IPV6 works
+    # with .netloc
+    # for example an IPV6 address like e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922
+    # would evaluate to 'e40' if we used .hostname
+
+    # this looks repetitive but we don't know yet if we are IPV6
+    try:
+        port = url.port
+    except ValueError:
+        port = None
+
+    if network.is_ipv6(url.netloc):
+        port = detect_ipv6_port(url)
+        if (sys.version_info[0], sys.version_info[1]) <= (2, 6):
+            log.error(
+                'Python 2.6 does not support IPV6 addresses, cannot continue'
+            )
+            log.error('host can be used instead of raw IPV6 addresses')
+            if os.environ.get('PYTEST') is None:  # don't bail on tests
+                raise RuntimeError(
+                    'IPV6 address was used for endpoint: %s' % endpoint
+                )
+        host = normalize_netloc(url, port)
+    else:
+        host = url.hostname
+    if url.scheme not in ['http', 'https']:
+        raise exc.InvalidProtocol('invalid protocol %r' % url.scheme)
+    if not url.hostname:
+        raise exc.InvalidHost('no hostname in %r' % endpoint)
+    return Endpoint(host, port, url.scheme == 'https')
+
+code_to_exc = {
+    404: exc.NotFound,
+    }
+
+
+def boto_call(func):
+    @functools.wraps(func)
+    def translate_exception(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except boto.exception.S3ResponseError as e:
+            raise code_to_exc.get(e.status, exc.HttpError)(e.status, e.body)
+    return translate_exception
+
+
+def check_result_status(result):
+    if result.status / 100 != 2:
+        raise code_to_exc.get(result.status,
+                              exc.HttpError)(result.status, result.reason)
+
+
+def url_safe(component):
+    if isinstance(component, basestring):
+        string = component.encode('utf8')
+    else:
+        string = str(component)
+    return urllib.quote(string)
+
+
+def request(connection, type_, resource, params=None, headers=None,
+            data=None, expect_json=True, special_first_param=None, _retries=3):
+    if headers is None:
+        headers = {}
+
+    if type_ in ['put', 'post']:
+        headers['Content-Type'] = 'application/json; charset=UTF-8'
+
+    request_data = data if data else ''
+    if params is None:
+        params = {}
+    safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()])
+    connection.count_request()
+    request = aws_request.base_http_request(connection.s3_connection,
+                             type_.upper(),
+                             resource=resource,
+                             special_first_param=special_first_param,
+                             headers=headers,
+                             data=request_data,
+                             params=safe_params)
+
+    url = '{protocol}://{host}{path}'.format(protocol=request.protocol,
+                                             host=request.host,
+                                             path=request.path)
+
+    request.authorize(connection=connection)
+
+    boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r',
+                   url, params, request.headers, data)
+
+    try:
+        result = aws_request.make_request(
+            connection.s3_connection,
+            type_.upper(),
+            resource=resource,
+            special_first_param=special_first_param,
+            headers=headers,
+            data=request_data,
+            params=safe_params,
+            _retries=_retries)
+    except socket.error as error:
+        msg = 'unable to connect to %s %s' % (request.host, error)
+        raise NetworkError(msg)
+
+    except BotoServerError as error:
+        check_result_status(error)
+
+    check_result_status(result)
+
+    if data or not expect_json:
+        return result
+
+    return json.loads(result.read())
+
+
+def get_metadata(connection, section, name):
+    return request(connection, 'get', 'admin/metadata/' + section,
+                   params=dict(key=name))
+
+
+def update_metadata(connection, section, name, metadata):
+    if not isinstance(metadata, basestring):
+        metadata = json.dumps(metadata)
+    return request(connection, 'put', 'admin/metadata/' + section,
+                   params=dict(key=name), data=metadata)
+
+
+def delete_metadata(connection, section, name):
+    return request(connection, 'delete', 'admin/metadata/' + section,
+                   params=dict(key=name), expect_json=False)
+
+
+def get_metadata_sections(connection):
+    return request(connection, 'get', 'admin/metadata')
+
+
+def list_metadata_keys(connection, section):
+    return request(connection, 'get', 'admin/metadata/' + section)
+
+
+def get_op_state(connection, client_id, op_id, bucket, obj):
+    return request(connection, 'get', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': u'{0}/{1}'.format(bucket, obj.name),
+                       'client-id': client_id,
+                      }
+                   )
+
+
+def remove_op_state(connection, client_id, op_id, bucket, obj):
+    return request(connection, 'delete', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': u'{0}/{1}'.format(bucket, obj.name),
+                       'client-id': client_id,
+                      },
+                   expect_json=False,
+                   )
+
+
+def get_bucket_list(connection):
+    return list_metadata_keys(connection, 'bucket')
+
+
+@boto_call
+def list_objects_in_bucket(connection, bucket_name):
+    versioned = config['use_versioning']
+
+    # use the boto library to do this
+    bucket = connection.get_bucket(bucket_name)
+    list_call = bucket.list_versions if versioned else bucket.list
+    try:
+        for key in list_call():
+            yield key
+    except boto.exception.S3ResponseError as e:
+        # since this is a generator, the exception will be raised when
+        # it's read, rather than when this call returns, so raise a
+        # unique exception to distinguish this from client errors from
+        # other calls
+        if e.status == 404:
+            raise exc.BucketEmpty()
+        else:
+            raise
+
+
+@boto_call
+def mark_delete_object(connection, bucket_name, obj, params=None):
+    """
+    Marking an object for deletion is only necessary for versioned objects; we
+    should not try these calls for non-versioned ones.
+
+    Usually, only full-sync operations will use this call; incremental sync
+    should perform actual delete operations with ``delete_versioned_object``.
+    """
+    params = params or {}
+
+    params['rgwx-version-id'] = obj.version_id
+    params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+    path = u'{bucket}/{object}'.format(
+        bucket=bucket_name,
+        object=obj.name,
+        )
+
+    return request(connection, 'delete', path,
+                   params=params,
+                   expect_json=False)
+
+
+@boto_call
+def delete_versioned_object(connection, bucket_name, obj):
+    """
+    Perform a delete on a versioned object; these requests must be able to
+    pass the ``versionID`` as a query argument.
+    """
+    # if obj.delete_marker is False we should not delete this and we shouldn't
+    # have been called, so return without doing anything
+    if getattr(obj, 'delete_marker', False) is False:
+        log.info('obj: %s has `delete_marker=False`, will skip' % obj.name)
+        return
+
+    params = {}
+
+    params['rgwx-version-id'] = obj.version_id
+    params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+    params['versionID'] = obj.version_id
+
+    path = u'{bucket}/{object}'.format(
+        bucket=bucket_name,
+        object=obj.name,
+        )
+
+    return request(connection, 'delete', path,
+                   params=params,
+                   expect_json=False)
+
+
+@boto_call
+def delete_object(connection, bucket_name, obj):
+    if is_versioned(obj):
+        log.debug('performing a delete for versioned obj: %s' % obj.name)
+        delete_versioned_object(connection, bucket_name, obj)
+    else:
+        bucket = connection.get_bucket(bucket_name)
+        bucket.delete_key(obj.name)
+
+
+def is_versioned(obj):
+    """
+    Check if a given object is versioned by inspecting some of its attributes.
+    """
+    # before any heuristic, newer versions of RGW will tell if an obj is
+    # versioned so try that first
+    if hasattr(obj, 'versioned'):
+        return obj.versioned
+
+    if not hasattr(obj, 'VersionedEpoch'):
+        # overly paranoid here, an object that is not versioned should *never*
+        # have a `VersionedEpoch` attribute
+        if getattr(obj, 'version_id', None):
+            if obj.version_id is None:
+                return False
+            return True  # probably will never get here
+        return False
+    return True
+
+
+def sync_object_intra_region(connection, bucket_name, obj, src_zone,
+                             client_id, op_id):
+
+    params = {
+        'rgwx-source-zone': src_zone,
+        'rgwx-client-id': client_id,
+        'rgwx-op-id': op_id,
+    }
+
+    if is_versioned(obj):
+        log.debug('detected obj as versioned: %s' % obj.name)
+        log.debug('obj attributes are:')
+        for k in dir(obj):
+            if not k.startswith('_'):
+                v = getattr(obj, k, None)
+                log.debug('%s.%s = %s' % (obj.name, k, v))
+
+        # set the extra params to support versioned operations
+        params['rgwx-version-id'] = obj.version_id
+        params['rgwx-versioned-epoch'] = obj.VersionedEpoch
+
+        # delete_marker may not exist in the obj
+        if getattr(obj, 'delete_marker', None) is True:
+            log.debug('obj %s has a delete_marker, marking for deletion' % obj.name)
+            # when the object has a delete marker we need to create it with
+            # a delete marker on the destination rather than copying
+            return mark_delete_object(connection, bucket_name, obj, params=params)
+
+    path = u'{bucket}/{object}'.format(
+        bucket=bucket_name,
+        object=obj.name,
+        )
+
+    return request(connection, 'put', path,
+                   params=params,
+                   headers={
+                       'x-amz-copy-source': url_safe('%s/%s' % (bucket_name, obj.name)),
+                       },
+                   expect_json=False)
+
+
+def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'length': timeout,
+                       'zone-id': zone_id,
+                       'locker-id': locker_id,
+                       },
+                   special_first_param='lock',
+                   expect_json=False)
+
+
+def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id):
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'locker-id': locker_id,
+                       'zone-id': zone_id,
+                       },
+                   special_first_param='unlock',
+                   expect_json=False)
+
+
+def _id_name(type_):
+    return 'bucket-instance' if type_ == 'bucket-index' else 'id'
+
+
+def get_log(connection, log_type, marker, max_entries, id_):
+    key = _id_name(log_type)
+    return request(connection, 'get', 'admin/log',
+                   params={
+                       'type': log_type,
+                       key: id_,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                       },
+                   )
+
+
+def get_log_info(connection, log_type, id_):
+    key = _id_name(log_type)
+    return request(
+        connection, 'get', 'admin/log',
+        params={
+            'type': log_type,
+            key: id_,
+            },
+        special_first_param='info',
+        )
+
+
+def num_log_shards(connection, shard_type):
+    out = request(connection, 'get', 'admin/log', dict(type=shard_type))
+    return out['num_objects']
+
+
+def set_worker_bound(connection, type_, marker, timestamp,
+                     daemon_id, id_, data=None, sync_type='incremental'):
+    """
+
+    :param sync_type: the type of synchronization the agent should attempt,
+    defaulting to "incremental"; it can also be "full".
+    """
+    if data is None:
+        data = []
+    key = _id_name(type_)
+    boto.log.debug('set_worker_bound: data = %r', data)
+    return request(
+        connection, 'post', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'marker': marker,
+            'time': timestamp,
+            'daemon_id': daemon_id,
+            'sync-type': sync_type,
+            },
+        data=json.dumps(data),
+        special_first_param='work_bound',
+        )
+
+
+def del_worker_bound(connection, type_, daemon_id, id_):
+    key = _id_name(type_)
+    return request(
+        connection, 'delete', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'daemon_id': daemon_id,
+            },
+        special_first_param='work_bound',
+        expect_json=False,
+        )
+
+
+def get_worker_bound(connection, type_, id_):
+    key = _id_name(type_)
+    try:
+        out = request(
+            connection, 'get', 'admin/replica_log',
+            params={
+                'type': type_,
+                key: id_,
+                },
+            special_first_param='bounds',
+            )
+        dev_log.debug('get_worker_bound returned: %r', out)
+    except exc.NotFound:
+        dev_log.debug('no worker bound found for bucket instance "%s"',
+                      id_)
+        # if no worker bounds have been set, start from the beginning
+        # returning fallback, default values
+        return dict(
+            marker=' ',
+            oldest_time=DEFAULT_TIME,
+            retries=[]
+        )
+
+    retries = set()
+    for item in out['markers']:
+        names = [retry['name'] for retry in item['items_in_progress']]
+        retries = retries.union(names)
+    out['retries'] = retries
+    return out
+
+
+class Zone(object):
+    def __init__(self, zone_info):
+        self.name = zone_info['name']
+        self.is_master = False
+        self.endpoints = [parse_endpoint(e) for e in zone_info['endpoints']]
+        self.log_meta = zone_info['log_meta'] == 'true'
+        self.log_data = zone_info['log_data'] == 'true'
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return self.name
+
+
+class Region(object):
+    def __init__(self, region_info):
+        self.name = region_info['key']
+        self.is_master = region_info['val']['is_master'] == 'true'
+        self.zones = {}
+        for zone_info in region_info['val']['zones']:
+            zone = Zone(zone_info)
+            self.zones[zone.name] = zone
+            if zone.name == region_info['val']['master_zone']:
+                zone.is_master = True
+                self.master_zone = zone
+        assert hasattr(self, 'master_zone'), \
+               'No master zone found for region ' + self.name
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return str(self.zones.keys())
+
+
+class RegionMap(object):
+    def __init__(self, region_map):
+        self.regions = {}
+        for region_info in region_map['regions']:
+            region = Region(region_info)
+            self.regions[region.name] = region
+            if region.is_master:
+                self.master_region = region
+        assert hasattr(self, 'master_region'), \
+               'No master region found in region map'
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return str(self.regions)
+
+    def find_endpoint(self, endpoint):
+        for region in self.regions.itervalues():
+            for zone in region.zones.itervalues():
+                if endpoint in zone.endpoints or endpoint.zone == zone.name:
+                    return region, zone
+        raise exc.ZoneNotFound('%s not found in region map' % endpoint)
+
+
+def get_region_map(connection):
+    region_map = request(connection, 'get', 'admin/config')
+    return RegionMap(region_map)
+
+
+def _validate_sync_dest(dest_region, dest_zone):
+    if dest_region.is_master and dest_zone.is_master:
+        raise exc.InvalidZone('destination cannot be master zone of master region')
+
+
+def _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+                          meta_only):
+    if not src_zone.is_master:
+        raise exc.InvalidZone('source zone %s must be a master zone' % src_zone.name)
+    if (src_region.name == dest_region.name and
+        src_zone.name == dest_zone.name):
+        raise exc.InvalidZone('source and destination must be different zones')
+    if not src_zone.log_meta:
+        raise exc.InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name)
+    if not meta_only and not src_zone.log_data:
+        raise exc.InvalidZone('source zone %s must have data logging enabled' % src_zone.name)
+    if not meta_only and src_region.name != dest_region.name:
+        raise exc.InvalidZone('data sync can only occur between zones in the same region')
+    if not src_zone.endpoints:
+        raise exc.InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name)
+
+
+def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only):
+    log.debug('region map is: %r' % region_map)
+
+    dest_region, dest_zone = region_map.find_endpoint(dest_endpoint)
+    _validate_sync_dest(dest_region, dest_zone)
+
+    # source may be specified by http endpoint or zone name
+    if src_endpoint.host or src_endpoint.zone:
+        src_region, src_zone = region_map.find_endpoint(src_endpoint)
+    else:
+        # try the master zone in the same region, then the master zone
+        # in the master region
+        try:
+            _validate_sync_source(dest_region, dest_region.master_zone,
+                                  dest_region, dest_zone, meta_only)
+            src_region, src_zone = dest_region, dest_region.master_zone
+        except exc.InvalidZone as e:
+            log.debug('source region %s zone %s unacceptable: %s',
+                      dest_region.name, dest_region.master_zone.name, e)
+            master_region = region_map.master_region
+            src_region, src_zone = master_region, master_region.master_zone
+
+    _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+                          meta_only)
+
+    # choose a random source endpoint if one wasn't specified
+    if not src_endpoint.host:
+        endpoint = random.choice(src_zone.endpoints)
+        src_endpoint.host = endpoint.host
+        src_endpoint.port = endpoint.port
+        src_endpoint.secure = endpoint.secure
+
+    # fill in region and zone names
+    dest_endpoint.region = dest_region
+    dest_endpoint.zone = dest_zone
+    src_endpoint.region = src_region
+    src_endpoint.zone = src_zone
+
+
+class S3ConnectionWrapper(object):
+    def __init__(self, endpoint, debug):
+        self.endpoint = endpoint
+        self.debug = debug
+        self.s3_connection = None
+        self.reqs_before_reset = 512
+        self._recreate_s3_connection()
+
+    def count_request(self):
+        self.num_requests += 1
+        if self.num_requests > self.reqs_before_reset:
+            self._recreate_s3_connection()
+
+    def _recreate_s3_connection(self):
+        self.num_requests = 0
+        self.s3_connection = S3Connection(
+            aws_access_key_id=self.endpoint.access_key,
+            aws_secret_access_key=self.endpoint.secret_key,
+            is_secure=self.endpoint.secure,
+            host=self.endpoint.host,
+            port=self.endpoint.port,
+            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+            debug=self.debug,
+        )
+
+    def __getattr__(self, attrib):
+        return getattr(self.s3_connection, attrib)
+
+
+def connection(endpoint, debug=None):
+    log.info('creating connection to endpoint: %s' % endpoint)
+    return S3ConnectionWrapper(endpoint, debug)
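
The S3ConnectionWrapper above rebuilds its boto connection once the request count exceeds reqs_before_reset (512) and otherwise delegates attribute access to the wrapped connection. Below is a minimal, dependency-free sketch of the same reset-after-N-calls delegation pattern; the class name and the factory are illustrative and not part of the agent:

    class ResettingWrapper(object):
        """Delegate to an object built by `factory`, rebuilding it every N uses."""

        def __init__(self, factory, reqs_before_reset=512):
            self._factory = factory
            self._reqs_before_reset = reqs_before_reset
            self._recreate()

        def _recreate(self):
            self._num_requests = 0
            self._target = self._factory()

        def count_request(self):
            self._num_requests += 1
            if self._num_requests > self._reqs_before_reset:
                self._recreate()

        def __getattr__(self, attrib):
            # only consulted for attributes the wrapper itself does not define
            return getattr(self._target, attrib)

    connections_made = []
    wrapper = ResettingWrapper(lambda: connections_made.append(1) or object(),
                               reqs_before_reset=2)
    for _ in range(6):
        wrapper.count_request()
    print(len(connections_made))  # 3: the initial build plus two resets
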
diff --git a/radosgw_agent/constants.py b/radosgw_agent/constants.py
new file mode 100644 (file)
index 0000000..0492ff6
--- /dev/null
@@ -0,0 +1,6 @@
+
+RESULT_SUCCESS = 0
+RESULT_ERROR = 1
+
+DEFAULT_TIME = '1970-01-01 00:00:00'
+
diff --git a/radosgw_agent/exceptions.py b/radosgw_agent/exceptions.py
new file mode 100644 (file)
index 0000000..f461c40
--- /dev/null
@@ -0,0 +1,77 @@
+
+class AgentError(Exception):
+    """
+    The actual base exception for the agent
+    """
+
+
+class ClientException(AgentError):
+    """
+    Historical base radosgw_agent client exception.
+    """
+    pass
+
+
+class NetworkError(AgentError):
+    pass
+
+
+class RegionMapError(AgentError):
+
+    def __init__(self, error):
+        self.error = error
+
+    def __str__(self):
+        msg = 'Could not retrieve region map from destination: %s'
+        return msg % self.error
+
+
+class InvalidProtocol(ClientException):
+    pass
+
+
+class InvalidHost(ClientException):
+    pass
+
+
+class InvalidZone(ClientException):
+    pass
+
+
+class ZoneNotFound(ClientException):
+    pass
+
+
+class BucketEmpty(ClientException):
+    pass
+
+
+class HttpError(ClientException):
+    def __init__(self, code, body):
+        self.code = code
+        self.str_code = str(code)
+        self.body = body
+        self.message = 'Http error code %s content %s' % (code, body)
+
+    def __str__(self):
+        return self.message
+
+
+class NotFound(HttpError):
+    pass
+
+
+class SkipShard(Exception):
+    pass
+
+
+class SyncError(Exception):
+    pass
+
+
+class SyncTimedOut(SyncError):
+    pass
+
+
+class SyncFailed(SyncError):
+    pass
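
Because every HTTP failure class above inherits from HttpError, and everything client-related ultimately inherits from AgentError, callers can choose how broadly to catch. A small hedged sketch of that layering (it assumes the radosgw_agent package is importable; the helper function is illustrative):

    from radosgw_agent import exceptions as exc

    def fetch(fn):
        try:
            return fn()
        except exc.NotFound:
            # a 404 is often expected (e.g. an empty log) and handled specially
            return None
        except exc.HttpError as e:
            # any other HTTP error still carries the status code and body
            raise RuntimeError('gateway returned %s: %s' % (e.code, e.body))

    def missing():
        raise exc.NotFound(404, '')

    print(fetch(missing))  # None: the 404 was swallowed, not re-raised
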
diff --git a/radosgw_agent/lock.py b/radosgw_agent/lock.py
new file mode 100644 (file)
index 0000000..3e64f16
--- /dev/null
@@ -0,0 +1,114 @@
+import logging
+import threading
+import time
+
+from radosgw_agent import client
+from radosgw_agent import exceptions as exc
+from radosgw_agent.util import get_dev_logger
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class LockBroken(Exception):
+    pass
+
+
+class LockRenewFailed(LockBroken):
+    pass
+
+
+class LockExpired(LockBroken):
+    pass
+
+
+class Lock(threading.Thread):
+    """A lock on a shard log that automatically refreshes itself.
+
+    It may be used to lock different shards throughout its lifetime.
+    To lock a new shard, call set_shard() and then acquire().
+
+    To release the lock, call release_and_clear(). This will raise an
+    exception if the lock ever failed to be acquired in the timeout
+    period.
+    """
+
+    def __init__(self, conn, type_, locker_id, timeout, zone_id):
+        super(Lock, self).__init__()
+        self.conn = conn
+        self.type = type_
+        self.timeout = timeout
+        self.lock = threading.Lock()
+        self.locker_id = locker_id
+        self.zone_id = zone_id
+        self.shard_num = None
+        self.last_locked = None
+        self.failed = False
+
+    def set_shard(self, shard_num):
+        dev_log.debug('set_shard to %d', shard_num)
+        with self.lock:
+            assert self.shard_num is None, \
+                'attempted to acquire new lock without releasing old one'
+            self.failed = False
+            self.last_locked = None
+            self.shard_num = shard_num
+
+    def unset_shard(self):
+        dev_log.debug('unset shard')
+        with self.lock:
+            self.shard_num = None
+
+    def acquire(self):
+        """Renew an existing lock, or acquire a new one.
+
+        The old lock must already have been released if a new shard was set.
+        exceptions.NotFound may be raised if the log contains no entries.
+        """
+        dev_log.debug('acquire lock')
+        with self.lock:
+            self._acquire()
+
+    def _acquire(self):
+        # same as acquire() but assumes self.lock is held
+        now = time.time()
+        client.lock_shard(self.conn, self.type, self.shard_num,
+                          self.zone_id, self.timeout, self.locker_id)
+        self.last_locked = now
+
+    def release_and_clear(self):
+        """Release the lock currently being held.
+
+        Prevent it from being automatically renewed, and check if there
+        were any errors renewing the current lock or if it expired.
+        If the lock was not sustained, raise LockRenewFailed or LockExpired.
+        """
+        dev_log.debug('release and clear lock')
+        with self.lock:
+            shard_num = self.shard_num
+            self.shard_num = None
+            diff = time.time() - self.last_locked
+            if diff > self.timeout:
+                msg = 'lock was not renewed in over %0.2f seconds' % diff
+                raise LockExpired(msg)
+            if self.failed:
+                raise LockRenewFailed()
+            try:
+                client.unlock_shard(self.conn, self.type, shard_num,
+                                    self.zone_id, self.locker_id)
+            except exc.HttpError as e:
+                log.warn('failed to unlock shard %d in zone %s: %s',
+                         shard_num, self.zone_id, e)
+            self.last_locked = None
+
+    def run(self):
+        while True:
+            with self.lock:
+                if self.shard_num is not None:
+                    try:
+                        self._acquire()
+                    except exc.HttpError as e:
+                        log.error('locking shard %d in zone %s failed: %s',
+                                  self.shard_num, self.zone_id, e)
+                        self.failed = True
+            time.sleep(0.5 * self.timeout)
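
The Lock above is meant to run as a daemon thread: run() re-acquires whatever shard is currently set every 0.5 * timeout seconds, and release_and_clear() raises if the lock ever lapsed in between. A hedged sketch of the lifecycle a worker would follow; the endpoint, credentials, zone and shard number are illustrative, and actually running it requires a reachable gateway:

    from radosgw_agent import client, lock

    conn = client.connection(client.Endpoint('gateway', 80, False, 'access', 'secret'))
    shard_lock = lock.Lock(conn, 'data', 'my-locker-id', 60, 'my-zone')
    shard_lock.daemon = True
    shard_lock.start()                  # background renewal loop

    shard_lock.set_shard(3)             # pick the shard log to lock
    shard_lock.acquire()                # take the lock for `timeout` seconds
    try:
        pass                            # ... sync shard 3 ...
    finally:
        # raises LockExpired/LockRenewFailed if the lock was not sustained
        shard_lock.release_and_clear()
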
diff --git a/radosgw_agent/request.py b/radosgw_agent/request.py
new file mode 100644 (file)
index 0000000..86cc066
--- /dev/null
@@ -0,0 +1,189 @@
+import boto
+import logging
+import sys
+from boto.connection import AWSAuthConnection
+
+log = logging.getLogger(__name__)
+
+
+def urlencode(query, doseq=0):
+    """
+    Note: ported from urllib.urlencode, but with the ability to craft the query
+    string without quoting the params again.
+
+    Encode a sequence of two-element tuples or dictionary into a URL query
+    string.
+
+    If any values in the query arg are sequences and doseq is true, each
+    sequence element is converted to a separate parameter.
+
+    If the query arg is a sequence of two-element tuples, the order of the
+    parameters in the output will match the order of parameters in the
+    input.
+    """
+
+    if hasattr(query, "items"):
+        # mapping objects
+        query = query.items()
+    else:
+        # it's a bother at times that strings and string-like objects are
+        # sequences...
+        try:
+            # non-sequence items should not work with len()
+            # non-empty strings will fail this
+            if len(query) and not isinstance(query[0], tuple):
+                raise TypeError
+            # zero-length sequences of all types will get here and succeed,
+            # but that's a minor nit - since the original implementation
+            # allowed empty dicts that type of behavior probably should be
+            # preserved for consistency
+        except TypeError:
+            ty, va, tb = sys.exc_info()
+            raise TypeError, "not a valid non-string sequence or mapping object", tb
+
+    l = []
+    if not doseq:
+        # preserve old behavior
+        for k, v in query:
+            k = str(k)
+            v = str(v)
+            l.append(k + '=' + v)
+        log.debug('encoded query pairs: %r', l)
+    else:
+        for k, v in query:
+            k = str(k)
+            if isinstance(v, str):
+                l.append(k + '=' + v)
+            elif isinstance(v, unicode):
+                # is there a reasonable way to convert to ASCII?
+                # encode generates a string, but "replace" or "ignore"
+                # lose information and "strict" can raise UnicodeError
+                v = v.encode("ASCII", "replace")
+                l.append(k + '=' + v)
+            else:
+                try:
+                    # is this a sufficient test for sequence-ness?
+                    len(v)
+                except TypeError:
+                    # not a sequence
+                    v = str(v)
+                    l.append(k + '=' + v)
+                else:
+                    # loop over the sequence
+                    for elt in v:
+                        l.append(k + '=' + str(elt))
+    return '&'.join(l)
+
+
+class MetaData(object):
+    """
+    A basic container class: besides the connection and method, it just
+    registers all the keyword arguments passed in, so that it is easier
+    to re-use the values later.
+    """
+
+    def __init__(self, conn, method, **kw):
+        self.conn = conn
+        self.method = method
+        for k, v in kw.items():
+            setattr(self, k, v)
+
+
+def base_http_request(conn, method, basepath='', resource='', headers=None,
+                      data=None, special_first_param=None, params=None):
+    """
+    Returns an ``AWSAuthConnection.build_base_http_request`` call,
+    preserving the special params handled by ``build``.
+    """
+
+    # request meta data
+    md = build(
+        conn,
+        method,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+    )
+
+    return AWSAuthConnection.build_base_http_request(
+        md.conn, md.method, md.path,
+        md.auth_path, md.params, md.headers,
+        md.data, md.host)
+
+
+def make_request(conn, method, basepath='', resource='', headers=None,
+                 data=None, special_first_param=None, params=None, _retries=3):
+    """
+    Returns an ``AWSAuthConnection.make_request`` call, preserving the
+    special params handled by ``build``.
+    """
+    # request meta data
+    md = build(
+        conn,
+        method,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+    )
+
+    if params:
+        # we basically need to do this ourselves now. BOTO doesn't do it for us
+        # in make_request
+        result = []
+        for k, vs in params.items():
+            if isinstance(vs, basestring) or not hasattr(vs, '__iter__'):
+                vs = [vs]
+            for v in vs:
+                if v is not None:
+                    result.append(
+                        (k.encode('utf-8') if isinstance(k, str) else k,
+                         v.encode('utf-8') if isinstance(v, str) else v))
+        appending_char = '&' if md.special_first_param else '?'
+        md.path = '%s%s%s' % (md.path, appending_char, urlencode(result, doseq=True))
+
+    return AWSAuthConnection.make_request(
+        md.conn, md.method, md.path,
+        headers=md.headers,
+        data=md.data,
+        host=md.host,
+        auth_path=md.auth_path,
+        params=md.params,
+        override_num_retries=_retries
+    )
+
+
+def build(conn, method, basepath='', resource='', headers=None,
+           data=None, special_first_param=None, params=None):
+    """
+    Adapted from the build_request() method of boto.connection
+    """
+
+    path = conn.calling_format.build_path_base(basepath, resource)
+    auth_path = conn.calling_format.build_auth_path(basepath, resource)
+    host = conn.calling_format.build_host(conn.server_name(), '')
+
+    if special_first_param:
+        path += '?' + special_first_param
+        boto.log.debug('path=%s' % path)
+        auth_path += '?' + special_first_param
+        boto.log.debug('auth_path=%s' % auth_path)
+
+    return MetaData(
+        conn,
+        method,
+        path=path,
+        auth_path=auth_path,
+        basepath=basepath,
+        resource=resource,
+        headers=headers,
+        data=data,
+        special_first_param=special_first_param,
+        params=params,
+        host=host,
+    )
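
build() appends the "special" first query parameter to both path and auth_path before the request is signed, and make_request() then appends any remaining params itself using the non-quoting urlencode() above, so values that are already encoded are not quoted a second time. A standalone sketch of just that path-assembly ordering (illustrative only; the real code goes through AWSAuthConnection):

    def assemble_path(path, special_first_param=None, params=None):
        # mirrors the ordering used by build()/make_request(): the special
        # sub-resource goes first, remaining params are joined without re-quoting
        if special_first_param:
            path += '?' + special_first_param
        if params:
            pairs = ['%s=%s' % (k, v) for k, v in sorted(params.items())]
            path += ('&' if special_first_param else '?') + '&'.join(pairs)
        return path

    print(assemble_path('/admin/log', 'lock', {'type': 'data', 'id': 3}))
    # /admin/log?lock&id=3&type=data
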
diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py
new file mode 100644 (file)
index 0000000..6d40063
--- /dev/null
@@ -0,0 +1,330 @@
+import logging
+import multiprocessing
+import time
+
+from radosgw_agent import worker
+from radosgw_agent import client
+from radosgw_agent.util import get_dev_logger
+from radosgw_agent.exceptions import NotFound, HttpError
+
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+# the replica log api only supports one entry, and updating it
+# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of
+# radosgw-agent, so just use a constant value for the daemon id.
+DAEMON_ID = 'radosgw-agent'
+
+def prepare_sync(syncer, error_delay):
+    """Attempt to prepare a syncer for running a sync.
+
+    :param error_delay: seconds to wait before retrying
+
+    This will retry forever so the sync agent continues if radosgws
+    are unavailable temporarily.
+    """
+    while True:
+        try:
+            syncer.prepare()
+            break
+        except Exception:
+            log.warn('error preparing for sync, will retry. Traceback:',
+                     exc_info=True)
+            time.sleep(error_delay)
+
+def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
+                     incremental_sync_delay, metadata_only, error_delay):
+    """Run a continuous incremental sync.
+
+    This will run forever, pausing for incremental_sync_delay seconds
+    between syncs.
+    """
+    while True:
+        try:
+            meta_syncer.sync(num_workers, lock_timeout)
+            if not metadata_only:
+                data_syncer.sync(num_workers, lock_timeout)
+        except Exception:
+            log.warn('error doing incremental sync, will try again. Traceback:',
+                     exc_info=True)
+
+        # prepare data before sleeping due to rgw_log_bucket_window
+        if not metadata_only:
+            prepare_sync(data_syncer, error_delay)
+        log.info('waiting %d seconds until next sync',
+                 incremental_sync_delay)
+        time.sleep(incremental_sync_delay)
+        prepare_sync(meta_syncer, error_delay)
+
+class Syncer(object):
+    def __init__(self, src, dest, max_entries, *args, **kwargs):
+        self.src = src
+        self.dest = dest
+        self.src_conn = client.connection(src)
+        self.dest_conn = client.connection(dest)
+        self.daemon_id = DAEMON_ID
+        self.worker_cls = None # filled in by subclass constructor
+        self.num_shards = None
+        self.max_entries = max_entries
+        self.object_sync_timeout = kwargs.get('object_sync_timeout')
+
+    def init_num_shards(self):
+        if self.num_shards is not None:
+            return
+        try:
+            self.num_shards = client.num_log_shards(self.src_conn, self.type)
+            log.debug('%d shards to check', self.num_shards)
+        except Exception:
+            log.error('finding number of shards failed')
+            raise
+
+    def shard_num_for_key(self, key):
+        key = key.encode('utf8')
+        hash_val = 0
+        for char in key:
+            c = ord(char)
+            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
+        return hash_val % self.num_shards
+
+    def prepare(self):
+        """Setup any state required before syncing starts.
+
+        This must be called before sync().
+        """
+        pass
+
+    def generate_work(self):
+        """Generate items to be place in a queue or processing"""
+        pass
+
+    def wait_until_ready(self):
+        pass
+
+    def complete_item(self, shard_num, retries):
+        """Called when syncing a single item completes successfully"""
+        marker = self.shard_info.get(shard_num)
+        if not marker:
+            return
+        try:
+            data = [dict(name=retry, time=worker.DEFAULT_TIME)
+                    for retry in retries]
+            client.set_worker_bound(self.dest_conn,
+                                    self.type,
+                                    marker,
+                                    worker.DEFAULT_TIME,
+                                    self.daemon_id,
+                                    shard_num,
+                                    data)
+        except Exception:
+            log.warn('could not set worker bounds, may repeat some work. '
+                     'Traceback:', exc_info=True)
+
+    def sync(self, num_workers, log_lock_time):
+        workQueue = multiprocessing.Queue()
+        resultQueue = multiprocessing.Queue()
+
+        processes = [self.worker_cls(workQueue,
+                                     resultQueue,
+                                     log_lock_time,
+                                     self.src,
+                                     self.dest,
+                                     daemon_id=self.daemon_id,
+                                     max_entries=self.max_entries,
+                                     object_sync_timeout=self.object_sync_timeout,
+                                     )
+                     for i in xrange(num_workers)]
+        for process in processes:
+            process.daemon = True
+            process.start()
+
+        self.wait_until_ready()
+
+        log.info('Starting sync')
+        # enqueue the shards to be synced
+        num_items = 0
+        for item in self.generate_work():
+            num_items += 1
+            workQueue.put(item)
+
+        # add a poison pill for each worker
+        for i in xrange(num_workers):
+            workQueue.put(None)
+
+        # pull the results out as they are produced
+        errors = []
+        for i in xrange(num_items):
+            result, item = resultQueue.get()
+            shard_num, retries = item
+            if result == worker.RESULT_SUCCESS:
+                log.debug('synced item %r successfully', item)
+                self.complete_item(shard_num, retries)
+            else:
+                log.error('error syncing shard %d', shard_num)
+                errors.append(shard_num)
+
+            log.info('%d/%d items processed', i + 1, num_items)
+        if errors:
+            log.error('Encountered errors syncing these %d shards: %r',
+                      len(errors), errors)
+
+
+class IncrementalSyncer(Syncer):
+
+    def get_worker_bound(self, shard_num):
+        bound = client.get_worker_bound(
+            self.dest_conn,
+            self.type,
+            shard_num)
+
+        marker = bound['marker']
+        retries = bound['retries']
+
+        dev_log.debug('oldest marker and time for shard %d are: %r %r',
+                      shard_num, marker, bound['oldest_time'])
+        dev_log.debug('%d items to retry are: %r', len(retries), retries)
+
+        return marker, retries
+
+
+    def get_log_entries(self, shard_num, marker):
+        try:
+            result = client.get_log(self.src_conn, self.type,
+                                    marker, self.max_entries,
+                                    shard_num)
+            last_marker = result['marker']
+            log_entries = result['entries']
+            if len(log_entries) == self.max_entries:
+                log.warn('shard %d log has fallen behind - log length >= %d',
+                         shard_num, self.max_entries)
+        except NotFound:
+            # no entries past this marker yet, but we may have retries
+            last_marker = ' '
+            log_entries = []
+        return last_marker, log_entries
+
+    def prepare(self):
+        self.init_num_shards()
+
+        self.shard_info = {}
+        self.shard_work = {}
+        for shard_num in xrange(self.num_shards):
+            marker, retries = self.get_worker_bound(shard_num)
+            last_marker, log_entries = self.get_log_entries(shard_num, marker)
+            self.shard_work[shard_num] = log_entries, retries
+            self.shard_info[shard_num] = last_marker
+
+        self.prepared_at = time.time()
+
+    def generate_work(self):
+        return self.shard_work.iteritems()
+
+
+class MetaSyncerInc(IncrementalSyncer):
+
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerInc, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerIncremental
+        self.type = 'metadata'
+
+
+class DataSyncerInc(IncrementalSyncer):
+
+    def __init__(self, *args, **kwargs):
+        super(DataSyncerInc, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.DataWorkerIncremental
+        self.type = 'data'
+        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+    def wait_until_ready(self):
+        log.info('waiting to make sure bucket log is consistent')
+        while time.time() < self.prepared_at + self.rgw_data_log_window:
+            time.sleep(1)
+
+
+class DataSyncerFull(Syncer):
+
+    def __init__(self, *args, **kwargs):
+        super(DataSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.DataWorkerFull
+        self.type = 'data'
+        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)
+
+    def prepare(self):
+        log.info('preparing to do a full data sync')
+        self.init_num_shards()
+
+        # save data log markers for each shard
+        self.shard_info = {}
+        for shard in xrange(self.num_shards):
+            info = client.get_log_info(self.src_conn, 'data', shard)
+            # setting an empty marker returns an error
+            if info['marker']:
+                self.shard_info[shard] = info['marker']
+            else:
+                self.shard_info[shard] = ' '
+
+        # get list of buckets after getting any markers to avoid skipping
+        # entries added before we got the marker info
+        log.debug('getting bucket list')
+        buckets = client.get_bucket_list(self.src_conn)
+
+        self.prepared_at = time.time()
+
+        self.buckets_by_shard = {}
+        for bucket in buckets:
+            shard = self.shard_num_for_key(bucket)
+            self.buckets_by_shard.setdefault(shard, [])
+            self.buckets_by_shard[shard].append(bucket)
+
+    def generate_work(self):
+        return self.buckets_by_shard.iteritems()
+
+    def wait_until_ready(self):
+        log.info('waiting to make sure bucket log is consistent')
+        while time.time() < self.prepared_at + self.rgw_data_log_window:
+            time.sleep(1)
+
+
+class MetaSyncerFull(Syncer):
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerFull
+        self.type = 'metadata'
+
+    def prepare(self):
+        try:
+            self.sections = client.get_metadata_sections(self.src_conn)
+        except HttpError as e:
+            log.error('Error listing metadata sections: %s', e)
+            raise
+
+        # grab the latest shard markers and timestamps before we sync
+        self.shard_info = {}
+        self.init_num_shards()
+        for shard_num in xrange(self.num_shards):
+            info = client.get_log_info(self.src_conn, 'metadata', shard_num)
+            # setting an empty marker returns an error
+            if info['marker']:
+                self.shard_info[shard_num] = info['marker']
+            else:
+                self.shard_info[shard_num] = ' '
+
+        self.metadata_by_shard = {}
+        for section in self.sections:
+            try:
+                for key in client.list_metadata_keys(self.src_conn, section):
+                    shard = self.shard_num_for_key(section + ':' + key)
+                    self.metadata_by_shard.setdefault(shard, [])
+                    self.metadata_by_shard[shard].append((section, key))
+            except NotFound:
+                # no keys of this type exist
+                continue
+            except HttpError as e:
+                log.error('Error listing metadata for section %s: %s',
+                          section, e)
+                raise
+
+    def generate_work(self):
+        return self.metadata_by_shard.iteritems()
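
shard_num_for_key() above groups metadata and bucket keys by log shard with a small rolling hash. A standalone sketch of the same hash; in the agent num_shards is fetched from the gateway via client.num_log_shards(), so the value 64 below is only illustrative:

    def shard_num_for_key(key, num_shards):
        # same rolling hash as Syncer.shard_num_for_key()
        hash_val = 0
        for byte in bytearray(key.encode('utf8')):
            hash_val = (hash_val + (byte << 4) + (byte >> 4)) * 11
        return hash_val % num_shards

    print(shard_num_for_key('bucket.instance:mybar:r0z0.4140.1', 64))
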
diff --git a/radosgw_agent/tests/__init__.py b/radosgw_agent/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/radosgw_agent/tests/conftest.py b/radosgw_agent/tests/conftest.py
new file mode 100644 (file)
index 0000000..f0ef3b5
--- /dev/null
@@ -0,0 +1,38 @@
+import logging
+import sys
+import os
+
+# set an environ variable that tells us that we are really testing
+os.environ['PYTEST'] = '1'
+
+# this console logging configuration is basically just to be able to see output
+# in tests, and this file gets executed by py.test when it runs, so we get that
+# for free.
+
+# Console Logger
+sh = logging.StreamHandler()
+sh.setLevel(logging.WARNING)
+
+formatter = logging.Formatter(
+    fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s',
+    datefmt='%Y-%m-%dT%H:%M:%S',
+    )
+sh.setFormatter(formatter)
+
+
+# because we're in a module already, __name__ is not the ancestor of
+# the rest of the package; use the root as the logger for everyone
+root_logger = logging.getLogger()
+
+# allow all levels at root_logger, handlers control individual levels
+root_logger.setLevel(logging.DEBUG)
+root_logger.addHandler(sh)
+
+console_loglevel = logging.DEBUG  # start at DEBUG for now
+
+# Console Logger
+sh.setLevel(console_loglevel)
+
+
+
+
diff --git a/radosgw_agent/tests/test_client.py b/radosgw_agent/tests/test_client.py
new file mode 100644 (file)
index 0000000..624b336
--- /dev/null
@@ -0,0 +1,678 @@
+import boto
+import py.test
+from mock import Mock
+import httpretty
+import re
+
+from radosgw_agent import client
+from radosgw_agent import exceptions as exc
+from radosgw_agent.constants import DEFAULT_TIME
+
+# parametrization helpers
+
+def endpoints():
+    return [
+        ('http://example.org', 'example.org', 80, False),
+        ('https://example.org', 'example.org', 443, True),
+        ('https://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 443, True),
+        ('http://[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 80, False),
+        ('http://[e39:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]:8080', '[e39:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 8080, False),
+        ('http://e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922', '[e40:92be:ab1c:c9c1:3e2e:dbf6:57c6:8922]', 80, False),
+        ('https://example.org:8080', 'example.org', 8080, True),
+        ('https://example.org:8080/', 'example.org', 8080, True),
+        ('http://example.org:81/a/b/c?b#d', 'example.org', 81, False),
+    ]
+
+
+REGION_MAP = {
+    "regions": [
+        {
+            "val": {
+                "zones": [
+                    {
+                        "endpoints": [
+                            "http://vit:8001/"
+                            ],
+                        "log_data": "true",
+                        "log_meta": "true",
+                        "name": "skinny-1"
+                        },
+                    {
+                        "endpoints": [
+                            "http://vit:8002/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "skinny-2"
+                        }
+                    ],
+                "name": "skinny",
+                "default_placement": "",
+                "master_zone": "skinny-1",
+                "api_name": "slim",
+                "placement_targets": [],
+                "is_master": "true",
+                "endpoints": [
+                    "http://skinny:80/"
+                    ]
+                },
+            "key": "skinny"
+            },
+        {
+            "val": {
+                "zones": [
+                    {
+                        "endpoints": [
+                            "http://vit:8003/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "swab-2"
+                        },
+                    {
+                        "endpoints": [
+                            "http://vit:8004/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "swab-3"
+                        },
+                    {
+                        "endpoints": [
+                            "http://vit:8000/"
+                            ],
+                        "log_data": "true",
+                        "log_meta": "true",
+                        "name": "swab-1"
+                        }
+                    ],
+                "name": "swab",
+                "default_placement": "",
+                "master_zone": "swab-1",
+                "api_name": "shady",
+                "placement_targets": [],
+                "is_master": "false",
+                "endpoints": [
+                    "http://vit:8000/"
+                    ]
+                },
+            "key": "swab"
+            },
+        {
+            "val": {
+                "zones": [
+                    {
+                        "endpoints": [
+                            "http://ro:80/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "ro-1"
+                        },
+                    {
+                        "endpoints": [
+                            "http://ro:8080/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "ro-2"
+                        },
+                    ],
+                "name": "readonly",
+                "default_placement": "",
+                "master_zone": "ro-1",
+                "api_name": "readonly",
+                "placement_targets": [],
+                "is_master": "false",
+                "endpoints": [
+                    "http://ro:80/",
+                    "http://ro:8080/"
+                    ]
+                },
+            "key": "readonly"
+            },
+        {
+            "val": {
+                "zones": [
+                    {
+                        "endpoints": [
+                            "http://meta:80/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "true",
+                        "name": "meta-1"
+                        },
+                    {
+                        "endpoints": [
+                            "http://meta:8080/"
+                            ],
+                        "log_data": "false",
+                        "log_meta": "false",
+                        "name": "meta-2"
+                        },
+                    ],
+                "name": "metaonly",
+                "default_placement": "",
+                "master_zone": "meta-1",
+                "api_name": "metaonly",
+                "placement_targets": [],
+                "is_master": "false",
+                "endpoints": [
+                    "http://meta:80/",
+                    "http://meta:8080/"
+                    ]
+                },
+            "key": "metaonly"
+            }
+        ],
+    "master_region": "skinny"
+    }
+
+def test_endpoint_default_port():
+    endpoint = client.Endpoint('example.org', None, True)
+    assert endpoint.port == 443
+    endpoint = client.Endpoint('example.org', None, False)
+    assert endpoint.port == 80
+
+def test_endpoint_port_specified():
+    endpoint = client.Endpoint('example.org', 80, True)
+    assert endpoint.port == 80
+    endpoint = client.Endpoint('example.org', 443, True)
+    assert endpoint.port == 443
+
+
+def test_endpoint_equality():
+    default_port = client.Endpoint('a.org', None, True)
+    secure = client.Endpoint('a.org', 443, True)
+    insecure = client.Endpoint('a.org', 80, False)
+    assert default_port == secure
+    assert secure == insecure
+    assert insecure == default_port
+
+
+def test_endpoint_inequality():
+    base = client.Endpoint('a.org', 80, True)
+    diff_host = client.Endpoint('b.org', 80, True)
+    diff_port = client.Endpoint('a.org', 81, True)
+    insecure = client.Endpoint('a.org', 8080, False)
+    assert base != diff_host
+    assert base != diff_port
+    assert base != insecure
+
+
+@py.test.mark.parametrize('url, host, port, secure', endpoints())
+def test_parse_endpoint(url, host, port, secure):
+    endpoint = client.parse_endpoint(url)
+    assert endpoint.port == port
+    assert endpoint.host == host
+    assert endpoint.secure == secure
+
+
+@py.test.mark.parametrize('url, host, port, secure', endpoints())
+def test_parse_repr(url, host, port, secure):
+    endpoint = repr(client.parse_endpoint(url))
+    assert str(secure) in endpoint
+    assert str(host) in endpoint
+    assert str(port) in endpoint
+
+
+def test_parse_endpoint_bad_input():
+    with py.test.raises(exc.InvalidProtocol):
+        client.parse_endpoint('ftp://example.com')
+    with py.test.raises(exc.InvalidHost):
+        client.parse_endpoint('http://:80/')
+
+def _test_configure_endpoints(dest_url, dest_region, dest_zone,
+                              expected_src_url, expected_src_region,
+                              expected_src_zone, specified_src_url=None,
+                              meta_only=False):
+    dest = client.parse_endpoint(dest_url)
+    if specified_src_url is not None:
+        src = client.parse_endpoint(specified_src_url)
+    else:
+        src = client.Endpoint(None, None, None)
+    region_map = client.RegionMap(REGION_MAP)
+    client.configure_endpoints(region_map, dest, src, meta_only)
+    assert dest.region.name == dest_region
+    assert dest.zone.name == dest_zone
+    assert src == client.parse_endpoint(expected_src_url)
+    assert src.region.name == expected_src_region
+    assert src.zone.name == expected_src_zone
+
+def test_configure_endpoints_2nd_region_master_zone_meta():
+    _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+                              'http://vit:8001', 'skinny', 'skinny-1',
+                              meta_only=True)
+
+def test_configure_endpoints_2nd_region_master_zone_data():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+                                  'http://vit:8001', 'skinny', 'skinny-1',
+                                  meta_only=False)
+
+def test_configure_endpoints_master_region_2nd_zone():
+    _test_configure_endpoints('http://vit:8002', 'skinny', 'skinny-2',
+                              'http://vit:8001', 'skinny', 'skinny-1')
+
+def test_configure_endpoints_2nd_region_2nd_zone():
+    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                              'http://vit:8000', 'swab', 'swab-1')
+
+def test_configure_endpoints_2nd_region_readonly_meta():
+    _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
+                              'http://vit:8001', 'skinny', 'skinny-1',
+                              meta_only=True)
+
+def test_configure_endpoints_2nd_region_readonly_data():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
+                                  'http://vit:8001', 'skinny', 'skinny-1',
+                                  meta_only=False)
+
+def test_configure_endpoints_2nd_region_metaonly_meta():
+    _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
+                              'http://meta:80', 'metaonly', 'meta-1',
+                              meta_only=True)
+
+def test_configure_endpoints_2nd_region_metaonly_data():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
+                                  'http://vit:8001', 'skinny', 'skinny-1',
+                                  meta_only=False)
+
+def test_configure_endpoints_master_region_master_zone():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8001', 'skinny', 'skinny-1',
+                                  'http://vit:8001', 'skinny', 'skinny-1')
+
+def test_configure_endpoints_specified_src_same_region():
+    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                              'http://vit:8000', 'swab', 'swab-1',
+                              'http://vit:8000')
+
+def test_configure_endpoints_specified_src_master_region_meta():
+    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                              'http://vit:8001', 'skinny', 'skinny-1',
+                              'http://vit:8001', meta_only=True)
+
+def test_configure_endpoints_specified_src_master_region_data():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                                  'http://vit:8001', 'skinny', 'skinny-1',
+                                  'http://vit:8001', meta_only=False)
+
+def test_configure_endpoints_bad_src_same_region():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                                  'http://vit:8004', 'swab', 'swab-3',
+                                  'http://vit:8004')
+
+def test_configure_endpoints_bad_src_master_region():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
+                                  'http://vit:8002', 'skinny', 'skinny-2',
+                                  'http://vit:8002')
+
+def test_configure_endpoints_bad_src_same_zone():
+    with py.test.raises(exc.InvalidZone):
+        _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
+                                  'http://vit:8000', 'swab', 'swab-1',
+                                  'http://vit:8000')
+
+def test_configure_endpoints_specified_nonexistent_src():
+    with py.test.raises(exc.ZoneNotFound):
+        _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
+                                  'http://vit:8001', 'skinny', 'skinny-1',
+                                  'http://vit:80')
+
+def test_configure_endpoints_unknown_zone():
+    with py.test.raises(exc.ZoneNotFound):
+        _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
+                                  'http://vit:8001', 'skinny', 'skinny-1')
+
+def http_invalid_status_codes():
+    return [
+        101, 102, 300, 301, 302, 303, 304, 305, 306, 307, 308,
+    ]
+
+def http_valid_status_codes():
+    return [
+        200, 201, 202, 203, 204, 205, 207, 208, 226,
+    ]
+
+class TestCheckResultStatus(object):
+
+    @py.test.mark.parametrize('code', http_invalid_status_codes())
+    def test_check_raises_http_error(self, code):
+        response = Mock()
+        response.status = code
+        with py.test.raises(exc.HttpError):
+            client.check_result_status(response)
+
+    @py.test.mark.parametrize('code', http_valid_status_codes())
+    def test_check_does_not_raise_http_error(self, code):
+        response = Mock()
+        response.status = code
+        assert client.check_result_status(response) is None
+
+
+    def test_check_raises_not_found(self):
+        response = Mock()
+        response.status = 404
+        with py.test.raises(exc.NotFound):
+            client.check_result_status(response)
+
+
+class TestBotoCall(object):
+
+    def test_return_val(self):
+        @client.boto_call
+        def foo(*args, **kwargs):
+            return (args, kwargs)
+        assert foo(1) == ((1,), {})
+        assert foo(b=2) == (tuple(), {'b': 2})
+
+    def test_boto_exception_not_found(self):
+        @client.boto_call
+        def foo():
+            raise boto.exception.S3ResponseError(404, '')
+
+        with py.test.raises(exc.NotFound):
+            foo()
+
+    def test_non_boto_exception(self):
+        @client.boto_call
+        def foo():
+            raise ValueError('')
+
+        with py.test.raises(ValueError):
+            foo()
+
+
+class TestRequest(object):
+
+    @httpretty.activate
+    def test_url(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{}',
+            content_type="application/json",
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        client.request(connection, 'get', '/%7E~', _retries=0)
+        server_request = httpretty.last_request()
+        assert server_request.path == '/%257E%7E'
+
+    @httpretty.activate
+    def test_url_response(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{"msg": "ok"}',
+            content_type="application/json",
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        result = client.request(connection, 'get', '/%7E~', _retries=0)
+        assert result == {'msg': 'ok'}
+
+    @httpretty.activate
+    def test_url_bad(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{}',
+            content_type="application/json",
+            status=500,
+        )
+        connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+        with py.test.raises(exc.HttpError):
+            client.request(connection, 'get', '/%7E~', _retries=0)
+
+
+class TestGETClientRequestsPaths(object):
+
+    def setup(self):
+        self.connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+
+    def register(self, body=None):
+        body = body or '{}'
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body=body,
+            content_type="application/json",
+        )
+
+    @httpretty.activate
+    def test_get_metadata(self):
+        self.register()
+        client.get_metadata(self.connection, 'bucket.instance', 'foo')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/bucket.instance?key=foo'
+
+    @httpretty.activate
+    def test_get_metadata_no_re_encoding(self):
+        self.register()
+        client.get_metadata(self.connection, 'bucket.instance', 'mybar:r0z0.4140.1')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/bucket.instance?key=mybar%3Ar0z0.4140.1'
+
+    @httpretty.activate
+    def test_get_metadata_sections(self):
+        self.register()
+        client.get_metadata_sections(self.connection)
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata'
+
+    @httpretty.activate
+    def test_list_metadata_keys(self):
+        self.register()
+        client.list_metadata_keys(self.connection, 'foo')
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/foo'
+
+    @httpretty.activate
+    def test_get_bucket_list(self):
+        self.register()
+        client.get_bucket_list(self.connection)
+        server_request = httpretty.last_request()
+        assert server_request.path == '/admin/metadata/bucket'
+
+    @httpretty.activate
+    def test_url_response(self):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body='{"msg": "ok"}',
+            content_type="application/json",
+        )
+        result = client.request(self.connection, 'get', '/%7E~')
+        assert result == {'msg': 'ok'}
+
+
+class TestClientListObjectsInBucket(object):
+
+    def setup(self):
+        self.connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+        self.body = """
+        [
+            {
+                "name": "mahobject/",
+                "etag": "d41d8cd98f00b204e9800998ecf8427e",
+                "content_type": "application/octet-stream",
+                "last_modified": "2015-01-15T15:24:42.000Z",
+                "storage_class": "STANDARD",
+                "owner": {
+                    "display_name": "client1-system-user",
+                    "id": "client1-system-user"
+                }
+            }
+        ]
+        """
+
+    def register(self, body=None):
+        body = body or self.body
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body=body,
+            content_type="application/json",
+        )
+
+    @httpretty.activate
+    def test_get_bucket_is_a_single_item(self):
+        self.register()
+        result = client.get_bucket_list(self.connection)
+        assert len(result) == 1
+
+    @httpretty.activate
+    def test_get_bucket_has_right_metadata(self):
+        self.register()
+        result = client.get_bucket_list(self.connection)
+        obj = result[0]
+        owner = {
+            "display_name": "client1-system-user",
+            "id": "client1-system-user"
+        }
+        assert obj['name'] == 'mahobject/'
+        assert obj['etag'] == 'd41d8cd98f00b204e9800998ecf8427e'
+        assert obj['content_type'] == 'application/octet-stream'
+        assert obj['last_modified'] == '2015-01-15T15:24:42.000Z'
+        assert obj['storage_class'] == 'STANDARD'
+        assert obj['owner'] == owner
+
+
+class TestClientGetWorkerBound(object):
+
+    def setup(self):
+        self.connection = client.connection(
+            client.Endpoint('localhost', 8888, False, 'key', 'secret'),
+            True,
+        )
+        self.body = """
+        {"marker": "00000000002.2.3",
+            "markers": [
+                {
+                    "entity": "radosgw-agent",
+                    "items_in_progress": [
+                        {
+                            "name": "hello",
+                            "timestamp": "0.000000"
+                        }
+                    ],
+                    "position_marker": "00000000002.2.3",
+                    "position_time": "0.000000"
+                }
+            ],
+         "oldest_time": "0.000000"
+        }
+        """
+
+    def register(self, body=None, status=200):
+        body = body or self.body
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/(.*)"),
+            body=body,
+            content_type="application/json",
+            status=status
+        )
+
+    @httpretty.activate
+    def test_get_bound_has_right_metadata(self):
+        self.register()
+        result = client.get_worker_bound(
+            self.connection,
+            'bucket-index',
+            'beast:us-east'
+        )
+        assert result['marker'] == "00000000002.2.3"
+        assert result['retries'] == set(['hello'])
+        assert result['oldest_time'] == "0.000000"
+
+    @httpretty.activate
+    def test_get_bound_fails_fallsback_to_defaults(self):
+        self.register(status=404)
+        result = client.get_worker_bound(
+            self.connection,
+            'bucket-index',
+            'beast:us-east'
+        )
+        assert result['marker'] == " "
+        assert result['retries'] == []
+        assert result['oldest_time'] == DEFAULT_TIME
+
+
+class TestIsVersioned(object):
+
+    def setup(self):
+        # set strict attributes in the mock
+        self.obj = Mock(spec=object)
+
+    def test_is_in_fact_versioned(self):
+        self.obj.VersionedEpoch = u'1'
+        self.obj.version_id = 'somehashvalue'
+        assert client.is_versioned(self.obj) is True
+
+    def test_is_not_versioned_no_attr_versioned_epoch(self):
+        assert client.is_versioned(self.obj) is False
+
+    def test_is_not_versioned_no_attr_version_id(self):
+        assert client.is_versioned(self.obj) is False
+
+    def test_is_versioned_version_id(self):
+        self.obj.version_id = 1
+        assert client.is_versioned(self.obj) is True
+
+    def test_is_not_versioned_versioned_id_is_none(self):
+        self.obj.version_id = None
+        assert client.is_versioned(self.obj) is False
diff --git a/radosgw_agent/tests/test_worker.py b/radosgw_agent/tests/test_worker.py
new file mode 100644 (file)
index 0000000..f29e8a1
--- /dev/null
@@ -0,0 +1,214 @@
+from mock import Mock, patch
+import json
+import py.test
+import time
+import httpretty
+import re
+
+from radosgw_agent import worker, client
+from radosgw_agent.exceptions import HttpError, NotFound, BucketEmpty, SyncTimedOut
+
+
+class TestSyncObject(object):
+
+    def setup(self):
+        # setup the fake client
+        self.client = Mock()
+
+        self.src = Mock()
+        self.src.zone.name = 'Zone Name'
+        self.src.host = 'example.com'
+        self.obj = Mock()
+        self.obj.name = 'mah-object'
+
+    def test_syncs_correctly(self):
+        with patch('radosgw_agent.worker.client'):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            assert w.sync_object('mah-bucket', self.obj) is True
+
+    def test_syncs_not_found_on_master_deleting_from_secondary(self):
+        self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+            assert w.sync_object('mah-bucket', self.obj) is True
+
+    def test_syncs_deletes_from_secondary(self):
+        self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+        self.client.delete_object = Mock(side_effect=NotFound(404, ''))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+            assert w.sync_object('mah-bucket', self.obj) is False
+
+    def test_syncs_could_not_delete_from_secondary(self):
+        self.client.sync_object_intra_region = Mock(side_effect=NotFound(404, ''))
+        self.client.delete_object = Mock(side_effect=ValueError('unexpected error'))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+
+            with py.test.raises(worker.SyncFailed):
+                w.sync_object('mah-bucket', self.obj)
+
+    def test_syncs_encounters_a_http_error(self):
+        self.client.sync_object_intra_region = Mock(side_effect=HttpError(400, ''))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+            w.sync_object('mah-bucket', self.obj)
+
+    def test_sync_client_raises_sync_failed(self):
+        self.client.sync_object_intra_region = Mock(side_effect=worker.SyncFailed('failed intra region'))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+
+            with py.test.raises(worker.SyncFailed) as exc:
+                w.sync_object('mah-bucket', self.obj)
+
+            exc_message = exc.value[0]
+            assert 'failed intra region' in exc_message
+
+    def test_fails_to_remove_op_state(self, capsys):
+        # this one is tricky to test; we fall back to py.test's `capsys`
+        # to inspect the captured stderr logging output and check that the
+        # agent emits what we expect.
+        self.client.remove_op_state = Mock(side_effect=ValueError('could not remove op'))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+            assert w.sync_object('mah-bucket', self.obj) is True
+        # logging does not play nicely here, so the assertions below are
+        # commented out. The test exercises the right code path, but a clean
+        # assertion would require improving the logging code rather than the
+        # test. For now this just gives us the coverage.
+        # out, err = capsys.readouterr()
+        # assert 'could not remove op state' in out
+        # assert 'could not remove op state' in err
+
+    def test_fails_to_do_anything_fallsback_to_wait_for_object(self):
+        self.client.sync_object_intra_region = Mock(side_effect=ValueError('severe error'))
+
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.wait_for_object = lambda *a: None
+            assert w.sync_object('mah-bucket', self.obj) is True
+
+    def test_wait_for_object_state_not_found_raises_sync_failed(self):
+        self.client.get_op_state = Mock(side_effect=NotFound(404, ''))
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            with py.test.raises(worker.SyncFailed) as exc:
+                w.wait_for_object(None, None, time.time() + 1000, None)
+
+        exc_message = exc.exconly()
+        assert 'state not found' in exc_message
+
+    def test_wait_for_object_state_is_empty_sync_timesout(self):
+        self.client.get_op_state = lambda *a: []
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            with py.test.raises(SyncTimedOut) as exc:
+                w.wait_for_object(None, None, time.time() + 1, None)
+
+    def test_wait_for_object_timeout(self):
+        msg = 'should not have called get_op_state'
+        self.client.get_op_state = Mock(side_effect=AssertionError(msg))
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            with py.test.raises(worker.SyncTimedOut) as exc:
+                w.wait_for_object(None, None, time.time() - 1, None)
+
+    def test_wait_for_object_state_complete(self):
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            self.client.get_op_state = lambda *a: [{'state': 'complete'}]
+            assert w.wait_for_object(None, None, time.time() + 1, None) is None
+
+    def test_wait_for_object_state_error(self):
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            self.client.get_op_state = lambda *a: [{'state': 'error'}]
+            with py.test.raises(worker.SyncFailed) as exc:
+                w.wait_for_object(None, None, time.time() + 1, None)
+
+        exc_message = exc.exconly()
+        assert 'state is error' in exc_message
+
+    def test_sync_bucket_delayed_not_found(self):
+        class fake_iterable(object):
+            def __iter__(self):
+                raise BucketEmpty
+        with patch('radosgw_agent.worker.client', self.client):
+            w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+            w.sync_object = lambda *a: None
+            objects = fake_iterable()
+            with py.test.raises(BucketEmpty):
+                w.sync_bucket('foo', objects)
+
+
+
+def create_fake_endpoint(name='source', **kw):
+    ep = Mock()
+    ep.zone.name = name
+    ep.secret_key = kw.get('secret', 'secret')
+    ep.access_key = kw.get('access', 'access')
+    ep.port = kw.get('port', 7777)
+    ep.host = kw.get('host', 'localhost')
+    ep.debug = kw.get('debug', True)
+    return ep
+
+def create_log_entry(name='foo', **kw):
+    return {
+        "op_id": kw.get('op_id',"00000000006.3741.3"),
+        "op_tag": kw.get('op_tag', "default.21007.10"),
+        "op": kw.get('op', "link_olh"),
+        "object": name,
+        "instance": kw.get('instance',"uWueo6N+Hm6Sp86OdxfnfDHfUKRy\/gOu"),
+        "state": kw.get('state', "complete"),
+        "index_ver": kw.get('index_ver', 6),
+        "timestamp": kw.get('timestamp', "2015-01-07 00:21:41.000000Z"),
+        "ver": kw.get('ver', { "pool": 12, "epoch": 20818}),
+        "versioned": kw.get('versioned', True),
+    }
+
+
+class TestDataWorkerIncremental(object):
+
+    def setup(self):
+        self.w = worker.DataWorkerIncremental(
+            None, None, None, create_fake_endpoint(),
+            create_fake_endpoint('dest'), daemon_id=1, max_entries=10
+        )
+
+    def register(self, src_body=None, dest_body=None, status=200):
+
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:7777/admin/log(.*)"),
+            body=src_body or "{}",
+            content_type="application/json",
+            status=status
+        )
+        httpretty.register_uri(
+            httpretty.GET,
+            re.compile("http://localhost:8888/admin/log(.*)"),
+            body=dest_body or "{}",
+            content_type="application/json",
+            status=status
+        )
+
+    @httpretty.activate
+    def test_items_from_source_only(self):
+        src_body = json.dumps([create_log_entry('foo_1')])
+        self.register(src_body=src_body)
+        marker, entries = self.w.get_bucket_instance_entries(2, 'bucket')
+        assert marker == '00000000006.3741.3'
+        assert len(entries) == 1
diff --git a/radosgw_agent/tests/util/test_configuration.py b/radosgw_agent/tests/util/test_configuration.py
new file mode 100644 (file)
index 0000000..ca5dc88
--- /dev/null
@@ -0,0 +1,64 @@
+import pytest
+from radosgw_agent.util import configuration
+
+
+@pytest.fixture
+def conf():
+    return configuration.Configuration()
+
+
+class TestConfiguration(object):
+
+    def test_set_new_keys(self, conf):
+        conf['key'] = 1
+        assert conf['key'] == 1
+
+    def test_not_allowed_to_change_value(self, conf):
+        conf['key'] = 1
+        with pytest.raises(TypeError):
+            conf['key'] = 2
+
+    def test_not_allowed_to_pop_existing_key(self, conf):
+        conf['key'] = 1
+        with pytest.raises(TypeError):
+            conf.pop('key')
+
+    def test_keyerror_when_popping(self, conf):
+        with pytest.raises(KeyError):
+            conf.pop('key')
+
+    def test_adding_nested_values(self, conf):
+        conf['key'] = {}
+        conf['key']['bar'] = 1
+        assert conf['key']['bar'] == 1
+
+    def test_modifying_nested_values_fails(self, conf):
+        conf['key'] = {}
+        conf['key']['bar'] = 1
+        with pytest.raises(TypeError):
+            conf['key']['bar'] = 2
+
+    def test_initial_dict_seeding(self):
+        my_dict = {'a': 1}
+        conf = configuration.Configuration(my_dict)
+        assert conf['a'] == 1
+
+    def test_initial_dict_seeding_doesnt_allow_updates(self):
+        my_dict = {'a': 1}
+        conf = configuration.Configuration(my_dict)
+        with pytest.raises(TypeError):
+            conf['a'] = 2
+
+    def test_assign_a_new_key_to_a_dict(self, conf):
+        my_dict = {'a': 1}
+        conf['args'] = my_dict
+        assert conf['args']['a'] == 1
+
+    def test_contains_element(self, conf):
+        exists = False
+        try:
+            if 'key' in conf:
+                exists = True
+        except KeyError:
+            assert False, "dict object should support 'contains' operations"
+        assert exists is False
diff --git a/radosgw_agent/tests/util/test_network.py b/radosgw_agent/tests/util/test_network.py
new file mode 100644 (file)
index 0000000..0c211d2
--- /dev/null
@@ -0,0 +1,48 @@
+import pytest
+from radosgw_agent.util import network
+import random
+
+
+def valid_ipv6_addr(ports=False, brackets=False, addresses=20):
+    max_rand_int = 16 ** 4 - 1  # randint is inclusive; hextets max out at 0xffff
+
+    def generate(brackets, ports):
+        address = ":".join(
+            ("%x" % random.randint(0, max_rand_int) for i in range(8))
+        )
+        if brackets:
+            address =  '[%s]' % address
+        if ports:
+            address = '%s:8080' % address
+        return address
+    return [generate(brackets, ports) for i in range(addresses)]
+
+
+def invalid_ipv6_addr():
+    return [
+        '',
+        1,
+        'some address',
+        '192.1.1.1',
+        '::!',
+    ]
+
+
+class TestIsIPV6(object):
+
+    @pytest.mark.parametrize('address', valid_ipv6_addr())
+    def test_passes_valid_addresses(self, address):
+        assert network.is_ipv6(address) is True
+
+    @pytest.mark.parametrize('address', valid_ipv6_addr(brackets=True))
+    def test_passes_valid_addresses_with_brackets(self, address):
+        assert network.is_ipv6(address) is True
+
+    @pytest.mark.parametrize('address', invalid_ipv6_addr())
+    def test_catches_invalid_addresses(self, address):
+        assert network.is_ipv6(address) is False
+
+    @pytest.mark.parametrize('address', valid_ipv6_addr(ports=True, brackets=True))
+    def test_passes_valid_addresses_with_brackets_and_ports(self, address):
+        assert network.is_ipv6(address) is True
+
diff --git a/radosgw_agent/tests/util/test_obj.py b/radosgw_agent/tests/util/test_obj.py
new file mode 100644 (file)
index 0000000..e3e3575
--- /dev/null
@@ -0,0 +1,40 @@
+from radosgw_agent.util import obj
+
+
+class Empty(object):
+
+    def __init__(self, **kw):
+        for k, v in kw.items():
+            setattr(self, k, v)
+
+
+class TestToDict(object):
+
+    def test_underscores_are_ignored(self):
+        fake = Empty(a=1, _b=2)
+        result = obj.to_dict(fake)
+        assert result.get('_b') is None
+        assert result.get('a') == 1
+
+    def test_overrides_are_respected(self):
+        fake = Empty(a=1, b=2)
+        result = obj.to_dict(fake, b=3)
+        assert result.get('b') == 3
+
+    def test_overrides_dont_mess_up_other_keys(self):
+        fake = Empty(a=1, b=2)
+        result = obj.to_dict(fake, b=3)
+        assert result.get('a') == 1
+
+    def test_extra_keys_are_set(self):
+        result = obj.to_dict(Empty(), a=1, b=2)
+        assert result['a'] == 1
+        assert result['b'] == 2
+
+
+class TestKeysToAttribute(object):
+
+    def test_replace_dashes(self):
+        dictionary = {'dashed-word': 1}
+        result = obj.to_obj(dictionary)
+        assert result.dashed_word == 1
diff --git a/radosgw_agent/util/__init__.py b/radosgw_agent/util/__init__.py
new file mode 100644 (file)
index 0000000..64b7d5f
--- /dev/null
@@ -0,0 +1,2 @@
+import log
+from log import get_dev_logger
diff --git a/radosgw_agent/util/configuration.py b/radosgw_agent/util/configuration.py
new file mode 100644 (file)
index 0000000..d0661f4
--- /dev/null
@@ -0,0 +1,77 @@
+
+
+class Configuration(object):
+    """
+    An immutable dictionary where values set for the first time are allowed and
+    existing keys raise a TypeError exception
+
+    Even though there is effort made into making it immutable, a consumer can
+    force its way through by accessing the private `dict` method that contains
+    the values, although it defeats the purpose it is exposed in that way in
+    the event that something needs to change.
+
+    All normal methods and operations should be supported.
+    """
+
+    def __init__(self, seed=None):
+        if seed and isinstance(seed, dict):
+            self._dict = seed
+        else:
+            self._dict = {}
+
+    def __str__(self):
+        return str(self._dict)
+
+    def pop(self, key, default=None):
+        # missing keys raise KeyError just like a plain dict would, while
+        # existing keys can never be removed, so popping one raises the
+        # immutability error instead
+        try:
+            self._dict[key]
+        except KeyError:
+            raise
+        else:
+            self._default_error()
+
+    def popitem(self):
+        self._default_error()
+
+    def update(self):
+        self._default_error()
+
+    def clear(self):
+        self._default_error()
+
+    def values(self):
+        return self._dict.values()
+
+    def keys(self):
+        return self._dict.keys()
+
+    def items(self):
+        return self._dict.items()
+
+    def get(self, key, default=None):
+        return self._dict.get(key, default)
+
+    def _default_error(self):
+        msg = 'config object does not allow key changes'
+        raise TypeError(msg)
+
+    def __setitem__(self, key, value):
+        try:
+            self._dict[key]
+        except KeyError:
+            if isinstance(value, dict):
+                self._dict[key] = Configuration(value)
+            else:
+                self._dict[key] = value
+        else:
+            self._default_error()
+
+    def __getitem__(self, key):
+        return self._dict[key]
+
+    def __contains__(self, key):
+        try:
+            self._dict[key]
+            return True
+        except KeyError:
+            return False
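+
+
+# A minimal usage sketch (illustrative only, mirroring the tests above; the
+# key names are made up):
+#
+#   conf = Configuration()
+#   conf['args'] = {'log-file': '/var/log/radosgw-agent.log'}  # dicts are wrapped
+#   conf['args']['verbose'] = True     # new nested keys are allowed
+#   conf['args']['verbose'] = False    # raises TypeError: existing keys are frozen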
diff --git a/radosgw_agent/util/decorators.py b/radosgw_agent/util/decorators.py
new file mode 100644 (file)
index 0000000..2923cf2
--- /dev/null
@@ -0,0 +1,112 @@
+import logging
+import sys
+import traceback
+from functools import wraps
+
+
+def catches(catch=None, handler=None, exit=True, handle_all=False):
+    """
+    Very simple decorator that tries any of the exception(s) passed in as
+    a single exception class or tuple (containing multiple ones) returning the
+    exception message and optionally handling the problem if it raises with the
+    handler if it is provided.
+
+    So instead of doing something like this::
+
+        def bar():
+            try:
+                some_call()
+                print "Success!"
+            except TypeError, exc:
+                print "Error while handling some call: %s" % exc
+                sys.exit(1)
+
+    You would need to decorate it like this to have the same effect::
+
+        @catches(TypeError)
+        def bar():
+            some_call()
+            print "Success!"
+
+    If multiple exceptions need to be caught they need to be provided as a
+    tuple::
+
+        @catches((TypeError, AttributeError))
+        def bar():
+            some_call()
+            print "Success!"
+
+    If adding a handler, it should accept a single argument, which would be the
+    exception that was raised, it would look like::
+
+        def my_handler(exc):
+            print 'Handling exception %s' % str(exc)
+            raise SystemExit
+
+        @catches(KeyboardInterrupt, handler=my_handler)
+        def bar():
+            some_call()
+
+    Note that the handler needs to raise SystemExit itself if it wants to halt
+    execution; otherwise the decorator continues like a normal try/except
+    block.
+
+
+    :param catch: A tuple with one (or more) Exceptions to catch
+    :param handler: Optional handler to have custom handling of exceptions
+    :param exit: Raise a ``SystemExit`` after handling exceptions
+    :param handle_all: Handle all other exceptions via logging.
+    """
+    catch = catch or Exception
+    logger = logging.getLogger('radosgw_agent')
+
+    def decorate(f):
+
+        @wraps(f)
+        def newfunc(*a, **kw):
+            exit_from_catch = False
+            try:
+                return f(*a, **kw)
+            except catch as e:
+                if handler:
+                    return handler(e)
+                else:
+                    logger.error(make_exception_message(e))
+
+                    if exit:
+                        exit_from_catch = True
+                        sys.exit(1)
+            except Exception:  # anything else, no need to save the exception as a variable
+                if handle_all is False:  # re-raise if we are not supposed to handle everything
+                    raise
+                # Make sure we don't spit double tracebacks if we are raising
+                # SystemExit from the `except catch` block
+
+                if exit_from_catch:
+                    sys.exit(1)
+
+                str_failure = traceback.format_exc()
+                for line in str_failure.split('\n'):
+                    logger.error(line)
+                sys.exit(1)
+
+        return newfunc
+
+    return decorate
+
+#
+# Decorator helpers
+#
+
+
+def make_exception_message(exc):
+    """
+    An exception is passed in and this function
+    returns the proper string depending on the result
+    so it is readable enough.
+    """
+    if str(exc):
+        return '%s: %s\n' % (exc.__class__.__name__, exc)
+    else:
+        return '%s\n' % (exc.__class__.__name__)
+
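+
+# Illustrative sketch of ``handle_all`` (assumed usage, not exercised by the
+# tests): unexpected exceptions are logged line by line and the process exits
+# instead of printing a raw traceback.
+#
+#   @catches(ValueError, handle_all=True)
+#   def main():
+#       run_agent()  # hypothetical entry point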
diff --git a/radosgw_agent/util/log.py b/radosgw_agent/util/log.py
new file mode 100644 (file)
index 0000000..32094ea
--- /dev/null
@@ -0,0 +1,89 @@
+import logging
+import sys
+
+
+def get_dev_logger(name='dev.radosgw_agent'):
+    """
+    A simple utility for logging things that are meant for developer eyes
+    rather than for end users.
+
+    All developer logger names must be prefixed with `dev`, and this utility
+    ensures that is the case. To use it::
+
+        dev_log = get_dev_logger(__name__)
+
+    Or::
+
+        dev_log = get_dev_logger('dev.custom_name')
+    """
+    if not name.startswith('dev'):
+        return logging.getLogger('%s.%s' % ('dev', name))
+    return logging.getLogger(name)
+
+
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+COLORS = {
+    'WARNING': YELLOW,
+    'INFO': WHITE,
+    'DEBUG': BLUE,
+    'CRITICAL': RED,
+    'ERROR': RED,
+    'FATAL': RED,
+}
+
+RESET_SEQ = "\033[0m"
+COLOR_SEQ = "\033[1;%dm"
+BOLD_SEQ = "\033[1m"
+
+BASE_COLOR_FORMAT = "%(asctime)s %(process)d\
+ [$BOLD%(name)s$RESET][%(color_levelname)-17s] %(message)s"
+
+BASE_FORMAT = "%(asctime)s %(process)d [%(name)s][%(levelname)-6s] %(message)s"
+
+
+def supports_color():
+    """
+    Returns True if the running system's terminal supports color, and False
+    otherwise.
+    """
+    unsupported_platform = (sys.platform in ('win32', 'Pocket PC'))
+    # isatty is not always implemented
+    is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+    if unsupported_platform or not is_a_tty:
+        return False
+    return True
+
+
+def color_message(message):
+    message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ)
+    return message
+
+
+class ColoredFormatter(logging.Formatter):
+    """
+    A very basic logging formatter that not only applies color to the log
+    levels in the output but also truncates the level names so that they do
+    not alter the visual alignment of log lines on the terminal.
+    """
+
+    def __init__(self, msg):
+        logging.Formatter.__init__(self, msg)
+
+    def format(self, record):
+        levelname = record.levelname
+        truncated_level = record.levelname[:6]
+        levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) + truncated_level + RESET_SEQ
+        record.color_levelname = levelname_color
+        return logging.Formatter.format(self, record)
+
+
+def color_format():
+    """
+    Main entry point to get a colored formatter, it will use the
+    BASE_FORMAT by default and fall back to no colors if the system
+    does not support it
+    """
+    str_format = BASE_COLOR_FORMAT if supports_color() else BASE_FORMAT
+    color_format = color_message(str_format)
+    return ColoredFormatter(color_format)
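+
+
+# Minimal wiring sketch (illustrative; not part of this module):
+#
+#   handler = logging.StreamHandler()
+#   handler.setFormatter(color_format())
+#   logging.getLogger('radosgw_agent').addHandler(handler)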
diff --git a/radosgw_agent/util/network.py b/radosgw_agent/util/network.py
new file mode 100644 (file)
index 0000000..baa4e13
--- /dev/null
@@ -0,0 +1,20 @@
+import socket
+
+
+def is_ipv6(address):
+    """
+    Check if an address is an IPV6 one, but trim commonly used brackets as the
+    ``socket`` module complains about them.
+    """
+    if not isinstance(address, str):
+        return False
+
+    if address.startswith('['):  # assume we need to split on possible port
+        address = address.split(']:')[0]
+    # strip leading/trailing brackets so inet_pton understands the address
+    address = address.strip('[]')
+    try:
+        socket.inet_pton(socket.AF_INET6, address)
+    except socket.error:  # not a valid address
+        return False
+    return True
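+
+
+# Usage sketch (illustrative):
+#
+#   is_ipv6('fe80::1')           # True
+#   is_ipv6('[fe80::1]:8080')    # True, brackets and port are trimmed
+#   is_ipv6('192.168.0.1')       # False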
diff --git a/radosgw_agent/util/obj.py b/radosgw_agent/util/obj.py
new file mode 100644 (file)
index 0000000..e170fb6
--- /dev/null
@@ -0,0 +1,42 @@
+
+
+def to_dict(_object, **extra_keys):
+    """
+    A utility to convert an object with attributes to a dictionary with the
+    optional feature of slapping extra_keys. Because extra_keys can be
+    optionally set, it is assumed that any keys that clash will get
+    overwritten.
+
+    Private methods (anything that starts with `_`) are ignored.
+    """
+    dictified_obj = {}
+    for k, v in _object.__dict__.items():
+        if not k.startswith('_'):
+            # get key
+            value = extra_keys.pop(k, v)
+            dictified_obj[k] = value
+    if extra_keys:
+        for k, v in extra_keys.items():
+            dictified_obj[k] = v
+
+    return dictified_obj
+
+
+def to_obj(dictionary, name="BucketEntry"):
+    """
+    Because some objects are dynamic, we are forced to skip namedtuples
+    and set the attributes from keys in dictionaries so that accessing them
+    is easier and compatible with code that accesses them as regular objects.
+
+    .. note: dashes are converted to underscores
+    """
+    class Meta(object):
+
+        def __init__(self, **kw):
+            for k, v in kw.items():
+                k = k.replace('-', '_')
+                setattr(self, k, v)
+
+    obj_ = Meta(**dictionary)
+    obj_.__class__.__name__ = name
+    return obj_
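+
+
+# Round-trip sketch (illustrative; the values are made up):
+#
+#   entry = to_obj({'op-id': '00000000006.3741.3', 'state': 'complete'})
+#   entry.op_id                    # '00000000006.3741.3' (dash became underscore)
+#   to_dict(entry, state='error')  # {'op_id': '00000000006.3741.3', 'state': 'error'}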
diff --git a/radosgw_agent/util/string.py b/radosgw_agent/util/string.py
new file mode 100644 (file)
index 0000000..53787d6
--- /dev/null
@@ -0,0 +1,14 @@
+
+def concatenate(*a, **kw):
+    """
+    helper function to concatenate all arguments with added (optional)
+    newlines
+    """
+    newline = kw.get('newline', False)
+    string = ''
+    for item in a:
+        if newline:
+            string += item + '\n'
+        else:
+            string += item
+    return string
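+
+
+# Usage sketch (illustrative):
+#
+#   concatenate('a', 'b', 'c')                # 'abc'
+#   concatenate('a', 'b', 'c', newline=True)  # 'a\nb\nc\n'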
diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py
new file mode 100644 (file)
index 0000000..e5dde5a
--- /dev/null
@@ -0,0 +1,561 @@
+from collections import namedtuple
+from itertools import ifilter
+import logging
+import multiprocessing
+import os
+import socket
+import time
+
+from radosgw_agent import client
+from radosgw_agent import lock
+from radosgw_agent.util import obj as obj_, get_dev_logger
+from radosgw_agent.exceptions import SkipShard, SyncError, SyncTimedOut, SyncFailed, NotFound, BucketEmpty
+from radosgw_agent.constants import DEFAULT_TIME, RESULT_SUCCESS, RESULT_ERROR
+
+log = logging.getLogger(__name__)
+dev_log = get_dev_logger(__name__)
+
+
+class Worker(multiprocessing.Process):
+    """sync worker to run in its own process"""
+
+    def __init__(self, work_queue, result_queue, log_lock_time,
+                 src, dest, **kwargs):
+        super(Worker, self).__init__()
+        self.src = src
+        self.dest = dest
+        self.work_queue = work_queue
+        self.result_queue = result_queue
+        self.log_lock_time = log_lock_time
+        self.lock = None
+
+        self.local_lock_id = socket.gethostname() + ':' + str(os.getpid())
+
+        # construct the two connection objects
+        self.src_conn = client.connection(src)
+        self.dest_conn = client.connection(dest)
+
+    def prepare_lock(self):
+        assert self.lock is None
+        self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id,
+                              self.log_lock_time, self.dest.zone.name)
+        self.lock.daemon = True
+        self.lock.start()
+
+    def lock_shard(self, shard_num):
+        result = shard_num, []
+        try:
+            self.lock.set_shard(shard_num)
+            self.lock.acquire()
+        except NotFound:
+            # no log means nothing has changed in this shard yet
+            self.lock.unset_shard()
+            self.result_queue.put((RESULT_SUCCESS, result))
+            raise SkipShard('no log for shard')
+        except Exception:
+            log.warn('error locking shard %d log, '
+                     ' skipping for now. Traceback: ',
+                     shard_num, exc_info=True)
+            self.lock.unset_shard()
+            self.result_queue.put((RESULT_ERROR, result))
+            raise SkipShard()
+
+    def unlock_shard(self):
+        try:
+            self.lock.release_and_clear()
+        except lock.LockBroken as e:
+            log.warn('work may be duplicated: %s', e)
+        except Exception as e:
+            log.warn('error unlocking log, continuing anyway '
+                     'since lock will timeout. Traceback:', exc_info=True)
+
+    def set_bound(self, key, marker, retries, type_=None):
+        # api doesn't allow setting a bound with a blank marker
+        if marker:
+            if type_ is None:
+                type_ = self.type
+            try:
+                data = [
+                    obj_.to_dict(item, time=DEFAULT_TIME) for item in retries
+                ]
+                client.set_worker_bound(self.dest_conn,
+                                        type_,
+                                        marker,
+                                        DEFAULT_TIME,
+                                        self.daemon_id,
+                                        key,
+                                        data=data)
+                return RESULT_SUCCESS
+            except Exception:
+                log.warn('error setting worker bound for key "%s",'
+                         ' may duplicate some work later. Traceback:', key,
+                         exc_info=True)
+                return RESULT_ERROR
+
+MetadataEntry = namedtuple('MetadataEntry',
+                           ['section', 'name', 'marker', 'timestamp'])
+
+
+def _meta_entry_from_json(entry):
+    return MetadataEntry(
+        entry['section'],
+        entry['name'],
+        entry['id'],
+        entry['timestamp'],
+        )
+
+BucketIndexEntry = namedtuple('BucketIndexEntry',
+                              [
+                                  'object',
+                                  'marker',
+                                  'timestamp',
+                                  'op',
+                                  'versioned',
+                                  'ver',
+                                  'name',
+                                  # compatibility with boto objects:
+                                  'VersionedEpoch',
+                                  'version_id',
+                              ])
+
+BucketVer = namedtuple('BucketVer',
+        [
+            'epoch',
+            'pool',
+        ])
+
+
+def _bi_entry_from_json(entry):
+    ver = entry.get('ver', {})
+    entry_ver = BucketVer(
+        ver.get('epoch'),
+        ver.get('pool')
+    )
+
+    # compatibility with boto objects:
+    VersionedEpoch = ver.get('epoch')
+    version_id = entry.get('instance', 'null')
+
+    return BucketIndexEntry(
+        entry['object'],
+        entry['op_id'],
+        entry['timestamp'],
+        entry.get('op', ''),
+        entry.get('versioned', False),
+        entry_ver,
+        entry['object'],
+        VersionedEpoch,
+        version_id,
+        )
+
+
+def filter_versioned_objects(entry):
+    """
+    On incremental sync operations, the log may indicate that 'olh' entries,
+    which should be ignored. So this filter function will check for the
+    different attributes present in an ``entry`` and return only valid ones.
+
+    This should be backwards compatible with older gateways that return log
+    entries that don't support versioning.
+    """
+    # do not attempt filtering on non-versioned entries
+    if not entry.versioned:
+        return entry
+
+    # 'write' and 'delete' op values should be ignored for versioned entries
+    if entry.op not in ['write', 'delete']:
+        # allowed op states are `link_olh` and `link_olh_del`
+        return entry
+
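+# For example (illustrative): a versioned entry whose op is 'write' or 'delete'
+# is dropped, while 'link_olh'/'link_olh_del' entries and all non-versioned
+# entries pass through unchanged.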
+
+class IncrementalMixin(object):
+    """This defines run() and get_and_process_entries() for incremental sync.
+
+    These are the same for data and metadata sync, so share their
+    implementation here.
+    """
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                dev_log.info('process %s is done. Exiting', self.ident)
+                break
+
+            shard_num, (log_entries, retries) = item
+
+            log.info('%s is processing shard number %d',
+                     self.ident, shard_num)
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            result = RESULT_SUCCESS
+            try:
+                new_retries = self.sync_entries(log_entries, retries)
+            except Exception:
+                log.exception('syncing entries for shard %d failed',
+                              shard_num)
+                result = RESULT_ERROR
+                new_retries = []
+
+            # finally, unlock the log
+            self.unlock_shard()
+            self.result_queue.put((result, (shard_num, new_retries)))
+            log.info('finished processing shard %d', shard_num)
+
+
+class DataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorker, self).__init__(*args, **kwargs)
+        self.type = 'data'
+        self.op_id = 0
+        self.object_sync_timeout = kwargs.get('object_sync_timeout', 60 * 60 * 60)
+        self.daemon_id = kwargs['daemon_id']
+
+    def sync_object(self, bucket, obj):
+        log.debug('syncing object %s/%s', bucket, obj.name)
+        self.op_id += 1
+        local_op_id = self.local_lock_id + ':' +  str(self.op_id)
+        found = False
+
+        try:
+            until = time.time() + self.object_sync_timeout
+            client.sync_object_intra_region(self.dest_conn, bucket, obj,
+                                            self.src.zone.name,
+                                            self.daemon_id,
+                                            local_op_id)
+            found = True
+        except NotFound:
+            log.debug('object "%s/%s" not found on master, deleting from secondary',
+                      bucket, obj.name)
+            try:
+                client.delete_object(self.dest_conn, bucket, obj)
+            except NotFound:
+                # Since we were trying to delete the object, just return
+                return False
+            except Exception:
+                msg = 'could not delete "%s/%s" from secondary' % (bucket, obj.name)
+                log.exception(msg)
+                raise SyncFailed(msg)
+        except SyncFailed:
+            raise
+        except Exception as error:
+            msg = 'encountered an error during sync'
+            dev_log.warn(msg, exc_info=True)
+            log.warning('%s: %s' % (msg, error))
+            # wait for it if the op state is in-progress
+            self.wait_for_object(bucket, obj, until, local_op_id)
+        # TODO: clean up old op states
+        try:
+            if found:
+                client.remove_op_state(self.dest_conn, self.daemon_id,
+                                       local_op_id, bucket, obj)
+        except NotFound:
+            log.debug('op state already gone')
+        except Exception:
+            log.exception('could not remove op state for daemon "%s" op_id %s',
+                          self.daemon_id, local_op_id)
+
+        return True
+
+    def wait_for_object(self, bucket, obj, until, local_op_id):
+        while time.time() < until:
+            try:
+                state = client.get_op_state(self.dest_conn,
+                                            self.daemon_id,
+                                            local_op_id,
+                                            bucket, obj)
+                log.debug('op state is %s', state)
+                if not state:
+                    time.sleep(1)
+                    continue
+                state = state[0]['state']
+                if state == 'complete':
+                    return
+                elif state != 'in-progress':
+                    raise SyncFailed('state is {0}'.format(state))
+                time.sleep(1)
+            except SyncFailed:
+                raise
+            except NotFound:
+                raise SyncFailed('object copy state not found')
+            except Exception as e:
+                log.debug('error getting op state: %s', e, exc_info=True)
+                log.info('will try to get op state again')
+                time.sleep(1)
+        # timeout expired
+        raise SyncTimedOut()
+
+    def get_bucket_instance(self, bucket):
+        metadata = client.get_metadata(self.src_conn, 'bucket', bucket)
+        return bucket + ':' + metadata['data']['bucket']['bucket_id']
+
+    def get_bucket(self, bucket_instance):
+        return bucket_instance.split(':', 1)[0]
+
+    def sync_bucket(self, bucket, objects):
+        log.info('*'*80)
+        log.info('syncing bucket "%s"', bucket)
+        retry_objs = []
+        count = 0
+        for obj in objects:
+            try:
+                self.sync_object(bucket, obj)
+                count += 1
+            except SyncError as err:
+                log.error('failed to sync object %s/%s: %s',
+                          bucket, obj.name, err)
+                log.warning(
+                    'will retry sync of failed object at next incremental sync'
+                )
+                retry_objs.append(obj)
+        log.info('synced %s objects' % count)
+        log.info('completed syncing bucket "%s"', bucket)
+        log.info('*'*80)
+
+        return retry_objs
+
+
+class DataWorkerIncremental(IncrementalMixin, DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+        self.max_entries = kwargs['max_entries']
+
+    def get_bucket_instance_entries(self, marker, instance):
+        entries = []
+        while True:
+            try:
+                log_entries = client.get_log(self.src_conn, 'bucket-index',
+                                             marker, self.max_entries, instance)
+            except NotFound:
+                log_entries = []
+
+            log.debug('bucket instance "%s" has %d entries after "%s"', instance,
+                      len(log_entries), marker)
+
+            try:
+                entries += [_bi_entry_from_json(entry) for entry in log_entries]
+            except KeyError:
+                log.error('log missing key is: %s', log_entries)
+                raise
+
+            if entries:
+                marker = entries[-1].marker
+            else:
+                marker = ' '
+
+            if len(log_entries) < self.max_entries:
+                break
+        return marker, entries
+
+    def inc_sync_bucket_instance(self, instance, marker, timestamp, retries):
+        max_marker, entries = self.get_bucket_instance_entries(marker, instance)
+
+        # regardless if entries are versioned, make sure we filter them
+        entries = [i for i in ifilter(filter_versioned_objects, entries)]
+
+        objects = set([entry for entry in entries])
+        bucket = self.get_bucket(instance)
+        new_retries = self.sync_bucket(bucket, objects.union(retries))
+
+        result = self.set_bound(instance, max_marker, new_retries,
+                                'bucket-index')
+        if new_retries:
+            result = RESULT_ERROR
+        return result
+
+    def sync_entries(self, log_entries, retries):
+        try:
+            bucket_instances = set([entry['key'] for entry in log_entries])
+        except KeyError:
+            log.error('log containing bad key is: %s', log_entries)
+            raise
+
+        new_retries = []
+        for bucket_instance in bucket_instances.union(retries):
+            if ':' not in bucket_instance:
+                # it's just a plain bucket from an old version of the agent
+                bucket_instance = self.get_bucket_instance(bucket_instance)
+
+            bound = client.get_worker_bound(
+                self.dest_conn,
+                'bucket-index',
+                bucket_instance)
+
+            marker = bound['marker']
+            # remap dictionaries to object-like
+            retries = [obj_.to_obj(i) for i in bound['retries']]
+            timestamp = bound['oldest_time']
+
+            try:
+                sync_result = self.inc_sync_bucket_instance(bucket_instance,
+                                                            marker,
+                                                            timestamp,
+                                                            retries)
+            except Exception as e:
+                log.warn('error syncing bucket instance "%s": %s',
+                         bucket_instance, e, exc_info=True)
+                sync_result = RESULT_ERROR
+            if sync_result == RESULT_ERROR:
+                new_retries.append(bucket_instance)
+
+        return new_retries
+
+
+class DataWorkerFull(DataWorker):
+
+    def full_sync_bucket(self, bucket):
+        try:
+            instance = self.get_bucket_instance(bucket)
+            try:
+                marker = client.get_log_info(self.src_conn, 'bucket-index',
+                                             instance)['max_marker']
+            except NotFound:
+                marker = ' '
+            log.debug('bucket instance is "%s" with marker %s', instance, marker)
+
+            objects = client.list_objects_in_bucket(self.src_conn, bucket)
+            retries = self.sync_bucket(bucket, objects)
+
+            result = self.set_bound(instance, marker, retries, 'bucket-index')
+            return not retries and result == RESULT_SUCCESS
+        except BucketEmpty:
+            log.debug('no objects in bucket %s', bucket)
+            return True
+        except Exception:
+            log.exception('error preparing for full sync of bucket "%s"',
+                          bucket)
+            return False
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            shard_num, buckets = item
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            # attempt to sync each bucket, add to a list to retry
+            # during incremental sync if sync fails
+            retry_buckets = []
+            for bucket in buckets:
+                if not self.full_sync_bucket(bucket):
+                    retry_buckets.append(bucket)
+
+            # unlock shard and report buckets to retry during incremental sync
+            self.unlock_shard()
+            self.result_queue.put((RESULT_SUCCESS, (shard_num, retry_buckets)))
+            log.info('finished syncing shard %d', shard_num)
+            if retry_buckets:
+                log.info('incremental sync will need to retry buckets: %s',
+                         retry_buckets)
+
+
+class MetadataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorker, self).__init__(*args, **kwargs)
+        self.type = 'metadata'
+
+    def sync_meta(self, section, name):
+        log.debug('syncing metadata type %s key "%s"', section, name)
+        try:
+            metadata = client.get_metadata(self.src_conn, section, name)
+        except NotFound:
+            log.debug('%s "%s" not found on master, deleting from secondary',
+                      section, name)
+            try:
+                client.delete_metadata(self.dest_conn, section, name)
+            except NotFound:
+                # Since this error is handled appropriately, return success
+                return RESULT_SUCCESS
+        except Exception as e:
+            log.warn('error getting metadata for %s "%s": %s',
+                     section, name, e, exc_info=True)
+            return RESULT_ERROR
+        else:
+            try:
+                client.update_metadata(self.dest_conn, section, name, metadata)
+                return RESULT_SUCCESS
+            except Exception as e:
+                log.warn('error updating metadata for %s "%s": %s',
+                          section, name, e, exc_info=True)
+                return RESULT_ERROR
+
+class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def sync_entries(self, log_entries, retries):
+        try:
+            entries = [_meta_entry_from_json(entry) for entry in log_entries]
+        except KeyError:
+            log.error('log containing bad key is: %s', log_entries)
+            raise
+
+        new_retries = []
+        mentioned = set([(entry.section, entry.name) for entry in entries])
+        split_retries = [tuple(entry.split('/', 1)) for entry in retries]
+        for section, name in mentioned.union(split_retries):
+            sync_result = self.sync_meta(section, name)
+            if sync_result == RESULT_ERROR:
+                new_retries.append(section + '/' + name)
+
+        return new_retries
+
+class MetadataWorkerFull(MetadataWorker):
+
+    def empty_result(self, shard):
+        return shard, []
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.debug('syncing item "%s"', item)
+
+            shard_num, metadata = item
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            # attempt to sync each metadata entry, adding failures to a list
+            # to retry during incremental sync
+            retries = []
+            for section, name in metadata:
+                try:
+                    self.sync_meta(section, name)
+                except Exception as e:
+                    log.warn('could not sync %s "%s", saving for retry: %s',
+                             section, name, e, exc_info=True)
+                    retries.append(section + '/' + name)
+
+            # unlock shard and report items to retry during incremental sync
+            self.unlock_shard()
+            self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
+            log.info('finished syncing shard %d', shard_num)
+            log.info('incremental sync will need to retry items: %s',
+                     retries)
diff --git a/scripts/radosgw-agent b/scripts/radosgw-agent
new file mode 100644 (file)
index 0000000..e59d143
--- /dev/null
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""
+radosgw-agent - admin tool for ceph
+"""
+import os
+import platform
+import sys
+
+if os.path.exists('/usr/share/pyshared/radosgw_agent'):
+    sys.path.insert(0,'/usr/share/pyshared/radosgw_agent')
+elif os.path.exists('/usr/share/radosgw-agent'):
+    sys.path.insert(0,'/usr/share/radosgw-agent')
+elif os.path.exists('/usr/share/pyshared/radosgw-agent'):
+    sys.path.insert(0,'/usr/share/pyshared/radosgw-agent')
+elif os.path.exists('/usr/lib/python2.6/site-packages/radosgw_agent'):
+    sys.path.insert(0,'/usr/lib/python2.6/site-packages/radosgw_agent')
+
+from radosgw_agent.cli import main
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/setup.cfg b/setup.cfg
new file mode 100644 (file)
index 0000000..44432e4
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.2.2,python-boto < 3.0.0
diff --git a/setup.py b/setup.py
new file mode 100644 (file)
index 0000000..fcbce18
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,43 @@
+from setuptools import setup, find_packages
+import sys
+import re
+
+
+module_file = open("radosgw_agent/__init__.py").read()
+metadata = dict(
+    re.findall(r"__([a-z]+)__\s*=\s*['\"]([^'\"]*)['\"]", module_file))
+
+
+install_requires = [
+    'boto >=2.10.0,<3.0.0',
+    'PyYAML',
+]
+
+pyversion = sys.version_info[:2]
+if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
+    install_requires.append('argparse')
+
+setup(
+    name='radosgw-agent',
+    version=metadata['version'],
+    packages=find_packages(),
+
+    author='Josh Durgin',
+    author_email='jdurgin@redhat.com',
+    description='Synchronize users and data between radosgw clusters',
+    license='MIT',
+    keywords='radosgw ceph radosgw-agent',
+    url="https://github.com/ceph/radosgw-agent",
+    install_requires=install_requires,
+    tests_require=[
+        'pytest',
+        'mock',
+        'tox',
+        'httpretty',
+    ],
+    entry_points={
+        'console_scripts': [
+            'radosgw-agent = radosgw_agent.cli:main',
+            ],
+        },
+    )
diff --git a/tox.ini b/tox.ini
new file mode 100644 (file)
index 0000000..ee51e2d
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,9 @@
+[tox]
+envlist = py26, py27
+
+[testenv]
+deps=
+  pytest
+  mock
+  httpretty
+commands=py.test -s -v {posargs:radosgw_agent/tests}