]> git-server-git.apps.pok.os.sepia.ceph.com Git - radosgw-agent.git/commitdiff
Refactor sync and fix errors found by pyflakes
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 11 Sep 2013 01:17:44 +0000 (18:17 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Wed, 11 Sep 2013 01:17:48 +0000 (18:17 -0700)
Instead of 4 copies of queue and worker creation
and results gathering, just have one, with subclasses
that can add special behavior at the beginning or end.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
radosgw_agent/cli.py
radosgw_agent/sync.py
radosgw_agent/worker.py

index f0a76c4fdc8b04972a0057a788ae427e3a1abb32..cdb8de07594deb0eaeec4b98ac0f0486803f47fe 100644 (file)
@@ -235,11 +235,11 @@ def main():
     if args.data:
         # TODO: check src and dest zone names and endpoints match the region map
         syncer = sync.Syncer('data', src, dest, args.daemon_id)
-             log.info('syncing data')
+        log.info('syncing data')
     else:
         # TODO: check src and dest zone names and endpoints match the region map
         syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
-             log.info('syncing metadata')
+        log.info('syncing metadata')
 
     if args.test_server_host:
         log.warn('TEST MODE - do not run unless you are testing this program')
index e7940734026c3dac15a2d744ba283d59826a9141..2f9714d7c88828ef22a1b6088e4da846ecd4c30e 100644 (file)
@@ -12,51 +12,65 @@ NEEDS_SYNC = 'NEEDSSYNC'
 SYNC_IN_PROGRESS = 'INPROGRESS'
 SYNC_COMPLETED = 'DONESYNC'
 
-class Syncer:
+class Syncer(object):
     def __init__(self, type_, src, dest, daemon_id):
-        self._type = type_
+        self.type = type_
         self.src = src
         self.dest = dest
         self.src_conn = client.connection(src)
         self.dest_conn = client.connection(dest)
         self.daemon_id = daemon_id
+        self.worker_cls = None # filled in by subclass constructor
 
-    def data_sync_incremental(self, num_workers, log_lock_time, max_entries):
-        pass
-
-    def metadata_sync_incremental(self, num_workers, log_lock_time, max_entries):
+    def get_num_shards(self):
         try:
-            num_shards = client.num_log_shards(self.src_conn, self._type)
-        except:
+            num_shards = client.num_log_shards(self.src_conn, self.type)
+            log.debug('%d shards to check', num_shards)
+            return num_shards
+        except Exception:
             log.exception('finding number of shards failed')
             raise
-        log.debug('We have %d shards to check', num_shards)
 
-        # create the work and results Queue
+    def prepare(self):
+        """Setup any state required before syncing starts"""
+        pass
+
+    def generate_work(self):
+    """Generate items to be placed in a queue for processing"""
+        pass
+
+    def complete(self):
+        """Called when syncing completes successfully"""
+        pass
+
+    def get_worker_cls(self):
+        """Return the subclass of Worker to run"""
+        pass
+
+    def sync(self, num_workers, log_lock_time, max_entries=None):
+        self.prepare()
+
         workQueue = multiprocessing.Queue()
         resultQueue = multiprocessing.Queue()
 
-        # create the worker processes
-        if self._type == 'data':
-            worker_cls = worker.DataWorkerIncremental
-        else:
-            worker_cls = worker.MetadataWorkerIncremental
-        processes = [worker_cls(workQueue,
-                                resultQueue,
-                                log_lock_time,
-                                self.src,
-                                self.dest,
-                                daemon_id=self.daemon_id,
-                                max_entries=max_entries)
+        processes = [self.worker_cls(workQueue,
+                                     resultQueue,
+                                     log_lock_time,
+                                     self.src,
+                                     self.dest,
+                                     daemon_id=self.daemon_id,
+                                     max_entries=max_entries)
                      for i in xrange(num_workers)]
         for process in processes:
             process.daemon = True
             process.start()
 
-        log.info('Starting incremental sync')
+        log.info('Starting sync')
         # enqueue the shards to be synced
-        for i in xrange(num_shards):
-            workQueue.put(i)
+        num_items = 0
+        for item in self.generate_work():
+            num_items += 1
+            workQueue.put(item)
 
         # add a poison pill for each worker
         for i in xrange(num_workers):
@@ -64,35 +78,44 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
-        for i in xrange(num_shards):
-            result, shard_num = resultQueue.get()
+        for i in xrange(num_items):
+            result, item = resultQueue.get()
             if result == worker.RESULT_SUCCESS:
-                log.debug('synced shard %d', shard_num)
+                log.debug('synced item %r', item)
             else:
-                log.error('error on incremental sync of shard %d', shard_num)
-                errors.append(shard_num)
+                log.error('error syncing item %r', item)
+                errors.append(item)
 
-            log.info('%d/%d shards processed', i + 1, num_shards)
+            log.info('%d/%d items processed', i + 1, num_items)
         if errors:
-            log.error('Encountered  errors syncing these %d shards: %s',
+            log.error('Encountered errors syncing these %d items: %r',
                       len(errors), errors)
+        else:
+            self.complete()
 
 
-    def sync_incremental(self, num_workers, log_lock_time, max_entries):
-        if self._type == 'metadata':
-            self.metadata_sync_incremental(num_workers, log_lock_time, max_entries)
-        elif self._type == 'data':
-            self.data_sync_incremental(num_workers, log_lock_time, max_entries)
-        else:
-            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+class MetaSyncerInc(Syncer):
+
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerInc, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerIncremental
+
+    def generate_work(self):
+        return xrange(self.get_num_shards())
 
-    def data_sync_full(self, num_workers, log_lock_time):
+
+class DataSyncerFull(Syncer):
+
+    def __init__(self, *args, **kwargs):
+        super(DataSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.DataWorkerFull
+
+    def prepare(self):
         # TODO we need some sort of a lock here to make sure that only
         # one client is getting a list of buckets to sync so that it's 
         # consistent.
 
-        num_data_shards = client.num_log_shards(self.src_conn, 'data')
-        log.debug('There are {ns} data log shards'.format(ns=num_data_shards))
+        num_data_shards = self.get_num_shards()
 
         # get the set of all buckets and then add an entry to the data replica 
         # log for each
@@ -119,73 +142,41 @@ class Syncer:
                 )
             log.debug('jbuck, set replica log output:\n{data}'.format(data=replica_log_output))
 
-        # create the work and results Queue
-        workQueue = multiprocessing.Queue()
-        resultQueue = multiprocessing.Queue()
-
-        # create the worker processes
-        if self._type == 'data':
-            worker_cls = worker.DataWorkerFull
-        else:
-            worker_cls = worker.MetadataWorkerFull
-
-        processes = [worker_cls(workQueue, resultQueue, log_lock_time, self.src,
-                                self.dest, daemon_id=self.daemon_id) for i in xrange(num_workers)]
-        for process in processes:
-            process.daemon = True
-            process.start()
-
-        start_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
-        log.info('Starting full data sync at %s', start_time)
+    def generate_work(self):
+        return xrange(self.get_num_shards())
 
-        # enqueue the shards to be synced. 
-        # the shards simply tell the worker tasks which data replica log shard to check
-        for shard_num in xrange(num_data_shards):
-            workQueue.put(shard_num)
-
-        # add a poison pill for each worker
-        for i in xrange(num_workers):
-            workQueue.put(None)
+    def complete(self):
+        # TODO: set replica log
+        pass
 
-        # pull the results out as they are produced
-        errors = []
-        for i in xrange(num_data_shards):
-            log.info('%d/%d shards synced, ', i, num_data_shards)
-            result, shard_num = resultQueue.get()
-            if result != worker.RESULT_SUCCESS:
-                log.error('error syncing shard %d', shard_num)
-                errors.append((shard_num))
-            else:
-                log.debug('synced shard %s', shard_num)
 
-        for process in processes:
-            process.join()
-        if errors:
-            log.error('Encountered  errors syncing these %d entries: %s',
-                      len(errors), errors)
+class MetaSyncerFull(Syncer):
+    def __init__(self, *args, **kwargs):
+        super(MetaSyncerFull, self).__init__(*args, **kwargs)
+        self.worker_cls = worker.MetadataWorkerFull
 
-    def metadata_sync_full(self, num_workers, log_lock_time):
+    def prepare(self):
         try:
-            sections = client.get_metadata_sections(self.src_conn)
+            self.sections = client.get_metadata_sections(self.src_conn)
         except client.HttpError as e:
             log.error('Error listing metadata sections: %s', e)
             raise
 
         # grab the lastest shard markers and timestamps before we sync
-        shard_info = []
-        num_shards = client.num_log_shards(self.src_conn, 'metadata')
+        self.shard_info = []
+        num_shards = self.get_num_shards()
         for shard_num in xrange(num_shards):
             info = client.get_log_info(self.src_conn, 'metadata', shard_num)
             # setting an empty marker returns an error
             if info['marker']:
-                shard_info.append((shard_num, info['marker'],
-                                   info['last_update']))
+                self.shard_info.append((shard_num, info['marker'],
+                                        info['last_update']))
 
-        meta_keys = []
-        for section in sections:
+    def generate_work(self):
+        for section in self.sections:
             try:
-                meta_keys += [(section, key) for key in
-                              client.list_metadata_keys(self.src_conn, section)]
+                for key in client.list_metadata_keys(self.src_conn, section):
+                    yield (section, key)
             except client.NotFound:
                 # no keys of this type exist
                 continue
@@ -194,59 +185,9 @@ class Syncer:
                           section, e)
                 raise
 
-        # create the work and results Queue
-        workQueue = multiprocessing.Queue()
-        resultQueue = multiprocessing.Queue()
-
-        # create the worker processes
-        if self._type == 'data':
-            worker_cls = worker.DataWorkerFull
-        else:
-            worker_cls = worker.MetadataWorkerFull
-        processes = [worker_cls(workQueue, resultQueue, log_lock_time, self.src,
-                                self.dest) for i in xrange(num_workers)]
-        for process in processes:
-            process.daemon = True
-            process.start()
-
-        start_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
-        log.info('Starting full sync at %s', start_time)
-
-        # enqueue the shards to be synced
-        for meta in meta_keys:
-            workQueue.put(meta)
-
-        # add a poison pill for each worker
-        for i in xrange(num_workers):
-            workQueue.put(None)
-
-        # pull the results out as they are produced
-        errors = []
-        for i in xrange(len(meta_keys)):
-            log.info('%d/%d items synced', i, len(meta_keys))
-            result, section, name = resultQueue.get()
-            if result != worker.RESULT_SUCCESS:
-                log.error('error on full sync of %s %r', section, name)
-                errors.append((section, name))
-            else:
-                log.debug('synced %s %r', section, name)
-        for process in processes:
-            process.join()
-        if errors:
-            log.error('Encountered  errors syncing these %d entries: %s',
-                      len(errors), errors)
-        else:
-            for shard_num, marker, timestamp in shard_info:
-                client.set_worker_bound(self.dest_conn, 'metadata', shard_num,
-                                        marker, timestamp, self.daemon_id)
-                client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
-                                        self.daemon_id)
-
-    def sync_full(self, num_workers, log_lock_time):
-        if self._type == 'metadata':
-            self.metadata_sync_full(num_workers, log_lock_time)
-        elif self._type == 'data':
-            self.data_sync_full(num_workers, log_lock_time)
-        else:
-            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
-
+    def complete(self):
+        for shard_num, marker, timestamp in self.shard_info:
+            client.set_worker_bound(self.dest_conn, 'metadata', shard_num,
+                                    marker, timestamp, self.daemon_id)
+            client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
+                                    self.daemon_id)
index e6410f0b0b66aa2a5b36e81f6e77ab23dcca878d..318f5bbef3601774d4dc87629d3adc7cfa83bd08 100644 (file)
@@ -1,5 +1,4 @@
 from collections import namedtuple
-import hashlib
 import logging
 import multiprocessing
 import requests
@@ -399,4 +398,4 @@ class MetadataWorkerFull(MetadataWorker):
                 log.exception('could not sync entry %s "%s": %s',
                               section, name, e)
                 result = RESULT_ERROR
-            self.result_queue.put((result, section, name))
+            self.result_queue.put((result, (section, name)))