]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move top level data sync function into coroutine
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 2 Oct 2015 20:20:47 +0000 (13:20 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:56 +0000 (16:12 -0800)
So that it can be called from a coroutine

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index d7362b7fc0ffb24b0787a6fced218611b509b97b..c1493366e17093a0b2399d075e6e3fe2d6be5389 100644 (file)
@@ -23,7 +23,7 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-static string datalog_sync_status_oid = "datalog.sync-status";
+static string datalog_sync_status_oid_prefix = "datalog.sync-status";
 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
 static string bucket_status_oid_prefix = "bucket.sync-status";
@@ -73,7 +73,7 @@ public:
                      RGWObjectCtx& _obj_ctx, const string& _source_zone,
                      rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
                                                                            _store->get_zone_params().log_pool,
-                                                                           datalog_sync_status_oid,
+                                                                           RGWDataSyncStatusManager::sync_status_oid(_source_zone),
                                                                            &_status->sync_info),
                                                                             async_rados(_async_rados), store(_store),
                                                                             obj_ctx(_obj_ctx), source_zone(_source_zone),
@@ -162,6 +162,8 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   RGWObjectCtx& obj_ctx;
   string source_zone;
 
+  string sync_status_oid;
+
   string lock_name;
   string cookie;
   rgw_data_sync_info status;
@@ -179,6 +181,8 @@ public:
 
     gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
     string cookie = buf;
+
+    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(source_zone);
   }
 
   int operate() {
@@ -186,23 +190,23 @@ public:
     reenter(this) {
       yield {
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
-         ldout(cct, 0) << "ERROR: failed to take a lock on " << datalog_sync_status_oid << dendl;
+         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
          return set_state(RGWCoroutine_Error, retcode);
        }
       }
       yield {
         call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
-                                datalog_sync_status_oid, status));
+                                sync_status_oid, status));
       }
       yield { /* take lock again, we just recreated the object */
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
-         ldout(cct, 0) << "ERROR: failed to take a lock on " << datalog_sync_status_oid << dendl;
+         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
          return set_state(RGWCoroutine_Error, retcode);
        }
       }
@@ -229,10 +233,10 @@ public:
       yield {
        status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
         call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
-                                datalog_sync_status_oid, status));
+                                sync_status_oid, status));
       }
       yield { /* unlock */
-       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie));
       }
       while (collect(&ret)) {
@@ -367,12 +371,6 @@ int RGWRemoteDataLog::init_sync_status(int num_shards)
   return run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
 }
 
-int RGWRemoteDataLog::set_sync_info(const rgw_data_sync_info& sync_info)
-{
-  return run(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
-                                datalog_sync_status_oid, sync_info));
-}
-
 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
 {
   char buf[datalog_sync_full_sync_index_prefix.size() + 16];
@@ -706,8 +704,6 @@ public:
   }
 
   int full_sync() {
-    int ret;
-
 #define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
@@ -791,6 +787,120 @@ public:
     /* TODO */
     return 0;
 #endif
+    return 0;
+  }
+};
+
+class RGWDataSyncCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+  string source_zone;
+
+  RGWObjectCtx obj_ctx;
+
+  rgw_data_sync_status sync_status;
+
+  RGWDataSyncShardMarkerTrack *marker_tracker;
+
+
+public:
+  RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                      conn(_conn),
+                                                      source_zone(_source_zone),
+                                                      obj_ctx(store),
+                                                      marker_tracker(NULL) {
+  }
+
+  int operate() {
+    reenter(this) {
+      int r;
+
+      yield {
+        /* read sync status */
+        r = call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
+          return set_state(RGWCoroutine_Error, r);
+        }
+      }
+
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+        return set_state(RGWCoroutine_Error, retcode);
+      }
+
+      yield {
+        /* state: init status */
+        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
+          ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+          r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
+            return  set_state(RGWCoroutine_Error, r);
+          }
+          sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+          /* update new state */
+          yield {
+            r = call(set_sync_info_cr());
+            if (r < 0) {
+              ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+              return r;
+            }
+          }
+        }
+      }
+
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+        return set_state(RGWCoroutine_Error, retcode);
+      }
+
+      if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
+        /* state: building full sync maps */
+        yield {
+          ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
+          r = call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, sync_status.sync_info.num_shards));
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to call RGWListBucketIndexesCR r=" << r << dendl;
+            return set_state(RGWCoroutine_Error, r);
+          }
+        }
+        sync_status.sync_info.state = rgw_data_sync_info::StateSync;
+        /* update new state */
+        yield {
+          r = call(set_sync_info_cr());
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+            return r;
+          }
+        }
+      }
+      if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+#if 0
+        case rgw_data_sync_info::StateSync:
+          for (int i = 0; i < num_shards; i++) {
+            RGWCoroutine *cr = new RGWDataSyncShardCR(store, &http_manager, async_rados,
+                                                      conn, store->get_zone_params().log_pool, source_zone,
+                                                      i, rgw_data_sync_marker& _marker)
+          }
+#endif
+#warning FIXME
+      }
+
+      return set_state(RGWCoroutine_Done, 0);
+    }
+    return 0;
+  }
+
+  RGWCoroutine *set_sync_info_cr() {
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+                                                         RGWDataSyncStatusManager::sync_status_oid(source_zone),
+                                                         sync_status.sync_info);
   }
 };
 
@@ -798,35 +908,12 @@ int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status
 {
   RGWObjectCtx obj_ctx(store, NULL);
 
-  int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+  int r = run(new RGWDataSyncCR(store, &http_manager, async_rados, conn, store->get_zone_params().log_pool, source_zone));
   if (r < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
     return r;
   }
 
-  switch ((rgw_data_sync_info::SyncState)sync_status.sync_info.state) {
-    case rgw_data_sync_info::StateInit:
-      ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
-      r = run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
-      /* fall through */
-    case rgw_data_sync_info::StateBuildingFullSyncMaps:
-      ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
-      r = run(new RGWListBucketIndexesCR(store, &http_manager, async_rados, conn, source_zone, num_shards));
-      sync_status.sync_info.state = rgw_data_sync_info::StateSync;
-      r = set_sync_info(sync_status.sync_info);
-      if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
-        return r;
-      }
-      /* fall through */
-    case rgw_data_sync_info::StateSync:
-#warning FIXME
-      break;
-    default:
-      ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
-      return -EIO;
-  }
-
   return 0;
 }
 
@@ -848,7 +935,7 @@ int RGWDataSyncStatusManager::init()
     return r;
   }
 
-  source_status_obj = rgw_obj(store->get_zone_params().log_pool, datalog_sync_status_oid);
+  source_status_obj = rgw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(source_zone));
 
   r = source_log.init(source_zone, conn);
   if (r < 0) {
@@ -872,6 +959,14 @@ int RGWDataSyncStatusManager::init()
   return 0;
 }
 
+string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
+{
+  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
+  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
+
+  return string(buf);
+}
+
 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
 {
   char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
index e2699955aef6fb604e9bb514f9a9a200110b1cde..5ed9135a6b8766bc0ae96b34ecb409c68afc9b6e 100644 (file)
@@ -107,14 +107,14 @@ struct rgw_data_sync_status {
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     ::encode(sync_info, bl);
-    ::encode(sync_markers, bl);
+    /* sync markers are encoded separately */
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
      DECODE_START(1, bl);
     ::decode(sync_info, bl);
-    ::decode(sync_markers, bl);
+    /* sync markers are decoded separately */
      DECODE_FINISH(bl);
   }
 
@@ -155,7 +155,6 @@ public:
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
-  int set_sync_info(const rgw_data_sync_info& sync_info);
   int run_sync(int num_shards, rgw_data_sync_status& sync_status);
 
   void wakeup(int shard_id);
@@ -187,6 +186,7 @@ public:
   rgw_data_sync_status& get_sync_status() { return sync_status; }
 
   static string shard_obj_name(const string& source_zone, int shard_id);
+  static string sync_status_oid(const string& source_zone);
 
   int read_sync_status() { return source_log.read_sync_status(&sync_status); }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }