]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move mdlog trimming into rgw_trim_mdlog.cc
authorCasey Bodley <cbodley@redhat.com>
Sat, 13 Apr 2019 16:39:35 +0000 (12:39 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 18 Apr 2019 17:16:17 +0000 (13:16 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_admin.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h
src/rgw/rgw_trim_mdlog.cc [new file with mode: 0644]
src/rgw/rgw_trim_mdlog.h [new file with mode: 0644]

index ee7d174a8ee22ead527e7c869c421893668543c3..b93cac4aa1e1b79eb18d22c60a4d3fb9ec061398 100644 (file)
@@ -74,6 +74,7 @@ set(librgw_common_srcs
   rgw_sync_module_pubsub_rest.cc
   rgw_sync_log_trim.cc
   rgw_sync_trace.cc
+  rgw_trim_mdlog.cc
   rgw_period_history.cc
   rgw_period_puller.cc
   rgw_reshard.cc
index 6457050511e4e6326d0dc2cda8c97eb24b01c40d..e76d2954024f59efb0d6ed731d84d464f50a8ebe 100644 (file)
@@ -44,6 +44,7 @@ extern "C" {
 #include "rgw_usage.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_trim_mdlog.h"
 #include "rgw_sync_log_trim.h"
 #include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
index dc8932411022babcecf369b95d369a7fbcc84cf8..4a18def8d258c7ebe8bf6337d5d071c37e90d3f8 100644 (file)
@@ -68,6 +68,7 @@ using namespace librados;
 #include "rgw_sync.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_trace.h"
+#include "rgw_trim_mdlog.h"
 #include "rgw_data_sync.h"
 #include "rgw_realm_watcher.h"
 #include "rgw_reshard.h"
index eb83a51192f334cc7ab1fbe289c6c193d7b04c40..1c3ee7d45e97a31ed674b13a9db91f91a829fd6b 100644 (file)
@@ -524,6 +524,14 @@ public:
   }
 };
 
+RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
+                                                     const std::string& period,
+                                                     int shard_id,
+                                                     RGWMetadataLogInfo* info)
+{
+  return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
+}
+
 class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
   RGWMetaSyncEnv *sync_env;
   RGWRESTReadResource *http_op;
@@ -586,6 +594,17 @@ public:
   }
 };
 
+RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
+                                                const std::string& period,
+                                                int shard_id,
+                                                const std::string& marker,
+                                                uint32_t max_entries,
+                                                rgw_mdlog_shard_data *result)
+{
+  return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
+                                       max_entries, result);
+}
+
 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
   if (shard_id >= num_shards) {
     return false;
@@ -2445,694 +2464,3 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
 {
   return set_cr_done();
 }
-
-
-// TODO: move into rgw_sync_trim.cc
-#undef dout_prefix
-#define dout_prefix (*_dout << "meta trim: ")
-
-/// purge all log shards for the given mdlog
-class PurgeLogShardsCR : public RGWShardCollectCR {
-  RGWRados *const store;
-  const RGWMetadataLog* mdlog;
-  const int num_shards;
-  rgw_raw_obj obj;
-  int i{0};
-
-  static constexpr int max_concurrent = 16;
-
- public:
-  PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
-                   const rgw_pool& pool, int num_shards)
-    : RGWShardCollectCR(store->ctx(), max_concurrent),
-      store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
-  {}
-
-  bool spawn_next() override {
-    if (i == num_shards) {
-      return false;
-    }
-    mdlog->get_shard_oid(i++, obj.oid);
-    spawn(new RGWRadosRemoveCR(store, obj), false);
-    return true;
-  }
-};
-
-using Cursor = RGWPeriodHistory::Cursor;
-
-/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
-class PurgePeriodLogsCR : public RGWCoroutine {
-  RGWRados *const store;
-  RGWMetadataManager *const metadata;
-  RGWObjVersionTracker objv;
-  Cursor cursor;
-  epoch_t realm_epoch;
-  epoch_t *last_trim_epoch; //< update last trim on success
-
- public:
-  PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
-    : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
-      realm_epoch(realm_epoch), last_trim_epoch(last_trim)
-  {}
-
-  int operate() override;
-};
-
-int PurgePeriodLogsCR::operate()
-{
-  reenter(this) {
-    // read our current oldest log period
-    yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
-    if (retcode < 0) {
-      return set_cr_error(retcode);
-    }
-    ceph_assert(cursor);
-    ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
-        << " period=" << cursor.get_period().get_id() << dendl;
-
-    // trim -up to- the given realm_epoch
-    while (cursor.get_epoch() < realm_epoch) {
-      ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
-          << " period=" << cursor.get_period().get_id() << dendl;
-      yield {
-        const auto mdlog = metadata->get_log(cursor.get_period().get_id());
-        const auto& pool = store->svc.zone->get_zone_params().log_pool;
-        auto num_shards = cct->_conf->rgw_md_log_max_shards;
-        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
-      }
-      if (retcode < 0) {
-        ldout(cct, 1) << "failed to remove log shards: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
-          << " period=" << cursor.get_period().get_id() << dendl;
-
-      // update our mdlog history
-      yield call(metadata->trim_log_period_cr(cursor, &objv));
-      if (retcode == -ENOENT) {
-        // must have raced to update mdlog history. return success and allow the
-        // winner to continue purging
-        ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
-            << " period=" << cursor.get_period().get_id() << dendl;
-        return set_cr_done();
-      } else if (retcode < 0) {
-        ldout(cct, 1) << "failed to remove log shards for realm_epoch="
-            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
-            << " with: " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-
-      if (*last_trim_epoch < cursor.get_epoch()) {
-        *last_trim_epoch = cursor.get_epoch();
-      }
-
-      ceph_assert(cursor.has_next()); // get_current() should always come after
-      cursor.next();
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-namespace {
-
-using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
-
-/// construct a RGWRESTConn for each zone in the realm
-template <typename Zonegroups>
-connection_map make_peer_connections(RGWRados *store,
-                                     const Zonegroups& zonegroups)
-{
-  connection_map connections;
-  for (auto& g : zonegroups) {
-    for (auto& z : g.second.zones) {
-      std::unique_ptr<RGWRESTConn> conn{
-        new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)};
-      connections.emplace(z.first, std::move(conn));
-    }
-  }
-  return connections;
-}
-
-/// return the marker that it's safe to trim up to
-const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
-{
-  return m.state == m.FullSync ? m.next_step_marker : m.marker;
-}
-
-/// comparison operator for take_min_status()
-bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
-{
-  // sort by stable marker
-  return get_stable_marker(lhs) < get_stable_marker(rhs);
-}
-
-/// populate the status with the minimum stable marker of each shard for any
-/// peer whose realm_epoch matches the minimum realm_epoch in the input
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
-                    rgw_meta_sync_status *status)
-{
-  if (first == last) {
-    return -EINVAL;
-  }
-  const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
-
-  status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
-  for (auto p = first; p != last; ++p) {
-    // validate peer's shard count
-    if (p->sync_markers.size() != num_shards) {
-      ldout(cct, 1) << "take_min_status got peer status with "
-          << p->sync_markers.size() << " shards, expected "
-          << num_shards << dendl;
-      return -EINVAL;
-    }
-    if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
-      // earlier epoch, take its entire status
-      *status = std::move(*p);
-    } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
-      // same epoch, take any earlier markers
-      auto m = status->sync_markers.begin();
-      for (auto& shard : p->sync_markers) {
-        if (shard.second < m->second) {
-          m->second = std::move(shard.second);
-        }
-        ++m;
-      }
-    }
-  }
-  return 0;
-}
-
-struct TrimEnv {
-  const DoutPrefixProvider *dpp;
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  int num_shards;
-  const std::string& zone;
-  Cursor current; //< cursor to current period
-  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
-
-  TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : dpp(dpp), store(store), http(http), num_shards(num_shards),
-      zone(store->svc.zone->get_zone_params().get_id()),
-      current(store->period_history->get_current())
-  {}
-};
-
-struct MasterTrimEnv : public TrimEnv {
-  connection_map connections; //< peer connections
-  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
-  /// last trim marker for each shard, only applies to current period's mdlog
-  std::vector<std::string> last_trim_markers;
-
-  MasterTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : TrimEnv(dpp, store, http, num_shards),
-      last_trim_markers(num_shards)
-  {
-    auto& period = current.get_period();
-    connections = make_peer_connections(store, period.get_map().zonegroups);
-    connections.erase(zone);
-    peer_status.resize(connections.size());
-  }
-};
-
-struct PeerTrimEnv : public TrimEnv {
-  /// last trim timestamp for each shard, only applies to current period's mdlog
-  std::vector<ceph::real_time> last_trim_timestamps;
-
-  PeerTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : TrimEnv(dpp, store, http, num_shards),
-      last_trim_timestamps(num_shards)
-  {}
-
-  void set_num_shards(int num_shards) {
-    this->num_shards = num_shards;
-    last_trim_timestamps.resize(num_shards);
-  }
-};
-
-} // anonymous namespace
-
-
-/// spawn a trim cr for each shard that needs it, while limiting the number
-/// of concurrent shards
-class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
- private:
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  MasterTrimEnv& env;
-  RGWMetadataLog *mdlog;
-  int shard_id{0};
-  std::string oid;
-  const rgw_meta_sync_status& sync_status;
-
- public:
-  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
-                               const rgw_meta_sync_status& sync_status)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), mdlog(mdlog), sync_status(sync_status)
-  {}
-
-  bool spawn_next() override;
-};
-
-bool MetaMasterTrimShardCollectCR::spawn_next()
-{
-  while (shard_id < env.num_shards) {
-    auto m = sync_status.sync_markers.find(shard_id);
-    if (m == sync_status.sync_markers.end()) {
-      shard_id++;
-      continue;
-    }
-    auto& stable = get_stable_marker(m->second);
-    auto& last_trim = env.last_trim_markers[shard_id];
-
-    if (stable <= last_trim) {
-      // already trimmed
-      ldout(cct, 20) << "skipping log shard " << shard_id
-          << " at marker=" << stable
-          << " last_trim=" << last_trim
-          << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
-      shard_id++;
-      continue;
-    }
-
-    mdlog->get_shard_oid(shard_id, oid);
-
-    ldout(cct, 10) << "trimming log shard " << shard_id
-        << " at marker=" << stable
-        << " last_trim=" << last_trim
-        << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
-    spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
-    shard_id++;
-    return true;
-  }
-  return false;
-}
-
-/// spawn rest requests to read each peer's sync status
-class MetaMasterStatusCollectCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  MasterTrimEnv& env;
-  connection_map::iterator c;
-  std::vector<rgw_meta_sync_status>::iterator s;
- public:
-  explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), c(env.connections.begin()), s(env.peer_status.begin())
-  {}
-
-  bool spawn_next() override {
-    if (c == env.connections.end()) {
-      return false;
-    }
-    static rgw_http_param_pair params[] = {
-      { "type", "metadata" },
-      { "status", nullptr },
-      { nullptr, nullptr }
-    };
-
-    ldout(cct, 20) << "query sync status from " << c->first << dendl;
-    auto conn = c->second.get();
-    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
-    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
-          false);
-    ++c;
-    ++s;
-    return true;
-  }
-};
-
-class MetaMasterTrimCR : public RGWCoroutine {
-  MasterTrimEnv& env;
-  rgw_meta_sync_status min_status; //< minimum sync status of all peers
-  int ret{0};
-
- public:
-  explicit MetaMasterTrimCR(MasterTrimEnv& env)
-    : RGWCoroutine(env.store->ctx()), env(env)
-  {}
-
-  int operate() override;
-};
-
-int MetaMasterTrimCR::operate()
-{
-  reenter(this) {
-    // TODO: detect this and fail before we spawn the trim thread?
-    if (env.connections.empty()) {
-      ldout(cct, 4) << "no peers, exiting" << dendl;
-      return set_cr_done();
-    }
-
-    ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
-    // query mdlog sync status from peers
-    yield call(new MetaMasterStatusCollectCR(env));
-
-    // must get a successful reply from all peers to consider trimming
-    if (ret < 0) {
-      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
-      return set_cr_error(ret);
-    }
-
-    // determine the minimum epoch and markers
-    ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
-                          env.peer_status.end(), &min_status);
-    if (ret < 0) {
-      ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
-      return set_cr_error(ret);
-    }
-    yield {
-      auto store = env.store;
-      auto epoch = min_status.sync_info.realm_epoch;
-      ldout(cct, 4) << "realm epoch min=" << epoch
-          << " current=" << env.current.get_epoch()<< dendl;
-      if (epoch > env.last_trim_epoch + 1) {
-        // delete any prior mdlog periods
-        spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
-      } else {
-        ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
-            << env.last_trim_epoch << dendl;
-      }
-
-      // if realm_epoch == current, trim mdlog based on markers
-      if (epoch == env.current.get_epoch()) {
-        auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
-        spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
-      }
-    }
-    // ignore any errors during purge/trim because we want to hold the lock open
-    return set_cr_done();
-  }
-  return 0;
-}
-
-
-/// read the first entry of the master's mdlog shard and trim to that position
-class MetaPeerTrimShardCR : public RGWCoroutine {
-  RGWMetaSyncEnv& env;
-  RGWMetadataLog *mdlog;
-  const std::string& period_id;
-  const int shard_id;
-  RGWMetadataLogInfo info;
-  ceph::real_time stable; //< safe timestamp to trim, according to master
-  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
-  rgw_mdlog_shard_data result; //< result from master's mdlog listing
-
- public:
-  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
-                      const std::string& period_id, int shard_id,
-                      ceph::real_time *last_trim)
-    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
-      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
-  {}
-
-  int operate() override;
-};
-
-int MetaPeerTrimShardCR::operate()
-{
-  reenter(this) {
-    // query master's first mdlog entry for this shard
-    yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
-                                             "", 1, &result));
-    if (retcode < 0) {
-      ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
-          << shard_id << " for period " << period_id
-          << ": " << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    if (result.entries.empty()) {
-      // if there are no mdlog entries, we don't have a timestamp to compare. we
-      // can't just trim everything, because there could be racing updates since
-      // this empty reply. query the mdlog shard info to read its max timestamp,
-      // then retry the listing to make sure it's still empty before trimming to
-      // that
-      ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
-          << ", reading last timestamp from shard info" << dendl;
-      // read the mdlog shard info for the last timestamp
-      using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
-      yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
-      if (retcode < 0) {
-        ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
-            << shard_id << " for period " << period_id
-            << ": " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      if (ceph::real_clock::is_zero(info.last_update)) {
-        return set_cr_done(); // nothing to trim
-      }
-      ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
-          << info.last_update << dendl;
-      // re-read the master's first mdlog entry to make sure it hasn't changed
-      yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
-                                               "", 1, &result));
-      if (retcode < 0) {
-        ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
-            << shard_id << " for period " << period_id
-            << ": " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      // if the mdlog is still empty, trim to max marker
-      if (result.entries.empty()) {
-        stable = info.last_update;
-      } else {
-        stable = result.entries.front().timestamp;
-
-        // can only trim -up to- master's first timestamp, so subtract a second.
-        // (this is why we use timestamps instead of markers for the peers)
-        stable -= std::chrono::seconds(1);
-      }
-    } else {
-      stable = result.entries.front().timestamp;
-      stable -= std::chrono::seconds(1);
-    }
-
-    if (stable <= *last_trim) {
-      ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
-          << " at timestamp=" << stable
-          << " last_trim=" << *last_trim << dendl;
-      return set_cr_done();
-    }
-
-    ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
-        << " at timestamp=" << stable
-        << " last_trim=" << *last_trim << dendl;
-    yield {
-      std::string oid;
-      mdlog->get_shard_oid(shard_id, oid);
-      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
-    }
-    if (retcode < 0 && retcode != -ENODATA) {
-      ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
-          << ": " << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    *last_trim = stable;
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  PeerTrimEnv& env;
-  RGWMetadataLog *mdlog;
-  const std::string& period_id;
-  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
-  int shard_id{0};
-
- public:
-  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
-  {
-    meta_env.init(env.dpp, cct, env.store, env.store->svc.zone->get_master_conn(),
-                  env.store->get_async_rados(), env.http, nullptr,
-                  env.store->get_sync_tracer());
-  }
-
-  bool spawn_next() override;
-};
-
-bool MetaPeerTrimShardCollectCR::spawn_next()
-{
-  if (shard_id >= env.num_shards) {
-    return false;
-  }
-  auto& last_trim = env.last_trim_timestamps[shard_id];
-  spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
-        false);
-  shard_id++;
-  return true;
-}
-
-class MetaPeerTrimCR : public RGWCoroutine {
-  PeerTrimEnv& env;
-  rgw_mdlog_info mdlog_info; //< master's mdlog info
-
- public:
-  explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
-
-  int operate() override;
-};
-
-int MetaPeerTrimCR::operate()
-{
-  reenter(this) {
-    ldout(cct, 10) << "fetching master mdlog info" << dendl;
-    yield {
-      // query mdlog_info from master for oldest_log_period
-      rgw_http_param_pair params[] = {
-        { "type", "metadata" },
-        { nullptr, nullptr }
-      };
-
-      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
-      call(new LogInfoCR(cct, env.store->svc.zone->get_master_conn(), env.http,
-                         "/admin/log/", params, &mdlog_info));
-    }
-    if (retcode < 0) {
-      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
-      return set_cr_error(retcode);
-    }
-    // use master's shard count instead
-    env.set_num_shards(mdlog_info.num_shards);
-
-    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
-      // delete any prior mdlog periods
-      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
-                                       &env.last_trim_epoch));
-    } else {
-      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
-          << env.last_trim_epoch << dendl;
-    }
-
-    // if realm_epoch == current, trim mdlog based on master's markers
-    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
-      yield {
-        auto meta_mgr = env.store->meta_mgr;
-        auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
-        call(new MetaPeerTrimShardCollectCR(env, mdlog));
-        // ignore any errors during purge/trim because we want to hold the lock open
-      }
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class MetaTrimPollCR : public RGWCoroutine {
-  RGWRados *const store;
-  const utime_t interval; //< polling interval
-  const rgw_raw_obj obj;
-  const std::string name{"meta_trim"}; //< lock name
-  const std::string cookie;
-
- protected:
-  /// allocate the coroutine to run within the lease
-  virtual RGWCoroutine* alloc_cr() = 0;
-
- public:
-  MetaTrimPollCR(RGWRados *store, utime_t interval)
-    : RGWCoroutine(store->ctx()), store(store), interval(interval),
-      obj(store->svc.zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
-      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
-  {}
-
-  int operate() override;
-};
-
-int MetaTrimPollCR::operate()
-{
-  reenter(this) {
-    for (;;) {
-      set_status("sleeping");
-      wait(interval);
-
-      // prevent others from trimming for our entire wait interval
-      set_status("acquiring trim lock");
-      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
-                                          obj, name, cookie, interval.sec()));
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
-        continue;
-      }
-
-      set_status("trimming");
-      yield call(alloc_cr());
-
-      if (retcode < 0) {
-        // on errors, unlock so other gateways can try
-        set_status("unlocking");
-        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
-                                              obj, name, cookie));
-      }
-    }
-  }
-  return 0;
-}
-
-class MetaMasterTrimPollCR : public MetaTrimPollCR  {
-  MasterTrimEnv env; //< trim state to share between calls
-  RGWCoroutine* alloc_cr() override {
-    return new MetaMasterTrimCR(env);
-  }
- public:
-  MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
-                       int num_shards, utime_t interval)
-    : MetaTrimPollCR(store, interval),
-      env(dpp, store, http, num_shards)
-  {}
-};
-
-class MetaPeerTrimPollCR : public MetaTrimPollCR {
-  PeerTrimEnv env; //< trim state to share between calls
-  RGWCoroutine* alloc_cr() override {
-    return new MetaPeerTrimCR(env);
-  }
- public:
-  MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
-                     int num_shards, utime_t interval)
-    : MetaTrimPollCR(store, interval),
-      env(dpp, store, http, num_shards)
-  {}
-};
-
-RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
-                                      int num_shards, utime_t interval)
-{
-  if (store->svc.zone->is_meta_master()) {
-    return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
-  }
-  return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
-}
-
-
-struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
-  MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : MasterTrimEnv(dpp, store, http, num_shards),
-      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
-  {}
-};
-
-struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
-  MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : PeerTrimEnv(dpp, store, http, num_shards),
-      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
-  {}
-};
-
-RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store,
-                                            RGWHTTPManager *http,
-                                            int num_shards)
-{
-  if (store->svc.zone->is_meta_master()) {
-    return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
-  }
-  return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
-}
index 7774e164522aba2050edadd3f6349544bf607ea5..777b2e62d653ef0fa94f749bddc17572b8e8c6c9 100644 (file)
@@ -522,13 +522,18 @@ public:
   int operate() override;
 };
 
