--- /dev/null
+*.py[cod]
+
+# C extensions
+*.so
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+
+# Translations
+*.mo
+
+# Mr Developer
+.mr.developer.cfg
+.project
+.pydevproject
--- /dev/null
+Copyright (c) 2013 Inktank Storage, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
--- /dev/null
+====================================================================
+radosgw-agent -- synchronize data and users between radosgw clusters
+====================================================================
--- /dev/null
+#!/bin/sh
+set -e
+
+if command -v lsb_release >/dev/null 2>&1; then
+ case "$(lsb_release --id --short)" in
+ Ubuntu|Debian)
+ for package in python-virtualenv; do
+ if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+ # add a space after old values
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required packages, please install them:" 1>&2
+ echo " sudo apt-get install $missing"
+ exit 1
+ fi
+ ;;
+ esac
+else
+ if [ -f /etc/redhat-release ]; then
+ case "$(cat /etc/redhat-release | awk '{print $1}')" in
+ CentOS)
+ for package in python-virtualenv; do
+ if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required packages, please install them:" 1>&2
+ echo " sudo yum install $missing"
+ exit 1
+ fi
+ ;;
+ esac
+ fi
+fi
+
+test -d virtualenv || virtualenv virtualenv
+./virtualenv/bin/python setup.py develop
+./virtualenv/bin/pip install -r requirements.txt -r requirements-dev.txt
+test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent .
--- /dev/null
+radosgw-agent (1.1-1) precise; urgency=low
+
+ * new upstream release
+
+ -- Gary Lowell <glowell@pudgy.ops.newdream.net> Thu, 21 Nov 2013 16:17:25 -0800
+
+radosgw-agent (1.0-1) stable; urgency=low
+
+ * Initial release
+
+ -- Gary Lowell <gary.lowell@inktank.com> Mon, 26 Aug 2013 09:19:47 -0700
--- /dev/null
+Source: radosgw-agent
+Maintainer: Sage Weil <sage@newdream.net>
+Uploaders: Sage Weil <sage@newdream.net>
+Section: admin
+Priority: optional
+Build-Depends: debhelper (>= 8), python-setuptools
+X-Python-Version: >= 2.6
+Standards-Version: 3.9.2
+Homepage: http://ceph.com/
+
+Package: radosgw-agent
+Architecture: all
+Depends: python,
+ python-argparse,
+ python-boto,
+ python-requests,
+ python-setuptools,
+ python-yaml,
+ ${misc:Depends},
+ ${python:Depends}
+Description: Rados gateway agents.
--- /dev/null
+Files: *
+Copyright: (c) 2013 by Inktank Storage
+License: MIT (see LICENSE in the source distribution)
--- /dev/null
+#!/usr/bin/make -f
+
+# Uncomment this to turn on verbose mode.
+export DH_VERBOSE=1
+
+%:
+ dh $@ --buildsystem python_distutils --with python2
+
--- /dev/null
+%define name radosgw-agent
+%define version 1.1
+%define unmangled_version 1.1
+%define release 1
+
+Summary: Synchronize users and data between radosgw clusters
+Name: %{name}
+Version: %{version}
+Release: %{release}
+Source0: %{name}-%{unmangled_version}.tar.gz
+License: MIT
+Group: Development/Libraries
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
+Prefix: %{_prefix}
+BuildArch: noarch
+Vendor: Josh Durgin <josh.durgin@inktank.com>
+Requires: python-argparse
+Requires: PyYAML
+Requires: python-boto >= 2.2.2
+Requires: python-boto < 3.0.0
+Requires: python-requests
+Url: https://github.com/ceph/radosgw-agent
+
+%description
+Synchronize users and data between radosgw clusters.
+
+%prep
+%setup -n %{name}-%{unmangled_version} -n %{name}-%{unmangled_version}
+
+%build
+python setup.py build
+
+%install
+python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT --record=INSTALLED_FILES
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%files -f INSTALLED_FILES
+%defattr(-,root,root)
--- /dev/null
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import argparse
+import contextlib
+import logging
+import logging.handlers
+import yaml
+import sys
+
+from radosgw_agent import client
+from radosgw_agent import sync
+
+def check_positive_int(string):
+    """argparse type function: parse string as an int >= 1.
+
+    Raises argparse.ArgumentTypeError for values below 1; a
+    non-numeric string raises ValueError, which argparse also
+    reports as a usage error.
+    """
+    value = int(string)
+    if value < 1:
+        msg = '%r is not a positive integer' % string
+        raise argparse.ArgumentTypeError(msg)
+    return value
+
+def check_endpoint(endpoint):
+    """argparse type function: parse an http(s) URL into a client.Endpoint.
+
+    Translates the client module's parse errors into
+    ArgumentTypeError so argparse shows them as usage errors.
+    """
+    try:
+        return client.parse_endpoint(endpoint)
+    except client.InvalidProtocol as e:
+        raise argparse.ArgumentTypeError(str(e))
+    except client.InvalidHost as e:
+        raise argparse.ArgumentTypeError(str(e))
+
+def parse_args():
+    """Parse command line arguments, seeded with defaults from --conf.
+
+    A first pass parses only -c/--conf; values from that YAML file
+    become defaults for the full parser, which is why most required
+    flags are only 'required' when absent from the config file.
+    Returns the parsed argparse.Namespace.
+    """
+    conf_parser = argparse.ArgumentParser(add_help=False)
+    conf_parser.add_argument(
+        '-c', '--conf',
+        type=file,
+        help='configuration file'
+    )
+    args, remaining = conf_parser.parse_known_args()
+    defaults = dict(
+        sync_scope='incremental',
+        log_lock_time=20,
+    )
+    if args.conf is not None:
+        with contextlib.closing(args.conf):
+            # the config may contain multiple YAML documents; later
+            # documents override earlier ones
+            config = yaml.safe_load_all(args.conf)
+            for new in config:
+                defaults.update(new)
+
+    parser = argparse.ArgumentParser(
+        parents=[conf_parser],
+        description='Synchronize radosgw installations',
+    )
+    parser.set_defaults(**defaults)
+    verbosity = parser.add_mutually_exclusive_group(required=False)
+    verbosity.add_argument(
+        '-v', '--verbose',
+        action='store_true', dest='verbose',
+        help='be more verbose',
+    )
+    verbosity.add_argument(
+        '-q', '--quiet',
+        action='store_true', dest='quiet',
+        help='be less verbose',
+    )
+    parser.add_argument(
+        '--src-access-key',
+        required='src_access_key' not in defaults,
+        help='access key for source zone system user',
+    )
+    parser.add_argument(
+        '--src-secret-key',
+        required='src_secret_key' not in defaults,
+        help='secret key for source zone system user',
+    )
+    parser.add_argument(
+        '--dest-access-key',
+        required='dest_access_key' not in defaults,
+        help='access key for destination zone system user',
+    )
+    parser.add_argument(
+        '--dest-secret-key',
+        required='dest_secret_key' not in defaults,
+        help='secret key for destination zone system user',
+    )
+    parser.add_argument(
+        'destination',
+        type=check_endpoint,
+        # positional becomes optional when supplied by the config file
+        nargs=None if 'destination' not in defaults else '?',
+        help='radosgw endpoint to which to sync '
+             '(e.g. http://zone2.example.org:8080)',
+    )
+    src_options = parser.add_mutually_exclusive_group(required=False)
+    src_options.add_argument(
+        '--source',
+        type=check_endpoint,
+        help='radosgw endpoint from which to sync '
+             '(e.g. http://zone1.example.org:8080)',
+    )
+    src_options.add_argument(
+        '--src-zone',
+        help='radosgw zone from which to sync',
+    )
+    parser.add_argument(
+        '--metadata-only',
+        action='store_true',
+        help='sync bucket and user metadata, but not bucket contents',
+    )
+    parser.add_argument(
+        '--num-workers',
+        default=1,
+        type=check_positive_int,
+        help='number of items to sync at once',
+    )
+    parser.add_argument(
+        '--sync-scope',
+        choices=['full', 'incremental'],
+        default='incremental',
+        help='synchronize everything (for a new region) or only things that '
+             'have changed since the last run',
+    )
+    parser.add_argument(
+        '--lock-timeout',
+        type=check_positive_int,
+        default=60,
+        help='timeout in seconds after which a log segment lock will expire if '
+             'not refreshed',
+    )
+    parser.add_argument(
+        '--log-file',
+        help='where to store log output',
+    )
+    parser.add_argument(
+        '--max-entries',
+        type=check_positive_int,
+        default=1000,
+        help='maximum number of log entries to process at once during '
+             'continuous sync',
+    )
+    parser.add_argument(
+        '--incremental-sync-delay',
+        type=check_positive_int,
+        default=30,
+        help='seconds to wait between syncs',
+    )
+    parser.add_argument(
+        '--object-sync-timeout',
+        type=check_positive_int,
+        default=60 * 60 * 60,
+        help='seconds to wait for an individual object to sync before '
+             'assuming failure',
+    )
+    parser.add_argument(
+        '--prepare-error-delay',
+        type=check_positive_int,
+        default=10,
+        help='seconds to wait before retrying when preparing '
+             'an incremental sync fails',
+    )
+    parser.add_argument(
+        '--rgw-data-log-window',
+        type=check_positive_int,
+        default=30,
+        help='period until a data log entry is valid - '
+             'must match radosgw configuration',
+    )
+    parser.add_argument(
+        '--test-server-host',
+        # host to run a simple http server for testing the sync agent on,
+        help=argparse.SUPPRESS,
+    )
+    parser.add_argument(
+        '--test-server-port',
+        # port to run a simple http server for testing the sync agent on,
+        type=check_positive_int,
+        default=8080,
+        help=argparse.SUPPRESS,
+    )
+    return parser.parse_args(remaining)
+
+class TestHandler(BaseHTTPRequestHandler):
+ """HTTP handler for testing radosgw-agent.
+
+ This should never be used outside of testing.
+ """
+ num_workers = None
+ lock_timeout = None
+ max_entries = None
+ rgw_data_log_window = 30
+ src = None
+ dest = None
+
+ def do_POST(self):
+ log = logging.getLogger(__name__)
+ status = 200
+ resp = ''
+ sync_cls = None
+ if self.path.startswith('/metadata/full'):
+ sync_cls = sync.MetaSyncerFull
+ elif self.path.startswith('/metadata/incremental'):
+ sync_cls = sync.MetaSyncerInc
+ elif self.path.startswith('/data/full'):
+ sync_cls = sync.DataSyncerFull
+ elif self.path.startswith('/data/incremental'):
+ sync_cls = sync.DataSyncerInc
+ else:
+ log.warn('invalid request, ignoring')
+ status = 400
+ resp = 'bad path'
+
+ try:
+ if sync_cls is not None:
+ syncer = sync_cls(TestHandler.src, TestHandler.dest,
+ TestHandler.max_entries,
+ rgw_data_log_window=TestHandler.rgw_data_log_window,
+ object_sync_timeout=TestHandler.object_sync_timeout)
+ syncer.prepare()
+ syncer.sync(
+ TestHandler.num_workers,
+ TestHandler.lock_timeout,
+ )
+ except Exception as e:
+ log.exception('error during sync')
+ status = 500
+ resp = str(e)
+
+ self.log_request(status, len(resp))
+ if status >= 400:
+ self.send_error(status, resp)
+ else:
+ self.send_response(status)
+ self.end_headers()
+
+def main():
+    """Entry point: configure logging, resolve endpoints, and run a sync.
+
+    Depending on the arguments this either starts a test HTTP server
+    (--test-server-host), runs a one-shot full sync, or loops forever
+    doing incremental syncs.  Exits nonzero if the region map cannot
+    be fetched or the endpoints are invalid.
+    """
+    args = parse_args()
+    log = logging.getLogger()
+    log_level = logging.INFO
+    lib_log_level = logging.WARN
+    if args.verbose:
+        log_level = logging.DEBUG
+        lib_log_level = logging.DEBUG
+    elif args.quiet:
+        log_level = logging.WARN
+    logging.basicConfig(level=log_level)
+    # keep boto/requests chatter down unless --verbose was given
+    logging.getLogger('boto').setLevel(lib_log_level)
+    logging.getLogger('requests').setLevel(lib_log_level)
+
+    if args.log_file is not None:
+        handler = logging.handlers.WatchedFileHandler(
+            filename=args.log_file,
+        )
+        formatter = logging.Formatter(
+            fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s',
+            datefmt='%Y-%m-%dT%H:%M:%S',
+        )
+        handler.setFormatter(formatter)
+        logging.getLogger().addHandler(handler)
+
+    dest = args.destination
+    dest.access_key = args.dest_access_key
+    dest.secret_key = args.dest_secret_key
+    # the source may be given as an endpoint, a zone name, or inferred
+    # from the region map later in configure_endpoints()
+    src = args.source or client.Endpoint(None, None, None)
+    if args.src_zone:
+        src.zone = args.src_zone
+    dest_conn = client.connection(dest)
+
+    try:
+        region_map = client.get_region_map(dest_conn)
+    except Exception:
+        log.exception('Could not retrieve region map from destination')
+        sys.exit(1)
+
+    try:
+        client.configure_endpoints(region_map, dest, src, args.metadata_only)
+    except client.ClientException as e:
+        log.error(e)
+        sys.exit(1)
+
+    src.access_key = args.src_access_key
+    src.secret_key = args.src_secret_key
+
+    if args.test_server_host:
+        log.warn('TEST MODE - do not run unless you are testing this program')
+        TestHandler.src = src
+        TestHandler.dest = dest
+        TestHandler.num_workers = args.num_workers
+        TestHandler.lock_timeout = args.lock_timeout
+        TestHandler.max_entries = args.max_entries
+        TestHandler.rgw_data_log_window = args.rgw_data_log_window
+        TestHandler.object_sync_timeout = args.object_sync_timeout
+        server = HTTPServer((args.test_server_host, args.test_server_port),
+                            TestHandler)
+        server.serve_forever()
+        sys.exit()
+
+    if args.sync_scope == 'full':
+        meta_cls = sync.MetaSyncerFull
+        data_cls = sync.DataSyncerFull
+    else:
+        meta_cls = sync.MetaSyncerInc
+        data_cls = sync.DataSyncerInc
+
+    meta_syncer = meta_cls(src, dest, args.max_entries)
+    data_syncer = data_cls(src, dest, args.max_entries,
+                           rgw_data_log_window=args.rgw_data_log_window,
+                           object_sync_timeout=args.object_sync_timeout)
+
+    # fetch logs first since data logs need to wait before becoming usable
+    # due to rgw's window of data log updates during which the bucket index
+    # log may still be updated without the data log getting a new entry for
+    # the bucket
+    sync.prepare_sync(meta_syncer, args.prepare_error_delay)
+    if not args.metadata_only:
+        sync.prepare_sync(data_syncer, args.prepare_error_delay)
+
+    if args.sync_scope == 'full':
+        log.info('syncing all metadata')
+        meta_syncer.sync(args.num_workers, args.lock_timeout)
+        if not args.metadata_only:
+            log.info('syncing all data')
+            data_syncer.sync(args.num_workers, args.lock_timeout)
+        log.info('Finished full sync. Check logs to see any issues that '
+                 'incremental sync will retry.')
+    else:
+        sync.incremental_sync(meta_syncer, data_syncer,
+                              args.num_workers,
+                              args.lock_timeout,
+                              args.incremental_sync_delay,
+                              args.metadata_only,
+                              args.prepare_error_delay)
--- /dev/null
+import boto
+import functools
+import json
+import logging
+import random
+import requests
+import urllib
+from urlparse import urlparse
+
+from boto.connection import AWSAuthConnection
+from boto.s3.connection import S3Connection
+
+log = logging.getLogger(__name__)
+
+class Endpoint(object):
+    """A radosgw endpoint: host/port/scheme plus credentials and zone info.
+
+    region and zone are filled in later by configure_endpoints().
+    """
+    def __init__(self, host, port, secure,
+                 access_key=None, secret_key=None, region=None, zone=None):
+        self.host = host
+        # fall back to the scheme's default port when none was given
+        default_port = 443 if secure else 80
+        self.port = port or default_port
+        self.secure = secure
+        self.access_key = access_key
+        self.secret_key = secret_key
+        self.region = region
+        self.zone = zone
+
+    def __eq__(self, other):
+        # equality considers only host and port, not credentials/zone
+        if self.host != other.host:
+            return False
+        if self.port == other.port:
+            return True
+        # if self and other are mixed http/https with default ports,
+        # i.e. http://example.com and https://example.com, consider
+        # them the same
+        def diff_only_default_ports(a, b):
+            return a.secure and a.port == 443 and not b.secure and b.port == 80
+        return (diff_only_default_ports(self, other) or
+                diff_only_default_ports(other, self))
+
+    def __repr__(self):
+        return 'Endpoint(host={host}, port={port}, secure={secure})'.format(
+            host=self.host,
+            port=self.port,
+            secure=self.secure)
+
+    def __str__(self):
+        scheme = 'https' if self.secure else 'http'
+        return '{scheme}://{host}:{port}'.format(scheme=scheme,
+                                                 host=self.host,
+                                                 port=self.port)
+
+class ClientException(Exception):
+    """Base class for all errors raised by this client module."""
+    pass
+class InvalidProtocol(ClientException):
+    """Endpoint URL scheme was not http or https."""
+    pass
+class InvalidHost(ClientException):
+    """Endpoint URL contained no hostname."""
+    pass
+class InvalidZone(ClientException):
+    """Zone is unusable as a sync source/destination."""
+    pass
+class ZoneNotFound(ClientException):
+    """Endpoint or zone name not present in the region map."""
+    pass
+
+def parse_endpoint(endpoint):
+    """Parse an 'http(s)://host[:port]' URL into an Endpoint.
+
+    Raises InvalidProtocol or InvalidHost for malformed input.
+    """
+    url = urlparse(endpoint)
+    if url.scheme not in ['http', 'https']:
+        raise InvalidProtocol('invalid protocol %r' % url.scheme)
+    if not url.hostname:
+        raise InvalidHost('no hostname in %r' % endpoint)
+    return Endpoint(url.hostname, url.port, url.scheme == 'https')
+
+class HttpError(ClientException):
+    """Non-2xx HTTP response, carrying status code and response body."""
+    def __init__(self, code, body):
+        self.code = code
+        self.body = body
+        self.message = 'Http error code %s content %s' % (code, body)
+    def __str__(self):
+        return self.message
+class NotFound(HttpError):
+    """HTTP 404."""
+    pass
+# map of status codes with dedicated exception classes; anything not
+# listed falls back to plain HttpError
+code_to_exc = {
+    404: NotFound,
+    }
+
+def boto_call(func):
+ @functools.wraps(func)
+ def translate_exception(*args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except boto.exception.S3ResponseError as e:
+ raise code_to_exc.get(e.status, HttpError)(e.status, e.body)
+ return translate_exception
+
+
+"""
+Adapted from the build_request() method of boto.connection
+"""
+
+def _build_request(conn, method, basepath='', resource = '', headers=None,
+ data=None, special_first_param=None, params=None):
+ path = conn.calling_format.build_path_base(basepath, resource)
+ auth_path = conn.calling_format.build_auth_path(basepath, resource)
+ host = conn.calling_format.build_host(conn.server_name(), '')
+
+ if special_first_param:
+ path += '?' + special_first_param
+ boto.log.debug('path=%s' % path)
+ auth_path += '?' + special_first_param
+ boto.log.debug('auth_path=%s' % auth_path)
+
+ return AWSAuthConnection.build_base_http_request(
+ conn, method, path, auth_path, params, headers, data, host)
+
+def check_result_status(result):
+    """Raise the mapped HttpError subclass for any non-2xx response."""
+    if result.status_code / 100 != 2:
+        raise code_to_exc.get(result.status_code,
+                              HttpError)(result.status_code, result.content)
+def url_safe(component):
+    """Return component as a percent-encoded UTF-8 byte string."""
+    if isinstance(component, basestring):
+        string = component.encode('utf8')
+    else:
+        # non-string values (e.g. ints) are stringified first
+        string = str(component)
+    return urllib.quote(string)
+
+def request(connection, type_, resource, params=None, headers=None,
+            data=None, expect_json=True, special_first_param=None):
+    """Send a signed admin-API request via the requests library.
+
+    The boto connection is used only to build and sign the request;
+    the actual HTTP call is made with requests.  Returns parsed JSON
+    unless expect_json is False or data was sent, in which case the
+    raw response object is returned.  Raises HttpError/NotFound on
+    non-2xx responses.
+    """
+    if headers is None:
+        headers = {}
+
+    if type_ in ['put', 'post']:
+        headers['Content-Type'] = 'application/json; charset=UTF-8'
+
+    request_data = data if data else ''
+    if params is None:
+        params = {}
+    safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()])
+    # NOTE: this local deliberately shadows the module-level request()
+    request = _build_request(connection,
+                             type_.upper(),
+                             resource=resource,
+                             special_first_param=special_first_param,
+                             headers=headers,
+                             data=request_data,
+                             params=safe_params)
+
+    url = '{protocol}://{host}{path}'.format(protocol=request.protocol,
+                                             host=request.host,
+                                             path=request.path)
+
+    # sign with the boto connection's credentials before handing off
+    request.authorize(connection=connection)
+
+    handler = getattr(requests, type_)
+    boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r',
+                   url, params, request.headers, data)
+    result = handler(url, params=params, headers=request.headers, data=data)
+
+    check_result_status(result)
+
+    if data or not expect_json:
+        return result.raw
+    return result.json()
+
+def get_metadata(connection, section, name):
+    """Fetch one metadata entry (e.g. a user or bucket) by key."""
+    return request(connection, 'get', 'admin/metadata/' + section,
+                   params=dict(key=name))
+
+def update_metadata(connection, section, name, metadata):
+    """Write a metadata entry; dicts/lists are JSON-encoded first."""
+    if not isinstance(metadata, basestring):
+        metadata = json.dumps(metadata)
+    return request(connection, 'put', 'admin/metadata/' + section,
+                   params=dict(key=name), data=metadata)
+
+def delete_metadata(connection, section, name):
+    """Delete a metadata entry by key."""
+    return request(connection, 'delete', 'admin/metadata/' + section,
+                   params=dict(key=name), expect_json=False)
+
+def get_metadata_sections(connection):
+    """List the available metadata sections."""
+    return request(connection, 'get', 'admin/metadata')
+
+def list_metadata_keys(connection, section):
+    """List all keys within one metadata section."""
+    return request(connection, 'get', 'admin/metadata/' + section)
+
+def get_op_state(connection, client_id, op_id, bucket, obj):
+    """Query the state of an in-progress object copy operation."""
+    return request(connection, 'get', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': '{0}/{1}'.format(bucket, obj),
+                       'client-id': client_id,
+                       }
+                   )
+
+def remove_op_state(connection, client_id, op_id, bucket, obj):
+    """Delete the opstate entry for a finished copy operation."""
+    return request(connection, 'delete', 'admin/opstate',
+                   params={
+                       'op-id': op_id,
+                       'object': '{0}/{1}'.format(bucket, obj),
+                       'client-id': client_id,
+                       },
+                   expect_json=False,
+                   )
+
+def get_bucket_list(connection):
+    """List all bucket metadata keys."""
+    return list_metadata_keys(connection, 'bucket')
+
+@boto_call
+def list_objects_in_bucket(connection, bucket_name):
+    """List objects via the boto S3 API (errors mapped by boto_call)."""
+    # use the boto library to do this
+    bucket = connection.get_bucket(bucket_name)
+    return bucket.list()
+
+@boto_call
+def delete_object(connection, bucket_name, object_name):
+    """Delete one object via the boto S3 API."""
+    bucket = connection.get_bucket(bucket_name)
+    bucket.delete_key(object_name)
+
+def sync_object_intra_region(connection, bucket_name, object_name, src_zone,
+                             client_id, op_id):
+    """Ask the destination rgw to copy an object from src_zone itself
+    (server-side copy within the region)."""
+    path = '{bucket}/{object}'.format(
+        bucket=url_safe(bucket_name),
+        object=url_safe(object_name),
+        )
+    return request(connection, 'put', path,
+                   params={
+                       'rgwx-source-zone': src_zone,
+                       'rgwx-client-id': client_id,
+                       'rgwx-op-id': op_id,
+                       },
+                   headers={
+                       'x-amz-copy-source': '%s/%s' % (bucket_name, object_name),
+                       },
+                   expect_json=False)
+
+def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
+    """Take (or renew) the lock on one log shard for timeout seconds."""
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'length': timeout,
+                       'zone-id': zone_id,
+                       'locker-id': locker_id,
+                       },
+                   special_first_param='lock',
+                   expect_json=False)
+
+def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id):
+    """Release the lock on one log shard."""
+    return request(connection, 'post', 'admin/log',
+                   params={
+                       'type': lock_type,
+                       'id': shard_num,
+                       'locker-id': locker_id,
+                       'zone-id': zone_id,
+                       },
+                   special_first_param='unlock',
+                   expect_json=False)
+
+def _id_name(type_):
+    """Return the query parameter name identifying a shard for type_."""
+    return 'bucket-instance' if type_ == 'bucket-index' else 'id'
+
+def get_log(connection, log_type, marker, max_entries, id_):
+    """Fetch up to max_entries log entries after marker from one shard."""
+    key = _id_name(log_type)
+    return request(connection, 'get', 'admin/log',
+                   params={
+                       'type': log_type,
+                       key: id_,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                       },
+                   )
+
+def get_log_info(connection, log_type, id_):
+    """Fetch metadata (markers, timestamps) about one log shard."""
+    key = _id_name(log_type)
+    return request(
+        connection, 'get', 'admin/log',
+        params={
+            'type': log_type,
+            key: id_,
+            },
+        special_first_param='info',
+        )
+
+def num_log_shards(connection, shard_type):
+    """Return how many shards the given log type is split into."""
+    out = request(connection, 'get', 'admin/log', dict(type=shard_type))
+    return out['num_objects']
+
+def set_worker_bound(connection, type_, marker, timestamp,
+                     daemon_id, id_, data=None):
+    """Record sync progress (marker + retry list) in the replica log."""
+    if data is None:
+        data = []
+    key = _id_name(type_)
+    boto.log.debug('set_worker_bound: data = %r', data)
+    return request(
+        connection, 'post', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'marker': marker,
+            'time': timestamp,
+            'daemon_id': daemon_id,
+            },
+        data=json.dumps(data),
+        special_first_param='work_bound',
+        )
+
+def del_worker_bound(connection, type_, daemon_id, id_):
+    """Remove this daemon's progress entry from the replica log."""
+    key = _id_name(type_)
+    return request(
+        connection, 'delete', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            'daemon_id': daemon_id,
+            },
+        special_first_param='work_bound',
+        expect_json=False,
+        )
+
+def get_worker_bound(connection, type_, id_):
+    """Fetch recorded progress for one shard.
+
+    Returns (marker, oldest_time, retries) where retries is the set of
+    item names still in progress across all recorded markers.
+    """
+    key = _id_name(type_)
+    out = request(
+        connection, 'get', 'admin/replica_log',
+        params={
+            'type': type_,
+            key: id_,
+            },
+        special_first_param='bounds',
+        )
+    boto.log.debug('get_worker_bound returned: %r', out)
+    retries = set()
+    for item in out['markers']:
+        names = [retry['name'] for retry in item['items_in_progress']]
+        retries = retries.union(names)
+    return out['marker'], out['oldest_time'], retries
+
+class Zone(object):
+    """One zone from the region map: name, endpoints, and log flags."""
+    def __init__(self, zone_info):
+        self.name = zone_info['name']
+        # set to True by Region when this zone is the region's master
+        self.is_master = False
+        self.endpoints = [parse_endpoint(e) for e in zone_info['endpoints']]
+        # rgw encodes these booleans as the strings 'true'/'false'
+        self.log_meta = zone_info['log_meta'] == 'true'
+        self.log_data = zone_info['log_data'] == 'true'
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return self.name
+
+class Region(object):
+    """One region from the region map: its zones and master zone."""
+    def __init__(self, region_info):
+        self.name = region_info['key']
+        self.is_master = region_info['val']['is_master'] == 'true'
+        self.zones = {}
+        for zone_info in region_info['val']['zones']:
+            zone = Zone(zone_info)
+            self.zones[zone.name] = zone
+            if zone.name == region_info['val']['master_zone']:
+                zone.is_master = True
+                self.master_zone = zone
+        # a region without a master zone is malformed
+        assert hasattr(self, 'master_zone'), \
+               'No master zone found for region ' + self.name
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return str(self.zones.keys())
+
+class RegionMap(object):
+    """The full region map: all regions plus the master region."""
+    def __init__(self, region_map):
+        self.regions = {}
+        for region_info in region_map['regions']:
+            region = Region(region_info)
+            self.regions[region.name] = region
+            if region.is_master:
+                self.master_region = region
+        assert hasattr(self, 'master_region'), \
+               'No master region found in region map'
+
+    def __repr__(self):
+        return str(self)
+
+    def __str__(self):
+        return str(self.regions)
+
+    def find_endpoint(self, endpoint):
+        """Locate an endpoint (by address or zone name) in the map.
+
+        Returns the (region, zone) pair; raises ZoneNotFound if no
+        zone matches.
+        """
+        for region in self.regions.itervalues():
+            for zone in region.zones.itervalues():
+                if endpoint in zone.endpoints or endpoint.zone == zone.name:
+                    return region, zone
+        raise ZoneNotFound('%s not found in region map' % endpoint)
+
+def get_region_map(connection):
+    """Fetch and parse the region map from the admin config API."""
+    region_map = request(connection, 'get', 'admin/config')
+    return RegionMap(region_map)
+
+def _validate_sync_dest(dest_region, dest_zone):
+    """Reject syncing into the master zone of the master region."""
+    if dest_region.is_master and dest_zone.is_master:
+        raise InvalidZone('destination cannot be master zone of master region')
+
+def _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+                          meta_only):
+    """Check that src is a usable sync source for dest.
+
+    Raises InvalidZone if any requirement (master zone, logging
+    enabled, same region for data sync, endpoints present) fails.
+    """
+    if not src_zone.is_master:
+        raise InvalidZone('source zone %s must be a master zone' % src_zone.name)
+    if (src_region.name == dest_region.name and
+        src_zone.name == dest_zone.name):
+        raise InvalidZone('source and destination must be different zones')
+    if not src_zone.log_meta:
+        raise InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name)
+    if not meta_only and not src_zone.log_data:
+        raise InvalidZone('source zone %s must have data logging enabled' % src_zone.name)
+    if not meta_only and src_region.name != dest_region.name:
+        raise InvalidZone('data sync can only occur between zones in the same region')
+    if not src_zone.endpoints:
+        raise InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name)
+
+def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only):
+ print('region map is: %r' % region_map)
+
+ dest_region, dest_zone = region_map.find_endpoint(dest_endpoint)
+ _validate_sync_dest(dest_region, dest_zone)
+
+ # source may be specified by http endpoint or zone name
+ if src_endpoint.host or src_endpoint.zone:
+ src_region, src_zone = region_map.find_endpoint(src_endpoint)
+ else:
+ # try the master zone in the same region, then the master zone
+ # in the master region
+ try:
+ _validate_sync_source(dest_region, dest_region.master_zone,
+ dest_region, dest_zone, meta_only)
+ src_region, src_zone = dest_region, dest_region.master_zone
+ except InvalidZone as e:
+ log.debug('source region %s zone %s unaccetpable: %s',
+ dest_region.name, dest_region.master_zone.name, e)
+ master_region = region_map.master_region
+ src_region, src_zone = master_region, master_region.master_zone
+
+ _validate_sync_source(src_region, src_zone, dest_region, dest_zone,
+ meta_only)
+
+ # choose a random source endpoint if one wasn't specified
+ if not src_endpoint.host:
+ endpoint = random.choice(src_zone.endpoints)
+ src_endpoint.host = endpoint.host
+ src_endpoint.port = endpoint.port
+ src_endpoint.secure = endpoint.secure
+
+ # fill in region and zone names
+ dest_endpoint.region = dest_region
+ dest_endpoint.zone = dest_zone
+ src_endpoint.region = src_region
+ src_endpoint.zone = src_zone
+
+def connection(endpoint, debug=None):
+    """Create a boto S3Connection for the given Endpoint.
+
+    OrdinaryCallingFormat keeps bucket names in the path rather than
+    the hostname, which is what radosgw admin APIs expect.
+    """
+    return S3Connection(
+        aws_access_key_id=endpoint.access_key,
+        aws_secret_access_key=endpoint.secret_key,
+        is_secure=endpoint.secure,
+        host=endpoint.host,
+        port=endpoint.port,
+        calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+        debug=debug,
+        )
--- /dev/null
+import logging
+import threading
+import time
+
+from radosgw_agent import client
+
+log = logging.getLogger(__name__)
+
+class LockBroken(Exception):
+ pass
+
+class LockRenewFailed(LockBroken):
+ pass
+
+class LockExpired(LockBroken):
+ pass
+
+class Lock(threading.Thread):
+    """A lock on a shard log that automatically refreshes itself.
+
+    It may be used to lock different shards throughout its lifetime.
+    To lock a new shard, call acquire() with the shard_num desired.
+
+    To release the lock, call release_and_clear(). This will raise an
+    exception if the lock ever failed to be acquired in the timeout
+    period.
+    """
+
+    def __init__(self, conn, type_, locker_id, timeout, zone_id):
+        super(Lock, self).__init__()
+        self.conn = conn
+        self.type = type_
+        self.timeout = timeout
+        # guards the mutable state below, shared between the caller and
+        # the background run() thread
+        self.lock = threading.Lock()
+        self.locker_id = locker_id
+        self.zone_id = zone_id
+        self.shard_num = None
+        self.last_locked = None
+        self.failed = False
+
+    def set_shard(self, shard_num):
+        """Start tracking (and auto-renewing) the given shard."""
+        log.debug('set_shard to %d', shard_num)
+        with self.lock:
+            assert self.shard_num is None, \
+                'attempted to acquire new lock without releasing old one'
+            self.failed = False
+            self.last_locked = None
+            self.shard_num = shard_num
+
+    def unset_shard(self):
+        """Stop tracking the current shard (no unlock is sent)."""
+        log.debug('unset shard')
+        with self.lock:
+            self.shard_num = None
+
+    def acquire(self):
+        """Renew an existing lock, or acquire a new one.
+
+        The old lock must have already been released if shard_num is specified.
+        client.NotFound may be raised if the log contains no entries.
+        """
+        log.debug('acquire lock')
+        with self.lock:
+            self._acquire()
+
+    def _acquire(self):
+        # same as acquire() but assumes self.lock is held
+        now = time.time()
+        client.lock_shard(self.conn, self.type, self.shard_num,
+                          self.zone_id, self.timeout, self.locker_id)
+        self.last_locked = now
+
+    def release_and_clear(self):
+        """Release the lock currently being held.
+
+        Prevent it from being automatically renewed, and check if there
+        were any errors renewing the current lock or if it expired.
+        If the lock was not sustained, raise LockRenewFailed or LockExpired.
+        """
+        log.debug('release and clear lock')
+        with self.lock:
+            shard_num = self.shard_num
+            self.shard_num = None
+            diff = time.time() - self.last_locked
+            if diff > self.timeout:
+                msg = 'lock was not renewed in over %0.2f seconds' % diff
+                raise LockExpired(msg)
+            if self.failed:
+                raise LockRenewFailed()
+            try:
+                client.unlock_shard(self.conn, self.type, shard_num,
+                                    self.zone_id, self.locker_id)
+            except client.HttpError as e:
+                log.warn('failed to unlock shard %d in zone %s: %s',
+                         shard_num, self.zone_id, e)
+            self.last_locked = None
+
+    def run(self):
+        # background renewal loop: re-lock the tracked shard every half
+        # timeout so the lock never silently expires while held
+        while True:
+            with self.lock:
+                if self.shard_num is not None:
+                    try:
+                        self._acquire()
+                    except client.HttpError as e:
+                        log.error('locking shard %d in zone %s failed: %s',
+                                  self.shard_num, self.zone_id, e)
+                        self.failed = True
+            time.sleep(0.5 * self.timeout)
--- /dev/null
+import logging
+import multiprocessing
+import time
+
+from radosgw_agent import worker
+from radosgw_agent import client
+
+log = logging.getLogger(__name__)
+
+# the replica log api only supports one entry, and updating it
+# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of
+# radosgw-agent, so just use a constant value for the daemon id.
+DAEMON_ID = 'radosgw-agent'
+
+def prepare_sync(syncer, error_delay):
+    """Attempt to prepare a syncer for running a sync.
+
+    :param error_delay: seconds to wait before retrying
+
+    This will retry forever so the sync agent continues if radosgws
+    are unavailable temporarily.
+    """
+    while True:
+        try:
+            syncer.prepare()
+            break
+        except Exception:
+            # deliberately broad: any transient failure just delays the
+            # next attempt rather than killing the agent
+            log.warn('error preparing for sync, will retry. Traceback:',
+                     exc_info=True)
+            time.sleep(error_delay)
+
+def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
+                     incremental_sync_delay, metadata_only, error_delay):
+    """Run a continuous incremental sync.
+
+    This will run forever, pausing between syncs by
+    incremental_sync_delay seconds.  Errors during a sync pass are
+    logged and the loop continues.
+    """
+    while True:
+        try:
+            meta_syncer.sync(num_workers, lock_timeout)
+            if not metadata_only:
+                data_syncer.sync(num_workers, lock_timeout)
+        except Exception:
+            log.warn('error doing incremental sync, will try again. Traceback:',
+                     exc_info=True)
+
+        # prepare data before sleeping due to rgw_log_bucket_window
+        if not metadata_only:
+            prepare_sync(data_syncer, error_delay)
+        log.info('waiting %d seconds until next sync',
+                 incremental_sync_delay)
+        time.sleep(incremental_sync_delay)
+        prepare_sync(meta_syncer, error_delay)
+
class Syncer(object):
    """Base class for syncing metadata or data between two zones.

    Subclasses fill in self.type and self.worker_cls and implement
    prepare()/generate_work() (and optionally wait_until_ready() and
    complete_item()) to describe what to sync.
    """

    def __init__(self, src, dest, max_entries, *args, **kwargs):
        self.src = src
        self.dest = dest
        self.src_conn = client.connection(src)
        self.dest_conn = client.connection(dest)
        self.daemon_id = DAEMON_ID
        self.worker_cls = None  # filled in by subclass constructor
        self.num_shards = None
        self.max_entries = max_entries
        # may be None; workers fall back to their own default
        self.object_sync_timeout = kwargs.get('object_sync_timeout')

    def init_num_shards(self):
        """Look up the number of log shards from the source, only once."""
        if self.num_shards is not None:
            return
        try:
            self.num_shards = client.num_log_shards(self.src_conn, self.type)
            log.debug('%d shards to check', self.num_shards)
        except Exception:
            log.error('finding number of shards failed')
            raise

    def shard_num_for_key(self, key):
        """Return the log shard number a given key hashes to.

        This mirrors the hash radosgw uses so both sides agree on which
        shard holds the log entries for a key.
        """
        key = key.encode('utf8')
        hash_val = 0
        for char in key:
            # iterating bytes yields ints on python 3 and 1-char strings
            # on python 2; normalize to the integer byte value
            c = char if isinstance(char, int) else ord(char)
            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
        return hash_val % self.num_shards

    def prepare(self):
        """Setup any state required before syncing starts.

        This must be called before sync().
        """
        pass

    def generate_work(self):
        """Generate items to be placed in a queue for processing"""
        pass

    def wait_until_ready(self):
        """Hook called after workers start, before enqueuing any work."""
        pass

    def complete_item(self, shard_num, retries):
        """Called when syncing a single item completes successfully"""
        marker = self.shard_info.get(shard_num)
        if not marker:
            return
        try:
            data = [dict(name=retry, time=worker.DEFAULT_TIME)
                    for retry in retries]
            client.set_worker_bound(self.dest_conn,
                                    self.type,
                                    marker,
                                    worker.DEFAULT_TIME,
                                    self.daemon_id,
                                    shard_num,
                                    data)
        except Exception:
            log.warn('could not set worker bounds, may repeat some work.'
                     'Traceback:', exc_info=True)

    def sync(self, num_workers, log_lock_time):
        """Run one full pass: fan work out to workers and gather results.

        :param num_workers: number of worker processes to spawn
        :param log_lock_time: seconds each worker may hold a shard lock
        """
        workQueue = multiprocessing.Queue()
        resultQueue = multiprocessing.Queue()

        processes = [self.worker_cls(workQueue,
                                     resultQueue,
                                     log_lock_time,
                                     self.src,
                                     self.dest,
                                     daemon_id=self.daemon_id,
                                     max_entries=self.max_entries,
                                     object_sync_timeout=self.object_sync_timeout,
                                     )
                     for i in xrange(num_workers)]
        for process in processes:
            process.daemon = True
            process.start()

        self.wait_until_ready()

        log.info('Starting sync')
        # enqueue the shards to be synced
        num_items = 0
        for item in self.generate_work():
            num_items += 1
            workQueue.put(item)

        # add a poison pill for each worker
        for i in xrange(num_workers):
            workQueue.put(None)

        # pull the results out as they are produced
        errors = []
        for i in xrange(num_items):
            result, item = resultQueue.get()
            shard_num, retries = item
            if result == worker.RESULT_SUCCESS:
                log.debug('synced item %r successfully', item)
                self.complete_item(shard_num, retries)
            else:
                log.error('error syncing shard %d', shard_num)
                # BUG FIX: this used to append to the per-item 'retries'
                # list unpacked above (the accumulator was shadowed), so
                # the summary below reported the last item's retries
                # instead of the shards that failed - track failures in a
                # separate list
                errors.append(shard_num)

            log.info('%d/%d items processed', i + 1, num_items)
        if errors:
            log.error('Encountered errors syncing these %d shards: %r',
                      len(errors), errors)
+
+
class IncrementalSyncer(Syncer):
    """Sync only log entries added since the last recorded worker bound."""

    def get_worker_bound(self, shard_num):
        """Return the (marker, retries) recorded for a shard on the dest.

        Falls back to the start of the log when no bound was set yet.
        """
        try:
            marker, timestamp, retries = client.get_worker_bound(
                self.dest_conn,
                self.type,
                shard_num)
            log.debug('oldest marker and time for shard %d are: %r %r',
                      shard_num, marker, timestamp)
            # fixed log message typo ('retrie' -> 'retry')
            log.debug('%d items to retry are: %r', len(retries), retries)
        except client.NotFound:
            # if no worker bounds have been set, start from the beginning
            marker, retries = '', []
        return marker, retries

    def get_log_entries(self, shard_num, marker):
        """Fetch the log entries for a shard past the given marker.

        :returns: (last marker in the log, list of raw log entries)
        """
        try:
            result = client.get_log(self.src_conn, self.type,
                                    marker, self.max_entries,
                                    shard_num)
            last_marker = result['marker']
            log_entries = result['entries']
            if len(log_entries) == self.max_entries:
                log.warn('shard %d log has fallen behind - log length >= %d',
                         shard_num, self.max_entries)
        except client.NotFound:
            # no entries past this marker yet, but we may have retries
            last_marker = ''
            log_entries = []
        return last_marker, log_entries

    def prepare(self):
        """Collect worker bounds and new log entries for every shard."""
        self.init_num_shards()

        self.shard_info = {}
        self.shard_work = {}
        for shard_num in xrange(self.num_shards):
            marker, retries = self.get_worker_bound(shard_num)
            last_marker, log_entries = self.get_log_entries(shard_num, marker)
            self.shard_work[shard_num] = log_entries, retries
            self.shard_info[shard_num] = last_marker

        self.prepared_at = time.time()

    def generate_work(self):
        return self.shard_work.iteritems()
