From aeb553d84465e0b52c41ce67b62b7f74fed8bb2c Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Wed, 19 Mar 2014 02:27:32 -0700 Subject: [PATCH] packaging: add dependency on python-requests Signed-off-by: Josh Durgin --- .gitignore | 35 ++ LICENSE | 19 ++ README.rst | 3 + bootstrap | 42 +++ debian/changelog | 11 + debian/compat | 1 + debian/control | 19 ++ debian/copyright | 3 + debian/rules | 8 + debian/source/format | 1 + radosgw-agent.spec | 41 +++ radosgw_agent/__init__.py | 0 radosgw_agent/cli.py | 325 +++++++++++++++++++ radosgw_agent/client.py | 461 +++++++++++++++++++++++++++ radosgw_agent/lock.py | 107 +++++++ radosgw_agent/sync.py | 318 +++++++++++++++++++ radosgw_agent/tests/__init__.py | 0 radosgw_agent/tests/test_client.py | 304 ++++++++++++++++++ radosgw_agent/worker.py | 492 +++++++++++++++++++++++++++++ requirements-dev.txt | 3 + requirements.txt | 4 + setup.cfg | 2 + setup.py | 39 +++ tox.ini | 8 + 24 files changed, 2246 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.rst create mode 100755 bootstrap create mode 100644 debian/changelog create mode 100644 debian/compat create mode 100644 debian/control create mode 100644 debian/copyright create mode 100755 debian/rules create mode 100644 debian/source/format create mode 100644 radosgw-agent.spec create mode 100644 radosgw_agent/__init__.py create mode 100644 radosgw_agent/cli.py create mode 100644 radosgw_agent/client.py create mode 100644 radosgw_agent/lock.py create mode 100644 radosgw_agent/sync.py create mode 100644 radosgw_agent/tests/__init__.py create mode 100644 radosgw_agent/tests/test_client.py create mode 100644 radosgw_agent/worker.py create mode 100644 requirements-dev.txt create mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2d6f36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +*.py[cod] + +# C extensions +*.so + +# Packages +*.egg +*.egg-info +dist +build +eggs +parts +bin +var +sdist +develop-eggs +.installed.cfg +lib +lib64 + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +nosetests.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6062a74 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2013 Inktank Storage, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
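The runtime dependencies declared below (python-argparse, python-requests and setuptools in debian/control; argparse, PyYAML, boto and requests in radosgw-agent.spec) each map to an importable Python module. A minimal sanity check of an installed package, not part of the patch itself (module names inferred from the package names):

    # hypothetical post-install check: every declared dependency should import
    for module in ('argparse', 'yaml', 'boto', 'requests'):
        __import__(module)
    print('all radosgw-agent runtime dependencies are importable')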
diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..656bd71 --- /dev/null +++ b/README.rst @@ -0,0 +1,3 @@ +==================================================================== +radosgw-agent -- synchronize data and users between radosgw clusters +==================================================================== diff --git a/bootstrap b/bootstrap new file mode 100755 index 0000000..6dd1a2d --- /dev/null +++ b/bootstrap @@ -0,0 +1,42 @@ +#!/bin/sh +set -e + +if command -v lsb_release >/dev/null 2>&1; then + case "$(lsb_release --id --short)" in + Ubuntu|Debian) + for package in python-virtualenv; do + if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then + # add a space after old values + missing="${missing:+$missing }$package" + fi + done + if [ -n "$missing" ]; then + echo "$0: missing required packages, please install them:" 1>&2 + echo " sudo apt-get install $missing" + exit 1 + fi + ;; + esac +else + if [ -f /etc/redhat-release ]; then + case "$(cat /etc/redhat-release | awk '{print $1}')" in + CentOS) + for package in python-virtualenv; do + if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then + missing="${missing:+$missing }$package" + fi + done + if [ -n "$missing" ]; then + echo "$0: missing required packages, please install them:" 1>&2 + echo " sudo yum install $missing" + exit 1 + fi + ;; + esac + fi +fi + +test -d virtualenv || virtualenv virtualenv +./virtualenv/bin/python setup.py develop +./virtualenv/bin/pip install -r requirements.txt -r requirements-dev.txt +test -e radosgw-agent || ln -s ./virtualenv/bin/radosgw-agent . diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000..f0099e9 --- /dev/null +++ b/debian/changelog @@ -0,0 +1,11 @@ +radosgw-agent (1.1-1) precise; urgency=low + + * new upstream release + + -- Gary Lowell Thu, 21 Nov 2013 16:17:25 -0800 + +radosgw-agent (1.0-1) stable; urgency=low + + * Initial release + + -- Gary Lowell Mon, 26 Aug 2013 09:19:47 -0700 diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..45a4fb7 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +8 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..9810f72 --- /dev/null +++ b/debian/control @@ -0,0 +1,19 @@ +Source: radosgw-agent +Maintainer: Sage Weil +Uploaders: Sage Weil +Section: admin +Priority: optional +Build-Depends: debhelper (>= 8), python-setuptools +X-Python-Version: >= 2.4 +Standards-Version: 3.9.2 +Homepage: http://ceph.com/ + +Package: radosgw-agent +Architecture: all +Depends: python, + python-argparse, + python-setuptools, + python-requests, + ${misc:Depends}, + ${python:Depends} +Description: Rados gateway agents. diff --git a/debian/copyright b/debian/copyright new file mode 100644 index 0000000..730861e --- /dev/null +++ b/debian/copyright @@ -0,0 +1,3 @@ +Files: * +Copyright: (c) 2013 by Inktank Storage +License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1) diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000..45200da --- /dev/null +++ b/debian/rules @@ -0,0 +1,8 @@ +#!/usr/bin/make -f + +# Uncomment this to turn on verbose mode. 
+export DH_VERBOSE=1 + +%: + dh $@ --buildsystem python_distutils --with python2 + diff --git a/debian/source/format b/debian/source/format new file mode 100644 index 0000000..d3827e7 --- /dev/null +++ b/debian/source/format @@ -0,0 +1 @@ +1.0 diff --git a/radosgw-agent.spec b/radosgw-agent.spec new file mode 100644 index 0000000..15e9b8a --- /dev/null +++ b/radosgw-agent.spec @@ -0,0 +1,41 @@ +%define name radosgw-agent +%define version 1.1 +%define unmangled_version 1.1 +%define unmangled_version 1.1 +%define release 1 + +Summary: Synchronize users and data between radosgw clusters +Name: %{name} +Version: %{version} +Release: %{release} +Source0: %{name}-%{unmangled_version}.tar.gz +License: MIT +Group: Development/Libraries +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot +Prefix: %{_prefix} +BuildArch: noarch +Vendor: Josh Durgin +Requires: python-argparse +Requires: PyYAML +Requires: python-boto >= 2.2.2 +Requires: python-boto < 3.0.0 +Requires: python-requests +Url: https://github.com/ceph/radosgw-agent + +%description +UNKNOWN + +%prep +%setup -n %{name}-%{unmangled_version} -n %{name}-%{unmangled_version} + +%build +python setup.py build + +%install +python setup.py install --single-version-externally-managed -O1 --root=$RPM_BUILD_ROOT --record=INSTALLED_FILES + +%clean +rm -rf $RPM_BUILD_ROOT + +%files -f INSTALLED_FILES +%defattr(-,root,root) diff --git a/radosgw_agent/__init__.py b/radosgw_agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radosgw_agent/cli.py b/radosgw_agent/cli.py new file mode 100644 index 0000000..48b19e7 --- /dev/null +++ b/radosgw_agent/cli.py @@ -0,0 +1,325 @@ +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import argparse +import contextlib +import logging +import logging.handlers +import yaml +import sys + +from radosgw_agent import client +from radosgw_agent import sync + +def check_positive_int(string): + value = int(string) + if value < 1: + msg = '%r is not a positive integer' % string + raise argparse.ArgumentTypeError(msg) + return value + +def check_endpoint(endpoint): + try: + return client.parse_endpoint(endpoint) + except client.InvalidProtocol as e: + raise argparse.ArgumentTypeError(str(e)) + except client.InvalidHost as e: + raise argparse.ArgumentTypeError(str(e)) + +def parse_args(): + conf_parser = argparse.ArgumentParser(add_help=False) + conf_parser.add_argument( + '-c', '--conf', + type=file, + help='configuration file' + ) + args, remaining = conf_parser.parse_known_args() + defaults = dict( + sync_scope='incremental', + log_lock_time=20, + ) + if args.conf is not None: + with contextlib.closing(args.conf): + config = yaml.safe_load_all(args.conf) + for new in config: + defaults.update(new) + + parser = argparse.ArgumentParser( + parents=[conf_parser], + description='Synchronize radosgw installations', + ) + parser.set_defaults(**defaults) + verbosity = parser.add_mutually_exclusive_group(required=False) + verbosity.add_argument( + '-v', '--verbose', + action='store_true', dest='verbose', + help='be more verbose', + ) + verbosity.add_argument( + '-q', '--quiet', + action='store_true', dest='quiet', + help='be less verbose', + ) + parser.add_argument( + '--src-access-key', + required='src_access_key' not in defaults, + help='access key for source zone system user', + ) + parser.add_argument( + '--src-secret-key', + required='src_secret_key' not in defaults, + help='secret key for source zone system user', + ) + parser.add_argument( + '--dest-access-key', + 
required='dest_access_key' not in defaults, + help='access key for destination zone system user', + ) + parser.add_argument( + '--dest-secret-key', + required='dest_secret_key' not in defaults, + help='secret key for destination zone system user', + ) + parser.add_argument( + 'destination', + type=check_endpoint, + nargs=None if 'destination' not in defaults else '?', + help='radosgw endpoint to which to sync ' + '(e.g. http://zone2.example.org:8080)', + ) + src_options = parser.add_mutually_exclusive_group(required=False) + src_options.add_argument( + '--source', + type=check_endpoint, + help='radosgw endpoint from which to sync ' + '(e.g. http://zone1.example.org:8080)', + ) + src_options.add_argument( + '--src-zone', + help='radosgw zone from which to sync', + ) + parser.add_argument( + '--metadata-only', + action='store_true', + help='sync bucket and user metadata, but not bucket contents', + ) + parser.add_argument( + '--num-workers', + default=1, + type=check_positive_int, + help='number of items to sync at once', + ) + parser.add_argument( + '--sync-scope', + choices=['full', 'incremental'], + default='incremental', + help='synchronize everything (for a new region) or only things that ' + 'have changed since the last run', + ) + parser.add_argument( + '--lock-timeout', + type=check_positive_int, + default=60, + help='timeout in seconds after which a log segment lock will expire if ' + 'not refreshed', + ) + parser.add_argument( + '--log-file', + help='where to store log output', + ) + parser.add_argument( + '--max-entries', + type=check_positive_int, + default=1000, + help='maximum number of log entries to process at once during ' + 'continuous sync', + ) + parser.add_argument( + '--incremental-sync-delay', + type=check_positive_int, + default=30, + help='seconds to wait between syncs', + ) + parser.add_argument( + '--object-sync-timeout', + type=check_positive_int, + default=60 * 60 * 60, + help='seconds to wait for an individual object to sync before ' + 'assuming failure', + ) + parser.add_argument( + '--prepare-error-delay', + type=check_positive_int, + default=10, + help='seconds to wait before retrying when preparing ' + 'an incremental sync fails', + ) + parser.add_argument( + '--rgw-data-log-window', + type=check_positive_int, + default=30, + help='period until a data log entry is valid - ' + 'must match radosgw configuration', + ) + parser.add_argument( + '--test-server-host', + # host to run a simple http server for testing the sync agent on, + help=argparse.SUPPRESS, + ) + parser.add_argument( + '--test-server-port', + # port to run a simple http server for testing the sync agent on, + type=check_positive_int, + default=8080, + help=argparse.SUPPRESS, + ) + return parser.parse_args(remaining) + +class TestHandler(BaseHTTPRequestHandler): + """HTTP handler for testing radosgw-agent. + + This should never be used outside of testing. 
+ """ + num_workers = None + lock_timeout = None + max_entries = None + rgw_data_log_window = 30 + src = None + dest = None + + def do_POST(self): + log = logging.getLogger(__name__) + status = 200 + resp = '' + sync_cls = None + if self.path.startswith('/metadata/full'): + sync_cls = sync.MetaSyncerFull + elif self.path.startswith('/metadata/incremental'): + sync_cls = sync.MetaSyncerInc + elif self.path.startswith('/data/full'): + sync_cls = sync.DataSyncerFull + elif self.path.startswith('/data/incremental'): + sync_cls = sync.DataSyncerInc + else: + log.warn('invalid request, ignoring') + status = 400 + resp = 'bad path' + + try: + if sync_cls is not None: + syncer = sync_cls(TestHandler.src, TestHandler.dest, + TestHandler.max_entries, + rgw_data_log_window=TestHandler.rgw_data_log_window, + object_sync_timeout=TestHandler.object_sync_timeout) + syncer.prepare() + syncer.sync( + TestHandler.num_workers, + TestHandler.lock_timeout, + ) + except Exception as e: + log.exception('error during sync') + status = 500 + resp = str(e) + + self.log_request(status, len(resp)) + if status >= 400: + self.send_error(status, resp) + else: + self.send_response(status) + self.end_headers() + +def main(): + args = parse_args() + log = logging.getLogger() + log_level = logging.INFO + lib_log_level = logging.WARN + if args.verbose: + log_level = logging.DEBUG + lib_log_level = logging.DEBUG + elif args.quiet: + log_level = logging.WARN + logging.basicConfig(level=log_level) + logging.getLogger('boto').setLevel(lib_log_level) + logging.getLogger('requests').setLevel(lib_log_level) + + if args.log_file is not None: + handler = logging.handlers.WatchedFileHandler( + filename=args.log_file, + ) + formatter = logging.Formatter( + fmt='%(asctime)s.%(msecs)03d %(process)d:%(levelname)s:%(name)s:%(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + ) + handler.setFormatter(formatter) + logging.getLogger().addHandler(handler) + + dest = args.destination + dest.access_key = args.dest_access_key + dest.secret_key = args.dest_secret_key + src = args.source or client.Endpoint(None, None, None) + if args.src_zone: + src.zone = args.src_zone + dest_conn = client.connection(dest) + + try: + region_map = client.get_region_map(dest_conn) + except Exception: + log.exception('Could not retrieve region map from destination') + sys.exit(1) + + try: + client.configure_endpoints(region_map, dest, src, args.metadata_only) + except client.ClientException as e: + log.error(e) + sys.exit(1) + + src.access_key = args.src_access_key + src.secret_key = args.src_secret_key + + if args.test_server_host: + log.warn('TEST MODE - do not run unless you are testing this program') + TestHandler.src = src + TestHandler.dest = dest + TestHandler.num_workers = args.num_workers + TestHandler.lock_timeout = args.lock_timeout + TestHandler.max_entries = args.max_entries + TestHandler.rgw_data_log_window = args.rgw_data_log_window + TestHandler.object_sync_timeout = args.object_sync_timeout + server = HTTPServer((args.test_server_host, args.test_server_port), + TestHandler) + server.serve_forever() + sys.exit() + + if args.sync_scope == 'full': + meta_cls = sync.MetaSyncerFull + data_cls = sync.DataSyncerFull + else: + meta_cls = sync.MetaSyncerInc + data_cls = sync.DataSyncerInc + + meta_syncer = meta_cls(src, dest, args.max_entries) + data_syncer = data_cls(src, dest, args.max_entries, + rgw_data_log_window=args.rgw_data_log_window, + object_sync_timeout=args.object_sync_timeout) + + # fetch logs first since data logs need to wait before becoming usable 
+ # due to rgw's window of data log updates during which the bucket index + # log may still be updated without the data log getting a new entry for + # the bucket + sync.prepare_sync(meta_syncer, args.prepare_error_delay) + if not args.metadata_only: + sync.prepare_sync(data_syncer, args.prepare_error_delay) + + if args.sync_scope == 'full': + log.info('syncing all metadata') + meta_syncer.sync(args.num_workers, args.lock_timeout) + if not args.metadata_only: + log.info('syncing all data') + data_syncer.sync(args.num_workers, args.lock_timeout) + log.info('Finished full sync. Check logs to see any issues that ' + 'incremental sync will retry.') + else: + sync.incremental_sync(meta_syncer, data_syncer, + args.num_workers, + args.lock_timeout, + args.incremental_sync_delay, + args.metadata_only, + args.prepare_error_delay) diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py new file mode 100644 index 0000000..2b8f027 --- /dev/null +++ b/radosgw_agent/client.py @@ -0,0 +1,461 @@ +import boto +import functools +import json +import logging +import random +import requests +import urllib +from urlparse import urlparse + +from boto.connection import AWSAuthConnection +from boto.s3.connection import S3Connection + +log = logging.getLogger(__name__) + +class Endpoint(object): + def __init__(self, host, port, secure, + access_key=None, secret_key=None, region=None, zone=None): + self.host = host + default_port = 443 if secure else 80 + self.port = port or default_port + self.secure = secure + self.access_key = access_key + self.secret_key = secret_key + self.region = region + self.zone = zone + + def __eq__(self, other): + if self.host != other.host: + return False + if self.port == other.port: + return True + # if self and other are mixed http/https with default ports, + # i.e. 
http://example.com and https://example.com, consider + # them the same + def diff_only_default_ports(a, b): + return a.secure and a.port == 443 and not b.secure and b.port == 80 + return (diff_only_default_ports(self, other) or + diff_only_default_ports(other, self)) + + def __repr__(self): + return 'Endpoint(host={host}, port={port}, secure={secure})'.format( + host=self.host, + port=self.port, + secure=self.secure) + + def __str__(self): + scheme = 'https' if self.secure else 'http' + return '{scheme}://{host}:{port}'.format(scheme=scheme, + host=self.host, + port=self.port) + +class ClientException(Exception): + pass +class InvalidProtocol(ClientException): + pass +class InvalidHost(ClientException): + pass +class InvalidZone(ClientException): + pass +class ZoneNotFound(ClientException): + pass + +def parse_endpoint(endpoint): + url = urlparse(endpoint) + if url.scheme not in ['http', 'https']: + raise InvalidProtocol('invalid protocol %r' % url.scheme) + if not url.hostname: + raise InvalidHost('no hostname in %r' % endpoint) + return Endpoint(url.hostname, url.port, url.scheme == 'https') + +class HttpError(ClientException): + def __init__(self, code, body): + self.code = code + self.body = body + self.message = 'Http error code %s content %s' % (code, body) + def __str__(self): + return self.message +class NotFound(HttpError): + pass +code_to_exc = { + 404: NotFound, + } + +def boto_call(func): + @functools.wraps(func) + def translate_exception(*args, **kwargs): + try: + return func(*args, **kwargs) + except boto.exception.S3ResponseError as e: + raise code_to_exc.get(e.status, HttpError)(e.status, e.body) + return translate_exception + + +""" +Adapted from the build_request() method of boto.connection +""" + +def _build_request(conn, method, basepath='', resource = '', headers=None, + data=None, special_first_param=None, params=None): + path = conn.calling_format.build_path_base(basepath, resource) + auth_path = conn.calling_format.build_auth_path(basepath, resource) + host = conn.calling_format.build_host(conn.server_name(), '') + + if special_first_param: + path += '?' + special_first_param + boto.log.debug('path=%s' % path) + auth_path += '?'
+ special_first_param + boto.log.debug('auth_path=%s' % auth_path) + + return AWSAuthConnection.build_base_http_request( + conn, method, path, auth_path, params, headers, data, host) + +def check_result_status(result): + if result.status_code / 100 != 2: + raise code_to_exc.get(result.status_code, + HttpError)(result.status_code, result.content) +def url_safe(component): + if isinstance(component, basestring): + string = component.encode('utf8') + else: + string = str(component) + return urllib.quote(string) + +def request(connection, type_, resource, params=None, headers=None, + data=None, expect_json=True, special_first_param=None): + if headers is None: + headers = {} + + if type_ in ['put', 'post']: + headers['Content-Type'] = 'application/json; charset=UTF-8' + + request_data = data if data else '' + if params is None: + params = {} + safe_params = dict([(k, url_safe(v)) for k, v in params.iteritems()]) + request = _build_request(connection, + type_.upper(), + resource=resource, + special_first_param=special_first_param, + headers=headers, + data=request_data, + params=safe_params) + + url = '{protocol}://{host}{path}'.format(protocol=request.protocol, + host=request.host, + path=request.path) + + request.authorize(connection=connection) + + handler = getattr(requests, type_) + boto.log.debug('url = %r\nparams=%r\nheaders=%r\ndata=%r', + url, params, request.headers, data) + result = handler(url, params=params, headers=request.headers, data=data) + + check_result_status(result) + + if data or not expect_json: + return result.raw + return result.json() + +def get_metadata(connection, section, name): + return request(connection, 'get', 'admin/metadata/' + section, + params=dict(key=name)) + +def update_metadata(connection, section, name, metadata): + if not isinstance(metadata, basestring): + metadata = json.dumps(metadata) + return request(connection, 'put', 'admin/metadata/' + section, + params=dict(key=name), data=metadata) + +def delete_metadata(connection, section, name): + return request(connection, 'delete', 'admin/metadata/' + section, + params=dict(key=name), expect_json=False) + +def get_metadata_sections(connection): + return request(connection, 'get', 'admin/metadata') + +def list_metadata_keys(connection, section): + return request(connection, 'get', 'admin/metadata/' + section) + +def get_op_state(connection, client_id, op_id, bucket, obj): + return request(connection, 'get', 'admin/opstate', + params={ + 'op-id': op_id, + 'object': '{0}/{1}'.format(bucket, obj), + 'client-id': client_id, + } + ) + +def remove_op_state(connection, client_id, op_id, bucket, obj): + return request(connection, 'delete', 'admin/opstate', + params={ + 'op-id': op_id, + 'object': '{0}/{1}'.format(bucket, obj), + 'client-id': client_id, + }, + expect_json=False, + ) + +def get_bucket_list(connection): + return list_metadata_keys(connection, 'bucket') + +@boto_call +def list_objects_in_bucket(connection, bucket_name): + # use the boto library to do this + bucket = connection.get_bucket(bucket_name) + return bucket.list() + +@boto_call +def delete_object(connection, bucket_name, object_name): + bucket = connection.get_bucket(bucket_name) + bucket.delete_key(object_name) + +def sync_object_intra_region(connection, bucket_name, object_name, src_zone, + client_id, op_id): + path = '{bucket}/{object}'.format( + bucket=url_safe(bucket_name), + object=url_safe(object_name), + ) + return request(connection, 'put', path, + params={ + 'rgwx-source-zone': src_zone, + 'rgwx-client-id': client_id, + 
'rgwx-op-id': op_id, + }, + headers={ + 'x-amz-copy-source': '%s/%s' % (bucket_name, object_name), + }, + expect_json=False) + +def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id): + return request(connection, 'post', 'admin/log', + params={ + 'type': lock_type, + 'id': shard_num, + 'length': timeout, + 'zone-id': zone_id, + 'locker-id': locker_id, + }, + special_first_param='lock', + expect_json=False) + +def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id): + return request(connection, 'post', 'admin/log', + params={ + 'type': lock_type, + 'id': shard_num, + 'locker-id': locker_id, + 'zone-id': zone_id, + }, + special_first_param='unlock', + expect_json=False) + +def _id_name(type_): + return 'bucket-instance' if type_ == 'bucket-index' else 'id' + +def get_log(connection, log_type, marker, max_entries, id_): + key = _id_name(log_type) + return request(connection, 'get', 'admin/log', + params={ + 'type': log_type, + key: id_, + 'marker': marker, + 'max-entries': max_entries, + }, + ) + +def get_log_info(connection, log_type, id_): + key = _id_name(log_type) + return request( + connection, 'get', 'admin/log', + params={ + 'type': log_type, + key: id_, + }, + special_first_param='info', + ) + +def num_log_shards(connection, shard_type): + out = request(connection, 'get', 'admin/log', dict(type=shard_type)) + return out['num_objects'] + +def set_worker_bound(connection, type_, marker, timestamp, + daemon_id, id_, data=None): + if data is None: + data = [] + key = _id_name(type_) + boto.log.debug('set_worker_bound: data = %r', data) + return request( + connection, 'post', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + 'marker': marker, + 'time': timestamp, + 'daemon_id': daemon_id, + }, + data=json.dumps(data), + special_first_param='work_bound', + ) + +def del_worker_bound(connection, type_, daemon_id, id_): + key = _id_name(type_) + return request( + connection, 'delete', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + 'daemon_id': daemon_id, + }, + special_first_param='work_bound', + expect_json=False, + ) + +def get_worker_bound(connection, type_, id_): + key = _id_name(type_) + out = request( + connection, 'get', 'admin/replica_log', + params={ + 'type': type_, + key: id_, + }, + special_first_param='bounds', + ) + boto.log.debug('get_worker_bound returned: %r', out) + retries = set() + for item in out['markers']: + names = [retry['name'] for retry in item['items_in_progress']] + retries = retries.union(names) + return out['marker'], out['oldest_time'], retries + +class Zone(object): + def __init__(self, zone_info): + self.name = zone_info['name'] + self.is_master = False + self.endpoints = [parse_endpoint(e) for e in zone_info['endpoints']] + self.log_meta = zone_info['log_meta'] == 'true' + self.log_data = zone_info['log_data'] == 'true' + + def __repr__(self): + return str(self) + + def __str__(self): + return self.name + +class Region(object): + def __init__(self, region_info): + self.name = region_info['key'] + self.is_master = region_info['val']['is_master'] == 'true' + self.zones = {} + for zone_info in region_info['val']['zones']: + zone = Zone(zone_info) + self.zones[zone.name] = zone + if zone.name == region_info['val']['master_zone']: + zone.is_master = True + self.master_zone = zone + assert hasattr(self, 'master_zone'), \ + 'No master zone found for region ' + self.name + + def __repr__(self): + return str(self) + + def __str__(self): + return str(self.zones.keys()) + +class RegionMap(object): + def 
__init__(self, region_map): + self.regions = {} + for region_info in region_map['regions']: + region = Region(region_info) + self.regions[region.name] = region + if region.is_master: + self.master_region = region + assert hasattr(self, 'master_region'), \ + 'No master region found in region map' + + def __repr__(self): + return str(self) + + def __str__(self): + return str(self.regions) + + def find_endpoint(self, endpoint): + for region in self.regions.itervalues(): + for zone in region.zones.itervalues(): + if endpoint in zone.endpoints or endpoint.zone == zone.name: + return region, zone + raise ZoneNotFound('%s not found in region map' % endpoint) + +def get_region_map(connection): + region_map = request(connection, 'get', 'admin/config') + return RegionMap(region_map) + +def _validate_sync_dest(dest_region, dest_zone): + if dest_region.is_master and dest_zone.is_master: + raise InvalidZone('destination cannot be master zone of master region') + +def _validate_sync_source(src_region, src_zone, dest_region, dest_zone, + meta_only): + if not src_zone.is_master: + raise InvalidZone('source zone %s must be a master zone' % src_zone.name) + if (src_region.name == dest_region.name and + src_zone.name == dest_zone.name): + raise InvalidZone('source and destination must be different zones') + if not src_zone.log_meta: + raise InvalidZone('source zone %s must have metadata logging enabled' % src_zone.name) + if not meta_only and not src_zone.log_data: + raise InvalidZone('source zone %s must have data logging enabled' % src_zone.name) + if not meta_only and src_region.name != dest_region.name: + raise InvalidZone('data sync can only occur between zones in the same region') + if not src_zone.endpoints: + raise InvalidZone('region map contains no endpoints for default source zone %s' % src_zone.name) + +def configure_endpoints(region_map, dest_endpoint, src_endpoint, meta_only): + print('region map is: %r' % region_map) + + dest_region, dest_zone = region_map.find_endpoint(dest_endpoint) + _validate_sync_dest(dest_region, dest_zone) + + # source may be specified by http endpoint or zone name + if src_endpoint.host or src_endpoint.zone: + src_region, src_zone = region_map.find_endpoint(src_endpoint) + else: + # try the master zone in the same region, then the master zone + # in the master region + try: + _validate_sync_source(dest_region, dest_region.master_zone, + dest_region, dest_zone, meta_only) + src_region, src_zone = dest_region, dest_region.master_zone + except InvalidZone as e: + log.debug('source region %s zone %s unacceptable: %s', + dest_region.name, dest_region.master_zone.name, e) + master_region = region_map.master_region + src_region, src_zone = master_region, master_region.master_zone + + _validate_sync_source(src_region, src_zone, dest_region, dest_zone, + meta_only) + + # choose a random source endpoint if one wasn't specified + if not src_endpoint.host: + endpoint = random.choice(src_zone.endpoints) + src_endpoint.host = endpoint.host + src_endpoint.port = endpoint.port + src_endpoint.secure = endpoint.secure + + # fill in region and zone names + dest_endpoint.region = dest_region + dest_endpoint.zone = dest_zone + src_endpoint.region = src_region + src_endpoint.zone = src_zone + +def connection(endpoint, debug=None): + return S3Connection( + aws_access_key_id=endpoint.access_key, + aws_secret_access_key=endpoint.secret_key, + is_secure=endpoint.secure, + host=endpoint.host, + port=endpoint.port, + calling_format=boto.s3.connection.OrdinaryCallingFormat(), + debug=debug, + )
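The client module above is where the new python-requests dependency is actually exercised: _build_request() reuses boto's AWS request signing, and request() sends the signed call to the radosgw admin API with requests. A minimal sketch of driving it directly, assuming a reachable radosgw endpoint and system-user credentials (the URL and keys are placeholders):

    from radosgw_agent import client

    # placeholder endpoint and credentials for a radosgw system user
    src = client.parse_endpoint('http://zone1.example.org:8080')
    src.access_key = 'SYSTEM_USER_ACCESS_KEY'
    src.secret_key = 'SYSTEM_USER_SECRET_KEY'

    conn = client.connection(src)              # boto S3Connection, used for signing
    print(client.get_metadata_sections(conn))  # GET /admin/metadata via requests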
diff --git a/radosgw_agent/lock.py b/radosgw_agent/lock.py new file mode 100644 index 0000000..2036f8e --- /dev/null +++ b/radosgw_agent/lock.py @@ -0,0 +1,107 @@ +import logging +import threading +import time + +from radosgw_agent import client + +log = logging.getLogger(__name__) + +class LockBroken(Exception): + pass + +class LockRenewFailed(LockBroken): + pass + +class LockExpired(LockBroken): + pass + +class Lock(threading.Thread): + """A lock on a shard log that automatically refreshes itself. + + It may be used to lock different shards throughout its lifetime. + To lock a new shard, call set_shard() with the desired shard_num, + then acquire(). + + To release the lock, call release_and_clear(). This will raise an + exception if the lock ever failed to be acquired in the timeout + period. + """ + + def __init__(self, conn, type_, locker_id, timeout, zone_id): + super(Lock, self).__init__() + self.conn = conn + self.type = type_ + self.timeout = timeout + self.lock = threading.Lock() + self.locker_id = locker_id + self.zone_id = zone_id + self.shard_num = None + self.last_locked = None + self.failed = False + + def set_shard(self, shard_num): + log.debug('set_shard to %d', shard_num) + with self.lock: + assert self.shard_num is None, \ + 'attempted to acquire new lock without releasing old one' + self.failed = False + self.last_locked = None + self.shard_num = shard_num + + def unset_shard(self): + log.debug('unset shard') + with self.lock: + self.shard_num = None + + def acquire(self): + """Renew an existing lock, or acquire a new one. + + The old lock must have already been released if shard_num is specified. + client.NotFound may be raised if the log contains no entries. + """ + log.debug('acquire lock') + with self.lock: + self._acquire() + + def _acquire(self): + # same as acquire() but assumes self.lock is held + now = time.time() + client.lock_shard(self.conn, self.type, self.shard_num, + self.zone_id, self.timeout, self.locker_id) + self.last_locked = now + + def release_and_clear(self): + """Release the lock currently being held. + + Prevent it from being automatically renewed, and check if there + were any errors renewing the current lock or if it expired. + If the lock was not sustained, raise LockRenewFailed or LockExpired. + """ + log.debug('release and clear lock') + with self.lock: + shard_num = self.shard_num + self.shard_num = None + diff = time.time() - self.last_locked + if diff > self.timeout: + msg = 'lock was not renewed in over %0.2f seconds' % diff + raise LockExpired(msg) + if self.failed: + raise LockRenewFailed() + try: + client.unlock_shard(self.conn, self.type, shard_num, + self.zone_id, self.locker_id) + except client.HttpError as e: + log.warn('failed to unlock shard %d in zone %s: %s', + shard_num, self.zone_id, e) + self.last_locked = None + + def run(self): + while True: + with self.lock: + if self.shard_num is not None: + try: + self._acquire() + except client.HttpError as e: + log.error('locking shard %d in zone %s failed: %s', + self.shard_num, self.zone_id, e) + self.failed = True + time.sleep(0.5 * self.timeout) diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py new file mode 100644 index 0000000..67ebea6 --- /dev/null +++ b/radosgw_agent/sync.py @@ -0,0 +1,318 @@ +import logging +import multiprocessing +import time + +from radosgw_agent import worker +from radosgw_agent import client + +log = logging.getLogger(__name__) + +# the replica log api only supports one entry, and updating it +# requires sending a daemon id that matches the existing one. This
+# doesn't make a whole lot of sense with the current structure of +# radosgw-agent, so just use a constant value for the daemon id. +DAEMON_ID = 'radosgw-agent' + +def prepare_sync(syncer, error_delay): + """Attempt to prepare a syncer for running a sync. + + :param error_delay: seconds to wait before retrying + + This will retry forever so the sync agent continues if radosgws + are unavailable temporarily. + """ + while True: + try: + syncer.prepare() + break + except Exception: + log.warn('error preparing for sync, will retry. Traceback:', + exc_info=True) + time.sleep(error_delay) + +def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout, + incremental_sync_delay, metadata_only, error_delay): + """Run a continuous incremental sync. + + This will run forever, pausing for + incremental_sync_delay seconds between syncs. + """ + while True: + try: + meta_syncer.sync(num_workers, lock_timeout) + if not metadata_only: + data_syncer.sync(num_workers, lock_timeout) + except Exception: + log.warn('error doing incremental sync, will try again. Traceback:', + exc_info=True) + + # prepare data before sleeping due to rgw_log_bucket_window + if not metadata_only: + prepare_sync(data_syncer, error_delay) + log.info('waiting %d seconds until next sync', + incremental_sync_delay) + time.sleep(incremental_sync_delay) + prepare_sync(meta_syncer, error_delay) + +class Syncer(object): + def __init__(self, src, dest, max_entries, *args, **kwargs): + self.src = src + self.dest = dest + self.src_conn = client.connection(src) + self.dest_conn = client.connection(dest) + self.daemon_id = DAEMON_ID + self.worker_cls = None # filled in by subclass constructor + self.num_shards = None + self.max_entries = max_entries + self.object_sync_timeout = kwargs.get('object_sync_timeout') + + def init_num_shards(self): + if self.num_shards is not None: + return + try: + self.num_shards = client.num_log_shards(self.src_conn, self.type) + log.debug('%d shards to check', self.num_shards) + except Exception: + log.error('finding number of shards failed') + raise + + def shard_num_for_key(self, key): + key = key.encode('utf8') + hash_val = 0 + for char in key: + c = ord(char) + hash_val = (hash_val + (c << 4) + (c >> 4)) * 11 + return hash_val % self.num_shards + + def prepare(self): + """Setup any state required before syncing starts. + + This must be called before sync(). + """ + pass + + def generate_work(self): + """Generate items to be placed in a queue for processing""" + pass + + def wait_until_ready(self): + pass + + def complete_item(self, shard_num, retries): + """Called when syncing a single item completes successfully""" + marker = self.shard_info.get(shard_num) + if not marker: + return + try: + data = [dict(name=retry, time=worker.DEFAULT_TIME) + for retry in retries] + client.set_worker_bound(self.dest_conn, + self.type, + marker, + worker.DEFAULT_TIME, + self.daemon_id, + shard_num, + data) + except Exception: + log.warn('could not set worker bounds, may repeat some work.'
+ ' Traceback:', exc_info=True) + + def sync(self, num_workers, log_lock_time): + workQueue = multiprocessing.Queue() + resultQueue = multiprocessing.Queue() + + processes = [self.worker_cls(workQueue, + resultQueue, + log_lock_time, + self.src, + self.dest, + daemon_id=self.daemon_id, + max_entries=self.max_entries, + object_sync_timeout=self.object_sync_timeout, + ) + for i in xrange(num_workers)] + for process in processes: + process.daemon = True + process.start() + + self.wait_until_ready() + + log.info('Starting sync') + # enqueue the shards to be synced + num_items = 0 + for item in self.generate_work(): + num_items += 1 + workQueue.put(item) + + # add a poison pill for each worker + for i in xrange(num_workers): + workQueue.put(None) + + # pull the results out as they are produced + errors = [] + for i in xrange(num_items): + result, item = resultQueue.get() + shard_num, retries = item + if result == worker.RESULT_SUCCESS: + log.debug('synced item %r successfully', item) + self.complete_item(shard_num, retries) + else: + log.error('error syncing shard %d', shard_num) + errors.append(shard_num) + + log.info('%d/%d items processed', i + 1, num_items) + if errors: + log.error('Encountered errors syncing these %d shards: %r', + len(errors), errors) + + +class IncrementalSyncer(Syncer): + + def get_worker_bound(self, shard_num): + try: + marker, timestamp, retries = client.get_worker_bound( + self.dest_conn, + self.type, + shard_num) + log.debug('oldest marker and time for shard %d are: %r %r', + shard_num, marker, timestamp) + log.debug('%d items to retry are: %r', len(retries), retries) + except client.NotFound: + # if no worker bounds have been set, start from the beginning + marker, retries = '', [] + return marker, retries + + def get_log_entries(self, shard_num, marker): + try: + result = client.get_log(self.src_conn, self.type, + marker, self.max_entries, + shard_num) + last_marker = result['marker'] + log_entries = result['entries'] + if len(log_entries) == self.max_entries: + log.warn('shard %d log has fallen behind - log length >= %d', + shard_num, self.max_entries) + except client.NotFound: + # no entries past this marker yet, but we may have retries + last_marker = '' + log_entries = [] + return last_marker, log_entries + + def prepare(self): + self.init_num_shards() + + self.shard_info = {} + self.shard_work = {} + for shard_num in xrange(self.num_shards): + marker, retries = self.get_worker_bound(shard_num) + last_marker, log_entries = self.get_log_entries(shard_num, marker) + self.shard_work[shard_num] = log_entries, retries + self.shard_info[shard_num] = last_marker + + self.prepared_at = time.time() + + def generate_work(self): + return self.shard_work.iteritems() + + +class MetaSyncerInc(IncrementalSyncer): + + def __init__(self, *args, **kwargs): + super(MetaSyncerInc, self).__init__(*args, **kwargs) + self.worker_cls = worker.MetadataWorkerIncremental + self.type = 'metadata' + + +class DataSyncerInc(IncrementalSyncer): + + def __init__(self, *args, **kwargs): + super(DataSyncerInc, self).__init__(*args, **kwargs) + self.worker_cls = worker.DataWorkerIncremental + self.type = 'data' + self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30) + + def wait_until_ready(self): + log.info('waiting to make sure bucket log is consistent') + while time.time() < self.prepared_at + self.rgw_data_log_window: + time.sleep(1) + + +class DataSyncerFull(Syncer): + + def __init__(self, *args, **kwargs): + super(DataSyncerFull, self).__init__(*args, **kwargs) +
self.worker_cls = worker.DataWorkerFull + self.type = 'data' + self.rgw_data_log_window = kwargs.get('rgw_data_log_window', 30) + + def prepare(self): + self.init_num_shards() + + # save data log markers for each shard + self.shard_info = {} + for shard in xrange(self.num_shards): + info = client.get_log_info(self.src_conn, 'data', shard) + # setting an empty marker returns an error + if info['marker']: + self.shard_info[shard] = info['marker'] + + # get list of buckets after getting any markers to avoid skipping + # entries added before we got the marker info + buckets = client.get_bucket_list(self.src_conn) + + self.prepared_at = time.time() + + self.buckets_by_shard = {} + for bucket in buckets: + shard = self.shard_num_for_key(bucket) + self.buckets_by_shard.setdefault(shard, []) + self.buckets_by_shard[shard].append(bucket) + + def generate_work(self): + return self.buckets_by_shard.iteritems() + + def wait_until_ready(self): + log.info('waiting to make sure bucket log is consistent') + while time.time() < self.prepared_at + self.rgw_data_log_window: + time.sleep(1) + + +class MetaSyncerFull(Syncer): + def __init__(self, *args, **kwargs): + super(MetaSyncerFull, self).__init__(*args, **kwargs) + self.worker_cls = worker.MetadataWorkerFull + self.type = 'metadata' + + def prepare(self): + try: + self.sections = client.get_metadata_sections(self.src_conn) + except client.HttpError as e: + log.error('Error listing metadata sections: %s', e) + raise + + # grab the latest shard markers and timestamps before we sync + self.shard_info = {} + self.init_num_shards() + for shard_num in xrange(self.num_shards): + info = client.get_log_info(self.src_conn, 'metadata', shard_num) + # setting an empty marker returns an error + if info['marker']: + self.shard_info[shard_num] = info['marker'] + + self.metadata_by_shard = {} + for section in self.sections: + try: + for key in client.list_metadata_keys(self.src_conn, section): + shard = self.shard_num_for_key(section + ':' + key) + self.metadata_by_shard.setdefault(shard, []) + self.metadata_by_shard[shard].append((section, key)) + except client.NotFound: + # no keys of this type exist + continue + except client.HttpError as e: + log.error('Error listing metadata for section %s: %s', + section, e) + raise + + def generate_work(self): + return self.metadata_by_shard.iteritems() diff --git a/radosgw_agent/tests/__init__.py b/radosgw_agent/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radosgw_agent/tests/test_client.py b/radosgw_agent/tests/test_client.py new file mode 100644 index 0000000..30b926a --- /dev/null +++ b/radosgw_agent/tests/test_client.py @@ -0,0 +1,304 @@ +import py.test + +from radosgw_agent import client + +REGION_MAP = { + "regions": [ + { + "val": { + "zones": [ + { + "endpoints": [ + "http://vit:8001/" + ], + "log_data": "true", + "log_meta": "true", + "name": "skinny-1" + }, + { + "endpoints": [ + "http://vit:8002/" + ], + "log_data": "false", + "log_meta": "false", + "name": "skinny-2" + } + ], + "name": "skinny", + "default_placement": "", + "master_zone": "skinny-1", + "api_name": "slim", + "placement_targets": [], + "is_master": "true", + "endpoints": [ + "http://skinny:80/" + ] + }, + "key": "skinny" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://vit:8003/" + ], + "log_data": "false", + "log_meta": "false", + "name": "swab-2" + }, + { + "endpoints": [ + "http://vit:8004/" + ], + "log_data": "false", + "log_meta": "false", + "name": "swab-3" + }, + { + "endpoints": [ +
"http://vit:8000/" + ], + "log_data": "true", + "log_meta": "true", + "name": "swab-1" + } + ], + "name": "swab", + "default_placement": "", + "master_zone": "swab-1", + "api_name": "shady", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://vit:8000/" + ] + }, + "key": "swab" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://ro:80/" + ], + "log_data": "false", + "log_meta": "false", + "name": "ro-1" + }, + { + "endpoints": [ + "http://ro:8080/" + ], + "log_data": "false", + "log_meta": "false", + "name": "ro-2" + }, + ], + "name": "readonly", + "default_placement": "", + "master_zone": "ro-1", + "api_name": "readonly", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://ro:80/", + "http://ro:8080/" + ] + }, + "key": "readonly" + }, + { + "val": { + "zones": [ + { + "endpoints": [ + "http://meta:80/" + ], + "log_data": "false", + "log_meta": "true", + "name": "meta-1" + }, + { + "endpoints": [ + "http://meta:8080/" + ], + "log_data": "false", + "log_meta": "false", + "name": "meta-2" + }, + ], + "name": "metaonly", + "default_placement": "", + "master_zone": "meta-1", + "api_name": "metaonly", + "placement_targets": [], + "is_master": "false", + "endpoints": [ + "http://meta:80/", + "http://meta:8080/" + ] + }, + "key": "metaonly" + } + ], + "master_region": "skinny" + } + +def test_endpoint_default_port(): + endpoint = client.Endpoint('example.org', None, True) + assert endpoint.port == 443 + endpoint = client.Endpoint('example.org', None, False) + assert endpoint.port == 80 + +def test_endpoint_port_specified(): + endpoint = client.Endpoint('example.org', 80, True) + assert endpoint.port == 80 + endpoint = client.Endpoint('example.org', 443, True) + assert endpoint.port == 443 + +def test_endpoint_equality(): + default_port = client.Endpoint('a.org', None, True) + secure = client.Endpoint('a.org', 443, True) + insecure = client.Endpoint('a.org', 80, False) + assert default_port == secure + assert secure == insecure + assert insecure == default_port + +def test_endpoint_inequality(): + base = client.Endpoint('a.org', 80, True) + diff_host = client.Endpoint('b.org', 80, True) + diff_port = client.Endpoint('a.org', 81, True) + insecure = client.Endpoint('a.org', 8080, False) + assert base != diff_host + assert base != diff_port + assert base != insecure + +def test_parse_endpoint(): + endpoints = { + 'http://example.org': ('example.org', 80, False), + 'https://example.org': ('example.org', 443, True), + 'https://example.org:8080': ('example.org', 8080, True), + 'https://example.org:8080/': ('example.org', 8080, True), + 'http://example.org:81/a/b/c?b#d': ('example.org', 81, False), + } + for url, (host, port, secure) in endpoints.iteritems(): + endpoint = client.parse_endpoint(url) + assert endpoint.port == port + assert endpoint.host == host + assert endpoint.secure == secure + +def test_parse_endpoint_bad_input(): + with py.test.raises(client.InvalidProtocol): + client.parse_endpoint('ftp://example.com') + with py.test.raises(client.InvalidHost): + client.parse_endpoint('http://:80/') + +def _test_configure_endpoints(dest_url, dest_region, dest_zone, + expected_src_url, expected_src_region, + expected_src_zone, specified_src_url=None, + meta_only=False): + dest = client.parse_endpoint(dest_url) + if specified_src_url is not None: + src = client.parse_endpoint(specified_src_url) + else: + src = client.Endpoint(None, None, None) + region_map = client.RegionMap(REGION_MAP) + client.configure_endpoints(region_map, dest, src, 
meta_only) + assert dest.region.name == dest_region + assert dest.zone.name == dest_zone + assert src == client.parse_endpoint(expected_src_url) + assert src.region.name == expected_src_region + assert src.zone.name == expected_src_zone + +def test_configure_endpoints_2nd_region_master_zone_meta(): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_master_zone_data(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_master_region_2nd_zone(): + _test_configure_endpoints('http://vit:8002', 'skinny', 'skinny-2', + 'http://vit:8001', 'skinny', 'skinny-1') + +def test_configure_endpoints_2nd_region_2nd_zone(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8000', 'swab', 'swab-1') + +def test_configure_endpoints_2nd_region_readonly_meta(): + _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_readonly_data(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://ro:8080', 'readonly', 'ro-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_2nd_region_metaonly_meta(): + _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2', + 'http://meta:80', 'metaonly', 'meta-1', + meta_only=True) + +def test_configure_endpoints_2nd_region_metaonly_data(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://meta:8080', 'metaonly', 'meta-2', + 'http://vit:8001', 'skinny', 'skinny-1', + meta_only=False) + +def test_configure_endpoints_master_region_master_zone(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', 'skinny', 'skinny-1') + +def test_configure_endpoints_specified_src_same_region(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000') + +def test_configure_endpoints_specified_src_master_region_meta(): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', meta_only=True) + +def test_configure_endpoints_specified_src_master_region_data(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:8001', meta_only=False) + +def test_configure_endpoints_bad_src_same_region(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8004', 'swab', 'swab-3', + 'http://vit:8004') + +def test_configure_endpoints_bad_src_master_region(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8003', 'swab', 'swab-2', + 'http://vit:8002', 'skinny', 'skinny-2', + 'http://vit:8002') + +def test_configure_endpoints_bad_src_same_zone(): + with py.test.raises(client.InvalidZone): + _test_configure_endpoints('http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000', 'swab', 'swab-1', + 'http://vit:8000') + +def test_configure_endpoints_specified_nonexistent_src(): + with py.test.raises(client.ZoneNotFound): + _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1', + 
'http://vit:8001', 'skinny', 'skinny-1', + 'http://vit:80') + +def test_configure_endpoints_unknown_zone(): + with py.test.raises(client.ZoneNotFound): + _test_configure_endpoints('http://vit:8005', 'skinny', 'skinny-1', + 'http://vit:8001', 'skinny', 'skinny-1') diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py new file mode 100644 index 0000000..7de850b --- /dev/null +++ b/radosgw_agent/worker.py @@ -0,0 +1,492 @@ +from collections import namedtuple +import logging +import multiprocessing +import os +import socket +import time + +from radosgw_agent import client +from radosgw_agent import lock + +log = logging.getLogger(__name__) + +RESULT_SUCCESS = 0 +RESULT_ERROR = 1 + +class SkipShard(Exception): + pass + +class SyncError(Exception): + pass +class SyncTimedOut(SyncError): + pass +class SyncFailed(SyncError): + pass + +DEFAULT_TIME = '1970-01-01 00:00:00' + +class Worker(multiprocessing.Process): + """sync worker to run in its own process""" + + def __init__(self, work_queue, result_queue, log_lock_time, + src, dest, **kwargs): + super(Worker, self).__init__() + self.src = src + self.dest = dest + self.work_queue = work_queue + self.result_queue = result_queue + self.log_lock_time = log_lock_time + self.lock = None + + self.local_lock_id = socket.gethostname() + ':' + str(os.getpid()) + + # construct the two connection objects + self.src_conn = client.connection(src) + self.dest_conn = client.connection(dest) + + def prepare_lock(self): + assert self.lock is None + self.lock = lock.Lock(self.dest_conn, self.type, self.local_lock_id, + self.log_lock_time, self.dest.zone.name) + self.lock.daemon = True + self.lock.start() + + def lock_shard(self, shard_num): + result = shard_num, [] + try: + self.lock.set_shard(shard_num) + self.lock.acquire() + except client.NotFound: + # no log means nothing changed this shard yet + self.lock.unset_shard() + self.result_queue.put((RESULT_SUCCESS, result)) + raise SkipShard('no log for shard') + except Exception: + log.warn('error locking shard %d log, ' + ' skipping for now. Traceback: ', + shard_num, exc_info=True) + self.lock.unset_shard() + self.result_queue.put((RESULT_ERROR, result)) + raise SkipShard() + + def unlock_shard(self): + try: + self.lock.release_and_clear() + except lock.LockBroken as e: + log.warn('work may be duplicated: %s', e) + except Exception as e: + log.warn('error unlocking log, continuing anyway ' + 'since lock will timeout. Traceback:', exc_info=True) + + def set_bound(self, key, marker, retries, type_=None): + # api doesn't allow setting a bound with a blank marker + if marker: + if type_ is None: + type_ = self.type + try: + data = [dict(name=item, time=DEFAULT_TIME) for item in retries] + client.set_worker_bound(self.dest_conn, + type_, + marker, + DEFAULT_TIME, + self.daemon_id, + key, + data=data) + return RESULT_SUCCESS + except Exception: + log.warn('error setting worker bound for key "%s",' + ' may duplicate some work later. 
Traceback:', key, + exc_info=True) + return RESULT_ERROR + +MetadataEntry = namedtuple('MetadataEntry', + ['section', 'name', 'marker', 'timestamp']) + +def _meta_entry_from_json(entry): + return MetadataEntry( + entry['section'], + entry['name'], + entry['id'], + entry['timestamp'], + ) + +BucketIndexEntry = namedtuple('BucketIndexEntry', + ['object', 'marker', 'timestamp']) + +def _bi_entry_from_json(entry): + return BucketIndexEntry( + entry['object'], + entry['op_id'], + entry['timestamp'], + ) + +class IncrementalMixin(object): + """This defines run() and get_and_process_entries() for incremental sync. + + These are the same for data and metadata sync, so share their + implementation here. + """ + + def run(self): + self.prepare_lock() + while True: + item = self.work_queue.get() + if item is None: + log.info('process %s is done. Exiting', self.ident) + break + + shard_num, (log_entries, retries) = item + + log.info('%s is processing shard number %d', + self.ident, shard_num) + + # first, lock the log + try: + self.lock_shard(shard_num) + except SkipShard: + continue + + result = RESULT_SUCCESS + try: + new_retries = self.sync_entries(log_entries, retries) + except Exception: + log.exception('syncing entries for shard %d failed', + shard_num) + result = RESULT_ERROR + new_retries = [] + + # finally, unlock the log + self.unlock_shard() + self.result_queue.put((result, (shard_num, new_retries))) + log.info('finished processing shard %d', shard_num) + + +class DataWorker(Worker): + + def __init__(self, *args, **kwargs): + super(DataWorker, self).__init__(*args, **kwargs) + self.type = 'data' + self.op_id = 0 + self.object_sync_timeout = kwargs.get('object_sync_timeout', 60 * 60 * 60) + self.daemon_id = kwargs['daemon_id'] + + def sync_object(self, bucket, obj): + log.debug('sync_object %s/%s', bucket, obj) + self.op_id += 1 + local_op_id = self.local_lock_id + ':' + str(self.op_id) + try: + found = True + until = time.time() + self.object_sync_timeout + client.sync_object_intra_region(self.dest_conn, bucket, obj, + self.src.zone.name, + self.daemon_id, + local_op_id) + except client.NotFound: + found = False + log.debug('"%s/%s" not found on master, deleting from secondary', + bucket, obj) + try: + client.delete_object(self.dest_conn, bucket, obj) + except client.NotFound: + # Since we were trying to delete the object, just return + return + except Exception: + msg = 'could not delete "%s/%s" from secondary' % (bucket, obj) + log.exception(msg) + raise SyncFailed(msg) + except SyncFailed: + raise + except Exception as e: + log.debug('exception during sync: %s', e) + if found: + self.wait_for_object(bucket, obj, until, local_op_id) + # TODO: clean up old op states + try: + if found: + client.remove_op_state(self.dest_conn, self.daemon_id, + local_op_id, bucket, obj) + except Exception: + log.exception('could not remove op state for daemon "%s" op_id %s', + self.daemon_id, local_op_id) + + def wait_for_object(self, bucket, obj, until, local_op_id): + while time.time() < until: + try: + state = client.get_op_state(self.dest_conn, + self.daemon_id, + local_op_id, + bucket, obj) + log.debug('op state is %s', state) + state = state[0]['state'] + if state == 'complete': + return + elif state != 'in-progress': + raise SyncFailed('state is {0}'.format(state)) + time.sleep(1) + except SyncFailed: + raise + except Exception as e: + log.debug('error getting op state: %s', e, exc_info=True) + time.sleep(1) + # timeout expired + raise SyncTimedOut() + + def get_bucket_instance(self, bucket): +
+        metadata = client.get_metadata(self.src_conn, 'bucket', bucket)
+        return bucket + ':' + metadata['data']['bucket']['bucket_id']
+
+    def get_bucket(self, bucket_instance):
+        return bucket_instance.split(':', 1)[0]
+
+    def sync_bucket(self, bucket, objects):
+        log.info('syncing bucket "%s"', bucket)
+        retry_objs = []
+        count = 0
+        for obj in objects:
+            count += 1
+            # sync each object
+            log.debug('syncing object "%s/%s"', bucket, obj)
+            try:
+                self.sync_object(bucket, obj)
+            except SyncError as err:
+                log.error('failed to sync object %s/%s: %s',
+                          bucket, obj, err)
+                retry_objs.append(obj)
+
+        log.debug('bucket {bucket} has {num_objects} objects'.format(
+                  bucket=bucket, num_objects=count))
+        if retry_objs:
+            log.debug('these objects failed to sync and will be retried '
+                      'during the next incremental sync: %s', retry_objs)
+
+        return retry_objs
+
+
+class DataWorkerIncremental(IncrementalMixin, DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+        self.max_entries = kwargs['max_entries']
+
+    def get_bucket_instance_entries(self, marker, instance):
+        entries = []
+        while True:
+            try:
+                log_entries = client.get_log(self.src_conn, 'bucket-index',
+                                             marker, self.max_entries, instance)
+            except client.NotFound:
+                log_entries = []
+
+            log.debug('bucket instance "%s" has %d entries after "%s"', instance,
+                      len(log_entries), marker)
+
+            try:
+                entries += [_bi_entry_from_json(entry) for entry in log_entries]
+            except KeyError:
+                log.error('log containing bad key is: %s', log_entries)
+                raise
+
+            if entries:
+                marker = entries[-1].marker
+            else:
+                marker = ''
+
+            if len(log_entries) < self.max_entries:
+                break
+        return marker, entries
+
+    def inc_sync_bucket_instance(self, instance, marker, timestamp, retries):
+        max_marker, entries = self.get_bucket_instance_entries(marker, instance)
+        objects = set([entry.object for entry in entries])
+        bucket = self.get_bucket(instance)
+        new_retries = self.sync_bucket(bucket, objects.union(retries))
+
+        result = self.set_bound(instance, max_marker, new_retries,
+                                'bucket-index')
+        if new_retries:
+            result = RESULT_ERROR
+        return result
+
+    def sync_entries(self, log_entries, retries):
+        try:
+            bucket_instances = set([entry['key'] for entry in log_entries])
+        except KeyError:
+            log.error('log containing bad key is: %s', log_entries)
+            raise
+
+        new_retries = []
+        for bucket_instance in bucket_instances.union(retries):
+            try:
+                marker, timestamp, retries = client.get_worker_bound(
+                    self.dest_conn,
+                    'bucket-index',
+                    bucket_instance)
+            except client.NotFound:
+                log.debug('no worker bound found for bucket instance "%s"',
+                          bucket_instance)
+                marker, timestamp, retries = '', DEFAULT_TIME, []
+            try:
+                sync_result = self.inc_sync_bucket_instance(bucket_instance,
+                                                            marker,
+                                                            timestamp,
+                                                            retries)
+            except Exception as e:
+                log.warn('error syncing bucket instance "%s": %s',
+                         bucket_instance, e, exc_info=True)
+                sync_result = RESULT_ERROR
+            if sync_result == RESULT_ERROR:
+                new_retries.append(bucket_instance)
+
+        return new_retries
+
+class DataWorkerFull(DataWorker):
+
+    def full_sync_bucket(self, bucket):
+        try:
+            instance = self.get_bucket_instance(bucket)
+            try:
+                marker = client.get_log_info(self.src_conn, 'bucket-index',
+                                             instance)['max_marker']
+            except client.NotFound:
+                marker = ''
+            log.debug('bucket instance is "%s" with marker %s', instance, marker)
+            # nothing to do for this bucket
+            if not marker:
+                return True
+
+            objects = client.list_objects_in_bucket(self.src_conn, bucket)
+            if not objects:
+                return True
+        except Exception as e:
+            log.error('error preparing for full sync of bucket "%s": %s',
+                      bucket, e)
+            return False
+
+        retries = self.sync_bucket(bucket, objects)
+
+        result = self.set_bound(instance, marker, retries, 'bucket-index')
+        return not retries and result == RESULT_SUCCESS
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            shard_num, buckets = item
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            # attempt to sync each bucket, add to a list to retry
+            # during incremental sync if sync fails
+            retry_buckets = []
+            for bucket in buckets:
+                if not self.full_sync_bucket(bucket):
+                    retry_buckets.append(bucket)
+
+            # unlock shard and report buckets to retry during incremental sync
+            self.unlock_shard()
+            self.result_queue.put((RESULT_SUCCESS, (shard_num, retry_buckets)))
+            log.info('finished syncing shard %d', shard_num)
+            log.info('incremental sync will need to retry buckets: %s',
+                     retry_buckets)
+
+class MetadataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorker, self).__init__(*args, **kwargs)
+        self.type = 'metadata'
+
+    def sync_meta(self, section, name):
+        log.debug('syncing metadata type %s key "%s"', section, name)
+        try:
+            metadata = client.get_metadata(self.src_conn, section, name)
+        except client.NotFound:
+            log.debug('%s "%s" not found on master, deleting from secondary',
+                      section, name)
+            try:
+                client.delete_metadata(self.dest_conn, section, name)
+            except client.NotFound:
+                # Since this error is handled appropriately, return success
+                return RESULT_SUCCESS
+        except Exception as e:
+            log.warn('error getting metadata for %s "%s": %s',
+                     section, name, e, exc_info=True)
+            return RESULT_ERROR
+        else:
+            try:
+                client.update_metadata(self.dest_conn, section, name, metadata)
+                return RESULT_SUCCESS
+            except Exception as e:
+                log.warn('error updating metadata for %s "%s": %s',
+                         section, name, e, exc_info=True)
+                return RESULT_ERROR
+
+class MetadataWorkerIncremental(IncrementalMixin, MetadataWorker):
+
+    def __init__(self, *args, **kwargs):
+        super(MetadataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def sync_entries(self, log_entries, retries):
+        try:
+            entries = [_meta_entry_from_json(entry) for entry in log_entries]
+        except KeyError:
+            log.error('log containing bad key is: %s', log_entries)
+            raise
+
+        new_retries = []
+        mentioned = set([(entry.section, entry.name) for entry in entries])
+        split_retries = [tuple(entry.split('/', 1)) for entry in retries]
+        for section, name in mentioned.union(split_retries):
+            sync_result = self.sync_meta(section, name)
+            if sync_result == RESULT_ERROR:
+                new_retries.append(section + '/' + name)
+
+        return new_retries
+
+class MetadataWorkerFull(MetadataWorker):
+
+    def empty_result(self, shard):
+        return shard, []
+
+    def run(self):
+        self.prepare_lock()
+        while True:
+            item = self.work_queue.get()
+            if item is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.debug('syncing item "%s"', item)
+
+            shard_num, metadata = item
+
+            # first, lock the log
+            try:
+                self.lock_shard(shard_num)
+            except SkipShard:
+                continue
+
+            # attempt to sync each item, add to a list to retry
+            # during incremental sync if sync fails
+            retries = []
+            for section, name in metadata:
+                # sync_meta() reports errors via its return code
+                sync_result = self.sync_meta(section, name)
+                if sync_result == RESULT_ERROR:
+                    log.warn('could not sync %s "%s", saving for retry',
+                             section, name)
+                    retries.append(section + '/' + name)
+
+            # unlock shard and report items to retry during incremental sync
+            self.unlock_shard()
+            self.result_queue.put((RESULT_SUCCESS, (shard_num, retries)))
+            log.info('finished syncing shard %d', shard_num)
+            log.info('incremental sync will need to retry items: %s',
+                     retries)
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..43b2434
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,3 @@
+pytest >=2.1.3
+mock >=1.0
+tox >=1.2
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..2ac9bc1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+argparse
+boto >=2.2.2,<3.0.0
+requests >=1.2.1
+PyYAML
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..cf24a05
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_rpm]
+requires = python-argparse,PyYAML,python-boto >= 2.2.2,python-boto < 3.0.0,python-requests
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..6a9144b
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,39 @@
+#!/usr/bin/python
+from setuptools import setup, find_packages
+import sys
+
+
+install_requires = []
+pyversion = sys.version_info[:2]
+if pyversion < (2, 7) or (3, 0) <= pyversion <= (3, 1):
+    install_requires.append('argparse')
+
+setup(
+    name='radosgw-agent',
+    version='1.1',
+    packages=find_packages(),
+
+    author='Josh Durgin',
+    author_email='josh.durgin@inktank.com',
+    description='Synchronize users and data between radosgw clusters',
+    license='MIT',
+    keywords='radosgw ceph radosgw-agent',
+    url="https://github.com/ceph/radosgw-agent",
+
+    install_requires=[
+        'setuptools',
+        'boto >=2.2.2,<3.0.0',
+        'requests >=1.2.1',
+        ] + install_requires,
+
+    tests_require=[
+        'pytest >=2.1.3',
+        'mock >=1.0',
+        ],
+
+    entry_points={
+        'console_scripts': [
+            'radosgw-agent = radosgw_agent.cli:main',
+            ],
+        },
+    )
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..74acee7
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,8 @@
+[tox]
+envlist = py26
+
+[testenv]
+deps=
+  pytest
+  mock
+commands=py.test -s -v {posargs:radosgw_agent/tests}
-- 
2.47.3
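A minimal usage sketch, not part of the patch: it shows how the worker
classes above are driven. The real coordination lives in radosgw_agent/sync.py
(added by this patch but not excerpted here); only the queue protocol, the
(shard_num, (log_entries, retries)) work items and the None sentinel per
worker, comes from the run() loops above. The endpoint objects, worker count,
and shard list are illustrative assumptions.

    import multiprocessing

    from radosgw_agent import worker

    def run_incremental_metadata_sync(src, dest, shards, num_workers=1):
        # src and dest are assumed to be endpoint objects like those built
        # by radosgw_agent.cli, acceptable to client.connection()
        work_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()

        workers = [
            worker.MetadataWorkerIncremental(work_queue, result_queue,
                                             60,  # log_lock_time, seconds
                                             src, dest)
            for _ in range(num_workers)
        ]
        for w in workers:
            w.start()

        # each work item pairs a shard number with its log entries and
        # any keys left over from a previous run
        for shard_num, log_entries, retries in shards:
            work_queue.put((shard_num, (log_entries, retries)))

        # a None item tells each worker's run() loop to exit
        for _ in workers:
            work_queue.put(None)

        # exactly one (result, (shard_num, new_retries)) tuple is queued
        # per work item, whether the shard synced, failed, or was skipped
        results = [result_queue.get() for _ in shards]
        for w in workers:
            w.join()
        return results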