Getting code ready for handoff
author Joe Buck <jbbuck@gmail.com>
Tue, 27 Aug 2013 18:14:06 +0000 (11:14 -0700)
committer Josh Durgin <josh.durgin@inktank.com>
Mon, 9 Sep 2013 20:47:14 +0000 (13:47 -0700)
Signed-off-by: Joe Buck <jbbuck@gmail.com>
radosgw_agent/cli.py
radosgw_agent/client.py
radosgw_agent/sync.py
radosgw_agent/worker.py

radosgw_agent/cli.py
index 107fabc96669e7448dd26275734877e8791d991a..f0a76c4fdc8b04972a0057a788ae427e3a1abb32 100644 (file)
@@ -49,6 +49,11 @@ def parse_args():
         action='store_true', dest='quiet',
         help='be less verbose',
         )
+    parser.add_argument(
+        '--data',
+        action='store_true', dest='data',
+        help='sync data',
+        )
     parser.add_argument(
         '--src-access-key',
         required='src_access_key' not in defaults,
@@ -171,17 +176,17 @@ class TestHandler(BaseHTTPRequestHandler):
         resp = ''
         if self.path.startswith('/metadata/full'):
             try:
-                TestHandler.syncer.sync_full(TestHandler.num_workers,
-                                             TestHandler.lock_timeout)
+                TestHandler.syncer.metadata_sync_full(TestHandler.num_workers,
+                                                      TestHandler.lock_timeout)
             except Exception as e:
                 log.exception('error doing full sync')
                 status = 500
                 resp = str(e)
         elif self.path.startswith('/metadata/incremental'):
             try:
-                TestHandler.syncer.sync_incremental(TestHandler.num_workers,
-                                                TestHandler.lock_timeout,
-                                                TestHandler.max_entries)
+                TestHandler.syncer.metadata_sync_incremental(TestHandler.num_workers,
+                                                             TestHandler.lock_timeout,
+                                                             TestHandler.max_entries)
             except Exception as e:
                 log.exception('error doing incremental sync')
                 status = 500
@@ -226,8 +231,15 @@ def main():
                           args.src_secret_key, args.src_zone)
     dest = client.Endpoint(args.dest_host, args.dest_port, args.dest_access_key,
                            args.dest_secret_key, args.dest_zone)
-    # TODO: check src and dest zone names and endpoints match the region map
-    syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
+
+    if args.data:
+        # TODO: check src and dest zone names and endpoints match the region map
+        syncer = sync.Syncer('data', src, dest, args.daemon_id)
+        log.info('syncing data')
+    else:
+        # TODO: check src and dest zone names and endpoints match the region map
+        syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
+        log.info('syncing metadata')
 
     if args.test_server_host:
         log.warn('TEST MODE - do not run unless you are testing this program')
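Note: the two branches added to main() differ only in the sync type string they pass to sync.Syncer; a minimal, equivalent dispatch could look like the sketch below (an editorial suggestion, not part of this commit):

    sync_type = 'data' if args.data else 'metadata'
    # TODO: check src and dest zone names and endpoints match the region map
    syncer = sync.Syncer(sync_type, src, dest, args.daemon_id)
    log.info('syncing %s', sync_type)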
radosgw_agent/client.py
index 90f22ffd6b40be4ba8126f6642a7590478017b92..f6d25a294ee2a293d6dae4eee4230520020c4e10 100644 (file)
@@ -93,7 +93,32 @@ def get_metadata_sections(connection):
     return request(connection, 'get', '/admin/metadata')
 
 def list_metadata_keys(connection, section):
     return request(connection, 'get', '/admin/metadata/' + section)
+
+def list_ops_for_client(connection, client_id, op_id):
+    return request(connection, 'get', '/admin/opstate/',
+                   params={
+                      'client_id': client_id,
+                      'op_id': op_id,
+                      }
+                   )
+
+def get_bucket_list(connection):
+    return list_metadata_keys(connection, 'bucket')
+
+def list_objects_in_bucket(connection, bucket_name):
+    # use the boto library to do this
+    bucket = connection.get_bucket(bucket_name)
+    return bucket.list()
+
+def sync_object_intra_region(connection, bucket_name, object_name, src_zone, client_id, op_id):
+    return request(connection, 'put', '/{bucket_name}/{object_name}'.format(bucket_name=bucket_name, object_name=object_name),
+                   params={
+                       'rgwx-source-zone': src_zone,
+                       'rgwx-client-id': client_id,
+                       'rgwx-op-id': op_id,
+                       },
+                   expect_json=False)
 
 def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
     return request(connection, 'post', '/admin/log',
@@ -128,6 +153,16 @@ def get_meta_log(connection, shard_num, marker, max_entries):
                        },
                    )
 
+def get_data_log(connection, shard_num, marker, max_entries):
+    return request(connection, 'get', '/admin/log',
+                   params={
+                       'type': 'data',
+                       'id': shard_num,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                       },
+                   )
+
 def get_log_info(connection, log_type, shard_num):
     return request(
         connection, 'get', '/admin/log',
@@ -143,7 +178,7 @@ def num_log_shards(connection, shard_type):
     return out['num_objects']
 
 def set_worker_bound(connection, type_, shard_num, marker, timestamp,
-                     daemon_id):
+                     daemon_id, data=[]):
     return request(
         connection, 'post', '/admin/replica_log',
         params=dict(
@@ -153,7 +188,7 @@ def set_worker_bound(connection, type_, shard_num, marker, timestamp,
             time=timestamp,
             daemon_id=daemon_id,
             ),
-        data='[]',
+        data=json.dumps(data),
         special_first_param='work_bound',
         )
 
@@ -169,6 +204,16 @@ def del_worker_bound(connection, type_, shard_num, daemon_id):
         expect_json=False,
         )
 
+def get_worker_bound(connection, type_, shard_num):
+    return request(
+        connection, 'get', '/admin/replica_log',
+        params=dict(
+            type=type_,
+            id=shard_num,
+            ),
+        special_first_param='work_bound',
+        )
+
 def get_min_worker_bound(connection, type_, shard_num):
     out = request(
         connection, 'get', '/admin/replica_log',
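Note: with the change above, set_worker_bound serializes its data argument with json.dumps rather than always sending '[]' (this assumes client.py already imports json). A hypothetical call is sketched below; the entry format follows the TODO in sync.py, and dest_conn/daemon_id stand in for a live connection and agent id, so treat the payload shape as an assumption rather than an established API:

    now = '2013-08-27 18:14:06Z'  # illustrative timestamp only
    entries = [{'bucket': 'NEEDSSYNC:mybucket', 'time': now}]
    client.set_worker_bound(dest_conn, 'data', shard_num=0,
                            marker='buckets_in_shard_0', timestamp=now,
                            daemon_id=daemon_id, data=entries)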
radosgw_agent/sync.py
index 98552411cd9feac350cd4f2da8ba14a0e084f614..025c32fcad9e33f6fd02e78cbb08ae1c6016a214 100644 (file)
@@ -1,4 +1,5 @@
 import datetime
+import hashlib
 import logging
 import multiprocessing
 
@@ -7,8 +8,11 @@ from radosgw_agent import client
 
 log = logging.getLogger(__name__)
 
-class Syncer:
+NEEDS_SYNC = 'NEEDSSYNC'
+SYNC_IN_PROGRESS = 'INPROGRESS'
+SYNC_COMPLETED = 'DONESYNC'
 
+class Syncer:
     def __init__(self, type_, src, dest, daemon_id):
         self._type = type_
         self.src = src
@@ -17,7 +21,10 @@ class Syncer:
         self.dest_conn = client.connection(dest)
         self.daemon_id = daemon_id
 
-    def sync_incremental(self, num_workers, log_lock_time, max_entries):
+    def data_sync_incremental(self, num_workers, log_lock_time, max_entries):
+        pass
+
+    def metadata_sync_incremental(self, num_workers, log_lock_time, max_entries):
         try:
             num_shards = client.num_log_shards(self.src_conn, self._type)
         except:
@@ -57,19 +64,122 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(num_shards):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} incremental sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
             result, shard_num = resultQueue.get()
             if result == worker.RESULT_SUCCESS:
                 log.debug('synced shard %d', shard_num)
             else:
-                log.error('error syncing shard %d', shard_num)
+                log.error('error on incremental sync of shard %d', shard_num)
                 errors.append(shard_num)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append(shard_num)
+
             log.info('%d/%d shards processed', i + 1, num_shards)
         if errors:
             log.error('Encountered errors syncing these %d shards: %s',
                       len(errors), errors)
 
-    def sync_full(self, num_workers, log_lock_time):
+
+    def sync_incremental(self, num_workers, log_lock_time, max_entries):
+        if self._type == 'metadata':
+            self.metadata_sync_incremental(num_workers, log_lock_time, max_entries)
+        elif self._type == 'data':
+            self.data_sync_incremental(num_workers, log_lock_time, max_entries)
+        else:
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+
+    def data_sync_full(self, num_workers, log_lock_time):
+        # TODO: we need some sort of lock here to make sure that only
+        # one client is fetching the list of buckets to sync, so that the
+        # list is consistent.
+
+        num_data_shards = client.num_log_shards(self.src_conn, 'data')
+        log.debug('There are {ns} data log shards'.format(ns=num_data_shards))
+
+        # get the set of all buckets and then add an entry to the data
+        # replica log for each
+        buckets = client.get_bucket_list(self.src_conn)
+        now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        shard_to_bucket_dict = dict()
+        for bucket_name in buckets:
+            bucket_hash = int(hashlib.md5(bucket_name).hexdigest(), 16) % num_data_shards
+            if bucket_hash not in shard_to_bucket_dict:
+                shard_to_bucket_dict[bucket_hash] = list()
+            # TODO: the docs for set_worker_bound suggest that the data payload
+            # should be a list of dicts, with each dict having a 'bucket' and a
+            # 'time' entry. For now, stash the sync state (NEEDSSYNC) and the
+            # bucket name together as the value. May need a different approach
+            # or delimiter.
+            shard_to_bucket_dict[bucket_hash].append({
+                'bucket': '{needssync}:{bucket_name}'.format(
+                    needssync=NEEDS_SYNC, bucket_name=bucket_name),
+                'time': now,
+                })
+        for shard_num, entries in shard_to_bucket_dict.items():
+            # this call returns a list of buckets that were "in progress"
+            # before the time in the "now" variable
+            # TODO: sort out whether this is the proper usage of set_worker_bound
+            replica_log_output = client.set_worker_bound(
+                self.dest_conn, 'data', shard_num,
+                'buckets_in_shard_{n}'.format(n=shard_num),
+                now, self.daemon_id, data=entries,
+                )
+            log.debug('set replica log output:\n{data}'.format(data=replica_log_output))
+
+        # create the work and results Queue
+        workQueue = multiprocessing.Queue()
+        resultQueue = multiprocessing.Queue()
+
+        # create the worker processes
+        if self._type == 'data':
+            worker_cls = worker.DataWorkerFull
+        else:
+            worker_cls = worker.MetadataWorkerFull
+
+        processes = [worker_cls(workQueue, resultQueue, log_lock_time, self.src,
+                                self.dest, daemon_id=self.daemon_id) for i in xrange(num_workers)]
+        for process in processes:
+            process.daemon = True
+            process.start()
+
+        start_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        log.info('Starting full data sync at %s', start_time)
+
+        # enqueue the shards to be synced; each entry simply tells a worker
+        # task which data replica log shard to check
+        for shard_num in xrange(num_data_shards):
+            workQueue.put(shard_num)
+
+        # add a poison pill for each worker
+        for i in xrange(num_workers):
+            workQueue.put(None)
+
+        # pull the results out as they are produced
+        errors = []
+        connection_errors = []
+        for i in xrange(num_data_shards):
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+            log.info('%d/%d shards synced', i, num_data_shards)
+            result, shard_num = resultQueue.get()
+            if result != worker.RESULT_SUCCESS:
+                log.error('error syncing shard %d', shard_num)
+                errors.append(shard_num)
+            else:
+                log.debug('synced shard %s', shard_num)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append(shard_num)
+
+        for process in processes:
+            process.join()
+        if errors:
+            log.error('Encountered errors syncing these %d shards: %s',
+                      len(errors), errors)
+
+    def metadata_sync_full(self, num_workers, log_lock_time):
         try:
             sections = client.get_metadata_sections(self.src_conn)
         except client.HttpError as e:
@@ -127,14 +237,23 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(len(meta_keys)):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+
             log.info('%d/%d items synced', i, len(meta_keys))
             result, section, name = resultQueue.get()
             if result != worker.RESULT_SUCCESS:
-                log.error('error syncing %s %r', section, name)
+                log.error('error on full sync of %s %r', section, name)
                 errors.append((section, name))
             else:
                 log.debug('synced %s %r', section, name)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append((section, name))
         for process in processes:
             process.join()
         if errors:
@@ -146,3 +265,12 @@ class Syncer:
                                         marker, timestamp, self.daemon_id)
                 client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
                                         self.daemon_id)
+
+    def sync_full(self, num_workers, log_lock_time):
+        if self._type == 'metadata':
+            self.metadata_sync_full(num_workers, log_lock_time)
+        elif self._type == 'data':
+            self.data_sync_full(num_workers, log_lock_time)
+        else:
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+
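Note: data_sync_full spreads buckets across the data log shards by hashing the bucket name; the mapping is small enough to show standalone (the shard count of 128 below is only an example, the real value comes from client.num_log_shards):

    import hashlib

    def shard_for_bucket(bucket_name, num_shards):
        # same mapping data_sync_full uses: md5 of the bucket name,
        # reduced modulo the number of data log shards
        return int(hashlib.md5(bucket_name).hexdigest(), 16) % num_shards

    print shard_for_bucket('mybucket', 128)  # deterministic shard for a bucket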
radosgw_agent/worker.py
index 7913e4b3b684b387f5fbaf4b6a3f1d3529cbf788..e6410f0b0b66aa2a5b36e81f6e77ab23dcca878d 100644 (file)
@@ -1,6 +1,8 @@
 from collections import namedtuple
+import hashlib
 import logging
 import multiprocessing
 import os
+import requests
 import socket
 
@@ -11,6 +13,8 @@ log = logging.getLogger(__name__)
 
 RESULT_SUCCESS = 0
 RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
+MAX_CONCURRENT_OPS = 5
 
 class Worker(multiprocessing.Process):
     """sync worker to run in its own process"""
@@ -49,6 +53,166 @@ def _meta_entry_from_json(entry):
         entry['timestamp'],
         )
 
+class DataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorker, self).__init__(*args, **kwargs)
+        self.type = 'data'
+
+    def get_new_op_id(self):
+        self.op_id += 1
+        return self.op_id
+
+    def sync_object(self, connection, bucket, key, src_zone, client_id):
+        op_id = self.get_new_op_id()
+        client.sync_object_intra_region(connection, bucket, key, src_zone, client_id, op_id)
+        return op_id
+
+    # TODO
+    # use the op_id to keep track of on-going copies
+    def sync_data(self, bucket_name):
+        log.debug('syncing bucket {bucket}'.format(bucket=bucket_name))
+
+        objects = client.list_objects_in_bucket(self.source_conn, bucket_name)
+        counter = 0
+
+        # sync each object in the list.
+        # We only want MAX_CONCURRENT_OPS syncs in flight at once, so use the
+        # op_id to track when individual operations finish.
+        inflight_ops = []
+        for key in objects:
+            counter = counter + 1
+            # sync each object
+            log.debug('syncing object {bucket}:{key}'.format(bucket=bucket_name, key=key))
+            op_id = self.sync_object(self.dest_conn, key.bucket.name, key.name, self.source_zone, self.daemon_id)
+            inflight_ops.append(op_id)
+            # Do not progress to the next key until there are fewer than the
+            # maximum number of syncs in flight
+            #while len(inflight_ops) >= MAX_CONCURRENT_OPS:
+                # check each of the in-flight ops; if completed, remove them
+                # from the in-flight list
+                #for op_id in inflight_ops:
+
+        # Once all the copy commands are issued, track op_ids until they're all done
+        for op_id in inflight_ops:
+            op_ids = client.list_ops_for_client(self.dest_conn, self.daemon_id, op_id)
+            log.debug('op_ids: {op_ids}'.format(op_ids=op_ids))
+
+        log.debug('bucket {bucket} has {num_objects} objects'.format(
+                  bucket=bucket_name, num_objects=counter))
+
+class DataWorkerIncremental(DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        self.daemon_id = kwargs['daemon_id']
+        self.max_entries = kwargs['max_entries']
+        self.op_id = 0
+        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def get_and_process_entries(self, marker, shard_num):
+        pass
+
+    def _get_and_process_entries(self, marker, shard_num):
+        pass
+
+    def run(self):
+        pass
+
+class DataWorkerFull(DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        self.daemon_id = kwargs['daemon_id']
+        self.op_id = 0
+        super(DataWorkerFull, self).__init__(*args, **kwargs)
+
+    def run(self):
+        self.prepare_lock()
+        num_data_shards = client.num_log_shards(self.source_conn, 'data')
+
+        while True:
+            shard_num = self.work_queue.get()
+            if shard_num is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.info('%s is processing shard %d', self.ident, shard_num)
+
+            # lock the log
+            try:
+                self.lock.set_shard(shard_num)
+                self.lock.acquire()
+            except client.NotFound:
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_SUCCESS, shard_num))
+                continue
+            except client.HttpError as e:
+                log.info('error locking shard %d log, assuming'
+                         ' it was processed by someone else and skipping: %s',
+                         shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # set a marker in the replica log 
+            worker_bound_info = None
+            buckets_to_sync = []
+            try:
+                worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num)
+                log.debug('data full sync shard {i} data log is {data}'.format(
+                    i=shard_num, data=worker_bound_info))
+                # determine whether there's a marker string with NEEDSSYNC and
+                # with our daemon_id. If so, sync the bucket(s) that match
+            except Exception as e:
+                log.exception('could not get worker bound for shard_num %d: %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # set the default result
+            result = RESULT_SUCCESS
+
+            log_bucket_name = ""
+#            try:
+#                for bucket_name in buckets_to_sync:
+#                    log.info('bucket %s is processed as part of shard %d', bucket_name, shard_num)
+#                    self.sync_data(bucket_name)
+#
+#                result = RESULT_SUCCESS
+#            except Exception as e:
+#                log.exception('could not sync shard_num %d, failed on bucket %s: %s',
+#                              shard_num, log_bucket_name, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+#
+#            # TODO: this may need to do a set that omits the synced buckets and
+#            # then delete the entries for the synced buckets? The replica log
+#            # usage is still pretty ill-defined.
+#            # remove the data replica log entry. If we have gotten to this
+#            # point, all the pertinent info should be valid.
+#            try:
+#                client.del_worker_bound(self.dest_conn, 'data', shard_num,
+#                                        self.daemon_id)
+#            except Exception as e:
+#                log.exception('could not delete worker bound for shard_num %d: %s',
+#                              shard_num, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+
+            # TODO
+            # update the bucket index log (trim it)
+
+            # finally, unlock the log
+            try:
+                self.lock.release_and_clear()
+            except lock.LockBroken as e:
+                log.warn('work may be duplicated: %s', e)
+            except:
+                log.exception('error unlocking data log, continuing anyway '
+                              'since lock will timeout')
+
+            self.result_queue.put((result, shard_num))
+
 class MetadataWorker(Worker):
 
     def __init__(self, *args, **kwargs):
@@ -155,12 +319,19 @@ class MetadataWorkerIncremental(MetadataWorker):
                 self.result_queue.put((RESULT_SUCCESS, shard_num))
                 continue
             except client.HttpError as e:
-                log.info('error locking shard %d log, assuming'
-                         ' it was processed by someone else and skipping: %s',
-                         shard_num, e)
+                log.exception('error locking shard %d log, assuming'
+                              ' it was processed by someone else and skipping: %s',
+                              shard_num, e)
                 self.lock.unset_shard()
                 self.result_queue.put((RESULT_ERROR, shard_num))
                 continue
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
 
             result = RESULT_SUCCESS
             try:
@@ -172,6 +343,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             except client.NotFound:
                 # if no worker bounds have been set, start from the beginning
                 marker, time = '', '1970-01-01 00:00:00'
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except Exception as e:
                 log.exception('error getting worker bound for shard %d',
                               shard_num)
@@ -180,6 +358,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             try:
                 if result == RESULT_SUCCESS:
                     self.get_and_process_entries(marker, shard_num)
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except:
                 log.exception('syncing entries from %s for shard %d failed',
                               marker, shard_num)
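Note: the in-flight throttling in DataWorker.sync_data is still commented out. One way to bound the number of concurrent copies is sketched below; wait_for_capacity is a hypothetical helper, not part of the commit, and it assumes a completed op no longer appears in client.list_ops_for_client (the opstate response format is not established by this diff):

    import time

    def wait_for_capacity(self, inflight_ops, max_ops=MAX_CONCURRENT_OPS):
        # hypothetical helper: block until fewer than max_ops object
        # syncs remain in flight
        while len(inflight_ops) >= max_ops:
            for op_id in list(inflight_ops):
                # assumption: a finished op disappears from the opstate
                # listing for this client/op id
                states = client.list_ops_for_client(self.dest_conn,
                                                    self.daemon_id, op_id)
                if not states:
                    inflight_ops.remove(op_id)
            time.sleep(1)  # avoid hammering the opstate endpoint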