+
+
class MetaSyncerInc(IncrementalSyncer):
    """Incremental syncer for the metadata log."""

    def __init__(self, *args, **kwargs):
        super(MetaSyncerInc, self).__init__(*args, **kwargs)
        self.type = 'metadata'
        self.worker_cls = worker.MetadataWorkerIncremental
+
+
class DataSyncerInc(IncrementalSyncer):
    """Incremental syncer for the data (bucket index) log."""

    def __init__(self, *args, **kwargs):
        super(DataSyncerInc, self).__init__(*args, **kwargs)
        self.type = 'data'
        self.worker_cls = worker.DataWorkerIncremental
        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)

    def wait_until_ready(self):
        """Block until the data log window has passed since prepare()."""
        log.info('waiting to make sure bucket log is consistent')
        ready_at = self.prepared_at + self.rgw_data_log_window
        while time.time() < ready_at:
            time.sleep(1)
+
+
class DataSyncerFull(Syncer):
    """Full sync of every bucket's objects, grouped by data log shard."""

    def __init__(self, *args, **kwargs):
        super(DataSyncerFull, self).__init__(*args, **kwargs)
        self.type = 'data'
        self.worker_cls = worker.DataWorkerFull
        self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30)

    def prepare(self):
        """Snapshot data log markers, then list all buckets to sync."""
        self.init_num_shards()

        # save data log markers for each shard
        self.shard_info = {}
        for shard in xrange(self.num_shards):
            info = client.get_log_info(self.src_conn, 'data', shard)
            marker = info['marker']
            # setting an empty marker returns an error
            if marker:
                self.shard_info[shard] = marker

        # get list of buckets after getting any markers to avoid skipping
        # entries added before we got the marker info
        buckets = client.get_bucket_list(self.src_conn)

        self.prepared_at = time.time()

        self.buckets_by_shard = {}
        for bucket in buckets:
            shard = self.shard_num_for_key(bucket)
            self.buckets_by_shard.setdefault(shard, []).append(bucket)

    def generate_work(self):
        return self.buckets_by_shard.iteritems()

    def wait_until_ready(self):
        """Block until the data log window has passed since prepare()."""
        log.info('waiting to make sure bucket log is consistent')
        deadline = self.prepared_at + self.rgw_data_log_window
        while time.time() < deadline:
            time.sleep(1)
+
+
class MetaSyncerFull(Syncer):
    """Full sync of all metadata keys, grouped by metadata log shard."""

    def __init__(self, *args, **kwargs):
        super(MetaSyncerFull, self).__init__(*args, **kwargs)
        self.type = 'metadata'
        self.worker_cls = worker.MetadataWorkerFull

    def prepare(self):
        """Snapshot log markers, then list every metadata key by shard."""
        try:
            self.sections = client.get_metadata_sections(self.src_conn)
        except client.HttpError as e:
            log.error('Error listing metadata sections: %s', e)
            raise

        # grab the latest shard markers and timestamps before we sync
        self.init_num_shards()
        self.shard_info = {}
        for shard_num in xrange(self.num_shards):
            info = client.get_log_info(self.src_conn, 'metadata', shard_num)
            # setting an empty marker returns an error
            if info['marker']:
                self.shard_info[shard_num] = info['marker']

        self.metadata_by_shard = {}
        for section in self.sections:
            try:
                for key in client.list_metadata_keys(self.src_conn, section):
                    shard = self.shard_num_for_key(section + ':' + key)
                    self.metadata_by_shard.setdefault(shard, []).append(
                        (section, key))
            except client.NotFound:
                # no keys of this type exist
                continue
            except client.HttpError as e:
                log.error('Error listing metadata for section %s: %s',
                          section, e)
                raise

    def generate_work(self):
        return self.metadata_by_shard.iteritems()
--- /dev/null
+import py.test
+
+from radosgw_agent import client
+
# Region map fixture (same shape as the json passed to client.RegionMap
# in the tests below): four regions with 'skinny' as the master region;
# skinny-1 and swab-1 log both metadata and data, meta-1 logs metadata
# only, and the 'readonly' zones log nothing.
REGION_MAP = {
    "regions": [
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://vit:8001/"
                        ],
                        "log_data": "true",
                        "log_meta": "true",
                        "name": "skinny-1"
                    },
                    {
                        "endpoints": [
                            "http://vit:8002/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "skinny-2"
                    }
                ],
                "name": "skinny",
                "default_placement": "",
                "master_zone": "skinny-1",
                "api_name": "slim",
                "placement_targets": [],
                "is_master": "true",
                "endpoints": [
                    "http://skinny:80/"
                ]
            },
            "key": "skinny"
        },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://vit:8003/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "swab-2"
                    },
                    {
                        "endpoints": [
                            "http://vit:8004/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "swab-3"
                    },
                    {
                        "endpoints": [
                            "http://vit:8000/"
                        ],
                        "log_data": "true",
                        "log_meta": "true",
                        "name": "swab-1"
                    }
                ],
                "name": "swab",
                "default_placement": "",
                "master_zone": "swab-1",
                "api_name": "shady",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://vit:8000/"
                ]
            },
            "key": "swab"
        },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://ro:80/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "ro-1"
                    },
                    {
                        "endpoints": [
                            "http://ro:8080/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "ro-2"
                    },
                ],
                "name": "readonly",
                "default_placement": "",
                "master_zone": "ro-1",
                "api_name": "readonly",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://ro:80/",
                    "http://ro:8080/"
                ]
            },
            "key": "readonly"
        },
        {
            "val": {
                "zones": [
                    {
                        "endpoints": [
                            "http://meta:80/"
                        ],
                        "log_data": "false",
                        "log_meta": "true",
                        "name": "meta-1"
                    },
                    {
                        "endpoints": [
                            "http://meta:8080/"
                        ],
                        "log_data": "false",
                        "log_meta": "false",
                        "name": "meta-2"
                    },
                ],
                "name": "metaonly",
                "default_placement": "",
                "master_zone": "meta-1",
                "api_name": "metaonly",
                "placement_targets": [],
                "is_master": "false",
                "endpoints": [
                    "http://meta:80/",
                    "http://meta:8080/"
                ]
            },
            "key": "metaonly"
        }
    ],
    "master_region": "skinny"
}
+
+def test_endpoint_default_port():
+ endpoint = client.Endpoint('example.org', None, True)
+ assert endpoint.port == 443
+ endpoint = client.Endpoint('example.org', None, False)
+ assert endpoint.port == 80
+
+def test_endpoint_port_specified():
+ endpoint = client.Endpoint('example.org', 80, True)
+ assert endpoint.port == 80
+ endpoint = client.Endpoint('example.org', 443, True)
+ assert endpoint.port == 443
+
+def test_endpoint_equality():
+ default_port = client.Endpoint('a.org', None, True)
+ secure = client.Endpoint('a.org', 443, True)
+ insecure = client.Endpoint('a.org', 80, False)
+ assert default_port == secure
+ assert secure == insecure
+ assert insecure == default_port
+
+def test_endpoint_inequality():
+ base = client.Endpoint('a.org', 80, True)
+ diff_host = client.Endpoint('b.org', 80, True)
+ diff_port = client.Endpoint('a.org', 81, True)
+ insecure = client.Endpoint('a.org', 8080, False)
+ assert base != diff_host
+ assert base != diff_port
+ assert base != insecure
+
+def test_parse_endpoint():
+ endpoints = {
+ 'http://example.org': ('example.org', 80, False),
+ 'https://example.org': ('example.org', 443, True),
+ 'https://example.org:8080': ('example.org', 8080, True),
+ 'https://example.org:8080/': ('example.org', 8080, True),
+ 'http://example.org:81/a/b/c?b#d': ('example.org', 81, False),
+ }
+ for url, (host, port, secure) in endpoints.iteritems():
+ endpoint = client.parse_endpoint(url)
+ assert endpoint.port == port
+ assert endpoint.host == host
+ assert endpoint.secure == secure
+
+def test_parse_endpoint_bad_input():
+ with py.test.raises(client.InvalidProtocol):
+ client.parse_endpoint('ftp://example.com')
+ with py.test.raises(client.InvalidHost):
+ client.parse_endpoint('http://:80/')
+
def _test_configure_endpoints(dest_url, dest_region, dest_zone,
                              expected_src_url, expected_src_region,
                              expected_src_zone, specified_src_url=None,
                              meta_only=False):
    """Run configure_endpoints against REGION_MAP and verify the result."""
    dest = client.parse_endpoint(dest_url)
    src = (client.parse_endpoint(specified_src_url)
           if specified_src_url is not None
           else client.Endpoint(None, None, None))
    client.configure_endpoints(client.RegionMap(REGION_MAP), dest, src,
                               meta_only)
    assert dest.region.name == dest_region
    assert dest.zone.name == dest_zone
    assert src == client.parse_endpoint(expected_src_url)
    assert src.region.name == expected_src_region
    assert src.zone.name == expected_src_zone
+
# Each test drives _test_configure_endpoints against REGION_MAP.
# Destinations that cannot be synced to (master zone of the master
# region, source == destination, or a source zone that doesn't log the
# requested type) must raise InvalidZone for data sync; metadata-only
# sync is more permissive. Unknown endpoints raise ZoneNotFound.

def test_configure_endpoints_2nd_region_master_zone_meta():
    _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
                              'http://vit:8001', 'skinny', 'skinny-1',
                              meta_only=True)

def test_configure_endpoints_2nd_region_master_zone_data():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
                                  'http://vit:8001', 'skinny', 'skinny-1',
                                  meta_only=False)

def test_configure_endpoints_master_region_2nd_zone():
    _test_configure_endpoints('http://vit:8002', 'skinny', 'skinny-2',
                              'http://vit:8001', 'skinny', 'skinny-1')

def test_configure_endpoints_2nd_region_2nd_zone():
    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                              'http://vit:8000', 'swab', 'swab-1')

def test_configure_endpoints_2nd_region_readonly_meta():
    _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
                              'http://vit:8001', 'skinny', 'skinny-1',
                              meta_only=True)

def test_configure_endpoints_2nd_region_readonly_data():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2',
                                  'http://vit:8001', 'skinny', 'skinny-1',
                                  meta_only=False)

def test_configure_endpoints_2nd_region_metaonly_meta():
    _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
                              'http://meta:80', 'metaonly', 'meta-1',
                              meta_only=True)

def test_configure_endpoints_2nd_region_metaonly_data():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2',
                                  'http://vit:8001', 'skinny', 'skinny-1',
                                  meta_only=False)

def test_configure_endpoints_master_region_master_zone():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8001', 'skinny', 'skinny-1',
                                  'http://vit:8001', 'skinny', 'skinny-1')

def test_configure_endpoints_specified_src_same_region():
    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                              'http://vit:8000', 'swab', 'swab-1',
                              'http://vit:8000')

def test_configure_endpoints_specified_src_master_region_meta():
    _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                              'http://vit:8001', 'skinny', 'skinny-1',
                              'http://vit:8001', meta_only=True)

def test_configure_endpoints_specified_src_master_region_data():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                                  'http://vit:8001', 'skinny', 'skinny-1',
                                  'http://vit:8001', meta_only=False)

def test_configure_endpoints_bad_src_same_region():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                                  'http://vit:8004', 'swab', 'swab-3',
                                  'http://vit:8004')

def test_configure_endpoints_bad_src_master_region():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2',
                                  'http://vit:8002', 'skinny', 'skinny-2',
                                  'http://vit:8002')

def test_configure_endpoints_bad_src_same_zone():
    with py.test.raises(client.InvalidZone):
        _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1',
                                  'http://vit:8000', 'swab', 'swab-1',
                                  'http://vit:8000')

def test_configure_endpoints_specified_nonexistent_src():
    with py.test.raises(client.ZoneNotFound):
        _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
                                  'http://vit:8001', 'skinny', 'skinny-1',
                                  'http://vit:80')

def test_configure_endpoints_unknown_zone():
    with py.test.raises(client.ZoneNotFound):
        _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1',
                                  'http://vit:8001', 'skinny', 'skinny-1')
--- /dev/null
+from collections import namedtuple
+import logging
+import multiprocessing
+import os
+import socket
+import time
+
+from radosgw_agent import client
+from radosgw_agent import lock
+
+log = logging.getLogger(__name__)
+
# result codes workers report back through the result queue
RESULT_SUCCESS = 0
RESULT_ERROR = 1

class SkipShard(Exception):
    """Raised by lock_shard() when a shard should not be processed."""
    pass

class SyncError(Exception):
    """Base class for errors syncing a single item."""
    pass
class SyncTimedOut(SyncError):
    """An object sync did not complete within the allowed time."""
    pass
class SyncFailed(SyncError):
    """An object sync ended in a failed state."""
    pass

# placeholder timestamp used when the real time is irrelevant to the api
DEFAULT_TIME = '1970-01-01 00:00:00'
+
class Worker(multiprocessing.Process):
    """sync worker to run in its own process"""

    def __init__(self, work_queue, result_queue, log_lock_time,
                 src, dest, **kwargs):
        super(Worker, self).__init__()
        self.src = src
        self.dest = dest
        self.work_queue = work_queue
        self.result_queue = result_queue
        self.log_lock_time = log_lock_time
        self.lock = None

        # unique id for this worker, used as the shard lock cookie
        self.local_lock_id = socket.gethostname() + ':' + str(os.getpid())

        # construct the two connection objects
        self.src_conn = client.connection(src)
        self.dest_conn = client.connection(dest)

    def prepare_lock(self):
        """Create and start the background thread that renews shard locks."""
        assert self.lock is None
        self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id,
                              self.log_lock_time, self.dest.zone.name)
        self.lock.daemon = True
        self.lock.start()

    def lock_shard(self, shard_num):
        """Lock a shard's log, queuing a result and raising SkipShard
        when the shard should be skipped."""
        result = shard_num, []
        try:
            self.lock.set_shard(shard_num)
            self.lock.acquire()
        except client.NotFound:
            # no log means nothing changed this shard yet
            self.lock.unset_shard()
            self.result_queue.put((RESULT_SUCCESS, result))
            raise SkipShard('no log for shard')
        except Exception:
            log.warn('error locking shard %d log, '
                     ' skipping for now. Traceback: ',
                     shard_num, exc_info=True)
            self.lock.unset_shard()
            self.result_queue.put((RESULT_ERROR, result))
            raise SkipShard()

    def unlock_shard(self):
        """Release the current shard lock, tolerating failures."""
        try:
            self.lock.release_and_clear()
        except lock.LockBroken as e:
            log.warn('work may be duplicated: %s', e)
        except Exception:
            # unused exception value removed; traceback is logged instead
            log.warn('error unlocking log, continuing anyway '
                     'since lock will timeout. Traceback:', exc_info=True)

    def set_bound(self, key, marker, retries, type_=None):
        """Record the replica log bound and retry list for a key.

        :returns: RESULT_SUCCESS or RESULT_ERROR
        """
        # api doesn't allow setting a bound with a blank marker; nothing
        # to record, so report success explicitly (previously this fell
        # through and implicitly returned None, which callers comparing
        # against RESULT_ERROR happened to treat as success)
        if not marker:
            return RESULT_SUCCESS
        if type_ is None:
            type_ = self.type
        try:
            data = [dict(name=item, time=DEFAULT_TIME) for item in retries]
            client.set_worker_bound(self.dest_conn,
                                    type_,
                                    marker,
                                    DEFAULT_TIME,
                                    self.daemon_id,
                                    key,
                                    data=data)
            return RESULT_SUCCESS
        except Exception:
            log.warn('error setting worker bound for key "%s",'
                     ' may duplicate some work later. Traceback:', key,
                     exc_info=True)
            return RESULT_ERROR
+
+MetadataEntry = namedtuple('MetadataEntry',
+ ['section', 'name', 'marker', 'timestamp'])
+
+def _meta_entry_from_json(entry):
+ return MetadataEntry(
+ entry['section'],
+ entry['name'],
+ entry['id'],
+ entry['timestamp'],
+ )
+
+BucketIndexEntry = namedtuple('BucketIndexEntry',
+ ['object', 'marker', 'timestamp'])
+
+def _bi_entry_from_json(entry):
+ return BucketIndexEntry(
+ entry['object'],
+ entry['op_id'],
+ entry['timestamp'],
+ )
+
class IncrementalMixin(object):
    """This defines run() for incremental sync.

    It is the same for data and metadata sync, so share the
    implementation here.
    """

    def run(self):
        """Main worker loop: process (shard, work) items until a poison
        pill (None) arrives, syncing each shard under its log lock."""
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                # poison pill - no more work for this worker
                log.info('process %s is done. Exiting', self.ident)
                break

            shard_num, (log_entries, retries) = item

            log.info('%s is processing shard number %d',
                     self.ident, shard_num)

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                # lock_shard already queued a result for this shard
                continue

            result = RESULT_SUCCESS
            try:
                new_retries = self.sync_entries(log_entries, retries)
            except Exception:
                log.exception('syncing entries for shard %d failed',
                              shard_num)
                result = RESULT_ERROR
                new_retries = []

            # finally, unlock the log
            self.unlock_shard()
            self.result_queue.put((result, (shard_num, new_retries)))
            log.info('finished processing shard %d', shard_num)
+
+
class DataWorker(Worker):
    """Common logic for syncing object data between zones."""

    def __init__(self, *args, **kwargs):
        super(DataWorker, self).__init__(*args, **kwargs)
        self.type = 'data'
        self.op_id = 0
        # BUG FIX: Syncer.sync may pass object_sync_timeout=None
        # explicitly, in which case kwargs.get()'s default is NOT used
        # and 'time.time() + None' would fail later - use 'or' so None
        # also falls back to the default
        self.object_sync_timeout = kwargs.get('object_sync_timeout') \
            or 60 * 60 * 60
        self.daemon_id = kwargs['daemon_id']

    def sync_object(self, bucket, obj):
        """Copy one object from master, or delete it if gone from master.

        :raises SyncError: (SyncFailed/SyncTimedOut) when the object
            could not be synced or deleted
        """
        log.debug('sync_object %s/%s', bucket, obj)
        self.op_id += 1
        # op id must be unique per in-flight operation for this worker
        local_op_id = self.local_lock_id + ':' + str(self.op_id)
        try:
            found = True
            until = time.time() + self.object_sync_timeout
            client.sync_object_intra_region(self.dest_conn, bucket, obj,
                                            self.src.zone.name,
                                            self.daemon_id,
                                            local_op_id)
        except client.NotFound:
            found = False
            log.debug('"%s/%s" not found on master, deleting from secondary',
                      bucket, obj)
            try:
                client.delete_object(self.dest_conn, bucket, obj)
            except client.NotFound:
                # Since we were trying to delete the object, just return
                return
            except Exception:
                msg = 'could not delete "%s/%s" from secondary' % (bucket, obj)
                log.exception(msg)
                raise SyncFailed(msg)
        except SyncFailed:
            raise
        except Exception as e:
            # the sync request may have failed transiently but still be
            # in progress on the gateway, so poll its op state until it
            # finishes or the deadline passes
            log.debug('exception during sync: %s', e)
            if found:
                self.wait_for_object(bucket, obj, until, local_op_id)
        # TODO: clean up old op states
        try:
            if found:
                client.remove_op_state(self.dest_conn, self.daemon_id,
                                       local_op_id, bucket, obj)
        except Exception:
            log.exception('could not remove op state for daemon "%s" op_id %s',
                          self.daemon_id, local_op_id)

    def wait_for_object(self, bucket, obj, until, local_op_id):
        """Poll the op state for a sync until it completes, fails, or
        the deadline passes.

        :raises SyncFailed: when the op reports a non-progress state
        :raises SyncTimedOut: when the deadline passes
        """
        while time.time() < until:
            try:
                state = client.get_op_state(self.dest_conn,
                                            self.daemon_id,
                                            local_op_id,
                                            bucket, obj)
                log.debug('op state is %s', state)
                state = state[0]['state']
                if state == 'complete':
                    return
                elif state != 'in-progress':
                    raise SyncFailed('state is {0}'.format(state))
                time.sleep(1)
            except SyncFailed:
                raise
            except Exception as e:
                # fixed log message typo ('geting' -> 'getting')
                log.debug('error getting op state: %s', e, exc_info=True)
                time.sleep(1)
        # timeout expired
        raise SyncTimedOut()

    def get_bucket_instance(self, bucket):
        """Return 'bucket:instance_id' for a bucket from its metadata."""
        metadata = client.get_metadata(self.src_conn, 'bucket', bucket)
        return bucket + ':' + metadata['data']['bucket']['bucket_id']

    def get_bucket(self, bucket_instance):
        """Return the bucket name part of a 'bucket:instance_id' string."""
        return bucket_instance.split(':', 1)[0]

    def sync_bucket(self, bucket, objects):
        """Sync each object in a bucket, returning those that failed."""
        log.info('syncing bucket "%s"', bucket)
        retry_objs = []
        count = 0
        for obj in objects:
            count += 1
            # sync each object (stray trailing comma after this call removed)
            log.debug('syncing object "%s/%s"', bucket, obj)
            try:
                self.sync_object(bucket, obj)
            except SyncError as err:
                log.error('failed to sync object %s/%s: %s',
                          bucket, obj, err)
                retry_objs.append(obj)

        log.debug('bucket {bucket} has {num_objects} object'.format(
            bucket=bucket, num_objects=count))
        if retry_objs:
            log.debug('these objects failed to be synced and will be during '
                      'the next incremental sync: %s', retry_objs)

        return retry_objs
+
+
class DataWorkerIncremental(IncrementalMixin, DataWorker):
    """Sync objects named in new data log / bucket index log entries."""

    def __init__(self, *args, **kwargs):
        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
        # page size when reading through a bucket index log
        self.max_entries = kwargs['max_entries']

    def get_bucket_instance_entries(self, marker, instance):
        """Page through a bucket instance's index log starting at marker.

        :returns: (marker of the last entry seen, list of BucketIndexEntry)
        """
        entries = []
        while True:
            try:
                log_entries = client.get_log(self.src_conn, 'bucket-index',
                                             marker, self.max_entries, instance)
            except client.NotFound:
                # no index log for this instance yet
                log_entries = []

            log.debug('bucket instance "%s" has %d entries after "%s"', instance,
                      len(log_entries), marker)

            try:
                entries += [_bi_entry_from_json(entry) for entry in log_entries]
            except KeyError:
                log.error('log missing key is: %s', log_entries)
                raise

            if entries:
                marker = entries[-1].marker
            else:
                marker = ''

            # a short page means we've reached the end of the log
            if len(log_entries) < self.max_entries:
                break
        return marker, entries

    def inc_sync_bucket_instance(self, instance, marker, timestamp, retries):
        """Sync the changed objects of one bucket instance and record
        the new bound.

        :returns: RESULT_SUCCESS or RESULT_ERROR
        """
        max_marker, entries = self.get_bucket_instance_entries(marker, instance)
        objects = set([entry.object for entry in entries])
        bucket = self.get_bucket(instance)
        new_retries = self.sync_bucket(bucket, objects.union(retries))

        result = self.set_bound(instance, max_marker, new_retries,
                                'bucket-index')
        if new_retries:
            result = RESULT_ERROR
        return result

    def sync_entries(self, log_entries, retries):
        """Sync every bucket instance mentioned in the data log entries
        or in the retry list.

        :returns: list of bucket instances that still need retrying
        """
        try:
            bucket_instances = set([entry['key'] for entry in log_entries])
        except KeyError:
            log.error('log containing bad key is: %s', log_entries)
            raise

        new_retries = []
        for bucket_instance in bucket_instances.union(retries):
            # note: 'retries' is rebound here to the per-instance retry
            # list from the replica log, not the shard-level argument
            # (the union above was already evaluated)
            try:
                marker, timestamp, retries = client.get_worker_bound(
                    self.dest_conn,
                    'bucket-index',
                    bucket_instance)
            except client.NotFound:
                log.debug('no worker bound found for bucket instance "%s"',
                          bucket_instance)
                marker, timestamp, retries = '', DEFAULT_TIME, []
            try:
                sync_result = self.inc_sync_bucket_instance(bucket_instance,
                                                            marker,
                                                            timestamp,
                                                            retries)
            except Exception as e:
                log.warn('error syncing bucket instance "%s": %s',
                         bucket_instance, e, exc_info=True)
                sync_result = RESULT_ERROR
            if sync_result == RESULT_ERROR:
                new_retries.append(bucket_instance)

        return new_retries
+
class DataWorkerFull(DataWorker):
    """Fully sync every object of each bucket assigned to a shard."""

    def full_sync_bucket(self, bucket):
        """Sync all objects of a bucket and record its log bound.

        :returns: True when fully synced, False when the bucket must be
            retried during incremental sync
        """
        try:
            instance = self.get_bucket_instance(bucket)
            try:
                marker = client.get_log_info(self.src_conn, 'bucket-index',
                                             instance)['max_marker']
            except client.NotFound:
                # no index log yet means no changes to record
                marker = ''
            log.debug('bucket instance is "%s" with marker %s', instance, marker)
            # nothing to do for this bucket
            if not marker:
                return True

            objects = client.list_objects_in_bucket(self.src_conn, bucket)
            if not objects:
                return True
        except Exception as e:
            log.error('error preparing for full sync of bucket "%s": %s',
                      bucket, e)
            return False

        retries = self.sync_bucket(bucket, objects)

        result = self.set_bound(instance, marker, retries, 'bucket-index')
        return not retries and result == RESULT_SUCCESS

    def run(self):
        """Process (shard, buckets) items until a poison pill (None)
        arrives, fully syncing each bucket under the shard log lock."""
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                log.info('No more entries in queue, exiting')
                break

            shard_num, buckets = item

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                # lock_shard already queued a result for this shard
                continue

            # attempt to sync each bucket, add to a list to retry
            # during incremental sync if sync fails
            retry_buckets = []
            for bucket in buckets:
                if not self.full_sync_bucket(bucket):
                    retry_buckets.append(bucket)

            # unlock shard and report buckets to retry during incremental sync
            self.unlock_shard()
            self.result_queue.put((RESULT_SUCCESS, (shard_num, retry_buckets)))
            log.info('finished syncing shard %d', shard_num)
            log.info('incremental sync will need to retry buckets: %s',
                     retry_buckets)
+
class MetadataWorker(Worker):
    """Common logic for syncing metadata entries between zones."""

    def __init__(self, *args, **kwargs):
        super(MetadataWorker, self).__init__(*args, **kwargs)
        self.type = 'metadata'

    def sync_meta(self, section, name):
        """Copy one metadata key from master, or delete it if gone.

        :returns: RESULT_SUCCESS or RESULT_ERROR
        """
        log.debug('syncing metadata type %s key "%s"', section, name)
        try:
            metadata = client.get_metadata(self.src_conn, section, name)
        except client.NotFound:
            log.debug('%s "%s" not found on master, deleting from secondary',
                      section, name)
            try:
                client.delete_metadata(self.dest_conn, section, name)
            except client.NotFound:
                # Since this error is handled appropriately, return success
                return RESULT_SUCCESS
            # BUG FIX: a successful delete used to fall through and
            # implicitly return None; report success explicitly (other
            # delete failures still propagate to the caller)
            return RESULT_SUCCESS
        except Exception as e:
            log.warn('error getting metadata for %s "%s": %s',
                     section, name, e, exc_info=True)
            return RESULT_ERROR
        else:
            try:
                client.update_metadata(self.dest_conn, section, name, metadata)
                return RESULT_SUCCESS
            except Exception as e:
                log.warn('error updating metadata for %s "%s": %s',
                         section, name, e, exc_info=True)
                return RESULT_ERROR
+
class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
    """Sync metadata keys named in new metadata log entries."""

    def __init__(self, *args, **kwargs):
        super(MetadataWorkerIncremental, self).__init__(*args, **kwargs)

    def sync_entries(self, log_entries, retries):
        """Sync each (section, name) from the log entries or retry list.

        :returns: the 'section/name' strings that failed to sync
        """
        try:
            entries = [_meta_entry_from_json(entry) for entry in log_entries]
        except KeyError:
            log.error('log containing bad key is: %s', log_entries)
            raise

        to_sync = set([(entry.section, entry.name) for entry in entries])
        to_sync = to_sync.union([tuple(item.split('/', 1))
                                 for item in retries])

        failed = []
        for section, name in to_sync:
            if self.sync_meta(section, name) == RESULT_ERROR:
                failed.append(section + '/' + name)

        return failed
+
class MetadataWorkerFull(MetadataWorker):
    """Fully sync every metadata key assigned to a shard."""

    def empty_result(self, shard):
        # (shard, no retries) tuple in the shape the result queue expects
        return shard, []

    def run(self):
        """Process (shard, metadata keys) items until a poison pill
        (None) arrives, syncing each key under the shard log lock."""
        self.prepare_lock()
        while True:
            item = self.work_queue.get()
            if item is None:
                log.info('No more entries in queue, exiting')
                break

            log.debug('syncing item "%s"', item)

            shard_num, metadata = item

            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                # lock_shard already queued a result for this shard
                continue

            # attempt to sync each bucket, add to a list to retry
            # during incremental sync if sync fails
            retries = []
            for section, name in metadata:
                try:
                    self.sync_meta(section, name)
                except Exception as e:
                    log.warn('could not sync %s "%s", saving for retry: %s',
                             section, name, e, exc_info=True)
                    retries.append(section + '/' + name)

            # unlock shard and report buckets to retry during incremental sync
            self.unlock_shard()
            self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
            log.info('finished syncing shard %d', shard_num)
            log.info('incremental sync will need to retry items: %s',
                     retries)
--- /dev/null
+pytest >=2.1.3
+mock >=1.0
+tox >=1.2
--- /dev/null
+argparse
+boto >=2.2.2,<3.0.0
+requests >=1.2.1
+PyYAML
--- /dev/null
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.2.2,python-boto < 3.0.0,python-requests
--- /dev/null
+#!/usr/bin/python
+from setuptools import setup, find_packages
+import sys
+
+
install_requires = []
pyversion = sys.version_info[:2]
# argparse joined the stdlib in python 2.7 / 3.2; older interpreters
# need the backport from PyPI
if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
    install_requires.append('argparse')

setup(
    name='radosgw-agent',
    version='1.1',
    packages=find_packages(),

    author='Josh Durgin',
    author_email='josh.durgin@inktank.com',
    description='Synchronize users and data between radosgw clusters',
    license='MIT',
    keywords='radosgw ceph radosgw-agent',
    url="https://github.com/ceph/radosgw-agent",

    install_requires=[
        'setuptools',
        'boto >=2.2.2,<3.0.0',
        'requests >=1.2.1',
    ] + install_requires,

    # BUG FIX: 'test_requires' is not a setuptools keyword and was
    # silently ignored; the correct spelling is 'tests_require'
    tests_require=[
        'pytest >=2.1.3',
        'mock >=1.0',
    ],

    entry_points={
        'console_scripts': [
            'radosgw-agent = radosgw_agent.cli:main',
        ],
    },
)
--- /dev/null
+[tox]
+envlist = py26
+
+[testenv]
+deps=
+ pytest
+ mock
+commands=py.test -s -v {posargs:radosgw_agent/tests}