-// MetaLogTrimCR factory function
-RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
-                                      int num_shards, utime_t interval);
-
-// factory function for mdlog trim via radosgw-admin
-RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store,
-                                            RGWHTTPManager *http,
-                                            int num_shards);
+// factory functions for meta sync coroutines needed in mdlog trimming
+
+RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
+                                                     const std::string& period,
+                                                     int shard_id,
+                                                     RGWMetadataLogInfo* info);
+
+RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
+                                                const std::string& period,
+                                                int shard_id,
+                                                const std::string& marker,
+                                                uint32_t max_entries,
+                                                rgw_mdlog_shard_data *result);
 
 #endif
diff --git a/src/rgw/rgw_trim_mdlog.cc b/src/rgw/rgw_trim_mdlog.cc
new file mode 100644 (file)
index 0000000..5efd4f7
--- /dev/null
@@ -0,0 +1,704 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/errno.h"
+
+#include "rgw_trim_mdlog.h"
+#include "rgw_sync.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_rados.h"
+#include "rgw_zone.h"
+#include "services/svc_zone.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "meta trim: ")
+
+/// purge all log shards for the given mdlog
+class PurgeLogShardsCR : public RGWShardCollectCR {
+  RGWRados *const store;
+  const RGWMetadataLog* mdlog;
+  const int num_shards;
+  rgw_raw_obj obj;
+  int i{0};
+
+  static constexpr int max_concurrent = 16;
+
+ public:
+  PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
+                   const rgw_pool& pool, int num_shards)
+    : RGWShardCollectCR(store->ctx(), max_concurrent),
+      store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
+  {}
+
+  bool spawn_next() override {
+    if (i == num_shards) {
+      return false;
+    }
+    mdlog->get_shard_oid(i++, obj.oid);
+    spawn(new RGWRadosRemoveCR(store, obj), false);
+    return true;
+  }
+};
+
+using Cursor = RGWPeriodHistory::Cursor;
+
+/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
+class PurgePeriodLogsCR : public RGWCoroutine {
+  RGWRados *const store;
+  RGWMetadataManager *const metadata;
+  RGWObjVersionTracker objv;
+  Cursor cursor;
+  epoch_t realm_epoch;
+  epoch_t *last_trim_epoch; //< update last trim on success
+
+ public:
+  PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
+    : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
+      realm_epoch(realm_epoch), last_trim_epoch(last_trim)
+  {}
+
+  int operate() override;
+};
+
+int PurgePeriodLogsCR::operate()
+{
+  reenter(this) {
+    // read our current oldest log period
+    yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+    ceph_assert(cursor);
+    ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
+        << " period=" << cursor.get_period().get_id() << dendl;
+
+    // trim -up to- the given realm_epoch
+    while (cursor.get_epoch() < realm_epoch) {
+      ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
+          << " period=" << cursor.get_period().get_id() << dendl;
+      yield {
+        const auto mdlog = metadata->get_log(cursor.get_period().get_id());
+        const auto& pool = store->svc.zone->get_zone_params().log_pool;
+        auto num_shards = cct->_conf->rgw_md_log_max_shards;
+        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
+      }
+      if (retcode < 0) {
+        ldout(cct, 1) << "failed to remove log shards: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+      ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
+          << " period=" << cursor.get_period().get_id() << dendl;
+
+      // update our mdlog history
+      yield call(metadata->trim_log_period_cr(cursor, &objv));
+      if (retcode == -ENOENT) {
+        // must have raced to update mdlog history. return success and allow the
+        // winner to continue purging
+        ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
+            << " period=" << cursor.get_period().get_id() << dendl;
+        return set_cr_done();
+      } else if (retcode < 0) {
+        ldout(cct, 1) << "failed to remove log shards for realm_epoch="
+            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
+            << " with: " << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (*last_trim_epoch < cursor.get_epoch()) {
+        *last_trim_epoch = cursor.get_epoch();
+      }
+
+      ceph_assert(cursor.has_next()); // get_current() should always come after
+      cursor.next();
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
+
+namespace {
+
+using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
+
+/// construct a RGWRESTConn for each zone in the realm
+template <typename Zonegroups>
+connection_map make_peer_connections(RGWRados *store,
+                                     const Zonegroups& zonegroups)
+{
+  connection_map connections;
+  for (auto& g : zonegroups) {
+    for (auto& z : g.second.zones) {
+      std::unique_ptr<RGWRESTConn> conn{
+        new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)};
+      connections.emplace(z.first, std::move(conn));
+    }
+  }
+  return connections;
+}
+
+/// return the marker that it's safe to trim up to
+const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
+{
+  return m.state == m.FullSync ? m.next_step_marker : m.marker;
+}
+
+/// comparison operator for take_min_status()
+bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
+{
+  // sort by stable marker
+  return get_stable_marker(lhs) < get_stable_marker(rhs);
+}
+
+/// populate the status with the minimum stable marker of each shard for any
+/// peer whose realm_epoch matches the minimum realm_epoch in the input
+template <typename Iter>
+int take_min_status(CephContext *cct, Iter first, Iter last,
+                    rgw_meta_sync_status *status)
+{
+  if (first == last) {
+    return -EINVAL;
+  }
+  const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
+
+  status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
+  for (auto p = first; p != last; ++p) {
+    // validate peer's shard count
+    if (p->sync_markers.size() != num_shards) {
+      ldout(cct, 1) << "take_min_status got peer status with "
+          << p->sync_markers.size() << " shards, expected "
+          << num_shards << dendl;
+      return -EINVAL;
+    }
+    if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
+      // earlier epoch, take its entire status
+      *status = std::move(*p);
+    } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
+      // same epoch, take any earlier markers
+      auto m = status->sync_markers.begin();
+      for (auto& shard : p->sync_markers) {
+        if (shard.second < m->second) {
+          m->second = std::move(shard.second);
+        }
+        ++m;
+      }
+    }
+  }
+  return 0;
+}
+
+struct TrimEnv {
+  const DoutPrefixProvider *dpp;
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  int num_shards;
+  const std::string& zone;
+  Cursor current; //< cursor to current period
+  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
+
+  TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : dpp(dpp), store(store), http(http), num_shards(num_shards),
+      zone(store->svc.zone->get_zone_params().get_id()),
+      current(store->period_history->get_current())
+  {}
+};
+
+struct MasterTrimEnv : public TrimEnv {
+  connection_map connections; //< peer connections
+  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
+  /// last trim marker for each shard, only applies to current period's mdlog
+  std::vector<std::string> last_trim_markers;
+
+  MasterTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : TrimEnv(dpp, store, http, num_shards),
+      last_trim_markers(num_shards)
+  {
+    auto& period = current.get_period();
+    connections = make_peer_connections(store, period.get_map().zonegroups);
+    connections.erase(zone);
+    peer_status.resize(connections.size());
+  }
+};
+
+struct PeerTrimEnv : public TrimEnv {
+  /// last trim timestamp for each shard, only applies to current period's mdlog
+  std::vector<ceph::real_time> last_trim_timestamps;
+
+  PeerTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : TrimEnv(dpp, store, http, num_shards),
+      last_trim_timestamps(num_shards)
+  {}
+
+  void set_num_shards(int num_shards) {
+    this->num_shards = num_shards;
+    last_trim_timestamps.resize(num_shards);
+  }
+};
+
+} // anonymous namespace
+
+
+/// spawn a trim cr for each shard that needs it, while limiting the number
+/// of concurrent shards
+class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
+ private:
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  MasterTrimEnv& env;
+  RGWMetadataLog *mdlog;
+  int shard_id{0};
+  std::string oid;
+  const rgw_meta_sync_status& sync_status;
+
+ public:
+  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
+                               const rgw_meta_sync_status& sync_status)
+    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+      env(env), mdlog(mdlog), sync_status(sync_status)
+  {}
+
+  bool spawn_next() override;
+};
+
+bool MetaMasterTrimShardCollectCR::spawn_next()
+{
+  while (shard_id < env.num_shards) {
+    auto m = sync_status.sync_markers.find(shard_id);
+    if (m == sync_status.sync_markers.end()) {
+      shard_id++;
+      continue;
+    }
+    auto& stable = get_stable_marker(m->second);
+    auto& last_trim = env.last_trim_markers[shard_id];
+
+    if (stable <= last_trim) {
+      // already trimmed
+      ldout(cct, 20) << "skipping log shard " << shard_id
+          << " at marker=" << stable
+          << " last_trim=" << last_trim
+          << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
+      shard_id++;
+      continue;
+    }
+
+    mdlog->get_shard_oid(shard_id, oid);
+
+    ldout(cct, 10) << "trimming log shard " << shard_id
+        << " at marker=" << stable
+        << " last_trim=" << last_trim
+        << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
+    spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
+    shard_id++;
+    return true;
+  }
+  return false;
+}
+
+/// spawn rest requests to read each peer's sync status
+class MetaMasterStatusCollectCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  MasterTrimEnv& env;
+  connection_map::iterator c;
+  std::vector<rgw_meta_sync_status>::iterator s;
+ public:
+  explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
+    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+      env(env), c(env.connections.begin()), s(env.peer_status.begin())
+  {}
+
+  bool spawn_next() override {
+    if (c == env.connections.end()) {
+      return false;
+    }
+    static rgw_http_param_pair params[] = {
+      { "type", "metadata" },
+      { "status", nullptr },
+      { nullptr, nullptr }
+    };
+
+    ldout(cct, 20) << "query sync status from " << c->first << dendl;
+    auto conn = c->second.get();
+    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
+    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
+          false);
+    ++c;
+    ++s;
+    return true;
+  }
+};
+
+class MetaMasterTrimCR : public RGWCoroutine {
+  MasterTrimEnv& env;
+  rgw_meta_sync_status min_status; //< minimum sync status of all peers
+  int ret{0};
+
+ public:
+  explicit MetaMasterTrimCR(MasterTrimEnv& env)
+    : RGWCoroutine(env.store->ctx()), env(env)
+  {}
+
+  int operate() override;
+};
+
+int MetaMasterTrimCR::operate()
+{
+  reenter(this) {
+    // TODO: detect this and fail before we spawn the trim thread?
+    if (env.connections.empty()) {
+      ldout(cct, 4) << "no peers, exiting" << dendl;
+      return set_cr_done();
+    }
+
+    ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
+    // query mdlog sync status from peers
+    yield call(new MetaMasterStatusCollectCR(env));
+
+    // must get a successful reply from all peers to consider trimming
+    if (ret < 0) {
+      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+      return set_cr_error(ret);
+    }
+
+    // determine the minimum epoch and markers
+    ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
+                          env.peer_status.end(), &min_status);
+    if (ret < 0) {
+      ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
+      return set_cr_error(ret);
+    }
+    yield {
+      auto store = env.store;
+      auto epoch = min_status.sync_info.realm_epoch;
+      ldout(cct, 4) << "realm epoch min=" << epoch
+          << " current=" << env.current.get_epoch()<< dendl;
+      if (epoch > env.last_trim_epoch + 1) {
+        // delete any prior mdlog periods
+        spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
+      } else {
+        ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
+            << env.last_trim_epoch << dendl;
+      }
+
+      // if realm_epoch == current, trim mdlog based on markers
+      if (epoch == env.current.get_epoch()) {
+        auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
+        spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
+      }
+    }
+    // ignore any errors during purge/trim because we want to hold the lock open
+    return set_cr_done();
+  }
+  return 0;
+}
+
+
+/// read the first entry of the master's mdlog shard and trim to that position
+class MetaPeerTrimShardCR : public RGWCoroutine {
+  RGWMetaSyncEnv& env;
+  RGWMetadataLog *mdlog;
+  const std::string& period_id;
+  const int shard_id;
+  RGWMetadataLogInfo info;
+  ceph::real_time stable; //< safe timestamp to trim, according to master
+  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
+  rgw_mdlog_shard_data result; //< result from master's mdlog listing
+
+ public:
+  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
+                      const std::string& period_id, int shard_id,
+                      ceph::real_time *last_trim)
+    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
+      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
+  {}
+
+  int operate() override;
+};
+
+int MetaPeerTrimShardCR::operate()
+{
+  reenter(this) {
+    // query master's first mdlog entry for this shard
+    yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
+                                                 "", 1, &result));
+    if (retcode < 0) {
+      ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
+          << shard_id << " for period " << period_id
+          << ": " << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+    if (result.entries.empty()) {
+      // if there are no mdlog entries, we don't have a timestamp to compare. we
+      // can't just trim everything, because there could be racing updates since
+      // this empty reply. query the mdlog shard info to read its max timestamp,
+      // then retry the listing to make sure it's still empty before trimming to
+      // that
+      ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
+          << ", reading last timestamp from shard info" << dendl;
+      // read the mdlog shard info for the last timestamp
+      yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info));
+      if (retcode < 0) {
+        ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
+            << shard_id << " for period " << period_id
+            << ": " << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+      if (ceph::real_clock::is_zero(info.last_update)) {
+        return set_cr_done(); // nothing to trim
+      }
+      ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
+          << info.last_update << dendl;
+      // re-read the master's first mdlog entry to make sure it hasn't changed
+      yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
+                                                   "", 1, &result));
+      if (retcode < 0) {
+        ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
+            << shard_id << " for period " << period_id
+            << ": " << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+      // if the mdlog is still empty, trim to max marker
+      if (result.entries.empty()) {
+        stable = info.last_update;
+      } else {
+        stable = result.entries.front().timestamp;
+
+        // can only trim -up to- master's first timestamp, so subtract a second.
+        // (this is why we use timestamps instead of markers for the peers)
+        stable -= std::chrono::seconds(1);
+      }
+    } else {
+      stable = result.entries.front().timestamp;
+      stable -= std::chrono::seconds(1);
+    }
+
+    if (stable <= *last_trim) {
+      ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
+          << " at timestamp=" << stable
+          << " last_trim=" << *last_trim << dendl;
+      return set_cr_done();
+    }
+
+    ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
+        << " at timestamp=" << stable
+        << " last_trim=" << *last_trim << dendl;
+    yield {
+      std::string oid;
+      mdlog->get_shard_oid(shard_id, oid);
+      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
+    }
+    if (retcode < 0 && retcode != -ENODATA) {
+      ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
+          << ": " << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+    *last_trim = stable;
+    return set_cr_done();
+  }
+  return 0;
+}
+
+class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  PeerTrimEnv& env;
+  RGWMetadataLog *mdlog;
+  const std::string& period_id;
+  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
+  int shard_id{0};
+
+ public:
+  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
+    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
+  {
+    meta_env.init(env.dpp, cct, env.store, env.store->svc.zone->get_master_conn(),
+                  env.store->get_async_rados(), env.http, nullptr,
+                  env.store->get_sync_tracer());
+  }
+
+  bool spawn_next() override;
+};
+
+bool MetaPeerTrimShardCollectCR::spawn_next()
+{
+  if (shard_id >= env.num_shards) {
+    return false;
+  }
+  auto& last_trim = env.last_trim_timestamps[shard_id];
+  spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
+        false);
+  shard_id++;
+  return true;
+}
+
+class MetaPeerTrimCR : public RGWCoroutine {
+  PeerTrimEnv& env;
+  rgw_mdlog_info mdlog_info; //< master's mdlog info
+
+ public:
+  explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
+
+  int operate() override;
+};
+
+int MetaPeerTrimCR::operate()
+{
+  reenter(this) {
+    ldout(cct, 10) << "fetching master mdlog info" << dendl;
+    yield {
+      // query mdlog_info from master for oldest_log_period
+      rgw_http_param_pair params[] = {
+        { "type", "metadata" },
+        { nullptr, nullptr }
+      };
+
+      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
+      call(new LogInfoCR(cct, env.store->svc.zone->get_master_conn(), env.http,
+                         "/admin/log/", params, &mdlog_info));
+    }
+    if (retcode < 0) {
+      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
+      return set_cr_error(retcode);
+    }
+    // use master's shard count instead
+    env.set_num_shards(mdlog_info.num_shards);
+
+    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
+      // delete any prior mdlog periods
+      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
+                                       &env.last_trim_epoch));
+    } else {
+      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
+          << env.last_trim_epoch << dendl;
+    }
+
+    // if realm_epoch == current, trim mdlog based on master's markers
+    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
+      yield {
+        auto meta_mgr = env.store->meta_mgr;
+        auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
+        call(new MetaPeerTrimShardCollectCR(env, mdlog));
+        // ignore any errors during purge/trim because we want to hold the lock open
+      }
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
+
+class MetaTrimPollCR : public RGWCoroutine {
+  RGWRados *const store;
+  const utime_t interval; //< polling interval
+  const rgw_raw_obj obj;
+  const std::string name{"meta_trim"}; //< lock name
+  const std::string cookie;
+
+ protected:
+  /// allocate the coroutine to run within the lease
+  virtual RGWCoroutine* alloc_cr() = 0;
+
+ public:
+  MetaTrimPollCR(RGWRados *store, utime_t interval)
+    : RGWCoroutine(store->ctx()), store(store), interval(interval),
+      obj(store->svc.zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
+      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
+  {}
+
+  int operate() override;
+};
+
+int MetaTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(interval);
+
+      // prevent others from trimming for our entire wait interval
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          obj, name, cookie, interval.sec()));
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
+        continue;
+      }
+
+      set_status("trimming");
+      yield call(alloc_cr());
+
+      if (retcode < 0) {
+        // on errors, unlock so other gateways can try
+        set_status("unlocking");
+        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
+                                              obj, name, cookie));
+      }
+    }
+  }
+  return 0;
+}
+
+class MetaMasterTrimPollCR : public MetaTrimPollCR  {
+  MasterTrimEnv env; //< trim state to share between calls
+  RGWCoroutine* alloc_cr() override {
+    return new MetaMasterTrimCR(env);
+  }
+ public:
+  MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
+                       int num_shards, utime_t interval)
+    : MetaTrimPollCR(store, interval),
+      env(dpp, store, http, num_shards)
+  {}
+};
+
+class MetaPeerTrimPollCR : public MetaTrimPollCR {
+  PeerTrimEnv env; //< trim state to share between calls
+  RGWCoroutine* alloc_cr() override {
+    return new MetaPeerTrimCR(env);
+  }
+ public:
+  MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
+                     int num_shards, utime_t interval)
+    : MetaTrimPollCR(store, interval),
+      env(dpp, store, http, num_shards)
+  {}
+};
+
+RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
+                                      int num_shards, utime_t interval)
+{
+  if (store->svc.zone->is_meta_master()) {
+    return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
+  }
+  return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
+}
+
+
+struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
+  MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : MasterTrimEnv(dpp, store, http, num_shards),
+      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
+  {}
+};
+
+struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
+  MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : PeerTrimEnv(dpp, store, http, num_shards),
+      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
+  {}
+};
+
+RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards)
+{
+  if (store->svc.zone->is_meta_master()) {
+    return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
+  }
+  return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
+}
diff --git a/src/rgw/rgw_trim_mdlog.h b/src/rgw/rgw_trim_mdlog.h
new file mode 100644 (file)
index 0000000..7946566
--- /dev/null
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+class RGWCoroutine;
+class DoutPrefixProvider;
+class RGWRados;
+class RGWHTTPManager;
+class utime_t;
+
+// MetaLogTrimCR factory function
+RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp,
+                                      RGWRados *store,
+                                      RGWHTTPManager *http,
+                                      int num_shards, utime_t interval);
+
+// factory function for mdlog trim via radosgw-admin
+RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp,
+                                            RGWRados *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards);