]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWRadosGetOmapKeysCR takes result by shared_ptr 24912/head
authorCasey Bodley <cbodley@redhat.com>
Fri, 17 Aug 2018 17:15:49 +0000 (13:15 -0400)
committerNathan Cutler <ncutler@suse.com>
Sat, 3 Nov 2018 14:38:43 +0000 (15:38 +0100)
Fixes: http://tracker.ceph.com/issues/21154
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit fd77ff74ae47bf09553186a5a5e79ec13a9de16d)

Conflicts:
src/rgw/rgw_data_sync.cc
- in mimic, the entire "process bucket shards that previously failed"
  code block is enclosed in an if statement - in master, this is not
  the case

src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.cc

index 706e84c4ab0dcb9401b529526c3be3f5097d1202..2a839b325358366aa4d4bb069df94b547dc1cf56 100644 (file)
@@ -256,19 +256,18 @@ int RGWRadosSetOmapKeysCR::request_complete()
 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
                       const rgw_raw_obj& _obj,
                       const string& _marker,
-                      std::set<std::string> *_entries,
-                      int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()),
-                                                store(_store),
-                                                marker(_marker),
-                                                entries(_entries), max_entries(_max_entries),
-                                                pmore(_pmore),
-                                                obj(_obj), cn(NULL)
+                      int _max_entries,
+                      ResultPtr _result)
+  : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
+    marker(_marker), max_entries(_max_entries),
+    result(std::move(_result))
 {
+  ceph_assert(result); // must be allocated
   set_description() << "get omap keys dest=" << obj << " marker=" << marker;
 }
 
 int RGWRadosGetOmapKeysCR::send_request() {
-  int r = store->get_raw_obj_ref(obj, &ref);
+  int r = store->get_raw_obj_ref(obj, &result->ref);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
     return r;
@@ -277,10 +276,10 @@ int RGWRadosGetOmapKeysCR::send_request() {
   set_status() << "send request";
 
   librados::ObjectReadOperation op;
-  op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr);
+  op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
 
-  cn = stack->create_completion_notifier();
-  return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
+  cn = stack->create_completion_notifier(result);
+  return result->ref.ioctx.aio_operate(result->ref.oid, cn->completion(), &op, NULL);
 }
 
 int RGWRadosGetOmapKeysCR::request_complete()
index 37b5bdd911ec6f9545eadb738aaa01f750845c7b..b8ff6128ed58e3fadc40102f95782338b2cabc03 100644 (file)
@@ -394,28 +394,28 @@ public:
 };
 
 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWRados *store;
+ public:
+  struct Result {
+    rgw_rados_ref ref;
+    std::set<std::string> entries;
+    bool more = false;
+  };
+  using ResultPtr = std::shared_ptr<Result>;
 
-  string marker;
-  std::set<std::string> *entries;
-  int max_entries;
-  bool *pmore;
+  RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
+                        const string& _marker, int _max_entries,
+                        ResultPtr result);
 
-  rgw_rados_ref ref;
+  int send_request() override;
+  int request_complete() override;
 
+ private:
+  RGWRados *store;
   rgw_raw_obj obj;
-
+  string marker;
+  int max_entries;
+  ResultPtr result;
   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
-
-public:
-  RGWRadosGetOmapKeysCR(RGWRados *_store,
-                     const rgw_raw_obj& _obj,
-                     const string& _marker,
-                     std::set<std::string> *_entries,
-                        int _max_entries, bool *pmore);
-
-  int send_request() override;
-  int request_complete() override;
 };
 
 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
index 819fc0dd08ee8bef3597803f020e511add3ebb6d..ca2f1aec2b94379dd725d9ef5d2c9ffb09f4e9b9 100644 (file)
@@ -163,16 +163,16 @@ class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
 
   uint64_t max_entries;
   int num_shards;
-  int shard_id{0};;
+  int shard_id{0};
 
   string marker;
-  map<int, std::set<std::string>> &entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
 
  public:
   RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
-      map<int, std::set<std::string>>& _entries_map)
+                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
-    max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
   {}
   bool spawn_next() override;
 };
@@ -183,8 +183,10 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
     return false;
  
   string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  auto& shard_keys = omapkeys[shard_id];
+  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
   spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
-                                  marker, &entries_map[shard_id], max_entries, nullptr), false);
+                                  marker, max_entries, shard_keys), false);
 
   ++shard_id;
   return true;
@@ -722,15 +724,16 @@ int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& rec
   }
   RGWDataSyncEnv sync_env_local = sync_env;
   sync_env_local.http_manager = &http_manager;
-  map<int, std::set<std::string>> entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+  omapkeys.resize(num_shards);
   uint64_t max_entries{1};
-  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
   http_manager.stop();
 
   if (ret == 0) {
-    for (const auto& entry : entries_map) {
-      if (entry.second.size() != 0) {
-        recovering_shards.insert(entry.first);
+    for (int i = 0; i < num_shards; i++) {
+      if (omapkeys[i]->entries.size() != 0) {
+        recovering_shards.insert(i);
       }
     }
   }
@@ -1154,9 +1157,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
   std::set<std::string>::iterator iter;
-  bool more = false;
 
   string oid;
 
@@ -1317,15 +1320,16 @@ public:
           drain_all();
           return set_cr_error(-ECANCELED);
         }
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, &entries,
-                                             max_entries, &more));
+                                             sync_marker.marker, max_entries, omapkeys));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
+        entries = std::move(omapkeys->entries);
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
         }
@@ -1353,7 +1357,8 @@ public:
             }
           }
         }
-      } while (more);
+      } while (omapkeys->more);
+      omapkeys.reset();
 
       drain_all_but_stack(lease_stack.get());
 
@@ -1430,9 +1435,10 @@ public:
 
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
           /* process bucket shards that previously failed */
+          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
           yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                               error_marker, &error_entries,
-                                               max_error_entries, &more));
+                                               error_marker, max_error_entries, omapkeys));
+          error_entries = std::move(omapkeys->entries);
           tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
@@ -1440,7 +1446,7 @@ public:
             tn->log(20, SSTR("handle error entry: " << error_marker));
             spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
           }
-          if (!more) {
+          if (!omapkeys->more) {
             if (error_marker.empty() && error_entries.empty()) {
               /* the retry repo is empty, we back off a bit before calling it again */
               retry_backoff_secs *= 2;
@@ -1454,6 +1460,7 @@ public:
             error_marker.clear();
           }
         }
+        omapkeys.reset();
 
 #define INCREMENTAL_MAX_ENTRIES 100
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
@@ -2123,9 +2130,9 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
   string marker;
   string error_oid;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   set<string> error_entries;
   int max_omap_entries;
-  bool more = false;
   int count;
 
 public:
@@ -2147,8 +2154,9 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
     //read recovering bucket shards
     count = 0;
     do {
+      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
       yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
-            marker, &error_entries, max_omap_entries, &more));
+            marker, max_omap_entries, omapkeys));
 
       if (retcode == -ENOENT) {
         break;
@@ -2160,6 +2168,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
         return set_cr_error(retcode);
       }
 
+      error_entries = std::move(omapkeys->entries);
       if (error_entries.empty()) {
         break;
       }
@@ -2168,7 +2177,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
       marker = *error_entries.rbegin();
       recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                 std::make_move_iterator(error_entries.end()));
-    } while(more && count < max_entries);
+    } while (omapkeys->more && count < max_entries);
   
     return set_cr_done();
   }
index 11953426bc12a843fc3f56ec6498d039a74a7555..c4979bd53461dcbe3643403cd1c4d4bb26f12ef9 100644 (file)
@@ -1384,8 +1384,8 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   string max_marker;
   const std::string& period_marker; //< max marker stored in next period
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
-  bool more = false;
   std::set<std::string>::iterator iter;
 
   string oid;
@@ -1572,8 +1572,9 @@ public:
           lost_lock = true;
           break;
         }
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             marker, &entries, max_entries, &more));
+                                             marker, max_entries, omapkeys));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
           tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
@@ -1581,6 +1582,7 @@ public:
           drain_all();
           return retcode;
         }
+        entries = std::move(omapkeys->entries);
         tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
@@ -1603,7 +1605,7 @@ public:
           }
         }
         collect_children();
-      } while (more && can_adjust_marker);
+      } while (omapkeys->more && can_adjust_marker);
 
       tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */