rgw: RGWRadosGetOmapKeysCR takes result by shared_ptr (23634/head)
author     Casey Bodley <cbodley@redhat.com>
           Fri, 17 Aug 2018 17:15:49 +0000 (13:15 -0400)
committer  Casey Bodley <cbodley@redhat.com>
           Fri, 7 Sep 2018 13:34:05 +0000 (09:34 -0400)
Fixes: http://tracker.ceph.com/issues/21154
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.cc

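For context: the coroutine used to write through raw output pointers (entries, pmore) supplied by the caller; after this change it shares ownership of a Result struct with the caller, and the completion notifier is also handed the shared_ptr (create_completion_notifier(result)), presumably so the buffers being written survive even if the coroutine is torn down before the aio completes. Below is a minimal, self-contained sketch of that ownership pattern; the Result struct mirrors the one added to rgw_cr_rados.h, but get_omap_keys_async() and the std::async plumbing are hypothetical stand-ins for the librados aio call and completion notifier, not RGW code.

    // Sketch of the shared-ownership pattern this commit adopts.
    // Assumption: std::async stands in for the real librados aio + notifier.
    #include <future>
    #include <iostream>
    #include <memory>
    #include <set>
    #include <string>

    struct Result {                       // mirrors RGWRadosGetOmapKeysCR::Result
      std::set<std::string> entries;
      bool more = false;
    };

    // The async operation captures the shared_ptr, so the memory it writes into
    // stays alive even if the requester goes away before the operation completes.
    std::future<int> get_omap_keys_async(std::shared_ptr<Result> result) {
      return std::async(std::launch::async, [result] {
        result->entries.insert("example-key");   // fake key, for illustration only
        result->more = false;
        return 0;
      });
    }

    int main() {
      auto result = std::make_shared<Result>();  // caller allocates the Result up front
      auto fut = get_omap_keys_async(result);    // ownership is now shared
      if (fut.get() == 0) {
        std::cout << "got " << result->entries.size() << " keys, more="
                  << result->more << std::endl;
      }
    }

The callers updated in rgw_data_sync.cc and rgw_sync.cc below follow the same shape: allocate the Result with std::make_shared, pass it to RGWRadosGetOmapKeysCR, then move the entries out of it once the call completes.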
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 48a421f95f14d3d9032da28c888efd323189414f..6bcc62b91d50acead0be257e35e4d420aa576799 100644
@@ -249,19 +249,18 @@ int RGWRadosSetOmapKeysCR::request_complete()
 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
                       const rgw_raw_obj& _obj,
                       const string& _marker,
-                      std::set<std::string> *_entries,
-                      int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()),
-                                                store(_store),
-                                                marker(_marker),
-                                                entries(_entries), max_entries(_max_entries),
-                                                pmore(_pmore),
-                                                obj(_obj), cn(NULL)
+                      int _max_entries,
+                      ResultPtr _result)
+  : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
+    marker(_marker), max_entries(_max_entries),
+    result(std::move(_result))
 {
+  ceph_assert(result); // must be allocated
   set_description() << "get omap keys dest=" << obj << " marker=" << marker;
 }
 
 int RGWRadosGetOmapKeysCR::send_request() {
-  int r = store->get_raw_obj_ref(obj, &ref);
+  int r = store->get_raw_obj_ref(obj, &result->ref);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
     return r;
@@ -270,10 +269,10 @@ int RGWRadosGetOmapKeysCR::send_request() {
   set_status() << "send request";
 
   librados::ObjectReadOperation op;
-  op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr);
+  op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
 
-  cn = stack->create_completion_notifier();
-  return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
+  cn = stack->create_completion_notifier(result);
+  return result->ref.ioctx.aio_operate(result->ref.oid, cn->completion(), &op, NULL);
 }
 
 int RGWRadosGetOmapKeysCR::request_complete()
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 13d6a29c325909261f56bc4973af8770b1ce236d..56bec6d6cb644a3b11bccf430249b9980b49e52d 100644
@@ -407,28 +407,28 @@ public:
 };
 
 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWRados *store;
+ public:
+  struct Result {
+    rgw_rados_ref ref;
+    std::set<std::string> entries;
+    bool more = false;
+  };
+  using ResultPtr = std::shared_ptr<Result>;
 
-  string marker;
-  std::set<std::string> *entries;
-  int max_entries;
-  bool *pmore;
+  RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
+                        const string& _marker, int _max_entries,
+                        ResultPtr result);
 
-  rgw_rados_ref ref;
+  int send_request() override;
+  int request_complete() override;
 
+ private:
+  RGWRados *store;
   rgw_raw_obj obj;
-
+  string marker;
+  int max_entries;
+  ResultPtr result;
   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
-
-public:
-  RGWRadosGetOmapKeysCR(RGWRados *_store,
-                     const rgw_raw_obj& _obj,
-                     const string& _marker,
-                     std::set<std::string> *_entries,
-                        int _max_entries, bool *pmore);
-
-  int send_request() override;
-  int request_complete() override;
 };
 
 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 1c550a6381cc2c83f54caa7c4e47130536233702..361a0de50e1eedc3ea868af3009467a25d6facfa 100644
@@ -163,16 +163,16 @@ class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
 
   uint64_t max_entries;
   int num_shards;
-  int shard_id{0};;
+  int shard_id{0};
 
   string marker;
-  map<int, std::set<std::string>> &entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
 
  public:
   RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
-      map<int, std::set<std::string>>& _entries_map)
+                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
-    max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
   {}
   bool spawn_next() override;
 };
@@ -183,8 +183,10 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
     return false;
  
   string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  auto& shard_keys = omapkeys[shard_id];
+  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
   spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
-                                  marker, &entries_map[shard_id], max_entries, nullptr), false);
+                                  marker, max_entries, shard_keys), false);
 
   ++shard_id;
   return true;
@@ -722,15 +724,16 @@ int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& rec
   }
   RGWDataSyncEnv sync_env_local = sync_env;
   sync_env_local.http_manager = &http_manager;
-  map<int, std::set<std::string>> entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+  omapkeys.resize(num_shards);
   uint64_t max_entries{1};
-  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
   http_manager.stop();
 
   if (ret == 0) {
-    for (const auto& entry : entries_map) {
-      if (entry.second.size() != 0) {
-        recovering_shards.insert(entry.first);
+    for (int i = 0; i < num_shards; i++) {
+      if (omapkeys[i]->entries.size() != 0) {
+        recovering_shards.insert(i);
       }
     }
   }
@@ -1163,9 +1166,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
   std::set<std::string>::iterator iter;
-  bool more = false;
 
   string oid;
 
@@ -1325,15 +1328,16 @@ public:
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
       total_entries = sync_marker.pos;
       do {
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, &entries,
-                                             max_entries, &more));
+                                             sync_marker.marker, max_entries, omapkeys));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
+        entries = std::move(omapkeys->entries);
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
         }
@@ -1355,7 +1359,8 @@ public:
           }
           sync_marker.marker = *iter;
         }
-      } while (more);
+      } while (omapkeys->more);
+      omapkeys.reset();
 
       tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
 
@@ -1420,9 +1425,10 @@ public:
         }
 
         /* process bucket shards that previously failed */
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                             error_marker, &error_entries,
-                                             max_error_entries, &more));
+                                             error_marker, max_error_entries, omapkeys));
+        error_entries = std::move(omapkeys->entries);
         tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
         iter = error_entries.begin();
         for (; iter != error_entries.end(); ++iter) {
@@ -1430,7 +1436,7 @@ public:
           tn->log(20, SSTR("handle error entry: " << error_marker));
           spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
         }
-        if (!more) {
+        if (!omapkeys->more) {
           if (error_marker.empty() && error_entries.empty()) {
             /* the retry repo is empty, we back off a bit before calling it again */
             retry_backoff_secs *= 2;
@@ -1443,6 +1449,7 @@ public:
           error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
           error_marker.clear();
         }
+        omapkeys.reset();
 
 
         yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
@@ -2113,9 +2120,9 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
   string marker;
   string error_oid;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   set<string> error_entries;
   int max_omap_entries;
-  bool more = false;
   int count;
 
 public:
@@ -2137,8 +2144,9 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
     //read recovering bucket shards
     count = 0;
     do {
+      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
       yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
-            marker, &error_entries, max_omap_entries, &more));
+            marker, max_omap_entries, omapkeys));
 
       if (retcode == -ENOENT) {
         break;
@@ -2150,6 +2158,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
         return set_cr_error(retcode);
       }
 
+      error_entries = std::move(omapkeys->entries);
       if (error_entries.empty()) {
         break;
       }
@@ -2158,7 +2167,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
       marker = *error_entries.rbegin();
       recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                 std::make_move_iterator(error_entries.end()));
-    } while(more && count < max_entries);
+    } while (omapkeys->more && count < max_entries);
   
     return set_cr_done();
   }
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index bf0dde2dec5b2402db63c8f861db84a5cfc68173..d1e44328715892c86bec21a8c555a5bc1c84b8e5 100644
@@ -1384,8 +1384,8 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   string max_marker;
   const std::string& period_marker; //< max marker stored in next period
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
-  bool more = false;
   std::set<std::string>::iterator iter;
 
   string oid;
@@ -1572,8 +1572,9 @@ public:
           lost_lock = true;
           break;
         }
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             marker, &entries, max_entries, &more));
+                                             marker, max_entries, omapkeys));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
           tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
@@ -1581,6 +1582,7 @@ public:
           drain_all();
           return retcode;
         }
+        entries = std::move(omapkeys->entries);
         tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
@@ -1603,7 +1605,7 @@ public:
           }
         }
         collect_children();
-      } while (more && can_adjust_marker);
+      } while (omapkeys->more && can_adjust_marker);
 
       tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */