]> git-server-git.apps.pok.os.sepia.ceph.com Git - radosgw-agent.git/commitdiff
Set up full data sync
authorJosh Durgin <josh.durgin@inktank.com>
Fri, 13 Sep 2013 20:01:34 +0000 (13:01 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Fri, 13 Sep 2013 20:01:59 +0000 (13:01 -0700)
Get data log markers and store them after all buckets are synced. Also
get bucket-index markers for each bucket, and update them in the
bucket index replica log once syncing is complete. The bucket-index
replica log updates should be moved to happen after each bucket once
the actual object copying is implemented.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
radosgw_agent/client.py
radosgw_agent/sync.py
radosgw_agent/worker.py

index e9c47568199b6c8fc02ca0f90be71af3babda2aa..8254e75f775c793e39f8d421ca6c8699a06b1957 100644 (file)
@@ -152,20 +152,10 @@ def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id):
                    special_first_param='unlock',
                    expect_json=False)
 
-def get_meta_log(connection, shard_num, marker, max_entries):
+def get_log(connection, log_type, shard_num, marker, max_entries):
     return request(connection, 'get', '/admin/log',
                    params={
-                       'type': 'metadata',
-                       'id': shard_num,
-                       'marker': marker,
-                       'max-entries': max_entries,
-                       },
-                   )
-
-def get_data_log(connection, shard_num, marker, max_entries):
-    return request(connection, 'get', '/admin/log',
-                   params={
-                       'type': 'data',
+                       'type': log_type,
                        'id': shard_num,
                        'marker': marker,
                        'max-entries': max_entries,
@@ -182,21 +172,44 @@ def get_log_info(connection, log_type, shard_num):
         special_first_param='info',
         )
 
+
+def get_bucket_index_marker(connection, bucket_instance):
+    out = request(
+        connection, 'get', '/admin/log',
+        params={
+            'type': 'bucket-index',
+            'bucket-instance': bucket_instance,
+            },
+        special_first_param='info',
+        )
+    return out['max_marker']
+
 def num_log_shards(connection, shard_type):
     out = request(connection, 'get', '/admin/log', dict(type=shard_type))
     return out['num_objects']
 
-def set_worker_bound(connection, type_, shard_num, marker, timestamp,
-                     daemon_id, data=[]):
+def set_worker_bound(connection, type_, marker, timestamp,
+                     daemon_id, data=None, shard_num=None,
+                     bucket_instance=None):
+    if data is None:
+        data = []
+
+    if type_ == 'bucket-index':
+        key = 'bucket-index'
+        value = bucket_instance
+    else:
+        key = 'id'
+        value = shard_num
+
     return request(
         connection, 'post', '/admin/replica_log',
-        params=dict(
-            type=type_,
-            id=shard_num,
-            marker=marker,
-            time=timestamp,
-            daemon_id=daemon_id,
-            ),
+        params={
+            'type': type_,
+            key: value,
+            'marker': marker,
+            'time': timestamp,
+            'daemon_id': daemon_id,
+            },
         data=json.dumps(data),
         special_first_param='work_bound',
         )
index 2f9714d7c88828ef22a1b6088e4da846ecd4c30e..9e5a50440f9859ef0a1b04c7a31d50aa4433e6ca 100644 (file)
@@ -1,5 +1,4 @@
 import datetime
-import hashlib
 import logging
 import multiprocessing
 
@@ -110,44 +109,61 @@ class DataSyncerFull(Syncer):
         super(DataSyncerFull, self).__init__(*args, **kwargs)
         self.worker_cls = worker.DataWorkerFull
 
+    def shard_num_for_bucket(self, bucket_name, num_shards):
+        bucket_name = bucket_name.encode('utf8')
+        hash_val = 0
+        for char in bucket_name:
+            c = ord(char)
+            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11;
+        return hash_val % num_shards;
+
+    def get_bucket_instance(self, bucket_name):
+        metadata = client.get_metadata(self.src_conn, 'bucket', bucket_name)
+        return bucket_name + ':' + metadata['data']['bucket']['bucket_id']
+
     def prepare(self):
         # TODO we need some sort of a lock here to make sure that only
         # one client is getting a list of buckets to sync so that it's 
         # consistent.
 
-        num_data_shards = self.get_num_shards()
+        self.num_shards = self.get_num_shards()
+
+        # get list of buckets before getting any markers to avoid inconsistency
+        buckets = client.get_bucket_list(self.src_conn)
+
+        # save data log markers for each shard
+        self.shard_info = []
+        for shard in xrange(self.num_shards):
+            info = client.get_log_info(self.src_conn, 'data', shard)
+            # setting an empty marker returns an error
+            if info['marker']:
+                self.shard_info.append((shard, info['marker'],
+                                        info['last_update']))
 
-        # get the set of all buckets and then add an entry to the data replica 
-        # log for each
-        buckets = client.get_bucket_list(self.src_conn) 
+        # bucket index info doesn't include a timestamp, so just use
+        # local time since it isn't important for correctness
         now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
-        shard_to_bucket_dict = dict()
+        self.bucket_info = []
         for bucket_name in buckets:
-            bucket_hash = int(hashlib.md5(bucket_name).hexdigest(),16) % num_data_shards
-            if bucket_hash not in shard_to_bucket_dict:
-                shard_to_bucket_dict[bucket_hash] = list()
-            # TODO: the docs for set_worker_bound suggest that the data payload should be a list of dicts, with each dict
-            # haveing a 'bucket' and 'time' entry. I'm trying to stash the state of the bucket (NEEDSSYNC) and the 
-            # bucket name as the value. May need a different approach / delimiter
-            shard_to_bucket_dict[bucket_hash].append({
-                                                      'bucket': '{needssync}:{bucket_name}'.format(needssync=NEEDS_SYNC,bucket_name=bucket_name),
-                                                      'time':now
-                                                      })  
-        for shard_num, entries in shard_to_bucket_dict.items():
-            # this call returns a list of buckets that are "in progress" from before the time in the "now" variable
-            # TODO: sort out if this is the proper usage of set_worker_bound
-            replica_log_output = client.set_worker_bound(self.dest_conn, 'data', shard_num, 
-                'buckets_in_shard_{n}'.format(n=shard_num), 
-                now, self.daemon_id,in_data=entries
-                )
-            log.debug('jbuck, set replica log output:\n{data}'.format(data=replica_log_output))
+            instance = self.get_bucket_instance(bucket_name)
+            marker = client.get_bucket_index_marker('bucket-instance', instance)
+            self.bucket_info.append((instance, marker, now))
 
     def generate_work(self):
-        return xrange(self.get_num_shards())
+        return self.bucket_info
 
     def complete(self):
-        # TODO: set replica log
-        pass
+        for shard_num, marker, time in self.shard_info:
+            out = client.set_worker_bound(self.dest_conn, 'data', marker,
+                                          time, self.daemon_id,
+                                          shard_num=shard_num)
+            log.debug('jbuck, set replica log output:\n%s', out)
+
+        for bucket_instance, marker, time in self.bucket_info:
+            out = client.set_worker_bound(self.dest_conn, 'bucket-index', marker,
+                                          time, self.daemon_id,
+                                          bucket_instance=bucket_instance)
+            log.debug('set_worker_bound on bucket index replica log returned:\n%s', out)
 
 
 class MetaSyncerFull(Syncer):
index 318f5bbef3601774d4dc87629d3adc7cfa83bd08..46fd89d0bb01a873f2fd7603c1e56bb472b850dc 100644 (file)
@@ -155,7 +155,7 @@ class DataWorkerFull(DataWorker):
             worker_bound_info = None
             buckets_to_sync = []
             try:
-                worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num)
+                worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num=shard_num)
                 log.debug('data full sync shard {i} data log is {data}'.format(i=shard_num,data=worker_bound_info))
                 # determine whether there's a marker string with NEEDSSYNC and with 
                 # our daemon_id. If so, sync it the bucket(s) that match
@@ -261,8 +261,8 @@ class MetadataWorkerIncremental(MetadataWorker):
         sync up to self.max_entries entries, returning number of entries
         processed and the last marker of the entries processed.
         """
-        log_entries = client.get_meta_log(self.source_conn, shard_num,
-                                          marker, self.max_entries)
+        log_entries = client.get_log(self.source_conn, 'metadata', shard_num,
+                                     marker, self.max_entries)
 
         log.info('shard %d has %d entries after %r', shard_num, len(log_entries),
                  marker)
@@ -284,9 +284,10 @@ class MetadataWorkerIncremental(MetadataWorker):
         if entries and not error_encountered:
             try:
                 client.set_worker_bound(self.dest_conn, 'metadata',
-                                        shard_num, entries[-1].marker,
+                                        entries[-1].marker,
                                         entries[-1].timestamp,
-                                        self.daemon_id)
+                                        self.daemon_id,
+                                        shard_num=shard_num)
                 return len(entries), entries[-1].marker
             except:
                 log.exception('error setting worker bound for shard {shard_num},'
@@ -336,7 +337,7 @@ class MetadataWorkerIncremental(MetadataWorker):
             try:
                 marker, time = client.get_min_worker_bound(self.dest_conn,
                                                            'metadata',
-                                                           shard_num)
+                                                           shard_num=shard_num)
                 log.debug('oldest marker and time for shard %d are: %r %r',
                           shard_num, marker, time)
             except client.NotFound: